diff options
author | pzread <netfirewall@gmail.com> | 2013-05-10 01:05:10 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-10 01:05:10 +0800 |
commit | 4b2c34236ee67a44573a538497ecc166f0c65897 (patch) | |
tree | c6e16492326f22146b5cdb18640841310b288f2d /src | |
parent | d6bb4b221306f3b549f6fb41e85a1093ca659132 (diff) | |
download | taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.gz taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.bz2 taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.lz taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.xz taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.zst taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.zip |
Done local <-> local and local <-> remote IMC call
Diffstat (limited to 'src')
-rw-r--r-- | src/py/backend_server.py | 2 | ||||
-rw-r--r-- | src/py/center_server.py | 10 | ||||
-rw-r--r-- | src/py/imc/nonblock.py | 5 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 161 |
4 files changed, 99 insertions, 79 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 6081a6a..e32bc12 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -56,7 +56,7 @@ class BackendWorker(): @imc.nonblock.func def _test_call(self,param): - ret = (yield imc_call(None,'/backend/' + self.center_conn.linkid,'test_dst','Hello')) + ret = (yield imc_call(self.linkid,'/backend/' + self.center_conn.linkid,'test_dst','Hello')) print(ret) @imc.nonblock.func diff --git a/src/py/center_server.py b/src/py/center_server.py index 0fecb23..81512b5 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -11,7 +11,7 @@ import tornado.web import netio import imc.nonblock -from imc.proxy import Proxy,Connection,imc_register_call +from imc.proxy import Proxy,Connection,imc_register_call,imc_call class Worker: def __init__(self,stream,linkclass,linkid,worker_ip): @@ -66,6 +66,7 @@ class CenterServer(tornado.tcpserver.TCPServer): print('/center/' + self.linkid) imc_register_call('','test_dst',self._test_dst) + imc_register_call('','test_dstb',self._test_dstb) def handle_stream(self,stream,address): def _recv_worker_info(data): @@ -104,7 +105,12 @@ class CenterServer(tornado.tcpserver.TCPServer): @imc.nonblock.func def _test_dst(self,param): - return 'Hello Too' + stat,ret = (yield imc_call(self.linkid,'/center/' + self.linkid,'test_dstb','Hello X')) + return ret + ' Too' + + @imc.nonblock.func + def _test_dstb(self,param): + return param + ' World' class WebConnHandler(tornado.web.RequestHandler): def get(self): diff --git a/src/py/imc/nonblock.py b/src/py/imc/nonblock.py index c2741c0..9419395 100644 --- a/src/py/imc/nonblock.py +++ b/src/py/imc/nonblock.py @@ -45,6 +45,7 @@ def retcall(genid,value): gen.send(value) return (False,gen_current_id) - except StopIteration as ret: + + except StopIteration as err: del gen_waitmap[gen_current_id] - return (True,ret) + return (True,err.value) diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index e10a843..a428ac3 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -52,7 +52,7 @@ class Proxy: wait_map = self._caller_genidmap[conn.linkid] wait_genids = wait_map.keys() for genid in wait_genids: - wait_map[genid]['callback'](genid,'close',None) + wait_map[genid]['fail_callback'](genid,'Eclose') del self._conn_linkidmap[conn.linkid] del self._caller_genidmap[conn.linkid] @@ -66,64 +66,89 @@ class Proxy: def register_call(self,path,func_name,func): self._call_pathmap[''.join([path,'/',func_name])] = func - def call(self,genid,timeout,iden,dst,func_name,param): - def _call_cb(genid,err,retvalue): - print('Opps') - - try: - stat,linkid,retvalue = self._route_call(genid,iden,dst,func_name,param) - if stat == True: - self._ioloop.add_callback(self.retcall,genid,retvalue) - else: - if retvalue != None: - self._retcall_genidmap[retvalue] = { - 'genid':genid, - 'callback':tornado.stack_context.wrap(self._retcall) - } + def call(self,caller_genid,timeout,iden,dst,func_name,param): + self._route_call(caller_genid,timeout,iden,dst,func_name,param) - self._add_waitcaller(linkid,genid,timeout,_call_cb) + def _route_call(self,caller_genid,timeout,iden,dst,func_name,param): + def __add_wait_caller(callee_linkid,caller_genid,timeout,fail_callback): + self._caller_genidmap[callee_linkid][caller_genid] = { + 'timeout':timeout, + 'fail_callback':tornado.stack_context.wrap(fail_callback) + } - except Exception as err: - print(err) - _call_cb(genid,err,None) + def __add_wait_retcall(callee_genid,caller_linkid,caller_genid): + self._retcall_genidmap[callee_genid] = { + 'caller_linkid':caller_linkid, + 'caller_genid':caller_genid, + } - def retcall(self,genid,retvalue): - stat,retvalue = nonblock.retcall(genid,retvalue) - if stat == True: - try: - data = self._retcall_genidmap.pop(genid) - data['callback'](data['genid'],retvalue) + def __local_fail_cb(genid,err): + self._ret_call(self._linkid,genid,(False,err)) - except KeyError: - pass - else: - pass + def __remote_fail_cb(genid,err): + print('Opps') - def _route_call(self,genid,iden,dst,func_name,param): dst_part = dst.split('/')[1:] linkid = dst_part[1] path = ''.join(dst_part[2:]) + caller_linkid = iden + if linkid == self._linkid: try: - stat,retvalue = self._call_pathmap[''.join([path,'/',func_name])](param) - if stat == True: - ret = (True,None,retvalue) - else: - ret = (False,linkid,retvalue) + stat,data = self._call_pathmap[''.join([path,'/',func_name])](param) except KeyError: - raise Exception('notexist') + print('Enot_exist') + + if stat == True: + if caller_linkid == self._linkid: + self._ioloop.add_callback(self._ret_call,caller_linkid,caller_genid,(True,data)) + else: + caller_conn = self.get_conn(caller_linkid) + if caller_conn == None: + pass + + self._send_msg_ret(caller_conn,caller_linkid,caller_genid,(True,data)) + else: + if caller_linkid == self._linkid: + __add_wait_caller(linkid,caller_genid,timeout,__local_fail_cb) + else: + __add_wait_caller(linkid,caller_genid,timeout,__remote_fail_cb) + + __add_wait_retcall(data,caller_linkid,caller_genid) else: - conn = self.get_conn(linkid) - if conn == None: + callee_conn = self.get_conn(linkid) + if callee_conn == None: pass - self._send_msg_call(conn,genid,iden,dst,func_name,param) - ret = (False,conn.linkid,None) + if caller_linkid == self._linkid: + __add_wait_caller(callee_conn.linkid,caller_genid,timeout,__local_fail_cb) + self._send_msg_call(callee_conn,caller_genid,timeout,iden,dst,func_name,param) + else: + pass - return ret + def _ret_call(self,caller_linkid,caller_genid,retvalue): + if caller_linkid == self._linkid: + stat,data = nonblock.retcall(caller_genid,retvalue) + if stat == True: + try: + ret = self._retcall_genidmap.pop(caller_genid) + linkid = ret['caller_linkid'] + genid = ret['caller_genid'] + del self._caller_genidmap[caller_linkid][genid] + self._ret_call(linkid,genid,data) + + except KeyError: + pass + + else: + caller_conn = self.get_conn(caller_linkid) + if caller_conn == None: + pass + + self._send_msg_ret(caller_conn,caller_linkid,caller_genid,retvalue) def _recvloop_dispatch(self,conn,data): msg = json.loads(data.decode('utf-8')) @@ -136,14 +161,7 @@ class Proxy: def _conn_close_cb(self,conn): self.del_conn(conn) print('connection close') - - def _add_waitcaller(self,linkid,genid,timeout,callback): - wait = { - 'timeout':timeout, - 'callback':tornado.stack_context.wrap(callback) - } - self._caller_genidmap[linkid][genid] = wait - + def _check_waitcaller(self): wait_maps = self._caller_genidmap.values() for wait_map in wait_maps: @@ -154,16 +172,17 @@ class Proxy: wait['timeout'] -= 1000 if wait['timeout'] <= 0: - wait['callback'](genid,'timeout',None) + wait['fail_callback'](genid,'Etimeout') wait_del.append(genid) for genid in wait_del: del wait_map[genid] - def _send_msg_call(self,conn,genid,iden,dst,func_name,param): + def _send_msg_call(self,conn,caller_genid,timeout,iden,dst,func_name,param): msg = { 'type':self.MSGTYPE_CALL, - 'genid':genid, + 'caller_genid':caller_genid, + 'timeout':timeout, 'iden':iden, 'dst':dst, 'func_name':func_name, @@ -172,48 +191,42 @@ class Proxy: conn.send_msg(bytes(json.dumps(msg),'utf-8')) def _recv_msg_call(self,conn,msg): - genid = msg['genid'] + caller_genid = msg['caller_genid'] + timeout = msg['timeout'] iden = msg['iden'] dst = msg['dst'] func_name = msg['func_name'] param = msg['param'] - def _call_cb(genid,err,retvalue): - print('Opps') + self._route_call(caller_genid,timeout,iden,dst,func_name,param) - try: - stat,linkid,retvalue = self._route_call(genid,iden,dst,func_name,param) - if stat == True: - self._send_msg_ret(conn,genid,retvalue) - - else: - pass - - except Exception as err: - _call_cb(genid,err,None) - - def _send_msg_ret(self,conn,genid,retvalue): + def _send_msg_ret(self,conn,caller_linkid,caller_genid,retvalue): msg = { 'type':self.MSGTYPE_RET, - 'genid':genid, + 'caller_linkid':caller_linkid, + 'caller_genid':caller_genid, 'retvalue':retvalue } conn.send_msg(bytes(json.dumps(msg),'utf-8')) def _recv_msg_ret(self,conn,msg): - genid = msg['genid'] + caller_linkid = msg['caller_linkid'] + caller_genid = msg['caller_genid'] retvalue = msg['retvalue'] - try: - del self._caller_genidmap[conn.linkid][genid] - self.retcall(genid,retvalue) + if caller_linkid == self._linkid: + try: + del self._caller_genidmap[conn.linkid][caller_genid] + self._ret_call(caller_linkid,caller_genid,retvalue) - except KeyError: + except KeyError: + pass + else: pass @nonblock.call def imc_call(iden,dst,func_name,param,_genid): - Proxy.instance.call(_genid,60000,iden,dst,func_name,param) + Proxy.instance.call(_genid,5000,iden,dst,func_name,param) def imc_register_call(path,func_name,func): Proxy.instance.register_call(path,func_name,func) |