# must be built with scons
from typing import Optional, List, Union

from msgq.ipc_pyx import (
    Context,
    Poller,
    SubSocket,
    PubSocket,
    SocketEventHandle,
    toggle_fake_events,
    set_fake_prefix,
    get_fake_prefix,
    delete_fake_prefix,
    wait_for_one_event,
    context_is_zmq,
)
from msgq.ipc_pyx import MultiplePublishersError, IpcError

# Bare references to imported names that are otherwise unused in this module
# (presumably so the re-exports are not flagged as unused -- confirm).
assert MultiplePublishersError
assert IpcError
assert toggle_fake_events
assert set_fake_prefix
assert get_fake_prefix
assert delete_fake_prefix
assert wait_for_one_event
assert context_is_zmq

NO_TRAVERSAL_LIMIT = 2**64 - 1

# Single module-level context handed to every socket created below.
context = Context()


def fake_event_handle(endpoint: str, identifier: Optional[Union[str, bytes]] = None,
                      override: bool = True, enable: bool = False) -> SocketEventHandle:
  """Build a SocketEventHandle for `endpoint`.

  When `identifier` is None the value returned by get_fake_prefix() is used
  instead. The handle's `enabled` attribute is assigned from `enable` only
  when `override` is truthy; otherwise it is left as constructed.
  """
  if identifier is None:
    identifier = get_fake_prefix()

  event_handle = SocketEventHandle(endpoint, identifier, override)
  if override:
    event_handle.enabled = enable
  return event_handle


def pub_sock(endpoint: str, segment_size: int = 0) -> PubSocket:
  """Create a PubSocket and connect it to `endpoint` using the module context."""
  publisher = PubSocket()
  publisher.connect(context, endpoint, segment_size)
  return publisher


def sub_sock(endpoint: str, poller: Optional[Poller] = None, addr: str = "127.0.0.1",
             conflate: bool = False, timeout: Optional[int] = None,
             segment_size: int = 0) -> SubSocket:
  """Create a SubSocket and connect it to `endpoint` using the module context.

  `addr` is UTF-8 encoded before being handed to connect(). If `timeout` is
  given it is applied with setTimeout(); if `poller` is given the new socket
  is registered with it. The connected socket is returned.
  """
  subscriber = SubSocket()
  encoded_addr = addr.encode('utf8')
  subscriber.connect(context, endpoint, encoded_addr, conflate, segment_size)

  if timeout is not None:
    subscriber.setTimeout(timeout)

  if poller is not None:
    poller.registerSocket(subscriber)
  return subscriber


def drain_sock_raw(sock: SubSocket, wait_for_one: bool = False) -> List[bytes]:
  """Receive all messages currently available on the queue.

  Messages are collected until receive() returns None. With `wait_for_one`
  set, the receive() calls made while nothing has been collected yet omit
  non_blocking=True (presumably blocking until a message or the socket
  timeout -- confirm against the SubSocket implementation); every other
  call passes non_blocking=True.
  """
  messages: List[bytes] = []
  while True:
    must_wait = wait_for_one and not messages
    msg = sock.receive() if must_wait else sock.receive(non_blocking=True)
    if msg is None:
      return messages
    messages.append(msg)