import certifi
import ftplib
import hatanaka
import os
import urllib.request
import urllib.error
import pycurl
import re
import time
import socket

from datetime import datetime, timedelta
from urllib.parse import urlparse
from io import BytesIO
from ftplib import FTP_TLS

from atomicwrites import atomic_write

from laika.ephemeris import EphemerisType
from .constants import SECS_IN_HR, SECS_IN_DAY, SECS_IN_WEEK
from .gps_time import GPSTime, tow_to_datetime
from .helpers import ConstellationId

dir_path = os.path.dirname(os.path.realpath(__file__))


class DownloadFailed(Exception):
  pass


def retryable(f):
  """
  Decorator to allow us to pass multiple URLs from which to download.
  Automatically retry the request with the next URL on failure
  """
  def wrapped(url_bases, *args, **kwargs):
    if isinstance(url_bases, str):
      # only one url passed, don't do the retry thing
      return f(url_bases, *args, **kwargs)

    # not a string, must be a list of url_bases
    for url_base in url_bases:
      try:
        return f(url_base, *args, **kwargs)
      except DownloadFailed as e:
        print(e)
    # none of them succeeded
    raise DownloadFailed("Multiple URL failures attempting to pull file(s)")
  return wrapped


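# Illustrative usage of the @retryable decorator above (not executed here; the mirror URLs
# and the helper name 'fetch' are examples only). A decorated function can be called with a
# single base URL or with a sequence of mirrors, tried in order until one succeeds:
#
#   @retryable
#   def fetch(url_base, path):
#     return https_download_file(url_base + path)
#
#   data = fetch(('https://mirror-a.example.com/', 'https://mirror-b.example.com/'),
#                'gnss/products/ionex/')

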
def ftp_connect(url):
  parsed = urlparse(url)
  assert parsed.scheme == 'ftp'
  domain = parsed.netloc
  try:
    ftp = ftplib.FTP(domain, timeout=10)
    ftp.login()
  except (OSError, ftplib.error_perm):
    raise DownloadFailed("Could not connect/auth to: " + domain)
  try:
    ftp.cwd(parsed.path)
  except ftplib.error_perm:
    raise DownloadFailed("Permission failure with folder: " + url)
  return ftp


@retryable
def list_dir(url):
  parsed = urlparse(url)
  if parsed.scheme == 'ftp':
    try:
      ftp = ftp_connect(url)
      return ftp.nlst()
    except ftplib.error_perm:
      raise DownloadFailed("Permission failure listing folder: " + url)
  else:
    # just connect and do simple url parsing
    listing = https_download_file(url)
    urls = re.findall(b"<a href=\"([^\"]+)\">", listing)
    # decode the urls to normal strings. If they are complicated paths, ignore them
    return [name.decode("latin1") for name in urls if name and b"/" not in name[1:]]


def ftp_download_files(url_base, folder_path, cacheDir, filenames):
  """
  Like download_file, but for several files at once. Keeps a persistent FTP
  connection open to be more efficient.
  """
  folder_path_abs = os.path.join(cacheDir, folder_path)

  ftp = ftp_connect(url_base + folder_path)

  filepaths = []
  for filename in filenames:
    # os.path.join will be dumb if filename has a leading /
    # if there is a / in the filename, then it's using a different folder
    filename = filename.lstrip("/")
    if "/" in filename:
      continue
    filepath = os.path.join(folder_path_abs, filename)
    print("pulling from", url_base, "to", filepath)

    if not os.path.isfile(filepath):
      os.makedirs(folder_path_abs, exist_ok=True)
      try:
        # write the file as it streams in and close the handle when done
        with open(filepath, 'wb') as f:
          ftp.retrbinary('RETR ' + filename, f.write)
      except ftplib.error_perm:
        raise DownloadFailed("Could not download file from: " + url_base + folder_path + filename)
      except socket.timeout:
        raise DownloadFailed("Read timed out from: " + url_base + folder_path + filename)
      filepaths.append(filepath)
    else:
      filepaths.append(filepath)
  return filepaths


def http_download_files(url_base, folder_path, cacheDir, filenames):
  """
  Similar to ftp_download_files: attempts to download multiple files faster than
  downloading them one-by-one.
  Returns a list of filepaths instead of the raw data.
  """
  folder_path_abs = os.path.join(cacheDir, folder_path)

  def write_function(disk_path, handle):
    # pycurl may invoke the write callback several times per transfer,
    # so append each chunk instead of truncating the file on every call
    def do_write(data):
      with open(disk_path, "ab") as f:
        f.write(data)

    return do_write

  fetcher = pycurl.CurlMulti()
  fetcher.setopt(pycurl.M_PIPELINING, 3)
  fetcher.setopt(pycurl.M_MAX_HOST_CONNECTIONS, 64)
  fetcher.setopt(pycurl.M_MAX_TOTAL_CONNECTIONS, 64)
  filepaths = []
  for filename in filenames:
    # os.path.join will be dumb if filename has a leading /
    # if there is a / in the filename, then it's using a different folder
    filename = filename.lstrip("/")
    if "/" in filename:
      continue
    filepath = os.path.join(folder_path_abs, filename)
    if not os.path.isfile(filepath):
      print("pulling from", url_base, "to", filepath)
      os.makedirs(folder_path_abs, exist_ok=True)
      url_path = url_base + folder_path + filename
      handle = pycurl.Curl()
      handle.setopt(pycurl.URL, url_path)
      handle.setopt(pycurl.CONNECTTIMEOUT, 10)
      handle.setopt(pycurl.WRITEFUNCTION, write_function(filepath, handle))
      fetcher.add_handle(handle)
      filepaths.append(filepath)

  requests_processing = len(filepaths)
  timeout = 10.0  # after 10 seconds of nothing happening, restart
  deadline = time.time() + timeout
  while requests_processing and time.time() < deadline:
    while True:
      ret, cur_requests_processing = fetcher.perform()
      if ret != pycurl.E_CALL_MULTI_PERFORM:
        _, success, failed = fetcher.info_read()
        break
    if requests_processing > cur_requests_processing:
      deadline = time.time() + timeout
      requests_processing = cur_requests_processing

    if fetcher.select(1) < 0:
      continue

  # if there are downloads left to be done, repeat, and don't overwrite
  _, requests_processing = fetcher.perform()
  if requests_processing > 0:
    print("some requests stalled, retrying them")
    return http_download_files(url_base, folder_path, cacheDir, filenames)

  return filepaths


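# Illustrative usage of http_download_files above (not executed here; the mirror URL and
# cache directory are hypothetical). All files under one folder are fetched through a single
# pycurl.CurlMulti handle rather than with one sequential request per file:
#
#   paths = http_download_files('https://mirror.example.com/', '2161/',
#                               '/tmp/gnss/cddis_products/',
#                               ['igr21610.sp3', 'igr21611.sp3'])

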
def https_download_file(url):
  crl = pycurl.Curl()
  crl.setopt(crl.CAINFO, certifi.where())
  crl.setopt(crl.URL, url)
  crl.setopt(crl.FOLLOWLOCATION, True)
  crl.setopt(crl.SSL_CIPHER_LIST, 'DEFAULT@SECLEVEL=1')
  crl.setopt(crl.COOKIEJAR, '/tmp/cddis_cookies')
  crl.setopt(pycurl.CONNECTTIMEOUT, 10)

  buf = BytesIO()
  crl.setopt(crl.WRITEDATA, buf)
  crl.perform()
  response = crl.getinfo(pycurl.RESPONSE_CODE)
  crl.close()

  if response != 200:
    raise DownloadFailed('HTTPS error ' + str(response))
  return buf.getvalue()


def ftp_download_file(url):
  try:
    urlf = urllib.request.urlopen(url, timeout=10)
    data_zipped = urlf.read()
    urlf.close()
    return data_zipped
  except urllib.error.URLError as e:
    raise DownloadFailed(e)


def ftps_download_file(url):
  parsed = urlparse(url)
  try:
    buf = BytesIO()
    with FTP_TLS(parsed.hostname) as ftps:
      ftps.login(user='anonymous')
      ftps.prot_p()
      ftps.retrbinary('RETR ' + parsed.path, buf.write)
    return buf.getvalue()
  except ftplib.all_errors as e:
    raise DownloadFailed(e)


@retryable
def download_files(url_base, folder_path, cacheDir, filenames):
  parsed = urlparse(url_base)
  if parsed.scheme == 'ftp':
    return ftp_download_files(url_base, folder_path, cacheDir, filenames)
  else:
    return http_download_files(url_base, folder_path, cacheDir, filenames)


@retryable
def download_file(url_base, folder_path, filename_zipped):
  url = url_base + folder_path + filename_zipped
  print('Downloading ' + url)
  if url.startswith('https://'):
    return https_download_file(url)
  elif url.startswith('ftp://'):
    return ftp_download_file(url)
  elif url.startswith('sftp://'):
    # 'sftp://' bases in this module are fetched over FTPS (FTP over TLS)
    return ftps_download_file(url)
  raise NotImplementedError('Did not find supported url scheme')


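# Illustrative call of download_file above (not executed here). Because it is wrapped in
# @retryable, a tuple of mirrors can be passed and each URL scheme is dispatched to the
# matching helper; the week/file below are examples only:
#
#   data = download_file(('https://github.com/commaai/gnss-data/raw/master/gnss/products/',
#                         'sftp://gdc.cddis.eosdis.nasa.gov/gnss/products/'),
#                        '2161/', 'igr21610.sp3.Z')

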
def download_and_cache_file_return_first_success(url_bases, folder_and_file_names, cache_dir, compression='', overwrite=False, raise_error=False):
  last_error = None
  for folder_path, filename in folder_and_file_names:
    try:
      file = download_and_cache_file(url_bases, folder_path, cache_dir, filename, compression, overwrite)
      return file
    except DownloadFailed as e:
      last_error = e

  if last_error and raise_error:
    raise last_error


def download_and_cache_file(url_base, folder_path: str, cache_dir: str, filename: str, compression='', overwrite=False):
  filename_zipped = filename + compression
  folder_path_abs = os.path.join(cache_dir, folder_path)
  filepath = str(hatanaka.get_decompressed_path(os.path.join(folder_path_abs, filename)))

  filepath_attempt = filepath + '.attempt_time'

  if os.path.exists(filepath_attempt):
    with open(filepath_attempt, 'r') as rf:
      last_attempt_time = float(rf.read())
    if time.time() - last_attempt_time < SECS_IN_HR:
      raise DownloadFailed(f"Too soon to try downloading {folder_path + filename_zipped} from {url_base} again since last attempt")

  if not os.path.isfile(filepath) or overwrite:
    try:
      data_zipped = download_file(url_base, folder_path, filename_zipped)
    except (DownloadFailed, pycurl.error, socket.timeout):
      unix_time = time.time()
      os.makedirs(folder_path_abs, exist_ok=True)
      with atomic_write(filepath_attempt, mode='w', overwrite=True) as wf:
        wf.write(str(unix_time))
      raise DownloadFailed(f"Could not download {folder_path + filename_zipped} from {url_base}")

    os.makedirs(folder_path_abs, exist_ok=True)
    ephem_bytes = hatanaka.decompress(data_zipped)
    try:
      with atomic_write(filepath, mode='wb', overwrite=overwrite) as f:
        f.write(ephem_bytes)
    except FileExistsError:
      # Only happens when same file is downloaded in parallel by another process.
      pass
  return filepath


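# Illustrative behaviour sketch for download_and_cache_file above (not executed here;
# '/tmp/gnss' is a hypothetical cache dir). The decompressed file is cached under
# cache_dir + folder_path, and a failed attempt writes a '<file>.attempt_time' marker so
# the same download is not retried within SECS_IN_HR:
#
#   path = download_and_cache_file('https://github.com/commaai/gnss-data/raw/master/gnss/data/daily/',
#                                  '2021/001/21g/', '/tmp/gnss/daily_nav/', 'brdc0010.21g',
#                                  compression='.gz')

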
# Currently, only GPS and GLONASS are supported for daily and hourly data.
CONSTELLATION_NASA_CHAR = {ConstellationId.GPS: 'n', ConstellationId.GLONASS: 'g'}


def download_nav(time: GPSTime, cache_dir, constellation: ConstellationId):
  t = time.as_datetime()
  try:
    if constellation not in CONSTELLATION_NASA_CHAR:
      return None
    c = CONSTELLATION_NASA_CHAR[constellation]
    if GPSTime.from_datetime(datetime.utcnow()) - time > SECS_IN_DAY:
      url_bases = (
        'https://github.com/commaai/gnss-data/raw/master/gnss/data/daily/',
        'sftp://gdc.cddis.eosdis.nasa.gov/gnss/data/daily/',
      )
      filename = t.strftime(f"brdc%j0.%y{c}")
      folder_path = t.strftime(f'%Y/%j/%y{c}/')
      # lexicographic comparison works because the folder path is zero-padded ('YYYY/DDD/')
      compression = '.gz' if folder_path >= '2020/335/' else '.Z'
      return download_and_cache_file(url_bases, folder_path, cache_dir+'daily_nav/', filename, compression)
    else:
      url_bases = (
        'https://github.com/commaai/gnss-data-hourly/raw/master/',
        'sftp://gdc.cddis.eosdis.nasa.gov/gnss/data/hourly/',
      )
      times = [t, (t - timedelta(hours=1))]
      folder_and_filenames = [(t.strftime('%Y/%j/'), t.strftime(f"hour%j0.%y{c}")) for t in times]
      compression = '.gz' if folder_and_filenames[0][0] >= '2020/336/' else '.Z'
      # always overwrite as this file is appended
      return download_and_cache_file_return_first_success(url_bases, folder_and_filenames,
                                                          cache_dir+'hourly_nav/', compression, overwrite=True)
  except DownloadFailed:
    pass


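# Worked example of the naming scheme in download_nav above (the date is illustrative):
# for a GPS ('n') broadcast ephemeris on 2021-01-02 (day-of-year 002), the daily request is
# '2021/002/21n/' + 'brdc0020.21n', while the hourly request is '2021/002/' + 'hour0020.21n'.
# Both get '.gz' appended because those folders sort after the '2020/335/' (daily) and
# '2020/336/' (hourly) cutoffs.

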
def download_orbits_gps_cod0(time, cache_dir, ephem_types):
  url_bases = (
    'https://github.com/commaai/gnss-data/raw/master/gnss/products/',
    'sftp://gdc.cddis.eosdis.nasa.gov/gnss/products/',
  )

  if EphemerisType.ULTRA_RAPID_ORBIT not in ephem_types:
    # TODO: raise error here
    return None

  tm = tow_to_datetime(time.tow, time.week).timetuple()
  doy = str(tm.tm_yday).zfill(3)
  filename = f"COD0OPSULT_{tm.tm_year}{doy}0000_02D_05M_ORB.SP3"
  # TODO: add hour management

  folder_path = "%i/" % time.week
  folder_file_names = [(folder_path, filename)]
  return download_and_cache_file_return_first_success(url_bases, folder_file_names, cache_dir+'cddis_products/', compression='.gz')


def download_orbits_gps(time, cache_dir, ephem_types):
  url_bases = (
    'https://github.com/commaai/gnss-data/raw/master/gnss/products/',
    'sftp://gdc.cddis.eosdis.nasa.gov/gnss/products/',
    'ftp://igs.ign.fr/pub/igs/products/',
  )
  folder_path = "%i/" % time.week
  filenames = []
  time_str = "%i%i" % (time.week, time.day)
  # Download filenames in order of quality: Final -> Rapid -> Ultra-Rapid (newest first)
  if EphemerisType.FINAL_ORBIT in ephem_types and GPSTime.from_datetime(datetime.utcnow()) - time > 3 * SECS_IN_WEEK:
    filenames.append(f"igs{time_str}.sp3")
  if EphemerisType.RAPID_ORBIT in ephem_types:
    filenames.append(f"igr{time_str}.sp3")
  if EphemerisType.ULTRA_RAPID_ORBIT in ephem_types:
    filenames.extend([f"igu{time_str}_18.sp3",
                      f"igu{time_str}_12.sp3",
                      f"igu{time_str}_06.sp3",
                      f"igu{time_str}_00.sp3"])
  folder_file_names = [(folder_path, filename) for filename in filenames]
  ret = download_and_cache_file_return_first_success(url_bases, folder_file_names, cache_dir+'cddis_products/', compression='.Z')
  if ret is not None:
    return ret

  # fallback to COD0 Ultra Rapid Orbits
  return download_orbits_gps_cod0(time, cache_dir, ephem_types)


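# Worked example for download_orbits_gps above (week and day are illustrative): for
# week 2161, day 0, with all three ephemeris types requested and the week old enough for
# final products, the candidate list becomes ['igs21610.sp3', 'igr21610.sp3',
# 'igu21610_18.sp3', 'igu21610_12.sp3', 'igu21610_06.sp3', 'igu21610_00.sp3'];
# each candidate is tried against all mirrors before falling through to the next.

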
def download_prediction_orbits_russia_src(gps_time, cache_dir):
  # Download a single file that contains ultra-rapid predictions for GPS, GLONASS and other constellations
  t = gps_time.as_datetime()
  # Files exist starting at 29-01-2022
  if t < datetime(2022, 1, 29):
    return None
  url_bases = 'https://github.com/commaai/gnss-data-alt/raw/master/MCC/PRODUCTS/'
  folder_path = t.strftime('%y%j/ultra/')
  file_prefix = "Stark_1D_" + t.strftime('%y%m%d')

  # Predictions are 24H so the previous day can also be used.
  prev_day = (t - timedelta(days=1))
  file_prefix_prev = "Stark_1D_" + prev_day.strftime('%y%m%d')
  folder_path_prev = prev_day.strftime('%y%j/ultra/')

  current_day = GPSTime.from_datetime(datetime(t.year, t.month, t.day))
  # Ultra-rapid orbits are published to gnss-data-alt every 10th minute past the 5, 11, 17 and 23 hour.
  # Published predictions are delayed by around 10 hours.
  # Download the latest file that includes gps_time, with a 20 minute margin:
  if gps_time > current_day + 23.5 * SECS_IN_HR:
    prev_day_hours, current_day_hours = [], [6, 12]
  elif gps_time > current_day + 17.5 * SECS_IN_HR:
    prev_day_hours, current_day_hours = [], [0, 6]
  elif gps_time > current_day + 11.5 * SECS_IN_HR:
    prev_day_hours, current_day_hours = [18], [0]
  elif gps_time > current_day + 5.5 * SECS_IN_HR:
    prev_day_hours, current_day_hours = [12, 18], []
  else:
    prev_day_hours, current_day_hours = [6, 12], []
  # Example: Stark_1D_22060100.sp3
  folder_and_file_names = [(folder_path, file_prefix + f"{h:02}.sp3") for h in reversed(current_day_hours)] + \
                          [(folder_path_prev, file_prefix_prev + f"{h:02}.sp3") for h in reversed(prev_day_hours)]
  return download_and_cache_file_return_first_success(url_bases, folder_and_file_names, cache_dir+'russian_products/', raise_error=True)


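# Worked example of the window selection above (the time is illustrative): for a gps_time
# at 14:00 UTC, only the 11.5h branch matches, so the current day's 00h file is tried first
# ('Stark_1D_<cur>00.sp3'), then the previous day's 18h file ('Stark_1D_<prev>18.sp3'),
# where <cur> and <prev> stand for the respective yymmdd date strings.

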
def download_orbits_russia_src(time, cache_dir, ephem_types):
  # Orbits from a Russian source. Contains GPS, GLONASS, GALILEO, BEIDOU
  url_bases = (
    'https://github.com/commaai/gnss-data-alt/raw/master/MCC/PRODUCTS/',
    'ftp://ftp.glonass-iac.ru/MCC/PRODUCTS/',
  )
  t = time.as_datetime()
  folder_paths = []
  current_gps_time = GPSTime.from_datetime(datetime.utcnow())
  filename = "Sta%i%i.sp3" % (time.week, time.day)
  if EphemerisType.FINAL_ORBIT in ephem_types and current_gps_time - time > 2 * SECS_IN_WEEK:
    folder_paths.append(t.strftime('%y%j/final/'))
  if EphemerisType.RAPID_ORBIT in ephem_types:
    folder_paths.append(t.strftime('%y%j/rapid/'))
  if EphemerisType.ULTRA_RAPID_ORBIT in ephem_types:
    folder_paths.append(t.strftime('%y%j/ultra/'))
  folder_file_names = [(folder_path, filename) for folder_path in folder_paths]
  return download_and_cache_file_return_first_success(url_bases, folder_file_names, cache_dir+'russian_products/')


def download_ionex(time, cache_dir):
  t = time.as_datetime()
  url_bases = (
    'https://github.com/commaai/gnss-data/raw/master/gnss/products/ionex/',
    'sftp://gdc.cddis.eosdis.nasa.gov/gnss/products/ionex/',
    'ftp://igs.ensg.ign.fr/pub/igs/products/ionosphere/',
    'ftp://gssc.esa.int/gnss/products/ionex/',
  )
  folder_path = t.strftime('%Y/%j/')
  filenames = [t.strftime("codg%j0.%yi"), t.strftime("c1pg%j0.%yi"), t.strftime("c2pg%j0.%yi")]

  folder_file_names = [(folder_path, f) for f in filenames]
  return download_and_cache_file_return_first_success(url_bases, folder_file_names, cache_dir+'ionex/', compression='.Z', raise_error=True)


def download_dcb(time, cache_dir):
  filenames = []
  folder_paths = []
  url_bases = (
    'https://github.com/commaai/gnss-data/raw/master/gnss/products/bias/',
    'sftp://gdc.cddis.eosdis.nasa.gov/gnss/products/bias/',
    'ftp://igs.ign.fr/pub/igs/products/mgex/dcb/',
  )
  # there seems to be a lot of data missing, so try many days
  for time_step in [time - i * SECS_IN_DAY for i in range(14)]:
    t = time_step.as_datetime()
    folder_paths.append(t.strftime('%Y/'))
    filenames.append(t.strftime("CAS0MGXRAP_%Y%j0000_01D_01D_DCB.BSX"))

  return download_and_cache_file_return_first_success(url_bases, list(zip(folder_paths, filenames)), cache_dir+'dcb/', compression='.gz', raise_error=True)


def download_cors_coords(cache_dir):
  cache_subdir = cache_dir + 'cors_coord/'
  url_bases = (
    'https://geodesy.noaa.gov/corsdata/coord/coord_14/',
    'https://alt.ngs.noaa.gov/corsdata/coord/coord_14/',
  )
  file_names = list_dir(url_bases)
  file_names = [file_name for file_name in file_names if file_name.endswith('coord.txt')]
  filepaths = download_files(url_bases, '', cache_subdir, file_names)
  return filepaths


def download_cors_station(time, station_name, cache_dir):
  t = time.as_datetime()
  folder_path = t.strftime('%Y/%j/') + station_name + '/'
  filename = station_name + t.strftime("%j0.%yd")
  url_bases = (
    'https://geodesy.noaa.gov/corsdata/rinex/',
    'https://alt.ngs.noaa.gov/corsdata/rinex/',
  )
  try:
    filepath = download_and_cache_file(url_bases, folder_path, cache_dir+'cors_obs/', filename, compression='.gz')
    return filepath
  except DownloadFailed:
    print("File not downloaded, check availability on server.")
    return None