import numpy as np import ephemeris from geoFunctions import satpos, leastSquarePos, cart2geo, findUtmZone, cart2utm from initialize import Result import shutil class NavigationResult(Result): def __init__(self, trackResult): self._results = trackResult.results self._channels = trackResult.channels self._settings = trackResult.settings self._solutions = None self._eph = None @property def solutions(self): assert isinstance(self._solutions, np.recarray) return self._solutions @property def ephemeris(self): # Không assert nhầm sang _solutions nữa assert isinstance(self._solutions, np.recarray) return self._eph # ./calculatePseudoranges.m def calculatePseudoranges(self, msOfTheSignal, channelList): trackResults = self._results settings = self._settings # calculatePseudoranges finds relative pseudoranges for all satellites # listed in CHANNELLIST at the specified millisecond of the processed # signal. The pseudoranges contain unknown receiver clock offset. It can be # found by the least squares position search procedure. # --- Set initial travel time to infinity ---------------------------------- travelTime = np.inf * np.ones(settings.numberOfChannels) # Find number of samples per spreading code samplesPerCode = int(settings.samplesPerCode) # --- For all channels in the list ... for ch in channelList: # Ép chỉ số ms về int ms_idx = int(msOfTheSignal[ch]) travelTime[ch] = ( self._results[ch].absoluteSample[ms_idx] / samplesPerCode ) # --- Truncate the travelTime and compute pseudoranges --------------------- minimum = np.floor(np.min(travelTime[channelList])) # thay vì toàn bộ travelTime = travelTime - minimum + settings.startOffset # --- Convert travel time to a distance ------------------------------------ # The speed of light must be converted from meters per second to meters # per millisecond. pseudoranges = travelTime * settings.c / 1000.0 return pseudoranges # ./postNavigation.m def postNavigate(self): trackResults = self._results settings = self._settings # Check minimal requirements if settings.msToProcess < 36000.0 or np.sum(trackResults.status.astype('U') != '-') < 4: # do something print('Record is too short or too few satellites tracked. Exiting!') self._solutions = None self._eph = None return # Find preamble start positions subFrameStart, activeChnList = self.findPreambles() # Decode ephemerides field_str = 'weekNumber,accuracy,health,T_GD,IODC,t_oc,a_f2,a_f1,a_f0,' \ 'IODE_sf2,C_rs,deltan,M_0,C_uc,e,C_us,sqrtA,t_oe,' \ 'C_ic,omega_0,C_is,i_0,C_rc,omega,omegaDot,IODE_sf3,iDot' eph = np.recarray((32,), formats=['O'] * 27, names=field_str) TOW = None for ch in activeChnList: # === Convert tracking output to navigation bits ======================= # Copy 5 sub-frames long record start = int(subFrameStart[ch] - 20) stop = int(subFrameStart[ch] + 1500 * 20) # Bảo vệ biên start = max(0, start) stop = min(stop, trackResults[ch].I_P.size) if stop - start < 20 * 1500: # Không đủ dữ liệu cho 5 subframes continue navBitsSamples = trackResults[ch].I_P[start:stop].copy() # reshape 20 x N (sum theo cột) navBitsSamples = navBitsSamples.reshape(20, -1, order='F') navBits = navBitsSamples.sum(0) navBits = (navBits > 0).astype(int) # Binary string array (indexable) navBitsBin = navBits.astype(str) eph[trackResults[ch].PRN - 1], TOW = ephemeris.ephemeris(navBitsBin[1:], navBitsBin[0]) if (eph[trackResults[ch].PRN - 1].IODC is None or eph[trackResults[ch].PRN - 1].IODE_sf2 is None or eph[trackResults[ch].PRN - 1].IODE_sf3 is None): # Exclude channel from the list (from further processing) activeChnList = np.setdiff1d(activeChnList, ch) # Need >= 4 SV for 3D position if activeChnList.size < 4: print('Too few satellites with ephemeris data for position calculations. Exiting!') self._solutions = None self._eph = None return # Initialization satElev = np.inf * np.ones(settings.numberOfChannels) # include all SV for first fix readyChnList = activeChnList.copy() transmitTime = float(TOW) if TOW is not None else 0.0 total_ms = float(settings.msToProcess - int(np.max(subFrameStart))) num_iter = int(np.fix(total_ms) / settings.navSolPeriod) # num_iter = 64 channel = np.rec.array([(np.zeros((settings.numberOfChannels, num_iter)), np.nan * np.ones((settings.numberOfChannels, num_iter)), np.nan * np.ones((settings.numberOfChannels, num_iter)), np.nan * np.ones((settings.numberOfChannels, num_iter)), np.nan * np.ones((settings.numberOfChannels, num_iter)) )], formats=['O'] * 5, names='PRN,el,az,rawP,correctedP') navSolutions = np.rec.array([(channel, np.zeros((5, num_iter)), np.nan * np.ones(num_iter), np.nan * np.ones(num_iter), np.nan * np.ones(num_iter), np.nan * np.ones(num_iter), np.nan * np.ones(num_iter), np.nan * np.ones(num_iter), np.nan * np.ones(num_iter), 0, np.nan * np.ones(num_iter), np.nan * np.ones(num_iter), np.nan * np.ones(num_iter) )], formats=['O'] * 13, names='channel,DOP,X,Y,Z,dt,latitude,longitude,height,utmZone,E,N,U') for currMeasNr in range(num_iter): # apply elevation mask activeChnList = np.intersect1d((satElev >= settings.elevationMask).nonzero()[0], readyChnList) # PRN list channel[0].PRN[activeChnList, currMeasNr] = trackResults[activeChnList].PRN # Pseudoranges ms_vec = (subFrameStart + settings.navSolPeriod * currMeasNr).astype(int) channel[0].rawP[:, currMeasNr] = self.calculatePseudoranges(ms_vec, activeChnList) # Satellite positions and clock corrections satPositions, satClkCorr = satpos(transmitTime, trackResults[activeChnList].PRN, eph, settings) # Receiver position (need >= 4) if activeChnList.size > 3: (xyzdt, channel[0].el[activeChnList, currMeasNr], channel[0].az[activeChnList, currMeasNr], navSolutions[0].DOP[:, currMeasNr]) = leastSquarePos( satPositions, channel[0].rawP[activeChnList, currMeasNr] + satClkCorr * settings.c, settings ) navSolutions[0].X[currMeasNr] = xyzdt[0] navSolutions[0].Y[currMeasNr] = xyzdt[1] navSolutions[0].Z[currMeasNr] = xyzdt[2] navSolutions[0].dt[currMeasNr] = xyzdt[3] satElev = channel[0].el[:, currMeasNr] channel[0].correctedP[activeChnList, currMeasNr] = ( channel[0].rawP[activeChnList, currMeasNr] + satClkCorr * settings.c + navSolutions[0].dt[currMeasNr] ) # ECEF -> Geodetic (navSolutions[0].latitude[currMeasNr], navSolutions[0].longitude[currMeasNr], navSolutions[0].height[currMeasNr]) = cart2geo( navSolutions[0].X[currMeasNr], navSolutions[0].Y[currMeasNr], navSolutions[0].Z[currMeasNr], 4 ) navSolutions[0].utmZone = findUtmZone( navSolutions[0].latitude[currMeasNr], navSolutions[0].longitude[currMeasNr] ) (navSolutions[0].E[currMeasNr], navSolutions[0].N[currMeasNr], navSolutions[0].U[currMeasNr]) = cart2utm( xyzdt[0], xyzdt[1], xyzdt[2], navSolutions[0].utmZone ) else: print(' Measurement No. %d: Not enough information for position solution.' % currMeasNr) navSolutions[0].X[currMeasNr] = np.nan navSolutions[0].Y[currMeasNr] = np.nan navSolutions[0].Z[currMeasNr] = np.nan navSolutions[0].dt[currMeasNr] = np.nan navSolutions[0].DOP[:, currMeasNr] = np.zeros(5) navSolutions[0].latitude[currMeasNr] = np.nan navSolutions[0].longitude[currMeasNr] = np.nan navSolutions[0].height[currMeasNr] = np.nan navSolutions[0].E[currMeasNr] = np.nan navSolutions[0].N[currMeasNr] = np.nan navSolutions[0].U[currMeasNr] = np.nan channel[0].az[activeChnList, currMeasNr] = np.nan * np.ones(activeChnList.shape) channel[0].el[activeChnList, currMeasNr] = np.nan * np.ones(activeChnList.shape) # Update TOW transmitTime += settings.navSolPeriod / 1000.0 self._solutions = navSolutions self._eph = eph return def plot(self): settings = self._settings navSolutions = self._solutions # assert isinstance(navSolutions, np.recarray) import matplotlib as mpl import matplotlib.gridspec as gs import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D # noqa: F401 import initialize mpl.rcdefaults() mpl.rc('savefig', bbox='tight', transparent=False, format='png') mpl.rc('axes', grid=True, linewidth=1.5, axisbelow=True) mpl.rc('lines', linewidth=1.5, solid_joinstyle='bevel') mpl.rc('figure', figsize=[8, 6], autolayout=False, dpi=120) # Dùng LaTeX nếu có sẵn, không thì fallback sang mathtext if shutil.which("latex"): mpl.rc('text', usetex=True) else: mpl.rc('text', usetex=False) mpl.rc('mathtext', fontset='cm') # hiển thị công thức toán kiểu Computer Modern mpl.rc('font', family='serif', serif='Computer Modern Roman', size=10) if navSolutions is not None: refCoord = initialize.TruePosition() # Nếu không có tọa độ tham chiếu, dùng trung bình if (settings.truePosition.E is None or settings.truePosition.N is None or settings.truePosition.U is None): refCoord.E = np.nanmean(navSolutions[0].E) refCoord.N = np.nanmean(navSolutions[0].N) refCoord.U = np.nanmean(navSolutions[0].U) meanLongitude = np.nanmean(navSolutions[0].longitude) meanLatitude = np.nanmean(navSolutions[0].latitude) refPointLgText = 'Mean Position' + '\\newline Lat: %.5f $^\\circ$' % meanLatitude + \ '\\newline Lng: %.5f $^\\circ$' % meanLongitude + \ '\\newline Hgt: %+6.1f' % np.nanmean(navSolutions[0].height) else: refPointLgText = 'Reference Position' refCoord.E = settings.truePosition.E refCoord.N = settings.truePosition.N refCoord.U = settings.truePosition.U figureNumber = 300 f = plt.figure(figureNumber) f.clf() f.set_label('Navigation solutions') spec = gs.GridSpec(2, 2) h11 = plt.subplot(spec[0:2]) h31 = plt.subplot(spec[2], projection='3d') h32 = plt.subplot(spec[3], projection='polar') # ENU variations h11.plot(navSolutions[0].E - refCoord.E, navSolutions[0].N - refCoord.N, navSolutions[0].U - refCoord.U, '-') h11.legend(['E', 'N', 'U']) h11.set(title='Coordinates variations in UTM system', xlabel='Measurement period: %i ms' % settings.navSolPeriod, ylabel='Variations (m)') h11.grid(True) h11.axis('tight') # 3D plot h31.plot((navSolutions[0].E - refCoord.E).T, (navSolutions[0].N - refCoord.N).T, (navSolutions[0].U - refCoord.U).T, '+') h31.plot([0], [0], [0], 'r+', lw=1.5, ms=10) h31.set_box_aspect((1, 1, 1)) h31.grid(True) h31.legend(['Measurements', refPointLgText]) h31.set(title='Positions in UTM system (3D plot)', xlabel='East (m)', ylabel='North (m)', zlabel='Upping (m)') # Sky plot az = np.deg2rad(navSolutions[0].channel[0].az.T) el = navSolutions[0].channel[0].el.T h32.plot(az, 90 - el) # PRN labels tại epoch 0 az0 = np.deg2rad(navSolutions[0].channel[0].az[:, 0]) el0 = navSolutions[0].channel[0].el[:, 0] prn0 = navSolutions[0].channel[0].PRN[:, 0] for x, y, s in zip(az0, 90 - el0, prn0): if np.isfinite(x) and np.isfinite(y) and s != 0: h32.text(x, y, str(s)) h32.set_theta_direction(-1) h32.set_theta_zero_location('N') h32.set_xlim([0, 2 * np.pi]) h32.set_xticks(np.linspace(0, 2 * np.pi, 12, endpoint=False)) h32.set_rlabel_position(0) h32.set_ylim([0, 90]) h32.set_yticks([0, 15, 30, 45, 60, 75]) h32.set_yticklabels([90, 75, 60, 45, 30, 15]) h32.set_title('Sky plot (mean PDOP: %f )' % np.nanmean(navSolutions[0].DOP[1, :])) plt.show() else: print('plotNavigation: No navigation data to plot.') @staticmethod # navPartyChk.m def navPartyChk(ndat): # Parity check cho một từ GPS (32 bits: 2 prev + 30 curr) if ndat[1] != 1: ndat[2:26] *= (-1) parity = np.zeros(6) parity[0] = ndat[0] * ndat[2] * ndat[3] * ndat[4] * ndat[6] * \ ndat[7] * ndat[11] * ndat[12] * ndat[13] * ndat[14] * \ ndat[15] * ndat[18] * ndat[19] * ndat[21] * ndat[24] parity[1] = ndat[1] * ndat[3] * ndat[4] * ndat[5] * ndat[7] * \ ndat[8] * ndat[12] * ndat[13] * ndat[14] * ndat[15] * \ ndat[16] * ndat[19] * ndat[20] * ndat[22] * ndat[25] parity[2] = ndat[0] * ndat[2] * ndat[4] * ndat[5] * ndat[6] * \ ndat[8] * ndat[9] * ndat[13] * ndat[14] * ndat[15] * \ ndat[16] * ndat[17] * ndat[20] * ndat[21] * ndat[23] parity[3] = ndat[1] * ndat[3] * ndat[5] * ndat[6] * ndat[7] * \ ndat[9] * ndat[10] * ndat[14] * ndat[15] * ndat[16] * \ ndat[17] * ndat[18] * ndat[21] * ndat[22] * ndat[24] parity[4] = ndat[1] * ndat[2] * ndat[4] * ndat[6] * ndat[7] * \ ndat[8] * ndat[10] * ndat[11] * ndat[15] * ndat[16] * \ ndat[17] * ndat[18] * ndat[19] * ndat[22] * ndat[23] * \ ndat[25] parity[5] = ndat[0] * ndat[4] * ndat[6] * ndat[7] * ndat[9] * \ ndat[10] * ndat[11] * ndat[12] * ndat[14] * ndat[16] * \ ndat[20] * ndat[23] * ndat[24] * ndat[25] if (parity == ndat[26:]).sum() == 6: status = -1 * ndat[1] else: status = 0 return status # ./findPreambles.m def findPreambles(self): assert isinstance(self._results, np.recarray) trackResults = self._results settings = self._settings searchStartOffset = 0 # --- Initialize the firstSubFrame array ----------------------------------- firstSubFrame = np.zeros(settings.numberOfChannels, dtype=int) # --- Generate the preamble pattern ---------------------------------------- preamble_bits = np.r_[1, -1, -1, -1, 1, -1, 1, 1] preamble_ms = np.kron(preamble_bits, np.ones(20)) # upsample 20x # --- Make a list of channels excluding not tracking channels -------------- activeChnList = np.nonzero(trackResults.status.astype('U') != '-')[0] # === For all tracking channels ... for ch in activeChnList.tolist(): bits = trackResults[ch].I_P[searchStartOffset:].copy() # Chuẩn hóa sang {-1, +1} bits = np.where(bits > 0, 1, -1) # Correlate với preamble (pad cho cùng độ dài) tlmXcorrResult = np.correlate( bits, np.pad(preamble_ms, (0, max(0, bits.size - preamble_ms.size)), 'constant'), mode='full' ) tlmXcorrResult = np.nan_to_num(tlmXcorrResult, nan=0.0, posinf=0.0, neginf=0.0) # Tìm các điểm giống preamble xcorrLength = (len(tlmXcorrResult) + 1) // 2 # int start = max(0, xcorrLength - 1) stop = min(2 * xcorrLength, tlmXcorrResult.shape[0]) idx_rel = (np.abs(tlmXcorrResult[start:stop]) > 153).nonzero()[0] index = idx_rel.astype(np.intp) + int(searchStartOffset) found = False for i in range(len(index)): index2 = index - index[i] # Khoảng cách 6000 ms (1 subframe) if (index2 == 6000).any(): s = int(index[i] - 40) # 2 bit prev * 20ms e = int(index[i] + 20 * 60) # 60 bits * 20ms if s < 0 or e > trackResults[ch].I_P.size: continue bits2 = trackResults[ch].I_P[s:e].copy().reshape(20, -1, order='F').sum(0) bits2 = np.where(bits2 > 0, 1, -1) if self.navPartyChk(bits2[:32]) != 0 and self.navPartyChk(bits2[30:62]) != 0: firstSubFrame[ch] = int(index[i]) found = True break if not found: activeChnList = np.setdiff1d(activeChnList, ch) print('Could not find valid preambles in channel %2d !' % ch) return firstSubFrame, activeChnList if __name__ == '__main__': pass