mirror of https://github.com/commaai/tinygrad.git
Fix cl map buffer (#2190)
* fix gpu enqueue_map_buffer out of space

* add test
parent c59ea32f90
commit 8c07c73a9b
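In short, as read from the diff below: the GPU _copyout path stages its result through a temporary host-pointer-backed OpenCL buffer, and mapping that buffer could fail with an out-of-space error when the LRU allocator's cache was holding all of the device's free memory. The patch factors the eviction loop out of _alloc_buffer into a reusable ensure_has_free_space() and calls it from _copyout before mapping; a new test_gpu_copyout reproduces the scenario by parking a large buffer in the cache and then forcing a copyout.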
@@ -6,6 +6,7 @@ from tinygrad.ops import GlobalCounters
 from tinygrad.runtime.lib import RawBuffer, LRUAllocator
 from tinygrad.helpers import dtypes, prod
 from tinygrad.ops import Device
+from tinygrad.tensor import Tensor
 
 def check_gc():
   if Device.DEFAULT == "GPU":
@@ -107,5 +108,29 @@ class TestAllocators(unittest.TestCase):
     test()
     check_gc()
 
+  @unittest.skipUnless(Device.DEFAULT == "GPU", "GPU=1 specific")
+  def test_gpu_copyout(self):
+    def test():
+      from tinygrad.runtime.ops_gpu import CL
+
+      # Allocation to init the allocator.
+      tx = Tensor.rand(1)
+      tx.realize()
+      free_space = CL.cl_allocator.free_space[tx.lazydata.realized._device]
+
+      # Spawning 128mb objects to fill half of free_space
+      will_allocate = free_space // 3
+      trash_allocation_size = free_space // 2
+
+      def sp():
+        trash_buffer = Tensor.rand(trash_allocation_size // 4)
+        trash_buffer.realize()
+      sp()
+
+      xx = Tensor.rand(will_allocate // 4)
+      _ = xx.numpy()
+    test()
+    check_gc()
+
 if __name__ == "__main__":
   unittest.main()
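A note on the arithmetic in this test (my reading; the patch itself does not spell it out): Tensor.rand produces float32 tensors at 4 bytes per element, so dividing a byte budget by 4 converts it into an element count. The throwaway tensor from sp() takes half the reported free space and drops into the allocator cache when it goes out of scope; the final xx.numpy() then triggers _copyout, whose staging map must be able to reclaim that cached space.

import numpy as np

# float32 is 4 bytes per element, hence the // 4 when turning a byte
# budget into an element count:
n_bytes = 128 * 1024 * 1024
assert np.empty(n_bytes // 4, dtype=np.float32).nbytes == n_bytes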
@@ -70,31 +70,40 @@ class LRUAllocator:
     self.buffer_info: Dict[Any, Tuple[int, DType, str]] = dict()
     self.cached_buffers: Dict[Tuple[int, ...], Deque[Tuple[Any, int]]] = defaultdict(deque) # Cached buffer storage, splitted by type and size, newest first.
     self.aging_order: Dict[Any, Deque[Tuple[Tuple[int, ...], int]]] = defaultdict(deque) # Keys of cached_buffers, ordered from oldest to newest updates.
 
   def _cache_reuse_buffer(self, rawbufs: Deque[Tuple[Any, int]]): # The newest cached buffer is reused.
     GlobalCounters.mem_cached -= self._underlying_buf_memsz(rawbufs[0][0])
     return rawbufs.popleft()[0]
-  def _alloc_buffer(self, size, dtype, device, **kwargs):
-    self.free_space[device] -= size*dtype.itemsize
-    while len(self.aging_order[device]) and self.free_space[device] < 0: # When OOM removing lru buffers.
+
+  def ensure_has_free_space(self, size, dtype, device):
+    while len(self.aging_order[device]) and (self.free_space[device]-size*dtype.itemsize) < 0: # When OOM removing lru buffers.
       bucket, epoch = self.aging_order[device].popleft()
       if self.cached_buffers[bucket] and self.cached_buffers[bucket][-1][1] == epoch: self._free_buffer(self.cached_buffers[bucket].pop()[0]) # Free cached buffer if it is still in cache.
+
+  def _alloc_buffer(self, size, dtype, device, **kwargs):
+    self.ensure_has_free_space(size, dtype, device)
+    self.free_space[device] -= size*dtype.itemsize
     newbuf = self._do_alloc(max(1, size), dtype, device, **kwargs)
     self.buffer_info[newbuf] = (size, dtype, device)
     return newbuf
 
   def _free_buffer(self, buf_to_free):
     self.free_space[self.buffer_info[buf_to_free][2]] += self._underlying_buf_memsz(buf_to_free)
     GlobalCounters.mem_cached -= self._underlying_buf_memsz(buf_to_free)
     self.buffer_info.pop(buf_to_free)
     self._do_free(buf_to_free)
 
   def alloc(self, size, dtype, device='0', **kwargs):
     rawbufs = self.cached_buffers.get(self._cached_bufkey(size, dtype, device), None)
     return self._cache_reuse_buffer(rawbufs) if rawbufs else self._alloc_buffer(size, dtype, device, **kwargs)
 
   def free(self, buf): # free() just caches buffer. It might be freed later when OOM during allocation.
     self.epoch += 1
     size, dtype, device = self.buffer_info[buf]
     self.cached_buffers[self._cached_bufkey(size, dtype, device)].appendleft((buf, self.epoch))
     self.aging_order[device].append((self._cached_bufkey(size, dtype, device), self.epoch))
     GlobalCounters.mem_cached += self._underlying_buf_memsz(buf)
 
   def _underlying_buf_memsz(self, buf): return self.buffer_info[buf][0] * self.buffer_info[buf][1].itemsize
   def _cached_bufkey(self, size, dtype, device) -> Tuple[int, ...]: return (device, size, dtype, dtype.shape) if isinstance(dtype, ImageDType) else (device, size, dtype) # Provides a key for reusing device buffers with identical keys.
   def _do_alloc(self, size, dtype, device, **kwargs): raise NotImplementedError("must be implemented")
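To make the bookkeeping above easier to follow, here is a self-contained toy model of the same scheme; MiniLRUAllocator is a hypothetical name, buckets are keyed by plain byte size, and a bytearray stands in for a device buffer, so this is a sketch of the technique rather than the tinygrad API. free() parks a buffer in a size-keyed bucket and logs (bucket, epoch) in an aging deque; eviction walks that deque oldest-first and frees a cached buffer only if its epoch still matches, i.e. it was not already reused.

from collections import defaultdict, deque

class MiniLRUAllocator:
    def __init__(self, total_bytes):
        self.free_space = total_bytes     # bytes held by neither live nor cached buffers
        self.epoch = 0                    # monotonically increasing cache-event counter
        self.cached = defaultdict(deque)  # size -> deque of (buf, epoch), newest first
        self.aging = deque()              # (size, epoch) cache events, oldest first

    def ensure_has_free_space(self, nbytes):
        while self.aging and self.free_space < nbytes:  # evict LRU buffers until it fits
            size, epoch = self.aging.popleft()          # oldest cache event
            if self.cached[size] and self.cached[size][-1][1] == epoch:
                self.cached[size].pop()                 # still cached: drop it
                self.free_space += size                 # and reclaim its bytes

    def alloc(self, nbytes):
        if self.cached[nbytes]:                         # reuse the newest cached buffer
            return self.cached[nbytes].popleft()[0]
        self.ensure_has_free_space(nbytes)
        self.free_space -= nbytes
        return bytearray(nbytes)                        # stand-in for a device buffer

    def free(self, buf):                                # cache only; actual release is lazy
        self.epoch += 1
        self.cached[len(buf)].appendleft((buf, self.epoch))
        self.aging.append((len(buf), self.epoch))

lru = MiniLRUAllocator(total_bytes=8)
x = lru.alloc(8)   # takes the whole device
lru.free(x)        # parked in the cache; free_space is still 0
y = lru.alloc(4)   # eviction reclaims the cached 8 bytes first
assert lru.free_space == 4

The epoch check is the design point: a reused buffer leaves a stale (size, epoch) entry behind in the aging deque, and comparing epochs lets eviction skip such stale entries cheaply instead of scanning buckets to keep the two structures in sync.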
@@ -52,6 +52,7 @@ class CLBuffer(RawBufferCopyInOut, RawBufferTransfer):
     self.event = cl.enqueue_copy(CL.cl_queue[self._buf.device], self._buf, np.require(x, requirements=['C', 'A']), is_blocking=False)
   def _copyout(self, x:np.ndarray):
     assert not self.dtype.name.startswith("image"), f"can't copyout images {self.dtype}"
+    CL.cl_allocator.ensure_has_free_space(self.size, self.dtype, self._device)
     buf = cl.Buffer(CL.cl_ctxs[self._buf.device], cl.mem_flags.WRITE_ONLY | cl.mem_flags.USE_HOST_PTR, 0, hostbuf=x.data)
     mapped, event = cl.enqueue_map_buffer(CL.cl_queue[self._buf.device], buf, cl.map_flags.WRITE, 0, self.size, dtype=self.dtype.np, is_blocking=False)
     with mapped.base: cl.enqueue_copy(CL.cl_queue[self._buf.device], mapped, self._buf, is_blocking=True, wait_for=[event] + ([self.event] if hasattr(self, "event") else []))
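For context on why the new ensure_has_free_space call sits immediately before the map: the staging buffer is host-pointer-backed, yet (as this fix shows) servicing enqueue_map_buffer can still fail when device memory is exhausted by the allocator cache, so cached buffers must be evictable first. Below is a minimal pyopencl sketch of the same map-then-copy pattern; it assumes some OpenCL device is available and is illustrative rather than tinygrad's exact code.

import numpy as np
import pyopencl as cl

ctx = cl.create_some_context()
queue = cl.CommandQueue(ctx)

out = np.empty(1024, dtype=np.float32)  # destination host array
src = cl.Buffer(ctx, cl.mem_flags.READ_WRITE, size=out.nbytes)  # stands in for the device result

# Host-pointer-backed staging buffer, mirroring _copyout above.
stage = cl.Buffer(ctx, cl.mem_flags.WRITE_ONLY | cl.mem_flags.USE_HOST_PTR, hostbuf=out)
mapped, ev = cl.enqueue_map_buffer(queue, stage, cl.map_flags.WRITE, 0,
                                   out.shape, out.dtype, is_blocking=False)
with mapped.base:  # unmaps on exit, like the "with mapped.base:" above
    cl.enqueue_copy(queue, mapped, src, is_blocking=True, wait_for=[ev])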