From 590b9ebb34737b46e5a0c489bffc76d769f5a576 Mon Sep 17 00:00:00 2001
From: nimlgen <138685161+nimlgen@users.noreply.github.com>
Date: Mon, 5 Aug 2024 14:03:25 +0300
Subject: [PATCH] hcq copy queue is optional (#5909)

* hcq copy queue is optional

* one more

* this
---
 test/test_hcq.py   | 63 +++++++++++++++++++++++++++++++++++++++++++---
 tinygrad/device.py |  5 ++--
 2 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/test/test_hcq.py b/test/test_hcq.py
index 1ac72df9..f107d808 100644
--- a/test/test_hcq.py
+++ b/test/test_hcq.py
@@ -31,13 +31,17 @@ class TestHCQ(unittest.TestCase):
   # Test signals
   def test_signal(self):
     for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
+      if queue_type is None: continue
+
       with self.subTest(name=str(queue_type)):
         queue_type().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
         TestHCQ.d0.timeline_signal.wait(TestHCQ.d0.timeline_value)
         TestHCQ.d0.timeline_value += 1

   def test_signal_update(self):
-    for queue_type in [TestHCQ.d0.hw_compute_queue_t]:
+    for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
+      if queue_type is None: continue
+
       with self.subTest(name=str(queue_type)):
         q = queue_type().signal(TestHCQ.d0.signal_t(), 0x1000)
@@ -52,6 +56,8 @@ class TestHCQ(unittest.TestCase):
   # Test wait
   def test_wait(self):
     for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
+      if queue_type is None: continue
+
       with self.subTest(name=str(queue_type)):
         fake_signal = TestHCQ.d0.signal_t()
         fake_signal.value = 1
@@ -63,6 +69,8 @@ class TestHCQ(unittest.TestCase):
   @unittest.skipIf(MOCKGPU, "Can't handle async update on MOCKGPU for now")
   def test_wait_late_set(self):
     for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
+      if queue_type is None: continue
+
       with self.subTest(name=str(queue_type)):
         fake_signal = TestHCQ.d0.signal_t()
         queue_type().wait(fake_signal, 1) \
@@ -78,6 +86,8 @@ class TestHCQ(unittest.TestCase):

   def test_wait_update(self):
     for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
+      if queue_type is None: continue
+
       with self.subTest(name=str(queue_type)):
         fake_signal = TestHCQ.d0.signal_t()
         q = queue_type().wait(TestHCQ.d0.timeline_signal, 0xffffffff).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
@@ -126,6 +136,8 @@ class TestHCQ(unittest.TestCase):

   # Test copy
   def test_copy(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     TestHCQ.d0.hw_copy_queue_t().wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1) \
                                 .copy(TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr, 8) \
                                 .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
@@ -136,6 +148,8 @@ class TestHCQ(unittest.TestCase):
     assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]) == 1.0, f"got val {val}"

   def test_copy_long(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     sz = 64 << 20
     buf1 = Buffer(Device.DEFAULT, sz, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
     buf2 = Buffer(Device.DEFAULT, sz, dtypes.int8, options=BufferOptions(host=True, nolru=True)).ensure_allocated()
@@ -152,6 +166,8 @@ class TestHCQ(unittest.TestCase):
     for i in range(sz//8): assert mv_buf1[i] == 0x0101010101010101, f"offset {i*8} differs, not all copied, got {hex(mv_buf1[i])}"

   def test_update_copy(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     q = TestHCQ.d0.hw_copy_queue_t().wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1) \
                                     .copy(0x0, 0x0, 8) \
                                     .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
@@ -165,6 +181,8 @@ class TestHCQ(unittest.TestCase):
     assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]) == 1.0, f"got val {val}"

   def test_update_copy_long(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     sz = 64 << 20
     buf1 = Buffer(Device.DEFAULT, sz, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
     buf2 = Buffer(Device.DEFAULT, sz, dtypes.int8, options=BufferOptions(host=True, nolru=True)).ensure_allocated()
@@ -186,9 +204,9 @@ class TestHCQ(unittest.TestCase):
   # Test bind api
   def test_bind(self):
     for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
-      with self.subTest(name=str(queue_type)):
-        if not hasattr(queue_type(), 'bind'): self.skipTest("queue does not support bind api")
+      if queue_type is None: continue

+      with self.subTest(name=str(queue_type)):
         fake_signal = TestHCQ.d0.signal_t()
         q = queue_type().wait(TestHCQ.d0.timeline_signal, 0xffffffff).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
         q.bind(TestHCQ.d0)
@@ -201,6 +219,8 @@ class TestHCQ(unittest.TestCase):

   # Test multidevice
   def test_multidevice_signal_wait(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     d1 = Device[f"{Device.DEFAULT}:1"]

     TestHCQ.d0.hw_copy_queue_t().signal(sig:=TestHCQ.d0.signal_t(value=0), value=0xfff) \
@@ -234,6 +254,8 @@ class TestHCQ(unittest.TestCase):
     assert 1 <= et <= (2500 if CI else 20)

   def test_speed_copy_bandwidth(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     TestHCQ.d0._prof_setup()

     # THEORY: the bandwidth is low here because it's only using one SDMA queue. I suspect it's more stable like this at least.
@@ -258,6 +280,8 @@ class TestHCQ(unittest.TestCase):
     assert (0.3 if CI else 10) <= gb_s <= 1000

   def test_speed_cross_device_copy_bandwidth(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     TestHCQ.d0._prof_setup()

     SZ = 2_000_000_000
@@ -283,6 +307,8 @@ class TestHCQ(unittest.TestCase):

   def test_timeline_signal_rollover(self):
     for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
+      if queue_type is None: continue
+
       with self.subTest(name=str(queue_type)):
         TestHCQ.d0.timeline_value = (1 << 32) - 20 # close value to reset
         queue_type().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1).submit(TestHCQ.d0)
@@ -295,6 +321,8 @@ class TestHCQ(unittest.TestCase):
     TestHCQ.d0.synchronize()

   def test_small_copies_from_host_buf(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     buf1 = Buffer(Device.DEFAULT, 1, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
     buf2 = Buffer(Device.DEFAULT, 1, dtypes.int8, options=BufferOptions(host=True, nolru=True)).ensure_allocated()

@@ -310,6 +338,8 @@ class TestHCQ(unittest.TestCase):
       assert buf1.as_buffer()[0] == i

   def test_small_copies_from_host_buf_intercopy(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     buf1 = Buffer(Device.DEFAULT, 1, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
     buf2 = Buffer(Device.DEFAULT, 1, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
     buf3 = Buffer(Device.DEFAULT, 1, dtypes.int8, options=BufferOptions(host=True, nolru=True)).ensure_allocated()
@@ -327,6 +357,8 @@ class TestHCQ(unittest.TestCase):
       assert buf2.as_buffer()[0] == i

   def test_small_copies_from_host_buf_transfer(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     _ = Device[f"{Device.DEFAULT}:1"]

     buf1 = Buffer(Device.DEFAULT, 1, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
@@ -347,6 +379,31 @@ class TestHCQ(unittest.TestCase):
       assert buf2.as_buffer()[0] == i

   def test_memory_barrier(self):
+    a = Tensor([0, 1], device=Device.DEFAULT, dtype=dtypes.int8).realize()
+    b = a + 1
+    runner = get_runner(TestHCQ.d0.dname, create_schedule([b.lazydata])[-1].ast)
+
+    buf1 = Buffer(Device.DEFAULT, 2, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
+    buf2 = Buffer(Device.DEFAULT, 2, dtypes.int8, options=BufferOptions(cpu_access=True, nolru=True)).ensure_allocated()
+
+    kernargs_ptr = runner.clprg.fill_kernargs([buf1._buf, buf2._buf])
+
+    for i in range(255):
+      ctypes.memset(buf2._buf.va_addr, i, 2)
+
+      # Need memory_barrier after direct write to vram
+      TestHCQ.d0.hw_compute_queue_t().wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1) \
+                                     .memory_barrier() \
+                                     .exec(runner.clprg, kernargs_ptr, runner.p.global_size, runner.p.local_size) \
+                                     .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
+      TestHCQ.d0.timeline_signal.wait(TestHCQ.d0.timeline_value)
+      TestHCQ.d0.timeline_value += 1
+
+      assert buf1.as_buffer()[0] == (i + 1), f"has {buf1.as_buffer()[0]}, need {i + 1}"
+
+  def test_memory_barrier_before_copy(self):
+    if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")
+
     buf1 = Buffer(Device.DEFAULT, 1, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
     buf2 = Buffer(Device.DEFAULT, 1, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
     buf3 = Buffer(Device.DEFAULT, 1, dtypes.int8, options=BufferOptions(cpu_access=True, nolru=True)).ensure_allocated()
diff --git a/tinygrad/device.py b/tinygrad/device.py
index ed67ee81..853cfd46 100644
--- a/tinygrad/device.py
+++ b/tinygrad/device.py
@@ -486,7 +486,7 @@ class HCQCompiled(Compiled):
   gpu2cpu_compute_time_diff: decimal.Decimal = decimal.Decimal('nan')

   def __init__(self, device:str, allocator:Allocator, renderer:Renderer, compiler:Compiler, runtime, signal_t:Type[HCQSignal],
-               comp_queue_t:Type[HWComputeQueue], copy_queue_t:Type[HWCopyQueue], timeline_signals:Tuple[HCQSignal, HCQSignal]):
+               comp_queue_t:Type[HWComputeQueue], copy_queue_t:Optional[Type[HWCopyQueue]], timeline_signals:Tuple[HCQSignal, HCQSignal]):
     self.signal_t, self.hw_compute_queue_t, self.hw_copy_queue_t = signal_t, comp_queue_t, copy_queue_t
     self.timeline_value:int = 1
     self.timeline_signal, self._shadow_timeline_signal = timeline_signals
@@ -530,7 +530,8 @@ class HCQCompiled(Compiled):
       return (decimal.Decimal(et+st) / 2000) - d.timeline_signal.timestamp

     # randomly sample the timing from GPU to CPU
-    choices: List = [(d, d.hw_compute_queue_t, []) for d in self.devices] + [(d, d.hw_copy_queue_t, []) for d in self.devices]
+    choices: List = [(d, d.hw_compute_queue_t, []) for d in self.devices]
+    choices += [(d, d.hw_copy_queue_t, []) for d in self.devices if d.hw_copy_queue_t is not None]
     for _ in range(100*len(self.devices)):
       d,q,l = random.choice(choices)
       l.append(_sync_cpu_queue(d,q))
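
Reviewer note (illustration, not part of the patch): with copy_queue_t now typed as Optional, a backend that has no DMA/copy engine can simply pass None and leave hw_copy_queue_t unset. A minimal sketch of that wiring is below; MyDevice, MyAllocator, MyRenderer, MyCompiler, MyProgram, MySignal and MyComputeQueue are hypothetical placeholder names, not tinygrad classes. The only thing assumed from this patch is the HCQCompiled.__init__ signature shown above.

    # sketch: a hypothetical HCQ backend without a hardware copy engine
    class MyDevice(HCQCompiled):
      def __init__(self, device:str=""):
        super().__init__(device, MyAllocator(self), MyRenderer(), MyCompiler(), MyProgram,
                         signal_t=MySignal, comp_queue_t=MyComputeQueue,
                         copy_queue_t=None,  # no copy engine on this device
                         timeline_signals=(MySignal(), MySignal()))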
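
On the caller side the same guard the updated tests use applies: check hw_copy_queue_t for None before building a copy queue. A hedged sketch of that pattern, reusing only the wait/copy/signal/submit and timeline calls already present in this diff (hcq_copy itself is a hypothetical helper, not tinygrad API):

    # sketch: guard before touching the (possibly absent) copy queue of an HCQ device
    def hcq_copy(dev, dest_va:int, src_va:int, nbytes:int):
      if dev.hw_copy_queue_t is None: raise RuntimeError("device does not support copy queue")
      dev.hw_copy_queue_t().wait(dev.timeline_signal, dev.timeline_value - 1) \
                           .copy(dest_va, src_va, nbytes) \
                           .signal(dev.timeline_signal, dev.timeline_value).submit(dev)
      dev.timeline_signal.wait(dev.timeline_value)
      dev.timeline_value += 1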