edge/system/athena/sunnylinkd.py

251 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import base64
import gzip
import os
import ssl
import threading
import time
from openpilot.system.athena.athenad import ws_send, jsonrpc_handler, \
recv_queue, UploadQueueCache, upload_queue, cur_upload_items, backoff, ws_manage, log_handler
from jsonrpc import dispatcher
from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutException,
create_connection)
from openpilot.common.api import SunnylinkApi
from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity
from openpilot.common.swaglog import cloudlog
from openpilot.system.manager.sunnylink import sunnylink_need_register, sunnylink_ready
import cereal.messaging as messaging
SUNNYLINK_ATHENA_HOST = os.getenv('SUNNYLINK_ATHENA_HOST', 'wss://ws.stg.api.sunnypilot.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
LOCAL_PORT_WHITELIST = {8022}
SUNNYLINK_LOG_ATTR_NAME = "user.sunny.upload"
SUNNYLINK_RECONNECT_TIMEOUT_S = 70 # FYI changing this will also would require a change on sidebar.cc
DISALLOW_LOG_UPLOAD = threading.Event()
params = Params()
sunnylink_dongle_id = params.get("SunnylinkDongleId", encoding='utf-8')
sunnylink_api = SunnylinkApi(sunnylink_dongle_id)
def handle_long_poll(ws: WebSocket, exit_event: threading.Event | None) -> None:
cloudlog.info("sunnylinkd.handle_long_poll started")
sm = messaging.SubMaster(['deviceState'])
end_event = threading.Event()
comma_prime_cellular_end_event = threading.Event()
threads = [
threading.Thread(target=ws_manage, args=(ws, end_event), name='ws_manage'),
threading.Thread(target=ws_recv, args=(ws, end_event), name='ws_recv'),
threading.Thread(target=ws_send, args=(ws, end_event), name='ws_send'),
threading.Thread(target=ws_ping, args=(ws, end_event), name='ws_ping'),
threading.Thread(target=ws_queue, args=(end_event,), name='ws_queue'),
# threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'),
# threading.Thread(target=sunny_log_handler, args=(end_event, comma_prime_cellular_end_event), name='log_handler'),
# threading.Thread(target=stat_handler, args=(end_event,), name='stat_handler'),
] + [
threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}')
for x in range(HANDLER_THREADS)
]
for thread in threads:
thread.start()
try:
while not end_event.wait(0.1):
if not sunnylink_ready(params):
cloudlog.warning("Exiting sunnylinkd.handle_long_poll as SunnylinkEnabled is False")
break
sm.update(0)
if exit_event is not None and exit_event.is_set():
end_event.set()
comma_prime_cellular_end_event.set()
prime_type = params.get("PrimeType", encoding='utf-8') or 0
metered = sm['deviceState'].networkMetered
if DISALLOW_LOG_UPLOAD.is_set() and not comma_prime_cellular_end_event.is_set():
cloudlog.debug(f"sunnylinkd.handle_long_poll: DISALLOW_LOG_UPLOAD, setting comma_prime_cellular_end_event")
comma_prime_cellular_end_event.set()
elif metered and int(prime_type) > 2:
cloudlog.debug(f"sunnylinkd.handle_long_poll: PrimeType({prime_type}) > 2 and networkMetered({metered})")
comma_prime_cellular_end_event.set()
elif comma_prime_cellular_end_event.is_set() and not DISALLOW_LOG_UPLOAD.is_set():
cloudlog.debug(f"sunnylinkd.handle_long_poll: comma_prime_cellular_end_event is set and not PrimeType({prime_type}) > 2 or not networkMetered({metered})")
comma_prime_cellular_end_event.clear()
finally:
end_event.set()
comma_prime_cellular_end_event.set()
for thread in threads:
cloudlog.debug(f"sunnylinkd athena.joining {thread.name}")
thread.join()
cloudlog.debug(f"sunnylinkd athena.joined {thread.name}")
def ws_recv(ws: WebSocket, end_event: threading.Event) -> None:
last_ping = int(time.monotonic() * 1e9)
while not end_event.is_set():
try:
opcode, data = ws.recv_data(control_frame=True)
if opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
if opcode == ABNF.OPCODE_TEXT:
data = data.decode("utf-8")
recv_queue.put_nowait(data)
cloudlog.debug(f"sunnylinkd.ws_recv.recv {data}")
elif opcode in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG):
cloudlog.debug(f"sunnylinkd.ws_recv.pong")
last_ping = int(time.monotonic() * 1e9)
Params().put("LastSunnylinkPingTime", str(last_ping))
except WebSocketTimeoutException:
ns_since_last_ping = int(time.monotonic() * 1e9) - last_ping
if ns_since_last_ping > SUNNYLINK_RECONNECT_TIMEOUT_S * 1e9:
cloudlog.exception("sunnylinkd.ws_recv.timeout")
end_event.set()
except Exception:
cloudlog.exception("sunnylinkd.ws_recv.exception")
end_event.set()
def ws_ping(ws: WebSocket, end_event: threading.Event) -> None:
ws.ping() # Send the first ping
while not end_event.wait(SUNNYLINK_RECONNECT_TIMEOUT_S * 0.7): # Sleep about 70% before a timeout
try:
ws.ping()
cloudlog.debug(f"sunnylinkd.ws_recv.ws_ping: Pinging")
except Exception:
cloudlog.exception("sunnylinkd.ws_ping.exception")
end_event.set()
cloudlog.debug(f"sunnylinkd.ws_ping.end_event is set, exiting ws_ping thread")
def ws_queue(end_event: threading.Event) -> None:
resume_requested = False
tries = 0
while not end_event.is_set() and not resume_requested:
try:
if not resume_requested:
cloudlog.debug(f"sunnylinkd.ws_queue.resume_queued")
sunnylink_api.resume_queued(timeout=29)
resume_requested = True
tries = 0
except Exception:
cloudlog.exception("sunnylinkd.ws_queue.resume_queued.exception")
resume_requested = False
tries += 1
time.sleep(backoff(tries)) # Wait for the backoff time before the next attempt
if end_event.is_set():
cloudlog.debug("end_event is set, exiting ws_queue thread")
elif resume_requested:
cloudlog.debug(f"Resume requested to server after {tries} tries")
else:
cloudlog.error(f"Reached end of ws_queue while end_event is not set and resume_requested is {resume_requested}")
def sunny_log_handler(end_event: threading.Event, comma_prime_cellular_end_event: threading.Event) -> None:
while not end_event.wait(0.1):
if not comma_prime_cellular_end_event.is_set():
log_handler(comma_prime_cellular_end_event, SUNNYLINK_LOG_ATTR_NAME)
comma_prime_cellular_end_event.set()
@dispatcher.add_method
def toggleLogUpload(enabled: bool):
DISALLOW_LOG_UPLOAD.clear() if enabled and DISALLOW_LOG_UPLOAD.is_set() else DISALLOW_LOG_UPLOAD.set()
@dispatcher.add_method
def getParamsAllKeys() -> list[str]:
keys: list[str] = [k.decode('utf-8') for k in Params().all_keys()]
return keys
@dispatcher.add_method
def getParams(params_keys: list[str], compression: bool = False) -> str | dict[str, str]:
try:
params = Params()
params_dict: dict[str, bytes] = {key: params.get(key) or b'' for key in params_keys}
# Compress the values before encoding to base64 as output from params.get is bytes and same for compression
if compression:
params_dict = {key: gzip.compress(value) for key, value in params_dict.items()}
# Last step is to encode the values to base64 and decode to utf-8 for JSON serialization
return {key: base64.b64encode(value).decode('utf-8') for key, value in params_dict.items()}
except Exception as e:
return cloudlog.exception("sunnylinkd.getParams.exception", e)
@dispatcher.add_method
def saveParams(params_to_update: dict[str, str], compression: bool = False) -> None:
params = Params()
try:
params_dict = {key: base64.b64decode(value) for key, value in params_to_update.items()}
if compression:
params_dict = {key: gzip.decompress(value) for key, value in params_dict.items()}
for key, value in params_dict.items():
params.put(key, value)
except Exception as e:
return cloudlog.exception("sunnylinkd.saveParams.exception", e)
def main(exit_event: threading.Event = None):
try:
set_core_affinity([0, 1, 2, 3])
except Exception:
cloudlog.exception("failed to set core affinity")
while sunnylink_need_register(params):
cloudlog.info("Waiting for sunnylink registration to complete")
time.sleep(10)
UploadQueueCache.initialize(upload_queue)
ws_uri = f"{SUNNYLINK_ATHENA_HOST}"
conn_start = None
conn_retries = 0
while (exit_event is None or not exit_event.is_set()) and sunnylink_ready(params):
try:
if conn_start is None:
conn_start = time.monotonic()
cloudlog.event("sunnylinkd.main.connecting_ws", ws_uri=ws_uri, retries=conn_retries)
ws = create_connection(ws_uri,
header={"Authorization": f"Bearer {sunnylink_api.get_token()}"},
enable_multithread=True,
sslopt={"cert_reqs": ssl.CERT_NONE if "localhost" in ws_uri else ssl.CERT_REQUIRED},
timeout=SUNNYLINK_RECONNECT_TIMEOUT_S)
cloudlog.event("sunnylinkd.main.connected_ws", ws_uri=ws_uri, retries=conn_retries,
duration=time.monotonic() - conn_start)
conn_start = None
conn_retries = 0
cur_upload_items.clear()
handle_long_poll(ws, exit_event)
except (KeyboardInterrupt, SystemExit):
break
except (ConnectionError, TimeoutError, WebSocketException):
conn_retries += 1
params.remove("LastSunnylinkPingTime")
except Exception:
cloudlog.exception("sunnylinkd.main.exception")
conn_retries += 1
params.remove("LastSunnylinkPingTime")
time.sleep(backoff(conn_retries))
if not sunnylink_ready(params):
cloudlog.debug("Reached end of sunnylinkd.main while sunnylink is not ready. Waiting 60s before retrying")
time.sleep(60)
if __name__ == "__main__":
main()