From 3de855ea50d72238deac14fc05cda2a611497778 Mon Sep 17 00:00:00 2001 From: George Hotz <72895+geohot@users.noreply.github.com> Date: Thu, 4 Apr 2024 17:33:21 -0700 Subject: [PATCH] don't use SVM memory in KFD (#4072) * don't use SVM memory in KFD * copy from fd * cleanups * transfer * hacks * ops_hsa * tighter API --- test/external/external_test_hcq.py | 32 +++----- tinygrad/runtime/ops_kfd.py | 126 ++++++++++++++++------------- 2 files changed, 83 insertions(+), 75 deletions(-) diff --git a/test/external/external_test_hcq.py b/test/external/external_test_hcq.py index d58381c3..24f57bf0 100644 --- a/test/external/external_test_hcq.py +++ b/test/external/external_test_hcq.py @@ -65,10 +65,7 @@ class TestHCQ(unittest.TestCase): def test_wait_signal(self): TestHCQ.d0.completion_signal.value = 1 - q = HWComputeQueue() - q.wait(TestHCQ.d0.completion_signal) - q.signal(TestHCQ.d0.completion_signal) - q.submit(TestHCQ.d0) + HWComputeQueue().wait(TestHCQ.d0.completion_signal).signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0) with self.assertRaises(RuntimeError): TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id, timeout=50) # clean up @@ -77,10 +74,7 @@ class TestHCQ(unittest.TestCase): def test_wait_copy_signal(self): TestHCQ.d0.completion_signal.value = 1 - q = HWCopyQueue() - q.wait(TestHCQ.d0.completion_signal) - q.signal(TestHCQ.d0.completion_signal) - q.submit(TestHCQ.d0) + HWCopyQueue().wait(TestHCQ.d0.completion_signal).signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0) with self.assertRaises(RuntimeError): TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id, timeout=50) # clean up @@ -94,16 +88,20 @@ class TestHCQ(unittest.TestCase): TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id) assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}" + def test_submit_empty_queues(self): + HWComputeQueue().submit(TestHCQ.d0) + HWCopyQueue().submit(TestHCQ.d0) + def test_signal_timeout(self): - q = HWComputeQueue() - q.submit(TestHCQ.d0) with self.assertRaises(RuntimeError): TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id, timeout=50) def test_signal(self): - q = HWComputeQueue() - q.signal(TestHCQ.d0.completion_signal) - q.submit(TestHCQ.d0) + HWComputeQueue().signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0) + TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id) + + def test_copy_signal(self): + HWCopyQueue().signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0) TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id) def test_run_signal(self): @@ -114,12 +112,6 @@ class TestHCQ(unittest.TestCase): TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id) assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}" - def test_copy_signal(self): - q = HWCopyQueue() - q.signal(TestHCQ.d0.completion_signal) - q.submit(TestHCQ.d0) - TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id) - def test_copy_1000_times(self): q = HWCopyQueue() q.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8) @@ -169,11 +161,11 @@ class TestHCQ(unittest.TestCase): q = HWComputeQueue() qc = HWCopyQueue() q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size) # b = [1, 2] - KFDDevice._get_signal(10).value = 1 q.signal(sig:=KFDDevice._get_signal(10)) qc.wait(sig) qc.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8) qc.signal(TestHCQ.d0.completion_signal) + sig.value = 1 qc.submit(TestHCQ.d0) time.sleep(0.02) # give it time for the wait to fail q.submit(TestHCQ.d0) diff --git a/tinygrad/runtime/ops_kfd.py b/tinygrad/runtime/ops_kfd.py index 52cc8469..5f66f979 100644 --- a/tinygrad/runtime/ops_kfd.py +++ b/tinygrad/runtime/ops_kfd.py @@ -1,6 +1,6 @@ from __future__ import annotations from typing import Tuple, Any -import os, fcntl, ctypes, functools, re, pathlib, mmap, struct, errno +import os, fcntl, ctypes, functools, re, pathlib, mmap, struct, errno, io from tinygrad.device import Compiled, LRUAllocator, Compiler, BufferOptions, CompilerOptions from tinygrad.helpers import getenv, from_mv, init_c_struct_t, to_mv, round_up from tinygrad.renderer.cstyle import HIPRenderer @@ -73,8 +73,9 @@ class KFDCompiler(Compiler): AQL_PACKET_SIZE = ctypes.sizeof(hsa.hsa_kernel_dispatch_packet_t) SDMA_MAX_COPY_SIZE = 0x400000 +PAGE_SIZE = 0x1000 -SIGNAL_SIZE, SIGNAL_COUNT = ctypes.sizeof(hsa.amd_signal_t), 256 +SIGNAL_SIZE, SIGNAL_COUNT = ctypes.sizeof(hsa.amd_signal_t), 16384 VENDOR_HEADER = hsa.HSA_PACKET_TYPE_VENDOR_SPECIFIC << hsa.HSA_PACKET_HEADER_TYPE @@ -106,14 +107,17 @@ class HWComputeQueue: kernel_object=prg.handle, group_segment_size=prg.group_segment_size, private_segment_size=prg.private_segment_size, kernarg_address=kernargs, completion_signal=hsa.hsa_signal_t(ctypes.addressof(completion_signal)) if completion_signal is not None else EMPTY_SIGNAL)) + return self def signal(self, signal): self.q.append(hsa.hsa_barrier_and_packet_t(header=BARRIER_HEADER, completion_signal=hsa.hsa_signal_t(ctypes.addressof(signal)))) + return self def wait(self, signal): sig = hsa.hsa_barrier_and_packet_t(header=BARRIER_HEADER) sig.dep_signal[0] = hsa.hsa_signal_t(ctypes.addressof(signal)) self.q.append(sig) + return self def submit(self, device:KFDDevice): read_ptr = device.amd_aql_queue.read_dispatch_id @@ -125,6 +129,7 @@ class HWComputeQueue: if len(self.q): device.aql_doorbell[0] = device.aql_doorbell_value + len(self.q) - 1 device.aql_doorbell_value += len(self.q) + return self # prebuilt sdma packets sdma_flush_hdp_pkt = sdma_pkts.hdp_flush(0x8, 0x0, 0x80000000, 0x0, 0x0, 0x0) @@ -150,12 +155,14 @@ class HWCopyQueue: if (device.sdma_doorbell_value-read_ptr) > device.sdma_ring.size: raise RuntimeError("SDMA queue overrun") device.sdma_write_pointer[0] = device.sdma_doorbell_value device.sdma_doorbell[0] = device.sdma_doorbell_value + return self def timestamp(self, addr): self.q.append(sdma_pkts.timestamp(op=amd_gpu.SDMA_OP_TIMESTAMP, sub_op=amd_gpu.SDMA_SUBOP_TIMESTAMP_GET_GLOBAL, addr=addr)) + return self def copy(self, dest, src, copy_size): - self.q.append(sdma_flush_hdp_pkt) + self.q.append(sdma_flush_hdp_pkt) # TODO: do I need this? self.q.append(sdma_cache_inv) copied = 0 copies_commands = (copy_size + SDMA_MAX_COPY_SIZE - 1) // SDMA_MAX_COPY_SIZE @@ -165,6 +172,7 @@ class HWCopyQueue: count=step_copy_size-1, src_addr=src+copied, dst_addr=dest+copied)) copied += step_copy_size self.q.append(sdma_cache_wb) + return self def signal(self, completion_signal): self.q.append(sdma_pkts.atomic(op=amd_gpu.SDMA_OP_ATOMIC, operation=amd_gpu.SDMA_ATOMIC_ADD64, @@ -172,11 +180,13 @@ class HWCopyQueue: if completion_signal.event_mailbox_ptr != 0: self.q.append(sdma_pkts.fence(op=amd_gpu.SDMA_OP_FENCE, mtype=3, addr=completion_signal.event_mailbox_ptr, data=completion_signal.event_id)) self.q.append(sdma_pkts.trap(op=amd_gpu.SDMA_OP_TRAP, int_ctx=completion_signal.event_id)) + return self def wait(self, completion_signal): self.q.append(sdma_pkts.poll_regmem(op=amd_gpu.SDMA_OP_POLL_REGMEM, mem_poll=1, func=0x3, addr=ctypes.addressof(completion_signal) + getattr(hsa.amd_signal_t, 'value').offset, value=0, mask=0xffffffff, interval=0x04, retry_count=0xfff)) + return self class KFDProgram: def __init__(self, device:KFDDevice, name:str, lib:bytes): @@ -219,14 +229,10 @@ class KFDProgram: for i in range(len(vals)): args_st.__setattr__(f'v{i}', vals[i]) self.device.completion_signal.value = 1 # reset the signal before call - - self.q = HWComputeQueue() - self.q.exec(self, self.device.kernargs_ptr, global_size, local_size, self.device.completion_signal if wait else None) + HWComputeQueue().exec(self, self.device.kernargs_ptr, global_size, local_size, + self.device.completion_signal if wait else None).submit(self.device) self.device.kernargs_ptr += self.kernargs_segment_size - # one pending packet + ring doorbell - self.q.submit(self.device) - if wait: self.device._wait_on(self.device.completion_signal.event_id) assert (wp:=self.device.amd_aql_queue.write_dispatch_id) == (rp:=self.device.amd_aql_queue.read_dispatch_id), f"didn't run {wp} != {rp}" @@ -235,6 +241,7 @@ class KFDProgram: class KFDAllocator(LRUAllocator): def __init__(self, device:KFDDevice): self.device = device + self.b = [self.device._gpu_alloc(SDMA_MAX_COPY_SIZE*4, kfd.KFD_IOC_ALLOC_MEM_FLAGS_USERPTR, public=True) for _ in range(2)] super().__init__() def _alloc(self, size:int, options:BufferOptions): @@ -245,34 +252,63 @@ class KFDAllocator(LRUAllocator): if e.errno == errno.ENOMEM: raise MemoryError("Cannot allocate memory") from e else: raise - def _free(self, gpumem, options:BufferOptions): - self.device._gpu_free(gpumem) + def _free(self, gpumem, options:BufferOptions): self.device._gpu_free(gpumem) + def as_buffer(self, src:Any) -> memoryview: + self.device.synchronize() + return to_mv(src.va_addr, src.size) + + def copy_from_fd(self, dest, fd, offset, size): + fo = io.FileIO(fd, "a+b", closefd=False) + fo.seek(offset - (minor_offset:=offset % PAGE_SIZE)) + copied_in, total_copy_size = 0, round_up(size+minor_offset, PAGE_SIZE) + for i in range(0, size+minor_offset, self.b[0].size): + local_size = min(self.b[0].size, total_copy_size-i) + copy_size = min(local_size-minor_offset, size-copied_in) + if copy_size == 0: break + + fo.readinto(to_mv(self.b[1].va_addr, local_size)) + if i != 0: self.device._wait_on(self.device.completion_signal.event_id) + self.b = self.b[::-1] + self.device.completion_signal.value = 1 # TODO: when do we have to reset it? + self.device._submit_sdma(dest.va_addr+copied_in, self.b[0].va_addr+minor_offset, copy_size, completion_signal=self.device.completion_signal) + + copied_in += copy_size + minor_offset = 0 # only on the first + self.device._wait_on(self.device.completion_signal.event_id) + + def transfer(self, dest, src, sz:int, src_dev=None, dest_dev=None): + dest_dev._gpu_map(src) + q = HWComputeQueue().signal(sig := KFDDevice._get_signal()) + HWCopyQueue().wait(sig).copy(dest.va_addr, src.va_addr, sz).signal(sigc := KFDDevice._get_signal()).submit(dest_dev) + HWComputeQueue().wait(sigc).submit(dest_dev) + q.wait(sigc).submit(src_dev) def copyin(self, dest, src: memoryview): - # TODO: need to make the address visible to gpu and pass it directly to sdma. - self.device._map_userptr_to_gpu(ctypes.addressof(from_mv(src).contents), src.nbytes) - self.device.completion_signal.value = 1 - self.device._submit_sdma(dest.va_addr, ctypes.addressof(from_mv(src).contents), src.nbytes, completion_signal=self.device.completion_signal) + for i in range(0, src.nbytes, self.b[0].size): + ctypes.memmove(self.b[1].va_addr, from_mv(src[i:]), lsize:=min(self.b[0].size, src.nbytes-i)) + if i != 0: self.device._wait_on(self.device.completion_signal.event_id) + self.b = self.b[::-1] + self.device.completion_signal.value = 1 # TODO: when do we have to reset it? + self.device._submit_sdma(dest.va_addr+i, self.b[0].va_addr, lsize, completion_signal=self.device.completion_signal) self.device._wait_on(self.device.completion_signal.event_id) def copyout(self, dest:memoryview, src): - self.device._map_userptr_to_gpu(ctypes.addressof(from_mv(dest).contents), dest.nbytes) - self.device.completion_signal.value = 1 - self.device._submit_sdma(ctypes.addressof(from_mv(dest).contents), src.va_addr, dest.nbytes, completion_signal=self.device.completion_signal) - self.device._wait_on(self.device.completion_signal.event_id) + for i in range(0, dest.nbytes, self.b[0].size): + self.device.completion_signal.value = 1 + self.device._submit_sdma(self.b[0].va_addr, src.va_addr+i, lsize:=min(self.b[0].size, dest.nbytes-i), + completion_signal=self.device.completion_signal) + self.device._wait_on(self.device.completion_signal.event_id) + ctypes.memmove(from_mv(dest[i:]), self.b[0].va_addr, lsize) MAP_FIXED, MAP_NORESERVE = 0x10, 0x400 class KFDDevice(Compiled): kfd:int = -1 event_page:Any = None # TODO: fix types in kfd, Optional[kfd.struct_kfd_ioctl_alloc_memory_of_gpu_args] signals_page:Any = None - - def _map_userptr_to_gpu(self, addr, size): - self.map_uptr2gpu_struct.start_addr = addr&~0xfff - self.map_uptr2gpu_struct.size = round_up(size+addr-(addr&~0xfff), 0x1000) - kio.svm(self.kfd, made_struct=self.map_uptr2gpu_struct) + signal_number:int = 10 def _gpu_map(self, mem): + if self.gpu_id in getattr(mem, "mapped_gpu_ids", []): return mem.__setattr__("mapped_gpu_ids", getattr(mem, "mapped_gpu_ids", []) + [self.gpu_id]) c_gpus = (ctypes.c_int32 * len(mem.mapped_gpu_ids))(*mem.mapped_gpu_ids) stm = kio.map_memory_to_gpu(self.kfd, handle=mem.handle, device_ids_array_ptr=ctypes.addressof(c_gpus), n_devices=len(mem.mapped_gpu_ids)) @@ -310,8 +346,13 @@ class KFDDevice(Compiled): kio.free_memory_of_gpu(self.kfd, handle=mem.handle) @classmethod - def _get_signal(self, num): - return hsa.amd_signal_t.from_address(KFDDevice.signals_page.va_addr + SIGNAL_SIZE*num) + def _get_signal(self, num=None): + if num is None: num = KFDDevice.signal_number + KFDDevice.signal_number += 1 + if KFDDevice.signal_number == SIGNAL_COUNT: KFDDevice.signal_number = 10 + ret = hsa.amd_signal_t.from_address(KFDDevice.signals_page.va_addr + SIGNAL_SIZE*num) + ret.value = 1 + return ret def __init__(self, device:str=""): if KFDDevice.kfd == -1: KFDDevice.kfd = os.open("/dev/kfd", os.O_RDWR) @@ -403,24 +444,13 @@ class KFDDevice(Compiled): self.pm4_packet = hsa.hsa_ext_amd_aql_pm4_packet_t(header=VENDOR_HEADER, pm4_command=pm4_cmds, completion_signal=hsa.hsa_signal_t(ctypes.addressof(self.completion_signal))) - # Helpers - map_uptr2gpu_struct_t = init_c_struct_t(tuple(kfd.struct_kfd_ioctl_svm_args._fields_[:-1]+[('attrs', kfd.struct_kfd_ioctl_svm_attribute*2)])) # type: ignore - self.map_uptr2gpu_struct = map_uptr2gpu_struct_t(nattr=2, op=kfd.KFD_IOCTL_SVM_OP_SET_ATTR) - self.map_uptr2gpu_struct.attrs[0].type = kfd.KFD_IOCTL_SVM_ATTR_SET_FLAGS - self.map_uptr2gpu_struct.attrs[0].value = kfd.KFD_IOCTL_SVM_FLAG_COHERENT - self.map_uptr2gpu_struct.attrs[1].type = kfd.KFD_IOCTL_SVM_ATTR_ACCESS_IN_PLACE - self.map_uptr2gpu_struct.attrs[1].value = self.gpu_id - super().__init__(device, KFDAllocator(self), KFDCompiler(self.arch), functools.partial(KFDProgram, self)) def _submit_sdma(self, dest, src, copy_size, wait_signals=None, completion_signal=None): - q = HWCopyQueue() - if wait_signals is not None: # NOTE: we check only low 32 bits to be zeroed, we don't use higher values for signals for sig in wait_signals: q.wait(ctypes.addressof(sig) + getattr(hsa.amd_signal_t, 'value').offset) - if completion_signal is not None: q.timestamp(ctypes.addressof(completion_signal) + getattr(hsa.amd_signal_t, 'start_ts').offset) q.copy(dest, src, copy_size) if completion_signal is not None: q.timestamp(ctypes.addressof(completion_signal) + getattr(hsa.amd_signal_t, 'end_ts').offset) @@ -435,28 +465,14 @@ class KFDDevice(Compiled): amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GLV_INV(glv) | amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL1_INV(gl1) | \ amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL2_INV(gl2)] for i, value in enumerate(pm4_cmd): pm4_buffer_view[i] = value - ctypes.memmove(self.aql_ring.va_addr + (self.aql_doorbell_value * AQL_PACKET_SIZE) % self.aql_ring.size, - ctypes.addressof(self.pm4_packet), AQL_PACKET_SIZE) - - self.amd_aql_queue.write_dispatch_id = self.aql_doorbell_value + 1 - self.aql_doorbell[0] = self.aql_doorbell_value - self.aql_doorbell_value += 1 - + q = HWComputeQueue() + q.q.append(self.pm4_packet) + q.submit(self) self._wait_on(self.completion_signal.event_id) assert (wp:=self.amd_aql_queue.write_dispatch_id) == (rp:=self.amd_aql_queue.read_dispatch_id), f"didn't run {wp} != {rp}" def synchronize(self): - q = HWComputeQueue() - q.signal(self.completion_signal) - - ring_addr = self.aql_ring.va_addr + (self.aql_doorbell_value*AQL_PACKET_SIZE) % self.aql_ring.size - for cmd in q.q: ctypes.memmove(ring_addr, ctypes.addressof(cmd), AQL_PACKET_SIZE) - - # one pending packet + ring doorbell - self.amd_aql_queue.write_dispatch_id = self.aql_doorbell_value + 1 - self.aql_doorbell[0] = self.aql_doorbell_value - self.aql_doorbell_value += 1 - + HWComputeQueue().signal(self.completion_signal).submit(self) self._wait_on(self.completion_signal.event_id) assert (wp:=self.amd_aql_queue.write_dispatch_id) == (rp:=self.amd_aql_queue.read_dispatch_id), f"didn't run {wp} != {rp}"