Files
sunnypilot/system/micd.py
Jimmy dcd56ae09a store mic audio with toggle (#35595)
* store/send mic audio with toggle

* script to extract audio from logs

* change description and add translation placeholders

* microphone icon

* apply toggle in loggerd

* add length and counter

* startFrameIdx counter

* Revert "change description and add translation placeholders"

This reverts commit 7baa1f6de99c6ebe9f9906193da7e83dad79511a.

* send mic data first and then calc

* restore changed description/icon after revert

* adjust fft samples to keep old time window

* remove extract_audio.py since audio is now stored in qcam instead of rlog

* qt microphone recording icon

* Revert "remove extract_audio.py since audio is now stored in qcam instead of rlog"

This reverts commit 7a3a75bd8db5376d1e442a3ba931c67550b5f132.

* move extract_audio script and output file by default

* remove length field

* recording indicator swaps sides based on lhd/rhd

* use record icon from comma body

* Update toggle description

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* update raylib toggle desc cause I did earlier

* microphone --> soundPressure, audioData --> rawAudioData

* cleanup unused var

* update README

* sidebar mic indicator instead of annotated camera

* improve logic readability

* remove startFrameIdx and sequenceNum

* use Q_PROPERTY/setProperty so that update() is actually called on value change

* specify old id for SoundPressure

* fix typo

---------

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
2025-06-30 13:42:21 -07:00

121 lines
4.0 KiB
Python
Executable File

#!/usr/bin/env python3
import numpy as np
from functools import cache
import threading
from cereal import messaging
from openpilot.common.realtime import Ratekeeper
from openpilot.common.retry import retry
from openpilot.common.swaglog import cloudlog
SAMPLE_RATE = 16000  # audio sampling rate requested from the input stream
SAMPLE_BUFFER = 800  # frames per stream block: 50ms at SAMPLE_RATE
FFT_SAMPLES = 1600  # samples per analysis window: 100ms at SAMPLE_RATE
REFERENCE_SPL = 2e-5  # reference sound pressure in newtons/m^2 (i.e. 20 micropascals)
RATE = 10  # rate handed to the Ratekeeper that paces the soundPressure publish loop
@cache
def get_a_weighting_filter(fft_samples=None, sample_rate=None) -> np.ndarray:
    """Return the A-weighting gain for every FFT bin, normalized to a peak of 1.

    Args:
        fft_samples: Number of FFT bins the filter is built for. When omitted,
            the module-level FFT_SAMPLES is used.
        sample_rate: Sampling rate used to map bins to frequencies. When
            omitted, the module-level SAMPLE_RATE is used.

    Returns:
        A 1-D array of ``fft_samples`` gains laid out in ``np.fft.fftfreq``
        order, so it can be multiplied element-wise with ``np.fft.fft`` output
        of the same length. The gain only depends on even powers of the
        frequency, so positive and negative bins receive the same value.

    The result is memoized per argument combination; callers share the same
    array object and should not modify it in place.
    """
    # Calculate the A-weighting filter
    # https://en.wikipedia.org/wiki/A-weighting
    if fft_samples is None:
        fft_samples = FFT_SAMPLES
    if sample_rate is None:
        sample_rate = SAMPLE_RATE

    freqs = np.fft.fftfreq(fft_samples, d=1 / sample_rate)
    freqs_sq = freqs ** 2
    numerator = 12194 ** 2 * freqs ** 4
    denominator = (
        (freqs_sq + 20.6 ** 2)
        * (freqs_sq + 12194 ** 2)
        * np.sqrt((freqs_sq + 107.7 ** 2) * (freqs_sq + 737.9 ** 2))
    )
    A = numerator / denominator
    # Scale so the most strongly passed bin has unit gain.
    return A / np.max(A)
def calculate_spl(measurements):
    """Compute the RMS sound pressure of a block of samples and its level in dB.

    Args:
        measurements: Array-like of amplitude samples. It is converted to
            float64 before squaring so integer inputs cannot overflow.

    Returns:
        A tuple ``(sound_pressure, sound_pressure_level)`` where
        ``sound_pressure`` is the RMS of the samples and
        ``sound_pressure_level`` is ``20 * log10(rms / REFERENCE_SPL)``.
        The level is reported as 0 whenever the RMS is not strictly positive
        (for example an all-zero block). Empty input yields ``(0.0, 0.0)``
        instead of a NaN pressure.
    """
    # https://www.engineeringtoolbox.com/sound-pressure-d_711.html
    samples = np.asarray(measurements, dtype=np.float64)
    if samples.size == 0:
        # np.mean of an empty array would produce NaN plus a RuntimeWarning.
        return 0.0, 0.0

    sound_pressure = np.sqrt(np.mean(samples ** 2))  # RMS of amplitudes
    if sound_pressure > 0:
        sound_pressure_level = 20 * np.log10(sound_pressure / REFERENCE_SPL)  # dB
    else:
        # log10 is undefined for a non-positive pressure; keep the 0 placeholder.
        sound_pressure_level = 0.0
    return sound_pressure, sound_pressure_level
def apply_a_weighting(measurements: np.ndarray) -> np.ndarray:
    """Return the magnitude of `measurements` after A-weighting in the frequency domain.

    The samples are tapered with a Hanning window of matching length, moved to
    the frequency domain, scaled bin-by-bin by the cached A-weighting filter,
    and transformed back. The filter is built for a fixed number of bins, so
    `measurements` has to have that same length for the product to line up.
    """
    # Taper the block with a Hanning window as long as the block itself
    taper = np.hanning(len(measurements))
    spectrum = np.fft.fft(measurements * taper)
    # Scale every frequency bin by its A-weighting gain
    weighted_spectrum = spectrum * get_a_weighting_filter()
    return np.abs(np.fft.ifft(weighted_spectrum))
class Mic:
    """Publishes raw microphone audio and sound-pressure readings derived from it.

    `callback` is handed to the input stream: it forwards each audio block as
    `rawAudioData` and refreshes the cached sound-pressure values. `update`
    publishes those cached values as `soundPressure` and then waits on the
    Ratekeeper; `micd_thread` calls it in an endless loop.
    """

    def __init__(self):
        self.rk = Ratekeeper(RATE)
        self.pm = messaging.PubMaster(['soundPressure', 'rawAudioData'])

        # Samples received so far that have not yet filled a FFT_SAMPLES window.
        self.measurements = np.empty(0)

        # Latest results: written by callback(), read by update().
        self.sound_pressure = 0
        self.sound_pressure_weighted = 0
        self.sound_pressure_level_weighted = 0

        # Guards the sample buffer and the result fields above.
        # NOTE(review): presumably callback() runs on the audio stream's own
        # thread while update() runs on the main loop - confirm.
        self.lock = threading.Lock()

    def update(self):
        """Publish the most recent sound-pressure values, then wait for the next tick."""
        # Copy under the lock so the three values come from the same window.
        with self.lock:
            sound_pressure = self.sound_pressure
            sound_pressure_weighted = self.sound_pressure_weighted
            sound_pressure_level_weighted = self.sound_pressure_level_weighted

        msg = messaging.new_message('soundPressure', valid=True)
        msg.soundPressure.soundPressure = float(sound_pressure)
        msg.soundPressure.soundPressureWeighted = float(sound_pressure_weighted)
        msg.soundPressure.soundPressureWeightedDb = float(sound_pressure_level_weighted)

        self.pm.send('soundPressure', msg)
        self.rk.keep_time()

    def callback(self, indata, frames, time, status):
        """
        Using amplitude measurements, calculate an uncalibrated sound pressure and sound pressure level.
        Then apply A-weighting to the raw amplitudes and run the same calculations again.
        Logged A-weighted equivalents are rough approximations of the human-perceived loudness.

        Only `indata` is used; `frames`, `time` and `status` are accepted
        because the input stream passes them, but they are ignored here.
        """
        samples = indata[:, 0]  # first (and, with channels=1, only) channel

        # The raw audio is published before any sound-pressure math runs.
        msg = messaging.new_message('rawAudioData', valid=True)
        # Clip before scaling so a sample outside [-1, 1] saturates instead of
        # wrapping around when cast to int16.
        # NOTE(review): assumes indata holds float samples nominally in [-1, 1] - confirm.
        audio_data_int_16 = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)
        msg.rawAudioData.data = audio_data_int_16.tobytes()
        msg.rawAudioData.sampleRate = SAMPLE_RATE
        self.pm.send('rawAudioData', msg)

        with self.lock:
            self.measurements = np.concatenate((self.measurements, samples))

            # Consume every complete window; the last one processed determines
            # the values that update() will publish.
            while self.measurements.size >= FFT_SAMPLES:
                measurements = self.measurements[:FFT_SAMPLES]

                self.sound_pressure, _ = calculate_spl(measurements)
                measurements_weighted = apply_a_weighting(measurements)
                self.sound_pressure_weighted, self.sound_pressure_level_weighted = calculate_spl(measurements_weighted)

                self.measurements = self.measurements[FFT_SAMPLES:]

    @retry(attempts=7, delay=3)
    def get_stream(self, sd):
        """Reinitialize `sd` and return a mono input stream wired to `callback`.

        `sd` is the sounddevice module imported by the caller. The call is
        wrapped by the `retry` decorator (attempts=7, delay=3).
        """
        # reload sounddevice to reinitialize portaudio
        sd._terminate()
        sd._initialize()
        return sd.InputStream(channels=1, samplerate=SAMPLE_RATE, callback=self.callback, blocksize=SAMPLE_BUFFER)

    def micd_thread(self):
        """Open the input stream and publish sound pressure in an endless loop."""
        # sounddevice must be imported after forking processes
        import sounddevice as sd

        with self.get_stream(sd) as stream:
            cloudlog.info(f"micd stream started: {stream.samplerate=} {stream.channels=} {stream.dtype=} {stream.device=}, {stream.blocksize=}")
            while True:
                self.update()
def main():
    """Create a Mic and hand control to its capture/publish loop."""
    Mic().micd_thread()


if __name__ == "__main__":
    main()