webrtcd: ability to have multiple streams publishing same message (#31700)

Use single PubMaster with dynamic services
old-commit-hash: 032c0878b8
This commit is contained in:
Kacper Rączy 2024-03-05 21:14:50 +01:00 committed by GitHub
parent e581322de5
commit 74361d61a3
1 changed files with 17 additions and 1 deletions

View File

@ -102,7 +102,21 @@ class CerealProxyRunner:
await asyncio.sleep(0.01)
class DynamicPubMaster(messaging.PubMaster):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.lock = asyncio.Lock()
async def add_services_if_needed(self, services):
async with self.lock:
for service in services:
if service not in self.sock:
self.sock[service] = messaging.pub_sock(service)
class StreamSession:
shared_pub_master = DynamicPubMaster([])
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
from aiortc.mediastreams import VideoStreamTrack, AudioStreamTrack
from aiortc.contrib.media import MediaBlackhole
@ -129,10 +143,11 @@ class StreamSession:
self.identifier = str(uuid.uuid4())
self.incoming_bridge: CerealIncomingMessageProxy | None = None
self.incoming_bridge_services = incoming_services
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None
self.outgoing_bridge_runner: CerealProxyRunner | None = None
if len(incoming_services) > 0:
self.incoming_bridge = CerealIncomingMessageProxy(messaging.PubMaster(incoming_services))
self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master)
if len(outgoing_services) > 0:
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services))
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge)
@ -168,6 +183,7 @@ class StreamSession:
await self.stream.wait_for_connection()
if self.stream.has_messaging_channel():
if self.incoming_bridge is not None:
await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services)
self.stream.set_message_handler(self.message_handler)
if self.outgoing_bridge_runner is not None:
channel = self.stream.get_messaging_channel()