247 lines
8.2 KiB
Python
247 lines
8.2 KiB
Python
import datetime as dt
|
||
import numpy as np
|
||
import re
|
||
from math import cos, sin, pi, floor
|
||
from .constants import SECS_IN_MIN, SECS_IN_HR, EARTH_RADIUS
|
||
from .lib.coordinates import LocalCoord
|
||
from .gps_time import GPSTime
|
||
|
||
# Altitude of Ionospheric-pierce-point
|
||
IPP_ALT = 6821000
|
||
|
||
|
||
def closest_in_list(lst, val, num=2):
|
||
"""
|
||
Returns two (`num` in general) closest values of `val` in list `lst`
|
||
"""
|
||
idxs = sorted(lst, key=lambda x: abs(x - val))[:num]
|
||
return sorted(list(lst).index(x) for x in idxs)
|
||
|
||
|
||
def get_header_line(headr, proprty):
|
||
"""
|
||
:param headr: the header of the RINEX-file
|
||
:param proprty: string-like property to search for (e.g. 'delta-utc')
|
||
:return: the string of the ``headr`` containing ``property``
|
||
"""
|
||
pattern = re.compile(proprty, re.IGNORECASE)
|
||
for d in headr:
|
||
if pattern.search(d):
|
||
return d
|
||
|
||
|
||
def get_header_body(file_path):
|
||
"""
|
||
Opens `file_path`, reads file and returns header and body
|
||
separated with "END OF HEADER"
|
||
:param file_path: path to RINEX-like file
|
||
:return: header, body (arrays of lines)
|
||
"""
|
||
with open(file_path) as fd:
|
||
data = fd.readlines()
|
||
for j, d in enumerate(data):
|
||
if "END OF HEADER" in d:
|
||
header_end = j
|
||
break
|
||
return data[:header_end], data[header_end + 1:]
|
||
|
||
|
||
def get_int_from_header(hdr, seq):
|
||
"""
|
||
Returns the first int from the line that contains `seq` of lines `hdr`.
|
||
In fact, _header_ here may not be header of RINEX/IONEX, just some set of lines.
|
||
"""
|
||
return int(get_header_line(hdr, seq).split()[0])
|
||
|
||
def compute_grid_lats_lons(data):
|
||
grid = np.array([], dtype='uint16')
|
||
lats = np.array([])
|
||
for j, line in enumerate(data[1:]):
|
||
if "LAT" in line:
|
||
lat, lon1, lon2, dlon, h = (float(line[x:x + 6]) for x in range(2, 32, 6))
|
||
lats = np.append(lats, lat)
|
||
row_length = (lon2 - lon1) / dlon + 1 # total number of values of longitudes
|
||
next_lines_with_numbers = int(np.ceil(row_length / 16))
|
||
elems_in_row = [
|
||
min(16, int(row_length - i * 16)) for i in range(next_lines_with_numbers)
|
||
]
|
||
row = np.array([], dtype='int16')
|
||
for i, elem in enumerate(elems_in_row):
|
||
row = np.append(
|
||
row,
|
||
np.array(
|
||
[int(data[j + 2 + i][5 * x:5 * x + 5]) for x in range(elem)],
|
||
dtype='int16',
|
||
),
|
||
)
|
||
if len(grid) > 0:
|
||
grid = np.vstack((grid, row))
|
||
else:
|
||
grid = np.append(grid, row)
|
||
lons = np.linspace(lon1, lon2, int(row_length))
|
||
return (grid, lats, lons)
|
||
|
||
|
||
class IonexMap:
|
||
def __init__(self, exp, data1, data2):
|
||
self.exp = exp
|
||
self.t1 = GPSTime.from_datetime(dt.datetime(*[int(d) for d in data1[0].split()[:6]]))
|
||
self.t2 = GPSTime.from_datetime(dt.datetime(*[int(d) for d in data2[0].split()[:6]]))
|
||
assert self.t2 - self.t1 == SECS_IN_HR
|
||
assert len(data1) == len(data2)
|
||
|
||
self.max_time_diff = SECS_IN_MIN*30
|
||
self.epoch = self.t1 + self.max_time_diff
|
||
|
||
self.grid_TEC1, self.lats, self.lons = compute_grid_lats_lons(data1)
|
||
self.grid_TEC2, self.lats, self.lons = compute_grid_lats_lons(data2)
|
||
|
||
def valid(self, time):
|
||
return abs(time - self.epoch) <= self.max_time_diff
|
||
|
||
@staticmethod
|
||
def find_nearest(lst, val):
|
||
return (np.abs(lst - val)).argmin()
|
||
|
||
def get_TEC(self, pos, time):
|
||
"""
|
||
Returns TEC in a position `pos` of ionosphere
|
||
:param pos: (lat, lon) [deg, deg]
|
||
:return:
|
||
"""
|
||
if pos[0] in self.lats and pos[1] in self.lons:
|
||
lat = self.find_nearest(self.lats, pos[0])
|
||
lon = self.find_nearest(self.lons, pos[1])
|
||
E = self.grid_TEC1[lat][lon] + self.grid_TEC2[lat][lon]
|
||
return E
|
||
lat_idxs = closest_in_list(self.lats, pos[0])
|
||
lon_idxs = closest_in_list(self.lons, pos[1])
|
||
lat0, lat1 = self.lats[lat_idxs[0]], self.lats[lat_idxs[1]]
|
||
lon0, lon1 = self.lons[lon_idxs[0]], self.lons[lon_idxs[1]]
|
||
dlat = lat1 - lat0
|
||
dlon = lon1 - lon0
|
||
p = float(pos[0] - lat0) / dlat
|
||
q = float(pos[1] - lon0) / dlon
|
||
|
||
(E00, E10), (E01, E11) = self.grid_TEC1[lat_idxs[0]:lat_idxs[1] + 1, lon_idxs[0]:lon_idxs[1] + 1]
|
||
TEC_1 = ((1 - p) * (1 - q) * E00 + p * (1 - q) * E01 + (1 - p) * q * E10 + p * q * E11)
|
||
(E00, E10), (E01, E11) = self.grid_TEC2[lat_idxs[0]:lat_idxs[1] + 1, lon_idxs[0]:lon_idxs[1] + 1]
|
||
TEC_2 = ((1 - p) * (1 - q) * E00 + p * (1 - q) * E01 + (1 - p) * q * E10 + p * q * E11)
|
||
|
||
return (1 - (time - self.t1)/SECS_IN_HR)*TEC_1 + ((time - self.t1)/SECS_IN_HR)*TEC_2
|
||
|
||
def get_delay(self, rcv_pos, az, el, sat_pos, time, freq):
|
||
# To get a delay from a TEC map, we need to calculate
|
||
# the ionospheric pierce point, geometry described here
|
||
# https://en.wikipedia.org/wiki/Ionospheric_pierce_point
|
||
conv = LocalCoord.from_ecef(rcv_pos)
|
||
geocentric_alt = np.linalg.norm(rcv_pos)
|
||
alpha = np.pi/2 + el
|
||
beta = np.arcsin(geocentric_alt*np.sin(alpha)/IPP_ALT)
|
||
gamma = np.pi - alpha - beta
|
||
ipp_dist = geocentric_alt*np.sin(gamma)/np.sin(beta)
|
||
ipp_ned = conv.ecef2ned(sat_pos)*(ipp_dist)/np.linalg.norm(sat_pos)
|
||
ipp_geo = conv.ned2geodetic(ipp_ned)
|
||
factor = 40.30E16 / (freq**2) * 10**(self.exp)
|
||
vertical_delay = self.get_TEC(ipp_geo, time) * factor
|
||
slant_delay = vertical_delay * ((1 - ((EARTH_RADIUS * np.sin(beta)) /
|
||
(EARTH_RADIUS + 3.5e5))**2)**(-0.5))
|
||
return slant_delay
|
||
|
||
@staticmethod
|
||
def round_to_grid(number, base):
|
||
return int(base * round(float(number) / base))
|
||
|
||
|
||
def parse_ionex(ionex_file):
|
||
"""
|
||
:param ionex_file: path to the IONEX file
|
||
:return: TEC interpolation function `f( (lat,lon), datetime )`
|
||
"""
|
||
header, body = get_header_body(ionex_file)
|
||
|
||
exponent = get_int_from_header(header, "EXPONENT")
|
||
maps_count = get_int_from_header(header, "MAPS IN FILE")
|
||
# =============
|
||
# Separate maps
|
||
# =============
|
||
map_start_idx = []
|
||
map_end_idx = []
|
||
|
||
for j, line in enumerate(body):
|
||
if "START OF TEC MAP" in line:
|
||
map_start_idx += [j]
|
||
elif "END OF TEC MAP" in line:
|
||
map_end_idx += [j]
|
||
if maps_count != len(map_start_idx):
|
||
raise LookupError("Parsing error: the number of maps in the header "
|
||
"is not equal to the number of maps in the body.")
|
||
if len(map_start_idx) != len(map_end_idx):
|
||
raise IndexError("Starts end ends numbers are not equal.")
|
||
map_dates = []
|
||
for i in range(maps_count):
|
||
date_components = body[map_start_idx[i] + 1].split()[:6]
|
||
map_dates.append(dt.datetime(*[int(d) for d in date_components]))
|
||
|
||
maps = []
|
||
iono_map = iono_map_prev = None
|
||
for m in range(maps_count):
|
||
iono_map_prev = iono_map
|
||
iono_map = body[map_start_idx[m] + 1:map_end_idx[m]]
|
||
if iono_map and iono_map_prev:
|
||
maps += [IonexMap(exponent, iono_map_prev, iono_map)]
|
||
return maps
|
||
|
||
|
||
def klobuchar(pos, az, el, time, iono_coeffs):
|
||
"""
|
||
Details are taken from [5]: IS-GPS-200H, Fig. 20-4
|
||
Note: result is referred to the GPS L₁ frequency;
|
||
if the user is operating on the GPS L₂ frequency, the correction term must
|
||
be multiplied by γ = f₂²/f₁¹ = 0.6071850227694382
|
||
:param pos: [lat, lon, alt] in radians and meters
|
||
"""
|
||
|
||
tow = time.tow
|
||
if pos[2] < -1E3 or el < 0:
|
||
return 0.0
|
||
if len(iono_coeffs) < 8:
|
||
return None
|
||
|
||
# earth centered angle (semi-circle)
|
||
psi = 0.0137 / (el / pi + 0.11) - 0.022
|
||
|
||
# subionospheric latitude/longitude (semi-circle)
|
||
phi = pos[0] / pi + psi * cos(az)
|
||
if phi > 0.416:
|
||
phi = 0.416
|
||
elif phi < -0.416:
|
||
phi = -0.416
|
||
lam = pos[1] / pi + psi * sin(az) / cos(phi * pi)
|
||
|
||
# geomagnetic latitude (semi-circle) */
|
||
phi += 0.064 * cos((lam - 1.617) * pi)
|
||
|
||
# local time (s)
|
||
tt = 43200.0 * lam + tow
|
||
tt -= floor(tt / 86400.0) * 86400.0 # 0<=tt<86400
|
||
|
||
# slant factor
|
||
f = 1.0 + 16.0 * pow(0.53 - el / pi, 3.0)
|
||
|
||
# ionospheric delay
|
||
amp = iono_coeffs[0] + phi * (iono_coeffs[1] + phi *
|
||
(iono_coeffs[2] + phi * iono_coeffs[3]))
|
||
per = iono_coeffs[4] + phi * (iono_coeffs[5] + phi *
|
||
(iono_coeffs[6] + phi * iono_coeffs[7]))
|
||
if amp < 0.0:
|
||
amp = 0.
|
||
if per < 72000.0:
|
||
per = 72000.0
|
||
x = 2.0 * pi * (tt - 50400.0) / per
|
||
|
||
mul = 5E-9
|
||
if abs(x) < 1.57:
|
||
mul = (5E-9 + amp * (1.0 + x * x * (-0.5 + x * x / 24.0)))
|
||
return 2.99792458E8 * f * mul
|