编辑beep.py
This commit is contained in:
@@ -10,60 +10,69 @@ AudibleAlert = car.CarControl.HUDControl.AudibleAlert
|
||||
class Beepd:
|
||||
def __init__(self):
|
||||
self.current_alert = AudibleAlert.none
|
||||
self.enable_gpio()
|
||||
self.startup_beep()
|
||||
|
||||
def enable_gpio(self):
|
||||
subprocess.run("echo 42 | sudo tee /sys/class/gpio/export",
|
||||
shell=True,
|
||||
stderr=subprocess.STDOUT,
|
||||
encoding='utf8')
|
||||
# 尝试 export,忽略已 export 的错误
|
||||
try:
|
||||
subprocess.run("echo 42 | sudo tee /sys/class/gpio/export",
|
||||
shell=True,
|
||||
stderr=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL,
|
||||
encoding='utf8')
|
||||
except Exception:
|
||||
pass
|
||||
subprocess.run("echo \"out\" | sudo tee /sys/class/gpio/gpio42/direction",
|
||||
shell=True,
|
||||
stderr=subprocess.STDOUT,
|
||||
encoding='utf8')
|
||||
shell=True,
|
||||
stderr=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL,
|
||||
encoding='utf8')
|
||||
|
||||
def _beep(self, on):
|
||||
subprocess.run(f"echo \"{"1" if on else "0"}\" | sudo tee /sys/class/gpio/gpio42/value",
|
||||
shell=True,
|
||||
stderr=subprocess.STDOUT,
|
||||
encoding='utf8')
|
||||
val = "1" if on else "0"
|
||||
subprocess.run(f"echo \"{val}\" | sudo tee /sys/class/gpio/gpio42/value",
|
||||
shell=True,
|
||||
stderr=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL,
|
||||
encoding='utf8')
|
||||
|
||||
def engage(self):
|
||||
self.enable_gpio()
|
||||
self._beep(True)
|
||||
time.sleep(0.05)
|
||||
self._beep(False)
|
||||
|
||||
def disengage(self):
|
||||
self.enable_gpio()
|
||||
for _ in range(2):
|
||||
self._beep(True)
|
||||
time.sleep(0.01)
|
||||
self._beep(False)
|
||||
time.sleep(0.01)
|
||||
|
||||
def warning(self):
|
||||
self.enable_gpio()
|
||||
for _ in range(3):
|
||||
self._beep(True)
|
||||
time.sleep(0.01)
|
||||
self._beep(False)
|
||||
time.sleep(0.01)
|
||||
|
||||
def startup_beep(self):
|
||||
self._beep(True)
|
||||
time.sleep(0.1)
|
||||
self._beep(False)
|
||||
|
||||
def dispatch_beep(self, func):
|
||||
threading.Thread(target=func, daemon=True).start()
|
||||
|
||||
def update_alert(self, new_alert):
|
||||
current_alert_played_once = self.current_alert == AudibleAlert.none
|
||||
if self.current_alert != new_alert and (new_alert != AudibleAlert.none or current_alert_played_once):
|
||||
if new_alert != self.current_alert:
|
||||
self.current_alert = new_alert
|
||||
print(new_alert)
|
||||
if self.current_alert == AudibleAlert.engage:
|
||||
print(f"[BEEP] New alert: {new_alert}")
|
||||
if new_alert == AudibleAlert.engage:
|
||||
self.dispatch_beep(self.engage)
|
||||
elif self.current_alert == AudibleAlert.disengage:
|
||||
elif new_alert == AudibleAlert.disengage:
|
||||
self.dispatch_beep(self.disengage)
|
||||
elif self.current_alert == AudibleAlert.refuse:
|
||||
self.dispatch_beep(self.warning)
|
||||
elif self.current_alert == AudibleAlert.prompt:
|
||||
self.dispatch_beep(self.warning)
|
||||
elif self.current_alert == AudibleAlert.warningSoft:
|
||||
elif new_alert in [AudibleAlert.refuse, AudibleAlert.prompt, AudibleAlert.warningSoft]:
|
||||
self.dispatch_beep(self.warning)
|
||||
|
||||
def get_audible_alert(self, sm):
|
||||
@@ -83,22 +92,21 @@ class Beepd:
|
||||
cs.selfdriveState.alertSound = AudibleAlert.disengage
|
||||
if frame == 60:
|
||||
cs.selfdriveState.alertSound = AudibleAlert.prompt
|
||||
|
||||
if frame == 80:
|
||||
cs.selfdriveState.alertSound = AudibleAlert.disengage
|
||||
|
||||
if frame == 85:
|
||||
cs.selfdriveState.alertSound = AudibleAlert.prompt
|
||||
|
||||
pm.send("selfdriveState", cs)
|
||||
print("aa")
|
||||
frame += 1
|
||||
rk.keep_time()
|
||||
|
||||
def beepd_thread(self):
|
||||
def beepd_thread(self, test=False):
|
||||
if test:
|
||||
threading.Thread(target=self.test_beepd_thread, daemon=True).start()
|
||||
|
||||
sm = messaging.SubMaster(['selfdriveState'])
|
||||
rk = Ratekeeper(20)
|
||||
# threading.Thread(target=self.test_beepd_thread, daemon=True).start()
|
||||
|
||||
while True:
|
||||
sm.update(0)
|
||||
@@ -107,8 +115,7 @@ class Beepd:
|
||||
|
||||
def main():
|
||||
s = Beepd()
|
||||
s.beepd_thread()
|
||||
|
||||
s.beepd_thread(test=False) # 改成 True 可启用模拟测试数据
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user