# dragonpilot/selfdrive/loggerd/uploader.py
#!/usr/bin/env python
import os
import re
import time
import stat
import json
import random
import ctypes
import inspect
import traceback
import threading
import subprocess

from collections import Counter

import requests

from common.api import api_get
from common.params import Params

from selfdrive.swaglog import cloudlog
from selfdrive.loggerd.config import ROOT

# Set FAKEUPLOAD in the environment to skip the real PUT in Uploader.do_upload.
fake_upload = os.getenv("FAKEUPLOAD") is not None
def raise_on_thread(t, exctype):
    """Asynchronously raise an exception of type `exctype` in thread `t`.

    Uses CPython's PyThreadState_SetAsyncExc. `exctype` must be an
    exception *class*, not an instance (the interpreter instantiates it
    inside the target thread).

    Raises:
      Exception: `t` is not a currently-registered thread.
      TypeError: `exctype` is not a class.
      ValueError/SystemError: the C-API call failed.
    """
    # Map the Thread object back to its interpreter-level thread id.
    for ctid, tobj in threading._active.items():
        if tobj is t:
            tid = ctid
            break
    else:
        raise Exception("Could not find thread")

    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")

    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        # (c_long here too: the original passed a bare int while the first
        # call wrapped it, which differs on platforms where long != int)
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")
def listdir_with_creation_date(d):
    """Yield (ctime, filename) for every entry in directory `d`.

    ctime is None when stat() fails (e.g. the file vanished between the
    listing and the stat call); the entry is still yielded.
    """
    for fn in os.listdir(d):
        try:
            st = os.stat(os.path.join(d, fn))
            yield (st[stat.ST_CTIME], fn)
        except OSError:
            cloudlog.exception("listdir_with_creation_date: stat failed?")
            yield (None, fn)


def listdir_by_creation_date(d):
    """Return the entries of `d` ordered by creation time, oldest first.

    Entries whose ctime could not be read sort first; ties break by name.
    The explicit key preserves the old Python 2 tuple-sort order while
    also working on Python 3, where None cannot be compared to numbers
    (plain sorted() over (None, fn) tuples would raise TypeError).
    """
    times_and_paths = list(listdir_with_creation_date(d))
    times_and_paths.sort(key=lambda tp: (tp[0] is not None,
                                         tp[0] if tp[0] is not None else 0,
                                         tp[1]))
    return [path for _, path in times_and_paths]
def clear_locks(root):
    """Delete stale *.lock files from every route directory under `root`."""
    for entry in os.listdir(root):
        route_path = os.path.join(root, entry)
        try:
            lock_files = [f for f in os.listdir(route_path) if f.endswith(".lock")]
            for lock in lock_files:
                os.unlink(os.path.join(route_path, lock))
        except OSError:
            # directory may be a plain file, or may disappear mid-scan
            cloudlog.exception("clear_locks failed")
def is_on_wifi():
    """Best-effort check for an active WIFI connection on the Android device.

    Queries transaction 2 (getActiveNetworkInfo, per the original comment)
    on the Android "connectivity" binder service and scans the raw parcel
    dump for the string "WIFI". Returns False when the service call fails.
    """
    # ConnectivityManager.getActiveNetworkInfo()
    try:
        result = subprocess.check_output(["service", "call", "connectivity", "2"]).strip().split("\n")
    except subprocess.CalledProcessError:
        return False

    # Each dump line after the first holds hex words in columns 14:49;
    # decoding each word and reversing it (little-endian) reconstructs the
    # parcel's raw bytes.
    # NOTE(review): str.decode("hex") exists only on Python 2 — this
    # function assumes a Python 2 runtime; confirm before porting.
    data = ''.join(''.join(w.decode("hex")[::-1] for w in l[14:49].split()) for l in result[1:])

    # The network type name is stored UTF-16-style in the parcel, so "WIFI"
    # appears with a NUL byte between each character.
    return "\x00".join("WIFI") in data
def is_on_hotspot():
    """Return True when wlan0 has an IP from a known phone-tethering subnet.

    Recognized ranges: Android hotspot (192.168.43.x), iOS personal
    hotspot (172.20.10.x) and Toyota Entune (10.0.2.x). Any failure —
    no wlan0, no inet addr, ifconfig missing — is treated as "not on a
    hotspot" and returns False.
    """
    try:
        # decode() so the regex works on Python 3, where check_output
        # returns bytes (a no-op semantically on Python 2).
        result = subprocess.check_output(["ifconfig", "wlan0"]).decode('utf-8')
        ip = re.findall(r"inet addr:((\d+\.){3}\d+)", result)[0][0]
        is_android = ip.startswith('192.168.43.')
        is_ios = ip.startswith('172.20.10.')
        is_entune = ip.startswith('10.0.2.')
        return (is_android or is_ios or is_entune)
    except Exception:
        # narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
        # still propagate; everything else means "can't tell" -> False
        return False
class Uploader(object):
  """Uploads logged files from `root` to the comma API, oldest route first."""

  def __init__(self, dongle_id, access_token, root):
    self.dongle_id = dongle_id
    self.access_token = access_token
    self.root = root

    self.upload_thread = None

    # response / (exception, traceback) of the most recent upload attempt
    self.last_resp = None
    self.last_exc = None

  def clean_dirs(self):
    """Remove route directories under root that have become empty."""
    try:
      for logname in os.listdir(self.root):
        path = os.path.join(self.root, logname)
        # remove empty directories
        if not os.listdir(path):
          os.rmdir(path)
    except OSError:
      cloudlog.exception("clean_dirs failed")

  def gen_upload_files(self):
    """Yield (name, key, path) for every uploadable file, oldest route first.

    `key` is the server-side object key ("<route>/<name>"). Routes that
    contain a .lock file are skipped — they are still being written.
    """
    if not os.path.isdir(self.root):
      return
    for logname in listdir_by_creation_date(self.root):
      path = os.path.join(self.root, logname)
      try:
        names = os.listdir(path)
      except OSError:
        # route directory disappeared mid-scan
        continue
      if any(name.endswith(".lock") for name in names):
        continue

      for name in names:
        key = os.path.join(logname, name)
        fn = os.path.join(path, name)
        yield (name, key, fn)

  def get_data_stats(self):
    """Return ({file name: count}, total size in bytes) of pending uploads."""
    name_counts = Counter()
    total_size = 0
    for name, key, fn in self.gen_upload_files():
      name_counts[name] += 1
      total_size += os.stat(fn).st_size
    return dict(name_counts), total_size

  def next_file_to_upload(self, with_raw):
    """Pick the next file to upload by priority; return (key, path, prio) or None.

    qlogs always upload; rlogs, camera files and everything else are only
    considered when `with_raw` is set.
    """
    # try to upload qlog files first
    for name, key, fn in self.gen_upload_files():
      if name == "qlog.bz2":
        return (key, fn, 0)

    if with_raw:
      # then upload log files
      for name, key, fn in self.gen_upload_files():
        if name == "rlog.bz2":
          return (key, fn, 1)

      # then upload rear and front camera files
      for name, key, fn in self.gen_upload_files():
        if name == "fcamera.hevc":
          return (key, fn, 2)
        elif name == "dcamera.hevc":
          return (key, fn, 3)

      # then upload other files
      for name, key, fn in self.gen_upload_files():
        if not name.endswith('.lock') and not name.endswith(".tmp"):
          return (key, fn, 4)

    return None

  def do_upload(self, key, fn):
    """Fetch a presigned URL for `key` and PUT file `fn` to it.

    Stores the HTTP response in self.last_resp. On any failure, records
    (exception, formatted traceback) in self.last_exc and re-raises.
    With FAKEUPLOAD set, skips the PUT and fakes a 200 response.
    """
    try:
      url_resp = api_get("v1.2/"+self.dongle_id+"/upload_url/", timeout=10, path=key, access_token=self.access_token)
      url_resp_json = json.loads(url_resp.text)
      url = url_resp_json['url']
      headers = url_resp_json['headers']
      cloudlog.info("upload_url v1.2 %s %s", url, str(headers))

      if fake_upload:
        cloudlog.info("*** WARNING, THIS IS A FAKE UPLOAD TO %s ***" % url)

        class FakeResponse(object):
          def __init__(self):
            self.status_code = 200
        self.last_resp = FakeResponse()
      else:
        with open(fn, "rb") as f:
          self.last_resp = requests.put(url, data=f, headers=headers, timeout=10)
    except Exception as e:
      self.last_exc = (e, traceback.format_exc())
      raise

  def normal_upload(self, key, fn):
    """do_upload wrapper: return the response, or None if the upload raised."""
    self.last_resp = None
    self.last_exc = None

    try:
      self.do_upload(key, fn)
    except Exception:
      # best-effort: failure details were recorded in self.last_exc
      pass

    return self.last_resp

  def upload(self, key, fn):
    """Upload one file and delete it on success; return True on success.

    Zero-byte files are deleted without uploading. Always sweeps empty
    route directories before returning.
    """
    try:
      sz = os.path.getsize(fn)
    except OSError:
      cloudlog.exception("upload: getsize failed")
      return False

    cloudlog.event("upload", key=key, fn=fn, sz=sz)
    cloudlog.info("checking %r with size %r", key, sz)

    if sz == 0:
      # can't upload files of 0 size
      os.unlink(fn)  # delete the file
      success = True
    else:
      cloudlog.info("uploading %r", fn)
      # renamed from `stat`, which shadowed the stdlib `stat` module
      resp = self.normal_upload(key, fn)
      if resp is not None and resp.status_code in (200, 201):
        cloudlog.event("upload_success", key=key, fn=fn, sz=sz)

        # delete the file
        try:
          os.unlink(fn)
        except OSError:
          cloudlog.event("delete_failed", stat=resp, exc=self.last_exc, key=key, fn=fn, sz=sz)

        success = True
      else:
        cloudlog.event("upload_failed", stat=resp, exc=self.last_exc, key=key, fn=fn, sz=sz)
        success = False

    self.clean_dirs()

    return success
def uploader_fn(exit_event):
    """Main upload loop: pick the next file and upload it, until exit_event is set.

    Requires DongleId and AccessToken params to exist. Sleeps 5s when there
    is nothing to upload; after a failed upload, backs off exponentially
    with jitter, capped at 120s.
    """
    cloudlog.info("uploader_fn")

    params = Params()
    dongle_id, access_token = params.get("DongleId"), params.get("AccessToken")

    if dongle_id is None or access_token is None:
        cloudlog.info("uploader MISSING DONGLE_ID or ACCESS_TOKEN")
        raise Exception("uploader can't start without dongle id and access token")

    uploader = Uploader(dongle_id, access_token, ROOT)

    backoff = 0.1
    while True:
        # re-read the settings every cycle so toggles apply without restart;
        # any value other than "0" (including unset) means enabled
        allow_raw_upload = (params.get("IsUploadRawEnabled") != "0")
        allow_cellular = (params.get("IsUploadVideoOverCellularEnabled") != "0")
        on_hotspot = is_on_hotspot()
        on_wifi = is_on_wifi()
        should_upload = allow_cellular or (on_wifi and not on_hotspot)

        if exit_event.is_set():
            return

        # raw logs / camera files are only offered when both the user
        # setting and the current network allow it; next_file_to_upload
        # still returns qlogs regardless of with_raw
        d = uploader.next_file_to_upload(with_raw=allow_raw_upload and should_upload)
        if d is None:
            time.sleep(5)
            continue

        key, fn, _ = d

        cloudlog.event("uploader_netcheck", allow_cellular=allow_cellular, is_on_hotspot=on_hotspot, is_on_wifi=on_wifi)
        cloudlog.info("to upload %r", d)
        success = uploader.upload(key, fn)
        if success:
            backoff = 0.1
        else:
            cloudlog.info("backoff %r", backoff)
            time.sleep(backoff + random.uniform(0, backoff))
            backoff = min(backoff*2, 120)

        cloudlog.info("upload done, success=%r", success)
def main(gctx=None):
    """Run the uploader loop forever; `gctx` is accepted for compatibility and unused."""
    exit_event = threading.Event()
    uploader_fn(exit_event)


if __name__ == "__main__":
    main()