diff options
author | pzread <netfirewall@gmail.com> | 2013-05-06 00:52:28 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-06 00:52:28 +0800 |
commit | c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1 (patch) | |
tree | 81362de5553f9cc9468fa9948cc82a54cb93f72a /src | |
parent | ae30c79b6068c32fb587d338d069e449c4a85511 (diff) | |
download | taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.gz taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.bz2 taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.lz taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.xz taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.zst taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.zip |
Done _send_msg_call
Diffstat (limited to 'src')
-rw-r--r-- | src/py/backend_server.py | 9 | ||||
-rw-r--r-- | src/py/center_server.py | 4 | ||||
-rw-r--r-- | src/py/imcproxy.py | 52 | ||||
-rw-r--r-- | src/py/netio.py | 4 |
4 files changed, 49 insertions, 20 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index c295160..544f647 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -32,13 +32,16 @@ class BackendWorker(): info = json.loads(data.decode('utf-8')) self.linkid = info['linkid'] - self.center_conn = netio.SocketConnection(stream) + self.center_conn = netio.SocketConnection(info['center_linkid'],stream) self.center_conn.add_close_callback(lambda conn : __retry()) - self.imc_proxy.add_conn(info['center_linkid'],self.center_conn) + self.imc_proxy.add_conn(self.center_conn) print('/backend/' + self.linkid) - self.imc_proxy._send_msg_call(self.center_conn,None,None,'Hello',None) + def ___tmp(genid): + print(genid) + + self.imc_proxy._send_msg_call(self.center_conn,5000,13,___tmp,None,None,'Hello',None) netio.send_pack(stream,bytes(json.dumps({ 'linkclass':self.linkclass, diff --git a/src/py/center_server.py b/src/py/center_server.py index 4a892f8..cb4ef75 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -26,9 +26,9 @@ class Worker: 'center_linkid':center_serv.linkid }),'utf-8')) - conn = netio.SocketConnection(self.stream) + conn = netio.SocketConnection(self.linkid,self.stream) conn.add_close_callback(lambda conn : self.close()) - center_serv.imc_proxy.add_conn(self.linkid,conn) + center_serv.imc_proxy.add_conn(conn) def close(self): pass diff --git a/src/py/imcproxy.py b/src/py/imcproxy.py index 265e9f8..25784f4 100644 --- a/src/py/imcproxy.py +++ b/src/py/imcproxy.py @@ -4,7 +4,8 @@ import tornado.ioloop import tornado.stack_context class IMCConnection: - def __init__(self): + def __init__(self,linkid): + self.linkid = linkid self._close_callback = [] def send_msg(self,data): @@ -22,23 +23,25 @@ class IMCConnection: class IMCProxy: def __init__(self): - self._linkid_connmap = {} self._conn_linkidmap = {} + self._conn_waitretmap = {} self.MSGTYPE_CALL = 'call' + self.MSGTYPE_RET = 'ret' - self.test_count = 0 + self._check_waitret_timer = tornado.ioloop.PeriodicCallback(self._check_waitret,1000) + self._check_waitret_timer.start() - def add_conn(self,linkid,conn): - self._linkid_connmap[id(conn)] = linkid - self._conn_linkidmap[linkid] = conn + def add_conn(self,conn): + self._conn_linkidmap[conn.linkid] = conn + self._conn_waitretmap[conn.linkid] = {} conn.add_close_callback(self._conn_close_cb) conn.start_recvloop(self._recvloop_dispatch) def del_conn(self,conn): - linkid = self._linkid_connmap.pop(id(conn)) - del self._conn_linkidmap[linkid] + del self._conn_linkidmap[conn.linkid] + del self._conn_waitretmap[conn.linkid] def get_conn(self,linkid): if linkid not in self.conn_linkidmap: @@ -53,17 +56,44 @@ class IMCProxy: self._recv_msg_call(conn,msg) def _conn_close_cb(self,conn): + wait_map = self._conn_waitretmap[conn.linkid] + wait_genids = wait_map.keys() + for genid in wait_genids: + wait_map[genid]['fail_callback'](genid) + self.del_conn(conn) print('connection close') - def _send_msg_call(self,conn,iden,dst,func,param): + def _check_waitret(self): + wait_maps = self._conn_waitretmap.values() + for wait_map in wait_maps: + wait_genids = wait_map.keys() + wait_del = [] + for genid in wait_genids: + wait = wait_map[genid] + wait['timeout'] -= 1000 + + if wait['timeout'] <= 0: + wait['fail_callback'](genid) + wait_del.append(genid) + + for genid in wait_del: + del wait_map[genid] + + def _send_msg_call(self,conn,timeout,genid,fail_callback,iden,dst,func,param): + wait = { + 'timeout':timeout, + 'fail_callback':tornado.stack_context.wrap(fail_callback) + } msg = { 'type':self.MSGTYPE_CALL, + 'genid':genid, 'iden':iden, 'dst':dst, 'func':func, 'param':param } + self._conn_waitretmap[conn.linkid][genid] = wait conn.send_msg(bytes(json.dumps(msg),'utf-8')) def _recv_msg_call(self,conn,msg): @@ -71,7 +101,3 @@ class IMCProxy: dst = msg['dst'] func = msg['func'] param = msg['param'] - - self.test_count += 1 - print(self.test_count) - self._send_msg_call(conn,None,None,'Hello too',None) diff --git a/src/py/netio.py b/src/py/netio.py index 9e80d54..a08c248 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -16,8 +16,8 @@ def recv_pack(stream,callback): stream.read_bytes(8,_recv_size) class SocketConnection(imcproxy.IMCConnection): - def __init__(self,stream): - super().__init__() + def __init__(self,linkid,stream): + super().__init__(linkid) self.ioloop = tornado.ioloop.IOLoop.current() self.stream = stream |