diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 293fc8b..9163417 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -21,8 +21,7 @@ jobs: - name: Install package run: pip install -e .[dev] - name: Unit Tests - run: | - cd tests/; python -m unittest discover + run: pytest static_analysis: runs-on: ubuntu-latest steps: diff --git a/pyproject.toml b/pyproject.toml index ffdef03..b93df69 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,10 @@ dependencies = [ [project.optional-dependencies] dev = [ "parameterized>=0.8", - "pre-commit" + "pre-commit", + "pytest", + "pytest-asyncio", + "pytest-xdist" ] [project.urls] @@ -41,3 +44,9 @@ target-version="py38" select = ["E", "F", "W", "PIE", "C4", "ISC", "RUF008", "RUF100", "A", "B", "TID251"] ignore = ["W292", "E741", "E402", "C408", "ISC003", "B027", "B024"] flake8-implicit-str-concat.allow-multiline=false + +[tool.ruff.lint.flake8-tidy-imports.banned-api] +"unittest".msg = "Use pytest" + +[tool.pytest.ini_options] +addopts = "--durations=10 -n auto" diff --git a/tests/test_info.py b/tests/test_info.py index 893d8a4..5e939cd 100755 --- a/tests/test_info.py +++ b/tests/test_info.py @@ -1,7 +1,5 @@ #!/usr/bin/env python3 -import unittest - from teleoprtc.info import parse_info_from_offer @@ -9,7 +7,7 @@ def lf2crlf(x): return x.replace("\n", "\r\n") -class TestStream(unittest.TestCase): +class TestStream: def test_double_video_tracks(self): sdp = """v=0 o=- 3910210993 3910210993 IN IP4 0.0.0.0 @@ -90,10 +88,10 @@ a=ice-pwd:1234 a=fingerprint:sha-256 70:3A:2D:37:3C:52:96:0E:10:F6:4D:7A:EB:18:38:1B:FD:CA:A5:90:D7:6C:DA:A9:39:76:C9:2F:FB:FF:56:0C a=setup:actpass""" info = parse_info_from_offer(lf2crlf(sdp)) - self.assertEqual(info.n_expected_camera_tracks, 2) - self.assertFalse(info.expected_audio_track) - self.assertFalse(info.incoming_audio_track) - self.assertFalse(info.incoming_datachannel) + assert info.n_expected_camera_tracks == 2 + assert not info.expected_audio_track + assert not info.incoming_audio_track + assert not info.incoming_datachannel def test_recvonly_audio(self): sdp = """v=0 @@ -120,10 +118,10 @@ a=ice-pwd:1234 a=fingerprint:sha-256 40:4B:14:CF:70:B8:67:E1:B1:FF:7E:F9:22:6E:60:7D:73:B5:1E:38:4B:10:20:9C:CD:1C:47:02:52:ED:45:25 a=setup:actpass""" info = parse_info_from_offer(lf2crlf(sdp)) - self.assertEqual(info.n_expected_camera_tracks, 0) - self.assertTrue(info.expected_audio_track) - self.assertFalse(info.incoming_audio_track) - self.assertFalse(info.incoming_datachannel) + assert info.n_expected_camera_tracks == 0 + assert info.expected_audio_track + assert not info.incoming_audio_track + assert not info.incoming_datachannel def test_incoming_datachanel(self): sdp = """v=0 @@ -142,11 +140,7 @@ a=ice-pwd:1234 a=fingerprint:sha-256 9B:C0:F3:35:8E:05:A1:15:DB:F8:39:0E:B0:E0:0C:EB:82:E4:B9:26:18:A6:43:2D:B9:9A:23:96:0A:59:B6:58 a=setup:actpass""" info = parse_info_from_offer(lf2crlf(sdp)) - self.assertEqual(info.n_expected_camera_tracks, 0) - self.assertFalse(info.expected_audio_track) - self.assertFalse(info.incoming_audio_track) - self.assertTrue(info.incoming_datachannel) - - -if __name__ == '__main__': - unittest.main() + assert info.n_expected_camera_tracks == 0 + assert not info.expected_audio_track + assert not info.incoming_audio_track + assert info.incoming_datachannel diff --git a/tests/test_integration.py b/tests/test_integration.py index 5377bce..d804af7 100755 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 +import pytest import asyncio import sys -import unittest from aiortc.mediastreams import AudioStreamTrack, VideoStreamTrack from parameterized import parameterized @@ -65,7 +65,8 @@ class SimpleAnswerProvider: return answer -class TestStreamIntegration(unittest.IsolatedAsyncioTestCase): +@pytest.mark.asyncio +class TestStreamIntegration: @parameterized.expand([ # name, recv_cameras, recv_audio, messaging ("multi_camera", ["driver", "wideRoad", "road"], False, False), @@ -85,53 +86,49 @@ class TestStreamIntegration(unittest.IsolatedAsyncioTestCase): stream = offer_builder.stream() _ = await stream.start() - self.assertTrue(stream.is_started) + assert stream.is_started try: async with timeout(2): await stream.wait_for_connection() except TimeoutError: - self.fail("Timed out waiting for connection") - self.assertTrue(stream.is_connected_and_ready) + pytest.fail("Timed out waiting for connection") + assert stream.is_connected_and_ready - self.assertEqual(stream.has_messaging_channel(), add_messaging) + assert stream.has_messaging_channel() == add_messaging if stream.has_messaging_channel(): channel = stream.get_messaging_channel() - self.assertIsNotNone(channel) - self.assertEqual(channel.readyState, "open") + assert channel is not None + assert channel.readyState == "open" - self.assertEqual(stream.has_incoming_audio_track(), recv_audio) + assert stream.has_incoming_audio_track() == recv_audio if stream.has_incoming_audio_track(): track = stream.get_incoming_audio_track(False) - self.assertIsNotNone(track) - self.assertEqual(track.readyState, "live") - self.assertEqual(track.kind, "audio") + assert track is not None + assert track.readyState == "live" + assert track.kind == "audio" # test audio recv try: async with timeout(1): await track.recv() except TimeoutError: - self.fail("Timed out waiting for audio frame") + pytest.fail("Timed out waiting for audio frame") for cam in cameras: - self.assertTrue(stream.has_incoming_video_track(cam)) + assert stream.has_incoming_video_track(cam) if stream.has_incoming_video_track(cam): track = stream.get_incoming_video_track(cam, False) - self.assertIsNotNone(track) - self.assertEqual(track.readyState, "live") - self.assertEqual(track.kind, "video") + assert track is not None + assert track.readyState == "live" + assert track.kind == "video" # test video recv try: async with timeout(1): await stream.get_incoming_video_track(cam, False).recv() except TimeoutError: - self.fail("Timed out waiting for video frame") + pytest.fail("Timed out waiting for video frame") await stream.stop() await simple_answerer.stream.stop() - self.assertFalse(stream.is_started) - self.assertFalse(stream.is_connected_and_ready) - - -if __name__ == '__main__': - unittest.main() + assert not stream.is_started + assert not stream.is_connected_and_ready diff --git a/tests/test_stream.py b/tests/test_stream.py index 7882eed..7e697e5 100755 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -import unittest +import pytest import aiortc from aiortc.mediastreams import AudioStreamTrack @@ -29,7 +29,8 @@ class DummyH264VideoStreamTrack(TiciVideoStreamTrack): return "H264" -class TestOfferStream(unittest.IsolatedAsyncioTestCase): +@pytest.mark.asyncio +class TestOfferStream: async def test_offer_stream_sdp_recvonly_audio(self): capture = OfferCapture() builder = WebRTCOfferBuilder(capture) @@ -42,8 +43,8 @@ class TestOfferStream(unittest.IsolatedAsyncioTestCase): pass info = parse_info_from_offer(capture.offer.sdp) - self.assertTrue(info.expected_audio_track) - self.assertFalse(info.incoming_audio_track) + assert info.expected_audio_track + assert not info.incoming_audio_track async def test_offer_stream_sdp_sendonly_audio(self): capture = OfferCapture() @@ -57,8 +58,8 @@ class TestOfferStream(unittest.IsolatedAsyncioTestCase): pass info = parse_info_from_offer(capture.offer.sdp) - self.assertFalse(info.expected_audio_track) - self.assertTrue(info.incoming_audio_track) + assert not info.expected_audio_track + assert info.incoming_audio_track async def test_offer_stream_sdp_channel(self): capture = OfferCapture() @@ -72,10 +73,11 @@ class TestOfferStream(unittest.IsolatedAsyncioTestCase): pass info = parse_info_from_offer(capture.offer.sdp) - self.assertTrue(info.incoming_datachannel) + assert info.incoming_datachannel -class TestAnswerStream(unittest.IsolatedAsyncioTestCase): +@pytest.mark.asyncio +class TestAnswerStream: async def test_codec_preference(self): offer_sdp = """v=0 o=- 3910274679 3910274679 IN IP4 0.0.0.0 @@ -115,7 +117,7 @@ a=setup:actpass""" sdp_desc = aiortc.sdp.SessionDescription.parse(answer.sdp) video_desc = [m for m in sdp_desc.media if m.kind == "video"][0] codecs = video_desc.rtp.codecs - self.assertEqual(codecs[0].mimeType, "video/H264") + assert codecs[0].mimeType == "video/H264" async def test_fail_if_preferred_codec_not_in_offer(self): offer_sdp = """v=0 @@ -147,9 +149,5 @@ a=setup:actpass""" builder.add_video_stream("road", DummyH264VideoStreamTrack("road", 0.05)) stream = builder.stream() - with self.assertRaises(ValueError): + with pytest.raises(ValueError): _ = await stream.start() - - -if __name__=="__main__": - unittest.main() \ No newline at end of file diff --git a/tests/test_track.py b/tests/test_track.py index 3c8fcbb..52b4483 100755 --- a/tests/test_track.py +++ b/tests/test_track.py @@ -1,22 +1,22 @@ #!/usr/bin/env python3 -import unittest +import pytest import aiortc from teleoprtc.tracks import video_track_id, parse_video_track_id, TiciVideoStreamTrack, TiciTrackWrapper -class TestTracks(unittest.TestCase): +class TestTracks: def test_track_id(self): expected_camera_type, expected_track_id = "driver", "test" track_id = video_track_id(expected_camera_type, expected_track_id) camera_type, track_id = parse_video_track_id(track_id) - self.assertEqual(expected_camera_type, camera_type) - self.assertEqual(expected_track_id, track_id) + assert expected_camera_type == camera_type + assert expected_track_id == track_id def test_track_id_invalid(self): - with self.assertRaises(ValueError): + with pytest.raises(ValueError): parse_video_track_id("test") def test_tici_track_id(self): @@ -26,13 +26,9 @@ class TestTracks(unittest.TestCase): track = VideoStream("driver", 0.1) camera_type, _ = parse_video_track_id(track.id) - self.assertEqual("driver", camera_type) + assert "driver" == camera_type def test_tici_wrapper_id(self): track = TiciTrackWrapper("driver", aiortc.mediastreams.VideoStreamTrack()) camera_type, _ = parse_video_track_id(track.id) - self.assertEqual("driver", camera_type) - - -if __name__ == '__main__': - unittest.main() + assert "driver" == camera_type