diff --git a/.gitmodules b/.gitmodules index 4ba149cb2..73f832b1d 100644 --- a/.gitmodules +++ b/.gitmodules @@ -13,6 +13,9 @@ [submodule "body"] path = body url = ../../commaai/body.git +[submodule "teleoprtc_repo"] + path = teleoprtc_repo + url = ../../commaai/teleoprtc [submodule "tinygrad"] path = tinygrad_repo url = https://github.com/geohot/tinygrad.git diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b6a46ad50..2109c18b2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,7 +26,7 @@ repos: rev: v2.2.6 hooks: - id: codespell - exclude: '^(third_party/)|(body/)|(cereal/)|(panda/)|(opendbc/)|(rednose/)|(rednose_repo/)|(selfdrive/ui/translations/.*.ts)|(poetry.lock)' + exclude: '^(third_party/)|(body/)|(cereal/)|(panda/)|(opendbc/)|(rednose/)|(rednose_repo/)|(teleoprtc/)|(teleoprtc_repo/)|(selfdrive/ui/translations/.*.ts)|(poetry.lock)' args: # if you've got a short variable name that's getting flagged, add it here - -L bu,ro,te,ue,alo,hda,ois,nam,nams,ned,som,parm,setts,inout,warmup,bumb,nd,sie,preints @@ -39,12 +39,12 @@ repos: language: system types: [python] args: ['--explicit-package-bases', '--local-partial-types'] - exclude: '^(third_party/)|(cereal/)|(opendbc/)|(panda/)|(rednose/)|(rednose_repo/)|(tinygrad/)|(tinygrad_repo/)|(xx/)' + exclude: '^(third_party/)|(cereal/)|(opendbc/)|(panda/)|(rednose/)|(rednose_repo/)|(tinygrad/)|(tinygrad_repo/)|(teleoprtc/)|(teleoprtc_repo/)|(xx/)' - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.1.6 hooks: - id: ruff - exclude: '^(third_party/)|(cereal/)|(panda/)|(rednose/)|(rednose_repo/)|(tinygrad/)|(tinygrad_repo/)' + exclude: '^(third_party/)|(cereal/)|(panda/)|(rednose/)|(rednose_repo/)|(tinygrad/)|(tinygrad_repo/)|(teleoprtc/)|(teleoprtc_repo/)' - repo: local hooks: - id: cppcheck diff --git a/poetry.lock b/poetry.lock index b05dcae11..1461d5457 100644 --- a/poetry.lock +++ b/poetry.lock @@ -112,66 +112,39 @@ ifaddr = ">=0.2.0" [[package]] name = "aiortc" -version = "1.5.0" +version = "1.6.0" description = "An implementation of WebRTC and ORTC" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "aiortc-1.5.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d1d3f2d6cc22fae5ea57b0371895b7830e878b9e3705fd3742b3453cdfa0fd51"}, - {file = "aiortc-1.5.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:2eaf758b5e0bb16f22a9aeb8ab88eb947345f47e2e46cfca18b2815d44726c4e"}, - {file = "aiortc-1.5.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ee76f6b30d7f39442ba7ac25d58114f077ead1460c5632d0c9e18179d01ad419"}, - {file = "aiortc-1.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4a766052d93474e9bf4186465298b7c8fb9af062ef7f83ba33f191baa79dac1e"}, - {file = "aiortc-1.5.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fec292636978ed50728f1ce9b7a9f0d7d2e38bd0b920bb53e091e5728b79e231"}, - {file = "aiortc-1.5.0-cp310-cp310-win32.whl", hash = "sha256:27e879b73377d4b94bd86e4c3e8cd8913905fdca1de90a9a4efb0d9d3779dbf4"}, - {file = "aiortc-1.5.0-cp310-cp310-win_amd64.whl", hash = "sha256:a720d0dd53553f6dfc28a53bee2ffce4f13283b4cbbc7db548000054cc63a4f9"}, - {file = "aiortc-1.5.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a5e8cbfce84badd9a8355819343570bbec1e4eef725996cad6aebe4cc3d03ae8"}, - {file = "aiortc-1.5.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7931512dbb2ff91fb78f5512ad9ca96546452d7bb627f61bd7393bf59ee48ad3"}, - {file = "aiortc-1.5.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a6abeb857a98014fc97265891ebf4fd989987d2ee091e0844e3c8fc543b6e2f0"}, - {file = "aiortc-1.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dead42dc3a31570fb6f5b94f9be9c78e28b1dc045f71489858116840f299862e"}, - {file = "aiortc-1.5.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3a1a8081ba6d7cabc5896d10462cb50f6db7a8ccf34e6aa3e6c4a0d2d5bc5db5"}, - {file = "aiortc-1.5.0-cp311-cp311-win32.whl", hash = "sha256:cbd5d35bd34b22b8f711c708d266889c973c0dcb38da14a2a9f757266987a181"}, - {file = "aiortc-1.5.0-cp311-cp311-win_amd64.whl", hash = "sha256:6749145e3d527ac98c80837d72fd832b0c403eded3546aeb7cec6f25592b4d5e"}, - {file = "aiortc-1.5.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:50e8e8903cf55f6f2cda9b61c115fca8e444d48f299cdd071980a3b5cec594fa"}, - {file = "aiortc-1.5.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:15218a1b81f4fa1521f3b839eefdce638b34c46306e8eaf069cee7283fe8c838"}, - {file = "aiortc-1.5.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:25bca7c7bbd3619296b5737a810dd0e2fc7f6264e767fca10e65a709a443bf39"}, - {file = "aiortc-1.5.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8f1d88ae0f8b3047a279e4da06f09a35777cfbe0a9177ca8b053865a98a67912"}, - {file = "aiortc-1.5.0-cp37-cp37m-win32.whl", hash = "sha256:f86b68b182537022d4ada49a7723c7a56f39372d6fbc31a29f57315d335cdc29"}, - {file = "aiortc-1.5.0-cp37-cp37m-win_amd64.whl", hash = "sha256:e4bbc2f2b97651f7aa6f5e82c69a22590901962454fc02617c4a559a1b51c21a"}, - {file = "aiortc-1.5.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:7243bce7c3b95e47e56ddf961fbf6015702ddbbf3579b0bbf18c6173b6a6357a"}, - {file = "aiortc-1.5.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:883db8926deaf01fdcd32fbd74fcf055db63e968324ceff41d5a46ec86dff90c"}, - {file = "aiortc-1.5.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bd663f67344e6fe240c6372f620988db5285c9b1b8336306e9fec76ffb4e5493"}, - {file = "aiortc-1.5.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fe2766f5a7a8e10b445cbf83a510b791a88180c7b1f9adef3f730840fa208afc"}, - {file = "aiortc-1.5.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ba212562025843e8d9faf66e6156b682148f8f9995a19e5c66e8ea802f3fa121"}, - {file = "aiortc-1.5.0-cp38-cp38-win32.whl", hash = "sha256:b7432c9c78e68811ee060ade8b0f867ac42a21677e4d1a9136bb88cd93ab8299"}, - {file = "aiortc-1.5.0-cp38-cp38-win_amd64.whl", hash = "sha256:2ac6e285d4035298f3025b5767dc8f8b0a5a81b2b8744aaa19c75a8fe76f3ad8"}, - {file = "aiortc-1.5.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:aa3c9306d892635dd9c38cc83c6ba67fb608c7da289f422d40f9542e104b7a0f"}, - {file = "aiortc-1.5.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:55dab49a38a212556adadb85ea06f6041d2a9e537e01092f9160b21b186b5039"}, - {file = "aiortc-1.5.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:db93641b6f31315b8fd4c81e14881aef28fbb0700f220926f82909baedfa9888"}, - {file = "aiortc-1.5.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f63fd1168df72498afe0ee06555cc86b8496115ef128519a01d1ea8e404784b8"}, - {file = "aiortc-1.5.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e436f49617887f2009c6ada872c2da201e3c8010b387e7c1057eab229ae438c3"}, - {file = "aiortc-1.5.0-cp39-cp39-win32.whl", hash = "sha256:6f23495d4e11610117d1bad8686d42d529168d463687a1a1e0bec795d1ec33ce"}, - {file = "aiortc-1.5.0-cp39-cp39-win_amd64.whl", hash = "sha256:76206601082e39fdb56d86221729f04f8bd79d65f9fd6b82121947eabf7efd6d"}, - {file = "aiortc-1.5.0-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:d3b2a3b4c120a73242ea0b843ecc3efeaea32861682c771e67f7f08f9d18fddc"}, - {file = "aiortc-1.5.0-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3d3f6511f2442f49dfaf4e69865b47e0d6d95440fee2f66e6a03a8b4fa1b28e3"}, - {file = "aiortc-1.5.0-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:77ae221c734864c8749c27cc8add22d296ef3e06ae5f6982dbcbe2d0976b10e1"}, - {file = "aiortc-1.5.0-pp37-pypy37_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7732c825ee96e9bc7fb779a4008be768e7663f7f9bf0ab3cccdd412dd7f1c820"}, - {file = "aiortc-1.5.0-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:56ffdd67161488c6d934b090a8c2d277bba8806906a3a18493f46b42976569c1"}, - {file = "aiortc-1.5.0-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:f73ba04ca3f331b0ddea0b4ff78424ba30bfd7a49d0b8bd926c75a66ad60f447"}, - {file = "aiortc-1.5.0-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69eedeec467bd7bcac7ace6ad398133e27f18eeae195a3ad0ffda74255a8b812"}, - {file = "aiortc-1.5.0-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9e095e5fe22f5a2efd4e0657abec1fea7aca864cb32ae3f0816fbcd340a4f2b7"}, - {file = "aiortc-1.5.0-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:dffd6899a5d3db4356d2c17521021032468931ae168545b1ff4815764a5e2873"}, - {file = "aiortc-1.5.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:af3eed686d621af93befd7e68bd73d6d8a8aa3e721e8fa3ce7e21b3225e37c38"}, - {file = "aiortc-1.5.0-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:15e222a308dcfc44351bd9acff21723c8065cdcd75d6649d53b2986ada64b6be"}, - {file = "aiortc-1.5.0-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d9ecd61c42e6a78c089805a47542a68eeeec6ba98bf7a2e30cafa3d3f4e94a7f"}, - {file = "aiortc-1.5.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2d839437c6000d77511ff1889133150f23fbc8a7365971260c45ce06ff007b0f"}, - {file = "aiortc-1.5.0-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:025847ad6b8c5686f2895394e1de92c043e20e7d90c266de201eef1b1108c8df"}, - {file = "aiortc-1.5.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:85583166ab9c9052d2539bee3ba05f27af7f7b93b15c2259c2fc1bd3de5b31d5"}, - {file = "aiortc-1.5.0.tar.gz", hash = "sha256:82b4131d84f862e24e1c3550b73f78412cc9554140a2575577eb3f04675bbad2"}, + {file = "aiortc-1.6.0-cp38-abi3-macosx_10_9_x86_64.whl", hash = "sha256:4675e04d8441797fef6c8a669b3a67d750670d1b897f08886072a084d743e07d"}, + {file = "aiortc-1.6.0-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:836c0686fca67f142c52e5af8043206c2bb702ad0ddcdc94ef19caf1c22f8d54"}, + {file = "aiortc-1.6.0-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33f846abd753935881158751994a51f14e345762130688b19c26cab42c01266f"}, + {file = "aiortc-1.6.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eb4bd45397945a5323bd077d43c702a3a991d75023f23649c1d18df5d80c221d"}, + {file = "aiortc-1.6.0-cp38-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f93561b515ff478b068bb9b047d8b1e896c747dcf3ee465463047c51a7bea24d"}, + {file = "aiortc-1.6.0-cp38-abi3-win32.whl", hash = "sha256:325f847397af2892aa051dc2880a75e9bd79f535cc05ec8f4538b5ed098b3c5d"}, + {file = "aiortc-1.6.0-cp38-abi3-win_amd64.whl", hash = "sha256:98b118d53ae874126b2e9ec6bb1397ea169b85550c4bd5453e279507ff7f0cf9"}, + {file = "aiortc-1.6.0-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:6677018833a5d648754c99c70e6c6f6d4f3942682cda07ed5afa73422f8a009a"}, + {file = "aiortc-1.6.0-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:090936c719225e50cd4d66f476e6c17293a8062cf7687a1baa5080f3c90ec8b1"}, + {file = "aiortc-1.6.0-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4e60d5bc269d4d12f1f5f47e2c17aa3799f3b5c8b73fd6d8d246ddc11bb29776"}, + {file = "aiortc-1.6.0-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:607b9496b4a3c8cd9d32afb6d5bce07f9170831ec44a20ab8af54d53879aafc8"}, + {file = "aiortc-1.6.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:0a41f4c0b31e45548e7c7397ef1aecc4be49ab68afd8fa134c07581fe0b3a9c2"}, + {file = "aiortc-1.6.0-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:64a9939016edbc8f300de6189983c983753827813ac9acd9b5be8ce61cc32684"}, + {file = "aiortc-1.6.0-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:813e7985665c94a0e3387b66e39dba6c751e5e588aedbca06d7e52068c6e37fb"}, + {file = "aiortc-1.6.0-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:672d0b4ad35c4d8f014f44a142aa55529ec82cfe2809226e1275e35a71fd4422"}, + {file = "aiortc-1.6.0-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1526a1904174bb11958b8f7e93f01f37f80df2190e5089f0501984bdef79595e"}, + {file = "aiortc-1.6.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:cecfa5f462e73218cadc9acd8013cb3a0d9007a4515bceba6e7755d77bb80061"}, + {file = "aiortc-1.6.0-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:febca6773de18a6bb9e5569ae87c8be55ed184695f1f9fc99aa4744a7b0375f8"}, + {file = "aiortc-1.6.0-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b5ba865be9708713397ec584ed1baeb2f15d2fa9c32594ce19a41ffa6e2517cb"}, + {file = "aiortc-1.6.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f9759a3c00e46ba1c3499dbf5a8513ae37ba65b940a56b0e7fa5070478e9379f"}, + {file = "aiortc-1.6.0-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9cff7868663d9d1c74e237b86e45126022466240439a5f63c3440e3acdf0305b"}, + {file = "aiortc-1.6.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:9f20479d0dd06116ac81d332850fab874d83b561a73fd7252d218f55c6bd5b79"}, + {file = "aiortc-1.6.0.tar.gz", hash = "sha256:08cffbcde3401b33731b1d4169b9ff860b0aaaca200b62e10ce5978238671ad7"}, ] [package.dependencies] aioice = ">=0.9.0,<1.0.0" -av = ">=9.0.0,<11.0.0" +av = ">=9.0.0,<12.0.0" cffi = ">=1.0.0" cryptography = ">=2.2" google-crc32c = ">=1.1" @@ -180,7 +153,7 @@ pylibsrtp = ">=0.5.6" pyopenssl = ">=23.1.0" [package.extras] -dev = ["aiohttp (>=3.7.0)", "coverage (>=5.0)", "numpy (>=1.19.0)"] +dev = ["aiohttp (>=3.7.0)", "coverage[toml] (>=7.2.2)", "numpy (>=1.19.0)"] [[package]] name = "aiosignal" diff --git a/pyproject.toml b/pyproject.toml index 1426905d0..0e55e2ac6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ testpaths = [ "system/proclogd", "system/tests", "system/ubloxd", + "system/webrtc", "tools/lib/tests", "tools/replay", "tools/cabana" @@ -43,6 +44,8 @@ exclude = [ "rednose_repo/", "tinygrad/", "tinygrad_repo/", + "teleoprtc/", + "teleoprtc_repo/", "third_party/", ] @@ -186,6 +189,7 @@ exclude = [ "opendbc", "rednose_repo", "tinygrad_repo", + "teleoprtc", "third_party", ] flake8-implicit-str-concat.allow-multiline=false diff --git a/release/files_common b/release/files_common index af5d156d4..d92be1c3a 100644 --- a/release/files_common +++ b/release/files_common @@ -281,6 +281,11 @@ system/sensord/sensors/*.cc system/sensord/sensors/*.h system/sensord/pigeond.py +system/webrtc/__init__.py +system/webrtc/webrtcd.py +system/webrtc/device/audio.py +system/webrtc/device/video.py + selfdrive/thermald/thermald.py selfdrive/thermald/power_monitoring.py selfdrive/thermald/fan_controller.py @@ -439,6 +444,8 @@ third_party/qt5/larch64/bin/** scripts/update_now.sh scripts/stop_updater.sh +teleoprtc/** + rednose_repo/site_scons/site_tools/rednose_filter.py rednose/.gitignore rednose/** diff --git a/selfdrive/manager/process_config.py b/selfdrive/manager/process_config.py index 6f974b068..4ad257418 100644 --- a/selfdrive/manager/process_config.py +++ b/selfdrive/manager/process_config.py @@ -84,6 +84,7 @@ procs = [ # debug procs NativeProcess("bridge", "cereal/messaging", ["./bridge"], notcar), + PythonProcess("webrtcd", "system.webrtc.webrtcd", notcar), PythonProcess("webjoystick", "tools.bodyteleop.web", notcar), ] diff --git a/system/webrtc/__init__.py b/system/webrtc/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/system/webrtc/device/audio.py b/system/webrtc/device/audio.py new file mode 100644 index 000000000..3c78be675 --- /dev/null +++ b/system/webrtc/device/audio.py @@ -0,0 +1,110 @@ +import asyncio +import io +from typing import Optional, List, Tuple + +import aiortc +import av +import numpy as np +import pyaudio + + +class AudioInputStreamTrack(aiortc.mediastreams.AudioStreamTrack): + PYAUDIO_TO_AV_FORMAT_MAP = { + pyaudio.paUInt8: 'u8', + pyaudio.paInt16: 's16', + pyaudio.paInt24: 's24', + pyaudio.paInt32: 's32', + pyaudio.paFloat32: 'flt', + } + + def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 16000, channels: int = 1, packet_time: float = 0.020, device_index: Optional[int] = None): + super().__init__() + + self.p = pyaudio.PyAudio() + chunk_size = int(packet_time * rate) + self.stream = self.p.open(format=audio_format, + channels=channels, + rate=rate, + frames_per_buffer=chunk_size, + input=True, + input_device_index=device_index) + self.format = audio_format + self.rate = rate + self.channels = channels + self.packet_time = packet_time + self.chunk_size = chunk_size + self.pts = 0 + + async def recv(self): + mic_data = self.stream.read(self.chunk_size) + mic_array = np.frombuffer(mic_data, dtype=np.int16) + mic_array = np.expand_dims(mic_array, axis=0) + layout = 'stereo' if self.channels > 1 else 'mono' + frame = av.AudioFrame.from_ndarray(mic_array, format=self.PYAUDIO_TO_AV_FORMAT_MAP[self.format], layout=layout) + frame.rate = self.rate + frame.pts = self.pts + self.pts += frame.samples + + return frame + + +class AudioOutputSpeaker: + def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 48000, channels: int = 2, packet_time: float = 0.2, device_index: Optional[int] = None): + + chunk_size = int(packet_time * rate) + self.p = pyaudio.PyAudio() + self.buffer = io.BytesIO() + self.channels = channels + self.stream = self.p.open(format=audio_format, + channels=channels, + rate=rate, + frames_per_buffer=chunk_size, + output=True, + output_device_index=device_index, + stream_callback=self.__pyaudio_callback) + self.tracks_and_tasks: List[Tuple[aiortc.MediaStreamTrack, Optional[asyncio.Task]]] = [] + + def __pyaudio_callback(self, in_data, frame_count, time_info, status): + if self.buffer.getbuffer().nbytes < frame_count * self.channels * 2: + buff = b'\x00\x00' * frame_count * self.channels + elif self.buffer.getbuffer().nbytes > 115200: # 3x the usual read size + self.buffer.seek(0) + buff = self.buffer.read(frame_count * self.channels * 4) + buff = buff[:frame_count * self.channels * 2] + self.buffer.seek(2) + else: + self.buffer.seek(0) + buff = self.buffer.read(frame_count * self.channels * 2) + self.buffer.seek(2) + return (buff, pyaudio.paContinue) + + async def __consume(self, track): + while True: + try: + frame = await track.recv() + except aiortc.MediaStreamError: + return + + self.buffer.write(bytes(frame.planes[0])) + + def hasTrack(self, track: aiortc.MediaStreamTrack) -> bool: + return any(t == track for t, _ in self.tracks_and_tasks) + + def addTrack(self, track: aiortc.MediaStreamTrack): + if not self.hasTrack(track): + self.tracks_and_tasks.append((track, None)) + + def start(self): + for index, (track, task) in enumerate(self.tracks_and_tasks): + if task is None: + self.tracks_and_tasks[index] = (track, asyncio.create_task(self.__consume(track))) + + def stop(self): + for _, task in self.tracks_and_tasks: + if task is not None: + task.cancel() + + self.tracks_and_tasks = [] + self.stream.stop_stream() + self.stream.close() + self.p.terminate() diff --git a/system/webrtc/device/video.py b/system/webrtc/device/video.py new file mode 100644 index 000000000..1ecb6dbd7 --- /dev/null +++ b/system/webrtc/device/video.py @@ -0,0 +1,69 @@ +import asyncio +from typing import Optional + +import av +from teleoprtc.tracks import TiciVideoStreamTrack + +from cereal import messaging +from openpilot.tools.lib.framereader import FrameReader +from openpilot.common.realtime import DT_MDL, DT_DMON + + +class LiveStreamVideoStreamTrack(TiciVideoStreamTrack): + camera_to_sock_mapping = { + "driver": "livestreamDriverEncodeData", + "wideRoad": "livestreamWideRoadEncodeData", + "road": "livestreamRoadEncodeData", + } + + def __init__(self, camera_type: str): + dt = DT_DMON if camera_type == "driver" else DT_MDL + super().__init__(camera_type, dt) + + self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True) + self._pts = 0 + + async def recv(self): + while True: + msg = messaging.recv_one_or_none(self._sock) + if msg is not None: + break + await asyncio.sleep(0.005) + + evta = getattr(msg, msg.which()) + + packet = av.Packet(evta.header + evta.data) + packet.time_base = self._time_base + packet.pts = self._pts + + self.log_debug("track sending frame %s", self._pts) + self._pts += self._dt * self._clock_rate + + return packet + + def codec_preference(self) -> Optional[str]: + return "H264" + + +class FrameReaderVideoStreamTrack(TiciVideoStreamTrack): + def __init__(self, input_file: str, dt: float = DT_MDL, camera_type: str = "driver"): + super().__init__(camera_type, dt) + + frame_reader = FrameReader(input_file) + self._frames = [frame_reader.get(i, pix_fmt="rgb24") for i in range(frame_reader.frame_count)] + self._frame_count = len(self.frames) + self._frame_index = 0 + self._pts = 0 + + async def recv(self): + self.log_debug("track sending frame %s", self._pts) + img = self._frames[self._frame_index] + + new_frame = av.VideoFrame.from_ndarray(img, format="rgb24") + new_frame.pts = self._pts + new_frame.time_base = self._time_base + + self._frame_index = (self._frame_index + 1) % self._frame_count + self._pts = await self.next_pts(self._pts) + + return new_frame diff --git a/system/webrtc/tests/test_stream_session.py b/system/webrtc/tests/test_stream_session.py new file mode 100755 index 000000000..2173c3806 --- /dev/null +++ b/system/webrtc/tests/test_stream_session.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +import asyncio +import unittest +from unittest.mock import Mock, MagicMock, patch +import json +# for aiortc and its dependencies +import warnings +warnings.filterwarnings("ignore", category=DeprecationWarning) + +from aiortc import RTCDataChannel +from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE +import capnp +import pyaudio + +from cereal import messaging, log + +from openpilot.system.webrtc.webrtcd import CerealOutgoingMessageProxy, CerealIncomingMessageProxy +from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack +from openpilot.system.webrtc.device.audio import AudioInputStreamTrack +from openpilot.common.realtime import DT_DMON + + +class TestStreamSession(unittest.TestCase): + def setUp(self): + self.loop = asyncio.new_event_loop() + + def tearDown(self): + self.loop.stop() + self.loop.close() + + def test_outgoing_proxy(self): + test_msg = log.Event.new_message() + test_msg.logMonoTime = 123 + test_msg.valid = True + test_msg.customReservedRawData0 = b"test" + expected_dict = {"type": "customReservedRawData0", "logMonoTime": 123, "valid": True, "data": "test"} + expected_json = json.dumps(expected_dict).encode() + + channel = Mock(spec=RTCDataChannel) + mocked_submaster = messaging.SubMaster(["customReservedRawData0"]) + def mocked_update(t): + mocked_submaster.update_msgs(0, [test_msg]) + + with patch.object(messaging.SubMaster, "update", side_effect=mocked_update): + proxy = CerealOutgoingMessageProxy(mocked_submaster) + proxy.add_channel(channel) + + proxy.update() + + channel.send.assert_called_once_with(expected_json) + + def test_incoming_proxy(self): + tested_msgs = [ + {"type": "customReservedRawData0", "data": "test"}, # primitive + {"type": "can", "data": [{"address": 0, "busTime": 0, "dat": "", "src": 0}]}, # list + {"type": "testJoystick", "data": {"axes": [0, 0], "buttons": [False]}}, # dict + ] + + mocked_pubmaster = MagicMock(spec=messaging.PubMaster) + + proxy = CerealIncomingMessageProxy(mocked_pubmaster) + + for msg in tested_msgs: + proxy.send(json.dumps(msg).encode()) + + mocked_pubmaster.send.assert_called_once() + mt, md = mocked_pubmaster.send.call_args.args + self.assertEqual(mt, msg["type"]) + self.assertIsInstance(md, capnp._DynamicStructBuilder) + self.assertTrue(hasattr(md, msg["type"])) + + mocked_pubmaster.reset_mock() + + def test_livestream_track(self): + fake_msg = messaging.new_message("livestreamDriverEncodeData") + + config = {"receive.return_value": fake_msg.to_bytes()} + with patch("cereal.messaging.SubSocket", spec=True, **config): + track = LiveStreamVideoStreamTrack("driver") + + self.assertTrue(track.id.startswith("driver")) + self.assertEqual(track.codec_preference(), "H264") + + for i in range(5): + packet = self.loop.run_until_complete(track.recv()) + self.assertEqual(packet.time_base, VIDEO_TIME_BASE) + self.assertEqual(packet.pts, int(i * DT_DMON * VIDEO_CLOCK_RATE)) + self.assertEqual(packet.size, 0) + + def test_input_audio_track(self): + packet_time, rate = 0.02, 16000 + sample_count = int(packet_time * rate) + mocked_stream = MagicMock(spec=pyaudio.Stream) + mocked_stream.read.return_value = b"\x00" * 2 * sample_count + + config = {"open.side_effect": lambda *args, **kwargs: mocked_stream} + with patch("pyaudio.PyAudio", spec=True, **config): + track = AudioInputStreamTrack(audio_format=pyaudio.paInt16, packet_time=packet_time, rate=rate) + + for i in range(5): + frame = self.loop.run_until_complete(track.recv()) + self.assertEqual(frame.rate, rate) + self.assertEqual(frame.samples, sample_count) + self.assertEqual(frame.pts, i * sample_count) + + +if __name__ == "__main__": + unittest.main() diff --git a/system/webrtc/tests/test_webrtcd.py b/system/webrtc/tests/test_webrtcd.py new file mode 100755 index 000000000..b48bf6bc1 --- /dev/null +++ b/system/webrtc/tests/test_webrtcd.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +import asyncio +import json +import unittest +from unittest.mock import MagicMock, AsyncMock +# for aiortc and its dependencies +import warnings +warnings.filterwarnings("ignore", category=DeprecationWarning) + +from openpilot.system.webrtc.webrtcd import get_stream + +import aiortc +from teleoprtc import WebRTCOfferBuilder + + +class TestWebrtcdProc(unittest.IsolatedAsyncioTestCase): + async def assertCompletesWithTimeout(self, awaitable, timeout=1): + try: + async with asyncio.timeout(timeout): + await awaitable + except asyncio.TimeoutError: + self.fail("Timeout while waiting for awaitable to complete") + + async def test_webrtcd(self): + mock_request = MagicMock() + async def connect(offer): + body = {'sdp': offer.sdp, 'cameras': offer.video, 'bridge_services_in': [], 'bridge_services_out': []} + mock_request.json.side_effect = AsyncMock(return_value=body) + response = await get_stream(mock_request) + response_json = json.loads(response.text) + return aiortc.RTCSessionDescription(**response_json) + + builder = WebRTCOfferBuilder(connect) + builder.offer_to_receive_video_stream("road") + builder.offer_to_receive_audio_stream() + builder.add_messaging() + + stream = builder.stream() + + await self.assertCompletesWithTimeout(stream.start()) + await self.assertCompletesWithTimeout(stream.wait_for_connection()) + + self.assertTrue(stream.has_incoming_video_track("road")) + self.assertTrue(stream.has_incoming_audio_track()) + self.assertTrue(stream.has_messaging_channel()) + + video_track, audio_track = stream.get_incoming_video_track("road"), stream.get_incoming_audio_track() + await self.assertCompletesWithTimeout(video_track.recv()) + await self.assertCompletesWithTimeout(audio_track.recv()) + + await self.assertCompletesWithTimeout(stream.stop()) + + # cleanup, very implementation specific, test may break if it changes + self.assertTrue(mock_request.app["streams"].__setitem__.called, "Implementation changed, please update this test") + _, session = mock_request.app["streams"].__setitem__.call_args.args + await self.assertCompletesWithTimeout(session.post_run_cleanup()) + + +if __name__ == "__main__": + unittest.main() diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py new file mode 100755 index 000000000..237cae78a --- /dev/null +++ b/system/webrtc/webrtcd.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 + +import argparse +import asyncio +import json +import uuid +import logging +from dataclasses import dataclass, field +from typing import Any, List, Optional, Union + +# aiortc and its dependencies have lots of internal warnings :( +import warnings +warnings.filterwarnings("ignore", category=DeprecationWarning) + +import aiortc +from aiortc.mediastreams import VideoStreamTrack, AudioStreamTrack +from aiortc.contrib.media import MediaBlackhole +from aiohttp import web +import capnp +from teleoprtc import WebRTCAnswerBuilder +from teleoprtc.info import parse_info_from_offer + +from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack +from openpilot.system.webrtc.device.audio import AudioInputStreamTrack, AudioOutputSpeaker + +from cereal import messaging + + +class CerealOutgoingMessageProxy: + def __init__(self, sm: messaging.SubMaster): + self.sm = sm + self.channels: List[aiortc.RTCDataChannel] = [] + + def add_channel(self, channel: aiortc.RTCDataChannel): + self.channels.append(channel) + + def to_json(self, msg_content: Any): + if isinstance(msg_content, capnp._DynamicStructReader): + msg_dict = msg_content.to_dict() + elif isinstance(msg_content, capnp._DynamicListReader): + msg_dict = [self.to_json(msg) for msg in msg_content] + elif isinstance(msg_content, bytes): + msg_dict = msg_content.decode() + else: + msg_dict = msg_content + + return msg_dict + + def update(self): + # this is blocking in async context... + self.sm.update(0) + for service, updated in self.sm.updated.items(): + if not updated: + continue + msg_dict = self.to_json(self.sm[service]) + mono_time, valid = self.sm.logMonoTime[service], self.sm.valid[service] + outgoing_msg = {"type": service, "logMonoTime": mono_time, "valid": valid, "data": msg_dict} + encoded_msg = json.dumps(outgoing_msg).encode() + for channel in self.channels: + channel.send(encoded_msg) + + +class CerealIncomingMessageProxy: + def __init__(self, pm: messaging.PubMaster): + self.pm = pm + + def send(self, message: bytes): + msg_json = json.loads(message) + msg_type, msg_data = msg_json["type"], msg_json["data"] + size = None + if not isinstance(msg_data, dict): + size = len(msg_data) + + msg = messaging.new_message(msg_type, size=size) + setattr(msg, msg_type, msg_data) + self.pm.send(msg_type, msg) + + +class CerealProxyRunner: + def __init__(self, proxy: CerealOutgoingMessageProxy): + self.proxy = proxy + self.is_running = False + self.task = None + self.logger = logging.getLogger("webrtcd") + + def start(self): + assert self.task is None + self.task = asyncio.create_task(self.run()) + + def stop(self): + if self.task is None or self.task.done(): + return + self.task.cancel() + self.task = None + + async def run(self): + while True: + try: + self.proxy.update() + except Exception as ex: + self.logger.error("Cereal outgoing proxy failure: %s", ex) + await asyncio.sleep(0.01) + + +class StreamSession: + def __init__(self, sdp: str, cameras: List[str], incoming_services: List[str], outgoing_services: List[str], debug_mode: bool = False): + config = parse_info_from_offer(sdp) + builder = WebRTCAnswerBuilder(sdp) + + assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" + for cam in cameras: + track = LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack() + builder.add_video_stream(cam, track) + if config.expected_audio_track: + track = AudioInputStreamTrack() if not debug_mode else AudioStreamTrack() + builder.add_audio_stream(track) + if config.incoming_audio_track: + self.audio_output_cls = AudioOutputSpeaker if not debug_mode else MediaBlackhole + builder.offer_to_receive_audio_stream() + + self.stream = builder.stream() + self.identifier = str(uuid.uuid4()) + + self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services)) + self.incoming_bridge = CerealIncomingMessageProxy(messaging.PubMaster(incoming_services)) + self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) + + self.audio_output: Optional[Union[AudioOutputSpeaker, MediaBlackhole]] = None + self.run_task: Optional[asyncio.Task] = None + self.logger = logging.getLogger("webrtcd") + self.logger.info("New stream session (%s), cameras %s, audio in %s out %s, incoming services %s, outgoing services %s", + self.identifier, cameras, config.incoming_audio_track, config.expected_audio_track, incoming_services, outgoing_services) + + def start(self): + self.run_task = asyncio.create_task(self.run()) + + def stop(self): + if self.run_task.done(): + return + self.run_task.cancel() + self.run_task = None + asyncio.run(self.post_run_cleanup()) + + async def get_answer(self): + return await self.stream.start() + + async def message_handler(self, message: bytes): + try: + self.incoming_bridge.send(message) + except Exception as ex: + self.logger.error("Cereal incoming proxy failure: %s", ex) + + async def run(self): + try: + await self.stream.wait_for_connection() + if self.stream.has_messaging_channel(): + self.stream.set_message_handler(self.message_handler) + channel = self.stream.get_messaging_channel() + self.outgoing_bridge_runner.proxy.add_channel(channel) + self.outgoing_bridge_runner.start() + if self.stream.has_incoming_audio_track(): + track = self.stream.get_incoming_audio_track(buffered=False) + self.audio_output = self.audio_output_cls() + self.audio_output.addTrack(track) + self.audio_output.start() + self.logger.info("Stream session (%s) connected", self.identifier) + + await self.stream.wait_for_disconnection() + await self.post_run_cleanup() + + self.logger.info("Stream session (%s) ended", self.identifier) + except Exception as ex: + self.logger.error("Stream session failure: %s", ex) + + async def post_run_cleanup(self): + await self.stream.stop() + self.outgoing_bridge_runner.stop() + if self.audio_output: + self.audio_output.stop() + + +@dataclass +class StreamRequestBody: + sdp: str + cameras: List[str] + bridge_services_in: List[str] = field(default_factory=list) + bridge_services_out: List[str] = field(default_factory=list) + + +async def get_stream(request: web.Request): + stream_dict, debug_mode = request.app['streams'], request.app['debug'] + raw_body = await request.json() + body = StreamRequestBody(**raw_body) + + session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) + answer = await session.get_answer() + session.start() + + stream_dict[session.identifier] = session + + return web.json_response({"sdp": answer.sdp, "type": answer.type}) + + +async def on_shutdown(app: web.Application): + for session in app['streams'].values(): + session.stop() + del app['streams'] + + +def webrtcd_thread(host: str, port: int, debug: bool): + logging.basicConfig(level=logging.CRITICAL, handlers=[logging.StreamHandler()]) + logging_level = logging.DEBUG if debug else logging.INFO + logging.getLogger("WebRTCStream").setLevel(logging_level) + logging.getLogger("webrtcd").setLevel(logging_level) + + app = web.Application() + + app['streams'] = dict() + app['debug'] = debug + app.on_shutdown.append(on_shutdown) + app.router.add_post("/stream", get_stream) + + web.run_app(app, host=host, port=port) + + +def main(): + parser = argparse.ArgumentParser(description="WebRTC daemon") + parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to listen on") + parser.add_argument("--port", type=int, default=5001, help="Port to listen on") + parser.add_argument("--debug", action="store_true", help="Enable debug mode") + args = parser.parse_args() + + webrtcd_thread(args.host, args.port, args.debug) + + +if __name__=="__main__": + main() diff --git a/teleoprtc b/teleoprtc new file mode 120000 index 000000000..3d3dbc8de --- /dev/null +++ b/teleoprtc @@ -0,0 +1 @@ +teleoprtc_repo/teleoprtc \ No newline at end of file diff --git a/teleoprtc_repo b/teleoprtc_repo new file mode 160000 index 000000000..8ec477868 --- /dev/null +++ b/teleoprtc_repo @@ -0,0 +1 @@ +Subproject commit 8ec477868591eed9a6136a44f16428bc0468b4e9 diff --git a/tools/bodyteleop/bodyav.py b/tools/bodyteleop/bodyav.py deleted file mode 100644 index 3f11f8d4f..000000000 --- a/tools/bodyteleop/bodyav.py +++ /dev/null @@ -1,159 +0,0 @@ -import asyncio -import io -import numpy as np -import pyaudio -import wave - -from aiortc.contrib.media import MediaBlackhole -from aiortc.mediastreams import AudioStreamTrack, MediaStreamError, MediaStreamTrack -from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE -from aiortc.rtcrtpsender import RTCRtpSender -from av import CodecContext, Packet -from pydub import AudioSegment -import cereal.messaging as messaging - -AUDIO_RATE = 16000 -SOUNDS = { - 'engage': '../../selfdrive/assets/sounds/engage.wav', - 'disengage': '../../selfdrive/assets/sounds/disengage.wav', - 'error': '../../selfdrive/assets/sounds/warning_immediate.wav', -} - - -def force_codec(pc, sender, forced_codec='video/VP9', stream_type="video"): - codecs = RTCRtpSender.getCapabilities(stream_type).codecs - codec = [codec for codec in codecs if codec.mimeType == forced_codec] - transceiver = next(t for t in pc.getTransceivers() if t.sender == sender) - transceiver.setCodecPreferences(codec) - - -class EncodedBodyVideo(MediaStreamTrack): - kind = "video" - - _start: float - _timestamp: int - - def __init__(self): - super().__init__() - sock_name = 'livestreamDriverEncodeData' - messaging.context = messaging.Context() - self.sock = messaging.sub_sock(sock_name, None, conflate=True) - self.pts = 0 - - async def recv(self) -> Packet: - while True: - msg = messaging.recv_one_or_none(self.sock) - if msg is not None: - break - await asyncio.sleep(0.005) - - evta = getattr(msg, msg.which()) - self.last_idx = evta.idx.encodeId - - packet = Packet(evta.header + evta.data) - packet.time_base = VIDEO_TIME_BASE - packet.pts = self.pts - self.pts += 0.05 * VIDEO_CLOCK_RATE - return packet - - -class WebClientSpeaker(MediaBlackhole): - def __init__(self): - super().__init__() - self.p = pyaudio.PyAudio() - self.buffer = io.BytesIO() - self.channels = 2 - self.stream = self.p.open(format=pyaudio.paInt16, channels=self.channels, rate=48000, frames_per_buffer=9600, - output=True, stream_callback=self.pyaudio_callback) - - def pyaudio_callback(self, in_data, frame_count, time_info, status): - if self.buffer.getbuffer().nbytes < frame_count * self.channels * 2: - buff = np.zeros((frame_count, 2), dtype=np.int16).tobytes() - elif self.buffer.getbuffer().nbytes > 115200: # 3x the usual read size - self.buffer.seek(0) - buff = self.buffer.read(frame_count * self.channels * 4) - buff = buff[:frame_count * self.channels * 2] - self.buffer.seek(2) - else: - self.buffer.seek(0) - buff = self.buffer.read(frame_count * self.channels * 2) - self.buffer.seek(2) - return (buff, pyaudio.paContinue) - - async def consume(self, track): - while True: - try: - frame = await track.recv() - except MediaStreamError: - return - bio = bytes(frame.planes[0]) - self.buffer.write(bio) - - async def start(self): - for track, task in self._MediaBlackhole__tracks.items(): - if task is None: - self._MediaBlackhole__tracks[track] = asyncio.ensure_future(self.consume(track)) - - async def stop(self): - for task in self._MediaBlackhole__tracks.values(): - if task is not None: - task.cancel() - self._MediaBlackhole__tracks = {} - self.stream.stop_stream() - self.stream.close() - self.p.terminate() - - -class BodyMic(AudioStreamTrack): - def __init__(self): - super().__init__() - - self.sample_rate = AUDIO_RATE - self.AUDIO_PTIME = 0.020 # 20ms audio packetization - self.samples = int(self.AUDIO_PTIME * self.sample_rate) - self.FORMAT = pyaudio.paInt16 - self.CHANNELS = 2 - self.RATE = self.sample_rate - self.CHUNK = int(AUDIO_RATE * 0.020) - self.p = pyaudio.PyAudio() - self.mic_stream = self.p.open(format=self.FORMAT, channels=1, rate=self.RATE, input=True, frames_per_buffer=self.CHUNK) - - self.codec = CodecContext.create('pcm_s16le', 'r') - self.codec.sample_rate = self.RATE - self.codec.channels = 2 - self.audio_samples = 0 - self.chunk_number = 0 - - async def recv(self): - mic_data = self.mic_stream.read(self.CHUNK) - mic_sound = AudioSegment(mic_data, sample_width=2, channels=1, frame_rate=self.RATE) - mic_sound = AudioSegment.from_mono_audiosegments(mic_sound, mic_sound) - mic_sound += 3 # increase volume by 3db - packet = Packet(mic_sound.raw_data) - frame = self.codec.decode(packet)[0] - frame.pts = self.audio_samples - self.audio_samples += frame.samples - self.chunk_number = self.chunk_number + 1 - return frame - - -async def play_sound(sound): - chunk = 5120 - with wave.open(SOUNDS[sound], 'rb') as wf: - def callback(in_data, frame_count, time_info, status): - data = wf.readframes(frame_count) - return data, pyaudio.paContinue - - p = pyaudio.PyAudio() - stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), - channels=wf.getnchannels(), - rate=wf.getframerate(), - output=True, - frames_per_buffer=chunk, - stream_callback=callback) - stream.start_stream() - while stream.is_active(): - await asyncio.sleep(0) - stream.stop_stream() - stream.close() - p.terminate() diff --git a/tools/bodyteleop/static/js/jsmain.js b/tools/bodyteleop/static/js/jsmain.js index f52190572..83205a876 100644 --- a/tools/bodyteleop/static/js/jsmain.js +++ b/tools/bodyteleop/static/js/jsmain.js @@ -1,5 +1,5 @@ import { handleKeyX, executePlan } from "./controls.js"; -import { start, stop, last_ping } from "./webrtc.js"; +import { start, stop, lastChannelMessageTime, playSoundRequest } from "./webrtc.js"; export var pc = null; export var dc = null; @@ -9,10 +9,14 @@ document.addEventListener('keyup', (e)=>(handleKeyX(e.key.toLowerCase(), 0))); $(".keys").bind("mousedown touchstart", (e)=>handleKeyX($(e.target).attr('id').replace('key-', ''), 1)); $(".keys").bind("mouseup touchend", (e)=>handleKeyX($(e.target).attr('id').replace('key-', ''), 0)); $("#plan-button").click(executePlan); +$(".sound").click((e)=>{ + const sound = $(e.target).attr('id').replace('sound-', '') + return playSoundRequest(sound); +}); setInterval( () => { const dt = new Date().getTime(); - if ((dt - last_ping) > 1000) { + if ((dt - lastChannelMessageTime) > 1000) { $(".pre-blob").removeClass('blob'); $("#battery").text("-"); $("#ping-time").text('-'); @@ -20,4 +24,4 @@ setInterval( () => { } }, 5000); -start(pc, dc); \ No newline at end of file +start(pc, dc); diff --git a/tools/bodyteleop/static/js/webrtc.js b/tools/bodyteleop/static/js/webrtc.js index e2f6583c1..165a2ce6c 100644 --- a/tools/bodyteleop/static/js/webrtc.js +++ b/tools/bodyteleop/static/js/webrtc.js @@ -1,9 +1,34 @@ import { getXY } from "./controls.js"; import { pingPoints, batteryPoints, chartPing, chartBattery } from "./plots.js"; -export let dcInterval = null; -export let batteryInterval = null; -export let last_ping = null; +export let controlCommandInterval = null; +export let latencyInterval = null; +export let lastChannelMessageTime = null; + + +export function offerRtcRequest(sdp, type) { + return fetch('/offer', { + body: JSON.stringify({sdp: sdp, type: type}), + headers: {'Content-Type': 'application/json'}, + method: 'POST' + }); +} + + +export function playSoundRequest(sound) { + return fetch('/sound', { + body: JSON.stringify({sound}), + headers: {'Content-Type': 'application/json'}, + method: 'POST' + }); +} + + +export function pingHeadRequest() { + return fetch('/', { + method: 'HEAD' + }); +} export function createPeerConnection(pc) { @@ -45,16 +70,7 @@ export function negotiate(pc) { }); }).then(function() { var offer = pc.localDescription; - return fetch('/offer', { - body: JSON.stringify({ - sdp: offer.sdp, - type: offer.type, - }), - headers: { - 'Content-Type': 'application/json' - }, - method: 'POST' - }); + return offerRtcRequest(offer.sdp, offer.type); }).then(function(response) { console.log(response); return response.json(); @@ -86,25 +102,6 @@ export const constraints = { }; -export function createDummyVideoTrack() { - const canvas = document.createElement('canvas'); - const context = canvas.getContext('2d'); - - const frameWidth = 5; // Set the width of the frame - const frameHeight = 5; // Set the height of the frame - canvas.width = frameWidth; - canvas.height = frameHeight; - - context.fillStyle = 'black'; - context.fillRect(0, 0, frameWidth, frameHeight); - - const stream = canvas.captureStream(); - const videoTrack = stream.getVideoTracks()[0]; - - return videoTrack; -} - - export function start(pc, dc) { pc = createPeerConnection(pc); @@ -138,71 +135,56 @@ export function start(pc, dc) { alert('Could not acquire media: ' + err); }); - // add a fake video? - // const dummyVideoTrack = createDummyVideoTrack(); - // const dummyMediaStream = new MediaStream(); - // dummyMediaStream.addTrack(dummyVideoTrack); - // pc.addTrack(dummyVideoTrack, dummyMediaStream); - - // setInterval(() => {pc.getStats(null).then((stats) => {stats.forEach((report) => console.log(report))})}, 10000) - // var video = document.querySelector('video'); - // var print = function (e, f){console.log(e, f); video.requestVideoFrameCallback(print);}; - // video.requestVideoFrameCallback(print); - var parameters = {"ordered": true}; dc = pc.createDataChannel('data', parameters); dc.onclose = function() { - console.log("data channel closed"); - clearInterval(dcInterval); - clearInterval(batteryInterval); + clearInterval(controlCommandInterval); + clearInterval(latencyInterval); }; - function controlCommand() { + + function sendJoystickOverDataChannel() { const {x, y} = getXY(); - const dt = new Date().getTime(); - var message = JSON.stringify({type: 'control_command', x, y, dt}); + var message = JSON.stringify({type: "testJoystick", data: {axes: [x, y], buttons: [false]}}) dc.send(message); } - - function batteryLevel() { - var message = JSON.stringify({type: 'battery_level'}); - dc.send(message); + function checkLatency() { + const initialTime = new Date().getTime(); + pingHeadRequest().then(function() { + const currentTime = new Date().getTime(); + if (Math.abs(currentTime - lastChannelMessageTime) < 1000) { + const pingtime = currentTime - initialTime; + pingPoints.push({'x': currentTime, 'y': pingtime}); + if (pingPoints.length > 1000) { + pingPoints.shift(); + } + chartPing.update(); + $("#ping-time").text((pingtime) + "ms"); + } + }) } - dc.onopen = function() { - dcInterval = setInterval(controlCommand, 50); - batteryInterval = setInterval(batteryLevel, 10000); - controlCommand(); - batteryLevel(); - $(".sound").click((e)=>{ - const sound = $(e.target).attr('id').replace('sound-', '') - dc.send(JSON.stringify({type: 'play_sound', sound})); - }); + controlCommandInterval = setInterval(sendJoystickOverDataChannel, 50); + latencyInterval = setInterval(checkLatency, 1000); + sendJoystickOverDataChannel(); }; - let val_print_idx = 0; + const textDecoder = new TextDecoder(); + var carStaterIndex = 0; dc.onmessage = function(evt) { - const data = JSON.parse(evt.data); - if(val_print_idx == 0 && data.type === 'ping_time') { - const dt = new Date().getTime(); - const pingtime = dt - data.incoming_time; - pingPoints.push({'x': dt, 'y': pingtime}); - if (pingPoints.length > 1000) { - pingPoints.shift(); - } - chartPing.update(); - $("#ping-time").text((pingtime) + "ms"); - last_ping = dt; - $(".pre-blob").addClass('blob'); - } - val_print_idx = (val_print_idx + 1 ) % 20; - if(data.type === 'battery_level') { - $("#battery").text(data.value + "%"); - batteryPoints.push({'x': new Date().getTime(), 'y': data.value}); - if (batteryPoints.length > 1000) { + const text = textDecoder.decode(evt.data); + const msg = JSON.parse(text); + if (carStaterIndex % 100 == 0 && msg.type === 'carState') { + const batteryLevel = Math.round(msg.data.fuelGauge * 100); + $("#battery").text(batteryLevel + "%"); + batteryPoints.push({'x': new Date().getTime(), 'y': batteryLevel}); + if (batteryPoints.length > 1000) { batteryPoints.shift(); } chartBattery.update(); } + carStaterIndex += 1; + lastChannelMessageTime = new Date().getTime(); + $(".pre-blob").addClass('blob'); }; } diff --git a/tools/bodyteleop/web.py b/tools/bodyteleop/web.py index 929cfa26f..2ec6eb037 100644 --- a/tools/bodyteleop/web.py +++ b/tools/bodyteleop/web.py @@ -1,208 +1,121 @@ import asyncio +import dataclasses import json import logging import os import ssl -import uuid -import time +import subprocess -# aiortc and its dependencies have lots of internal warnings :( -import warnings -warnings.resetwarnings() -warnings.simplefilter("always") +from aiohttp import web, ClientSession +import pyaudio +import wave -from aiohttp import web -from aiortc import RTCPeerConnection, RTCSessionDescription - -import cereal.messaging as messaging from openpilot.common.basedir import BASEDIR -from openpilot.tools.bodyteleop.bodyav import BodyMic, WebClientSpeaker, force_codec, play_sound, MediaBlackhole, EncodedBodyVideo +from openpilot.system.webrtc.webrtcd import StreamRequestBody -from typing import Optional - -logger = logging.getLogger("pc") +logger = logging.getLogger("bodyteleop") logging.basicConfig(level=logging.INFO) -pcs: set[RTCPeerConnection] = set() -pm: Optional[messaging.PubMaster] = None -sm: Optional[messaging.SubMaster] = None TELEOPDIR = f"{BASEDIR}/tools/bodyteleop" +WEBRTCD_HOST, WEBRTCD_PORT = "localhost", 5001 +## UTILS +async def play_sound(sound): + SOUNDS = { + 'engage': 'selfdrive/assets/sounds/engage.wav', + 'disengage': 'selfdrive/assets/sounds/disengage.wav', + 'error': 'selfdrive/assets/sounds/warning_immediate.wav', + } + assert sound in SOUNDS + + chunk = 5120 + with wave.open(os.path.join(BASEDIR, SOUNDS[sound]), 'rb') as wf: + def callback(in_data, frame_count, time_info, status): + data = wf.readframes(frame_count) + return data, pyaudio.paContinue + + p = pyaudio.PyAudio() + stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), + channels=wf.getnchannels(), + rate=wf.getframerate(), + output=True, + frames_per_buffer=chunk, + stream_callback=callback) + stream.start_stream() + while stream.is_active(): + await asyncio.sleep(0) + stream.stop_stream() + stream.close() + p.terminate() + +## SSL +def create_ssl_cert(cert_path, key_path): + try: + proc = subprocess.run(f'openssl req -x509 -newkey rsa:4096 -nodes -out {cert_path} -keyout {key_path} \ + -days 365 -subj "/C=US/ST=California/O=commaai/OU=comma body"', + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + proc.check_returncode() + except subprocess.CalledProcessError as ex: + raise ValueError(f"Error creating SSL certificate:\n[stdout]\n{proc.stdout.decode()}\n[stderr]\n{proc.stderr.decode()}") from ex + + +def create_ssl_context(): + cert_path = os.path.join(TELEOPDIR, 'cert.pem') + key_path = os.path.join(TELEOPDIR, 'key.pem') + if not os.path.exists(cert_path) or not os.path.exists(key_path): + logger.info("Creating certificate...") + create_ssl_cert(cert_path, key_path) + else: + logger.info("Certificate exists!") + ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(cert_path, key_path) + + return ssl_context + +## ENDPOINTS async def index(request): - content = open(TELEOPDIR + "/static/index.html", "r").read() - now = time.monotonic() - request.app['mutable_vals']['last_send_time'] = now - request.app['mutable_vals']['last_override_time'] = now - request.app['mutable_vals']['prev_command'] = [] - request.app['mutable_vals']['find_person'] = False - - return web.Response(content_type="text/html", text=content) + with open(os.path.join(TELEOPDIR, "static", "index.html"), "r") as f: + content = f.read() + return web.Response(content_type="text/html", text=content) -async def control_body(data, app): - now = time.monotonic() - if (data['type'] == 'dummy_controls') and (now < (app['mutable_vals']['last_send_time'] + 0.2)): - return - if (data['type'] == 'control_command') and (app['mutable_vals']['prev_command'] == [data['x'], data['y']] and data['x'] == 0 and data['y'] == 0): - return - - logger.info(str(data)) - x = max(-1.0, min(1.0, data['x'])) - y = max(-1.0, min(1.0, data['y'])) - dat = messaging.new_message('testJoystick') - dat.testJoystick.axes = [x, y] - dat.testJoystick.buttons = [False] - pm.send('testJoystick', dat) - app['mutable_vals']['last_send_time'] = now - if (data['type'] == 'control_command'): - app['mutable_vals']['last_override_time'] = now - app['mutable_vals']['prev_command'] = [data['x'], data['y']] +async def ping(request): + return web.Response(text="pong") -async def dummy_controls_msg(app): - while True: - if 'last_send_time' in app['mutable_vals']: - this_time = time.monotonic() - if (app['mutable_vals']['last_send_time'] + 0.2) < this_time: - await control_body({'type': 'dummy_controls', 'x': 0, 'y': 0}, app) - await asyncio.sleep(0.2) +async def sound(request): + params = await request.json() + sound_to_play = params["sound"] - -async def start_background_tasks(app): - app['bgtask_dummy_controls_msg'] = asyncio.create_task(dummy_controls_msg(app)) - - -async def stop_background_tasks(app): - app['bgtask_dummy_controls_msg'].cancel() - await app['bgtask_dummy_controls_msg'] + await play_sound(sound_to_play) + return web.json_response({"status": "ok"}) async def offer(request): - logger.info("\n\n\nnewoffer!\n\n") - params = await request.json() - offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) - speaker = WebClientSpeaker() - blackhole = MediaBlackhole() + body = StreamRequestBody(params["sdp"], ["driver"], ["testJoystick"], ["carState"]) + body_json = json.dumps(dataclasses.asdict(body)) - pc = RTCPeerConnection() - pc_id = "PeerConnection(%s)" % uuid.uuid4() - pcs.add(pc) - - def log_info(msg, *args): - logger.info(pc_id + " " + msg, *args) - - log_info("Created for %s", request.remote) - - @pc.on("datachannel") - def on_datachannel(channel): - request.app['mutable_vals']['remote_channel'] = channel - - @channel.on("message") - async def on_message(message): - data = json.loads(message) - if data['type'] == 'control_command': - await control_body(data, request.app) - times = { - 'type': 'ping_time', - 'incoming_time': data['dt'], - 'outgoing_time': int(time.time() * 1000), - } - channel.send(json.dumps(times)) - if data['type'] == 'battery_level': - sm.update(timeout=0) - if sm.updated['carState']: - channel.send(json.dumps({'type': 'battery_level', 'value': int(sm['carState'].fuelGauge * 100)})) - if data['type'] == 'play_sound': - logger.info(f"Playing sound: {data['sound']}") - await play_sound(data['sound']) - if data['type'] == 'find_person': - request.app['mutable_vals']['find_person'] = data['value'] - - @pc.on("connectionstatechange") - async def on_connectionstatechange(): - log_info("Connection state is %s", pc.connectionState) - if pc.connectionState == "failed": - await pc.close() - pcs.discard(pc) - - @pc.on('track') - def on_track(track): - logger.info(f"Track received: {track.kind}") - if track.kind == "audio": - speaker.addTrack(track) - elif track.kind == "video": - blackhole.addTrack(track) - - @track.on("ended") - async def on_ended(): - log_info("Remote %s track ended", track.kind) - if track.kind == "audio": - await speaker.stop() - elif track.kind == "video": - await blackhole.stop() - - video_sender = pc.addTrack(EncodedBodyVideo()) - force_codec(pc, video_sender, forced_codec='video/H264') - _ = pc.addTrack(BodyMic()) - - await pc.setRemoteDescription(offer) - await speaker.start() - await blackhole.start() - answer = await pc.createAnswer() - await pc.setLocalDescription(answer) - - return web.Response( - content_type="application/json", - text=json.dumps( - {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type} - ), - ) - - -async def on_shutdown(app): - coros = [pc.close() for pc in pcs] - await asyncio.gather(*coros) - pcs.clear() - - -async def run(cmd): - proc = await asyncio.create_subprocess_shell( - cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await proc.communicate() - logger.info("Created key and cert!") - if stdout: - logger.info(f'[stdout]\n{stdout.decode()}') - if stderr: - logger.info(f'[stderr]\n{stderr.decode()}') + logger.info("Sending offer to webrtcd...") + webrtcd_url = f"http://{WEBRTCD_HOST}:{WEBRTCD_PORT}/stream" + async with ClientSession() as session, session.post(webrtcd_url, data=body_json) as resp: + assert resp.status == 200 + answer = await resp.json() + return web.json_response(answer) def main(): - global pm, sm - pm = messaging.PubMaster(['testJoystick']) - sm = messaging.SubMaster(['carState', 'logMessage']) # App needs to be HTTPS for microphone and audio autoplay to work on the browser - cert_path = TELEOPDIR + '/cert.pem' - key_path = TELEOPDIR + '/key.pem' - if (not os.path.exists(cert_path)) or (not os.path.exists(key_path)): - asyncio.run(run(f'openssl req -x509 -newkey rsa:4096 -nodes -out {cert_path} -keyout {key_path} \ - -days 365 -subj "/C=US/ST=California/O=commaai/OU=comma body"')) - else: - logger.info("Certificate exists!") - ssl_context = ssl.SSLContext() - ssl_context.load_cert_chain(cert_path, key_path) + ssl_context = create_ssl_context() + app = web.Application() app['mutable_vals'] = {} - app.on_shutdown.append(on_shutdown) - app.router.add_post("/offer", offer) app.router.add_get("/", index) - app.router.add_static('/static', TELEOPDIR + '/static') - app.on_startup.append(start_background_tasks) - app.on_cleanup.append(stop_background_tasks) + app.router.add_get("/ping", ping, allow_head=True) + app.router.add_post("/offer", offer) + app.router.add_post("/sound", sound) + app.router.add_static('/static', os.path.join(TELEOPDIR, 'static')) web.run_app(app, access_log=None, host="0.0.0.0", port=5000, ssl_context=ssl_context)