athenad: set a timeout on proxy WebSocket receive (#32336)
* useless * Revert "useless" This reverts commit 28f0bb9e9794d60eefba8063b47d8ca113308008. * this forever hangs you disconnect (or 2 hours) * same timeout as the global websocket * Revert "same timeout as the global websocket" This reverts commit 0bd0cb8a38a3e17960c1fae205311d86a9cf8feb. * setting the timeout affects the entire websocket and disconnects, not just recv timeout * fix that * fix test old-commit-hash: 09aeab3f77727a1e2ba25e1cf14a56882b4e5e29
This commit is contained in:
@@ -657,10 +657,12 @@ def stat_handler(end_event: threading.Event) -> None:
|
||||
def ws_proxy_recv(ws: WebSocket, local_sock: socket.socket, ssock: socket.socket, end_event: threading.Event, global_end_event: threading.Event) -> None:
|
||||
while not (end_event.is_set() or global_end_event.is_set()):
|
||||
try:
|
||||
data = ws.recv()
|
||||
if isinstance(data, str):
|
||||
data = data.encode("utf-8")
|
||||
local_sock.sendall(data)
|
||||
r = select.select((ws.sock,), (), (), 30)
|
||||
if r[0]:
|
||||
data = ws.recv()
|
||||
if isinstance(data, str):
|
||||
data = data.encode("utf-8")
|
||||
local_sock.sendall(data)
|
||||
except WebSocketTimeoutException:
|
||||
pass
|
||||
except Exception:
|
||||
@@ -670,6 +672,7 @@ def ws_proxy_recv(ws: WebSocket, local_sock: socket.socket, ssock: socket.socket
|
||||
cloudlog.debug("athena.ws_proxy_recv closing sockets")
|
||||
ssock.close()
|
||||
local_sock.close()
|
||||
ws.close()
|
||||
cloudlog.debug("athena.ws_proxy_recv done closing sockets")
|
||||
|
||||
end_event.set()
|
||||
|
||||
@@ -43,6 +43,8 @@ class MockApi():
|
||||
|
||||
|
||||
class MockWebsocket():
|
||||
sock = socket.socket()
|
||||
|
||||
def __init__(self, recv_queue, send_queue):
|
||||
self.recv_queue = recv_queue
|
||||
self.send_queue = send_queue
|
||||
@@ -56,6 +58,9 @@ class MockWebsocket():
|
||||
def send(self, data, opcode):
|
||||
self.send_queue.put_nowait((data, opcode))
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class HTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def do_PUT(self):
|
||||
|
||||
Reference in New Issue
Block a user