mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-27 01:23:56 +08:00
Laikad: process executor to fetch orbits (#24843)
* Use ProcessPoolExecutor to fetch orbits * update laika repo * Minor
This commit is contained in:
Submodule laika_repo updated: d871946134...36f2621fc5
@@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
import time
|
||||
from multiprocessing import Process, Queue
|
||||
from concurrent.futures import Future, ProcessPoolExecutor
|
||||
from typing import List, Optional
|
||||
|
||||
import numpy as np
|
||||
@@ -10,7 +10,7 @@ from numpy.linalg import linalg
|
||||
|
||||
from cereal import log, messaging
|
||||
from laika import AstroDog
|
||||
from laika.constants import SECS_IN_MIN
|
||||
from laika.constants import SECS_IN_HR, SECS_IN_MIN
|
||||
from laika.ephemeris import EphemerisType, convert_ublox_ephem
|
||||
from laika.gps_time import GPSTime
|
||||
from laika.helpers import ConstellationId
|
||||
@@ -29,8 +29,9 @@ class Laikad:
|
||||
def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV)):
|
||||
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types)
|
||||
self.gnss_kf = GNSSKalman(GENERATED_DIR)
|
||||
self.orbit_p: Optional[Process] = None
|
||||
self.orbit_q = Queue()
|
||||
self.orbit_fetch_executor = ProcessPoolExecutor()
|
||||
self.orbit_fetch_future: Optional[Future] = None
|
||||
self.last_fetch_orbits_t = None
|
||||
|
||||
def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False):
|
||||
if ublox_msg.which == 'measurementReport':
|
||||
@@ -82,7 +83,7 @@ class Laikad:
|
||||
return dat
|
||||
elif ublox_msg.which == 'ephemeris':
|
||||
ephem = convert_ublox_ephem(ublox_msg.ephemeris)
|
||||
self.astro_dog.add_ephems([ephem], self.astro_dog.nav)
|
||||
self.astro_dog.add_navs([ephem])
|
||||
# elif ublox_msg.which == 'ionoData':
|
||||
# todo add this. Needed to better correct messages offline. First fix ublox_msg.cc to sent them.
|
||||
|
||||
@@ -100,7 +101,7 @@ class Laikad:
|
||||
cloudlog.error("Gnss kalman std too far")
|
||||
|
||||
if len(pos_fix) == 0:
|
||||
cloudlog.error("Position fix not available when resetting kalman filter")
|
||||
cloudlog.warning("Position fix not available when resetting kalman filter")
|
||||
return
|
||||
post_est = pos_fix[0][:3].tolist()
|
||||
self.init_gnss_localizer(post_est)
|
||||
@@ -124,36 +125,33 @@ class Laikad:
|
||||
|
||||
self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag)
|
||||
|
||||
def get_orbit_data(self, t: GPSTime, queue):
|
||||
cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}")
|
||||
start_time = time.monotonic()
|
||||
try:
|
||||
self.astro_dog.get_orbit_data(t, only_predictions=True)
|
||||
except RuntimeError as e:
|
||||
cloudlog.info(f"No orbit data found. {e}")
|
||||
return
|
||||
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s")
|
||||
if queue is not None:
|
||||
queue.put((self.astro_dog.orbits, self.astro_dog.orbit_fetched_times))
|
||||
|
||||
def fetch_orbits(self, t: GPSTime, block):
|
||||
if t not in self.astro_dog.orbit_fetched_times:
|
||||
if block:
|
||||
self.get_orbit_data(t, None)
|
||||
return
|
||||
if self.orbit_p is None:
|
||||
self.orbit_p = Process(target=self.get_orbit_data, args=(t, self.orbit_q))
|
||||
self.orbit_p.start()
|
||||
if not self.orbit_q.empty():
|
||||
ret = self.orbit_q.get()
|
||||
if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_HR):
|
||||
astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types
|
||||
if self.orbit_fetch_future is None:
|
||||
self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars)
|
||||
if block:
|
||||
self.orbit_fetch_future.result()
|
||||
if self.orbit_fetch_future.done():
|
||||
ret = self.orbit_fetch_future.result()
|
||||
if ret:
|
||||
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
|
||||
self.orbit_p.join()
|
||||
self.orbit_p = None
|
||||
self.orbit_fetch_future = None
|
||||
self.last_fetch_orbits_t = t
|
||||
|
||||
def __del__(self):
|
||||
if self.orbit_p is not None:
|
||||
self.orbit_p.kill()
|
||||
|
||||
def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
|
||||
astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types)
|
||||
cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}")
|
||||
start_time = time.monotonic()
|
||||
data = None
|
||||
try:
|
||||
astro_dog.get_orbit_data(t, only_predictions=True)
|
||||
data = (astro_dog.orbits, astro_dog.orbit_fetched_times)
|
||||
except RuntimeError as e:
|
||||
cloudlog.info(f"No orbit data found. {e}")
|
||||
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s")
|
||||
return data
|
||||
|
||||
|
||||
def create_measurement_msg(meas: GNSSMeasurement):
|
||||
|
||||
@@ -69,29 +69,46 @@ class TestLaikad(unittest.TestCase):
|
||||
self.assertEqual(256, len(correct_msgs))
|
||||
self.assertEqual(256, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
|
||||
|
||||
def test_laika_get_orbits(self):
|
||||
laikad = Laikad(auto_update=False)
|
||||
first_gps_time = None
|
||||
def get_first_gps_time(self):
|
||||
for m in self.logs:
|
||||
if m.ubloxGnss.which == 'measurementReport':
|
||||
new_meas = read_raw_ublox(m.ubloxGnss.measurementReport)
|
||||
if len(new_meas) != 0:
|
||||
first_gps_time = new_meas[0].recv_time
|
||||
break
|
||||
return new_meas[0].recv_time
|
||||
|
||||
def test_laika_get_orbits(self):
|
||||
laikad = Laikad(auto_update=False)
|
||||
first_gps_time = self.get_first_gps_time()
|
||||
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
|
||||
laikad.fetch_orbits(first_gps_time, block=True)
|
||||
self.assertEqual(29, len(laikad.astro_dog.orbits.keys()))
|
||||
self.assertEqual(29, len(laikad.astro_dog.orbits.values()))
|
||||
self.assertGreater(min([len(v) for v in laikad.astro_dog.orbits.values()]), 0)
|
||||
|
||||
@unittest.skip("Use to debug live data")
|
||||
def test_laika_get_orbits_now(self):
|
||||
laikad = Laikad(auto_update=False)
|
||||
laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()), block=True)
|
||||
prn = "G01"
|
||||
self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
|
||||
self.assertGreater(len(laikad.astro_dog.orbits[prn]), 0)
|
||||
prn = "R01"
|
||||
self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
|
||||
self.assertGreater(len(laikad.astro_dog.orbits[prn]), 0)
|
||||
print(min(laikad.astro_dog.orbits[prn], key=lambda e: e.epoch).epoch.as_datetime())
|
||||
|
||||
def test_get_orbits_in_process(self):
|
||||
laikad = Laikad(auto_update=False)
|
||||
has_orbits = False
|
||||
for m in self.logs:
|
||||
laikad.process_ublox_msg(m.ubloxGnss, m.logMonoTime, block=False)
|
||||
if laikad.orbit_fetch_future is not None:
|
||||
laikad.orbit_fetch_future.result()
|
||||
vals = laikad.astro_dog.orbits.values()
|
||||
has_orbits = len(vals) > 0 and max([len(v) for v in vals]) > 0
|
||||
if has_orbits:
|
||||
break
|
||||
self.assertTrue(has_orbits)
|
||||
self.assertGreater(len(laikad.astro_dog.orbit_fetched_times._ranges), 0)
|
||||
self.assertEqual(None, laikad.orbit_fetch_future)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user