From 026f1974af083ddc46f3b0438cdb553923a289c6 Mon Sep 17 00:00:00 2001 From: pzread Date: Wed, 8 May 2013 14:53:00 +0800 Subject: Tmp backup --- src/py/imc/proxy.py | 102 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 34 deletions(-) (limited to 'src/py/imc') diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 0d6a942..371285f 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -3,7 +3,7 @@ import json import tornado.ioloop import tornado.stack_context -import nonblock +from imc import nonblock class Connection: def __init__(self,linkid): @@ -25,30 +25,36 @@ class Connection: class Proxy: def __init__(self,linkid): + self._ioloop = tornado.ioloop.IOLoop.instance() self._linkid = linkid self._conn_linkidmap = {} - self._conn_waitretmap = {} + self._caller_genidmap = {self._linkid:{}} self._call_pathmap = {} self.MSGTYPE_CALL = 'call' self.MSGTYPE_RET = 'ret' - self._check_waitret_timer = tornado.ioloop.PeriodicCallback(self._check_waitret,1000) - self._check_waitret_timer.start() + self._check_waitcaller_timer = tornado.ioloop.PeriodicCallback(self._check_waitcaller,1000) + self._check_waitcaller_timer.start() Proxy.instance = self def add_conn(self,conn): self._conn_linkidmap[conn.linkid] = conn - self._conn_waitretmap[conn.linkid] = {} + self._caller_genidmap[conn.linkid] = {} conn.add_close_callback(self._conn_close_cb) conn.start_recvloop(self._recvloop_dispatch) def del_conn(self,conn): + wait_map = self._caller_genidmap[conn.linkid] + wait_genids = wait_map.keys() + for genid in wait_genids: + wait_map[genid]['callback'](genid,'close',None) + del self._conn_linkidmap[conn.linkid] - del self._conn_waitretmap[conn.linkid] + del self._caller_genidmap[conn.linkid] def get_conn(self,linkid): if linkid not in self._conn_linkidmap: @@ -56,32 +62,50 @@ class Proxy: return self._conn_linkidmap[linkid] - def call(self,genid,iden,dst,func_name,param): - def _fail_cb(genid): + def call(self,genid,timeout,iden,dst,func_name,param): + def _call_cb(genid,err,retvalue): print('Opps') - self._route_call(genid,_fail_cb,iden,dst,func_name,param) + try: + stat,retvalue = self._route_call(genid,iden,dst,func_name,param) + if stat == True: + self._ioloop.add_callback(nonblock.retcall,genid,retvalue) + else: + self._add_waitcaller(self._linkid,genid,timeout,_call_cb) + + except Exception as err: + _call_cb(genid,err,None) def register_call(self,path,func_name,func): self._call_pathmap[''.join([path,'/',func_name])] = func - def _route_call(self,genid,fail_callback,iden,dst,func_name,param): + def _route_call(self,genid,iden,dst,func_name,param): dst_part = dst.split('/')[1:] linkid = dst_part[1] path = ''.join(dst_part[2:]) if linkid == self._linkid: - self._handle_call(genid,fail_callback,iden,path,func_name,param) + stat,retvalue = self._handle_call(genid,iden,path,func_name,param) + if stat == True: + ret = (True,retvalue) + else: + ret = (False,self._linkid) + else: conn = self.get_conn(linkid) if conn == None: pass - def _handle_call(self,genid,fail_callback,iden,path,func_name,param): + self._send_msg_call(conn,genid,iden,dst,func_name,param) + ret = (False,conn.linkid) + + return ret + + def _handle_call(self,genid,iden,path,func_name,param): try: - self._call_pathmap[''.join([path,'/',func_name])](param) + return self._call_pathmap[''.join([path,'/',func_name])](param) except KeyError: - fail_callback(genid) + raise Exception('notexist') def _recvloop_dispatch(self,conn,data): msg = json.loads(data.decode('utf-8')) @@ -92,16 +116,18 @@ class Proxy: self._recv_msg_ret(conn,msg) def _conn_close_cb(self,conn): - wait_map = self._conn_waitretmap[conn.linkid] - wait_genids = wait_map.keys() - for genid in wait_genids: - wait_map[genid]['fail_callback'](genid) - self.del_conn(conn) print('connection close') - def _check_waitret(self): - wait_maps = self._conn_waitretmap.values() + def _add_waitcaller(linkid,genid,timeout,callback): + wait = { + 'timeout':timeout, + 'callback':tornado.stack_context.wrap(callback) + } + self._caller_genidmap[linkid][genid] = wait + + def _check_waitcaller(self): + wait_maps = self._caller_genidmap.values() for wait_map in wait_maps: wait_genids = wait_map.keys() wait_del = [] @@ -110,38 +136,45 @@ class Proxy: wait['timeout'] -= 1000 if wait['timeout'] <= 0: - wait['fail_callback'](genid) + wait['callback'](genid,'timeout',None) wait_del.append(genid) for genid in wait_del: del wait_map[genid] - def _send_msg_call(self,conn,timeout,genid,fail_callback,iden,dst,func,param): - wait = { - 'timeout':timeout, - 'fail_callback':tornado.stack_context.wrap(fail_callback) - } + def _send_msg_call(self,conn,genid,iden,dst,func_name,param): msg = { 'type':self.MSGTYPE_CALL, 'genid':genid, 'iden':iden, 'dst':dst, - 'func':func, + 'func_name':func_name, 'param':param } - self._conn_waitretmap[conn.linkid][genid] = wait conn.send_msg(bytes(json.dumps(msg),'utf-8')) def _recv_msg_call(self,conn,msg): genid = msg['genid'] iden = msg['iden'] dst = msg['dst'] - func = msg['func'] + func_name = msg['func_name'] param = msg['param'] - print(genid) + def _call_cb(genid,err,retvalue): + print('Opps') + + try: + stat,retvalue = self._route_call(genid,iden,dst,func_name,param) + if stat == True: + self._send_msg_ret(conn,genid,retvalue) + + else: + pass + + except Exception as err: + _call_cb(genid,err,None) - self._send_msg_ret(conn,genid,'Hello') + #self._send_msg_ret(conn,genid,'Hello') def _send_msg_ret(self,conn,genid,retvalue): msg = { @@ -155,13 +188,14 @@ class Proxy: genid = msg['genid'] retvalue = msg['retvalue'] - self._conn_waitretmap[conn.linkid].pop(genid) + print(self._caller_genidmap) + self._caller_genidmap[conn.linkid].pop(genid) print(retvalue) @nonblock.call def imc_call(iden,dst,func_name,param,_genid): - Proxy.instance.call(_genid,iden,dst,func_name,param) + Proxy.instance.call(_genid,60000,iden,dst,func_name,param) def imc_register_call(path,func_name,func): Proxy.instance.register_call(path,func_name,func) -- cgit v1.2.3