aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-27 01:04:31 +0800
committerpzread <netfirewall@gmail.com>2013-05-27 01:04:31 +0800
commit96e0553b08b7d7daf4e1847a697b51df696f2f89 (patch)
treea12d21bb8af88d537fb3d63203a23ab805edcd74
parent5c2764816a3db7a4a2a8d8099967abf5f9462ffe (diff)
downloadtaiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar
taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.gz
taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.bz2
taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.lz
taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.xz
taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.tar.zst
taiwan-online-judge-96e0553b08b7d7daf4e1847a697b51df696f2f89.zip
Write SocketStream which support sendfile
-rw-r--r--src/py/backend_server.py15
-rwxr-xr-xsrc/py/imc/proxy.py57
-rw-r--r--src/py/netio.py195
3 files changed, 255 insertions, 12 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index cac0e11..f1537c9 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -140,24 +140,23 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def __call_recv_cb(data):
def ___file_conn_cb():
- conn = netio.SocketConnection(worker_linkclass,worker_linkid,call_stream,file_stream)
- Proxy.instance.add_conn(conn)
- __handle_pend(conn)
-
netio.send_pack(file_stream,bytes(json.dumps({
'conntype':'file',
'linkid':self._linkid
}),'utf-8'))
+ conn = netio.SocketConnection(worker_linkclass,worker_linkid,call_stream,file_stream)
+ Proxy.instance.add_conn(conn)
+ __handle_pend(conn)
+
stat = json.loads(data.decode('utf-8'))
if stat == True:
- file_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ file_stream = netio.SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
file_stream.connect(sock_addr,___file_conn_cb)
else:
call_stream.set_close_callback(None)
call_stream.close()
-
if self.center_conn == None:
return None
@@ -224,6 +223,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def _handle_fileconn(self,file_stream,addr,info):
try:
+ print('ok')
self._pend_fileconn_linkidmap.pop(info['linkid'])(file_stream)
except KeyError:
@@ -241,14 +241,11 @@ class BackendWorker(tornado.tcpserver.TCPServer):
@imc.async.caller
def _test_dst(self,iden,param):
- print(auth.current_iden)
stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param)
- print(auth.current_iden)
return ret + ' Too'
@imc.async.caller
def _test_dsta(self,iden,param):
- print(auth.current_iden)
return param + ' Too'
class WebSocketConnHandler(tornado.websocket.WebSocketHandler):
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index f0c912e..d80aa6b 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -29,6 +29,11 @@ class Connection:
class Proxy:
def __init__(self,linkclass,linkid,auth_instance,conn_linkid_fn = None):
+ self.MSGTYPE_CALL = 'call'
+ self.MSGTYPE_RET = 'ret'
+ self.MSGTYPE_SENDFILE = 'sendfile'
+ self.MSGTYPE_RECVFILE = 'recvfile'
+
self._ioloop = tornado.ioloop.IOLoop.instance()
self._linkclass = linkclass
self._linkid = linkid
@@ -43,8 +48,8 @@ class Proxy:
self._conn_retidmap = {self._linkid:{}}
self._call_pathmap = {}
- self.MSGTYPE_CALL = 'call'
- self.MSGTYPE_RET = 'ret'
+ self._conn_send_filekeymap = {self._linkid:{}}
+ self._conn_recv_filekeymap = {self._linkid:{}}
self._check_waitcaller_timer = tornado.ioloop.PeriodicCallback(self._check_waitcaller,1000)
self._check_waitcaller_timer.start()
@@ -57,6 +62,9 @@ class Proxy:
self._conn_linkidmap[conn.linkid] = conn
self._conn_retidmap[conn.linkid] = {}
+ self._conn_send_filekeymap[conn.linkid] = {}
+ self._conn_recv_filekeymap[conn.linkid] = {}
+
conn.add_close_callback(self._conn_close_cb)
conn.start_recv(self._recv_dispatch)
@@ -93,6 +101,9 @@ class Proxy:
del self._conn_linkidmap[conn.linkid]
del self._conn_retidmap[conn.linkid]
+ del self._conn_send_filekeymap[conn.linkid]
+ del self._conn_recv_filekeymap[conn.linkid]
+
def get_conn(self,linkid):
try:
return self._conn_linkidmap[linkid]
@@ -107,6 +118,26 @@ class Proxy:
caller_retid = ''.join([self._linkid,'/',caller_grid])
return self._route_call(self._linkclass,self._linkid,caller_retid,timeout,idendesc,dst,func_name,param)
+ def sendfile(self,dst_linkid,load_path):
+ conn = self._request_conn(dst_linkid)
+
+ fd = os.open(load_path,os.O_RDONLY)
+ filekey = uuid.uuid4()
+ filesize = os.fstat(fd).st_size
+
+ self._conn_send_filekeymap[conn.linkid][filekey] = {
+ 'fd':fd,
+ 'filesize':filesize
+ }
+
+ self._send_msg_sendfile(conn,filekey,filesize)
+
+ return filekey
+
+ def recvfile(self,filekey,save_path):
+
+ return
+
def _route_call(self,conn_linkclass,conn_linkid,caller_retid,timeout,idendesc,dst,func_name,param):
def __add_wait_caller(conn_linkid):
self._conn_retidmap[conn_linkid][caller_retid] = {
@@ -198,11 +229,19 @@ class Proxy:
def _recv_dispatch(self,conn,data):
msg = json.loads(data.decode('utf-8'))
msg_type = msg['type']
+
if msg_type == self.MSGTYPE_CALL:
self._recv_msg_call(conn,msg)
+
elif msg_type == self.MSGTYPE_RET:
self._recv_msg_ret(conn,msg)
+ elif msg_type == self.MSGTYPE_SENDFILE:
+ self._recv_msg_sendfile(conn,msg)
+
+ elif msg_type == self.MSGTYPE_RECVFILE:
+ self._recv_msg_recvfile(conn,msg)
+
def _conn_close_cb(self,conn):
self.del_conn(conn)
print('connection close')
@@ -267,6 +306,20 @@ class Proxy:
self._ret_call(caller_linkid,caller_retid,result)
+ def _send_msg_sendfile(self,conn,filekey,filesize):
+ msg = {
+ 'type':self.MSGTYPE_SENDFILE,
+ 'filekey':filekey,
+ 'filesize':filesize
+ }
+
+ conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+
+ def _recv_msg_sendfile(self,conn,msg):
+ filekey = msg['filekey']
+ filesize = msg['filesize']
+
+
@async.callee
def imc_call(idendesc,dst,func_name,param,_grid):
return Proxy.instance.call(_grid,1000,idendesc,dst,func_name,param)
diff --git a/src/py/netio.py b/src/py/netio.py
index 0e8cab1..0b64682 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -1,4 +1,7 @@
+import os
import struct
+import socket
+from collections import deque
import tornado.ioloop
import tornado.stack_context
@@ -15,6 +18,177 @@ def recv_pack(stream,callback):
stream.read_bytes(8,_recv_size)
+class SocketStream:
+ def __init__(self,sock):
+ self.DATA_BUF = 0
+ self.DATA_FILE = 1
+
+ self._ioloop = tornado.ioloop.IOLoop.current()
+ self._sock = sock
+
+ self._conning = False
+ self._conn_callback = None
+
+ self._read_queue = deque()
+ self._write_queue = deque()
+ self._stat = tornado.ioloop.IOLoop.ERROR
+
+ self._sock.setblocking(False)
+ self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR)
+
+ def connect(self,addr,callback):
+ try:
+ self._conning = True
+ self._conn_callback = tornado.stack_context.wrap(callback)
+
+ self._stat |= tornado.ioloop.IOLoop.WRITE
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ self._sock.connect(addr)
+
+ except BlockingIOError:
+ pass
+
+ def read_bytes(self,size,callback = None):
+ self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)])
+
+ self._stat |= tornado.ioloop.IOLoop.READ
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ def write(self,buf,callback = None):
+ self._write_queue.append([self.DATA_BUF,0,buf,tornado.stack_context.wrap(callback)])
+
+ self._stat |= tornado.ioloop.IOLoop.WRITE
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ def sendfile(self,fd,callback = None):
+ size = os.fstat(fd).st_size
+
+ self.write(struct.pack('l',size))
+ self._write_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)])
+
+ self._stat |= tornado.ioloop.IOLoop.WRITE
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ def recvfile(self,fd,callback = None):
+ def _recv_size(data):
+ size, = struct.unpack('l',data)
+ self._read_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)])
+
+ self._stat |= tornado.ioloop.IOLoop.READ
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ self.read_bytes(8,_recv_size)
+
+ def _handle_event(self,fd,evt):
+ if evt & tornado.ioloop.IOLoop.READ:
+ while len(self._read_queue) > 0:
+ iocb = self._read_queue[0]
+ datatype = iocb[0]
+
+ if datatype == self.DATA_BUF:
+ size = iocb[1]
+
+ try:
+ while True:
+ buf = self._sock.recv(size)
+
+ iocb[2].extend(buf)
+ size -= len(buf)
+
+ if size == 0:
+ if iocb[3] != None:
+ iocb[3](iocb[2])
+
+ self._read_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = size
+ break
+
+ elif datatype == self.DATA_FILE:
+ size = iocb[1]
+
+ try:
+ while True:
+ buf = self._sock.recv(min(size,65536))
+
+ os.write(iocb[2],buf)
+ size -= len(buf)
+
+ if size == 0:
+ if iocb[3] != None:
+ iocb[3]()
+
+ self._read_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = size
+ break
+
+ if evt & tornado.ioloop.IOLoop.WRITE:
+ if self._conning == True:
+ self._conning = False
+
+ if self._conn_callback != None:
+ self._conn_callback()
+
+ while len(self._write_queue) > 0:
+ iocb = self._write_queue[0]
+ datatype = iocb[0]
+
+ if datatype == self.DATA_BUF:
+ off = iocb[1]
+ buf = iocb[2]
+
+ try:
+ while True:
+ off += self._sock.send(buf[off:])
+
+ if off == len(buf):
+ if iocb[3] != None:
+ iocb[3]()
+
+ self._write_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = off
+ break
+
+ elif datatype == self.DATA_FILE:
+ size = iocb[1]
+ filefd = iocb[2]
+ sockfd = self._sock.fileno()
+
+ try:
+ while True:
+ size -= os.sendfile(sockfd,filefd,None,min(size,65536))
+
+ if size == 0:
+ if iocb[3] != None:
+ iocb[3]()
+
+ self._write_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = size
+ break
+
+ stat = tornado.ioloop.IOLoop.ERROR
+ if len(self._read_queue) > 0:
+ stat |= tornado.ioloop.IOLoop.READ
+
+ if len(self._write_queue) > 0:
+ stat |= tornado.ioloop.IOLoop.WRITE
+
+ if stat != self._stat:
+ self._stat = stat
+ self._ioloop.update_handler(fd,stat)
+
class SocketConnection(Connection):
def __init__(self,linkclass,linkid,call_stream,file_stream = None):
super().__init__(linkclass,linkid)
@@ -28,13 +202,32 @@ class SocketConnection(Connection):
else:
self.file_stream = file_stream
- self.file_stream.set_close_callback(self.close)
+ #self.file_stream.set_close_callback(self.close)
self._start_ping()
def send_msg(self,data):
self.call_stream.write(struct.pack('l',len(data)) + data)
+ def send_file(self,filekey,load_path):
+ def _recv_cb(data):
+ stat = json.loads(data.decode('utf-8'))
+
+ if stat == True:
+ self.file_stream.sendfile(fd,_done_cb)
+
+ else:
+ os.close(fd)
+
+ def _done_cb():
+ print('done')
+
+ fd = os.open(load_path,O_RDONLY)
+ filesize = os.fstat(fd).st_size
+
+ send_pack(self.file_stream,bytes(json.dumps({'filekey':filekey,'filesize':filesize}),'utf-8'))
+ recv_pack(self.file_stream,_recv_cb)
+
def start_recv(self,recv_callback):
def _recv_size(data):
size, = struct.unpack('l',data)