simple fuzzing test for all processes (#28584)
* working test * classmethod * review * add to ci old-commit-hash: bac193bdd518a260bfbbdb4893520c4263ab2be6
This commit is contained in:
1
.github/workflows/selfdrive_tests.yaml
vendored
1
.github/workflows/selfdrive_tests.yaml
vendored
@@ -228,6 +228,7 @@ jobs:
|
||||
./tools/replay/tests/test_replay && \
|
||||
./tools/cabana/tests/test_cabana && \
|
||||
./system/camerad/test/ae_gray_test && \
|
||||
./selfdrive/test/process_replay/test_fuzzy.py && \
|
||||
coverage xml"
|
||||
- name: "Upload coverage to Codecov"
|
||||
uses: codecov/codecov-action@v2
|
||||
|
||||
@@ -9,14 +9,14 @@ from cereal import car
|
||||
from selfdrive.car import gen_empty_fingerprint
|
||||
from selfdrive.car.car_helpers import interfaces
|
||||
from selfdrive.car.fingerprints import _FINGERPRINTS as FINGERPRINTS, all_known_cars
|
||||
from selfdrive.test.fuzzy_generation import get_random_msg
|
||||
from selfdrive.test.fuzzy_generation import FuzzyGenerator
|
||||
|
||||
|
||||
class TestCarInterfaces(unittest.TestCase):
|
||||
|
||||
@parameterized.expand([(car,) for car in all_known_cars()])
|
||||
@settings(max_examples=5)
|
||||
@given(cc_msg=get_random_msg(car.CarControl, real_floats=True))
|
||||
@given(cc_msg=FuzzyGenerator.get_random_msg(car.CarControl, real_floats=True))
|
||||
def test_car_interfaces(self, car_name, cc_msg):
|
||||
if car_name in FINGERPRINTS:
|
||||
fingerprint = FINGERPRINTS[car_name][0]
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import hypothesis.strategies as st
|
||||
import random
|
||||
|
||||
from cereal import log
|
||||
|
||||
class FuzzyGenerator:
|
||||
def __init__(self, real_floats):
|
||||
self.real_floats=real_floats
|
||||
@@ -60,10 +62,16 @@ class FuzzyGenerator:
|
||||
else:
|
||||
return self.generate_struct(field.schema)
|
||||
|
||||
def generate_struct(self, schema):
|
||||
def generate_struct(self, schema, required=None):
|
||||
full_fill = list(schema.non_union_fields) if schema.non_union_fields else []
|
||||
single_fill = [random.choice(schema.union_fields)] if schema.union_fields else []
|
||||
single_fill = [required] if required else [random.choice(schema.union_fields)] if schema.union_fields else []
|
||||
return st.fixed_dictionaries(dict((field, self.generate_field(schema.fields[field])) for field in full_fill + single_fill))
|
||||
|
||||
def get_random_msg(struct, real_floats=False):
|
||||
return FuzzyGenerator(real_floats=real_floats).generate_struct(struct.schema)
|
||||
@classmethod
|
||||
def get_random_msg(cls, struct, real_floats=False):
|
||||
return cls(real_floats=real_floats).generate_struct(struct.schema)
|
||||
|
||||
@classmethod
|
||||
def get_random_event_msg(cls, required, real_floats=False):
|
||||
fg = cls(real_floats=real_floats)
|
||||
return st.tuples(*[fg.generate_struct(log.Event.schema, r) for r in required])
|
||||
|
||||
@@ -1,174 +1,30 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
from hypothesis import given, HealthCheck, Phase, settings
|
||||
import hypothesis.strategies as st
|
||||
import numpy as np
|
||||
from hypothesis import given, settings, note
|
||||
from parameterized import parameterized
|
||||
import unittest
|
||||
|
||||
from cereal import log
|
||||
from selfdrive.car.toyota.values import CAR as TOYOTA
|
||||
from selfdrive.test.fuzzy_generation import FuzzyGenerator
|
||||
import selfdrive.test.process_replay.process_replay as pr
|
||||
|
||||
# These processes currently fail because of unrealistic data breaking assumptions
|
||||
# that openpilot makes causing error with NaN, inf, int size, array indexing ...
|
||||
# TODO: Make each one testable
|
||||
NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'laikad']
|
||||
TEST_CASES = [(cfg.proc_name, cfg) for cfg in pr.CONFIGS if cfg.proc_name not in NOT_TESTED]
|
||||
|
||||
def get_process_config(process):
|
||||
return [cfg for cfg in pr.CONFIGS if cfg.proc_name == process][0]
|
||||
|
||||
|
||||
def get_event_union_strategy(r, name):
|
||||
return st.fixed_dictionaries({
|
||||
'valid': st.just(True),
|
||||
'logMonoTime': st.integers(min_value=0, max_value=2**64-1),
|
||||
name: r[name[0].upper() + name[1:]],
|
||||
})
|
||||
|
||||
|
||||
def get_strategy_for_events(event_types, finite=False):
|
||||
# TODO: generate automatically based on capnp definitions
|
||||
def floats(**kwargs):
|
||||
allow_nan = False if finite else None
|
||||
allow_infinity = False if finite else None
|
||||
return st.floats(**kwargs, allow_nan=allow_nan, allow_infinity=allow_infinity)
|
||||
|
||||
r = {}
|
||||
r['liveLocationKalman.Measurement'] = st.fixed_dictionaries({
|
||||
'value': st.lists(floats(), min_size=3, max_size=3),
|
||||
'std': st.lists(floats(), min_size=3, max_size=3),
|
||||
'valid': st.just(True),
|
||||
})
|
||||
r['LiveLocationKalman'] = st.fixed_dictionaries({
|
||||
'angularVelocityCalibrated': r['liveLocationKalman.Measurement'],
|
||||
'inputsOK': st.booleans(),
|
||||
'posenetOK': st.booleans(),
|
||||
})
|
||||
r['CarState'] = st.fixed_dictionaries({
|
||||
'vEgo': floats(width=32),
|
||||
'vEgoRaw': floats(width=32),
|
||||
'steeringPressed': st.booleans(),
|
||||
'steeringAngleDeg': floats(width=32),
|
||||
})
|
||||
r['CameraOdometry'] = st.fixed_dictionaries({
|
||||
'frameId': st.integers(min_value=0, max_value=2**32 - 1),
|
||||
'timestampEof': st.integers(min_value=0, max_value=2**64 - 1),
|
||||
'trans': st.lists(floats(width=32), min_size=3, max_size=3),
|
||||
'rot': st.lists(floats(width=32), min_size=3, max_size=3),
|
||||
'transStd': st.lists(floats(width=32), min_size=3, max_size=3),
|
||||
'rotStd': st.lists(floats(width=32), min_size=3, max_size=3),
|
||||
})
|
||||
r['SensorEventData.SensorVec'] = st.fixed_dictionaries({
|
||||
'v': st.lists(floats(width=32), min_size=3, max_size=3),
|
||||
'status': st.just(1),
|
||||
})
|
||||
r['SensorEventData_gyro'] = st.fixed_dictionaries({
|
||||
'version': st.just(1),
|
||||
'sensor': st.just(5),
|
||||
'type': st.just(16),
|
||||
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
|
||||
'source': st.just(8), # BMX055
|
||||
'gyroUncalibrated': r['SensorEventData.SensorVec'],
|
||||
})
|
||||
r['SensorEventData_accel'] = st.fixed_dictionaries({
|
||||
'version': st.just(1),
|
||||
'sensor': st.just(1),
|
||||
'type': st.just(1),
|
||||
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
|
||||
'source': st.just(8), # BMX055
|
||||
'acceleration': r['SensorEventData.SensorVec'],
|
||||
})
|
||||
r['SensorEvents'] = st.lists(st.one_of(r['SensorEventData_gyro'], r['SensorEventData_accel']), min_size=1)
|
||||
r['GpsLocationExternal'] = st.fixed_dictionaries({
|
||||
'flags': st.just(1),
|
||||
'latitude': floats(),
|
||||
'longitude': floats(),
|
||||
'altitude': floats(),
|
||||
'speed': floats(width=32),
|
||||
'bearingDeg': floats(width=32),
|
||||
'accuracy': floats(width=32),
|
||||
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
|
||||
'source': st.just(6), # Ublox
|
||||
'vNED': st.lists(floats(width=32), min_size=3, max_size=3),
|
||||
'verticalAccuracy': floats(width=32),
|
||||
'bearingAccuracyDeg': floats(width=32),
|
||||
'speedAccuracy': floats(width=32),
|
||||
})
|
||||
r['LiveCalibration'] = st.fixed_dictionaries({
|
||||
'rpyCalib': st.lists(floats(width=32), min_size=3, max_size=3),
|
||||
})
|
||||
|
||||
return st.lists(st.one_of(*[get_event_union_strategy(r, n) for n in event_types]))
|
||||
|
||||
|
||||
def get_strategy_for_process(process, finite=False):
|
||||
return get_strategy_for_events(get_process_config(process).pubs, finite)
|
||||
|
||||
|
||||
def convert_to_lr(msgs):
|
||||
return [log.Event.new_message(**m).as_reader() for m in msgs]
|
||||
|
||||
|
||||
def is_finite(d, exclude=[], prefix=""): # pylint: disable=dangerous-default-value
|
||||
ret = True
|
||||
for k, v in d.items():
|
||||
name = prefix + f"{k}"
|
||||
if name in exclude:
|
||||
continue
|
||||
|
||||
if isinstance(v, dict):
|
||||
if not is_finite(v, exclude, name + "."):
|
||||
ret = False
|
||||
else:
|
||||
try:
|
||||
if not np.isfinite(v).all():
|
||||
note((name, v))
|
||||
ret = False
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def test_process(dat, name):
|
||||
cfg = get_process_config(name)
|
||||
lr = convert_to_lr(dat)
|
||||
pr.TIMEOUT = 0.1
|
||||
return pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2)
|
||||
|
||||
|
||||
class TestFuzzy(unittest.TestCase):
|
||||
@given(get_strategy_for_process('paramsd'))
|
||||
@settings(deadline=1000)
|
||||
def test_paramsd(self, dat):
|
||||
for r in test_process(dat, 'paramsd'):
|
||||
d = r.liveParameters.to_dict()
|
||||
assert is_finite(d)
|
||||
|
||||
@given(get_strategy_for_process('locationd', finite=True))
|
||||
@settings(deadline=1000)
|
||||
def test_locationd(self, dat):
|
||||
exclude = [
|
||||
'positionGeodetic.std',
|
||||
'velocityNED.std',
|
||||
'orientationNED.std',
|
||||
'calibratedOrientationECEF.std',
|
||||
]
|
||||
for r in test_process(dat, 'locationd'):
|
||||
d = r.liveLocationKalman.to_dict()
|
||||
assert is_finite(d, exclude)
|
||||
class TestFuzzProcesses(unittest.TestCase):
|
||||
|
||||
@parameterized.expand(TEST_CASES)
|
||||
@given(st.data())
|
||||
@settings(phases=[Phase.generate, Phase.target], max_examples=50, deadline=1000, suppress_health_check=[HealthCheck.too_slow, HealthCheck.data_too_large])
|
||||
def test_fuzz_process(self, proc_name, cfg, data):
|
||||
msgs = data.draw(FuzzyGenerator.get_random_event_msg(required=cfg.pubs, real_floats=True))
|
||||
lr = [log.Event.new_message(**m).as_reader() for m in msgs]
|
||||
cfg.timeout = 5
|
||||
pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2, disable_progress=True)
|
||||
|
||||
if __name__ == "__main__":
|
||||
procs = {
|
||||
'locationd': TestFuzzy().test_locationd,
|
||||
'paramsd': TestFuzzy().test_paramsd,
|
||||
}
|
||||
|
||||
if len(sys.argv) != 2:
|
||||
print("Usage: ./test_fuzzy.py <process name>")
|
||||
sys.exit(0)
|
||||
|
||||
proc = sys.argv[1]
|
||||
if proc not in procs:
|
||||
print(f"{proc} not available")
|
||||
sys.exit(0)
|
||||
else:
|
||||
procs[proc]()
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user