mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-27 13:03:52 +08:00
* agnos updater * add manifest * fix path * get manifest from overlay * update manifest * remove merge markers * add streaming decompressor * dont need read all * Unsparsify * Fix output filename * Optimization * cleanup * Small cleanup * Read manifest from merged overlay * Write hash at end of partition * Sync before writing hash * Write bytes in file * add manifest with image sizes * Fix manifest path * File was closed already * Format string * Put raw hash * Read hashes in launch script * update launch script * should be agnos version * fix slot * Make sure we clear the hash * Verify partition size * move updated * Standalone flasher * Don't rely on ordering * Get path * Debug log * Download agnos * Info is enough * update manifest * Remove f * Check downloader return code * Exit on wrong manifest * Fix typos * Set pythonpath before hardware init * move agnos into hardware folder * remove comments * Fix abstractmethod Co-authored-by: Comma Device <device@comma.ai> Co-authored-by: Willem Melching <willem.melching@gmail.com>
53 lines
1.3 KiB
Python
53 lines
1.3 KiB
Python
import os
|
|
import subprocess
|
|
from common.basedir import BASEDIR
|
|
|
|
|
|
class Spinner():
|
|
def __init__(self):
|
|
try:
|
|
self.spinner_proc = subprocess.Popen(["./spinner"],
|
|
stdin=subprocess.PIPE,
|
|
cwd=os.path.join(BASEDIR, "selfdrive", "ui"),
|
|
close_fds=True)
|
|
except OSError:
|
|
self.spinner_proc = None
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def update(self, spinner_text):
|
|
if self.spinner_proc is not None:
|
|
self.spinner_proc.stdin.write(spinner_text.encode('utf8') + b"\n")
|
|
try:
|
|
self.spinner_proc.stdin.flush()
|
|
except BrokenPipeError:
|
|
pass
|
|
|
|
def update_progress(self, cur, total):
|
|
self.update(str(int(100 * cur / total)))
|
|
|
|
def close(self):
|
|
if self.spinner_proc is not None:
|
|
try:
|
|
self.spinner_proc.stdin.close()
|
|
except BrokenPipeError:
|
|
pass
|
|
self.spinner_proc.terminate()
|
|
self.spinner_proc = None
|
|
|
|
def __del__(self):
|
|
self.close()
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
self.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import time
|
|
with Spinner() as s:
|
|
s.update("Spinner text")
|
|
time.sleep(5.0)
|
|
print("gone")
|
|
time.sleep(5.0)
|