Files
sunnypilot/selfdrive/test/helpers.py
Willem Melching 2e182e5c57 Params refactor, simplified (#2300)
* always c++

* Create C++ params class

* get works

* tests hang now

* passes tests

* cleanup string formatting

* handle interrupt in blocking read

* fix memory leak

* remove unused constructor

* Use delete_db_value directly

* Rename put -> write_db_value

* filename cleanup

* no semicolons in cython

* Update common/SConscript

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* add std::string version of delete_db_value

* This is handled

* cleanup encoding

* Add clear method to clear all

* add persistent params

* fix android build

* Should be called clear_all

* only import params when needed

* set params path on manager import

* recusrively create directories

* Fix function order

* cleanup mkdirp

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
Co-authored-by: Comma Device <device@comma.ai>
2020-10-13 16:23:23 +02:00

72 lines
2.0 KiB
Python

import time
import subprocess
from functools import wraps
from nose.tools import nottest
from common.hardware import PC
from common.apk import update_apks, start_offroad, pm_apply_packages, android_packages
from selfdrive.version import training_version, terms_version
from selfdrive.manager import start_managed_process, kill_managed_process, get_running
def set_params_enabled():
from common.params import Params
params = Params()
params.put("HasAcceptedTerms", terms_version)
params.put("HasCompletedSetup", "1")
params.put("OpenpilotEnabledToggle", "1")
params.put("CommunityFeaturesToggle", "1")
params.put("Passive", "0")
params.put("CompletedTrainingVersion", training_version)
def phone_only(x):
if PC:
return nottest(x)
else:
return x
def with_processes(processes, init_time=0):
def wrapper(func):
@wraps(func)
def wrap(*args, **kwargs):
# start and assert started
for n, p in enumerate(processes):
start_managed_process(p)
if n < len(processes)-1:
time.sleep(init_time)
assert all(get_running()[name].exitcode is None for name in processes)
# call the function
try:
func(*args, **kwargs)
# assert processes are still started
assert all(get_running()[name].exitcode is None for name in processes)
finally:
# kill and assert all stopped
for p in processes:
kill_managed_process(p)
assert len(get_running()) == 0
return wrap
return wrapper
def with_apks():
def wrapper(func):
@wraps(func)
def wrap():
update_apks()
pm_apply_packages('enable')
start_offroad()
func()
try:
for package in android_packages:
apk_is_running = (subprocess.call(["pidof", package]) == 0)
assert apk_is_running, package
finally:
pm_apply_packages('disable')
for package in android_packages:
apk_is_not_running = (subprocess.call(["pidof", package]) == 1)
assert apk_is_not_running, package
return wrap
return wrapper