* feat: non-blocking

* feat: store event on buffer
This commit is contained in:
wozeparrot 2023-08-02 01:13:51 -04:00 committed by GitHub
parent b66361843a
commit 7aff8c4ded
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 4 additions and 8 deletions

View File

@ -17,7 +17,6 @@ if DEBUG >= 5:
early_exec = fromimport("extra.helpers", "enable_early_exec")()
class _CL:
def __init__(self): self.events_in_flight = []
def post_init(self, device=None):
platforms: List[List[cl.Device]] = [y for y in ([x.get_devices(device_type=cl.device_type.GPU) for x in cl.get_platforms()] + [x.get_devices(device_type=cl.device_type.CPU) for x in cl.get_platforms()]) if len(y)]
self.cl_platform = cl.get_platforms()[getenv('CL_PLATFORM', 0)]
@ -25,8 +24,6 @@ class _CL:
if DEBUG >= 1: print(f"using devices: {[ctx.devices[0].hashable_model_and_version_identifier for ctx in self.cl_ctxs]}")
self.cl_queue: List[cl.CommandQueue] = [cl.CommandQueue(ctx, device=ctx.devices[0], properties=cl.command_queue_properties.PROFILING_ENABLE) for ctx in self.cl_ctxs]
def synchronize(self):
for evt in self.events_in_flight: evt.wait()
self.events_in_flight.clear()
for q in self.cl_queue: q.finish()
CL = _CL()
CL.post_init() if not getenv("DELAYED_RUNTIME_INIT", False) else None
@ -45,13 +42,12 @@ class CLBuffer(RawBufferCopyInOut):
def _copyin(self, x:np.ndarray):
assert not self.dtype.name.startswith("image"), f"can't copyin images {self.dtype}"
CL.events_in_flight.append(cl.enqueue_copy(CL.cl_queue[self._buf.device], self._buf, np.require(x, requirements='C'), is_blocking=False))
self.event = cl.enqueue_copy(CL.cl_queue[self._buf.device], self._buf, np.require(x, requirements='C'), is_blocking=False)
def _copyout(self, x:np.ndarray):
assert not self.dtype.name.startswith("image"), f"can't copyout images {self.dtype}"
buf = cl.Buffer(CL.cl_ctxs[self._buf.device], cl.mem_flags.WRITE_ONLY | cl.mem_flags.USE_HOST_PTR, 0, hostbuf=x.data)
mapped = cl.enqueue_map_buffer(CL.cl_queue[self._buf.device], buf, cl.map_flags.WRITE, 0, self.size, dtype=self.dtype.np)
CL.synchronize()
with mapped[0].base: cl.enqueue_copy(CL.cl_queue[self._buf.device], mapped[0], self._buf, is_blocking=True, wait_for=[mapped[1]])
mapped, event = cl.enqueue_map_buffer(CL.cl_queue[self._buf.device], buf, cl.map_flags.WRITE, 0, self.size, dtype=self.dtype.np, is_blocking=False)
with mapped.base: cl.enqueue_copy(CL.cl_queue[self._buf.device], mapped, self._buf, is_blocking=True, wait_for=[event])
class CLProgram:
def __init__(self, name:str, prg:str, binary=False, argdtypes=None, options=None):
@ -80,7 +76,7 @@ class CLProgram:
def __call__(self, global_size, local_size, *bufs, wait=False) -> Optional[float]:
cl_bufs = [x._buf if isinstance(x, CLBuffer) else x for x in bufs]
e = self.clprgs[cl_bufs[0].device](CL.cl_queue[cl_bufs[0].device], [g*l for g,l in zip(global_size, local_size)] if local_size is not None else global_size, local_size, *cl_bufs)
e = self.clprgs[cl_bufs[0].device](CL.cl_queue[cl_bufs[0].device], [g*l for g,l in zip(global_size, local_size)] if local_size is not None else global_size, local_size, *cl_bufs, wait_for=[x.event for x in bufs if isinstance(x, CLBuffer) and hasattr(x, "event")])
if wait:
e.wait()
try: