From e265c4c78a72e2fab8d40594b82f7d047ff540ae Mon Sep 17 00:00:00 2001 From: "Mr.one" Date: Thu, 15 May 2025 11:34:02 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BC=96=E8=BE=91beep.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- selfdrive/selfdrived/beep.py | 69 ++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/selfdrive/selfdrived/beep.py b/selfdrive/selfdrived/beep.py index 44add999..9cdab417 100644 --- a/selfdrive/selfdrived/beep.py +++ b/selfdrive/selfdrived/beep.py @@ -10,60 +10,69 @@ AudibleAlert = car.CarControl.HUDControl.AudibleAlert class Beepd: def __init__(self): self.current_alert = AudibleAlert.none + self.enable_gpio() + self.startup_beep() def enable_gpio(self): - subprocess.run("echo 42 | sudo tee /sys/class/gpio/export", - shell=True, - stderr=subprocess.STDOUT, - encoding='utf8') + # 尝试 export,忽略已 export 的错误 + try: + subprocess.run("echo 42 | sudo tee /sys/class/gpio/export", + shell=True, + stderr=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + encoding='utf8') + except Exception: + pass subprocess.run("echo \"out\" | sudo tee /sys/class/gpio/gpio42/direction", - shell=True, - stderr=subprocess.STDOUT, - encoding='utf8') + shell=True, + stderr=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + encoding='utf8') def _beep(self, on): - subprocess.run(f"echo \"{"1" if on else "0"}\" | sudo tee /sys/class/gpio/gpio42/value", - shell=True, - stderr=subprocess.STDOUT, - encoding='utf8') + val = "1" if on else "0" + subprocess.run(f"echo \"{val}\" | sudo tee /sys/class/gpio/gpio42/value", + shell=True, + stderr=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + encoding='utf8') def engage(self): - self.enable_gpio() self._beep(True) time.sleep(0.05) self._beep(False) def disengage(self): - self.enable_gpio() for _ in range(2): self._beep(True) time.sleep(0.01) self._beep(False) + time.sleep(0.01) def warning(self): - self.enable_gpio() for _ in range(3): self._beep(True) time.sleep(0.01) self._beep(False) + time.sleep(0.01) + + def startup_beep(self): + self._beep(True) + time.sleep(0.1) + self._beep(False) def dispatch_beep(self, func): threading.Thread(target=func, daemon=True).start() def update_alert(self, new_alert): - current_alert_played_once = self.current_alert == AudibleAlert.none - if self.current_alert != new_alert and (new_alert != AudibleAlert.none or current_alert_played_once): + if new_alert != self.current_alert: self.current_alert = new_alert - print(new_alert) - if self.current_alert == AudibleAlert.engage: + print(f"[BEEP] New alert: {new_alert}") + if new_alert == AudibleAlert.engage: self.dispatch_beep(self.engage) - elif self.current_alert == AudibleAlert.disengage: + elif new_alert == AudibleAlert.disengage: self.dispatch_beep(self.disengage) - elif self.current_alert == AudibleAlert.refuse: - self.dispatch_beep(self.warning) - elif self.current_alert == AudibleAlert.prompt: - self.dispatch_beep(self.warning) - elif self.current_alert == AudibleAlert.warningSoft: + elif new_alert in [AudibleAlert.refuse, AudibleAlert.prompt, AudibleAlert.warningSoft]: self.dispatch_beep(self.warning) def get_audible_alert(self, sm): @@ -83,22 +92,21 @@ class Beepd: cs.selfdriveState.alertSound = AudibleAlert.disengage if frame == 60: cs.selfdriveState.alertSound = AudibleAlert.prompt - if frame == 80: cs.selfdriveState.alertSound = AudibleAlert.disengage - if frame == 85: cs.selfdriveState.alertSound = AudibleAlert.prompt pm.send("selfdriveState", cs) - print("aa") frame += 1 rk.keep_time() - def beepd_thread(self): + def beepd_thread(self, test=False): + if test: + threading.Thread(target=self.test_beepd_thread, daemon=True).start() + sm = messaging.SubMaster(['selfdriveState']) rk = Ratekeeper(20) - # threading.Thread(target=self.test_beepd_thread, daemon=True).start() while True: sm.update(0) @@ -107,8 +115,7 @@ class Beepd: def main(): s = Beepd() - s.beepd_thread() - + s.beepd_thread(test=False) # 改成 True 可启用模拟测试数据 if __name__ == "__main__": main()