Files
sunnypilot/tools/sim/bridge/metadrive/metadrive_process.py
Hoang Bui fe9a091f11 CI: Drive a loop in MetaDrive (#32308)
* finish failure on crossing any line

* update

* standardize queue messages

* update control_command_gen

* fix

* fix logic

* update closing type

* update test

* update logic

* update test

* add out of lane to local

* ci arrive_dest

* pytest integration

* update ci_config

* fix ruff

* move test termination to time

* better

* better order

* curve_len

* add buffer

* cleanup

* cleanup

* cleanup

* cleanup

* out_of_lane

* cleanup

* merge tests

* run 90s

* change test name

* local out of lane detect

* out_of_lane

* static anal

* cleanup

* test_duration

* change setup_class -> setup_create_bridge

* no print state during test

* new out_of_lane detect

* cleanup print in common.py

* fix

* fix

* check distance vs time

* cleanup

* cleanup increase check time

* minimum bridge test time

* wording

* cleanup
2024-05-22 10:04:43 -07:00

150 lines
5.1 KiB
Python

import math
import time
import numpy as np
from collections import namedtuple
from panda3d.core import Vec3
from multiprocessing.connection import Connection
from metadrive.engine.core.engine_core import EngineCore
from metadrive.engine.core.image_buffer import ImageBuffer
from metadrive.envs.metadrive_env import MetaDriveEnv
from metadrive.obs.image_obs import ImageObservation
from openpilot.common.realtime import Ratekeeper
from openpilot.tools.sim.lib.common import vec3
from openpilot.tools.sim.lib.camerad import W, H
C3_POSITION = Vec3(0.0, 0, 1.22)
C3_HPR = Vec3(0, 0,0)
metadrive_simulation_state = namedtuple("metadrive_simulation_state", ["running", "done", "done_info"])
metadrive_vehicle_state = namedtuple("metadrive_vehicle_state", ["velocity", "position", "bearing", "steering_angle"])
def apply_metadrive_patches(arrive_dest_done=True):
# By default, metadrive won't try to use cuda images unless it's used as a sensor for vehicles, so patch that in
def add_image_sensor_patched(self, name: str, cls, args):
if self.global_config["image_on_cuda"]:# and name == self.global_config["vehicle_config"]["image_source"]:
sensor = cls(*args, self, cuda=True)
else:
sensor = cls(*args, self, cuda=False)
assert isinstance(sensor, ImageBuffer), "This API is for adding image sensor"
self.sensors[name] = sensor
EngineCore.add_image_sensor = add_image_sensor_patched
# we aren't going to use the built-in observation stack, so disable it to save time
def observe_patched(self, *args, **kwargs):
return self.state
ImageObservation.observe = observe_patched
# disable destination, we want to loop forever
def arrive_destination_patch(self, *args, **kwargs):
return False
if not arrive_dest_done:
MetaDriveEnv._is_arrive_destination = arrive_destination_patch
def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera_array, image_lock,
controls_recv: Connection, simulation_state_send: Connection, vehicle_state_send: Connection,
exit_event, start_time, test_duration, test_run):
arrive_dest_done = config.pop("arrive_dest_done", True)
apply_metadrive_patches(arrive_dest_done)
road_image = np.frombuffer(camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
if dual_camera:
assert wide_camera_array is not None
wide_road_image = np.frombuffer(wide_camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
env = MetaDriveEnv(config)
def get_current_lane_info(vehicle):
_, lane_info, on_lane = vehicle.navigation._get_current_lane(vehicle)
lane_idx = lane_info[2] if lane_info is not None else None
return lane_idx, on_lane
def reset():
env.reset()
env.vehicle.config["max_speed_km_h"] = 1000
lane_idx_prev, _ = get_current_lane_info(env.vehicle)
simulation_state = metadrive_simulation_state(
running=True,
done=False,
done_info=None,
)
simulation_state_send.send(simulation_state)
return lane_idx_prev
lane_idx_prev = reset()
def get_cam_as_rgb(cam):
cam = env.engine.sensors[cam]
cam.get_cam().reparentTo(env.vehicle.origin)
cam.get_cam().setPos(C3_POSITION)
cam.get_cam().setHpr(C3_HPR)
img = cam.perceive(clip=False)
if type(img) != np.ndarray:
img = img.get() # convert cupy array to numpy
return img
rk = Ratekeeper(100, None)
steer_ratio = 8
vc = [0,0]
while not exit_event.is_set():
vehicle_state = metadrive_vehicle_state(
velocity=vec3(x=float(env.vehicle.velocity[0]), y=float(env.vehicle.velocity[1]), z=0),
position=env.vehicle.position,
bearing=float(math.degrees(env.vehicle.heading_theta)),
steering_angle=env.vehicle.steering * env.vehicle.MAX_STEERING
)
vehicle_state_send.send(vehicle_state)
if controls_recv.poll(0):
while controls_recv.poll(0):
steer_angle, gas, should_reset = controls_recv.recv()
steer_metadrive = steer_angle * 1 / (env.vehicle.MAX_STEERING * steer_ratio)
steer_metadrive = np.clip(steer_metadrive, -1, 1)
vc = [steer_metadrive, gas]
if should_reset:
lane_idx_prev = reset()
if rk.frame % 5 == 0:
_, _, terminated, _, _ = env.step(vc)
timeout = True if time.monotonic() - start_time >= test_duration else False
lane_idx_curr, on_lane = get_current_lane_info(env.vehicle)
out_of_lane = lane_idx_curr != lane_idx_prev or not on_lane
lane_idx_prev = lane_idx_curr
if terminated or ((out_of_lane or timeout) and test_run):
if terminated:
done_result = env.done_function("default_agent")
elif out_of_lane:
done_result = (True, {"out_of_lane" : True})
elif timeout:
done_result = (True, {"timeout" : True})
simulation_state = metadrive_simulation_state(
running=False,
done=done_result[0],
done_info=done_result[1],
)
simulation_state_send.send(simulation_state)
if dual_camera:
wide_road_image[...] = get_cam_as_rgb("rgb_wide")
road_image[...] = get_cam_as_rgb("rgb_road")
image_lock.release()
rk.keep_time()