Merge branch 'min-feat/lat/road-edge-detection' into full

This commit is contained in:
Rick Lan
2025-06-17 19:05:17 +08:00
10 changed files with 87 additions and 11 deletions

View File

@@ -14,7 +14,9 @@ struct DpControlsState @0x81c2f05a394cf4af {
alkaActive @0 :Bool;
}
struct CustomReserved1 @0xaedffd8f31e7b55d {
struct ModelExt @0xaedffd8f31e7b55d {
leftEdgeDetected @0 :Bool;
rightEdgeDetected @1 :Bool;
}
struct CustomReserved2 @0xf35cc4560bbf6ec2 {

View File

@@ -2603,7 +2603,7 @@ struct Event {
# DON'T change the ID (e.g. @107)
# DON'T change which struct it points to
dpControlsState @107 :Custom.DpControlsState;
customReserved1 @108 :Custom.CustomReserved1;
modelExt @108 :Custom.ModelExt;
customReserved2 @109 :Custom.CustomReserved2;
customReserved3 @110 :Custom.CustomReserved3;
customReserved4 @111 :Custom.CustomReserved4;

View File

@@ -93,6 +93,7 @@ _services: dict[str, tuple] = {
"customReservedRawData1": (True, 0.),
"customReservedRawData2": (True, 0.),
"dpControlsState": (False, 100., 10),
"modelExt": (True, 20.),
}
SERVICE_LIST = {name: Service(*vals) for
idx, (name, vals) in enumerate(_services.items())}

View File

@@ -134,4 +134,5 @@ inline static std::unordered_map<std::string, uint32_t> keys = {
{"dp_device_go_off_road", CLEAR_ON_MANAGER_START},
{"dp_ui_hide_hud_speed_kph", PERSISTENT},
{"dp_lon_ext_radar", PERSISTENT},
{"dp_lat_road_edge_detection", PERSISTENT},
};

View File

@@ -0,0 +1,59 @@
#!/usr/bin/env python3
"""
MIT Non-Commercial License
Copyright (c) 2019-, rav4kumar, Rick Lan, dragonpilot community, and a number of other of contributors.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, for non-commercial purposes only,
subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
* Commercial use (e.g., use in a product, service, or activity intended to generate revenue) is prohibited without explicit written permission from dragonpilot. Contact ricklan@gmail.com for inquiries.
* Any project that uses the Software must visibly mention the following acknowledgment: "This project uses software from dragonpilot and is licensed under a custom license requiring permission for use."
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import numpy as np
NEARSIDE_PROB = 0.2
EDGE_PROB = 0.35
class RoadEdgeDetector:
def __init__(self, enabled = False):
self._is_enabled = enabled
self.left_edge_detected = False
self.right_edge_detected = False
def update(self, road_edge_stds, lane_line_probs):
if not self._is_enabled:
return
left_road_edge_prob = np.clip(1.0 - road_edge_stds[0], 0.0, 1.0)
left_lane_nearside_prob = lane_line_probs[0]
right_road_edge_prob = np.clip(1.0 - road_edge_stds[1], 0.0, 1.0)
right_lane_nearside_prob = lane_line_probs[3]
self.left_edge_detected = bool(
left_road_edge_prob > EDGE_PROB and
left_lane_nearside_prob < NEARSIDE_PROB and
right_lane_nearside_prob >= left_lane_nearside_prob
)
self.right_edge_detected = bool(
right_road_edge_prob > EDGE_PROB and
right_lane_nearside_prob < NEARSIDE_PROB and
left_lane_nearside_prob >= right_lane_nearside_prob
)
def set_enabled(self, enabled):
self._is_enabled = enabled
def is_enabled(self):
return self._is_enabled

View File

@@ -44,7 +44,7 @@ class DesireHelper:
self.dp_lat_lca_auto_sec = dp_lat_lca_auto_sec
self.dp_lat_lca_auto_sec_start = 0.
def update(self, carstate, lateral_active, lane_change_prob):
def update(self, carstate, lateral_active, lane_change_prob, left_edge_detected, right_edge_detected):
v_ego = carstate.vEgo
one_blinker = carstate.leftBlinker != carstate.rightBlinker
below_lane_change_speed = True if self.dp_lat_lca_speed == 0. else v_ego < self.dp_lat_lca_speed
@@ -70,8 +70,8 @@ class DesireHelper:
((carstate.steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = (((carstate.leftBlindspot or left_edge_detected) and self.lane_change_direction == LaneChangeDirection.left) or
((carstate.rightBlindspot or right_edge_detected) and self.lane_change_direction == LaneChangeDirection.right))
# reset timer
if self.dp_lat_lca_auto_sec > 0.:

View File

@@ -36,6 +36,7 @@ from openpilot.selfdrive.modeld.parse_model_outputs import Parser
from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_msg, PublishState
from openpilot.selfdrive.modeld.constants import ModelConstants, Plan
from openpilot.selfdrive.modeld.models.commonmodel_pyx import DrivingModelFrame, CLContext
from dragonpilot.selfdrive.controls.lib.road_edge_detector import RoadEdgeDetector
PROCESS_NAME = "selfdrive.modeld.modeld"
@@ -228,7 +229,7 @@ def main(demo=False):
cloudlog.warning(f"connected extra cam with buffer size: {vipc_client_extra.buffer_len} ({vipc_client_extra.width} x {vipc_client_extra.height})")
# messaging
pm = PubMaster(["modelV2", "drivingModelData", "cameraOdometry"])
pm = PubMaster(["modelV2", "drivingModelData", "cameraOdometry", "modelExt"])
sm = SubMaster(["deviceState", "carState", "roadCameraState", "liveCalibration", "driverMonitoringState", "carControl", "liveDelay"])
publish_state = PublishState()
@@ -262,6 +263,7 @@ def main(demo=False):
dp_lat_lca_speed = int(params.get("dp_lat_lca_speed"))
dp_lat_lca_auto_sec = float(params.get("dp_lat_lca_auto_sec"))
DH = DesireHelper(dp_lat_lca_speed=dp_lat_lca_speed, dp_lat_lca_auto_sec=dp_lat_lca_auto_sec)
RED = RoadEdgeDetector(params.get_bool("dp_lat_road_edge_detection"))
while True:
# Keep receiving frames until we are at least 1 frame ahead of previous extra frame
@@ -345,6 +347,7 @@ def main(demo=False):
modelv2_send = messaging.new_message('modelV2')
drivingdata_send = messaging.new_message('drivingModelData')
posenet_send = messaging.new_message('cameraOdometry')
model_ext_send = messaging.new_message('modelExt')
action = get_action_from_model(model_output, prev_action, lat_delay + DT_MDL, long_delay + DT_MDL, v_ego)
prev_action = action
@@ -356,7 +359,10 @@ def main(demo=False):
l_lane_change_prob = desire_state[log.Desire.laneChangeLeft]
r_lane_change_prob = desire_state[log.Desire.laneChangeRight]
lane_change_prob = l_lane_change_prob + r_lane_change_prob
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
RED.update(modelv2_send.modelV2.roadEdgeStds, modelv2_send.modelV2.laneLineProbs)
model_ext_send.modelExt.leftEdgeDetected = RED.left_edge_detected
model_ext_send.modelExt.rightEdgeDetected = RED.right_edge_detected
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob, RED.left_edge_detected, RED.right_edge_detected)
modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state
modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction
drivingdata_send.drivingModelData.meta.laneChangeState = DH.lane_change_state
@@ -366,6 +372,7 @@ def main(demo=False):
pm.send('modelV2', modelv2_send)
pm.send('drivingModelData', drivingdata_send)
pm.send('cameraOdometry', posenet_send)
pm.send('modelExt', model_ext_send)
last_vipc_frame_id = meta_main.frame_id

View File

@@ -68,7 +68,7 @@ class SelfdriveD:
# TODO: de-couple selfdrived with card/conflate on carState without introducing controls mismatches
self.car_state_sock = messaging.sub_sock('carState', timeout=20)
ignore = self.sensor_packets + self.gps_packets + ['alertDebug']
ignore = self.sensor_packets + self.gps_packets + ['alertDebug'] + ['modelExt']
if SIMULATION:
ignore += ['driverCameraState', 'managerState']
if REPLAY:
@@ -79,7 +79,7 @@ class SelfdriveD:
self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration',
'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'livePose', 'liveDelay',
'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters',
'controlsState', 'carControl', 'driverAssistance', 'alertDebug', 'userFlag'] + \
'controlsState', 'carControl', 'driverAssistance', 'alertDebug', 'userFlag', 'modelExt'] + \
self.camera_packets + self.sensor_packets + self.gps_packets,
ignore_alive=ignore, ignore_avg_freq=ignore,
ignore_valid=ignore, frequency=int(1/DT_CTRL))
@@ -234,8 +234,8 @@ class SelfdriveD:
# Handle lane change
if self.sm['modelV2'].meta.laneChangeState == LaneChangeState.preLaneChange:
direction = self.sm['modelV2'].meta.laneChangeDirection
if (CS.leftBlindspot and direction == LaneChangeDirection.left) or \
(CS.rightBlindspot and direction == LaneChangeDirection.right):
if ((CS.leftBlindspot or self.sm['modelExt'].leftEdgeDetected) and direction == LaneChangeDirection.left) or \
((CS.rightBlindspot or self.sm['modelExt'].rightEdgeDetected) and direction == LaneChangeDirection.right):
self.events.add(EventName.laneChangeBlocked)
else:
if direction == LaneChangeDirection.left:

View File

@@ -111,6 +111,11 @@ void DPPanel::add_lateral_toggles() {
tr("Always-on Lane Keeping Assist (ALKA)"),
"",
},
{
"dp_lat_road_edge_detection",
tr("Road Edge Detection (RED)"),
tr("Block lane change assist when the system detects the road edge.\nNOTE: This will show 'Car Detected in Blindspot' warning.")
},
};
auto lca_speed_toggle = new ParamSpinBoxControl("dp_lat_lca_speed", tr("LCA Speed:"), tr("Off = Disable LCA\n1 mph ≈ 1.2 km/h"), "", 0, 100, 5, tr(" mph"), tr("Off"));
lca_sec_toggle = new ParamDoubleSpinBoxControl("dp_lat_lca_auto_sec", QString::fromUtf8(" ") + tr("Auto Lane Change after:"), tr("Off = Disable Auto Lane Change."), "", 0, 5.0, 0.5, tr(" sec"), tr("Off"));

View File

@@ -52,6 +52,7 @@ def manager_init() -> None:
("dp_lat_lca_auto_sec", "0"),
("dp_ui_hide_hud_speed_kph", "0"),
("dp_lon_ext_radar", "0"),
("dp_lat_road_edge_detection", "0"),
]
if params.get_bool("RecordFrontLock"):