SoftGNSS/tracking_IQ.py

231 lines
9.9 KiB
Python
Raw Permalink Normal View History

2025-10-22 16:08:12 +07:00
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gs
from initialize import Result
class TrackingResultIQ(Result):
def __init__(self, acqResult):
self._results = None
self._channels = acqResult.channels
self._settings = acqResult.settings
def track_IQ(self, fid):
channel = self._channels
settings = self._settings
codePeriods = settings.msToProcess
earlyLateSpc = settings.dllCorrelatorSpacing
PDIcode = 0.001
tau1code, tau2code = settings.calcLoopCoef(settings.dllNoiseBandwidth,
settings.dllDampingRatio, 1.0)
PDIcarr = 0.001
tau1carr, tau2carr = settings.calcLoopCoef(settings.pllNoiseBandwidth,
settings.pllDampingRatio, 0.25)
rec = []
for channelNr in range(settings.numberOfChannels):
msToProcess = int(settings.msToProcess)
status = '-'
absoluteSample = np.zeros(msToProcess)
codeFreq_ = np.inf * np.ones(msToProcess)
carrFreq_ = np.inf * np.ones(msToProcess)
# Outputs from the correlators (In-phase):
I_E_ = np.zeros(msToProcess)
I_P_ = np.zeros(msToProcess)
I_L_ = np.zeros(msToProcess)
# Outputs from the correlators (Quadrature-phase):
Q_E_ = np.zeros(msToProcess)
Q_P_ = np.zeros(msToProcess)
Q_L_ = np.zeros(msToProcess)
dllDiscr = np.inf * np.ones(msToProcess)
dllDiscrFilt = np.inf * np.ones(msToProcess)
pllDiscr = np.inf * np.ones(msToProcess)
pllDiscrFilt = np.inf * np.ones(msToProcess)
PRN = 0
if channel[channelNr].PRN != 0:
PRN = channel[channelNr].PRN
fid.seek(int(settings.skipNumberOfBytes +
channel[channelNr].codePhase * settings.dataTypeSize * 2), 0)
caCode = settings.generateCAcode(channel[channelNr].PRN - 1)
caCode = np.r_[caCode[-1], caCode, caCode[0]]
codeFreq = settings.codeFreqBasis
remCodePhase = 0.0
carrFreq = channel[channelNr].acquiredFreq
carrFreqBasis = channel[channelNr].acquiredFreq
remCarrPhase = 0.0
oldCodeNco = 0.0;
oldCodeError = 0.0
oldCarrNco = 0.0;
oldCarrError = 0.0
for loopCnt in range(msToProcess):
if loopCnt % 50 == 0:
print(f"Tracking: Ch {channelNr + 1} of {settings.numberOfChannels}; "
f"PRN#{channel[channelNr].PRN}; "
f"Completed {loopCnt} of {msToProcess} msec")
codePhaseStep = codeFreq / settings.samplingFreq
blksize = np.ceil((settings.codeLength - remCodePhase) / codePhaseStep)
blksize = np.long(blksize)
raw = np.fromfile(fid, dtype=settings.dataType, count=2*blksize)
if len(raw) < 2*blksize:
print("End of file, stopping tracking")
break
rawSignal = raw[0::2] + 1j * raw[1::2]
# Early, Late, Prompt
tcode = np.arange(blksize) * codePhaseStep + remCodePhase - earlyLateSpc
earlyCode = caCode[np.ceil(tcode).astype(int)]
tcode = np.arange(blksize) * codePhaseStep + remCodePhase + earlyLateSpc
lateCode = caCode[np.ceil(tcode).astype(int)]
tcode = np.arange(blksize) * codePhaseStep + remCodePhase
promptCode = caCode[np.ceil(tcode).astype(int)]
remCodePhase = tcode[blksize - 1] + codePhaseStep - 1023.0
# Mix về baseband
time = np.arange(0, blksize + 1) / settings.samplingFreq
trigarg = carrFreq * 2*np.pi * time + remCarrPhase
# remCarrPhase = trigarg[-1] % (2*np.pi)
remCarrPhase = trigarg[blksize]% (2 * np.pi);
carrCos = np.cos(trigarg[0:blksize])
carrSin = np.sin(trigarg[0:blksize])
iqBaseband = (carrSin + 1j*carrCos) * rawSignal
iBaseband = iqBaseband.real
qBaseband = iqBaseband.imag
I_E = np.sum(earlyCode * iBaseband); Q_E = np.sum(earlyCode * qBaseband)
I_P = np.sum(promptCode * iBaseband); Q_P = np.sum(promptCode * qBaseband)
I_L = np.sum(lateCode * iBaseband); Q_L = np.sum(lateCode * qBaseband)
# PLL
carrError = np.arctan(Q_P / I_P) / (2*np.pi)
carrNco = oldCarrNco + (tau2carr/tau1carr) * (carrError - oldCarrError) + carrError * (PDIcarr/tau1carr)
oldCarrNco, oldCarrError = carrNco, carrError
carrFreq = carrFreqBasis + carrNco
# DLL
codeError = (np.sqrt(I_E**2 + Q_E**2) - np.sqrt(I_L**2 + Q_L**2)) / \
(np.sqrt(I_E**2 + Q_E**2) + np.sqrt(I_L**2 + Q_L**2))
codeNco = oldCodeNco + (tau2code/tau1code) * (codeError - oldCodeError) + codeError * (PDIcode/tau1code)
oldCodeNco, oldCodeError = codeNco, codeError
codeFreq = settings.codeFreqBasis - codeNco
# Save
absoluteSample[loopCnt] = fid.tell()
carrFreq_[loopCnt] = carrFreq
codeFreq_[loopCnt] = codeFreq
I_E_[loopCnt], Q_E_[loopCnt] = I_E, Q_E
I_P_[loopCnt], Q_P_[loopCnt] = I_P, Q_P
I_L_[loopCnt], Q_L_[loopCnt] = I_L, Q_L
dllDiscr[loopCnt], dllDiscrFilt[loopCnt] = codeError, codeNco
pllDiscr[loopCnt], pllDiscrFilt[loopCnt] = carrError, carrNco
status = channel[channelNr].status
rec.append((status, absoluteSample, codeFreq_, carrFreq_,
I_P_, I_E_, I_L_, Q_E_, Q_P_, Q_L_,
dllDiscr, dllDiscrFilt, pllDiscr, pllDiscrFilt, PRN))
trackResults = np.rec.fromrecords(rec,
dtype=[('status','S1'),
('absoluteSample','object'),
('codeFreq','object'),
('carrFreq','object'),
('I_P','object'),('I_E','object'),('I_L','object'),
('Q_E','object'),('Q_P','object'),('Q_L','object'),
('dllDiscr','object'),('dllDiscrFilt','object'),
('pllDiscr','object'),('pllDiscrFilt','object'),
('PRN','int64')])
self._results = trackResults
return trackResults
def plot(self, channelList=None):
trackResults = self._results
settings = self._settings
if channelList is None:
channelList = range(settings.numberOfChannels)
channelList = np.intersect1d(channelList, range(settings.numberOfChannels))
for channelNr in channelList:
f = plt.figure(channelNr + 200)
f.set_label(f'Channel {channelNr} (PRN {trackResults[channelNr].PRN}) results')
spec = gs.GridSpec(3, 3)
h11 = plt.subplot(spec[0, 0])
h12 = plt.subplot(spec[0, 1:])
h21 = plt.subplot(spec[1, 0])
h22 = plt.subplot(spec[1, 1:])
h31 = plt.subplot(spec[2, 0])
h32 = plt.subplot(spec[2, 1])
h33 = plt.subplot(spec[2, 2])
timeAxisInSeconds = np.arange(settings.msToProcess) / 1000.0
# Scatter plot
h11.plot(trackResults[channelNr].I_P, trackResults[channelNr].Q_P, '.')
h11.grid(); h11.axis('equal')
h11.set(title='Scatter Plot', xlabel='I prompt', ylabel='Q prompt')
# Nav bits
h12.plot(timeAxisInSeconds, trackResults[channelNr].I_P)
h12.grid(); h12.axis('tight')
h12.set(title='Navigation bits (I_P)', xlabel='Time (s)')
# PLL raw discr
h21.plot(timeAxisInSeconds, trackResults[channelNr].pllDiscr, 'r')
h21.grid(); h21.axis('tight')
h21.set(title='Raw PLL discriminator', xlabel='Time (s)')
# Correlation results
h22.plot(timeAxisInSeconds, np.sqrt(trackResults[channelNr].I_E**2 + trackResults[channelNr].Q_E**2),
timeAxisInSeconds, np.sqrt(trackResults[channelNr].I_P**2 + trackResults[channelNr].Q_P**2),
timeAxisInSeconds, np.sqrt(trackResults[channelNr].I_L**2 + trackResults[channelNr].Q_L**2), '-*')
h22.grid(); h22.axis('tight')
h22.set(title='Correlation results', xlabel='Time (s)')
h22.legend(['E', 'P', 'L'])
# PLL filtered
h31.plot(timeAxisInSeconds, trackResults[channelNr].pllDiscrFilt, 'b')
h31.grid(); h31.axis('tight')
h31.set(title='Filtered PLL discriminator', xlabel='Time (s)')
# DLL raw
h32.plot(timeAxisInSeconds, trackResults[channelNr].dllDiscr, 'r')
h32.grid(); h32.axis('tight')
h32.set(title='Raw DLL discriminator', xlabel='Time (s)')
# DLL filtered
h33.plot(timeAxisInSeconds, trackResults[channelNr].dllDiscrFilt, 'b')
h33.grid(); h33.axis('tight')
h33.set(title='Filtered DLL discriminator', xlabel='Time (s)')
plt.tight_layout()
f.show()