aboutsummaryrefslogtreecommitdiffstats
path: root/src/py/imc/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/py/imc/proxy.py')
-rwxr-xr-xsrc/py/imc/proxy.py162
1 files changed, 107 insertions, 55 deletions
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 0887b7d..87f57f0 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -9,6 +9,7 @@ from imc import nonblock
class Connection:
def __init__(self,linkid):
self.linkid = linkid
+ self.link_linkidmap = {}
self._close_callback = []
def send_msg(self,data):
@@ -25,9 +26,11 @@ class Connection:
callback(self)
class Proxy:
- def __init__(self,linkid):
+ def __init__(self,linkid,connect_linkid = None,center_conn = None):
self._ioloop = tornado.ioloop.IOLoop.instance()
self._linkid = linkid
+ self._connect_linkid = connect_linkid
+ self._center_conn = center_conn
self._conn_linkidmap = {}
self._caller_retidmap = {self._linkid:{}}
@@ -36,7 +39,6 @@ class Proxy:
self.MSGTYPE_CALL = 'call'
self.MSGTYPE_RET = 'ret'
- self.RETTYPE_GENID = 'genid'
self._check_waitcaller_timer = tornado.ioloop.PeriodicCallback(self._check_waitcaller,1000)
self._check_waitcaller_timer.start()
@@ -44,40 +46,73 @@ class Proxy:
Proxy.instance = self
def add_conn(self,conn):
+ assert conn.linkid not in self._conn_linkidmap
+
self._conn_linkidmap[conn.linkid] = conn
self._caller_retidmap[conn.linkid] = {}
conn.add_close_callback(self._conn_close_cb)
conn.start_recvloop(self._recvloop_dispatch)
+ def link_conn(self,linkid,conn):
+ assert conn.linkid in self._conn_linkidmap
+
+ conn.link_linkidmap[linkid] = True
+ self._conn_linkidmap[linkid] = conn
+
+ def unlink_conn(self,linkid):
+ assert linkid in self._conn_linkidmap
+
+ conn = self._conn_linkidmap.pop(linkid)
+ del conn.link_linkidmap[linkid]
+
def del_conn(self,conn):
wait_map = self._caller_retidmap[conn.linkid]
wait_retids = wait_map.keys()
for retid in wait_retids:
wait_map[retid]['fail_callback'](retid,'Eclose')
+ linkids = conn.link_linkidmap.keys()
+ link_del = []
+ for linkid in linkids:
+ link_del.append(linkid)
+
+ for linkid in link_del:
+ self.unlink_conn(linkid)
+
del self._conn_linkidmap[conn.linkid]
del self._caller_retidmap[conn.linkid]
def get_conn(self,linkid):
- if linkid not in self._conn_linkidmap:
+ try:
+ return self._conn_linkidmap[linkid]
+
+ except KeyError:
return None
- return self._conn_linkidmap[linkid]
+ def request_conn(self,linkid,callback,*args):
+ def _connect_cb(conn):
+ if conn != None and conn.linkid != linkid:
+ self.link_conn(linkid,conn)
- def request_conn(self,linkid,callback):
- self._route_call()
+ callback(conn,*args)
+
+ if linkid in self._conn_linkidmap:
+ callback(self._conn_linkidmap[linkid],*args)
+
+ else:
+ self._connect_linkid(linkid,_connect_cb)
def register_call(self,path,func_name,func):
- self._call_pathmap[''.join([path,'/',func_name])] = func
+ self._call_pathmap[''.join([path,func_name])] = func
def call(self,caller_genid,timeout,iden,dst,func_name,param):
- caller_retid = ''.join([self._linkid,'/',self.RETTYPE_GENID,'_',caller_genid])
+ caller_retid = ''.join([self._linkid,'/',caller_genid])
self._route_call(caller_retid,timeout,iden,dst,func_name,param)
def _route_call(self,caller_retid,timeout,iden,dst,func_name,param):
- def __add_wait_caller(callee_linkid,caller_retid,timeout,fail_callback):
- self._caller_retidmap[callee_linkid][caller_retid] = {
+ def __add_wait_caller(conn_linkid,caller_retid,timeout,fail_callback):
+ self._caller_retidmap[conn_linkid][caller_retid] = {
'timeout':timeout,
'fail_callback':tornado.stack_context.wrap(fail_callback)
}
@@ -88,22 +123,39 @@ class Proxy:
'caller_retid':caller_retid,
}
+ def __local_send_remote(conn,caller_linkid,caller_retid,timeout,iden,dst,func_name,param):
+ if conn != None:
+ __add_wait_caller(conn.linkid,caller_retid,timeout,__local_fail_cb)
+ self._send_msg_call(conn,caller_retid,timeout,iden,dst,func_name,param)
+ else:
+ __local_fail_cb(caller_retid,'Enoexist')
+
+ def __remote_send_remote(conn,caller_linkid,caller_retid,timeout,iden,dst,func_name,param):
+ if conn != None:
+ self._send_msg_call(conn,caller_retid,timeout,iden,dst,func_name,param)
+ else:
+ __remote_fail_cb(caller_retid,'Enoexist')
+
+ def __send_ret(conn,caller_linkid,caller_retid,result):
+ if conn != None:
+ self._send_msg_ret(conn,caller_linkid,caller_retid,result)
+
def __local_fail_cb(retid,err):
self._ret_call(self._linkid,retid,(False,err))
def __remote_fail_cb(retid,err):
print('Opps')
- dst_part = dst.split('/')[1:]
- linkid = dst_part[1]
- path = ''.join(dst_part[2:])
+ dst_part = dst.split('/',3)
+ linkid = dst_part[2]
+ path = dst_part[3]
- caller_linkid = iden
+ caller_linkid = iden['linkid']
assert caller_retid.split('/',1)[0] == caller_linkid
if linkid == self._linkid:
try:
- stat,data = self._call_pathmap[''.join([path,'/',func_name])](param)
+ stat,data = self._call_pathmap[''.join([path,func_name])](iden,param)
except KeyError:
raise
@@ -112,58 +164,46 @@ class Proxy:
if caller_linkid == self._linkid:
self._ioloop.add_callback(self._ret_call,caller_linkid,caller_retid,(True,data))
else:
- caller_conn = self.get_conn(caller_linkid)
- if caller_conn == None:
- pass
-
- self._send_msg_ret(caller_conn,caller_linkid,caller_retid,(True,data))
+ self.request_conn(caller_linkid,__send_ret,caller_linkid,caller_retid,(True,data))
else:
if caller_linkid == self._linkid:
- __add_wait_caller(linkid,caller_retid,timeout,__local_fail_cb)
+ __add_wait_caller(self._linkid,caller_retid,timeout,__local_fail_cb)
else:
- __add_wait_caller(linkid,caller_retid,timeout,__remote_fail_cb)
+ __add_wait_caller(self._linkid,caller_retid,timeout,__remote_fail_cb)
- __add_wait_retcall(''.join([self._linkid,'/',self.RETTYPE_GENID,'_',data]),caller_linkid,caller_retid)
+ __add_wait_retcall(''.join([self._linkid,'/',data]),caller_linkid,caller_retid)
else:
- callee_conn = self.get_conn(linkid)
- if callee_conn == None:
- pass
-
if caller_linkid == self._linkid:
- __add_wait_caller(callee_conn.linkid,caller_retid,timeout,__local_fail_cb)
- self._send_msg_call(callee_conn,caller_retid,timeout,iden,dst,func_name,param)
+ self.request_conn(linkid,__local_send_remote,caller_linkid,caller_retid,timeout,iden,dst,func_name,param)
else:
- pass
+ self.request_conn(linkid,__remote_send_remote,caller_linkid,caller_retid,timeout,iden,dst,func_name,param)
+
+ def _ret_call(self,caller_linkid,caller_retid,result):
+ def __send_ret(conn,caller_linkid,caller_retid,result):
+ if conn != None:
+ self._send_msg_ret(conn,caller_linkid,caller_retid,result)
- def _ret_call(self,caller_linkid,caller_retid,retvalue):
if caller_linkid == self._linkid:
- part = caller_retid.split('/',1)[1].split('_')
- ret_type = part[0]
- retid = part[1]
+ retid = caller_retid.split('/',1)[1]
- if ret_type == self.RETTYPE_GENID:
- stat,data = nonblock.retcall(retid,retvalue)
- if stat == True:
- try:
- ret = self._retcall_retidmap.pop(caller_retid)
- linkid = ret['caller_linkid']
- retid = ret['caller_retid']
+ stat,data = nonblock.retcall(retid,result)
+ if stat == True:
+ try:
+ ret = self._retcall_retidmap.pop(caller_retid)
+ linkid = ret['caller_linkid']
+ retid = ret['caller_retid']
- del self._caller_retidmap[caller_linkid][retid]
- self._ret_call(linkid,retid,data)
+ del self._caller_retidmap[caller_linkid][retid]
+ self._ret_call(linkid,retid,data)
- except KeyError:
- pass
+ except KeyError:
+ pass
else:
- caller_conn = self.get_conn(caller_linkid)
- if caller_conn == None:
- pass
-
- self._send_msg_ret(caller_conn,caller_linkid,caller_retid,retvalue)
+ self.request_conn(caller_linkid,__send_ret,caller_linkid,caller_retid,result)
def _recvloop_dispatch(self,conn,data):
msg = json.loads(data.decode('utf-8'))
@@ -215,33 +255,45 @@ class Proxy:
self._route_call(caller_retid,timeout,iden,dst,func_name,param)
- def _send_msg_ret(self,conn,caller_linkid,caller_retid,retvalue):
+ def _send_msg_ret(self,conn,caller_linkid,caller_retid,result):
+ stat,data = result
msg = {
'type':self.MSGTYPE_RET,
'caller_linkid':caller_linkid,
'caller_retid':caller_retid,
- 'retvalue':retvalue
+ 'result':{'stat':stat,'data':data}
}
conn.send_msg(bytes(json.dumps(msg),'utf-8'))
def _recv_msg_ret(self,conn,msg):
caller_linkid = msg['caller_linkid']
caller_retid = msg['caller_retid']
- retvalue = msg['retvalue']
+ data = msg['result']
+ result = (data['stat'],data['data'])
if caller_linkid == self._linkid:
try:
del self._caller_retidmap[conn.linkid][caller_retid]
- self._ret_call(caller_linkid,caller_retid,retvalue)
+ self._ret_call(caller_linkid,caller_retid,result)
except KeyError:
pass
+
else:
- pass
+ self._ret_call(caller_linkid,caller_retid,result)
@nonblock.call
def imc_call(iden,dst,func_name,param,_genid):
Proxy.instance.call(_genid,5000,iden,dst,func_name,param)
+def imc_call_async(iden,dst,func_name,param,callback = None):
+ @nonblock.func
+ def func():
+ result = (yield imc_call(iden,dst,func_name,param))
+ if callback != None:
+ callback(result)
+
+ func()
+
def imc_register_call(path,func_name,func):
Proxy.instance.register_call(path,func_name,func)