Files
IQ.Pilot/msgq_repo/msgq/__init__.py
2026-03-07 14:45:32 -06:00

64 lines
1.9 KiB
Python

# must be built with scons
from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event, \
context_is_zmq
from msgq.ipc_pyx import MultiplePublishersError, IpcError
from typing import Optional, List, Union
assert MultiplePublishersError
assert IpcError
assert toggle_fake_events
assert set_fake_prefix
assert get_fake_prefix
assert delete_fake_prefix
assert wait_for_one_event
assert context_is_zmq
NO_TRAVERSAL_LIMIT = 2**64-1
context = Context()
def fake_event_handle(endpoint: str, identifier: Optional[Union[str, bytes]] = None, override: bool = True, enable: bool = False) -> SocketEventHandle:
ident = identifier if identifier is not None else get_fake_prefix()
handle = SocketEventHandle(endpoint, ident, override)
if override:
handle.enabled = enable
return handle
def pub_sock(endpoint: str, segment_size: int = 0) -> PubSocket:
sock = PubSocket()
sock.connect(context, endpoint, segment_size)
return sock
def sub_sock(endpoint: str, poller: Optional[Poller] = None, addr: str = "127.0.0.1",
conflate: bool = False, timeout: Optional[int] = None, segment_size: int = 0) -> SubSocket:
sock = SubSocket()
sock.connect(context, endpoint, addr.encode('utf8'), conflate, segment_size)
if timeout is not None:
sock.setTimeout(timeout)
if poller is not None:
poller.registerSocket(sock)
return sock
def drain_sock_raw(sock: SubSocket, wait_for_one: bool = False) -> List[bytes]:
"""Receive all message currently available on the queue"""
ret: List[bytes] = []
while 1:
if wait_for_one and len(ret) == 0:
dat = sock.receive()
else:
dat = sock.receive(non_blocking=True)
if dat is None:
break
ret.append(dat)
return ret