53 lines
1.3 KiB
Python
53 lines
1.3 KiB
Python
|
import os
|
||
|
import subprocess
|
||
|
from openpilot.common.basedir import BASEDIR
|
||
|
|
||
|
|
||
|
class Spinner:
|
||
|
def __init__(self):
|
||
|
try:
|
||
|
self.spinner_proc = subprocess.Popen(["./spinner"],
|
||
|
stdin=subprocess.PIPE,
|
||
|
cwd=os.path.join(BASEDIR, "selfdrive", "ui"),
|
||
|
close_fds=True)
|
||
|
except OSError:
|
||
|
self.spinner_proc = None
|
||
|
|
||
|
def __enter__(self):
|
||
|
return self
|
||
|
|
||
|
def update(self, spinner_text: str):
|
||
|
if self.spinner_proc is not None:
|
||
|
self.spinner_proc.stdin.write(spinner_text.encode('utf8') + b"\n")
|
||
|
try:
|
||
|
self.spinner_proc.stdin.flush()
|
||
|
except BrokenPipeError:
|
||
|
pass
|
||
|
|
||
|
def update_progress(self, cur: float, total: float):
|
||
|
self.update(str(round(100 * cur / total)))
|
||
|
|
||
|
def close(self):
|
||
|
if self.spinner_proc is not None:
|
||
|
self.spinner_proc.kill()
|
||
|
try:
|
||
|
self.spinner_proc.communicate(timeout=2.)
|
||
|
except subprocess.TimeoutExpired:
|
||
|
print("WARNING: failed to kill spinner")
|
||
|
self.spinner_proc = None
|
||
|
|
||
|
def __del__(self):
|
||
|
self.close()
|
||
|
|
||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||
|
self.close()
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
import time
|
||
|
with Spinner() as s:
|
||
|
s.update("Spinner text")
|
||
|
time.sleep(5.0)
|
||
|
print("gone")
|
||
|
time.sleep(5.0)
|