aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-10 01:05:10 +0800
committerpzread <netfirewall@gmail.com>2013-05-10 01:05:10 +0800
commit4b2c34236ee67a44573a538497ecc166f0c65897 (patch)
treec6e16492326f22146b5cdb18640841310b288f2d /src
parentd6bb4b221306f3b549f6fb41e85a1093ca659132 (diff)
downloadtaiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar
taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.gz
taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.bz2
taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.lz
taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.xz
taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.tar.zst
taiwan-online-judge-4b2c34236ee67a44573a538497ecc166f0c65897.zip
Done local <-> local and local <-> remote IMC call
Diffstat (limited to 'src')
-rw-r--r--src/py/backend_server.py2
-rw-r--r--src/py/center_server.py10
-rw-r--r--src/py/imc/nonblock.py5
-rwxr-xr-xsrc/py/imc/proxy.py161
4 files changed, 99 insertions, 79 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 6081a6a..e32bc12 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -56,7 +56,7 @@ class BackendWorker():
@imc.nonblock.func
def _test_call(self,param):
- ret = (yield imc_call(None,'/backend/' + self.center_conn.linkid,'test_dst','Hello'))
+ ret = (yield imc_call(self.linkid,'/backend/' + self.center_conn.linkid,'test_dst','Hello'))
print(ret)
@imc.nonblock.func
diff --git a/src/py/center_server.py b/src/py/center_server.py
index 0fecb23..81512b5 100644
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -11,7 +11,7 @@ import tornado.web
import netio
import imc.nonblock
-from imc.proxy import Proxy,Connection,imc_register_call
+from imc.proxy import Proxy,Connection,imc_register_call,imc_call
class Worker:
def __init__(self,stream,linkclass,linkid,worker_ip):
@@ -66,6 +66,7 @@ class CenterServer(tornado.tcpserver.TCPServer):
print('/center/' + self.linkid)
imc_register_call('','test_dst',self._test_dst)
+ imc_register_call('','test_dstb',self._test_dstb)
def handle_stream(self,stream,address):
def _recv_worker_info(data):
@@ -104,7 +105,12 @@ class CenterServer(tornado.tcpserver.TCPServer):
@imc.nonblock.func
def _test_dst(self,param):
- return 'Hello Too'
+ stat,ret = (yield imc_call(self.linkid,'/center/' + self.linkid,'test_dstb','Hello X'))
+ return ret + ' Too'
+
+ @imc.nonblock.func
+ def _test_dstb(self,param):
+ return param + ' World'
class WebConnHandler(tornado.web.RequestHandler):
def get(self):
diff --git a/src/py/imc/nonblock.py b/src/py/imc/nonblock.py
index c2741c0..9419395 100644
--- a/src/py/imc/nonblock.py
+++ b/src/py/imc/nonblock.py
@@ -45,6 +45,7 @@ def retcall(genid,value):
gen.send(value)
return (False,gen_current_id)
- except StopIteration as ret:
+
+ except StopIteration as err:
del gen_waitmap[gen_current_id]
- return (True,ret)
+ return (True,err.value)
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index e10a843..a428ac3 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -52,7 +52,7 @@ class Proxy:
wait_map = self._caller_genidmap[conn.linkid]
wait_genids = wait_map.keys()
for genid in wait_genids:
- wait_map[genid]['callback'](genid,'close',None)
+ wait_map[genid]['fail_callback'](genid,'Eclose')
del self._conn_linkidmap[conn.linkid]
del self._caller_genidmap[conn.linkid]
@@ -66,64 +66,89 @@ class Proxy:
def register_call(self,path,func_name,func):
self._call_pathmap[''.join([path,'/',func_name])] = func
- def call(self,genid,timeout,iden,dst,func_name,param):
- def _call_cb(genid,err,retvalue):
- print('Opps')
-
- try:
- stat,linkid,retvalue = self._route_call(genid,iden,dst,func_name,param)
- if stat == True:
- self._ioloop.add_callback(self.retcall,genid,retvalue)
- else:
- if retvalue != None:
- self._retcall_genidmap[retvalue] = {
- 'genid':genid,
- 'callback':tornado.stack_context.wrap(self._retcall)
- }
+ def call(self,caller_genid,timeout,iden,dst,func_name,param):
+ self._route_call(caller_genid,timeout,iden,dst,func_name,param)
- self._add_waitcaller(linkid,genid,timeout,_call_cb)
+ def _route_call(self,caller_genid,timeout,iden,dst,func_name,param):
+ def __add_wait_caller(callee_linkid,caller_genid,timeout,fail_callback):
+ self._caller_genidmap[callee_linkid][caller_genid] = {
+ 'timeout':timeout,
+ 'fail_callback':tornado.stack_context.wrap(fail_callback)
+ }
- except Exception as err:
- print(err)
- _call_cb(genid,err,None)
+ def __add_wait_retcall(callee_genid,caller_linkid,caller_genid):
+ self._retcall_genidmap[callee_genid] = {
+ 'caller_linkid':caller_linkid,
+ 'caller_genid':caller_genid,
+ }
- def retcall(self,genid,retvalue):
- stat,retvalue = nonblock.retcall(genid,retvalue)
- if stat == True:
- try:
- data = self._retcall_genidmap.pop(genid)
- data['callback'](data['genid'],retvalue)
+ def __local_fail_cb(genid,err):
+ self._ret_call(self._linkid,genid,(False,err))
- except KeyError:
- pass
- else:
- pass
+ def __remote_fail_cb(genid,err):
+ print('Opps')
- def _route_call(self,genid,iden,dst,func_name,param):
dst_part = dst.split('/')[1:]
linkid = dst_part[1]
path = ''.join(dst_part[2:])
+ caller_linkid = iden
+
if linkid == self._linkid:
try:
- stat,retvalue = self._call_pathmap[''.join([path,'/',func_name])](param)
- if stat == True:
- ret = (True,None,retvalue)
- else:
- ret = (False,linkid,retvalue)
+ stat,data = self._call_pathmap[''.join([path,'/',func_name])](param)
except KeyError:
- raise Exception('notexist')
+ print('Enot_exist')
+
+ if stat == True:
+ if caller_linkid == self._linkid:
+ self._ioloop.add_callback(self._ret_call,caller_linkid,caller_genid,(True,data))
+ else:
+ caller_conn = self.get_conn(caller_linkid)
+ if caller_conn == None:
+ pass
+
+ self._send_msg_ret(caller_conn,caller_linkid,caller_genid,(True,data))
+ else:
+ if caller_linkid == self._linkid:
+ __add_wait_caller(linkid,caller_genid,timeout,__local_fail_cb)
+ else:
+ __add_wait_caller(linkid,caller_genid,timeout,__remote_fail_cb)
+
+ __add_wait_retcall(data,caller_linkid,caller_genid)
else:
- conn = self.get_conn(linkid)
- if conn == None:
+ callee_conn = self.get_conn(linkid)
+ if callee_conn == None:
pass
- self._send_msg_call(conn,genid,iden,dst,func_name,param)
- ret = (False,conn.linkid,None)
+ if caller_linkid == self._linkid:
+ __add_wait_caller(callee_conn.linkid,caller_genid,timeout,__local_fail_cb)
+ self._send_msg_call(callee_conn,caller_genid,timeout,iden,dst,func_name,param)
+ else:
+ pass
- return ret
+ def _ret_call(self,caller_linkid,caller_genid,retvalue):
+ if caller_linkid == self._linkid:
+ stat,data = nonblock.retcall(caller_genid,retvalue)
+ if stat == True:
+ try:
+ ret = self._retcall_genidmap.pop(caller_genid)
+ linkid = ret['caller_linkid']
+ genid = ret['caller_genid']
+ del self._caller_genidmap[caller_linkid][genid]
+ self._ret_call(linkid,genid,data)
+
+ except KeyError:
+ pass
+
+ else:
+ caller_conn = self.get_conn(caller_linkid)
+ if caller_conn == None:
+ pass
+
+ self._send_msg_ret(caller_conn,caller_linkid,caller_genid,retvalue)
def _recvloop_dispatch(self,conn,data):
msg = json.loads(data.decode('utf-8'))
@@ -136,14 +161,7 @@ class Proxy:
def _conn_close_cb(self,conn):
self.del_conn(conn)
print('connection close')
-
- def _add_waitcaller(self,linkid,genid,timeout,callback):
- wait = {
- 'timeout':timeout,
- 'callback':tornado.stack_context.wrap(callback)
- }
- self._caller_genidmap[linkid][genid] = wait
-
+
def _check_waitcaller(self):
wait_maps = self._caller_genidmap.values()
for wait_map in wait_maps:
@@ -154,16 +172,17 @@ class Proxy:
wait['timeout'] -= 1000
if wait['timeout'] <= 0:
- wait['callback'](genid,'timeout',None)
+ wait['fail_callback'](genid,'Etimeout')
wait_del.append(genid)
for genid in wait_del:
del wait_map[genid]
- def _send_msg_call(self,conn,genid,iden,dst,func_name,param):
+ def _send_msg_call(self,conn,caller_genid,timeout,iden,dst,func_name,param):
msg = {
'type':self.MSGTYPE_CALL,
- 'genid':genid,
+ 'caller_genid':caller_genid,
+ 'timeout':timeout,
'iden':iden,
'dst':dst,
'func_name':func_name,
@@ -172,48 +191,42 @@ class Proxy:
conn.send_msg(bytes(json.dumps(msg),'utf-8'))
def _recv_msg_call(self,conn,msg):
- genid = msg['genid']
+ caller_genid = msg['caller_genid']
+ timeout = msg['timeout']
iden = msg['iden']
dst = msg['dst']
func_name = msg['func_name']
param = msg['param']
- def _call_cb(genid,err,retvalue):
- print('Opps')
+ self._route_call(caller_genid,timeout,iden,dst,func_name,param)
- try:
- stat,linkid,retvalue = self._route_call(genid,iden,dst,func_name,param)
- if stat == True:
- self._send_msg_ret(conn,genid,retvalue)
-
- else:
- pass
-
- except Exception as err:
- _call_cb(genid,err,None)
-
- def _send_msg_ret(self,conn,genid,retvalue):
+ def _send_msg_ret(self,conn,caller_linkid,caller_genid,retvalue):
msg = {
'type':self.MSGTYPE_RET,
- 'genid':genid,
+ 'caller_linkid':caller_linkid,
+ 'caller_genid':caller_genid,
'retvalue':retvalue
}
conn.send_msg(bytes(json.dumps(msg),'utf-8'))
def _recv_msg_ret(self,conn,msg):
- genid = msg['genid']
+ caller_linkid = msg['caller_linkid']
+ caller_genid = msg['caller_genid']
retvalue = msg['retvalue']
- try:
- del self._caller_genidmap[conn.linkid][genid]
- self.retcall(genid,retvalue)
+ if caller_linkid == self._linkid:
+ try:
+ del self._caller_genidmap[conn.linkid][caller_genid]
+ self._ret_call(caller_linkid,caller_genid,retvalue)
- except KeyError:
+ except KeyError:
+ pass
+ else:
pass
@nonblock.call
def imc_call(iden,dst,func_name,param,_genid):
- Proxy.instance.call(_genid,60000,iden,dst,func_name,param)
+ Proxy.instance.call(_genid,5000,iden,dst,func_name,param)
def imc_register_call(path,func_name,func):
Proxy.instance.register_call(path,func_name,func)