Skip to content

Commit d2dbb4e

Browse files
author
Alex J Lennon
committed
Update local files with LoRa cache support and latest changes
1 parent f5b12b3 commit d2dbb4e

File tree

5 files changed

+403
-14
lines changed

5 files changed

+403
-14
lines changed
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
#!/usr/bin/env python3
2+
"""
3+
LoRa Tag Data Cache
4+
Subscribes to TTN MQTT broker and caches LoRa tag data (battery, temperature, GPS, etc.)
5+
Maps dev_eui to UWB IDs using configuration file
6+
7+
Copyright (c) Dynamic Devices Ltd. 2025. All rights reserved.
8+
"""
9+
10+
import json
11+
import ssl
12+
import threading
13+
import time
14+
from datetime import datetime
15+
from typing import Dict, Optional, Any
16+
17+
try:
18+
import paho.mqtt.client as mqtt
19+
except ImportError as e:
20+
print("Error: paho-mqtt library not found. Install with: pip install paho-mqtt")
21+
raise
22+
23+
24+
class LoraTagDataCache:
25+
"""
26+
Subscribes to TTN MQTT broker and caches LoRa tag data.
27+
Provides thread-safe access to cached data mapped by UWB ID.
28+
"""
29+
30+
def __init__(self,
31+
broker: str = "eu1.cloud.thethings.network",
32+
port: int = 8883,
33+
username: str = None,
34+
password: str = None,
35+
topic_pattern: str = "#",
36+
dev_eui_to_uwb_id_map: Dict[str, str] = None,
37+
verbose: bool = False):
38+
"""
39+
Initialize the LoRa tag data cache.
40+
41+
Args:
42+
broker: MQTT broker hostname
43+
port: MQTT broker port
44+
username: MQTT username
45+
password: MQTT password
46+
topic_pattern: MQTT topic pattern to subscribe to (default: "#" for all topics)
47+
dev_eui_to_uwb_id_map: Dictionary mapping dev_eui (hex string) to UWB ID (hex string)
48+
verbose: Enable verbose logging
49+
"""
50+
self.broker = broker
51+
self.port = port
52+
self.username = username
53+
self.password = password
54+
self.topic_pattern = topic_pattern
55+
self.dev_eui_to_uwb_id_map = dev_eui_to_uwb_id_map or {}
56+
self.verbose = verbose
57+
58+
# Cache: dev_eui -> latest data
59+
self._cache: Dict[str, Dict[str, Any]] = {}
60+
self._cache_lock = threading.RLock()
61+
62+
# Cache: UWB ID -> latest data (derived from dev_eui mapping)
63+
self._uwb_cache: Dict[str, Dict[str, Any]] = {}
64+
65+
# MQTT client
66+
self.mqtt_client = None
67+
self._running = False
68+
self._thread = None
69+
70+
def _log(self, message: str, level: str = "INFO"):
71+
"""Internal logging method"""
72+
if self.verbose or level in ["ERROR", "WARNING"]:
73+
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
74+
print(f"[{timestamp}] [LoRaCache {level}] {message}")
75+
76+
def _on_connect(self, client, userdata, flags, rc):
77+
"""MQTT connection callback"""
78+
if rc == 0:
79+
self._log(f"Connected to TTN MQTT broker {self.broker}:{self.port}")
80+
try:
81+
client.subscribe(self.topic_pattern, qos=0)
82+
self._log(f"Subscribed to topic pattern: {self.topic_pattern}")
83+
except Exception as e:
84+
self._log(f"Failed to subscribe: {e}", "ERROR")
85+
else:
86+
self._log(f"Failed to connect to MQTT broker (rc={rc})", "ERROR")
87+
88+
def _on_disconnect(self, client, userdata, rc):
89+
"""MQTT disconnection callback"""
90+
if rc != 0:
91+
self._log(f"Unexpected disconnection from MQTT broker (rc={rc})", "WARNING")
92+
else:
93+
self._log("Disconnected from MQTT broker")
94+
95+
def _on_message(self, client, userdata, message):
96+
"""MQTT message callback - processes LoRa tag data"""
97+
try:
98+
topic = message.topic
99+
payload = message.payload.decode('utf-8')
100+
101+
self._log(f"Received message on topic: {topic}", "VERBOSE")
102+
103+
# Parse JSON payload
104+
data = json.loads(payload)
105+
106+
# Extract dev_eui
107+
dev_eui = None
108+
try:
109+
dev_eui = data.get("end_device_ids", {}).get("dev_eui", "").upper()
110+
except (AttributeError, KeyError):
111+
self._log(f"Could not extract dev_eui from message", "WARNING")
112+
return
113+
114+
if not dev_eui:
115+
self._log(f"No dev_eui found in message", "WARNING")
116+
return
117+
118+
# Extract decoded payload
119+
decoded_payload = {}
120+
try:
121+
uplink_msg = data.get("uplink_message", {})
122+
decoded_payload = uplink_msg.get("decoded_payload", {})
123+
except (AttributeError, KeyError):
124+
pass
125+
126+
# Extract location data
127+
location_data = {}
128+
try:
129+
locations = uplink_msg.get("locations", {})
130+
if "frm-payload" in locations:
131+
loc = locations["frm-payload"]
132+
location_data = {
133+
"latitude": loc.get("latitude"),
134+
"longitude": loc.get("longitude"),
135+
"altitude": loc.get("altitude"),
136+
"accuracy": loc.get("accuracy"),
137+
"source": loc.get("source")
138+
}
139+
except (AttributeError, KeyError):
140+
pass
141+
142+
# Extract metadata
143+
metadata = {
144+
"received_at": data.get("received_at"),
145+
"device_id": data.get("end_device_ids", {}).get("device_id"),
146+
"application_id": data.get("end_device_ids", {}).get("application_ids", {}).get("application_id"),
147+
"f_port": uplink_msg.get("f_port"),
148+
"f_cnt": uplink_msg.get("f_cnt"),
149+
}
150+
151+
# Extract RX metadata (gateway info, RSSI, SNR)
152+
rx_metadata = []
153+
try:
154+
rx_list = uplink_msg.get("rx_metadata", [])
155+
for rx in rx_list:
156+
rx_metadata.append({
157+
"gateway_id": rx.get("gateway_ids", {}).get("gateway_id"),
158+
"gateway_eui": rx.get("gateway_ids", {}).get("eui"),
159+
"rssi": rx.get("rssi"),
160+
"snr": rx.get("snr"),
161+
"timestamp": rx.get("timestamp"),
162+
"time": rx.get("time")
163+
})
164+
except (AttributeError, KeyError):
165+
pass
166+
167+
# Build cached data structure
168+
cached_data = {
169+
"dev_eui": dev_eui,
170+
"timestamp": time.time(),
171+
"received_at": metadata.get("received_at"),
172+
"decoded_payload": decoded_payload,
173+
"location": location_data,
174+
"metadata": metadata,
175+
"rx_metadata": rx_metadata
176+
}
177+
178+
# Update cache
179+
with self._cache_lock:
180+
self._cache[dev_eui] = cached_data
181+
182+
# Update UWB cache if mapping exists
183+
uwb_id = self.dev_eui_to_uwb_id_map.get(dev_eui)
184+
if uwb_id:
185+
# Normalize UWB ID to uppercase hex string
186+
uwb_id = uwb_id.upper()
187+
self._uwb_cache[uwb_id] = cached_data
188+
self._log(f"Cached data for dev_eui={dev_eui} -> UWB ID={uwb_id}", "VERBOSE")
189+
else:
190+
self._log(f"No UWB mapping for dev_eui={dev_eui}", "VERBOSE")
191+
192+
except json.JSONDecodeError as e:
193+
self._log(f"Failed to parse JSON payload: {e}", "ERROR")
194+
except Exception as e:
195+
self._log(f"Error processing message: {e}", "ERROR")
196+
if self.verbose:
197+
import traceback
198+
traceback.print_exc()
199+
200+
def start(self):
201+
"""Start the MQTT subscriber in a background thread"""
202+
if self._running:
203+
self._log("Cache already running", "WARNING")
204+
return
205+
206+
self._running = True
207+
self._thread = threading.Thread(target=self._run, daemon=True)
208+
self._thread.start()
209+
self._log("LoRa tag cache started")
210+
211+
def stop(self):
212+
"""Stop the MQTT subscriber"""
213+
self._running = False
214+
if self.mqtt_client:
215+
self.mqtt_client.loop_stop()
216+
self.mqtt_client.disconnect()
217+
if self._thread:
218+
self._thread.join(timeout=5)
219+
self._log("LoRa tag cache stopped")
220+
221+
def _run(self):
222+
"""Run the MQTT client loop"""
223+
try:
224+
self.mqtt_client = mqtt.Client()
225+
self.mqtt_client.on_connect = self._on_connect
226+
self.mqtt_client.on_disconnect = self._on_disconnect
227+
self.mqtt_client.on_message = self._on_message
228+
229+
# Set credentials if provided
230+
if self.username and self.password:
231+
self.mqtt_client.username_pw_set(self.username, self.password)
232+
233+
# Configure TLS
234+
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
235+
context.check_hostname = False
236+
context.verify_mode = ssl.CERT_NONE
237+
self.mqtt_client.tls_set_context(context)
238+
239+
# Connect
240+
self.mqtt_client.connect(self.broker, self.port, 60)
241+
242+
# Run loop
243+
self.mqtt_client.loop_start()
244+
245+
# Keep thread alive
246+
while self._running:
247+
time.sleep(1)
248+
249+
except Exception as e:
250+
self._log(f"Error in MQTT loop: {e}", "ERROR")
251+
if self.verbose:
252+
import traceback
253+
traceback.print_exc()
254+
finally:
255+
if self.mqtt_client:
256+
self.mqtt_client.loop_stop()
257+
self.mqtt_client.disconnect()
258+
259+
def get_by_dev_eui(self, dev_eui: str) -> Optional[Dict[str, Any]]:
260+
"""
261+
Get cached data by dev_eui.
262+
263+
Args:
264+
dev_eui: Device EUI (hex string, case-insensitive)
265+
266+
Returns:
267+
Cached data dictionary or None if not found
268+
"""
269+
dev_eui = dev_eui.upper()
270+
with self._cache_lock:
271+
return self._cache.get(dev_eui)
272+
273+
def get_by_uwb_id(self, uwb_id: str) -> Optional[Dict[str, Any]]:
274+
"""
275+
Get cached data by UWB ID (using dev_eui mapping).
276+
277+
Args:
278+
uwb_id: UWB ID (hex string, case-insensitive)
279+
280+
Returns:
281+
Cached data dictionary or None if not found
282+
"""
283+
uwb_id = uwb_id.upper()
284+
with self._cache_lock:
285+
return self._uwb_cache.get(uwb_id)
286+
287+
def get_all_cached(self) -> Dict[str, Dict[str, Any]]:
288+
"""
289+
Get all cached data by UWB ID.
290+
291+
Returns:
292+
Dictionary mapping UWB ID to cached data
293+
"""
294+
with self._cache_lock:
295+
return self._uwb_cache.copy()
296+
297+
def get_cache_stats(self) -> Dict[str, Any]:
298+
"""
299+
Get cache statistics.
300+
301+
Returns:
302+
Dictionary with cache statistics
303+
"""
304+
with self._cache_lock:
305+
return {
306+
"dev_eui_count": len(self._cache),
307+
"uwb_id_count": len(self._uwb_cache),
308+
"mapping_count": len(self.dev_eui_to_uwb_id_map),
309+
"dev_euis": list(self._cache.keys()),
310+
"uwb_ids": list(self._uwb_cache.keys())
311+
}
312+

0 commit comments

Comments
 (0)