mirror of https://github.com/commaai/teleoprtc.git
Switch to pytest (#8)
This commit is contained in:
parent
fdcff87aaf
commit
389815b8ca
|
@ -21,8 +21,7 @@ jobs:
|
|||
- name: Install package
|
||||
run: pip install -e .[dev]
|
||||
- name: Unit Tests
|
||||
run: |
|
||||
cd tests/; python -m unittest discover
|
||||
run: pytest
|
||||
static_analysis:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
|
|
|
@ -25,7 +25,10 @@ dependencies = [
|
|||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"parameterized>=0.8",
|
||||
"pre-commit"
|
||||
"pre-commit",
|
||||
"pytest",
|
||||
"pytest-asyncio",
|
||||
"pytest-xdist"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
@ -41,3 +44,9 @@ target-version="py38"
|
|||
select = ["E", "F", "W", "PIE", "C4", "ISC", "RUF008", "RUF100", "A", "B", "TID251"]
|
||||
ignore = ["W292", "E741", "E402", "C408", "ISC003", "B027", "B024"]
|
||||
flake8-implicit-str-concat.allow-multiline=false
|
||||
|
||||
[tool.ruff.lint.flake8-tidy-imports.banned-api]
|
||||
"unittest".msg = "Use pytest"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "--durations=10 -n auto"
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import unittest
|
||||
|
||||
from teleoprtc.info import parse_info_from_offer
|
||||
|
||||
|
||||
|
@ -9,7 +7,7 @@ def lf2crlf(x):
|
|||
return x.replace("\n", "\r\n")
|
||||
|
||||
|
||||
class TestStream(unittest.TestCase):
|
||||
class TestStream:
|
||||
def test_double_video_tracks(self):
|
||||
sdp = """v=0
|
||||
o=- 3910210993 3910210993 IN IP4 0.0.0.0
|
||||
|
@ -90,10 +88,10 @@ a=ice-pwd:1234
|
|||
a=fingerprint:sha-256 70:3A:2D:37:3C:52:96:0E:10:F6:4D:7A:EB:18:38:1B:FD:CA:A5:90:D7:6C:DA:A9:39:76:C9:2F:FB:FF:56:0C
|
||||
a=setup:actpass"""
|
||||
info = parse_info_from_offer(lf2crlf(sdp))
|
||||
self.assertEqual(info.n_expected_camera_tracks, 2)
|
||||
self.assertFalse(info.expected_audio_track)
|
||||
self.assertFalse(info.incoming_audio_track)
|
||||
self.assertFalse(info.incoming_datachannel)
|
||||
assert info.n_expected_camera_tracks == 2
|
||||
assert not info.expected_audio_track
|
||||
assert not info.incoming_audio_track
|
||||
assert not info.incoming_datachannel
|
||||
|
||||
def test_recvonly_audio(self):
|
||||
sdp = """v=0
|
||||
|
@ -120,10 +118,10 @@ a=ice-pwd:1234
|
|||
a=fingerprint:sha-256 40:4B:14:CF:70:B8:67:E1:B1:FF:7E:F9:22:6E:60:7D:73:B5:1E:38:4B:10:20:9C:CD:1C:47:02:52:ED:45:25
|
||||
a=setup:actpass"""
|
||||
info = parse_info_from_offer(lf2crlf(sdp))
|
||||
self.assertEqual(info.n_expected_camera_tracks, 0)
|
||||
self.assertTrue(info.expected_audio_track)
|
||||
self.assertFalse(info.incoming_audio_track)
|
||||
self.assertFalse(info.incoming_datachannel)
|
||||
assert info.n_expected_camera_tracks == 0
|
||||
assert info.expected_audio_track
|
||||
assert not info.incoming_audio_track
|
||||
assert not info.incoming_datachannel
|
||||
|
||||
def test_incoming_datachanel(self):
|
||||
sdp = """v=0
|
||||
|
@ -142,11 +140,7 @@ a=ice-pwd:1234
|
|||
a=fingerprint:sha-256 9B:C0:F3:35:8E:05:A1:15:DB:F8:39:0E:B0:E0:0C:EB:82:E4:B9:26:18:A6:43:2D:B9:9A:23:96:0A:59:B6:58
|
||||
a=setup:actpass"""
|
||||
info = parse_info_from_offer(lf2crlf(sdp))
|
||||
self.assertEqual(info.n_expected_camera_tracks, 0)
|
||||
self.assertFalse(info.expected_audio_track)
|
||||
self.assertFalse(info.incoming_audio_track)
|
||||
self.assertTrue(info.incoming_datachannel)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
assert info.n_expected_camera_tracks == 0
|
||||
assert not info.expected_audio_track
|
||||
assert not info.incoming_audio_track
|
||||
assert info.incoming_datachannel
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
from aiortc.mediastreams import AudioStreamTrack, VideoStreamTrack
|
||||
from parameterized import parameterized
|
||||
|
@ -65,7 +65,8 @@ class SimpleAnswerProvider:
|
|||
return answer
|
||||
|
||||
|
||||
class TestStreamIntegration(unittest.IsolatedAsyncioTestCase):
|
||||
@pytest.mark.asyncio
|
||||
class TestStreamIntegration:
|
||||
@parameterized.expand([
|
||||
# name, recv_cameras, recv_audio, messaging
|
||||
("multi_camera", ["driver", "wideRoad", "road"], False, False),
|
||||
|
@ -85,53 +86,49 @@ class TestStreamIntegration(unittest.IsolatedAsyncioTestCase):
|
|||
stream = offer_builder.stream()
|
||||
|
||||
_ = await stream.start()
|
||||
self.assertTrue(stream.is_started)
|
||||
assert stream.is_started
|
||||
|
||||
try:
|
||||
async with timeout(2):
|
||||
await stream.wait_for_connection()
|
||||
except TimeoutError:
|
||||
self.fail("Timed out waiting for connection")
|
||||
self.assertTrue(stream.is_connected_and_ready)
|
||||
pytest.fail("Timed out waiting for connection")
|
||||
assert stream.is_connected_and_ready
|
||||
|
||||
self.assertEqual(stream.has_messaging_channel(), add_messaging)
|
||||
assert stream.has_messaging_channel() == add_messaging
|
||||
if stream.has_messaging_channel():
|
||||
channel = stream.get_messaging_channel()
|
||||
self.assertIsNotNone(channel)
|
||||
self.assertEqual(channel.readyState, "open")
|
||||
assert channel is not None
|
||||
assert channel.readyState == "open"
|
||||
|
||||
self.assertEqual(stream.has_incoming_audio_track(), recv_audio)
|
||||
assert stream.has_incoming_audio_track() == recv_audio
|
||||
if stream.has_incoming_audio_track():
|
||||
track = stream.get_incoming_audio_track(False)
|
||||
self.assertIsNotNone(track)
|
||||
self.assertEqual(track.readyState, "live")
|
||||
self.assertEqual(track.kind, "audio")
|
||||
assert track is not None
|
||||
assert track.readyState == "live"
|
||||
assert track.kind == "audio"
|
||||
# test audio recv
|
||||
try:
|
||||
async with timeout(1):
|
||||
await track.recv()
|
||||
except TimeoutError:
|
||||
self.fail("Timed out waiting for audio frame")
|
||||
pytest.fail("Timed out waiting for audio frame")
|
||||
|
||||
for cam in cameras:
|
||||
self.assertTrue(stream.has_incoming_video_track(cam))
|
||||
assert stream.has_incoming_video_track(cam)
|
||||
if stream.has_incoming_video_track(cam):
|
||||
track = stream.get_incoming_video_track(cam, False)
|
||||
self.assertIsNotNone(track)
|
||||
self.assertEqual(track.readyState, "live")
|
||||
self.assertEqual(track.kind, "video")
|
||||
assert track is not None
|
||||
assert track.readyState == "live"
|
||||
assert track.kind == "video"
|
||||
# test video recv
|
||||
try:
|
||||
async with timeout(1):
|
||||
await stream.get_incoming_video_track(cam, False).recv()
|
||||
except TimeoutError:
|
||||
self.fail("Timed out waiting for video frame")
|
||||
pytest.fail("Timed out waiting for video frame")
|
||||
|
||||
await stream.stop()
|
||||
await simple_answerer.stream.stop()
|
||||
self.assertFalse(stream.is_started)
|
||||
self.assertFalse(stream.is_connected_and_ready)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
assert not stream.is_started
|
||||
assert not stream.is_connected_and_ready
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import unittest
|
||||
import pytest
|
||||
|
||||
import aiortc
|
||||
from aiortc.mediastreams import AudioStreamTrack
|
||||
|
@ -29,7 +29,8 @@ class DummyH264VideoStreamTrack(TiciVideoStreamTrack):
|
|||
return "H264"
|
||||
|
||||
|
||||
class TestOfferStream(unittest.IsolatedAsyncioTestCase):
|
||||
@pytest.mark.asyncio
|
||||
class TestOfferStream:
|
||||
async def test_offer_stream_sdp_recvonly_audio(self):
|
||||
capture = OfferCapture()
|
||||
builder = WebRTCOfferBuilder(capture)
|
||||
|
@ -42,8 +43,8 @@ class TestOfferStream(unittest.IsolatedAsyncioTestCase):
|
|||
pass
|
||||
|
||||
info = parse_info_from_offer(capture.offer.sdp)
|
||||
self.assertTrue(info.expected_audio_track)
|
||||
self.assertFalse(info.incoming_audio_track)
|
||||
assert info.expected_audio_track
|
||||
assert not info.incoming_audio_track
|
||||
|
||||
async def test_offer_stream_sdp_sendonly_audio(self):
|
||||
capture = OfferCapture()
|
||||
|
@ -57,8 +58,8 @@ class TestOfferStream(unittest.IsolatedAsyncioTestCase):
|
|||
pass
|
||||
|
||||
info = parse_info_from_offer(capture.offer.sdp)
|
||||
self.assertFalse(info.expected_audio_track)
|
||||
self.assertTrue(info.incoming_audio_track)
|
||||
assert not info.expected_audio_track
|
||||
assert info.incoming_audio_track
|
||||
|
||||
async def test_offer_stream_sdp_channel(self):
|
||||
capture = OfferCapture()
|
||||
|
@ -72,10 +73,11 @@ class TestOfferStream(unittest.IsolatedAsyncioTestCase):
|
|||
pass
|
||||
|
||||
info = parse_info_from_offer(capture.offer.sdp)
|
||||
self.assertTrue(info.incoming_datachannel)
|
||||
assert info.incoming_datachannel
|
||||
|
||||
|
||||
class TestAnswerStream(unittest.IsolatedAsyncioTestCase):
|
||||
@pytest.mark.asyncio
|
||||
class TestAnswerStream:
|
||||
async def test_codec_preference(self):
|
||||
offer_sdp = """v=0
|
||||
o=- 3910274679 3910274679 IN IP4 0.0.0.0
|
||||
|
@ -115,7 +117,7 @@ a=setup:actpass"""
|
|||
sdp_desc = aiortc.sdp.SessionDescription.parse(answer.sdp)
|
||||
video_desc = [m for m in sdp_desc.media if m.kind == "video"][0]
|
||||
codecs = video_desc.rtp.codecs
|
||||
self.assertEqual(codecs[0].mimeType, "video/H264")
|
||||
assert codecs[0].mimeType == "video/H264"
|
||||
|
||||
async def test_fail_if_preferred_codec_not_in_offer(self):
|
||||
offer_sdp = """v=0
|
||||
|
@ -147,9 +149,5 @@ a=setup:actpass"""
|
|||
builder.add_video_stream("road", DummyH264VideoStreamTrack("road", 0.05))
|
||||
stream = builder.stream()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
with pytest.raises(ValueError):
|
||||
_ = await stream.start()
|
||||
|
||||
|
||||
if __name__=="__main__":
|
||||
unittest.main()
|
|
@ -1,22 +1,22 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import unittest
|
||||
import pytest
|
||||
|
||||
import aiortc
|
||||
|
||||
from teleoprtc.tracks import video_track_id, parse_video_track_id, TiciVideoStreamTrack, TiciTrackWrapper
|
||||
|
||||
|
||||
class TestTracks(unittest.TestCase):
|
||||
class TestTracks:
|
||||
def test_track_id(self):
|
||||
expected_camera_type, expected_track_id = "driver", "test"
|
||||
track_id = video_track_id(expected_camera_type, expected_track_id)
|
||||
camera_type, track_id = parse_video_track_id(track_id)
|
||||
self.assertEqual(expected_camera_type, camera_type)
|
||||
self.assertEqual(expected_track_id, track_id)
|
||||
assert expected_camera_type == camera_type
|
||||
assert expected_track_id == track_id
|
||||
|
||||
def test_track_id_invalid(self):
|
||||
with self.assertRaises(ValueError):
|
||||
with pytest.raises(ValueError):
|
||||
parse_video_track_id("test")
|
||||
|
||||
def test_tici_track_id(self):
|
||||
|
@ -26,13 +26,9 @@ class TestTracks(unittest.TestCase):
|
|||
|
||||
track = VideoStream("driver", 0.1)
|
||||
camera_type, _ = parse_video_track_id(track.id)
|
||||
self.assertEqual("driver", camera_type)
|
||||
assert "driver" == camera_type
|
||||
|
||||
def test_tici_wrapper_id(self):
|
||||
track = TiciTrackWrapper("driver", aiortc.mediastreams.VideoStreamTrack())
|
||||
camera_type, _ = parse_video_track_id(track.id)
|
||||
self.assertEqual("driver", camera_type)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
assert "driver" == camera_type
|
||||
|
|
Loading…
Reference in New Issue