SoftGNSS/postNavigation.py

447 lines
19 KiB
Python
Raw Normal View History

2025-10-22 16:08:12 +07:00
import numpy as np
import ephemeris
from geoFunctions import satpos, leastSquarePos, cart2geo, findUtmZone, cart2utm
from initialize import Result
import shutil
class NavigationResult(Result):
def __init__(self, trackResult):
self._results = trackResult.results
self._channels = trackResult.channels
self._settings = trackResult.settings
self._solutions = None
self._eph = None
@property
def solutions(self):
assert isinstance(self._solutions, np.recarray)
return self._solutions
@property
def ephemeris(self):
# Không assert nhầm sang _solutions nữa
assert isinstance(self._solutions, np.recarray)
return self._eph
# ./calculatePseudoranges.m
def calculatePseudoranges(self, msOfTheSignal, channelList):
trackResults = self._results
settings = self._settings
# calculatePseudoranges finds relative pseudoranges for all satellites
# listed in CHANNELLIST at the specified millisecond of the processed
# signal. The pseudoranges contain unknown receiver clock offset. It can be
# found by the least squares position search procedure.
# --- Set initial travel time to infinity ----------------------------------
travelTime = np.inf * np.ones(settings.numberOfChannels)
# Find number of samples per spreading code
samplesPerCode = int(settings.samplesPerCode)
# --- For all channels in the list ...
for ch in channelList:
# Ép chỉ số ms về int
ms_idx = int(msOfTheSignal[ch])
travelTime[ch] = (
self._results[ch].absoluteSample[ms_idx] / samplesPerCode
)
# --- Truncate the travelTime and compute pseudoranges ---------------------
minimum = np.floor(np.min(travelTime[channelList])) # thay vì toàn bộ
travelTime = travelTime - minimum + settings.startOffset
# --- Convert travel time to a distance ------------------------------------
# The speed of light must be converted from meters per second to meters
# per millisecond.
pseudoranges = travelTime * settings.c / 1000.0
return pseudoranges
# ./postNavigation.m
def postNavigate(self):
trackResults = self._results
settings = self._settings
# Check minimal requirements
if settings.msToProcess < 36000.0 or np.sum(trackResults.status.astype('U') != '-') < 4:
# do something
print('Record is too short or too few satellites tracked. Exiting!')
self._solutions = None
self._eph = None
return
# Find preamble start positions
subFrameStart, activeChnList = self.findPreambles()
# Decode ephemerides
field_str = 'weekNumber,accuracy,health,T_GD,IODC,t_oc,a_f2,a_f1,a_f0,' \
'IODE_sf2,C_rs,deltan,M_0,C_uc,e,C_us,sqrtA,t_oe,' \
'C_ic,omega_0,C_is,i_0,C_rc,omega,omegaDot,IODE_sf3,iDot'
eph = np.recarray((32,), formats=['O'] * 27, names=field_str)
TOW = None
for ch in activeChnList:
# === Convert tracking output to navigation bits =======================
# Copy 5 sub-frames long record
start = int(subFrameStart[ch] - 20)
stop = int(subFrameStart[ch] + 1500 * 20)
# Bảo vệ biên
start = max(0, start)
stop = min(stop, trackResults[ch].I_P.size)
if stop - start < 20 * 1500:
# Không đủ dữ liệu cho 5 subframes
continue
navBitsSamples = trackResults[ch].I_P[start:stop].copy()
# reshape 20 x N (sum theo cột)
navBitsSamples = navBitsSamples.reshape(20, -1, order='F')
navBits = navBitsSamples.sum(0)
navBits = (navBits > 0).astype(int)
# Binary string array (indexable)
navBitsBin = navBits.astype(str)
eph[trackResults[ch].PRN - 1], TOW = ephemeris.ephemeris(navBitsBin[1:], navBitsBin[0])
if (eph[trackResults[ch].PRN - 1].IODC is None or
eph[trackResults[ch].PRN - 1].IODE_sf2 is None or
eph[trackResults[ch].PRN - 1].IODE_sf3 is None):
# Exclude channel from the list (from further processing)
activeChnList = np.setdiff1d(activeChnList, ch)
# Need >= 4 SV for 3D position
if activeChnList.size < 4:
print('Too few satellites with ephemeris data for position calculations. Exiting!')
self._solutions = None
self._eph = None
return
# Initialization
satElev = np.inf * np.ones(settings.numberOfChannels) # include all SV for first fix
readyChnList = activeChnList.copy()
transmitTime = float(TOW) if TOW is not None else 0.0
total_ms = float(settings.msToProcess - int(np.max(subFrameStart)))
num_iter = int(np.fix(total_ms) / settings.navSolPeriod)
# num_iter = 64
channel = np.rec.array([(np.zeros((settings.numberOfChannels, num_iter)),
np.nan * np.ones((settings.numberOfChannels, num_iter)),
np.nan * np.ones((settings.numberOfChannels, num_iter)),
np.nan * np.ones((settings.numberOfChannels, num_iter)),
np.nan * np.ones((settings.numberOfChannels, num_iter))
)], formats=['O'] * 5, names='PRN,el,az,rawP,correctedP')
navSolutions = np.rec.array([(channel,
np.zeros((5, num_iter)),
np.nan * np.ones(num_iter),
np.nan * np.ones(num_iter),
np.nan * np.ones(num_iter),
np.nan * np.ones(num_iter),
np.nan * np.ones(num_iter),
np.nan * np.ones(num_iter),
np.nan * np.ones(num_iter),
0,
np.nan * np.ones(num_iter),
np.nan * np.ones(num_iter),
np.nan * np.ones(num_iter)
)], formats=['O'] * 13,
names='channel,DOP,X,Y,Z,dt,latitude,longitude,height,utmZone,E,N,U')
for currMeasNr in range(num_iter):
# apply elevation mask
activeChnList = np.intersect1d((satElev >= settings.elevationMask).nonzero()[0], readyChnList)
# PRN list
channel[0].PRN[activeChnList, currMeasNr] = trackResults[activeChnList].PRN
# Pseudoranges
ms_vec = (subFrameStart + settings.navSolPeriod * currMeasNr).astype(int)
channel[0].rawP[:, currMeasNr] = self.calculatePseudoranges(ms_vec, activeChnList)
# Satellite positions and clock corrections
satPositions, satClkCorr = satpos(transmitTime, trackResults[activeChnList].PRN, eph, settings)
# Receiver position (need >= 4)
if activeChnList.size > 3:
(xyzdt,
channel[0].el[activeChnList, currMeasNr],
channel[0].az[activeChnList, currMeasNr],
navSolutions[0].DOP[:, currMeasNr]) = leastSquarePos(
satPositions,
channel[0].rawP[activeChnList, currMeasNr] + satClkCorr * settings.c,
settings
)
navSolutions[0].X[currMeasNr] = xyzdt[0]
navSolutions[0].Y[currMeasNr] = xyzdt[1]
navSolutions[0].Z[currMeasNr] = xyzdt[2]
navSolutions[0].dt[currMeasNr] = xyzdt[3]
satElev = channel[0].el[:, currMeasNr]
channel[0].correctedP[activeChnList, currMeasNr] = (
channel[0].rawP[activeChnList, currMeasNr] +
satClkCorr * settings.c +
navSolutions[0].dt[currMeasNr]
)
# ECEF -> Geodetic
(navSolutions[0].latitude[currMeasNr],
navSolutions[0].longitude[currMeasNr],
navSolutions[0].height[currMeasNr]) = cart2geo(
navSolutions[0].X[currMeasNr],
navSolutions[0].Y[currMeasNr],
navSolutions[0].Z[currMeasNr],
4
)
navSolutions[0].utmZone = findUtmZone(
navSolutions[0].latitude[currMeasNr],
navSolutions[0].longitude[currMeasNr]
)
(navSolutions[0].E[currMeasNr],
navSolutions[0].N[currMeasNr],
navSolutions[0].U[currMeasNr]) = cart2utm(
xyzdt[0], xyzdt[1], xyzdt[2], navSolutions[0].utmZone
)
else:
print(' Measurement No. %d: Not enough information for position solution.' % currMeasNr)
navSolutions[0].X[currMeasNr] = np.nan
navSolutions[0].Y[currMeasNr] = np.nan
navSolutions[0].Z[currMeasNr] = np.nan
navSolutions[0].dt[currMeasNr] = np.nan
navSolutions[0].DOP[:, currMeasNr] = np.zeros(5)
navSolutions[0].latitude[currMeasNr] = np.nan
navSolutions[0].longitude[currMeasNr] = np.nan
navSolutions[0].height[currMeasNr] = np.nan
navSolutions[0].E[currMeasNr] = np.nan
navSolutions[0].N[currMeasNr] = np.nan
navSolutions[0].U[currMeasNr] = np.nan
channel[0].az[activeChnList, currMeasNr] = np.nan * np.ones(activeChnList.shape)
channel[0].el[activeChnList, currMeasNr] = np.nan * np.ones(activeChnList.shape)
# Update TOW
transmitTime += settings.navSolPeriod / 1000.0
self._solutions = navSolutions
self._eph = eph
return
def plot(self):
settings = self._settings
navSolutions = self._solutions
# assert isinstance(navSolutions, np.recarray)
import matplotlib as mpl
import matplotlib.gridspec as gs
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D # noqa: F401
import initialize
mpl.rcdefaults()
mpl.rc('savefig', bbox='tight', transparent=False, format='png')
mpl.rc('axes', grid=True, linewidth=1.5, axisbelow=True)
mpl.rc('lines', linewidth=1.5, solid_joinstyle='bevel')
mpl.rc('figure', figsize=[8, 6], autolayout=False, dpi=120)
# Dùng LaTeX nếu có sẵn, không thì fallback sang mathtext
if shutil.which("latex"):
mpl.rc('text', usetex=True)
else:
mpl.rc('text', usetex=False)
mpl.rc('mathtext', fontset='cm') # hiển thị công thức toán kiểu Computer Modern
mpl.rc('font', family='serif', serif='Computer Modern Roman', size=10)
if navSolutions is not None:
refCoord = initialize.TruePosition()
# Nếu không có tọa độ tham chiếu, dùng trung bình
if (settings.truePosition.E is None or
settings.truePosition.N is None or
settings.truePosition.U is None):
refCoord.E = np.nanmean(navSolutions[0].E)
refCoord.N = np.nanmean(navSolutions[0].N)
refCoord.U = np.nanmean(navSolutions[0].U)
meanLongitude = np.nanmean(navSolutions[0].longitude)
meanLatitude = np.nanmean(navSolutions[0].latitude)
refPointLgText = 'Mean Position' + '\\newline Lat: %.5f $^\\circ$' % meanLatitude + \
'\\newline Lng: %.5f $^\\circ$' % meanLongitude + \
'\\newline Hgt: %+6.1f' % np.nanmean(navSolutions[0].height)
else:
refPointLgText = 'Reference Position'
refCoord.E = settings.truePosition.E
refCoord.N = settings.truePosition.N
refCoord.U = settings.truePosition.U
figureNumber = 300
f = plt.figure(figureNumber)
f.clf()
f.set_label('Navigation solutions')
spec = gs.GridSpec(2, 2)
h11 = plt.subplot(spec[0:2])
h31 = plt.subplot(spec[2], projection='3d')
h32 = plt.subplot(spec[3], projection='polar')
# ENU variations
h11.plot(navSolutions[0].E - refCoord.E,
navSolutions[0].N - refCoord.N,
navSolutions[0].U - refCoord.U, '-')
h11.legend(['E', 'N', 'U'])
h11.set(title='Coordinates variations in UTM system',
xlabel='Measurement period: %i ms' % settings.navSolPeriod,
ylabel='Variations (m)')
h11.grid(True)
h11.axis('tight')
# 3D plot
h31.plot((navSolutions[0].E - refCoord.E).T,
(navSolutions[0].N - refCoord.N).T,
(navSolutions[0].U - refCoord.U).T, '+')
h31.plot([0], [0], [0], 'r+', lw=1.5, ms=10)
h31.set_box_aspect((1, 1, 1))
h31.grid(True)
h31.legend(['Measurements', refPointLgText])
h31.set(title='Positions in UTM system (3D plot)',
xlabel='East (m)',
ylabel='North (m)',
zlabel='Upping (m)')
# Sky plot
az = np.deg2rad(navSolutions[0].channel[0].az.T)
el = navSolutions[0].channel[0].el.T
h32.plot(az, 90 - el)
# PRN labels tại epoch 0
az0 = np.deg2rad(navSolutions[0].channel[0].az[:, 0])
el0 = navSolutions[0].channel[0].el[:, 0]
prn0 = navSolutions[0].channel[0].PRN[:, 0]
for x, y, s in zip(az0, 90 - el0, prn0):
if np.isfinite(x) and np.isfinite(y) and s != 0:
h32.text(x, y, str(s))
h32.set_theta_direction(-1)
h32.set_theta_zero_location('N')
h32.set_xlim([0, 2 * np.pi])
h32.set_xticks(np.linspace(0, 2 * np.pi, 12, endpoint=False))
h32.set_rlabel_position(0)
h32.set_ylim([0, 90])
h32.set_yticks([0, 15, 30, 45, 60, 75])
h32.set_yticklabels([90, 75, 60, 45, 30, 15])
h32.set_title('Sky plot (mean PDOP: %f )' % np.nanmean(navSolutions[0].DOP[1, :]))
plt.show()
else:
print('plotNavigation: No navigation data to plot.')
@staticmethod
# navPartyChk.m
def navPartyChk(ndat):
# Parity check cho một từ GPS (32 bits: 2 prev + 30 curr)
if ndat[1] != 1:
ndat[2:26] *= (-1)
parity = np.zeros(6)
parity[0] = ndat[0] * ndat[2] * ndat[3] * ndat[4] * ndat[6] * \
ndat[7] * ndat[11] * ndat[12] * ndat[13] * ndat[14] * \
ndat[15] * ndat[18] * ndat[19] * ndat[21] * ndat[24]
parity[1] = ndat[1] * ndat[3] * ndat[4] * ndat[5] * ndat[7] * \
ndat[8] * ndat[12] * ndat[13] * ndat[14] * ndat[15] * \
ndat[16] * ndat[19] * ndat[20] * ndat[22] * ndat[25]
parity[2] = ndat[0] * ndat[2] * ndat[4] * ndat[5] * ndat[6] * \
ndat[8] * ndat[9] * ndat[13] * ndat[14] * ndat[15] * \
ndat[16] * ndat[17] * ndat[20] * ndat[21] * ndat[23]
parity[3] = ndat[1] * ndat[3] * ndat[5] * ndat[6] * ndat[7] * \
ndat[9] * ndat[10] * ndat[14] * ndat[15] * ndat[16] * \
ndat[17] * ndat[18] * ndat[21] * ndat[22] * ndat[24]
parity[4] = ndat[1] * ndat[2] * ndat[4] * ndat[6] * ndat[7] * \
ndat[8] * ndat[10] * ndat[11] * ndat[15] * ndat[16] * \
ndat[17] * ndat[18] * ndat[19] * ndat[22] * ndat[23] * \
ndat[25]
parity[5] = ndat[0] * ndat[4] * ndat[6] * ndat[7] * ndat[9] * \
ndat[10] * ndat[11] * ndat[12] * ndat[14] * ndat[16] * \
ndat[20] * ndat[23] * ndat[24] * ndat[25]
if (parity == ndat[26:]).sum() == 6:
status = -1 * ndat[1]
else:
status = 0
return status
# ./findPreambles.m
def findPreambles(self):
assert isinstance(self._results, np.recarray)
trackResults = self._results
settings = self._settings
searchStartOffset = 0
# --- Initialize the firstSubFrame array -----------------------------------
firstSubFrame = np.zeros(settings.numberOfChannels, dtype=int)
# --- Generate the preamble pattern ----------------------------------------
preamble_bits = np.r_[1, -1, -1, -1, 1, -1, 1, 1]
preamble_ms = np.kron(preamble_bits, np.ones(20)) # upsample 20x
# --- Make a list of channels excluding not tracking channels --------------
activeChnList = np.nonzero(trackResults.status.astype('U') != '-')[0]
# === For all tracking channels ...
for ch in activeChnList.tolist():
bits = trackResults[ch].I_P[searchStartOffset:].copy()
# Chuẩn hóa sang {-1, +1}
bits = np.where(bits > 0, 1, -1)
# Correlate với preamble (pad cho cùng độ dài)
tlmXcorrResult = np.correlate(
bits,
np.pad(preamble_ms, (0, max(0, bits.size - preamble_ms.size)), 'constant'),
mode='full'
)
tlmXcorrResult = np.nan_to_num(tlmXcorrResult, nan=0.0, posinf=0.0, neginf=0.0)
# Tìm các điểm giống preamble
xcorrLength = (len(tlmXcorrResult) + 1) // 2 # int
start = max(0, xcorrLength - 1)
stop = min(2 * xcorrLength, tlmXcorrResult.shape[0])
idx_rel = (np.abs(tlmXcorrResult[start:stop]) > 153).nonzero()[0]
index = idx_rel.astype(np.intp) + int(searchStartOffset)
found = False
for i in range(len(index)):
index2 = index - index[i]
# Khoảng cách 6000 ms (1 subframe)
if (index2 == 6000).any():
s = int(index[i] - 40) # 2 bit prev * 20ms
e = int(index[i] + 20 * 60) # 60 bits * 20ms
if s < 0 or e > trackResults[ch].I_P.size:
continue
bits2 = trackResults[ch].I_P[s:e].copy().reshape(20, -1, order='F').sum(0)
bits2 = np.where(bits2 > 0, 1, -1)
if self.navPartyChk(bits2[:32]) != 0 and self.navPartyChk(bits2[30:62]) != 0:
firstSubFrame[ch] = int(index[i])
found = True
break
if not found:
activeChnList = np.setdiff1d(activeChnList, ch)
print('Could not find valid preambles in channel %2d !' % ch)
return firstSubFrame, activeChnList
if __name__ == '__main__':
pass