170 lines
4.8 KiB
Python
170 lines
4.8 KiB
Python
|
import os
|
||
|
import sys
|
||
|
from dataclasses import dataclass, fields
|
||
|
from subprocess import check_output, CalledProcessError
|
||
|
from time import sleep
|
||
|
from typing import NoReturn
|
||
|
|
||
|
DEBUG = int(os.environ.get("DEBUG", "0"))
|
||
|
|
||
|
@dataclass
|
||
|
class GnssClockNmeaPort:
|
||
|
# flags bit mask:
|
||
|
# 0x01 = leap_seconds valid
|
||
|
# 0x02 = time_uncertainty_ns valid
|
||
|
# 0x04 = full_bias_ns valid
|
||
|
# 0x08 = bias_ns valid
|
||
|
# 0x10 = bias_uncertainty_ns valid
|
||
|
# 0x20 = drift_nsps valid
|
||
|
# 0x40 = drift_uncertainty_nsps valid
|
||
|
flags: int
|
||
|
leap_seconds: int
|
||
|
time_ns: int
|
||
|
time_uncertainty_ns: int # 1-sigma
|
||
|
full_bias_ns: int
|
||
|
bias_ns: float
|
||
|
bias_uncertainty_ns: float # 1-sigma
|
||
|
drift_nsps: float
|
||
|
drift_uncertainty_nsps: float # 1-sigma
|
||
|
|
||
|
def __post_init__(self):
|
||
|
for field in fields(self):
|
||
|
val = getattr(self, field.name)
|
||
|
setattr(self, field.name, field.type(val) if val else None)
|
||
|
|
||
|
@dataclass
|
||
|
class GnssMeasNmeaPort:
|
||
|
messageCount: int
|
||
|
messageNum: int
|
||
|
svCount: int
|
||
|
# constellation enum:
|
||
|
# 1 = GPS
|
||
|
# 2 = SBAS
|
||
|
# 3 = GLONASS
|
||
|
# 4 = QZSS
|
||
|
# 5 = BEIDOU
|
||
|
# 6 = GALILEO
|
||
|
constellation: int
|
||
|
svId: int
|
||
|
flags: int # always zero
|
||
|
time_offset_ns: int
|
||
|
# state bit mask:
|
||
|
# 0x0001 = CODE LOCK
|
||
|
# 0x0002 = BIT SYNC
|
||
|
# 0x0004 = SUBFRAME SYNC
|
||
|
# 0x0008 = TIME OF WEEK DECODED
|
||
|
# 0x0010 = MSEC AMBIGUOUS
|
||
|
# 0x0020 = SYMBOL SYNC
|
||
|
# 0x0040 = GLONASS STRING SYNC
|
||
|
# 0x0080 = GLONASS TIME OF DAY DECODED
|
||
|
# 0x0100 = BEIDOU D2 BIT SYNC
|
||
|
# 0x0200 = BEIDOU D2 SUBFRAME SYNC
|
||
|
# 0x0400 = GALILEO E1BC CODE LOCK
|
||
|
# 0x0800 = GALILEO E1C 2ND CODE LOCK
|
||
|
# 0x1000 = GALILEO E1B PAGE SYNC
|
||
|
# 0x2000 = GALILEO E1B PAGE SYNC
|
||
|
state: int
|
||
|
time_of_week_ns: int
|
||
|
time_of_week_uncertainty_ns: int # 1-sigma
|
||
|
carrier_to_noise_ratio: float
|
||
|
pseudorange_rate: float
|
||
|
pseudorange_rate_uncertainty: float # 1-sigma
|
||
|
|
||
|
def __post_init__(self):
|
||
|
for field in fields(self):
|
||
|
val = getattr(self, field.name)
|
||
|
setattr(self, field.name, field.type(val) if val else None)
|
||
|
|
||
|
def nmea_checksum_ok(s):
|
||
|
checksum = 0
|
||
|
for i, c in enumerate(s[1:]):
|
||
|
if c == "*":
|
||
|
if i != len(s) - 4: # should be 3rd to last character
|
||
|
print("ERROR: NMEA string does not have checksum delimiter in correct location:", s)
|
||
|
return False
|
||
|
break
|
||
|
checksum ^= ord(c)
|
||
|
else:
|
||
|
print("ERROR: NMEA string does not have checksum delimiter:", s)
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
def process_nmea_port_messages(device:str="/dev/ttyUSB1") -> NoReturn:
|
||
|
while True:
|
||
|
try:
|
||
|
with open(device) as nmeaport:
|
||
|
for line in nmeaport:
|
||
|
line = line.strip()
|
||
|
if DEBUG:
|
||
|
print(line)
|
||
|
if not line.startswith("$"): # all NMEA messages start with $
|
||
|
continue
|
||
|
if not nmea_checksum_ok(line):
|
||
|
continue
|
||
|
|
||
|
fields = line.split(",")
|
||
|
match fields[0]:
|
||
|
case "$GNCLK":
|
||
|
# fields at end are reserved (not used)
|
||
|
gnss_clock = GnssClockNmeaPort(*fields[1:10]) # type: ignore[arg-type]
|
||
|
print(gnss_clock)
|
||
|
case "$GNMEAS":
|
||
|
# fields at end are reserved (not used)
|
||
|
gnss_meas = GnssMeasNmeaPort(*fields[1:14]) # type: ignore[arg-type]
|
||
|
print(gnss_meas)
|
||
|
except Exception as e:
|
||
|
print(e)
|
||
|
sleep(1)
|
||
|
|
||
|
def main() -> NoReturn:
|
||
|
from openpilot.common.gpio import gpio_init, gpio_set
|
||
|
from openpilot.system.hardware.tici.pins import GPIO
|
||
|
from openpilot.system.qcomgpsd.qcomgpsd import at_cmd
|
||
|
|
||
|
try:
|
||
|
check_output(["pidof", "qcomgpsd"])
|
||
|
print("qcomgpsd is running, please kill openpilot before running this script! (aborted)")
|
||
|
sys.exit(1)
|
||
|
except CalledProcessError as e:
|
||
|
if e.returncode != 1: # 1 == no process found (pandad not running)
|
||
|
raise e
|
||
|
|
||
|
print("power up antenna ...")
|
||
|
gpio_init(GPIO.GNSS_PWR_EN, True)
|
||
|
gpio_set(GPIO.GNSS_PWR_EN, True)
|
||
|
|
||
|
if b"+QGPS: 0" not in (at_cmd("AT+QGPS?") or b""):
|
||
|
print("stop location tracking ...")
|
||
|
at_cmd("AT+QGPSEND")
|
||
|
|
||
|
if b'+QGPSCFG: "outport",usbnmea' not in (at_cmd('AT+QGPSCFG="outport"') or b""):
|
||
|
print("configure outport ...")
|
||
|
at_cmd('AT+QGPSCFG="outport","usbnmea"') # usbnmea = /dev/ttyUSB1
|
||
|
|
||
|
if b'+QGPSCFG: "gnssrawdata",3,0' not in (at_cmd('AT+QGPSCFG="gnssrawdata"') or b""):
|
||
|
print("configure gnssrawdata ...")
|
||
|
# AT+QGPSCFG="gnssrawdata",<constellation-mask>,<port>'
|
||
|
# <constellation-mask> values:
|
||
|
# 0x01 = GPS
|
||
|
# 0x02 = GLONASS
|
||
|
# 0x04 = BEIDOU
|
||
|
# 0x08 = GALILEO
|
||
|
# 0x10 = QZSS
|
||
|
# <port> values:
|
||
|
# 0 = NMEA port
|
||
|
# 1 = AT port
|
||
|
at_cmd('AT+QGPSCFG="gnssrawdata",3,0') # enable all constellations, output data to NMEA port
|
||
|
print("rebooting ...")
|
||
|
at_cmd('AT+CFUN=1,1')
|
||
|
print("re-run this script when it is back up")
|
||
|
sys.exit(2)
|
||
|
|
||
|
print("starting location tracking ...")
|
||
|
at_cmd("AT+QGPS=1")
|
||
|
|
||
|
process_nmea_port_messages()
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|