447 lines
19 KiB
Python
447 lines
19 KiB
Python
import numpy as np
|
|
|
|
import ephemeris
|
|
from geoFunctions import satpos, leastSquarePos, cart2geo, findUtmZone, cart2utm
|
|
from initialize import Result
|
|
import shutil
|
|
|
|
|
|
|
|
class NavigationResult(Result):
|
|
def __init__(self, trackResult):
|
|
self._results = trackResult.results
|
|
self._channels = trackResult.channels
|
|
self._settings = trackResult.settings
|
|
self._solutions = None
|
|
self._eph = None
|
|
|
|
@property
|
|
def solutions(self):
|
|
assert isinstance(self._solutions, np.recarray)
|
|
return self._solutions
|
|
|
|
@property
|
|
def ephemeris(self):
|
|
# Không assert nhầm sang _solutions nữa
|
|
assert isinstance(self._solutions, np.recarray)
|
|
return self._eph
|
|
|
|
# ./calculatePseudoranges.m
|
|
def calculatePseudoranges(self, msOfTheSignal, channelList):
|
|
trackResults = self._results
|
|
settings = self._settings
|
|
# calculatePseudoranges finds relative pseudoranges for all satellites
|
|
# listed in CHANNELLIST at the specified millisecond of the processed
|
|
# signal. The pseudoranges contain unknown receiver clock offset. It can be
|
|
# found by the least squares position search procedure.
|
|
|
|
# --- Set initial travel time to infinity ----------------------------------
|
|
travelTime = np.inf * np.ones(settings.numberOfChannels)
|
|
|
|
# Find number of samples per spreading code
|
|
samplesPerCode = int(settings.samplesPerCode)
|
|
|
|
# --- For all channels in the list ...
|
|
for ch in channelList:
|
|
# Ép chỉ số ms về int
|
|
ms_idx = int(msOfTheSignal[ch])
|
|
travelTime[ch] = (
|
|
self._results[ch].absoluteSample[ms_idx] / samplesPerCode
|
|
)
|
|
|
|
# --- Truncate the travelTime and compute pseudoranges ---------------------
|
|
minimum = np.floor(np.min(travelTime[channelList])) # thay vì toàn bộ
|
|
|
|
travelTime = travelTime - minimum + settings.startOffset
|
|
|
|
# --- Convert travel time to a distance ------------------------------------
|
|
# The speed of light must be converted from meters per second to meters
|
|
# per millisecond.
|
|
pseudoranges = travelTime * settings.c / 1000.0
|
|
return pseudoranges
|
|
|
|
# ./postNavigation.m
|
|
def postNavigate(self):
|
|
trackResults = self._results
|
|
settings = self._settings
|
|
# Check minimal requirements
|
|
if settings.msToProcess < 36000.0 or np.sum(trackResults.status.astype('U') != '-') < 4:
|
|
# do something
|
|
print('Record is too short or too few satellites tracked. Exiting!')
|
|
self._solutions = None
|
|
self._eph = None
|
|
return
|
|
|
|
# Find preamble start positions
|
|
subFrameStart, activeChnList = self.findPreambles()
|
|
|
|
# Decode ephemerides
|
|
field_str = 'weekNumber,accuracy,health,T_GD,IODC,t_oc,a_f2,a_f1,a_f0,' \
|
|
'IODE_sf2,C_rs,deltan,M_0,C_uc,e,C_us,sqrtA,t_oe,' \
|
|
'C_ic,omega_0,C_is,i_0,C_rc,omega,omegaDot,IODE_sf3,iDot'
|
|
eph = np.recarray((32,), formats=['O'] * 27, names=field_str)
|
|
|
|
TOW = None
|
|
for ch in activeChnList:
|
|
# === Convert tracking output to navigation bits =======================
|
|
# Copy 5 sub-frames long record
|
|
start = int(subFrameStart[ch] - 20)
|
|
stop = int(subFrameStart[ch] + 1500 * 20)
|
|
|
|
# Bảo vệ biên
|
|
start = max(0, start)
|
|
stop = min(stop, trackResults[ch].I_P.size)
|
|
if stop - start < 20 * 1500:
|
|
# Không đủ dữ liệu cho 5 subframes
|
|
continue
|
|
|
|
navBitsSamples = trackResults[ch].I_P[start:stop].copy()
|
|
# reshape 20 x N (sum theo cột)
|
|
navBitsSamples = navBitsSamples.reshape(20, -1, order='F')
|
|
navBits = navBitsSamples.sum(0)
|
|
navBits = (navBits > 0).astype(int)
|
|
|
|
# Binary string array (indexable)
|
|
navBitsBin = navBits.astype(str)
|
|
|
|
eph[trackResults[ch].PRN - 1], TOW = ephemeris.ephemeris(navBitsBin[1:], navBitsBin[0])
|
|
|
|
if (eph[trackResults[ch].PRN - 1].IODC is None or
|
|
eph[trackResults[ch].PRN - 1].IODE_sf2 is None or
|
|
eph[trackResults[ch].PRN - 1].IODE_sf3 is None):
|
|
# Exclude channel from the list (from further processing)
|
|
activeChnList = np.setdiff1d(activeChnList, ch)
|
|
|
|
# Need >= 4 SV for 3D position
|
|
if activeChnList.size < 4:
|
|
print('Too few satellites with ephemeris data for position calculations. Exiting!')
|
|
self._solutions = None
|
|
self._eph = None
|
|
return
|
|
|
|
# Initialization
|
|
satElev = np.inf * np.ones(settings.numberOfChannels) # include all SV for first fix
|
|
readyChnList = activeChnList.copy()
|
|
transmitTime = float(TOW) if TOW is not None else 0.0
|
|
|
|
total_ms = float(settings.msToProcess - int(np.max(subFrameStart)))
|
|
num_iter = int(np.fix(total_ms) / settings.navSolPeriod)
|
|
# num_iter = 64
|
|
|
|
channel = np.rec.array([(np.zeros((settings.numberOfChannels, num_iter)),
|
|
np.nan * np.ones((settings.numberOfChannels, num_iter)),
|
|
np.nan * np.ones((settings.numberOfChannels, num_iter)),
|
|
np.nan * np.ones((settings.numberOfChannels, num_iter)),
|
|
np.nan * np.ones((settings.numberOfChannels, num_iter))
|
|
)], formats=['O'] * 5, names='PRN,el,az,rawP,correctedP')
|
|
|
|
navSolutions = np.rec.array([(channel,
|
|
np.zeros((5, num_iter)),
|
|
np.nan * np.ones(num_iter),
|
|
np.nan * np.ones(num_iter),
|
|
np.nan * np.ones(num_iter),
|
|
np.nan * np.ones(num_iter),
|
|
np.nan * np.ones(num_iter),
|
|
np.nan * np.ones(num_iter),
|
|
np.nan * np.ones(num_iter),
|
|
0,
|
|
np.nan * np.ones(num_iter),
|
|
np.nan * np.ones(num_iter),
|
|
np.nan * np.ones(num_iter)
|
|
)], formats=['O'] * 13,
|
|
names='channel,DOP,X,Y,Z,dt,latitude,longitude,height,utmZone,E,N,U')
|
|
|
|
for currMeasNr in range(num_iter):
|
|
# apply elevation mask
|
|
activeChnList = np.intersect1d((satElev >= settings.elevationMask).nonzero()[0], readyChnList)
|
|
|
|
# PRN list
|
|
channel[0].PRN[activeChnList, currMeasNr] = trackResults[activeChnList].PRN
|
|
|
|
# Pseudoranges
|
|
ms_vec = (subFrameStart + settings.navSolPeriod * currMeasNr).astype(int)
|
|
channel[0].rawP[:, currMeasNr] = self.calculatePseudoranges(ms_vec, activeChnList)
|
|
|
|
# Satellite positions and clock corrections
|
|
satPositions, satClkCorr = satpos(transmitTime, trackResults[activeChnList].PRN, eph, settings)
|
|
|
|
# Receiver position (need >= 4)
|
|
if activeChnList.size > 3:
|
|
(xyzdt,
|
|
channel[0].el[activeChnList, currMeasNr],
|
|
channel[0].az[activeChnList, currMeasNr],
|
|
navSolutions[0].DOP[:, currMeasNr]) = leastSquarePos(
|
|
satPositions,
|
|
channel[0].rawP[activeChnList, currMeasNr] + satClkCorr * settings.c,
|
|
settings
|
|
)
|
|
|
|
navSolutions[0].X[currMeasNr] = xyzdt[0]
|
|
navSolutions[0].Y[currMeasNr] = xyzdt[1]
|
|
navSolutions[0].Z[currMeasNr] = xyzdt[2]
|
|
navSolutions[0].dt[currMeasNr] = xyzdt[3]
|
|
|
|
satElev = channel[0].el[:, currMeasNr]
|
|
|
|
channel[0].correctedP[activeChnList, currMeasNr] = (
|
|
channel[0].rawP[activeChnList, currMeasNr] +
|
|
satClkCorr * settings.c +
|
|
navSolutions[0].dt[currMeasNr]
|
|
)
|
|
|
|
# ECEF -> Geodetic
|
|
(navSolutions[0].latitude[currMeasNr],
|
|
navSolutions[0].longitude[currMeasNr],
|
|
navSolutions[0].height[currMeasNr]) = cart2geo(
|
|
navSolutions[0].X[currMeasNr],
|
|
navSolutions[0].Y[currMeasNr],
|
|
navSolutions[0].Z[currMeasNr],
|
|
4
|
|
)
|
|
|
|
navSolutions[0].utmZone = findUtmZone(
|
|
navSolutions[0].latitude[currMeasNr],
|
|
navSolutions[0].longitude[currMeasNr]
|
|
)
|
|
|
|
(navSolutions[0].E[currMeasNr],
|
|
navSolutions[0].N[currMeasNr],
|
|
navSolutions[0].U[currMeasNr]) = cart2utm(
|
|
xyzdt[0], xyzdt[1], xyzdt[2], navSolutions[0].utmZone
|
|
)
|
|
|
|
else:
|
|
print(' Measurement No. %d: Not enough information for position solution.' % currMeasNr)
|
|
navSolutions[0].X[currMeasNr] = np.nan
|
|
navSolutions[0].Y[currMeasNr] = np.nan
|
|
navSolutions[0].Z[currMeasNr] = np.nan
|
|
navSolutions[0].dt[currMeasNr] = np.nan
|
|
navSolutions[0].DOP[:, currMeasNr] = np.zeros(5)
|
|
navSolutions[0].latitude[currMeasNr] = np.nan
|
|
navSolutions[0].longitude[currMeasNr] = np.nan
|
|
navSolutions[0].height[currMeasNr] = np.nan
|
|
navSolutions[0].E[currMeasNr] = np.nan
|
|
navSolutions[0].N[currMeasNr] = np.nan
|
|
navSolutions[0].U[currMeasNr] = np.nan
|
|
channel[0].az[activeChnList, currMeasNr] = np.nan * np.ones(activeChnList.shape)
|
|
channel[0].el[activeChnList, currMeasNr] = np.nan * np.ones(activeChnList.shape)
|
|
|
|
# Update TOW
|
|
transmitTime += settings.navSolPeriod / 1000.0
|
|
|
|
self._solutions = navSolutions
|
|
self._eph = eph
|
|
return
|
|
|
|
def plot(self):
|
|
settings = self._settings
|
|
navSolutions = self._solutions
|
|
# assert isinstance(navSolutions, np.recarray)
|
|
|
|
import matplotlib as mpl
|
|
import matplotlib.gridspec as gs
|
|
import matplotlib.pyplot as plt
|
|
from mpl_toolkits.mplot3d import Axes3D # noqa: F401
|
|
import initialize
|
|
|
|
mpl.rcdefaults()
|
|
mpl.rc('savefig', bbox='tight', transparent=False, format='png')
|
|
mpl.rc('axes', grid=True, linewidth=1.5, axisbelow=True)
|
|
mpl.rc('lines', linewidth=1.5, solid_joinstyle='bevel')
|
|
mpl.rc('figure', figsize=[8, 6], autolayout=False, dpi=120)
|
|
|
|
# Dùng LaTeX nếu có sẵn, không thì fallback sang mathtext
|
|
if shutil.which("latex"):
|
|
mpl.rc('text', usetex=True)
|
|
else:
|
|
mpl.rc('text', usetex=False)
|
|
mpl.rc('mathtext', fontset='cm') # hiển thị công thức toán kiểu Computer Modern
|
|
mpl.rc('font', family='serif', serif='Computer Modern Roman', size=10)
|
|
|
|
if navSolutions is not None:
|
|
refCoord = initialize.TruePosition()
|
|
# Nếu không có tọa độ tham chiếu, dùng trung bình
|
|
if (settings.truePosition.E is None or
|
|
settings.truePosition.N is None or
|
|
settings.truePosition.U is None):
|
|
|
|
refCoord.E = np.nanmean(navSolutions[0].E)
|
|
refCoord.N = np.nanmean(navSolutions[0].N)
|
|
refCoord.U = np.nanmean(navSolutions[0].U)
|
|
|
|
meanLongitude = np.nanmean(navSolutions[0].longitude)
|
|
meanLatitude = np.nanmean(navSolutions[0].latitude)
|
|
|
|
refPointLgText = 'Mean Position' + '\\newline Lat: %.5f $^\\circ$' % meanLatitude + \
|
|
'\\newline Lng: %.5f $^\\circ$' % meanLongitude + \
|
|
'\\newline Hgt: %+6.1f' % np.nanmean(navSolutions[0].height)
|
|
|
|
else:
|
|
refPointLgText = 'Reference Position'
|
|
refCoord.E = settings.truePosition.E
|
|
refCoord.N = settings.truePosition.N
|
|
refCoord.U = settings.truePosition.U
|
|
|
|
figureNumber = 300
|
|
f = plt.figure(figureNumber)
|
|
f.clf()
|
|
f.set_label('Navigation solutions')
|
|
spec = gs.GridSpec(2, 2)
|
|
h11 = plt.subplot(spec[0:2])
|
|
h31 = plt.subplot(spec[2], projection='3d')
|
|
h32 = plt.subplot(spec[3], projection='polar')
|
|
|
|
# ENU variations
|
|
h11.plot(navSolutions[0].E - refCoord.E,
|
|
navSolutions[0].N - refCoord.N,
|
|
navSolutions[0].U - refCoord.U, '-')
|
|
h11.legend(['E', 'N', 'U'])
|
|
h11.set(title='Coordinates variations in UTM system',
|
|
xlabel='Measurement period: %i ms' % settings.navSolPeriod,
|
|
ylabel='Variations (m)')
|
|
h11.grid(True)
|
|
h11.axis('tight')
|
|
|
|
# 3D plot
|
|
h31.plot((navSolutions[0].E - refCoord.E).T,
|
|
(navSolutions[0].N - refCoord.N).T,
|
|
(navSolutions[0].U - refCoord.U).T, '+')
|
|
h31.plot([0], [0], [0], 'r+', lw=1.5, ms=10)
|
|
h31.set_box_aspect((1, 1, 1))
|
|
h31.grid(True)
|
|
h31.legend(['Measurements', refPointLgText])
|
|
h31.set(title='Positions in UTM system (3D plot)',
|
|
xlabel='East (m)',
|
|
ylabel='North (m)',
|
|
zlabel='Upping (m)')
|
|
|
|
# Sky plot
|
|
az = np.deg2rad(navSolutions[0].channel[0].az.T)
|
|
el = navSolutions[0].channel[0].el.T
|
|
h32.plot(az, 90 - el)
|
|
# PRN labels tại epoch 0
|
|
az0 = np.deg2rad(navSolutions[0].channel[0].az[:, 0])
|
|
el0 = navSolutions[0].channel[0].el[:, 0]
|
|
prn0 = navSolutions[0].channel[0].PRN[:, 0]
|
|
for x, y, s in zip(az0, 90 - el0, prn0):
|
|
if np.isfinite(x) and np.isfinite(y) and s != 0:
|
|
h32.text(x, y, str(s))
|
|
h32.set_theta_direction(-1)
|
|
h32.set_theta_zero_location('N')
|
|
h32.set_xlim([0, 2 * np.pi])
|
|
h32.set_xticks(np.linspace(0, 2 * np.pi, 12, endpoint=False))
|
|
h32.set_rlabel_position(0)
|
|
h32.set_ylim([0, 90])
|
|
h32.set_yticks([0, 15, 30, 45, 60, 75])
|
|
h32.set_yticklabels([90, 75, 60, 45, 30, 15])
|
|
h32.set_title('Sky plot (mean PDOP: %f )' % np.nanmean(navSolutions[0].DOP[1, :]))
|
|
plt.show()
|
|
else:
|
|
print('plotNavigation: No navigation data to plot.')
|
|
|
|
@staticmethod
|
|
# navPartyChk.m
|
|
def navPartyChk(ndat):
|
|
# Parity check cho một từ GPS (32 bits: 2 prev + 30 curr)
|
|
if ndat[1] != 1:
|
|
ndat[2:26] *= (-1)
|
|
|
|
parity = np.zeros(6)
|
|
parity[0] = ndat[0] * ndat[2] * ndat[3] * ndat[4] * ndat[6] * \
|
|
ndat[7] * ndat[11] * ndat[12] * ndat[13] * ndat[14] * \
|
|
ndat[15] * ndat[18] * ndat[19] * ndat[21] * ndat[24]
|
|
|
|
parity[1] = ndat[1] * ndat[3] * ndat[4] * ndat[5] * ndat[7] * \
|
|
ndat[8] * ndat[12] * ndat[13] * ndat[14] * ndat[15] * \
|
|
ndat[16] * ndat[19] * ndat[20] * ndat[22] * ndat[25]
|
|
|
|
parity[2] = ndat[0] * ndat[2] * ndat[4] * ndat[5] * ndat[6] * \
|
|
ndat[8] * ndat[9] * ndat[13] * ndat[14] * ndat[15] * \
|
|
ndat[16] * ndat[17] * ndat[20] * ndat[21] * ndat[23]
|
|
|
|
parity[3] = ndat[1] * ndat[3] * ndat[5] * ndat[6] * ndat[7] * \
|
|
ndat[9] * ndat[10] * ndat[14] * ndat[15] * ndat[16] * \
|
|
ndat[17] * ndat[18] * ndat[21] * ndat[22] * ndat[24]
|
|
|
|
parity[4] = ndat[1] * ndat[2] * ndat[4] * ndat[6] * ndat[7] * \
|
|
ndat[8] * ndat[10] * ndat[11] * ndat[15] * ndat[16] * \
|
|
ndat[17] * ndat[18] * ndat[19] * ndat[22] * ndat[23] * \
|
|
ndat[25]
|
|
|
|
parity[5] = ndat[0] * ndat[4] * ndat[6] * ndat[7] * ndat[9] * \
|
|
ndat[10] * ndat[11] * ndat[12] * ndat[14] * ndat[16] * \
|
|
ndat[20] * ndat[23] * ndat[24] * ndat[25]
|
|
|
|
if (parity == ndat[26:]).sum() == 6:
|
|
status = -1 * ndat[1]
|
|
else:
|
|
status = 0
|
|
return status
|
|
|
|
# ./findPreambles.m
|
|
def findPreambles(self):
|
|
assert isinstance(self._results, np.recarray)
|
|
trackResults = self._results
|
|
settings = self._settings
|
|
|
|
searchStartOffset = 0
|
|
|
|
# --- Initialize the firstSubFrame array -----------------------------------
|
|
firstSubFrame = np.zeros(settings.numberOfChannels, dtype=int)
|
|
|
|
# --- Generate the preamble pattern ----------------------------------------
|
|
preamble_bits = np.r_[1, -1, -1, -1, 1, -1, 1, 1]
|
|
preamble_ms = np.kron(preamble_bits, np.ones(20)) # upsample 20x
|
|
|
|
# --- Make a list of channels excluding not tracking channels --------------
|
|
activeChnList = np.nonzero(trackResults.status.astype('U') != '-')[0]
|
|
|
|
# === For all tracking channels ...
|
|
for ch in activeChnList.tolist():
|
|
bits = trackResults[ch].I_P[searchStartOffset:].copy()
|
|
# Chuẩn hóa sang {-1, +1}
|
|
bits = np.where(bits > 0, 1, -1)
|
|
|
|
# Correlate với preamble (pad cho cùng độ dài)
|
|
tlmXcorrResult = np.correlate(
|
|
bits,
|
|
np.pad(preamble_ms, (0, max(0, bits.size - preamble_ms.size)), 'constant'),
|
|
mode='full'
|
|
)
|
|
tlmXcorrResult = np.nan_to_num(tlmXcorrResult, nan=0.0, posinf=0.0, neginf=0.0)
|
|
|
|
# Tìm các điểm giống preamble
|
|
xcorrLength = (len(tlmXcorrResult) + 1) // 2 # int
|
|
start = max(0, xcorrLength - 1)
|
|
stop = min(2 * xcorrLength, tlmXcorrResult.shape[0])
|
|
idx_rel = (np.abs(tlmXcorrResult[start:stop]) > 153).nonzero()[0]
|
|
index = idx_rel.astype(np.intp) + int(searchStartOffset)
|
|
|
|
found = False
|
|
for i in range(len(index)):
|
|
index2 = index - index[i]
|
|
# Khoảng cách 6000 ms (1 subframe)
|
|
if (index2 == 6000).any():
|
|
s = int(index[i] - 40) # 2 bit prev * 20ms
|
|
e = int(index[i] + 20 * 60) # 60 bits * 20ms
|
|
if s < 0 or e > trackResults[ch].I_P.size:
|
|
continue
|
|
|
|
bits2 = trackResults[ch].I_P[s:e].copy().reshape(20, -1, order='F').sum(0)
|
|
bits2 = np.where(bits2 > 0, 1, -1)
|
|
|
|
if self.navPartyChk(bits2[:32]) != 0 and self.navPartyChk(bits2[30:62]) != 0:
|
|
firstSubFrame[ch] = int(index[i])
|
|
found = True
|
|
break
|
|
|
|
if not found:
|
|
activeChnList = np.setdiff1d(activeChnList, ch)
|
|
print('Could not find valid preambles in channel %2d !' % ch)
|
|
|
|
return firstSubFrame, activeChnList
|
|
|
|
|
|
if __name__ == '__main__':
|
|
pass
|