aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-06 00:52:28 +0800
committerpzread <netfirewall@gmail.com>2013-05-06 00:52:28 +0800
commitc4edd77fed0ed54ed1b92dedd6bbdafb972b6da1 (patch)
tree81362de5553f9cc9468fa9948cc82a54cb93f72a /src
parentae30c79b6068c32fb587d338d069e449c4a85511 (diff)
downloadtaiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar
taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.gz
taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.bz2
taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.lz
taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.xz
taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.tar.zst
taiwan-online-judge-c4edd77fed0ed54ed1b92dedd6bbdafb972b6da1.zip
Done _send_msg_call
Diffstat (limited to 'src')
-rw-r--r--src/py/backend_server.py9
-rw-r--r--src/py/center_server.py4
-rw-r--r--src/py/imcproxy.py52
-rw-r--r--src/py/netio.py4
4 files changed, 49 insertions, 20 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index c295160..544f647 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -32,13 +32,16 @@ class BackendWorker():
info = json.loads(data.decode('utf-8'))
self.linkid = info['linkid']
- self.center_conn = netio.SocketConnection(stream)
+ self.center_conn = netio.SocketConnection(info['center_linkid'],stream)
self.center_conn.add_close_callback(lambda conn : __retry())
- self.imc_proxy.add_conn(info['center_linkid'],self.center_conn)
+ self.imc_proxy.add_conn(self.center_conn)
print('/backend/' + self.linkid)
- self.imc_proxy._send_msg_call(self.center_conn,None,None,'Hello',None)
+ def ___tmp(genid):
+ print(genid)
+
+ self.imc_proxy._send_msg_call(self.center_conn,5000,13,___tmp,None,None,'Hello',None)
netio.send_pack(stream,bytes(json.dumps({
'linkclass':self.linkclass,
diff --git a/src/py/center_server.py b/src/py/center_server.py
index 4a892f8..cb4ef75 100644
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -26,9 +26,9 @@ class Worker:
'center_linkid':center_serv.linkid
}),'utf-8'))
- conn = netio.SocketConnection(self.stream)
+ conn = netio.SocketConnection(self.linkid,self.stream)
conn.add_close_callback(lambda conn : self.close())
- center_serv.imc_proxy.add_conn(self.linkid,conn)
+ center_serv.imc_proxy.add_conn(conn)
def close(self):
pass
diff --git a/src/py/imcproxy.py b/src/py/imcproxy.py
index 265e9f8..25784f4 100644
--- a/src/py/imcproxy.py
+++ b/src/py/imcproxy.py
@@ -4,7 +4,8 @@ import tornado.ioloop
import tornado.stack_context
class IMCConnection:
- def __init__(self):
+ def __init__(self,linkid):
+ self.linkid = linkid
self._close_callback = []
def send_msg(self,data):
@@ -22,23 +23,25 @@ class IMCConnection:
class IMCProxy:
def __init__(self):
- self._linkid_connmap = {}
self._conn_linkidmap = {}
+ self._conn_waitretmap = {}
self.MSGTYPE_CALL = 'call'
+ self.MSGTYPE_RET = 'ret'
- self.test_count = 0
+ self._check_waitret_timer = tornado.ioloop.PeriodicCallback(self._check_waitret,1000)
+ self._check_waitret_timer.start()
- def add_conn(self,linkid,conn):
- self._linkid_connmap[id(conn)] = linkid
- self._conn_linkidmap[linkid] = conn
+ def add_conn(self,conn):
+ self._conn_linkidmap[conn.linkid] = conn
+ self._conn_waitretmap[conn.linkid] = {}
conn.add_close_callback(self._conn_close_cb)
conn.start_recvloop(self._recvloop_dispatch)
def del_conn(self,conn):
- linkid = self._linkid_connmap.pop(id(conn))
- del self._conn_linkidmap[linkid]
+ del self._conn_linkidmap[conn.linkid]
+ del self._conn_waitretmap[conn.linkid]
def get_conn(self,linkid):
if linkid not in self.conn_linkidmap:
@@ -53,17 +56,44 @@ class IMCProxy:
self._recv_msg_call(conn,msg)
def _conn_close_cb(self,conn):
+ wait_map = self._conn_waitretmap[conn.linkid]
+ wait_genids = wait_map.keys()
+ for genid in wait_genids:
+ wait_map[genid]['fail_callback'](genid)
+
self.del_conn(conn)
print('connection close')
- def _send_msg_call(self,conn,iden,dst,func,param):
+ def _check_waitret(self):
+ wait_maps = self._conn_waitretmap.values()
+ for wait_map in wait_maps:
+ wait_genids = wait_map.keys()
+ wait_del = []
+ for genid in wait_genids:
+ wait = wait_map[genid]
+ wait['timeout'] -= 1000
+
+ if wait['timeout'] <= 0:
+ wait['fail_callback'](genid)
+ wait_del.append(genid)
+
+ for genid in wait_del:
+ del wait_map[genid]
+
+ def _send_msg_call(self,conn,timeout,genid,fail_callback,iden,dst,func,param):
+ wait = {
+ 'timeout':timeout,
+ 'fail_callback':tornado.stack_context.wrap(fail_callback)
+ }
msg = {
'type':self.MSGTYPE_CALL,
+ 'genid':genid,
'iden':iden,
'dst':dst,
'func':func,
'param':param
}
+ self._conn_waitretmap[conn.linkid][genid] = wait
conn.send_msg(bytes(json.dumps(msg),'utf-8'))
def _recv_msg_call(self,conn,msg):
@@ -71,7 +101,3 @@ class IMCProxy:
dst = msg['dst']
func = msg['func']
param = msg['param']
-
- self.test_count += 1
- print(self.test_count)
- self._send_msg_call(conn,None,None,'Hello too',None)
diff --git a/src/py/netio.py b/src/py/netio.py
index 9e80d54..a08c248 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -16,8 +16,8 @@ def recv_pack(stream,callback):
stream.read_bytes(8,_recv_size)
class SocketConnection(imcproxy.IMCConnection):
- def __init__(self,stream):
- super().__init__()
+ def __init__(self,linkid,stream):
+ super().__init__(linkid)
self.ioloop = tornado.ioloop.IOLoop.current()
self.stream = stream