CI: Drive a loop in MetaDrive (#32308)

* finish failure on crossing any line

* update

* standardize queue messages

* update control_command_gen

* fix

* fix logic

* update closing type

* update test

* update logic

* update test

* add out of lane to local

* ci arrive_dest

* pytest integration

* update ci_config

* fix ruff

* move test termination to time

* better

* better order

* curve_len

* add buffer

* cleanup

* cleanup

* cleanup

* cleanup

* out_of_lane

* cleanup

* merge tests

* run 90s

* change test name

* local out of lane detect

* out_of_lane

* static anal

* cleanup

* test_duration

* change setup_class -> setup_create_bridge

* no print state during test

* new out_of_lane detect

* cleanup print in common.py

* fix

* fix

* check distance vs time

* cleanup

* cleanup increase check time

* minimum bridge test time

* wording

* cleanup
This commit is contained in:
Hoang Bui 2024-05-22 13:04:43 -04:00 committed by GitHub
parent d96b8bbc01
commit fe9a091f11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 110 additions and 114 deletions

View File

@ -60,6 +60,8 @@ class SimulatorBridge(ABC):
self.past_startup_engaged = False
self.startup_button_prev = True
self.test_run = False
def _on_shutdown(self, signal, frame):
self.shutdown()
@ -193,7 +195,8 @@ Ignition: {self.simulator_state.ignition} Engaged: {self.simulator_state.is_enga
self.world.tick()
self.world.read_cameras()
if self.rk.frame % 25 == 0:
# don't print during test, so no print/IO Block between OP and metadrive processes
if not self.test_run and self.rk.frame % 25 == 0:
self.print_status()
self.started.value = True

View File

@ -1,3 +1,4 @@
import math
from multiprocessing import Queue
from metadrive.component.sensors.base_camera import _cuda_enable
@ -27,20 +28,21 @@ def curve_block(length, angle=45, direction=0):
}
def create_map(track_size=60):
curve_len = track_size * 2
return dict(
type=MapGenerateMethod.PG_MAP_FILE,
lane_num=2,
lane_width=3.5,
lane_width=4.5,
config=[
None,
straight_block(track_size),
curve_block(track_size*2, 90),
curve_block(curve_len, 90),
straight_block(track_size),
curve_block(track_size*2, 90),
curve_block(curve_len, 90),
straight_block(track_size),
curve_block(track_size*2, 90),
curve_block(curve_len, 90),
straight_block(track_size),
curve_block(track_size*2, 90),
curve_block(curve_len, 90),
]
)
@ -48,11 +50,13 @@ def create_map(track_size=60):
class MetaDriveBridge(SimulatorBridge):
TICKS_PER_FRAME = 5
def __init__(self, dual_camera, high_quality):
self.should_render = False
def __init__(self, dual_camera, high_quality, test_duration=math.inf, test_run=False):
super().__init__(dual_camera, high_quality)
self.should_render = False
self.test_run = test_run
self.test_duration = test_duration if self.test_run else math.inf
def spawn_world(self, queue: Queue):
sensors = {
"rgb_road": (RGBCameraRoad, W, H, )
@ -83,4 +87,4 @@ class MetaDriveBridge(SimulatorBridge):
preload_models=False
)
return MetaDriveWorld(queue, config, self.dual_camera)
return MetaDriveWorld(queue, config, self.test_duration, self.test_run, self.dual_camera)

View File

@ -1,4 +1,5 @@
import math
import time
import numpy as np
from collections import namedtuple
@ -49,7 +50,7 @@ def apply_metadrive_patches(arrive_dest_done=True):
def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera_array, image_lock,
controls_recv: Connection, simulation_state_send: Connection, vehicle_state_send: Connection,
exit_event):
exit_event, start_time, test_duration, test_run):
arrive_dest_done = config.pop("arrive_dest_done", True)
apply_metadrive_patches(arrive_dest_done)
@ -60,9 +61,15 @@ def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera
env = MetaDriveEnv(config)
def get_current_lane_info(vehicle):
_, lane_info, on_lane = vehicle.navigation._get_current_lane(vehicle)
lane_idx = lane_info[2] if lane_info is not None else None
return lane_idx, on_lane
def reset():
env.reset()
env.vehicle.config["max_speed_km_h"] = 1000
lane_idx_prev, _ = get_current_lane_info(env.vehicle)
simulation_state = metadrive_simulation_state(
running=True,
@ -71,7 +78,9 @@ def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera
)
simulation_state_send.send(simulation_state)
reset()
return lane_idx_prev
lane_idx_prev = reset()
def get_cam_as_rgb(cam):
cam = env.engine.sensors[cam]
@ -108,13 +117,23 @@ def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera
vc = [steer_metadrive, gas]
if should_reset:
reset()
lane_idx_prev = reset()
if rk.frame % 5 == 0:
obs, _, terminated, _, info = env.step(vc)
_, _, terminated, _, _ = env.step(vc)
timeout = True if time.monotonic() - start_time >= test_duration else False
lane_idx_curr, on_lane = get_current_lane_info(env.vehicle)
out_of_lane = lane_idx_curr != lane_idx_prev or not on_lane
lane_idx_prev = lane_idx_curr
if terminated or ((out_of_lane or timeout) and test_run):
if terminated:
done_result = env.done_function("default_agent")
elif out_of_lane:
done_result = (True, {"out_of_lane" : True})
elif timeout:
done_result = (True, {"timeout" : True})
if terminated:
done_result = env.done_function("default_agent")
simulation_state = metadrive_simulation_state(
running=False,
done=done_result[0],

View File

@ -14,7 +14,7 @@ from openpilot.tools.sim.lib.camerad import W, H
class MetaDriveWorld(World):
def __init__(self, status_q, config, dual_camera = False):
def __init__(self, status_q, config, test_duration, test_run, dual_camera=False):
super().__init__(dual_camera)
self.status_q = status_q
self.camera_array = Array(ctypes.c_uint8, W*H*3)
@ -30,11 +30,18 @@ class MetaDriveWorld(World):
self.exit_event = multiprocessing.Event()
self.test_run = test_run
self.start_time = time.monotonic()
self.first_engage = None
self.last_check_timestamp = 0
self.distance_moved = 0
self.metadrive_process = multiprocessing.Process(name="metadrive process", target=
functools.partial(metadrive_process, dual_camera, config,
self.camera_array, self.wide_camera_array, self.image_lock,
self.controls_recv, self.simulation_state_send,
self.vehicle_state_send, self.exit_event))
self.vehicle_state_send, self.exit_event, self.start_time, test_duration, self.test_run))
self.metadrive_process.start()
self.status_q.put(QueueMessage(QueueMessageType.START_STATUS, "starting"))
@ -43,7 +50,7 @@ class MetaDriveWorld(World):
print("---- Spawning Metadrive world, this might take awhile ----")
print("----------------------------------------------------------")
self.vehicle_state_recv.recv() # wait for a state message to ensure metadrive is launched
self.vehicle_last_pos = self.vehicle_state_recv.recv().position # wait for a state message to ensure metadrive is launched
self.status_q.put(QueueMessage(QueueMessageType.START_STATUS, "started"))
self.steer_ratio = 15
@ -76,12 +83,38 @@ class MetaDriveWorld(World):
def read_sensors(self, state: SimulatorState):
while self.vehicle_state_recv.poll(0):
md_vehicle: metadrive_vehicle_state = self.vehicle_state_recv.recv()
curr_pos = md_vehicle.position
state.velocity = md_vehicle.velocity
state.bearing = md_vehicle.bearing
state.steering_angle = md_vehicle.steering_angle
state.gps.from_xy(md_vehicle.position)
state.gps.from_xy(curr_pos)
state.valid = True
is_engaged = state.is_engaged
if is_engaged and self.first_engage is None:
self.first_engage = time.monotonic()
# check moving 5 seconds after engaged, doesn't move right away
after_engaged_check = is_engaged and time.monotonic() - self.first_engage >= 5 and self.test_run
x_dist = abs(curr_pos[0] - self.vehicle_last_pos[0])
y_dist = abs(curr_pos[1] - self.vehicle_last_pos[1])
dist_threshold = 1
if x_dist >= dist_threshold or y_dist >= dist_threshold: # position not the same during staying still, > threshold is considered moving
self.distance_moved += x_dist + y_dist
time_check_threshold = 30
current_time = time.monotonic()
since_last_check = current_time - self.last_check_timestamp
if since_last_check >= time_check_threshold:
if after_engaged_check and self.distance_moved == 0:
self.status_q.put(QueueMessage(QueueMessageType.TERMINATION_INFO, {"vehicle_not_moving" : True}))
self.exit_event.set()
self.last_check_timestamp = current_time
self.distance_moved = 0
self.vehicle_last_pos = curr_pos
def read_cameras(self):
pass

View File

@ -1,92 +0,0 @@
#!/usr/bin/env python
from multiprocessing import Queue
from metadrive.component.sensors.base_camera import _cuda_enable
from metadrive.component.map.pg_map import MapGenerateMethod
from openpilot.tools.sim.bridge.common import SimulatorBridge
from openpilot.tools.sim.bridge.metadrive.metadrive_common import RGBCameraRoad, RGBCameraWide
from openpilot.tools.sim.bridge.metadrive.metadrive_world import MetaDriveWorld
from openpilot.tools.sim.lib.camerad import W, H
def create_map():
return dict(
type=MapGenerateMethod.PG_MAP_FILE,
lane_num=2,
lane_width=3.5,
config=[
{
"id": "S",
"pre_block_socket_index": 0,
"length": 60,
},
{
"id": "C",
"pre_block_socket_index": 0,
"length": 60,
"radius": 600,
"angle": 45,
"dir": 0,
},
]
)
class MetaDriveBridge(SimulatorBridge):
TICKS_PER_FRAME = 5
def __init__(self, world_status_q, dual_camera, high_quality):
self.world_status_q = world_status_q
self.should_render = False
super().__init__(dual_camera, high_quality)
def spawn_world(self):
sensors = {
"rgb_road": (RGBCameraRoad, W, H, )
}
if self.dual_camera:
sensors["rgb_wide"] = (RGBCameraWide, W, H)
config = dict(
use_render=self.should_render,
vehicle_config=dict(
enable_reverse=False,
image_source="rgb_road",
),
sensors=sensors,
image_on_cuda=_cuda_enable,
image_observation=True,
interface_panel=[],
out_of_route_done=True,
on_continuous_line_done=True,
crash_vehicle_done=True,
crash_object_done=True,
arrive_dest_done=True,
traffic_density=0.0,
map_config=create_map(),
map_region_size=2048,
decision_repeat=1,
physics_world_step_size=self.TICKS_PER_FRAME/100,
preload_models=False
)
return MetaDriveWorld(world_status_q, config, self.dual_camera)
if __name__ == "__main__":
command_queue: Queue = Queue()
world_status_q: Queue = Queue()
simulator_bridge = MetaDriveBridge(world_status_q, True, False)
simulator_process = simulator_bridge.run(command_queue)
while True:
world_status = world_status_q.get()
print(f"World Status: {str(world_status)}")
if world_status["status"] == "terminating":
break
simulator_process.join()

View File

@ -0,0 +1,8 @@
import pytest
def pytest_addoption(parser):
parser.addoption("--test_duration", action="store", default=60, type=int, help="Seconds to run metadrive drive")
@pytest.fixture
def test_duration(request):
return request.config.getoption("--test_duration")

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import pytest
import warnings
# Since metadrive depends on pkg_resources, and pkg_resources is deprecated as an API
warnings.filterwarnings("ignore", category=DeprecationWarning)
@ -10,5 +11,12 @@ from openpilot.tools.sim.tests.test_sim_bridge import TestSimBridgeBase
@pytest.mark.slow
@pytest.mark.filterwarnings("ignore::pyopencl.CompilerWarning") # Unimportant warning of non-empty compile log
class TestMetaDriveBridge(TestSimBridgeBase):
@pytest.fixture(autouse=True)
def setup_create_bridge(self, test_duration):
# run bridge test for at least 60s, since not-moving check runs every 30s
if test_duration < 60:
test_duration = 60
self.test_duration = test_duration
def create_bridge(self):
return MetaDriveBridge(False, False)
return MetaDriveBridge(False, False, self.test_duration, True)

View File

@ -7,6 +7,7 @@ from multiprocessing import Queue
from cereal import messaging
from openpilot.common.basedir import BASEDIR
from openpilot.tools.sim.bridge.common import QueueMessageType
SIM_DIR = os.path.join(BASEDIR, "tools/sim")
@ -19,7 +20,7 @@ class TestSimBridgeBase:
def setup_method(self):
self.processes = []
def test_engage(self):
def test_driving(self):
# Startup manager and bridge.py. Check processes are running, then engage and verify.
p_manager = subprocess.Popen("./launch_openpilot.sh", cwd=SIM_DIR)
self.processes.append(p_manager)
@ -70,6 +71,18 @@ class TestSimBridgeBase:
assert min_counts_control_active == control_active, f"Simulator did not engage a minimal of {min_counts_control_active} steps was {control_active}"
failure_states = []
while bridge.started.value:
continue
while not q.empty():
state = q.get()
if state.type == QueueMessageType.TERMINATION_INFO:
done_info = state.info
failure_states = [done_state for done_state in done_info if done_state != "timeout" and done_info[done_state]]
break
assert len(failure_states) == 0, f"Simulator fails to finish a loop. Failure states: {failure_states}"
def teardown_method(self):
print("Test shutting down. CommIssues are acceptable")
for p in reversed(self.processes):