diff options
author | pzread <netfirewall@gmail.com> | 2013-05-27 01:04:31 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-27 01:04:31 +0800 |
commit | 96e0553b08b7d7daf4e1847a697b51df696f2f89 (patch) | |
tree | a12d21bb8af88d537fb3d63203a23ab805edcd74 | |
parent | 5c2764816a3db7a4a2a8d8099967abf5f9462ffe (diff) | |
download | taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.gz taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.bz2 taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.lz taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.xz taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.zst taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.zip |
Write SocketStream which support sendfile
-rw-r--r-- | src/py/backend_server.py | 15 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 57 | ||||
-rw-r--r-- | src/py/netio.py | 195 |
3 files changed, 255 insertions, 12 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index cac0e11..f1537c9 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -140,24 +140,23 @@ class BackendWorker(tornado.tcpserver.TCPServer): def __call_recv_cb(data): def ___file_conn_cb(): - conn = netio.SocketConnection(worker_linkclass,worker_linkid,call_stream,file_stream) - Proxy.instance.add_conn(conn) - __handle_pend(conn) - netio.send_pack(file_stream,bytes(json.dumps({ 'conntype':'file', 'linkid':self._linkid }),'utf-8')) + conn = netio.SocketConnection(worker_linkclass,worker_linkid,call_stream,file_stream) + Proxy.instance.add_conn(conn) + __handle_pend(conn) + stat = json.loads(data.decode('utf-8')) if stat == True: - file_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + file_stream = netio.SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) file_stream.connect(sock_addr,___file_conn_cb) else: call_stream.set_close_callback(None) call_stream.close() - if self.center_conn == None: return None @@ -224,6 +223,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): def _handle_fileconn(self,file_stream,addr,info): try: + print('ok') self._pend_fileconn_linkidmap.pop(info['linkid'])(file_stream) except KeyError: @@ -241,14 +241,11 @@ class BackendWorker(tornado.tcpserver.TCPServer): @imc.async.caller def _test_dst(self,iden,param): - print(auth.current_iden) stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param) - print(auth.current_iden) return ret + ' Too' @imc.async.caller def _test_dsta(self,iden,param): - print(auth.current_iden) return param + ' Too' class WebSocketConnHandler(tornado.websocket.WebSocketHandler): diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index f0c912e..d80aa6b 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -29,6 +29,11 @@ class Connection: class Proxy: def __init__(self,linkclass,linkid,auth_instance,conn_linkid_fn = None): + self.MSGTYPE_CALL = 'call' + self.MSGTYPE_RET = 'ret' + self.MSGTYPE_SENDFILE = 'sendfile' + self.MSGTYPE_RECVFILE = 'recvfile' + self._ioloop = tornado.ioloop.IOLoop.instance() self._linkclass = linkclass self._linkid = linkid @@ -43,8 +48,8 @@ class Proxy: self._conn_retidmap = {self._linkid:{}} self._call_pathmap = {} - self.MSGTYPE_CALL = 'call' - self.MSGTYPE_RET = 'ret' + self._conn_send_filekeymap = {self._linkid:{}} + self._conn_recv_filekeymap = {self._linkid:{}} self._check_waitcaller_timer = tornado.ioloop.PeriodicCallback(self._check_waitcaller,1000) self._check_waitcaller_timer.start() @@ -57,6 +62,9 @@ class Proxy: self._conn_linkidmap[conn.linkid] = conn self._conn_retidmap[conn.linkid] = {} + self._conn_send_filekeymap[conn.linkid] = {} + self._conn_recv_filekeymap[conn.linkid] = {} + conn.add_close_callback(self._conn_close_cb) conn.start_recv(self._recv_dispatch) @@ -93,6 +101,9 @@ class Proxy: del self._conn_linkidmap[conn.linkid] del self._conn_retidmap[conn.linkid] + del self._conn_send_filekeymap[conn.linkid] + del self._conn_recv_filekeymap[conn.linkid] + def get_conn(self,linkid): try: return self._conn_linkidmap[linkid] @@ -107,6 +118,26 @@ class Proxy: caller_retid = ''.join([self._linkid,'/',caller_grid]) return self._route_call(self._linkclass,self._linkid,caller_retid,timeout,idendesc,dst,func_name,param) + def sendfile(self,dst_linkid,load_path): + conn = self._request_conn(dst_linkid) + + fd = os.open(load_path,os.O_RDONLY) + filekey = uuid.uuid4() + filesize = os.fstat(fd).st_size + + self._conn_send_filekeymap[conn.linkid][filekey] = { + 'fd':fd, + 'filesize':filesize + } + + self._send_msg_sendfile(conn,filekey,filesize) + + return filekey + + def recvfile(self,filekey,save_path): + + return + def _route_call(self,conn_linkclass,conn_linkid,caller_retid,timeout,idendesc,dst,func_name,param): def __add_wait_caller(conn_linkid): self._conn_retidmap[conn_linkid][caller_retid] = { @@ -198,11 +229,19 @@ class Proxy: def _recv_dispatch(self,conn,data): msg = json.loads(data.decode('utf-8')) msg_type = msg['type'] + if msg_type == self.MSGTYPE_CALL: self._recv_msg_call(conn,msg) + elif msg_type == self.MSGTYPE_RET: self._recv_msg_ret(conn,msg) + elif msg_type == self.MSGTYPE_SENDFILE: + self._recv_msg_sendfile(conn,msg) + + elif msg_type == self.MSGTYPE_RECVFILE: + self._recv_msg_recvfile(conn,msg) + def _conn_close_cb(self,conn): self.del_conn(conn) print('connection close') @@ -267,6 +306,20 @@ class Proxy: self._ret_call(caller_linkid,caller_retid,result) + def _send_msg_sendfile(self,conn,filekey,filesize): + msg = { + 'type':self.MSGTYPE_SENDFILE, + 'filekey':filekey, + 'filesize':filesize + } + + conn.send_msg(bytes(json.dumps(msg),'utf-8')) + + def _recv_msg_sendfile(self,conn,msg): + filekey = msg['filekey'] + filesize = msg['filesize'] + + @async.callee def imc_call(idendesc,dst,func_name,param,_grid): return Proxy.instance.call(_grid,1000,idendesc,dst,func_name,param) diff --git a/src/py/netio.py b/src/py/netio.py index 0e8cab1..0b64682 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -1,4 +1,7 @@ +import os import struct +import socket +from collections import deque import tornado.ioloop import tornado.stack_context @@ -15,6 +18,177 @@ def recv_pack(stream,callback): stream.read_bytes(8,_recv_size) +class SocketStream: + def __init__(self,sock): + self.DATA_BUF = 0 + self.DATA_FILE = 1 + + self._ioloop = tornado.ioloop.IOLoop.current() + self._sock = sock + + self._conning = False + self._conn_callback = None + + self._read_queue = deque() + self._write_queue = deque() + self._stat = tornado.ioloop.IOLoop.ERROR + + self._sock.setblocking(False) + self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR) + + def connect(self,addr,callback): + try: + self._conning = True + self._conn_callback = tornado.stack_context.wrap(callback) + + self._stat |= tornado.ioloop.IOLoop.WRITE + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + self._sock.connect(addr) + + except BlockingIOError: + pass + + def read_bytes(self,size,callback = None): + self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)]) + + self._stat |= tornado.ioloop.IOLoop.READ + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + def write(self,buf,callback = None): + self._write_queue.append([self.DATA_BUF,0,buf,tornado.stack_context.wrap(callback)]) + + self._stat |= tornado.ioloop.IOLoop.WRITE + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + def sendfile(self,fd,callback = None): + size = os.fstat(fd).st_size + + self.write(struct.pack('l',size)) + self._write_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)]) + + self._stat |= tornado.ioloop.IOLoop.WRITE + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + def recvfile(self,fd,callback = None): + def _recv_size(data): + size, = struct.unpack('l',data) + self._read_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)]) + + self._stat |= tornado.ioloop.IOLoop.READ + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + self.read_bytes(8,_recv_size) + + def _handle_event(self,fd,evt): + if evt & tornado.ioloop.IOLoop.READ: + while len(self._read_queue) > 0: + iocb = self._read_queue[0] + datatype = iocb[0] + + if datatype == self.DATA_BUF: + size = iocb[1] + + try: + while True: + buf = self._sock.recv(size) + + iocb[2].extend(buf) + size -= len(buf) + + if size == 0: + if iocb[3] != None: + iocb[3](iocb[2]) + + self._read_queue.popleft() + break + + except BlockingIOError: + iocb[1] = size + break + + elif datatype == self.DATA_FILE: + size = iocb[1] + + try: + while True: + buf = self._sock.recv(min(size,65536)) + + os.write(iocb[2],buf) + size -= len(buf) + + if size == 0: + if iocb[3] != None: + iocb[3]() + + self._read_queue.popleft() + break + + except BlockingIOError: + iocb[1] = size + break + + if evt & tornado.ioloop.IOLoop.WRITE: + if self._conning == True: + self._conning = False + + if self._conn_callback != None: + self._conn_callback() + + while len(self._write_queue) > 0: + iocb = self._write_queue[0] + datatype = iocb[0] + + if datatype == self.DATA_BUF: + off = iocb[1] + buf = iocb[2] + + try: + while True: + off += self._sock.send(buf[off:]) + + if off == len(buf): + if iocb[3] != None: + iocb[3]() + + self._write_queue.popleft() + break + + except BlockingIOError: + iocb[1] = off + break + + elif datatype == self.DATA_FILE: + size = iocb[1] + filefd = iocb[2] + sockfd = self._sock.fileno() + + try: + while True: + size -= os.sendfile(sockfd,filefd,None,min(size,65536)) + + if size == 0: + if iocb[3] != None: + iocb[3]() + + self._write_queue.popleft() + break + + except BlockingIOError: + iocb[1] = size + break + + stat = tornado.ioloop.IOLoop.ERROR + if len(self._read_queue) > 0: + stat |= tornado.ioloop.IOLoop.READ + + if len(self._write_queue) > 0: + stat |= tornado.ioloop.IOLoop.WRITE + + if stat != self._stat: + self._stat = stat + self._ioloop.update_handler(fd,stat) + class SocketConnection(Connection): def __init__(self,linkclass,linkid,call_stream,file_stream = None): super().__init__(linkclass,linkid) @@ -28,13 +202,32 @@ class SocketConnection(Connection): else: self.file_stream = file_stream - self.file_stream.set_close_callback(self.close) + #self.file_stream.set_close_callback(self.close) self._start_ping() def send_msg(self,data): self.call_stream.write(struct.pack('l',len(data)) + data) + def send_file(self,filekey,load_path): + def _recv_cb(data): + stat = json.loads(data.decode('utf-8')) + + if stat == True: + self.file_stream.sendfile(fd,_done_cb) + + else: + os.close(fd) + + def _done_cb(): + print('done') + + fd = os.open(load_path,O_RDONLY) + filesize = os.fstat(fd).st_size + + send_pack(self.file_stream,bytes(json.dumps({'filekey':filekey,'filesize':filesize}),'utf-8')) + recv_pack(self.file_stream,_recv_cb) + def start_recv(self,recv_callback): def _recv_size(data): size, = struct.unpack('l',data) |