don't use SVM memory in KFD (#4072)

* don't use SVM memory in KFD

* copy from fd

* cleanups

* transfer

* hacks

* ops_hsa

* tighter API
This commit is contained in:
George Hotz 2024-04-04 17:33:21 -07:00 committed by GitHub
parent 5e6e6c9a67
commit 3de855ea50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 83 additions and 75 deletions

View File

@ -65,10 +65,7 @@ class TestHCQ(unittest.TestCase):
def test_wait_signal(self):
TestHCQ.d0.completion_signal.value = 1
q = HWComputeQueue()
q.wait(TestHCQ.d0.completion_signal)
q.signal(TestHCQ.d0.completion_signal)
q.submit(TestHCQ.d0)
HWComputeQueue().wait(TestHCQ.d0.completion_signal).signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0)
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id, timeout=50)
# clean up
@ -77,10 +74,7 @@ class TestHCQ(unittest.TestCase):
def test_wait_copy_signal(self):
TestHCQ.d0.completion_signal.value = 1
q = HWCopyQueue()
q.wait(TestHCQ.d0.completion_signal)
q.signal(TestHCQ.d0.completion_signal)
q.submit(TestHCQ.d0)
HWCopyQueue().wait(TestHCQ.d0.completion_signal).signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0)
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id, timeout=50)
# clean up
@ -94,16 +88,20 @@ class TestHCQ(unittest.TestCase):
TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id)
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"
def test_submit_empty_queues(self):
# Submit a compute queue and a copy queue that have had no packets appended.
# There is no assertion: the test passes as long as neither submit raises.
HWComputeQueue().submit(TestHCQ.d0)
HWCopyQueue().submit(TestHCQ.d0)
def test_signal_timeout(self):
q = HWComputeQueue()
q.submit(TestHCQ.d0)
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id, timeout=50)
def test_signal(self):
q = HWComputeQueue()
q.signal(TestHCQ.d0.completion_signal)
q.submit(TestHCQ.d0)
HWComputeQueue().signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0)
TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id)
def test_copy_signal(self):
# Queue a single signal of d0's completion signal on the copy queue and submit it.
HWCopyQueue().signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0)
# _wait_on is called with its default timeout here; the test passes if it returns without raising.
TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id)
def test_run_signal(self):
@ -114,12 +112,6 @@ class TestHCQ(unittest.TestCase):
TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id)
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"
def test_copy_signal(self):
q = HWCopyQueue()
q.signal(TestHCQ.d0.completion_signal)
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_on(TestHCQ.d0.completion_signal.event_id)
def test_copy_1000_times(self):
q = HWCopyQueue()
q.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8)
@ -169,11 +161,11 @@ class TestHCQ(unittest.TestCase):
q = HWComputeQueue()
qc = HWCopyQueue()
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size) # b = [1, 2]
KFDDevice._get_signal(10).value = 1
q.signal(sig:=KFDDevice._get_signal(10))
qc.wait(sig)
qc.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8)
qc.signal(TestHCQ.d0.completion_signal)
sig.value = 1
qc.submit(TestHCQ.d0)
time.sleep(0.02) # give it time for the wait to fail
q.submit(TestHCQ.d0)

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Tuple, Any
import os, fcntl, ctypes, functools, re, pathlib, mmap, struct, errno
import os, fcntl, ctypes, functools, re, pathlib, mmap, struct, errno, io
from tinygrad.device import Compiled, LRUAllocator, Compiler, BufferOptions, CompilerOptions
from tinygrad.helpers import getenv, from_mv, init_c_struct_t, to_mv, round_up
from tinygrad.renderer.cstyle import HIPRenderer
@ -73,8 +73,9 @@ class KFDCompiler(Compiler):
AQL_PACKET_SIZE = ctypes.sizeof(hsa.hsa_kernel_dispatch_packet_t)
SDMA_MAX_COPY_SIZE = 0x400000
PAGE_SIZE = 0x1000
SIGNAL_SIZE, SIGNAL_COUNT = ctypes.sizeof(hsa.amd_signal_t), 256
SIGNAL_SIZE, SIGNAL_COUNT = ctypes.sizeof(hsa.amd_signal_t), 16384
VENDOR_HEADER = hsa.HSA_PACKET_TYPE_VENDOR_SPECIFIC << hsa.HSA_PACKET_HEADER_TYPE
@ -106,14 +107,17 @@ class HWComputeQueue:
kernel_object=prg.handle, group_segment_size=prg.group_segment_size, private_segment_size=prg.private_segment_size,
kernarg_address=kernargs,
completion_signal=hsa.hsa_signal_t(ctypes.addressof(completion_signal)) if completion_signal is not None else EMPTY_SIGNAL))
return self
def signal(self, signal):
# Append a barrier-AND packet whose completion_signal field holds the address of `signal`.
# `signal` must be a ctypes object, since its address is taken with ctypes.addressof.
self.q.append(hsa.hsa_barrier_and_packet_t(header=BARRIER_HEADER, completion_signal=hsa.hsa_signal_t(ctypes.addressof(signal))))
# Return the queue itself so calls can be chained, e.g. HWComputeQueue().signal(s).submit(dev).
return self
def wait(self, signal):
# Append a barrier-AND packet that lists `signal` as its first dependency (dep_signal[0]).
# No completion_signal is set on this packet; only the dependency slot is filled in.
sig = hsa.hsa_barrier_and_packet_t(header=BARRIER_HEADER)
sig.dep_signal[0] = hsa.hsa_signal_t(ctypes.addressof(signal))
self.q.append(sig)
# Return the queue itself so calls can be chained.
return self
def submit(self, device:KFDDevice):
read_ptr = device.amd_aql_queue.read_dispatch_id
@ -125,6 +129,7 @@ class HWComputeQueue:
if len(self.q):
device.aql_doorbell[0] = device.aql_doorbell_value + len(self.q) - 1
device.aql_doorbell_value += len(self.q)
return self
# prebuilt sdma packets
sdma_flush_hdp_pkt = sdma_pkts.hdp_flush(0x8, 0x0, 0x80000000, 0x0, 0x0, 0x0)
@ -150,12 +155,14 @@ class HWCopyQueue:
if (device.sdma_doorbell_value-read_ptr) > device.sdma_ring.size: raise RuntimeError("SDMA queue overrun")
device.sdma_write_pointer[0] = device.sdma_doorbell_value
device.sdma_doorbell[0] = device.sdma_doorbell_value
return self
def timestamp(self, addr):
# Append an SDMA timestamp packet (GET_GLOBAL sub-op) targeting `addr`.
# `addr` is passed through as an integer address; callers in this file pass signal field addresses.
self.q.append(sdma_pkts.timestamp(op=amd_gpu.SDMA_OP_TIMESTAMP, sub_op=amd_gpu.SDMA_SUBOP_TIMESTAMP_GET_GLOBAL, addr=addr))
# Return the queue itself so calls can be chained.
return self
def copy(self, dest, src, copy_size):
self.q.append(sdma_flush_hdp_pkt)
self.q.append(sdma_flush_hdp_pkt) # TODO: do I need this?
self.q.append(sdma_cache_inv)
copied = 0
copies_commands = (copy_size + SDMA_MAX_COPY_SIZE - 1) // SDMA_MAX_COPY_SIZE
@ -165,6 +172,7 @@ class HWCopyQueue:
count=step_copy_size-1, src_addr=src+copied, dst_addr=dest+copied))
copied += step_copy_size
self.q.append(sdma_cache_wb)
return self
def signal(self, completion_signal):
self.q.append(sdma_pkts.atomic(op=amd_gpu.SDMA_OP_ATOMIC, operation=amd_gpu.SDMA_ATOMIC_ADD64,
@ -172,11 +180,13 @@ class HWCopyQueue:
if completion_signal.event_mailbox_ptr != 0:
self.q.append(sdma_pkts.fence(op=amd_gpu.SDMA_OP_FENCE, mtype=3, addr=completion_signal.event_mailbox_ptr, data=completion_signal.event_id))
self.q.append(sdma_pkts.trap(op=amd_gpu.SDMA_OP_TRAP, int_ctx=completion_signal.event_id))
return self
def wait(self, completion_signal):
# Append an SDMA POLL_REGMEM packet that polls memory (mem_poll=1) at the address of the
# signal's `value` field, comparing against value=0 under a 32-bit mask.
# `completion_signal` must be the ctypes signal object itself (its address is taken here), not a raw address.
# NOTE(review): func=0x3 is presumably the "equal" comparison, i.e. wait until the low 32 bits are zero — confirm.
self.q.append(sdma_pkts.poll_regmem(op=amd_gpu.SDMA_OP_POLL_REGMEM, mem_poll=1, func=0x3,
addr=ctypes.addressof(completion_signal) + getattr(hsa.amd_signal_t, 'value').offset,
value=0, mask=0xffffffff, interval=0x04, retry_count=0xfff))
# Return the queue itself so calls can be chained.
return self
class KFDProgram:
def __init__(self, device:KFDDevice, name:str, lib:bytes):
@ -219,14 +229,10 @@ class KFDProgram:
for i in range(len(vals)): args_st.__setattr__(f'v{i}', vals[i])
self.device.completion_signal.value = 1 # reset the signal before call
self.q = HWComputeQueue()
self.q.exec(self, self.device.kernargs_ptr, global_size, local_size, self.device.completion_signal if wait else None)
HWComputeQueue().exec(self, self.device.kernargs_ptr, global_size, local_size,
self.device.completion_signal if wait else None).submit(self.device)
self.device.kernargs_ptr += self.kernargs_segment_size
# one pending packet + ring doorbell
self.q.submit(self.device)
if wait:
self.device._wait_on(self.device.completion_signal.event_id)
assert (wp:=self.device.amd_aql_queue.write_dispatch_id) == (rp:=self.device.amd_aql_queue.read_dispatch_id), f"didn't run {wp} != {rp}"
@ -235,6 +241,7 @@ class KFDProgram:
class KFDAllocator(LRUAllocator):
def __init__(self, device:KFDDevice):
self.device = device
self.b = [self.device._gpu_alloc(SDMA_MAX_COPY_SIZE*4, kfd.KFD_IOC_ALLOC_MEM_FLAGS_USERPTR, public=True) for _ in range(2)]
super().__init__()
def _alloc(self, size:int, options:BufferOptions):
@ -245,34 +252,63 @@ class KFDAllocator(LRUAllocator):
if e.errno == errno.ENOMEM: raise MemoryError("Cannot allocate memory") from e
else: raise
def _free(self, gpumem, options:BufferOptions):
self.device._gpu_free(gpumem)
def _free(self, gpumem, options:BufferOptions): self.device._gpu_free(gpumem)
def as_buffer(self, src:Any) -> memoryview:
# Return a memoryview over the `src.size` bytes starting at `src.va_addr`.
# The device is synchronized first, before the memory is exposed to the caller.
self.device.synchronize()
return to_mv(src.va_addr, src.size)
def copy_from_fd(self, dest, fd, offset, size):
  """Copy `size` bytes from file descriptor `fd`, starting at `offset`, into `dest`.

  The file is read chunk by chunk into the two staging buffers held in
  `self.b`: the next chunk is read into `self.b[1]` while the previously
  submitted SDMA copy may still be reading from `self.b[0]`, then the two
  are swapped. Reading starts at `offset` rounded down to PAGE_SIZE; the
  leading `offset % PAGE_SIZE` bytes of the first chunk are skipped when
  copying to the device.

  Args:
    dest: destination allocation; only `dest.va_addr` is used.
    fd: an open file descriptor. It is left open by this method.
    offset: byte offset in the file of the first byte to copy.
    size: number of bytes to copy.
  """
  # Nothing to transfer. Without this guard no SDMA copy is submitted, and the
  # final _wait_on below would wait for a completion that never arrives.
  if size == 0: return
  # closefd=False: leaving the `with` releases only the FileIO wrapper, never the caller's fd.
  with io.FileIO(fd, "a+b", closefd=False) as fo:
    fo.seek(offset - (minor_offset:=offset % PAGE_SIZE))
    copied_in, total_copy_size = 0, round_up(size+minor_offset, PAGE_SIZE)
    for i in range(0, size+minor_offset, self.b[0].size):
      local_size = min(self.b[0].size, total_copy_size-i)
      copy_size = min(local_size-minor_offset, size-copied_in)
      if copy_size == 0: break
      # NOTE(review): the byte count returned by readinto is not checked, so a short read goes unnoticed.
      fo.readinto(to_mv(self.b[1].va_addr, local_size))
      # The previous chunk's copy must finish before its staging buffer is reused on a later iteration.
      if i != 0: self.device._wait_on(self.device.completion_signal.event_id)
      self.b = self.b[::-1]
      self.device.completion_signal.value = 1 # TODO: when do we have to reset it?
      self.device._submit_sdma(dest.va_addr+copied_in, self.b[0].va_addr+minor_offset, copy_size,
                               completion_signal=self.device.completion_signal)
      copied_in += copy_size
      minor_offset = 0 # only on the first
  # Wait for the last submitted chunk.
  self.device._wait_on(self.device.completion_signal.event_id)
def transfer(self, dest, src, sz:int, src_dev=None, dest_dev=None):
# Copy `sz` bytes from `src.va_addr` to `dest.va_addr` on dest_dev's copy queue, using two
# signals to order the copy against the compute queues of both devices.
# NOTE(review): src_dev and dest_dev default to None but are dereferenced unconditionally,
# so callers must always pass both.
# Map the source allocation on the destination device before it is addressed from there.
dest_dev._gpu_map(src)
# Start src_dev's compute queue with a packet that signals `sig`; it is submitted last, below.
q = HWComputeQueue().signal(sig := KFDDevice._get_signal())
# dest_dev's copy queue: wait on `sig`, do the copy, then signal `sigc`.
HWCopyQueue().wait(sig).copy(dest.va_addr, src.va_addr, sz).signal(sigc := KFDDevice._get_signal()).submit(dest_dev)
# dest_dev's compute queue gets a wait on `sigc`.
HWComputeQueue().wait(sigc).submit(dest_dev)
# src_dev's compute queue: signal `sig`, then wait on `sigc`.
# NOTE(review): presumably this keeps later src_dev work from touching `src` until the copy is done — confirm.
q.wait(sigc).submit(src_dev)
def copyin(self, dest, src: memoryview):
# TODO: need to make the address visible to gpu and pass it directly to sdma.
self.device._map_userptr_to_gpu(ctypes.addressof(from_mv(src).contents), src.nbytes)
self.device.completion_signal.value = 1
self.device._submit_sdma(dest.va_addr, ctypes.addressof(from_mv(src).contents), src.nbytes, completion_signal=self.device.completion_signal)
for i in range(0, src.nbytes, self.b[0].size):
ctypes.memmove(self.b[1].va_addr, from_mv(src[i:]), lsize:=min(self.b[0].size, src.nbytes-i))
if i != 0: self.device._wait_on(self.device.completion_signal.event_id)
self.b = self.b[::-1]
self.device.completion_signal.value = 1 # TODO: when do we have to reset it?
self.device._submit_sdma(dest.va_addr+i, self.b[0].va_addr, lsize, completion_signal=self.device.completion_signal)
self.device._wait_on(self.device.completion_signal.event_id)
def copyout(self, dest:memoryview, src):
self.device._map_userptr_to_gpu(ctypes.addressof(from_mv(dest).contents), dest.nbytes)
for i in range(0, dest.nbytes, self.b[0].size):
self.device.completion_signal.value = 1
self.device._submit_sdma(ctypes.addressof(from_mv(dest).contents), src.va_addr, dest.nbytes, completion_signal=self.device.completion_signal)
self.device._submit_sdma(self.b[0].va_addr, src.va_addr+i, lsize:=min(self.b[0].size, dest.nbytes-i),
completion_signal=self.device.completion_signal)
self.device._wait_on(self.device.completion_signal.event_id)
ctypes.memmove(from_mv(dest[i:]), self.b[0].va_addr, lsize)
MAP_FIXED, MAP_NORESERVE = 0x10, 0x400
class KFDDevice(Compiled):
kfd:int = -1
event_page:Any = None # TODO: fix types in kfd, Optional[kfd.struct_kfd_ioctl_alloc_memory_of_gpu_args]
signals_page:Any = None
def _map_userptr_to_gpu(self, addr, size):
self.map_uptr2gpu_struct.start_addr = addr&~0xfff
self.map_uptr2gpu_struct.size = round_up(size+addr-(addr&~0xfff), 0x1000)
kio.svm(self.kfd, made_struct=self.map_uptr2gpu_struct)
signal_number:int = 10
def _gpu_map(self, mem):
if self.gpu_id in getattr(mem, "mapped_gpu_ids", []): return
mem.__setattr__("mapped_gpu_ids", getattr(mem, "mapped_gpu_ids", []) + [self.gpu_id])
c_gpus = (ctypes.c_int32 * len(mem.mapped_gpu_ids))(*mem.mapped_gpu_ids)
stm = kio.map_memory_to_gpu(self.kfd, handle=mem.handle, device_ids_array_ptr=ctypes.addressof(c_gpus), n_devices=len(mem.mapped_gpu_ids))
@ -310,8 +346,13 @@ class KFDDevice(Compiled):
kio.free_memory_of_gpu(self.kfd, handle=mem.handle)
@classmethod
def _get_signal(self, num):
return hsa.amd_signal_t.from_address(KFDDevice.signals_page.va_addr + SIGNAL_SIZE*num)
def _get_signal(self, num=None):
if num is None: num = KFDDevice.signal_number
KFDDevice.signal_number += 1
if KFDDevice.signal_number == SIGNAL_COUNT: KFDDevice.signal_number = 10
ret = hsa.amd_signal_t.from_address(KFDDevice.signals_page.va_addr + SIGNAL_SIZE*num)
ret.value = 1
return ret
def __init__(self, device:str=""):
if KFDDevice.kfd == -1: KFDDevice.kfd = os.open("/dev/kfd", os.O_RDWR)
@ -403,24 +444,13 @@ class KFDDevice(Compiled):
self.pm4_packet = hsa.hsa_ext_amd_aql_pm4_packet_t(header=VENDOR_HEADER, pm4_command=pm4_cmds,
completion_signal=hsa.hsa_signal_t(ctypes.addressof(self.completion_signal)))
# Helpers
map_uptr2gpu_struct_t = init_c_struct_t(tuple(kfd.struct_kfd_ioctl_svm_args._fields_[:-1]+[('attrs', kfd.struct_kfd_ioctl_svm_attribute*2)])) # type: ignore
self.map_uptr2gpu_struct = map_uptr2gpu_struct_t(nattr=2, op=kfd.KFD_IOCTL_SVM_OP_SET_ATTR)
self.map_uptr2gpu_struct.attrs[0].type = kfd.KFD_IOCTL_SVM_ATTR_SET_FLAGS
self.map_uptr2gpu_struct.attrs[0].value = kfd.KFD_IOCTL_SVM_FLAG_COHERENT
self.map_uptr2gpu_struct.attrs[1].type = kfd.KFD_IOCTL_SVM_ATTR_ACCESS_IN_PLACE
self.map_uptr2gpu_struct.attrs[1].value = self.gpu_id
super().__init__(device, KFDAllocator(self), KFDCompiler(self.arch), functools.partial(KFDProgram, self))
def _submit_sdma(self, dest, src, copy_size, wait_signals=None, completion_signal=None):
q = HWCopyQueue()
if wait_signals is not None:
# NOTE: we check only low 32 bits to be zeroed, we don't use higher values for signals
for sig in wait_signals: q.wait(ctypes.addressof(sig) + getattr(hsa.amd_signal_t, 'value').offset)
if completion_signal is not None: q.timestamp(ctypes.addressof(completion_signal) + getattr(hsa.amd_signal_t, 'start_ts').offset)
q.copy(dest, src, copy_size)
if completion_signal is not None: q.timestamp(ctypes.addressof(completion_signal) + getattr(hsa.amd_signal_t, 'end_ts').offset)
@ -435,28 +465,14 @@ class KFDDevice(Compiled):
amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GLV_INV(glv) | amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL1_INV(gl1) | \
amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL2_INV(gl2)]
for i, value in enumerate(pm4_cmd): pm4_buffer_view[i] = value
ctypes.memmove(self.aql_ring.va_addr + (self.aql_doorbell_value * AQL_PACKET_SIZE) % self.aql_ring.size,
ctypes.addressof(self.pm4_packet), AQL_PACKET_SIZE)
self.amd_aql_queue.write_dispatch_id = self.aql_doorbell_value + 1
self.aql_doorbell[0] = self.aql_doorbell_value
self.aql_doorbell_value += 1
q = HWComputeQueue()
q.q.append(self.pm4_packet)
q.submit(self)
self._wait_on(self.completion_signal.event_id)
assert (wp:=self.amd_aql_queue.write_dispatch_id) == (rp:=self.amd_aql_queue.read_dispatch_id), f"didn't run {wp} != {rp}"
def synchronize(self):
q = HWComputeQueue()
q.signal(self.completion_signal)
ring_addr = self.aql_ring.va_addr + (self.aql_doorbell_value*AQL_PACKET_SIZE) % self.aql_ring.size
for cmd in q.q: ctypes.memmove(ring_addr, ctypes.addressof(cmd), AQL_PACKET_SIZE)
# one pending packet + ring doorbell
self.amd_aql_queue.write_dispatch_id = self.aql_doorbell_value + 1
self.aql_doorbell[0] = self.aql_doorbell_value
self.aql_doorbell_value += 1
HWComputeQueue().signal(self.completion_signal).submit(self)
self._wait_on(self.completion_signal.event_id)
assert (wp:=self.amd_aql_queue.write_dispatch_id) == (rp:=self.amd_aql_queue.read_dispatch_id), f"didn't run {wp} != {rp}"