Files
sunnypilot/system/ui/lib/wifi_manager.py
Jason Wen d5b25e14fd Merge branch 'upstream/openpilot/master' into sync-20260317
# Conflicts:
#	.github/workflows/auto_pr_review.yaml
#	.gitignore
#	opendbc_repo
#	panda
#	selfdrive/ui/mici/layouts/home.py
#	selfdrive/ui/mici/layouts/onboarding.py
#	selfdrive/ui/mici/layouts/settings/device.py
#	selfdrive/ui/tests/diff/replay.py
#	selfdrive/ui/translations/app_fr.po
#	system/ui/mici_setup.py
Sync: `commaai/opendbc:master` → `sunnypilot/opendbc:master`
Sync: `commaai/panda:master` → `sunnypilot/panda:master`
2026-03-17 23:02:10 -04:00

1029 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import atexit
import threading
import time
import uuid
import subprocess
from collections.abc import Callable
from dataclasses import dataclass, replace
from enum import IntEnum
from typing import Any
from jeepney import DBusAddress, new_method_call
from jeepney.bus_messages import MatchRule, message_bus
from jeepney.io.blocking import DBusConnection, open_dbus_connection as open_dbus_connection_blocking
from jeepney.io.threading import DBusRouter, open_dbus_connection as open_dbus_connection_threading
from jeepney.low_level import MessageType
from jeepney.wrappers import Properties
from openpilot.common.swaglog import cloudlog
from openpilot.system.ui.lib.networkmanager import (NM, NM_WIRELESS_IFACE, NM_802_11_AP_SEC_PAIR_WEP40,
NM_802_11_AP_SEC_PAIR_WEP104, NM_802_11_AP_SEC_GROUP_WEP40,
NM_802_11_AP_SEC_GROUP_WEP104, NM_802_11_AP_SEC_KEY_MGMT_PSK,
NM_802_11_AP_SEC_KEY_MGMT_802_1X, NM_802_11_AP_FLAGS_NONE,
NM_802_11_AP_FLAGS_PRIVACY, NM_802_11_AP_FLAGS_WPS,
NM_PATH, NM_IFACE, NM_ACCESS_POINT_IFACE, NM_SETTINGS_PATH,
NM_SETTINGS_IFACE, NM_CONNECTION_IFACE, NM_DEVICE_IFACE,
NM_DEVICE_TYPE_WIFI, NM_DEVICE_TYPE_MODEM, NM_ACTIVE_CONNECTION_IFACE,
NM_IP4_CONFIG_IFACE, NM_PROPERTIES_IFACE, NMDeviceState, NMDeviceStateReason)
try:
from openpilot.common.params import Params
except Exception:
Params = None
TETHERING_IP_ADDRESS = "192.168.43.1"
DEFAULT_TETHERING_PASSWORD = "swagswagcomma"
SIGNAL_QUEUE_SIZE = 10
SCAN_PERIOD_SECONDS = 5
DEBUG = False
_dbus_call_idx = 0
def normalize_ssid(ssid: str) -> str:
return ssid.replace("", "'") # for iPhone hotspots
def _wrap_router(router):
def _wrap(orig):
def wrapper(msg, **kw):
global _dbus_call_idx
_dbus_call_idx += 1
if DEBUG:
h = msg.header.fields
print(f"[DBUS #{_dbus_call_idx}] {h.get(6, '?')} {h.get(3, '?')} {msg.body}")
return orig(msg, **kw)
return wrapper
router.send_and_get_reply = _wrap(router.send_and_get_reply)
router.send = _wrap(router.send)
class SecurityType(IntEnum):
OPEN = 0
WPA = 1
WPA2 = 2
WPA3 = 3
UNSUPPORTED = 4
class MeteredType(IntEnum):
UNKNOWN = 0
YES = 1
NO = 2
def get_security_type(flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType:
wpa_props = wpa_flags | rsn_flags
# obtained by looking at flags of networks in the office as reported by an Android phone
supports_wpa = (NM_802_11_AP_SEC_PAIR_WEP40 | NM_802_11_AP_SEC_PAIR_WEP104 | NM_802_11_AP_SEC_GROUP_WEP40 |
NM_802_11_AP_SEC_GROUP_WEP104 | NM_802_11_AP_SEC_KEY_MGMT_PSK)
if (flags == NM_802_11_AP_FLAGS_NONE) or ((flags & NM_802_11_AP_FLAGS_WPS) and not (wpa_props & supports_wpa)):
return SecurityType.OPEN
elif (flags & NM_802_11_AP_FLAGS_PRIVACY) and (wpa_props & supports_wpa) and not (wpa_props & NM_802_11_AP_SEC_KEY_MGMT_802_1X):
return SecurityType.WPA
else:
cloudlog.warning(f"Unsupported network! flags: {flags}, wpa_flags: {wpa_flags}, rsn_flags: {rsn_flags}")
return SecurityType.UNSUPPORTED
@dataclass(frozen=True)
class Network:
ssid: str
strength: int
security_type: SecurityType
is_tethering: bool
@classmethod
def from_dbus(cls, ssid: str, aps: list["AccessPoint"], is_tethering: bool) -> "Network":
# we only want to show the strongest AP for each Network/SSID
strongest_ap = max(aps, key=lambda ap: ap.strength)
security_type = get_security_type(strongest_ap.flags, strongest_ap.wpa_flags, strongest_ap.rsn_flags)
return cls(
ssid=ssid,
strength=100 if is_tethering else strongest_ap.strength,
security_type=security_type,
is_tethering=is_tethering,
)
@dataclass(frozen=True)
class AccessPoint:
ssid: str
bssid: str
strength: int
flags: int
wpa_flags: int
rsn_flags: int
ap_path: str
@classmethod
def from_dbus(cls, ap_props: dict[str, tuple[str, Any]], ap_path: str) -> "AccessPoint":
ssid = bytes(ap_props['Ssid'][1]).decode("utf-8", "replace")
bssid = str(ap_props['HwAddress'][1])
strength = int(ap_props['Strength'][1])
flags = int(ap_props['Flags'][1])
wpa_flags = int(ap_props['WpaFlags'][1])
rsn_flags = int(ap_props['RsnFlags'][1])
return cls(
ssid=ssid,
bssid=bssid,
strength=strength,
flags=flags,
wpa_flags=wpa_flags,
rsn_flags=rsn_flags,
ap_path=ap_path,
)
class ConnectStatus(IntEnum):
DISCONNECTED = 0
CONNECTING = 1
CONNECTED = 2
@dataclass(frozen=True)
class WifiState:
ssid: str | None = None
status: ConnectStatus = ConnectStatus.DISCONNECTED
class WifiManager:
def __init__(self):
self._networks: list[Network] = [] # an unsorted list of available Networks. a Network can be comprised of multiple APs
self._active = True # used to not run when not in settings
self._exit = False
# DBus connections
try:
self._router_main = DBusRouter(open_dbus_connection_threading(bus="SYSTEM")) # used by scanner / general method calls
_wrap_router(self._router_main)
self._conn_monitor = open_dbus_connection_blocking(bus="SYSTEM") # used by state monitor thread
self._nm = DBusAddress(NM_PATH, bus_name=NM, interface=NM_IFACE)
except FileNotFoundError:
cloudlog.exception("Failed to connect to system D-Bus")
self._router_main = None
self._conn_monitor = None
self._exit = True
# Store wifi device path
self._wifi_device: str | None = None
# State
self._connections: dict[str, str] = {} # ssid -> connection path, updated via NM signals
self._wifi_state: WifiState = WifiState()
self._user_epoch: int = 0
self._ipv4_address: str = ""
self._current_network_metered: MeteredType = MeteredType.UNKNOWN
self._tethering_password: str = ""
self._ipv4_forward = False
self._last_network_scan: float = 0.0
self._callback_queue: list[Callable] = []
self._tethering_ssid = "weedle"
if Params is not None:
dongle_id = Params().get("DongleId")
if dongle_id:
self._tethering_ssid += "-" + dongle_id[:4]
# Callbacks
self._need_auth: list[Callable[[str], None]] = []
self._activated: list[Callable[[], None]] = []
self._forgotten: list[Callable[[str | None], None]] = []
self._networks_updated: list[Callable[[list[Network]], None]] = []
self._disconnected: list[Callable[[], None]] = []
self._scan_lock = threading.Lock()
self._scan_thread = threading.Thread(target=self._network_scanner, daemon=True)
self._state_thread = threading.Thread(target=self._monitor_state, daemon=True)
self._initialize()
atexit.register(self.stop)
def _initialize(self):
def worker():
self._wait_for_wifi_device()
self._init_connections()
if Params is not None and self._tethering_ssid not in self._connections:
self._add_tethering_connection()
self._init_wifi_state()
self._scan_thread.start()
self._state_thread.start()
self._tethering_password = self._get_tethering_password()
cloudlog.debug("WifiManager initialized")
threading.Thread(target=worker, daemon=True).start()
def _init_wifi_state(self, block: bool = True):
def worker():
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
return
epoch = self._user_epoch
dev_addr = DBusAddress(self._wifi_device, bus_name=NM, interface=NM_DEVICE_IFACE)
dev_state = self._router_main.send_and_get_reply(Properties(dev_addr).get('State')).body[0][1]
ssid: str | None = None
status = ConnectStatus.DISCONNECTED
if NMDeviceState.PREPARE <= dev_state <= NMDeviceState.SECONDARIES and dev_state != NMDeviceState.NEED_AUTH:
status = ConnectStatus.CONNECTING
elif dev_state == NMDeviceState.ACTIVATED:
status = ConnectStatus.CONNECTED
conn_path, _ = self._get_active_wifi_connection()
if conn_path:
ssid = next((s for s, p in self._connections.items() if p == conn_path), None)
# Discard if user acted during DBus calls
if self._user_epoch != epoch:
return
self._wifi_state = WifiState(ssid=ssid, status=status)
if block:
worker()
else:
threading.Thread(target=worker, daemon=True).start()
def add_callbacks(self, need_auth: Callable[[str], None] | None = None,
activated: Callable[[], None] | None = None,
forgotten: Callable[[str], None] | None = None,
networks_updated: Callable[[list[Network]], None] | None = None,
disconnected: Callable[[], None] | None = None):
if need_auth is not None:
self._need_auth.append(need_auth)
if activated is not None:
self._activated.append(activated)
if forgotten is not None:
self._forgotten.append(forgotten)
if networks_updated is not None:
self._networks_updated.append(networks_updated)
if disconnected is not None:
self._disconnected.append(disconnected)
@property
def networks(self) -> list[Network]:
# Sort by connected/connecting, then known, then strength, then alphabetically. This is a pure UI ordering and should not affect underlying state.
return sorted(self._networks, key=lambda n: (n.ssid != self._wifi_state.ssid, not self.is_connection_saved(n.ssid), -n.strength, n.ssid.lower()))
@property
def wifi_state(self) -> WifiState:
return self._wifi_state
@property
def ipv4_address(self) -> str:
return self._ipv4_address
@property
def current_network_metered(self) -> MeteredType:
return self._current_network_metered
@property
def connecting_to_ssid(self) -> str | None:
wifi_state = self._wifi_state
return wifi_state.ssid if wifi_state.status == ConnectStatus.CONNECTING else None
@property
def connected_ssid(self) -> str | None:
wifi_state = self._wifi_state
return wifi_state.ssid if wifi_state.status == ConnectStatus.CONNECTED else None
@property
def tethering_password(self) -> str:
return self._tethering_password
def _set_connecting(self, ssid: str | None):
# Called by user action, or sequentially from state change handler
self._user_epoch += 1
self._wifi_state = WifiState(ssid=ssid, status=ConnectStatus.DISCONNECTED if ssid is None else ConnectStatus.CONNECTING)
def _enqueue_callbacks(self, cbs: list[Callable], *args):
for cb in cbs:
self._callback_queue.append(lambda _cb=cb: _cb(*args))
def process_callbacks(self):
# Call from UI thread to run any pending callbacks
to_run, self._callback_queue = self._callback_queue, []
for cb in to_run:
cb()
def set_active(self, active: bool):
self._active = active
# Update networks and WiFi state (to self-heal) immediately when activating for UI
if active:
self._init_wifi_state(block=False)
self._update_networks(block=False)
def _monitor_state(self):
# Filter for signals
rules = (
MatchRule(
type="signal",
interface=NM_DEVICE_IFACE,
member="StateChanged",
path=self._wifi_device,
),
MatchRule(
type="signal",
interface=NM_SETTINGS_IFACE,
member="NewConnection",
path=NM_SETTINGS_PATH,
),
MatchRule(
type="signal",
interface=NM_SETTINGS_IFACE,
member="ConnectionRemoved",
path=NM_SETTINGS_PATH,
),
MatchRule(
type="signal",
interface=NM_PROPERTIES_IFACE,
member="PropertiesChanged",
path=self._wifi_device,
),
)
for rule in rules:
self._conn_monitor.send_and_get_reply(message_bus.AddMatch(rule))
with (self._conn_monitor.filter(rules[0], bufsize=SIGNAL_QUEUE_SIZE) as state_q,
self._conn_monitor.filter(rules[1], bufsize=SIGNAL_QUEUE_SIZE) as new_conn_q,
self._conn_monitor.filter(rules[2], bufsize=SIGNAL_QUEUE_SIZE) as removed_conn_q,
self._conn_monitor.filter(rules[3], bufsize=SIGNAL_QUEUE_SIZE) as props_q):
while not self._exit:
try:
self._conn_monitor.recv_messages(timeout=1)
except TimeoutError:
continue
# Connection added/removed
while len(removed_conn_q):
conn_path = removed_conn_q.popleft().body[0]
self._connection_removed(conn_path)
while len(new_conn_q):
conn_path = new_conn_q.popleft().body[0]
self._new_connection(conn_path)
# PropertiesChanged on wifi device (LastScan = scan complete)
while len(props_q):
iface, changed, _ = props_q.popleft().body
if iface == NM_WIRELESS_IFACE and 'LastScan' in changed:
self._update_networks()
# Device state changes
while len(state_q):
new_state, previous_state, change_reason = state_q.popleft().body
self._handle_state_change(new_state, previous_state, change_reason)
def _handle_state_change(self, new_state: int, prev_state: int, change_reason: int):
# Thread safety: _wifi_state is read/written by both the monitor thread (this handler)
# and the main thread (_set_connecting via connect/activate). PREPARE/CONFIG and ACTIVATED
# have a read-then-write pattern with a slow DBus call in between — if _set_connecting
# runs mid-call, the handler would overwrite the user's newer state with stale data.
#
# The _user_epoch counter solves this without locks. _set_connecting increments the epoch
# on every user action. Handlers snapshot the epoch before their DBus call and compare
# after: if it changed, a user action occurred during the call and the stale result is
# discarded. Combined with deterministic fixes (skip DBus lookup when ssid already set,
# DEACTIVATING clears CONNECTED on CONNECTION_REMOVED, CONNECTION_REMOVED guard),
# all known race windows are closed.
# TODO: Handle (FAILED, SSID_NOT_FOUND) and emit for UI to show error
# Happens when network drops off after starting connection
if new_state == NMDeviceState.DISCONNECTED:
if change_reason == NMDeviceStateReason.NEW_ACTIVATION:
return
# Guard: forget A while connecting to B fires CONNECTION_REMOVED. Don't clear B's state
# if B is still a known connection. If B hasn't arrived in _connections yet (late
# NewConnection), state clears here but PREPARE recovers via DBus lookup.
if (change_reason == NMDeviceStateReason.CONNECTION_REMOVED and self._wifi_state.ssid and
self._wifi_state.ssid in self._connections):
return
self._set_connecting(None)
elif new_state in (NMDeviceState.PREPARE, NMDeviceState.CONFIG):
epoch = self._user_epoch
if self._wifi_state.ssid is not None:
self._wifi_state = replace(self._wifi_state, status=ConnectStatus.CONNECTING)
return
# Auto-connection when NetworkManager connects to known networks on its own (ssid=None): look up ssid from NM
wifi_state = replace(self._wifi_state, status=ConnectStatus.CONNECTING)
conn_path, _ = self._get_active_wifi_connection(self._conn_monitor)
# Discard if user acted during DBus call
if self._user_epoch != epoch:
return
if conn_path is None:
cloudlog.warning("Failed to get active wifi connection during PREPARE/CONFIG state")
else:
wifi_state = replace(wifi_state, ssid=next((s for s, p in self._connections.items() if p == conn_path), None))
self._wifi_state = wifi_state
# BAD PASSWORD
# - strong network rejects with NEED_AUTH+SUPPLICANT_DISCONNECT
# - weak/gone network fails with FAILED+NO_SECRETS
# TODO: sometimes on PC it's observed no future signals are fired if mouse is held down blocking wrong password dialog
elif ((new_state == NMDeviceState.NEED_AUTH and change_reason == NMDeviceStateReason.SUPPLICANT_DISCONNECT
and prev_state == NMDeviceState.CONFIG) or
(new_state == NMDeviceState.FAILED and change_reason == NMDeviceStateReason.NO_SECRETS)):
# prev_state guard: real auth failures come from CONFIG (supplicant handshake).
# Stale NEED_AUTH from a prior connection during network switching arrives with
# prev_state=DISCONNECTED and must be ignored to avoid a false wrong-password callback.
if self._wifi_state.ssid:
self._enqueue_callbacks(self._need_auth, self._wifi_state.ssid)
self._set_connecting(None)
elif new_state in (NMDeviceState.NEED_AUTH, NMDeviceState.IP_CONFIG, NMDeviceState.IP_CHECK,
NMDeviceState.SECONDARIES, NMDeviceState.FAILED):
pass
elif new_state == NMDeviceState.ACTIVATED:
# Note that IP address from Ip4Config may not be propagated immediately and could take until the next scan results
epoch = self._user_epoch
wifi_state = replace(self._wifi_state, status=ConnectStatus.CONNECTED)
conn_path, _ = self._get_active_wifi_connection(self._conn_monitor)
# Discard if user acted during DBus call
if self._user_epoch != epoch:
return
if conn_path is None:
cloudlog.warning("Failed to get active wifi connection during ACTIVATED state")
else:
wifi_state = replace(wifi_state, ssid=next((s for s, p in self._connections.items() if p == conn_path), None))
self._wifi_state = wifi_state
self._enqueue_callbacks(self._activated)
self._update_active_connection_info()
# Persist volatile connections (created by AddAndActivateConnection2) to disk
if conn_path is not None:
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
save_reply = self._conn_monitor.send_and_get_reply(new_method_call(conn_addr, 'Save'))
if save_reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to persist connection to disk: {save_reply}")
elif new_state == NMDeviceState.DEACTIVATING:
# Must clear state when forgetting the currently connected network so the UI
# doesn't flash "connected" after the eager "forgetting..." state resets
# (the forgotten callback fires between DEACTIVATING and DISCONNECTED).
# Only clear CONNECTED — CONNECTING must be preserved for forget-A-connect-B.
if change_reason == NMDeviceStateReason.CONNECTION_REMOVED and self._wifi_state.status == ConnectStatus.CONNECTED:
self._set_connecting(None)
def _network_scanner(self):
while not self._exit:
if self._active:
if time.monotonic() - self._last_network_scan > SCAN_PERIOD_SECONDS:
self._request_scan()
self._last_network_scan = time.monotonic()
time.sleep(1 / 2.)
def _wait_for_wifi_device(self):
while not self._exit:
device_path = self._get_adapter(NM_DEVICE_TYPE_WIFI)
if device_path is not None:
self._wifi_device = device_path
break
time.sleep(1)
def _get_adapter(self, adapter_type: int) -> str | None:
# Return the first NetworkManager device path matching adapter_type
try:
device_paths = self._router_main.send_and_get_reply(new_method_call(self._nm, 'GetDevices')).body[0]
for device_path in device_paths:
dev_addr = DBusAddress(device_path, bus_name=NM, interface=NM_DEVICE_IFACE)
dev_type = self._router_main.send_and_get_reply(Properties(dev_addr).get('DeviceType')).body[0][1]
if dev_type == adapter_type:
return str(device_path)
except Exception as e:
cloudlog.exception(f"Error getting adapter type {adapter_type}: {e}")
return None
def _init_connections(self) -> None:
settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE)
known_connections = self._router_main.send_and_get_reply(new_method_call(settings_addr, 'ListConnections')).body[0]
conns: dict[str, str] = {}
for conn_path in known_connections:
settings = self._get_connection_settings(conn_path)
if len(settings) == 0:
cloudlog.warning(f'Failed to get connection settings for {conn_path}')
continue
if "802-11-wireless" in settings:
ssid = settings['802-11-wireless']['ssid'][1].decode("utf-8", "replace")
if ssid != "":
conns[ssid] = conn_path
self._connections = conns
def _new_connection(self, conn_path: str):
settings = self._get_connection_settings(conn_path)
if "802-11-wireless" in settings:
ssid = settings['802-11-wireless']['ssid'][1].decode("utf-8", "replace")
if ssid != "":
self._connections[ssid] = conn_path
def _connection_removed(self, conn_path: str):
self._connections = {ssid: path for ssid, path in self._connections.items() if path != conn_path}
def _get_active_connections(self, router: DBusConnection | DBusRouter | None = None):
# Returns list of ActiveConnection
if router is None:
router = self._router_main
return router.send_and_get_reply(Properties(self._nm).get('ActiveConnections')).body[0][1]
def _get_active_wifi_connection(self, router: DBusConnection | DBusRouter | None = None) -> tuple[str | None, dict | None]:
# Returns first Connection settings path and ActiveConnection props from ActiveConnections with Type 802-11-wireless
if router is None:
router = self._router_main
for active_conn in self._get_active_connections(router):
conn_addr = DBusAddress(active_conn, bus_name=NM, interface=NM_ACTIVE_CONNECTION_IFACE)
reply = router.send_and_get_reply(Properties(conn_addr).get_all())
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to get active connection properties for {active_conn}: {reply}")
continue
props = reply.body[0]
conn_path = props.get('Connection', ('o', '/'))[1]
if props.get('Type', ('s', ''))[1] == '802-11-wireless' and conn_path != '/':
return conn_path, props
return None, None
def _get_connection_settings(self, conn_path: str) -> dict:
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'GetSettings'))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f'Failed to get connection settings: {reply}')
return {}
return dict(reply.body[0])
def _add_tethering_connection(self):
connection = {
'connection': {
'type': ('s', '802-11-wireless'),
'uuid': ('s', str(uuid.uuid4())),
'id': ('s', 'Hotspot'),
'autoconnect-retries': ('i', 0),
'interface-name': ('s', 'wlan0'),
'autoconnect': ('b', False),
},
'802-11-wireless': {
'band': ('s', 'bg'),
'mode': ('s', 'ap'),
'ssid': ('ay', self._tethering_ssid.encode("utf-8")),
},
'802-11-wireless-security': {
'group': ('as', ['ccmp']),
'key-mgmt': ('s', 'wpa-psk'),
'pairwise': ('as', ['ccmp']),
'proto': ('as', ['rsn']),
'psk': ('s', DEFAULT_TETHERING_PASSWORD),
},
'ipv4': {
'method': ('s', 'shared'),
'address-data': ('aa{sv}', [[
('address', ('s', TETHERING_IP_ADDRESS)),
('prefix', ('u', 24)),
]]),
'gateway': ('s', TETHERING_IP_ADDRESS),
'never-default': ('b', True),
},
'ipv6': {'method': ('s', 'ignore')},
}
settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE)
self._router_main.send_and_get_reply(new_method_call(settings_addr, 'AddConnection', 'a{sa{sv}}', (connection,)))
def connect_to_network(self, ssid: str, password: str, hidden: bool = False):
self._set_connecting(ssid)
def worker():
# Clear all connections that may already exist to the network we are connecting to
self.forget_connection(ssid, block=True)
connection = {
'connection': {
'type': ('s', '802-11-wireless'),
'uuid': ('s', str(uuid.uuid4())),
'id': ('s', f'sunnypilot connection {ssid}'),
'autoconnect-retries': ('i', 0),
},
'802-11-wireless': {
'ssid': ('ay', ssid.encode("utf-8")),
'hidden': ('b', hidden),
'mode': ('s', 'infrastructure'),
},
'ipv4': {
'method': ('s', 'auto'),
'dns-priority': ('i', 600),
},
'ipv6': {'method': ('s', 'ignore')},
}
if password:
connection['802-11-wireless-security'] = {
'key-mgmt': ('s', 'wpa-psk'),
'auth-alg': ('s', 'open'),
'psk': ('s', password),
}
# Volatile connection auto-deletes on disconnect (wrong password, user switches networks)
# Persisted to disk on ACTIVATED via Save()
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
# TODO: expose a failed connection state in the UI
self._init_wifi_state()
return
reply = self._router_main.send_and_get_reply(new_method_call(self._nm, 'AddAndActivateConnection2', 'a{sa{sv}}ooa{sv}',
(connection, self._wifi_device, "/", {'persist': ('s', 'volatile')})))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to add and activate connection for {ssid}: {reply}")
# TODO: expose a failed connection state in the UI
self._init_wifi_state()
threading.Thread(target=worker, daemon=True).start()
def forget_connection(self, ssid: str, block: bool = False):
def worker():
conn_path = self._connections.get(ssid, None)
if conn_path is None:
cloudlog.warning(f"Trying to forget unknown connection: {ssid}")
else:
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
self._router_main.send_and_get_reply(new_method_call(conn_addr, 'Delete'))
self._enqueue_callbacks(self._forgotten, ssid)
if block:
worker()
else:
threading.Thread(target=worker, daemon=True).start()
def activate_connection(self, ssid: str, block: bool = False):
self._set_connecting(ssid)
def worker():
conn_path = self._connections.get(ssid, None)
if conn_path is None or self._wifi_device is None:
cloudlog.warning(f"Failed to activate connection for {ssid}: conn_path={conn_path}, wifi_device={self._wifi_device}")
# TODO: expose a failed connection state in the UI
self._init_wifi_state()
return
reply = self._router_main.send_and_get_reply(new_method_call(self._nm, 'ActivateConnection', 'ooo',
(conn_path, self._wifi_device, "/")))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to activate connection for {ssid}: {reply}")
# TODO: expose a failed connection state in the UI
self._init_wifi_state()
if block:
worker()
else:
threading.Thread(target=worker, daemon=True).start()
def _deactivate_connection(self, ssid: str):
for active_conn in self._get_active_connections():
conn_addr = DBusAddress(active_conn, bus_name=NM, interface=NM_ACTIVE_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(Properties(conn_addr).get('SpecificObject'))
if reply.header.message_type == MessageType.error:
continue # object gone (e.g. rapid connect/disconnect)
specific_obj_path = reply.body[0][1]
if specific_obj_path != "/":
ap_addr = DBusAddress(specific_obj_path, bus_name=NM, interface=NM_ACCESS_POINT_IFACE)
ap_reply = self._router_main.send_and_get_reply(Properties(ap_addr).get('Ssid'))
if ap_reply.header.message_type == MessageType.error:
continue # AP gone (e.g. mode switch)
ap_ssid = bytes(ap_reply.body[0][1]).decode("utf-8", "replace")
if ap_ssid == ssid:
self._router_main.send_and_get_reply(new_method_call(self._nm, 'DeactivateConnection', 'o', (active_conn,)))
return
def is_tethering_active(self) -> bool:
# Check ssid, not connected_ssid, to also catch connecting state
return self._wifi_state.ssid == self._tethering_ssid
def is_connection_saved(self, ssid: str) -> bool:
return ssid in self._connections
def set_tethering_password(self, password: str):
def worker():
conn_path = self._connections.get(self._tethering_ssid, None)
if conn_path is None:
cloudlog.warning('No tethering connection found')
return
settings = self._get_connection_settings(conn_path)
if len(settings) == 0:
cloudlog.warning(f'Failed to get tethering settings for {conn_path}')
return
settings['802-11-wireless-security']['psk'] = ('s', password)
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'Update', 'a{sa{sv}}', (settings,)))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f'Failed to update tethering settings: {reply}')
return
self._tethering_password = password
if self.is_tethering_active():
self.activate_connection(self._tethering_ssid, block=True)
threading.Thread(target=worker, daemon=True).start()
def _get_tethering_password(self) -> str:
conn_path = self._connections.get(self._tethering_ssid, None)
if conn_path is None:
cloudlog.warning('No tethering connection found')
return ''
reply = self._router_main.send_and_get_reply(new_method_call(
DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE),
'GetSecrets', 's', ('802-11-wireless-security',)
))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f'Failed to get tethering password: {reply}')
return ''
secrets = reply.body[0]
if '802-11-wireless-security' not in secrets:
return ''
return str(secrets['802-11-wireless-security'].get('psk', ('s', ''))[1])
def set_ipv4_forward(self, enabled: bool):
self._ipv4_forward = enabled
def set_tethering_active(self, active: bool):
def worker():
if active:
self.activate_connection(self._tethering_ssid, block=True)
if not self._ipv4_forward:
time.sleep(5)
cloudlog.warning("net.ipv4.ip_forward = 0")
subprocess.run(["sudo", "sysctl", "net.ipv4.ip_forward=0"], check=False)
else:
self._deactivate_connection(self._tethering_ssid)
threading.Thread(target=worker, daemon=True).start()
def set_current_network_metered(self, metered: MeteredType):
def worker():
if self.is_tethering_active():
return
conn_path, _ = self._get_active_wifi_connection()
if conn_path is None:
cloudlog.warning('No active WiFi connection found')
return
settings = self._get_connection_settings(conn_path)
if len(settings) == 0:
cloudlog.warning(f'Failed to get connection settings for {conn_path}')
return
settings['connection']['metered'] = ('i', int(metered))
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'Update', 'a{sa{sv}}', (settings,)))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f'Failed to update metered settings: {reply}')
threading.Thread(target=worker, daemon=True).start()
def _request_scan(self):
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
return
wifi_addr = DBusAddress(self._wifi_device, bus_name=NM, interface=NM_WIRELESS_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(wifi_addr, 'RequestScan', 'a{sv}', ({},)))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to request scan: {reply}")
def _update_networks(self, block: bool = True):
if not self._active:
return
def worker():
with self._scan_lock:
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
return
# NOTE: AccessPoints property may exclude hidden APs (use GetAllAccessPoints method if needed)
wifi_addr = DBusAddress(self._wifi_device, NM, interface=NM_WIRELESS_IFACE)
wifi_props_reply = self._router_main.send_and_get_reply(Properties(wifi_addr).get_all())
if wifi_props_reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to get WiFi properties: {wifi_props_reply}")
return
ap_paths = wifi_props_reply.body[0].get('AccessPoints', ('ao', []))[1]
aps: dict[str, list[AccessPoint]] = {}
for ap_path in ap_paths:
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE)
ap_props = self._router_main.send_and_get_reply(Properties(ap_addr).get_all())
# some APs have been seen dropping off during iteration
if ap_props.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to get AP properties for {ap_path}")
continue
try:
ap = AccessPoint.from_dbus(ap_props.body[0], ap_path)
if ap.ssid == "":
continue
if ap.ssid not in aps:
aps[ap.ssid] = []
aps[ap.ssid].append(ap)
except Exception:
# catch all for parsing errors
cloudlog.exception(f"Failed to parse AP properties for {ap_path}")
self._networks = [Network.from_dbus(ssid, ap_list, ssid == self._tethering_ssid) for ssid, ap_list in aps.items()]
self._update_active_connection_info()
self._enqueue_callbacks(self._networks_updated, self.networks) # sorted
if block:
worker()
else:
threading.Thread(target=worker, daemon=True).start()
def _update_active_connection_info(self):
ipv4_address = ""
metered = MeteredType.UNKNOWN
conn_path, props = self._get_active_wifi_connection()
if conn_path is not None and props is not None:
# IPv4 address
ip4config_path = props.get('Ip4Config', ('o', '/'))[1]
if ip4config_path != "/":
ip4config_addr = DBusAddress(ip4config_path, bus_name=NM, interface=NM_IP4_CONFIG_IFACE)
address_data = self._router_main.send_and_get_reply(Properties(ip4config_addr).get('AddressData')).body[0][1]
for entry in address_data:
if 'address' in entry:
ipv4_address = entry['address'][1]
break
# Metered status
settings = self._get_connection_settings(conn_path)
if len(settings) > 0:
metered_prop = settings['connection'].get('metered', ('i', 0))[1]
if metered_prop == MeteredType.YES:
metered = MeteredType.YES
elif metered_prop == MeteredType.NO:
metered = MeteredType.NO
self._ipv4_address = ipv4_address
self._current_network_metered = metered
def __del__(self):
self.stop()
def update_gsm_settings(self, roaming: bool, apn: str, metered: bool):
"""Update GSM settings for cellular connection"""
def worker():
try:
lte_connection_path = self._get_lte_connection_path()
if not lte_connection_path:
cloudlog.warning("No LTE connection found")
return
settings = self._get_connection_settings(lte_connection_path)
if len(settings) == 0:
cloudlog.warning(f"Failed to get connection settings for {lte_connection_path}")
return
# Ensure dicts exist
if 'gsm' not in settings:
settings['gsm'] = {}
if 'connection' not in settings:
settings['connection'] = {}
changes = False
auto_config = apn == ""
if settings['gsm'].get('auto-config', ('b', False))[1] != auto_config:
cloudlog.warning(f'Changing gsm.auto-config to {auto_config}')
settings['gsm']['auto-config'] = ('b', auto_config)
changes = True
if settings['gsm'].get('apn', ('s', ''))[1] != apn:
cloudlog.warning(f'Changing gsm.apn to {apn}')
settings['gsm']['apn'] = ('s', apn)
changes = True
if settings['gsm'].get('home-only', ('b', False))[1] == roaming:
cloudlog.warning(f'Changing gsm.home-only to {not roaming}')
settings['gsm']['home-only'] = ('b', not roaming)
changes = True
# Unknown means NetworkManager decides
metered_int = int(MeteredType.UNKNOWN if metered else MeteredType.NO)
if settings['connection'].get('metered', ('i', 0))[1] != metered_int:
cloudlog.warning(f'Changing connection.metered to {metered_int}')
settings['connection']['metered'] = ('i', metered_int)
changes = True
if changes:
# Update the connection settings (temporary update)
conn_addr = DBusAddress(lte_connection_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'UpdateUnsaved', 'a{sa{sv}}', (settings,)))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to update GSM settings: {reply}")
return
self._activate_modem_connection(lte_connection_path)
except Exception as e:
cloudlog.exception(f"Error updating GSM settings: {e}")
threading.Thread(target=worker, daemon=True).start()
def _get_lte_connection_path(self) -> str | None:
try:
settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE)
known_connections = self._router_main.send_and_get_reply(new_method_call(settings_addr, 'ListConnections')).body[0]
for conn_path in known_connections:
settings = self._get_connection_settings(conn_path)
if settings and settings.get('connection', {}).get('id', ('s', ''))[1] == 'lte':
return str(conn_path)
except Exception as e:
cloudlog.exception(f"Error finding LTE connection: {e}")
return None
def _activate_modem_connection(self, connection_path: str):
try:
modem_device = self._get_adapter(NM_DEVICE_TYPE_MODEM)
if modem_device and connection_path:
self._router_main.send_and_get_reply(new_method_call(self._nm, 'ActivateConnection', 'ooo', (connection_path, modem_device, "/")))
except Exception as e:
cloudlog.exception(f"Error activating modem connection: {e}")
def stop(self):
if not self._exit:
self._exit = True
if self._scan_thread.is_alive():
self._scan_thread.join()
if self._state_thread.is_alive():
self._state_thread.join()
if self._router_main is not None:
self._router_main.close()
self._router_main.conn.close()
if self._conn_monitor is not None:
self._conn_monitor.close()