Skip to content

Commit f436b13

Browse files
author
Alex J Lennon
committed
Add CGA network format conversion for UWB MQTT publisher
- Add uwb_network_converter.py module for converting edge list to CGA network format - Add --cga-format and --anchor-config command line options - Instance ID now matches node ID (hex to decimal conversion) - Support for anchor point configuration via JSON file - Maintains backward compatibility with simple edge list format
1 parent c91f290 commit f436b13

File tree

2 files changed

+231
-3
lines changed

2 files changed

+231
-3
lines changed

recipes-connectivity/uwb-mqtt-publisher/files/mqtt-live-publisher.py

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import json
2929
import ssl
3030
import sys
31+
import os
3132

3233
# MQTT imports
3334
try:
@@ -36,6 +37,26 @@
3637
print("Error: paho-mqtt library not found. Install with: pip install paho-mqtt")
3738
sys.exit(1)
3839

40+
# UWB Network Converter import
41+
# This module converts edge list format to CGA network format
42+
# See uwb_network_converter.py for implementation details (Jen's review)
43+
try:
44+
# Import from same directory
45+
script_dir = os.path.dirname(os.path.abspath(__file__))
46+
converter_path = os.path.join(script_dir, 'uwb_network_converter.py')
47+
if os.path.exists(converter_path):
48+
import importlib.util
49+
spec = importlib.util.spec_from_file_location("uwb_network_converter", converter_path)
50+
uwb_network_converter = importlib.util.module_from_spec(spec)
51+
spec.loader.exec_module(uwb_network_converter)
52+
UwbNetworkConverter = uwb_network_converter.UwbNetworkConverter
53+
else:
54+
UwbNetworkConverter = None
55+
print("[WARNING] uwb_network_converter.py not found - CGA format will be unavailable")
56+
except Exception as e:
57+
UwbNetworkConverter = None
58+
print(f"[WARNING] Failed to load UWB network converter: {e}")
59+
3960
parser = argparse.ArgumentParser(description='Sketch flash loader with MQTT publishing')
4061
parser.add_argument("uart", help="uart port to use", type=str, default="/dev/ttyUSB0", nargs='?')
4162
parser.add_argument("nodes", help="node lists",type=str, default="[]", nargs='?')
@@ -46,13 +67,23 @@
4667
parser.add_argument("--disable-mqtt", help="Disable MQTT publishing", action="store_true")
4768
parser.add_argument("--verbose", help="Enable verbose logging", action="store_true")
4869
parser.add_argument("--quiet", help="Enable quiet mode (minimal logging)", action="store_true")
70+
parser.add_argument("--cga-format", help="Publish in CGA network format instead of simple edge list", action="store_true")
71+
parser.add_argument("--anchor-config", help="Path to JSON config file with anchor point coordinates", type=str, default=None)
4972

5073
args = parser.parse_args()
5174

5275
# MQTT globals
5376
mqtt_client = None
5477
last_publish_time = 0
5578

79+
# UWB Network Converter instance (for CGA format)
80+
uwb_converter = None
81+
if args.cga_format:
82+
if UwbNetworkConverter is None:
83+
print("[ERROR] --cga-format requires uwb_network_converter.py module")
84+
sys.exit(1)
85+
uwb_converter = UwbNetworkConverter(anchor_config_path=args.anchor_config)
86+
5687
# Error tracking globals
5788
parsing_error_count = 0
5889
MAX_PARSING_ERRORS = 3
@@ -237,7 +268,6 @@ def publish_to_mqtt(data):
237268
rate_limit = current_rate_limit
238269

239270
if time_since_last < rate_limit:
240-
log_info(f"MQTT publish rate limited - waiting {rate_limit - time_since_last:.1f}s more")
241271
return
242272

243273
if not mqtt_client.is_connected():
@@ -403,6 +433,9 @@ def parse_final(assignments, final_payload, mode=0):
403433
def print_list(results):
404434
if len(results) == 0:
405435
return
436+
437+
# Get current timestamp for CGA format
438+
current_timestamp = time.time()
406439

407440
# Format data for both display and MQTT publishing
408441
formatted_data = []
@@ -424,8 +457,19 @@ def print_list(results):
424457
# Only show parsed data in verbose mode
425458
log_verbose(f"Parsed data: {row}")
426459

427-
# Publish to MQTT
428-
publish_to_mqtt(formatted_data)
460+
# Convert to CGA format if requested
461+
if args.cga_format and uwb_converter is not None:
462+
try:
463+
network_data = uwb_converter.convert_edges_to_network(formatted_data, timestamp=current_timestamp)
464+
log_verbose("Converted to CGA network format")
465+
publish_to_mqtt(network_data)
466+
except Exception as e:
467+
log_error(f"Failed to convert to CGA format: {e}")
468+
log_info("Falling back to simple edge list format")
469+
publish_to_mqtt(formatted_data)
470+
else:
471+
# Publish in simple edge list format (default)
472+
publish_to_mqtt(formatted_data)
429473

430474
def print_matrix(assignments, results):
431475
if not args.verbose:
@@ -472,6 +516,14 @@ def print_matrix(assignments, results):
472516
log_start(f"MQTT topic: {args.mqtt_topic}")
473517
log_start(f"MQTT command topic: {args.mqtt_topic}/cmd")
474518
log_start(f"Initial rate limit: {args.mqtt_rate_limit}s")
519+
if args.cga_format:
520+
log_start("CGA network format: ENABLED")
521+
if args.anchor_config:
522+
log_start(f"Anchor config: {args.anchor_config}")
523+
else:
524+
log_start("Anchor config: None (no anchor points configured)")
525+
else:
526+
log_start("Output format: Simple edge list")
475527
if args.quiet:
476528
log_start("Quiet mode enabled - minimal logging")
477529
elif args.verbose:
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
#!/usr/bin/env python3
2+
"""
3+
UWB Network Converter - Converts edge list format to CGA network format
4+
5+
This module provides functionality to convert UWB distance measurements
6+
from edge list format to the network format required by CGA systems.
7+
8+
Author: Dynamic Devices Ltd. 2025
9+
Reviewer: Jen
10+
"""
11+
12+
import json
13+
import time
14+
import os
15+
16+
17+
class UwbNetworkConverter:
18+
"""
19+
Converts UWB edge list data to CGA network format.
20+
21+
This class handles the conversion of UWB distance measurements from
22+
a simple edge list format to a structured network format with anchor
23+
point positioning support.
24+
25+
Example usage:
26+
converter = UwbNetworkConverter(anchor_config_path="anchors.json")
27+
network_json = converter.convert_edges_to_network(edge_list)
28+
"""
29+
30+
def __init__(self, anchor_config_path=None):
31+
"""
32+
Initialize the converter with anchor point configuration.
33+
34+
Args:
35+
anchor_config_path (str, optional): Path to JSON config file with anchor points.
36+
Format: {"anchors": [{"id": "B5A4", "lat": 53.48..., "lon": -2.19..., "alt": 0}, ...]}
37+
If None, no anchor points will be configured.
38+
"""
39+
self.anchor_config_path = anchor_config_path
40+
self.anchor_map = {} # Maps anchor ID to [lat, lon, alt]
41+
self.instance_id_base = 1000 # Fallback base if node ID is not valid hex
42+
43+
if anchor_config_path and os.path.exists(anchor_config_path):
44+
self._load_anchor_config()
45+
elif anchor_config_path:
46+
print(f"[WARNING] Anchor config file not found: {anchor_config_path}")
47+
48+
def _load_anchor_config(self):
49+
"""Load anchor point configuration from JSON file."""
50+
try:
51+
with open(self.anchor_config_path, 'r') as f:
52+
config = json.load(f)
53+
54+
if 'anchors' not in config:
55+
print(f"[WARNING] No 'anchors' key found in config file")
56+
return
57+
58+
for anchor in config['anchors']:
59+
if 'id' not in anchor or 'lat' not in anchor or 'lon' not in anchor:
60+
print(f"[WARNING] Invalid anchor entry: {anchor}")
61+
continue
62+
63+
anchor_id = anchor['id']
64+
lat = float(anchor['lat'])
65+
lon = float(anchor['lon'])
66+
alt = float(anchor.get('alt', 0.0))
67+
68+
self.anchor_map[anchor_id] = [lat, lon, alt]
69+
70+
print(f"[INFO] Loaded {len(self.anchor_map)} anchor points from config")
71+
72+
except json.JSONDecodeError as e:
73+
print(f"[ERROR] Failed to parse anchor config JSON: {e}")
74+
except Exception as e:
75+
print(f"[ERROR] Failed to load anchor config: {e}")
76+
77+
def convert_edges_to_network(self, edge_list, timestamp=None):
78+
"""
79+
Convert edge list to CGA network format.
80+
81+
Args:
82+
edge_list (list): List of edges in format [["B5A4", "B57A", 1.726], ...]
83+
timestamp (float, optional): Unix timestamp for lastPositionUpdateTime.
84+
If None, uses current time.
85+
86+
Returns:
87+
dict: Network object with "uwbs" array in CGA format
88+
"""
89+
if timestamp is None:
90+
timestamp = time.time()
91+
92+
# Extract unique UWB IDs from edges
93+
uwb_ids = set()
94+
for edge in edge_list:
95+
if len(edge) >= 2:
96+
uwb_ids.add(edge[0])
97+
uwb_ids.add(edge[1])
98+
99+
# Create UWB objects with default values
100+
uwbs = []
101+
sorted_ids = sorted(uwb_ids)
102+
103+
for idx, uwb_id in enumerate(sorted_ids):
104+
# Check if this UWB is an anchor (has known position)
105+
is_anchor = uwb_id in self.anchor_map
106+
anchor_position = self.anchor_map.get(uwb_id, [0.0, 0.0, 0.0])
107+
108+
# Convert hex node ID to decimal for instance ID
109+
# Node IDs are hex strings like "B5A4" -> convert to decimal (46500)
110+
try:
111+
instance_id = int(uwb_id, 16)
112+
except (ValueError, TypeError):
113+
# Fallback if node ID is not valid hex
114+
instance_id = self.instance_id_base + idx
115+
116+
uwb = {
117+
"id": uwb_id,
118+
"triageStatus": 0, # unknown/not triaged
119+
"position": {"x": 0.0, "y": 0.0, "z": 0.0},
120+
"latLonAlt": anchor_position if is_anchor else [0.0, 0.0, 0.0],
121+
"positionKnown": is_anchor,
122+
"lastPositionUpdateTime": timestamp,
123+
"edges": [],
124+
"positionAccuracy": 0.0,
125+
"go": {
126+
"instanceID": instance_id
127+
}
128+
}
129+
uwbs.append(uwb)
130+
131+
# Create a map for quick lookup
132+
uwb_map = {uwb["id"]: uwb for uwb in uwbs}
133+
134+
# Populate edges (bidirectional - add to both nodes)
135+
for edge in edge_list:
136+
if len(edge) < 3:
137+
continue
138+
139+
end0_id = edge[0]
140+
end1_id = edge[1]
141+
distance = float(edge[2])
142+
143+
edge_obj = {
144+
"end0": end0_id,
145+
"end1": end1_id,
146+
"distance": distance
147+
}
148+
149+
# Add edge to both UWBs (bidirectional)
150+
if end0_id in uwb_map:
151+
uwb_map[end0_id]["edges"].append(edge_obj)
152+
if end1_id in uwb_map:
153+
uwb_map[end1_id]["edges"].append(edge_obj)
154+
155+
# Create Network object
156+
network = {
157+
"uwbs": uwbs
158+
}
159+
160+
return network
161+
162+
def convert_edges_to_network_json(self, edge_list, timestamp=None):
163+
"""
164+
Convert edge list to CGA network format and return as JSON string.
165+
166+
Args:
167+
edge_list (list): List of edges in format [["B5A4", "B57A", 1.726], ...]
168+
timestamp (float, optional): Unix timestamp for lastPositionUpdateTime.
169+
If None, uses current time.
170+
171+
Returns:
172+
str: JSON string representation of network object
173+
"""
174+
network = self.convert_edges_to_network(edge_list, timestamp)
175+
return json.dumps(network)
176+

0 commit comments

Comments
 (0)