Files
sunnypilot/selfdrive/test/helpers.py
Adeeb Shihadeh 7feae28705 run onroad tests on release build (#22700)
* check startup alert

* release decorator

* run in jenkins

* run onroad tests

* no push

* fix release2 build

* fix path

* no overwrite

* single release build script

* make files_eon non empty

* files

* run in source

* add that for now

* print

* ignore initializing

* print

* fetch

* run tests last

* check alert text

* revert some stuff

* fixup jenkinsfile

Co-authored-by: Comma Device <device@comma.ai>
2021-10-26 23:50:05 -07:00

60 lines
1.7 KiB
Python

import os
import time
from functools import wraps
from selfdrive.hardware import PC
from selfdrive.manager.process_config import managed_processes
from selfdrive.version import training_version, terms_version
def set_params_enabled():
  """Write the params a test run expects to be set before it starts.

  Stores the current ``terms_version`` and ``training_version`` under
  "HasAcceptedTerms" and "CompletedTrainingVersion", then sets the
  "OpenpilotEnabledToggle" and "CommunityFeaturesToggle" booleans to True
  and "Passive" to False.
  """
  from common.params import Params

  store = Params()

  # Versioned acknowledgements are written first, in the same order as before.
  store.put("HasAcceptedTerms", terms_version)
  store.put("CompletedTrainingVersion", training_version)

  # Boolean toggles, written in a fixed order.
  bool_settings = (
    ("OpenpilotEnabledToggle", True),
    ("CommunityFeaturesToggle", True),
    ("Passive", False),
  )
  for key, enabled in bool_settings:
    store.put_bool(key, enabled)
def phone_only(f):
  """Decorate a test method so it reports itself as skipped when ``PC`` is truthy.

  The wrapper calls ``self.skipTest(...)`` on the test instance when ``PC``
  is set, and otherwise (or if ``skipTest`` returns) calls ``f`` with the
  same arguments. The return value of ``f`` is discarded.
  """
  def skip_on_pc(self, *args, **kwargs):
    running_on_pc = PC
    if running_on_pc:
      self.skipTest("This test is not meant to run on PC")
    f(self, *args, **kwargs)

  # Copy f's name/docstring onto the wrapper so test reports show the real test.
  return wraps(f)(skip_on_pc)
def release_only(f):
  """Decorate a test method so it only runs when RELEASE is in the environment.

  When the "RELEASE" environment variable is absent the wrapper calls
  ``self.skipTest(...)`` on the test instance; afterwards (or if
  ``skipTest`` returns) it calls ``f`` with the same arguments. The return
  value of ``f`` is discarded.
  """
  def skip_unless_release(self, *args, **kwargs):
    # Only the presence of the variable matters, not its value.
    is_release = "RELEASE" in os.environ
    if not is_release:
      self.skipTest("This test is only for release branches")
    f(self, *args, **kwargs)

  # Copy f's name/docstring onto the wrapper so test reports show the real test.
  return wraps(f)(skip_unless_release)
def with_processes(processes, init_time=0, ignore_stopped=None):
  """Decorator factory that runs a function while managed processes are up.

  Args:
    processes: sequence of keys into ``managed_processes``; the processes are
      started in this order.
    init_time: seconds to sleep after starting each process except the last.
    ignore_stopped: names that are allowed to have exited by the time the
      wrapped function returns. ``None`` (the default) means none are.

  The wrapper asserts that every process has ``proc.exitcode is None`` once
  all of them have been started, and checks again (minus ``ignore_stopped``)
  after the wrapped function returns. Every process that was started is
  stopped afterwards, including when a later start, the startup check, the
  wrapped function or the final check raises. The wrapped function's return
  value is discarded.
  """
  ignore_stopped = [] if ignore_stopped is None else ignore_stopped

  def exited(names):
    # The checks treat a non-None exitcode as "no longer running".
    return [name for name in names if managed_processes[name].proc.exitcode is not None]

  def wrapper(func):
    @wraps(func)
    def wrap(*args, **kwargs):
      # Track what actually got started so cleanup never touches a process
      # whose start() was not reached or did not complete.
      started = []
      try:
        # start and assert started
        for n, p in enumerate(processes):
          managed_processes[p].start()
          started.append(p)
          if n < len(processes) - 1:
            time.sleep(init_time)
        dead = exited(processes)
        assert not dead, f"processes exited during startup: {dead}"

        # call the function
        func(*args, **kwargs)

        # assert processes are still started
        dead = exited([name for name in processes if name not in ignore_stopped])
        assert not dead, f"processes exited while running: {dead}"
      finally:
        for p in started:
          managed_processes[p].stop()
    return wrap
  return wrapper