import numpy as np import matplotlib.pyplot as plt import matplotlib.gridspec as gs from initialize import Result class TrackingResultIQ(Result): def __init__(self, acqResult): self._results = None self._channels = acqResult.channels self._settings = acqResult.settings def track_IQ(self, fid): channel = self._channels settings = self._settings codePeriods = settings.msToProcess earlyLateSpc = settings.dllCorrelatorSpacing PDIcode = 0.001 tau1code, tau2code = settings.calcLoopCoef(settings.dllNoiseBandwidth, settings.dllDampingRatio, 1.0) PDIcarr = 0.001 tau1carr, tau2carr = settings.calcLoopCoef(settings.pllNoiseBandwidth, settings.pllDampingRatio, 0.25) rec = [] for channelNr in range(settings.numberOfChannels): msToProcess = int(settings.msToProcess) status = '-' absoluteSample = np.zeros(msToProcess) codeFreq_ = np.inf * np.ones(msToProcess) carrFreq_ = np.inf * np.ones(msToProcess) # Outputs from the correlators (In-phase): I_E_ = np.zeros(msToProcess) I_P_ = np.zeros(msToProcess) I_L_ = np.zeros(msToProcess) # Outputs from the correlators (Quadrature-phase): Q_E_ = np.zeros(msToProcess) Q_P_ = np.zeros(msToProcess) Q_L_ = np.zeros(msToProcess) dllDiscr = np.inf * np.ones(msToProcess) dllDiscrFilt = np.inf * np.ones(msToProcess) pllDiscr = np.inf * np.ones(msToProcess) pllDiscrFilt = np.inf * np.ones(msToProcess) PRN = 0 if channel[channelNr].PRN != 0: PRN = channel[channelNr].PRN fid.seek(int(settings.skipNumberOfBytes + channel[channelNr].codePhase * settings.dataTypeSize * 2), 0) caCode = settings.generateCAcode(channel[channelNr].PRN - 1) caCode = np.r_[caCode[-1], caCode, caCode[0]] codeFreq = settings.codeFreqBasis remCodePhase = 0.0 carrFreq = channel[channelNr].acquiredFreq carrFreqBasis = channel[channelNr].acquiredFreq remCarrPhase = 0.0 oldCodeNco = 0.0; oldCodeError = 0.0 oldCarrNco = 0.0; oldCarrError = 0.0 for loopCnt in range(msToProcess): if loopCnt % 50 == 0: print(f"Tracking: Ch {channelNr + 1} of {settings.numberOfChannels}; " f"PRN#{channel[channelNr].PRN}; " f"Completed {loopCnt} of {msToProcess} msec") codePhaseStep = codeFreq / settings.samplingFreq blksize = np.ceil((settings.codeLength - remCodePhase) / codePhaseStep) blksize = np.long(blksize) raw = np.fromfile(fid, dtype=settings.dataType, count=2*blksize) if len(raw) < 2*blksize: print("End of file, stopping tracking") break rawSignal = raw[0::2] + 1j * raw[1::2] # Early, Late, Prompt tcode = np.arange(blksize) * codePhaseStep + remCodePhase - earlyLateSpc earlyCode = caCode[np.ceil(tcode).astype(int)] tcode = np.arange(blksize) * codePhaseStep + remCodePhase + earlyLateSpc lateCode = caCode[np.ceil(tcode).astype(int)] tcode = np.arange(blksize) * codePhaseStep + remCodePhase promptCode = caCode[np.ceil(tcode).astype(int)] remCodePhase = tcode[blksize - 1] + codePhaseStep - 1023.0 # Mix về baseband time = np.arange(0, blksize + 1) / settings.samplingFreq trigarg = carrFreq * 2*np.pi * time + remCarrPhase # remCarrPhase = trigarg[-1] % (2*np.pi) remCarrPhase = trigarg[blksize]% (2 * np.pi); carrCos = np.cos(trigarg[0:blksize]) carrSin = np.sin(trigarg[0:blksize]) iqBaseband = (carrSin + 1j*carrCos) * rawSignal iBaseband = iqBaseband.real qBaseband = iqBaseband.imag I_E = np.sum(earlyCode * iBaseband); Q_E = np.sum(earlyCode * qBaseband) I_P = np.sum(promptCode * iBaseband); Q_P = np.sum(promptCode * qBaseband) I_L = np.sum(lateCode * iBaseband); Q_L = np.sum(lateCode * qBaseband) # PLL carrError = np.arctan(Q_P / I_P) / (2*np.pi) carrNco = oldCarrNco + (tau2carr/tau1carr) * (carrError - oldCarrError) + carrError * (PDIcarr/tau1carr) oldCarrNco, oldCarrError = carrNco, carrError carrFreq = carrFreqBasis + carrNco # DLL codeError = (np.sqrt(I_E**2 + Q_E**2) - np.sqrt(I_L**2 + Q_L**2)) / \ (np.sqrt(I_E**2 + Q_E**2) + np.sqrt(I_L**2 + Q_L**2)) codeNco = oldCodeNco + (tau2code/tau1code) * (codeError - oldCodeError) + codeError * (PDIcode/tau1code) oldCodeNco, oldCodeError = codeNco, codeError codeFreq = settings.codeFreqBasis - codeNco # Save absoluteSample[loopCnt] = fid.tell() carrFreq_[loopCnt] = carrFreq codeFreq_[loopCnt] = codeFreq I_E_[loopCnt], Q_E_[loopCnt] = I_E, Q_E I_P_[loopCnt], Q_P_[loopCnt] = I_P, Q_P I_L_[loopCnt], Q_L_[loopCnt] = I_L, Q_L dllDiscr[loopCnt], dllDiscrFilt[loopCnt] = codeError, codeNco pllDiscr[loopCnt], pllDiscrFilt[loopCnt] = carrError, carrNco status = channel[channelNr].status rec.append((status, absoluteSample, codeFreq_, carrFreq_, I_P_, I_E_, I_L_, Q_E_, Q_P_, Q_L_, dllDiscr, dllDiscrFilt, pllDiscr, pllDiscrFilt, PRN)) trackResults = np.rec.fromrecords(rec, dtype=[('status','S1'), ('absoluteSample','object'), ('codeFreq','object'), ('carrFreq','object'), ('I_P','object'),('I_E','object'),('I_L','object'), ('Q_E','object'),('Q_P','object'),('Q_L','object'), ('dllDiscr','object'),('dllDiscrFilt','object'), ('pllDiscr','object'),('pllDiscrFilt','object'), ('PRN','int64')]) self._results = trackResults return trackResults def plot(self, channelList=None): trackResults = self._results settings = self._settings if channelList is None: channelList = range(settings.numberOfChannels) channelList = np.intersect1d(channelList, range(settings.numberOfChannels)) for channelNr in channelList: f = plt.figure(channelNr + 200) f.set_label(f'Channel {channelNr} (PRN {trackResults[channelNr].PRN}) results') spec = gs.GridSpec(3, 3) h11 = plt.subplot(spec[0, 0]) h12 = plt.subplot(spec[0, 1:]) h21 = plt.subplot(spec[1, 0]) h22 = plt.subplot(spec[1, 1:]) h31 = plt.subplot(spec[2, 0]) h32 = plt.subplot(spec[2, 1]) h33 = plt.subplot(spec[2, 2]) timeAxisInSeconds = np.arange(settings.msToProcess) / 1000.0 # Scatter plot h11.plot(trackResults[channelNr].I_P, trackResults[channelNr].Q_P, '.') h11.grid(); h11.axis('equal') h11.set(title='Scatter Plot', xlabel='I prompt', ylabel='Q prompt') # Nav bits h12.plot(timeAxisInSeconds, trackResults[channelNr].I_P) h12.grid(); h12.axis('tight') h12.set(title='Navigation bits (I_P)', xlabel='Time (s)') # PLL raw discr h21.plot(timeAxisInSeconds, trackResults[channelNr].pllDiscr, 'r') h21.grid(); h21.axis('tight') h21.set(title='Raw PLL discriminator', xlabel='Time (s)') # Correlation results h22.plot(timeAxisInSeconds, np.sqrt(trackResults[channelNr].I_E**2 + trackResults[channelNr].Q_E**2), timeAxisInSeconds, np.sqrt(trackResults[channelNr].I_P**2 + trackResults[channelNr].Q_P**2), timeAxisInSeconds, np.sqrt(trackResults[channelNr].I_L**2 + trackResults[channelNr].Q_L**2), '-*') h22.grid(); h22.axis('tight') h22.set(title='Correlation results', xlabel='Time (s)') h22.legend(['E', 'P', 'L']) # PLL filtered h31.plot(timeAxisInSeconds, trackResults[channelNr].pllDiscrFilt, 'b') h31.grid(); h31.axis('tight') h31.set(title='Filtered PLL discriminator', xlabel='Time (s)') # DLL raw h32.plot(timeAxisInSeconds, trackResults[channelNr].dllDiscr, 'r') h32.grid(); h32.axis('tight') h32.set(title='Raw DLL discriminator', xlabel='Time (s)') # DLL filtered h33.plot(timeAxisInSeconds, trackResults[channelNr].dllDiscrFilt, 'b') h33.grid(); h33.axis('tight') h33.set(title='Filtered DLL discriminator', xlabel='Time (s)') plt.tight_layout() f.show()