fix non-jitted transfers in profile (#5980)

* fix transfers in profile

* fix linter

* sync to be sure everything is recorded
This commit is contained in:
nimlgen 2024-08-08 17:58:08 +03:00 committed by GitHub
parent 76eca0d27e
commit 183c4c91a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 209 additions and 180 deletions

View File

@ -1,6 +1,6 @@
import unittest, ctypes, struct, contextlib, tempfile, pathlib, json, time, atexit, random
from tinygrad import Device, Tensor, dtypes, TinyJit
from tinygrad.helpers import CI, getenv, Context
import unittest, ctypes, struct
from tinygrad import Device, Tensor, dtypes
from tinygrad.helpers import CI, getenv
from tinygrad.device import Buffer, BufferOptions, HCQCompiled
from tinygrad.engine.schedule import create_schedule
from tinygrad.engine.realize import get_runner, CompiledRunner
@ -459,180 +459,5 @@ class TestHCQ(unittest.TestCase):
assert buf2.as_buffer()[0] == i
@contextlib.contextmanager
def helper_collect_profile(*devs, random_setup_delay=False):
  """Collect a Chrome-trace profile for everything executed on *devs* inside the `with` body.

  Yields an (initially empty) dict that is populated with the parsed profile JSON
  after the block exits.  With random_setup_delay=True, profiler setup runs on the
  devices in a shuffled order with random sleeps in between, to exercise
  cross-device timestamp synchronization.
  """
  if random_setup_delay:
    devs = list(devs)
    for dev in devs: dev.synchronize()
    random.shuffle(devs)
    for dev in devs:
      dev._prof_setup()
      # random delay (1-1000 ms) between per-device setups to skew their clocks
      time.sleep(random.randint(1, 1000) / 1000)
  else:
    for dev in devs: dev._prof_setup()
  profile_dict = {}
  # temporary file the profiler writes its JSON into
  _, tmp = tempfile.mkstemp()
  with Context(PROFILE=1, PROFILEPATH=tmp):
    try: yield profile_dict
    finally:
      for dev in devs:
        dev.synchronize()
        # finalize now and drop the atexit hook so it isn't finalized twice
        dev._prof_finalize()
        atexit.unregister(dev._prof_finalize)
  # fill the dict yielded above in place, then remove the temp file
  for k,v in json.loads(pathlib.Path(tmp).read_text()).items(): profile_dict[k] = v
  pathlib.Path(tmp).unlink()
def helper_profile_filter_node(profile, **kwargs):
  """Return the trace events in *profile* whose fields match every key/value in kwargs."""
  assert len(profile) > 0, "Empty profile"
  assert 'traceEvents' in profile, "traceEvents should present"
  selected = []
  for ev in profile['traceEvents']:
    if all(ev.get(field, None) == want for field, want in kwargs.items()): selected.append(ev)
  return selected
def helper_profile_parse_pids(profile):
  """Build pid->process-name and tid->thread-name maps from the profile's metadata events."""
  pids = {proc['pid']: proc['args']['name'] for proc in helper_profile_filter_node(profile, name='process_name')}
  tids = {th['tid']: th['args']['name'] for th in helper_profile_filter_node(profile, name='thread_name')}
  return pids, tids
def helper_profile_parse_deps(profile):
  """Resolve flow (dependency) arrows in *profile*.

  For each flow-start ('s') event, find the matching flow-finish ('f') event by id,
  then the single duration ('X') event that each one lands inside.
  Returns a list of (start, finish, src_node, dst_node) tuples.
  """
  duration_events = helper_profile_filter_node(profile, ph="X")
  def containing(ev):
    # all 'X' events on the same pid/tid whose time span covers ev's timestamp
    return [x for x in duration_events
            if ev['pid'] == x['pid'] and ev['tid'] == x['tid'] and x['ts'] <= ev['ts'] <= x['ts'] + x['dur']]
  deps = []
  for s in helper_profile_filter_node(profile, ph="s"):
    f = helper_profile_filter_node(profile, ph="f", id=s['id'])[0]
    starts, ends = containing(s), containing(f)
    assert len(starts) == 1 and len(ends) == 1, "more than one start and end possible, valid?"
    deps.append((s, f, starts[0], ends[0]))
  return deps
def helper_validate_node(node, duration_s=10, ts_age_s=30, profile=None, pid_name=None, tid_name=None):
  """Sanity-check a single trace event.

  Asserts that the event's timestamp is within *ts_age_s* seconds of now, its
  duration is positive and below *duration_s* seconds, and (when given) that its
  pid/tid resolve to *pid_name*/*tid_name* via the profile's metadata events.
  """
  pids, tids = helper_profile_parse_pids(profile)
  # ts is in us; messages report the actual configured windows instead of hardcoded 30s/10s
  assert abs(node['ts'] - time.perf_counter_ns() / 1e3) < ts_age_s * 1e6, f"timestamp is not in {ts_age_s}s range"
  assert 0 < node['dur'] < duration_s * 1e6, f"duration is not in {duration_s}s range"
  assert pid_name is None or pids[node['pid']] == pid_name
  assert tid_name is None or tids[node['tid']] == tid_name
@unittest.skipUnless(issubclass(type(Device[Device.DEFAULT]), HCQCompiled), "HCQ device required to run")
class TestProfiler(unittest.TestCase):
  """Checks that HCQ profiling records kernel launches, host<->device copies and
  cross-device dependencies with plausible, ordered timestamps."""
  @classmethod
  def setUpClass(self):
    # Shared fixtures: the default device, a small realized tensor, a compiled
    # a+1 kernel, and pre-filled kernarg buffers for both argument orders.
    TestProfiler.d0 = Device[Device.DEFAULT]
    TestProfiler.a = Tensor([0.,1.], device=Device.DEFAULT).realize()
    TestProfiler.b = self.a + 1
    si = create_schedule([self.b.lazydata])[-1]
    TestProfiler.runner = get_runner(TestProfiler.d0.dname, si.ast)
    TestProfiler.b.lazydata.buffer.allocate()
    TestProfiler.kernargs_ba_ptr = TestProfiler.runner.clprg.fill_kernargs([TestProfiler.b.lazydata.buffer._buf, TestProfiler.a.lazydata.buffer._buf])
    TestProfiler.kernargs_ab_ptr = TestProfiler.runner.clprg.fill_kernargs([TestProfiler.a.lazydata.buffer._buf, TestProfiler.b.lazydata.buffer._buf])

  def test_profile_kernel_run(self):
    # A kernel launch shows up as an event named after the program, on the COMPUTE queue.
    runner_name = TestProfiler.runner.clprg.name
    with helper_collect_profile(TestProfiler.d0) as profile:
      TestProfiler.runner([TestProfiler.b.lazydata.buffer, TestProfiler.a.lazydata.buffer], var_vals={})
    kernel_node = helper_profile_filter_node(profile, name=runner_name)[0]
    helper_validate_node(kernel_node, profile=profile, pid_name=Device.DEFAULT, tid_name="COMPUTE")

  def test_profile_copyin(self):
    # A host->device copy shows up as "CPU -> <dev>" on the DMA queue.
    buf1 = Buffer(Device.DEFAULT, 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated()
    with helper_collect_profile(TestProfiler.d0) as profile:
      buf1.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
    copyin_node = helper_profile_filter_node(profile, name=f"CPU -> {Device.DEFAULT}")[0]
    helper_validate_node(copyin_node, profile=profile, pid_name=Device.DEFAULT, tid_name="DMA")

  def test_profile_multiops(self):
    # copyin -> kernel -> copyout must all be recorded and strictly ordered in time.
    runner_name = TestProfiler.runner.clprg.name
    buf1 = Buffer(Device.DEFAULT, 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated()
    with helper_collect_profile(TestProfiler.d0) as profile:
      buf1.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
      TestProfiler.runner([buf1, TestProfiler.a.lazydata.buffer], var_vals={})
      buf1.as_buffer()
    copyin_node = helper_profile_filter_node(profile, name=f"CPU -> {Device.DEFAULT}")[0]
    helper_validate_node(copyin_node, profile=profile, pid_name=Device.DEFAULT, tid_name="DMA")
    kernel_node = helper_profile_filter_node(profile, name=runner_name)[0]
    helper_validate_node(kernel_node, profile=profile, pid_name=Device.DEFAULT, tid_name="COMPUTE")
    copyout_node = helper_profile_filter_node(profile, name=f"{Device.DEFAULT} -> CPU")[0]
    helper_validate_node(copyout_node, profile=profile, pid_name=Device.DEFAULT, tid_name="DMA")
    assert copyin_node['ts'] + copyin_node['dur'] < kernel_node['ts'], "timestamp not aranged"
    assert kernel_node['ts'] + kernel_node['dur'] < copyout_node['ts'], "timestamp not aranged"

  def test_profile_multidev(self):
    # Copyins on two devices must be attributed to each device's own pid.
    d1 = Device[f"{Device.DEFAULT}:1"]
    buf1 = Buffer(Device.DEFAULT, 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated()
    buf2 = Buffer(f"{Device.DEFAULT}:1", 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated()
    with helper_collect_profile(TestProfiler.d0, d1) as profile:
      buf1.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
      buf2.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
    copyin_node_1 = helper_profile_filter_node(profile, name=f"CPU -> {Device.DEFAULT}")[0]
    helper_validate_node(copyin_node_1, profile=profile, pid_name=Device.DEFAULT, tid_name="DMA")
    copyin_node_2 = helper_profile_filter_node(profile, name=f"CPU -> {Device.DEFAULT}:1")[0]
    helper_validate_node(copyin_node_2, profile=profile, pid_name=f"{Device.DEFAULT}:1", tid_name="DMA")

  @unittest.skipIf(MOCKGPU and Device.DEFAULT == "AMD", "AMD mockgpu with indirect buffers does not support queue wait interrupts")
  def test_profile_deps(self):
    # A jitted kernel followed by a cross-device copy must produce exactly one
    # flow (dependency) arrow, going from the kernel event to the copy event.
    d1 = Device[f"{Device.DEFAULT}:1"]
    def f(a):
      x = (a + 1).realize()
      return x, x.to(d1.dname).realize()
    a = Tensor.randn(10, 10, device=TestProfiler.d0.dname).realize()
    with helper_collect_profile(TestProfiler.d0, d1) as profile:
      jf = TinyJit(f)
      for _ in range(3): jf(a)
      del jf
    deps = helper_profile_parse_deps(profile)
    assert len(deps) == 1, "one dep is expected, one launch"
    _, _, l, r = deps[0]
    assert l['name'].find("->") == -1, "should be kernel"
    assert r['name'] == f"{Device.DEFAULT} -> {Device.DEFAULT}:1", "should be copy"

  @unittest.skipIf(CI, "skip CI")
  def test_profile_sync(self):
    # Cross-device timestamp sync: CPU-side sleeps between copyins on six devices
    # must be reflected in the profiled timestamps to within 0.2%.
    mv = memoryview(bytearray(struct.pack("ff", 0, 1)))
    expected_diff = 100000 # sleep in us
    devs = [Device[f"{Device.DEFAULT}:{i}"] for i in range(6)]
    bufs = [Buffer(f"{Device.DEFAULT}:{i}", 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated() for i in range(6)]
    # enqueue ops on different queues to check the timer sync
    cpu_time = []
    with helper_collect_profile(*devs, random_setup_delay=True) as profile:
      for i in range(6):
        x = time.perf_counter_ns()
        time.sleep(expected_diff / 1e6)
        bufs[i].copyin(mv)
        # measured host-side overhead beyond the intended sleep, in us
        cpu_time.append(((time.perf_counter_ns() - x) / 1000) - expected_diff)
    nodes = [helper_profile_filter_node(profile, name=f"CPU -> {Device.canonicalize(f'{Device.DEFAULT}:{i}')}")[-1] for i in range(6)]
    avg_diff = []
    for i in range(1, 6):
      diff = nodes[i]['ts'] - nodes[i-1]['ts'] - cpu_time[i]
      avg_diff.append(diff - expected_diff)
      assert expected_diff * 0.998 < diff < expected_diff * 1.002, "more that 0.2% diff"
    print(f"total avg delay is {sum(avg_diff) / len(avg_diff)} us")
# Run the suite when executed directly.
if __name__ == "__main__":
  unittest.main()

199
test/test_profiler.py Normal file
View File

@ -0,0 +1,199 @@
import unittest, struct, contextlib, tempfile, pathlib, json, time, atexit, random
from tinygrad import Device, Tensor, dtypes, TinyJit
from tinygrad.helpers import CI, getenv, Context, ProfileLogger
from tinygrad.device import Buffer, BufferOptions, HCQCompiled
from tinygrad.engine.schedule import create_schedule
from tinygrad.engine.realize import get_runner
MOCKGPU = getenv("MOCKGPU")
@contextlib.contextmanager
def helper_collect_profile(*devs, random_setup_delay=False):
  """Collect a Chrome-trace profile for everything executed on *devs* inside the `with` body.

  Yields an (initially empty) dict that is populated with the parsed profile JSON
  after the block exits.  With random_setup_delay=True, profiler setup runs on the
  devices in a shuffled order with random sleeps in between, to exercise
  cross-device timestamp synchronization.
  """
  # Reset the logger's global state so events from earlier collections don't leak in.
  ProfileLogger.mjson, ProfileLogger.actors = [], {}
  if random_setup_delay:
    devs = list(devs)
    for dev in devs: dev.synchronize()
    random.shuffle(devs)
    for dev in devs:
      dev._prof_setup()
      # random delay (1-1000 ms) between per-device setups to skew their clocks
      time.sleep(random.randint(1, 1000) / 1000)
  else:
    for dev in devs: dev._prof_setup()
  profile_dict = {}
  # temporary file the profiler writes its JSON into
  _, tmp = tempfile.mkstemp()
  with Context(PROFILE=1, PROFILEPATH=tmp):
    try: yield profile_dict
    finally:
      for dev in devs:
        dev.synchronize()
        # finalize now and drop the atexit hook so it isn't finalized twice
        dev._prof_finalize()
        atexit.unregister(dev._prof_finalize)
  # fill the dict yielded above in place, then remove the temp file
  for k,v in json.loads(pathlib.Path(tmp).read_text()).items(): profile_dict[k] = v
  pathlib.Path(tmp).unlink()
def helper_profile_filter_node(profile, **kwargs):
  """Return the trace events in *profile* whose fields match every key/value in kwargs."""
  assert len(profile) > 0, "Empty profile"
  assert 'traceEvents' in profile, "traceEvents should present"
  selected = []
  for ev in profile['traceEvents']:
    if all(ev.get(field, None) == want for field, want in kwargs.items()): selected.append(ev)
  return selected
def helper_profile_parse_pids(profile):
  """Build pid->process-name and tid->thread-name maps from the profile's metadata events."""
  pids = {proc['pid']: proc['args']['name'] for proc in helper_profile_filter_node(profile, name='process_name')}
  tids = {th['tid']: th['args']['name'] for th in helper_profile_filter_node(profile, name='thread_name')}
  return pids, tids
def helper_profile_parse_deps(profile):
  """Resolve flow (dependency) arrows in *profile*.

  For each flow-start ('s') event, find the matching flow-finish ('f') event by id,
  then the single duration ('X') event that each one lands inside.
  Returns a list of (start, finish, src_node, dst_node) tuples.
  """
  duration_events = helper_profile_filter_node(profile, ph="X")
  def containing(ev):
    # all 'X' events on the same pid/tid whose time span covers ev's timestamp
    return [x for x in duration_events
            if ev['pid'] == x['pid'] and ev['tid'] == x['tid'] and x['ts'] <= ev['ts'] <= x['ts'] + x['dur']]
  deps = []
  for s in helper_profile_filter_node(profile, ph="s"):
    f = helper_profile_filter_node(profile, ph="f", id=s['id'])[0]
    starts, ends = containing(s), containing(f)
    assert len(starts) == 1 and len(ends) == 1, "more than one start and end possible, valid?"
    deps.append((s, f, starts[0], ends[0]))
  return deps
def helper_validate_node(node, duration_s=10, ts_age_s=30, profile=None, pid_name=None, tid_name=None):
  """Sanity-check a single trace event.

  Asserts that the event's timestamp is within *ts_age_s* seconds of now, its
  duration is positive and below *duration_s* seconds, and (when given) that its
  pid/tid resolve to *pid_name*/*tid_name* via the profile's metadata events.
  """
  pids, tids = helper_profile_parse_pids(profile)
  # ts is in us; messages report the actual configured windows instead of hardcoded 30s/10s
  assert abs(node['ts'] - time.perf_counter_ns() / 1e3) < ts_age_s * 1e6, f"timestamp is not in {ts_age_s}s range"
  assert 0 < node['dur'] < duration_s * 1e6, f"duration is not in {duration_s}s range"
  assert pid_name is None or pids[node['pid']] == pid_name
  assert tid_name is None or tids[node['tid']] == tid_name
@unittest.skipUnless(issubclass(type(Device[Device.DEFAULT]), HCQCompiled), "HCQ device required to run")
class TestProfiler(unittest.TestCase):
  """Checks that HCQ profiling records kernel launches, host<->device copies,
  device-to-device transfers and cross-device dependencies with plausible,
  ordered timestamps."""
  @classmethod
  def setUpClass(self):
    # Shared fixtures: the default device, a small realized tensor, a compiled
    # a+1 kernel, and pre-filled kernarg buffers for both argument orders.
    TestProfiler.d0 = Device[Device.DEFAULT]
    TestProfiler.a = Tensor([0.,1.], device=Device.DEFAULT).realize()
    TestProfiler.b = self.a + 1
    si = create_schedule([self.b.lazydata])[-1]
    TestProfiler.runner = get_runner(TestProfiler.d0.dname, si.ast)
    TestProfiler.b.lazydata.buffer.allocate()
    TestProfiler.kernargs_ba_ptr = TestProfiler.runner.clprg.fill_kernargs([TestProfiler.b.lazydata.buffer._buf, TestProfiler.a.lazydata.buffer._buf])
    TestProfiler.kernargs_ab_ptr = TestProfiler.runner.clprg.fill_kernargs([TestProfiler.a.lazydata.buffer._buf, TestProfiler.b.lazydata.buffer._buf])

  def test_profile_kernel_run(self):
    # A kernel launch shows up as an event named after the program, on the COMPUTE queue.
    runner_name = TestProfiler.runner.clprg.name
    with helper_collect_profile(TestProfiler.d0) as profile:
      TestProfiler.runner([TestProfiler.b.lazydata.buffer, TestProfiler.a.lazydata.buffer], var_vals={})
    kernel_node = helper_profile_filter_node(profile, name=runner_name)[0]
    helper_validate_node(kernel_node, profile=profile, pid_name=Device.DEFAULT, tid_name="COMPUTE")

  def test_profile_copyin(self):
    # A host->device copy shows up as "CPU -> <dev>" on the DMA queue.
    buf1 = Buffer(Device.DEFAULT, 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated()
    with helper_collect_profile(TestProfiler.d0) as profile:
      buf1.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
    copyin_node = helper_profile_filter_node(profile, name=f"CPU -> {Device.DEFAULT}")[0]
    helper_validate_node(copyin_node, profile=profile, pid_name=Device.DEFAULT, tid_name="DMA")

  def test_profile_multiops(self):
    # copyin -> kernel -> copyout must all be recorded and strictly ordered in time.
    runner_name = TestProfiler.runner.clprg.name
    buf1 = Buffer(Device.DEFAULT, 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated()
    with helper_collect_profile(TestProfiler.d0) as profile:
      buf1.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
      TestProfiler.runner([buf1, TestProfiler.a.lazydata.buffer], var_vals={})
      buf1.as_buffer()
    copyin_node = helper_profile_filter_node(profile, name=f"CPU -> {Device.DEFAULT}")[0]
    helper_validate_node(copyin_node, profile=profile, pid_name=Device.DEFAULT, tid_name="DMA")
    kernel_node = helper_profile_filter_node(profile, name=runner_name)[0]
    helper_validate_node(kernel_node, profile=profile, pid_name=Device.DEFAULT, tid_name="COMPUTE")
    copyout_node = helper_profile_filter_node(profile, name=f"{Device.DEFAULT} -> CPU")[0]
    helper_validate_node(copyout_node, profile=profile, pid_name=Device.DEFAULT, tid_name="DMA")
    assert copyin_node['ts'] + copyin_node['dur'] < kernel_node['ts'], "timestamp not aranged"
    assert kernel_node['ts'] + kernel_node['dur'] < copyout_node['ts'], "timestamp not aranged"

  def test_profile_multidev_copyin(self):
    # Copyins on two devices must be attributed to each device's own pid.
    d1 = Device[f"{Device.DEFAULT}:1"]
    buf1 = Buffer(Device.DEFAULT, 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated()
    buf2 = Buffer(f"{Device.DEFAULT}:1", 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated()
    with helper_collect_profile(TestProfiler.d0, d1) as profile:
      buf1.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
      buf2.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
    copyin_node_1 = helper_profile_filter_node(profile, name=f"CPU -> {Device.DEFAULT}")[0]
    helper_validate_node(copyin_node_1, profile=profile, pid_name=Device.DEFAULT, tid_name="DMA")
    copyin_node_2 = helper_profile_filter_node(profile, name=f"CPU -> {Device.DEFAULT}:1")[0]
    helper_validate_node(copyin_node_2, profile=profile, pid_name=f"{Device.DEFAULT}:1", tid_name="DMA")

  def test_profile_multidev_transfer(self):
    # A device->device transfer must be recorded on the source device's DMA queue
    # with a plausible duration for a 1M-element copy.
    d1 = Device[f"{Device.DEFAULT}:1"]
    a = Tensor.randn(1 << 20, device=Device.DEFAULT).realize()
    with helper_collect_profile(TestProfiler.d0, d1) as profile:
      y = a.to(f"{Device.DEFAULT}:1")
      y.realize()
    transfer_node_1 = helper_profile_filter_node(profile, name=f"{Device.DEFAULT} -> {Device.DEFAULT}:1")[0]
    helper_validate_node(transfer_node_1, profile=profile, pid_name=Device.DEFAULT, tid_name="DMA")
    # duration in us; CI machines get a looser upper bound
    assert 80 < transfer_node_1['dur'] < (5000 if CI else 1400), f"Duration is not in the range: {transfer_node_1['dur']}"

  @unittest.skipIf(MOCKGPU and Device.DEFAULT == "AMD", "AMD mockgpu with indirect buffers does not support queue wait interrupts")
  def test_profile_deps(self):
    # A jitted kernel followed by a cross-device copy must produce exactly one
    # flow (dependency) arrow, going from the kernel event to the copy event.
    d1 = Device[f"{Device.DEFAULT}:1"]
    def f(a):
      x = (a + 1).realize()
      return x, x.to(d1.dname).realize()
    a = Tensor.randn(10, 10, device=TestProfiler.d0.dname).realize()
    with helper_collect_profile(TestProfiler.d0, d1) as profile:
      jf = TinyJit(f)
      for _ in range(3): jf(a)
      del jf
    deps = helper_profile_parse_deps(profile)
    assert len(deps) == 1, "one dep is expected, one launch"
    _, _, l, r = deps[0]
    assert l['name'].find("->") == -1, "should be kernel"
    assert r['name'] == f"{Device.DEFAULT} -> {Device.DEFAULT}:1", "should be copy"

  @unittest.skipIf(CI, "skip CI")
  def test_profile_sync(self):
    # Cross-device timestamp sync: CPU-side sleeps between copyins on six devices
    # must be reflected in the profiled timestamps to within 0.2%.
    mv = memoryview(bytearray(struct.pack("ff", 0, 1)))
    expected_diff = 100000 # sleep in us
    devs = [Device[f"{Device.DEFAULT}:{i}"] for i in range(6)]
    bufs = [Buffer(f"{Device.DEFAULT}:{i}", 2, dtypes.float, options=BufferOptions(nolru=True)).ensure_allocated() for i in range(6)]
    # enqueue ops on different queues to check the timer sync
    cpu_time = []
    with helper_collect_profile(*devs, random_setup_delay=True) as profile:
      for i in range(6):
        x = time.perf_counter_ns()
        time.sleep(expected_diff / 1e6)
        bufs[i].copyin(mv)
        # measured host-side overhead beyond the intended sleep, in us
        cpu_time.append(((time.perf_counter_ns() - x) / 1000) - expected_diff)
    nodes = [helper_profile_filter_node(profile, name=f"CPU -> {Device.canonicalize(f'{Device.DEFAULT}:{i}')}")[-1] for i in range(6)]
    avg_diff = []
    for i in range(1, 6):
      diff = nodes[i]['ts'] - nodes[i-1]['ts'] - cpu_time[i]
      avg_diff.append(diff - expected_diff)
      assert expected_diff * 0.998 < diff < expected_diff * 1.002, "more that 0.2% diff"
    print(f"total avg delay is {sum(avg_diff) / len(avg_diff)} us")
# Run the suite when executed directly.
if __name__ == "__main__":
  unittest.main()

View File

@ -419,7 +419,9 @@ def hcq_profile(dev, enabled, desc, queue_type=None, queue=None):
st, en = (dev.signal_t(), dev.signal_t()) if enabled else (None, None)
if enabled and queue is not None: queue.timestamp(st)
elif enabled: queue_type().timestamp(st).submit(dev)
elif enabled:
queue_type().wait(dev.timeline_signal, dev.timeline_value - 1).timestamp(st).signal(dev.timeline_signal, dev.timeline_value).submit(dev)
dev.timeline_value += 1
try: yield (st, en)
finally:
@ -575,6 +577,9 @@ class HCQCompiled(Compiled):
def _prof_finalize(self):
qname = ["COMPUTE", "DMA"]
# Sync to be sure all events on the device are recorded.
self.synchronize()
for st, en, name, is_cp in self.raw_prof_records:
self.profile_logger.events += [(name, self._gpu2cpu_time(st, is_cp), self._gpu2cpu_time(en, is_cp), self.dname, qname[is_cp])]
for a_st, a_en, a_dev, a_is_copy, b_st, b_en, b_dev, b_is_copy in self.dep_prof_records:
@ -653,7 +658,7 @@ class HCQAllocator(LRUAllocator): # pylint: disable=abstract-method
def transfer(self, dest:HCQBuffer, src:HCQBuffer, sz:int, src_dev, dest_dev):
src_dev.allocator.map(dest)
with hcq_profile(self.device, queue_type=self.device.hw_copy_queue_t, desc=f"{src_dev.dname} -> {dest_dev.dname}", enabled=PROFILE):
with hcq_profile(src_dev, queue_type=src_dev.hw_copy_queue_t, desc=f"{src_dev.dname} -> {dest_dev.dname}", enabled=PROFILE):
src_dev.hw_copy_queue_t().wait(src_dev.timeline_signal, src_dev.timeline_value - 1) \
.wait(dest_dev.timeline_signal, dest_dev.timeline_value - 1) \
.copy(dest.va_addr, src.va_addr, sz) \