openpilot1/selfdrive/frogpilot/frogpilot_utilities.py

105 lines
3.2 KiB
Python

import math
import numpy as np
import os
import shutil
import subprocess
import threading
import time
import urllib.request
from openpilot.common.numpy_fast import interp, mean
from openpilot.common.params_pyx import Params
EARTH_RADIUS = 6378137 # Radius of the Earth in meters
def update_frogpilot_toggles():
Params("/dev/shm/params").put_bool("FrogPilotTogglesUpdated", True)
def calculate_distance_to_point(ax, ay, bx, by):
a = math.sin((bx - ax) / 2) * math.sin((bx - ax) / 2) + math.cos(ax) * math.cos(bx) * math.sin((by - ay) / 2) * math.sin((by - ay) / 2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return EARTH_RADIUS * c
def calculate_lane_width(lane, current_lane, road_edge):
current_x = np.array(current_lane.x)
current_y = np.array(current_lane.y)
lane_y_interp = interp(current_x, np.array(lane.x), np.array(lane.y))
road_edge_y_interp = interp(current_x, np.array(road_edge.x), np.array(road_edge.y))
distance_to_lane = np.mean(np.abs(current_y - lane_y_interp))
distance_to_road_edge = np.mean(np.abs(current_y - road_edge_y_interp))
return float(min(distance_to_lane, distance_to_road_edge))
# Credit goes to Pfeiferj!
def calculate_road_curvature(modelData, v_ego):
orientation_rate = np.abs(modelData.orientationRate.z)
velocity = modelData.velocity.x
max_pred_lat_acc = np.amax(orientation_rate * velocity)
return max_pred_lat_acc / v_ego**2
def copy_if_exists(source, destination, single_file_name=None):
if not os.path.exists(source):
print(f"Source directory {source} does not exist. Skipping copy.")
return
if single_file_name:
os.makedirs(destination, exist_ok=True)
for item in os.listdir(source):
shutil.copy2(os.path.join(source, item), os.path.join(destination, single_file_name))
print(f"Successfully copied {item} to {single_file_name}.")
else:
shutil.copytree(source, destination, dirs_exist_ok=True)
print(f"Successfully copied {source} to {destination}.")
def delete_file(file):
try:
if os.path.isfile(file):
os.remove(file)
print(f"Deleted file: {file}")
else:
print(f"File not found: {file}")
except Exception as e:
print(f"An error occurred when deleting {file}: {e}")
def is_url_pingable(url, timeout=5):
try:
urllib.request.urlopen(urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}), timeout=timeout)
return True
except Exception as e:
print(f"Failed to ping {url}: {e}")
return False
def run_cmd(cmd, success_message, fail_message, retries=5, delay=1):
for attempt in range(retries):
try:
subprocess.check_call(cmd)
print(success_message)
return True
except Exception as e:
print(f"Unexpected error occurred (attempt {attempt + 1} of {retries}): {e}")
time.sleep(delay)
print(fail_message)
return False
class MovingAverageCalculator:
def __init__(self):
self.reset_data()
def add_data(self, value):
if len(self.data) == 5:
self.total -= self.data.pop(0)
self.data.append(value)
self.total += value
def get_moving_average(self):
if len(self.data) == 0:
return None
return self.total / len(self.data)
def reset_data(self):
self.data = []
self.total = 0