mirror of https://github.com/commaai/tinygrad.git
50 lines
1.5 KiB
Python
50 lines
1.5 KiB
Python
import multiprocessing, subprocess
|
|
import cloudpickle
|
|
from typing import Any
|
|
|
|
def _early_exec_process(qin, qout):
    """Worker loop: run queued commands and send back their output.

    Loops forever, so it is meant to run as the target of a separate process.
    Each request read from ``qin`` is expected to be a ``(path, inp)`` pair
    that is forwarded to ``subprocess.check_output(path, input=inp)``.

    Exactly one reply is put on ``qout`` per request: the captured output on
    success, or the exception object on failure.

    Args:
        qin: queue-like object with ``get()`` supplying the requests.
        qout: queue-like object with ``put()`` receiving the replies.
    """
    while True:
        request = qin.get()
        try:
            # Unpack inside the try: a malformed request must produce an error
            # reply rather than kill the worker, otherwise the requester would
            # wait forever for a reply that never arrives.
            path, inp = request
            qout.put(subprocess.check_output(path, input=inp))
        except Exception as e:
            qout.put(e)
|
|
|
|
def enable_early_exec():
    """Start a daemon worker process and return a function that talks to it.

    A request queue and a reply queue are created and handed to a daemon
    process running ``_early_exec_process``. The returned ``early_exec(x)``
    puts ``x`` on the request queue, blocks for the next reply, and either
    raises it (when the reply is an ``Exception`` instance) or returns it.
    """
    requests: multiprocessing.Queue = multiprocessing.Queue()
    replies: multiprocessing.Queue = multiprocessing.Queue()

    worker = multiprocessing.Process(
        target=_early_exec_process,
        args=(requests, replies),
        daemon=True,
    )
    worker.start()

    def early_exec(x):
        requests.put(x)
        reply = replies.get()
        # The worker reports failures by sending the exception object itself.
        if isinstance(reply, Exception):
            raise reply
        return reply

    return early_exec
|
|
|
|
def proc(itermaker, q) -> None:
    """Drain ``itermaker()`` into the queue ``q``.

    Every item produced by the iterable is put on ``q``. If iteration raises
    an ``Exception``, the exception object is put on ``q`` instead. In all
    cases a final ``None`` is put on ``q`` as an end marker and the queue is
    then closed.
    """
    try:
        try:
            for item in itermaker():
                q.put(item)
        except Exception as err:
            # Forward the failure to the reader rather than losing it here.
            q.put(err)
    finally:
        # End marker always goes out, even if forwarding the error failed.
        q.put(None)
        q.close()
|
|
|
|
class _CloudpickleFunctionWrapper:
    """Callable wrapper whose pickled state is produced by cloudpickle.

    The wrapped function is stored on ``self.fn``. When the wrapper is
    pickled, its state is ``cloudpickle.dumps(self.fn)``; when unpickled, the
    function is rebuilt with ``cloudpickle.loads``. Calling the wrapper
    forwards all arguments to the wrapped function.
    """

    def __init__(self, fn):
        self.fn = fn

    def __getstate__(self):
        serialized = cloudpickle.dumps(self.fn)
        return serialized

    def __setstate__(self, pfn):
        restored = cloudpickle.loads(pfn)
        self.fn = restored

    def __call__(self, *args, **kwargs) -> Any:
        target = self.fn
        return target(*args, **kwargs)
|
|
|
|
def cross_process(itermaker, maxsize=16):
    """Yield the items of ``itermaker()`` while producing them in a child process.

    A child process runs ``proc`` on the (cloudpickle-wrapped) ``itermaker``
    and feeds a queue created with ``multiprocessing.Queue(maxsize)``. This
    generator reads that queue and:

    * re-raises any item that is an ``Exception`` instance (this is how
      ``proc`` reports a failure inside the child),
    * stops when it reads ``None`` (the end marker sent by ``proc``),
    * yields every other item.

    The child is always reaped when the generator finishes, fails, or is
    closed early by the consumer, so it is never left blocked on a full queue.

    Args:
        itermaker: zero-argument callable returning an iterable; it is
            invoked in the child process.
        maxsize: capacity passed to ``multiprocessing.Queue``.

    Yields:
        The items produced by ``itermaker()``.

    Raises:
        Exception: whatever exception instance was received from the child.
    """
    q: multiprocessing.Queue = multiprocessing.Queue(maxsize)
    # multiprocessing uses pickle which cannot dump lambdas, so use cloudpickle.
    p = multiprocessing.Process(target=proc, args=(_CloudpickleFunctionWrapper(itermaker), q))
    p.start()
    finished = False
    try:
        while True:
            ret = q.get()
            if isinstance(ret, Exception):
                raise ret
            if ret is None:
                finished = True
                break
            yield ret
    finally:
        # If we stop before the end marker (consumer closed the generator, or
        # an error was re-raised), the child may still be blocked putting
        # items on the bounded queue. Terminate it; the queue is discarded
        # afterwards, so leaving it in an undefined state is acceptable.
        if not finished and p.is_alive():
            p.terminate()
        p.join()