Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 77 additions & 116 deletions python/adsb/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def decode_packet(self, pdu):
meta = pmt.to_python(pmt.car(pdu))
vector = pmt.to_python(pmt.cdr(pdu))
self.timestamp = meta["timestamp"]
self.datetime = datetime.datetime.utcfromtimestamp(self.timestamp).strftime("%Y-%m-%d %H:%M:%S.%f UTC")
self.datetime = datetime.datetime.fromtimestamp(self.timestamp, datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f UTC")
self.snr = meta["snr"]
self.bits = vector

Expand All @@ -352,8 +352,6 @@ def decode_packet(self, pdu):


def reset(self):
self.aa_bits = []
self.aa = -1
self.aa_str = ""
self.df = -1
self.payload_length = -1
Expand All @@ -371,43 +369,9 @@ def get_direction(self, heading):
`heading = 180/-180` Westbound
`heading = -90` Southbound
"""
if heading < 0:
heading += 360.0

quad = np.floor(heading/90.0)
residual_angle = heading - quad*90.0
sub_quad = np.floor((residual_angle+22.5)/45.0)

if quad == 0:
if sub_quad == 0:
dir_str = "E"
elif sub_quad == 1:
dir_str = "NE"
else:
dir_str = "N"
elif quad == 1:
if sub_quad == 0:
dir_str = "N"
elif sub_quad == 1:
dir_str = "NW"
else:
dir_str = "W"
elif quad == 2:
if sub_quad == 0:
dir_str = "W"
elif sub_quad == 1:
dir_str = "SW"
else:
dir_str = "S"
else:
if sub_quad == 0:
dir_str = "S"
elif sub_quad == 1:
dir_str = "SE"
else:
dir_str = "E"

return dir_str
heading = (float(heading) + 45/2) % 360
quad = int(heading / 45)
return ("E", "NE", "N", "NW", "W", "SW", "S", "SE")[quad]


def update_plane(self, aa_str):
Expand Down Expand Up @@ -440,61 +404,60 @@ def update_plane(self, aa_str):


def reset_plane_altimetry(self, plane):
plane["altitude"] = np.NaN
plane["speed"] = np.NaN
plane["heading"] = np.NaN
plane["vertical_rate"] = np.NaN
plane["latitude"] = np.NaN
plane["longitude"] = np.NaN
plane["cpr"] = [(np.NaN, np.NaN, np.NaN), (np.NaN, np.NaN, np.NaN)]
plane["altitude"] = np.nan
plane["speed"] = np.nan
plane["heading"] = np.nan
plane["vertical_rate"] = np.nan
plane["latitude"] = np.nan
plane["longitude"] = np.nan
plane["cpr"] = [(np.nan, np.nan, np.nan), (np.nan, np.nan, np.nan)]


def print_planes(self):
index = 0
for icao in self.plane_dict:
last_seen = datetime.datetime.utcfromtimestamp(self.timestamp).strftime("%H:%M:%S")
for index, icao in enumerate(self.plane_dict, start=2):
last_seen = datetime.datetime.fromtimestamp(self.plane_dict[icao]["last_seen"], datetime.timezone.utc).strftime("%H:%M:%S")

if self.plane_dict[icao]["callsign"] is not None:
callsign = "{:8s}".format(self.plane_dict[icao]["callsign"])
callsign = f"{self.plane_dict[icao]["callsign"]:8s}"
else:
callsign = " "*8

if np.isnan(self.plane_dict[icao]["altitude"]) == False:
altitude = "{:5.0f}".format(self.plane_dict[icao]["altitude"])
else:
if np.isnan(self.plane_dict[icao]["altitude"]):
altitude = " "*5

if np.isnan(self.plane_dict[icao]["vertical_rate"]) == False:
vertical_rate = "{:5.0f}".format(self.plane_dict[icao]["vertical_rate"])
else:
vertical_rate = " "*5
altitude = f'{self.plane_dict[icao]["altitude"]:5.0f}'

if np.isnan(self.plane_dict[icao]["speed"]) == False:
speed = "{:5.0f}".format(self.plane_dict[icao]["speed"])
if np.isnan(self.plane_dict[icao]["vertical_rate"]):
vertical_rate = " "*5
else:
speed = " "*5
vertical_rate = f'{self.plane_dict[icao]["vertical_rate"]:5.0f}'

if np.isnan(self.plane_dict[icao]["heading"]) == False:
heading = "{:5.0f}".format(self.plane_dict[icao]["heading"])
if np.isnan(self.plane_dict[icao]["speed"]):
speed = " "*5
else:
heading = " "*5
speed = f'{self.plane_dict[icao]["speed"]:5.0f}'

if np.isnan(self.plane_dict[icao]["latitude"]) == False:
latitude = "{:11.7f}".format(self.plane_dict[icao]["latitude"])
if np.isnan(self.plane_dict[icao]["heading"]):
heading = " "*5
else:
latitude = " "*11
heading = f'{self.plane_dict[icao]["heading"]:5.0f}'

if np.isnan(self.plane_dict[icao]["longitude"]) == False:
longitude = "{:11.7f}".format(self.plane_dict[icao]["longitude"])
if np.isnan(self.plane_dict[icao]["latitude"]):
latitude = " "*11
else:
latitude = f'{self.plane_dict[icao]["latitude"]:11.7f}'

if np.isnan(self.plane_dict[icao]["longitude"]):
longitude = " "*11
else:
longitude = f'{self.plane_dict[icao]["longitude"]:11.7f}'

num_msgs = "{:4d}".format(self.plane_dict[icao]["num_msgs"])
age = "{:3.0f}".format(int(time.time()) - self.plane_dict[icao]["last_seen"])
num_msgs = f'{self.plane_dict[icao]["num_msgs"]:4d}'
age = f'{int(time.time()) - self.plane_dict[icao]["last_seen"]:3.0f}'

self.screen.addstr(2 + index, 0, "{:8s} {:6s} {} {} {} {} {} {} {} {}".format(
last_seen,
icao,
self.screen.addstr(index, 0, " ".join((
last_seen[:8],
icao[:6],
callsign,
altitude,
vertical_rate,
Expand All @@ -503,10 +466,8 @@ def print_planes(self):
latitude,
longitude,
num_msgs
))
self.screen.refresh()

index += 1
)))
self.screen.refresh()


def publish_decoded_pdu(self, aa_str):
Expand Down Expand Up @@ -585,9 +546,9 @@ def check_parity(self):

# XOR the computed CRC with the AP, the result should be the
# interrogated plane's ICAO address
self.aa_bits = crc_bits ^ ap_bits
self.aa = self.bin2dec(self.aa_bits)
self.aa_str = "{:06x}".format(self.aa)
aa_bits = crc_bits ^ ap_bits
aa = self.bin2dec(aa_bits)
self.aa_str = f"{aa:06x}"

# If the ICAO address is in our plane dictionary,
# then it's safe to assume the CRC passes
Expand All @@ -603,7 +564,7 @@ def check_parity(self):
self.log("info", "Address Announced (AA)", self.aa_str)
return 0

elif self.df in [11]:
elif self.df == 11:
# 56 bit payload
self.payload_length = 56

Expand Down Expand Up @@ -646,9 +607,9 @@ def check_parity(self):

# XOR the computed CRC with the AP, the result should be the
# interrogated plane's ICAO address
self.aa_bits = crc_bits ^ ap_bits
self.aa = self.bin2dec(self.aa_bits)
self.aa_str = "{:06x}".format(self.aa)
aa_bits = crc_bits ^ ap_bits
aa = self.bin2dec(aa_bits)
self.aa_str = "{:06x}".format(aa)

# If the ICAO address is in our plane dictionary,
# then it's safe to assume the CRC passes
Expand Down Expand Up @@ -884,9 +845,9 @@ def decode_message(self):
ca = self.bin2dec(self.bits[5:5+3])

# Address Announced (ICAO Address) 24 bits
self.aa_bits = self.bits[8:8+24]
self.aa = self.bin2dec(self.aa_bits)
self.aa_str = "{:06x}".format(self.aa)
aa_bits = self.bits[8:8+24]
aa = self.bin2dec(aa_bits)
self.aa_str = "{:06x}".format(aa)

# Update planes dictionary
self.update_plane(self.aa_str)
Expand All @@ -903,9 +864,9 @@ def decode_message(self):
self.log("info", "Capability (CA)", ca, subvalue=CA_STR_LUT[ca])

# Address Announced (ICAO Address) 24 bits
self.aa_bits = self.bits[8:8+24]
self.aa = self.bin2dec(self.aa_bits)
self.aa_str = "{:06x}".format(self.aa)
aa_bits = self.bits[8:8+24]
aa = self.bin2dec(aa_bits)
self.aa_str = "{:06x}".format(aa)
self.log("info", "Address Announced (AA)", self.aa_str)
self.log("info", "Callsign", self.plane_dict.get(self.aa_str, {}).get("callsign", ""))

Expand All @@ -919,9 +880,9 @@ def decode_message(self):
self.log("info", "CF", cf, CF_STR_LUT[cf])

# Address Announced (ICAO Address) 24 bits
self.aa_bits = self.bits[8:8+24]
self.aa = self.bin2dec(self.aa_bits)
self.aa_str = "{:06x}".format(self.aa)
aa_bits = self.bits[8:8+24]
aa = self.bin2dec(aa_bits)
self.aa_str = "{:06x}".format(aa)
self.log("info", "Address Announced (AA)", self.aa_str)
self.log("info", "Callsign", self.plane_dict.get(self.aa_str, {}).get("callsign", ""))

Expand All @@ -933,9 +894,9 @@ def decode_message(self):
self.decode_me()
elif cf in [2,3,5]:
self.decode_tisb_me()
elif cf in [4]:
elif cf == 4:
self.log("debug", "TIS-B and ADS-B Management Message", "To be implemented")
elif cf in [6]:
elif cf == 6:
self.log("debug", "ADS-B Message Rebroadcast", "To be implemented")

# Military Extended Squitter
Expand All @@ -945,14 +906,14 @@ def decode_message(self):
self.log("info", "Application Field (AF)", af, AF_STR_LUT[af])

# Address Announced (ICAO Address) 24 bits
self.aa_bits = self.bits[8:8+24]
self.aa = self.bin2dec(self.aa_bits)
self.aa_str = "{:06x}".format(self.aa)
aa_bits = self.bits[8:8+24]
aa = self.bin2dec(aa_bits)
self.aa_str = "{:06x}".format(aa)
self.log("info", "Address Announced (AA)", self.aa_str)
self.log("info", "Callsign", self.plane_dict.get(self.aa_str, {}).get("callsign", ""))
self.log("debug", "DF={} AF={}".format(self.df, af), "Spotted in the wild!")

if af in [0]:
if af == 0:
self.decode_me()
elif af in [1,2,3,4,5,6,7]:
self.log("debug", "AF={}".format(af), "Reserved for Military Use")
Expand Down Expand Up @@ -1066,12 +1027,12 @@ def decode_me(self):
self.log("info", "Type Code (TC)", tc, TC_STR_LUT[tc])

## Airborne/Surface Position ###
if tc in [0]:
if tc == 0:
# Message, 3 bits
me = self.bits[0:self.payload_length]

### Aircraft Identification ###
elif tc in range(1,5):
elif 1 <= tc < 5:
# Grab callsign using character LUT
callsign = ""

Expand All @@ -1091,12 +1052,12 @@ def decode_me(self):
# print("Callsign: {}".format(callsign))

### Surface Position ###
elif tc in range(5,9):
elif tc < 9:
self.log("debug", "TC", tc, "To be implemented")
self.publish_unknown_pdu()

### Airborne Position (Baro Altitude) ###
elif tc in range(9,19):
elif tc < 19:
# Surveillance Status, 2 bits
ss = self.bin2dec(self.bits[37:37+2])

Expand Down Expand Up @@ -1154,7 +1115,7 @@ def decode_me(self):


### Airborne Velocities ###
elif tc in [19]:
elif tc == 19:
# Sub Type, 3 bits
st = self.bin2dec(self.bits[37:37+3])

Expand Down Expand Up @@ -1256,42 +1217,42 @@ def decode_me(self):
self.log("debug", "DF={} TC={} ST={}".format(self.df, tc, self.st), "To be implemented")

### Airborne Position (GNSS Height) ###
elif tc in range(20,23):
elif tc < 23:
self.log("debug", "TC", tc, "To be implemented")
self.publish_unknown_pdu()

### Test Message ###
elif tc in [23]:
elif tc == 23:
self.log("debug", "TC", tc, "To be implemented")
self.publish_unknown_pdu()

### Surface System Status ###
elif tc in [24]:
elif tc == 24:
self.log("debug", "TC", tc, "To be implemented")
self.publish_unknown_pdu()

### Reserved ###
elif tc in range(25,28):
elif 25 <= tc < 28:
self.log("debug", "TC", tc, "To be implemented")
self.publish_unknown_pdu()

### Extended Squitter A/C Status ###
elif tc in [28]:
elif tc == 28:
self.log("debug", "TC", tc, "To be implemented")
self.publish_unknown_pdu()

### Target State and Status (V.2) ###
elif tc in [29]:
elif tc == 29:
self.log("debug", "TC", tc, "To be implemented")
self.publish_unknown_pdu()

### Reserved ###
elif tc in [30]:
elif tc == 30:
self.log("debug", "TC", tc, "To be implemented")
self.publish_unknown_pdu()

### Aircraft Operation Status ###
elif tc in [31]:
elif tc == 31:
self.log("debug", "TC", tc, "To be implemented")
self.publish_unknown_pdu()

Expand All @@ -1309,8 +1270,8 @@ def decode_tisb_me(self):
def calculate_lat_lon(self, cpr):
# If the even and odd frame data is still valid, calculate the
# latitude and longitude
lat_dec = np.NaN
lon_dec = np.NaN
lat_dec = np.nan
lon_dec = np.nan

if (int(time.time()) - cpr[0][2]) < CPR_TIMEOUT_S and (int(time.time()) - cpr[1][2]) < CPR_TIMEOUT_S:
# Get fractional lat/lon for the even and odd frame
Expand Down Expand Up @@ -1374,7 +1335,7 @@ def calculate_lat_lon(self, cpr):
def cpr_n(self, lat, frame):
# frame = 0, even frame
# frame = 1, odd frame
n = self.cpr_nl(lat) - frame;
n = self.cpr_nl(lat) - frame

if n > 1:
return n
Expand Down
Loading