mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-27 15:23:53 +08:00
process_replay: improved support for drained pubs (#28452)
Stop triggering empty update if there're no drained messages in the queue
This commit is contained in:
@@ -77,9 +77,9 @@ class ReplayContext:
|
||||
def wait_for_recv_called(self):
|
||||
messaging.wait_for_one_event(self.all_recv_called_events)
|
||||
|
||||
def wait_for_next_recv(self, end_of_cycle):
|
||||
def wait_for_next_recv(self, trigger_empty_recv):
|
||||
index = messaging.wait_for_one_event(self.all_recv_called_events)
|
||||
if self.drained_pub is not None and end_of_cycle:
|
||||
if self.drained_pub is not None and trigger_empty_recv:
|
||||
self.all_recv_called_events[index].clear()
|
||||
self.all_recv_ready_events[index].set()
|
||||
self.all_recv_called_events[index].wait()
|
||||
@@ -419,12 +419,17 @@ def _replay_single_process(cfg, lr, fingerprint):
|
||||
for s in sockets.values():
|
||||
messaging.recv_one_or_none(s)
|
||||
|
||||
# empty recv on drained pub indicates the end of messages, only do that if there're any
|
||||
trigger_empty_recv = False
|
||||
if cfg.drained_pub:
|
||||
trigger_empty_recv = next((True for m in msg_queue if m.which() == cfg.drained_pub), False)
|
||||
|
||||
for m in msg_queue:
|
||||
pm.send(m.which(), m.as_builder())
|
||||
msg_queue = []
|
||||
|
||||
rc.unlock_sockets()
|
||||
rc.wait_for_next_recv(True)
|
||||
rc.wait_for_next_recv(trigger_empty_recv)
|
||||
|
||||
for s in resp_sockets:
|
||||
ms = messaging.drain_sock(sockets[s])
|
||||
|
||||
Reference in New Issue
Block a user