-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathws_server.py
More file actions
2490 lines (2125 loc) · 100 KB
/
ws_server.py
File metadata and controls
2490 lines (2125 loc) · 100 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Freenet Telemetry WebSocket Server
Tails the telemetry log file and pushes events to connected clients in real-time.
Also tracks peer connections to build network topology.
Supports time-travel by buffering event history.
"""
import asyncio
import hashlib
import re
import time
import os
import secrets
from datetime import datetime
from pathlib import Path
from telemetry_db import TelemetryDB
from collections import deque
import orjson
import uvloop
import websockets
# Use uvloop for faster event loop
uvloop.install()
# Optional OpenAI for name sanitization
try:
from openai import OpenAI
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
TELEMETRY_LOG = Path("/mnt/media/freenet-telemetry/logs.jsonl")
WS_PORT = 3134
PEER_NAMES_FILE = Path("/var/www/freenet-dashboard/peer_names.json")
# Connection limits - reserve slots for returning users and peers
MAX_CLIENTS = 300 # Total max connections
PRIORITY_RESERVED = 50 # Slots reserved for priority users (returning visitors + peers)
# Load OpenAI API key from environment or .env
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
env_file = Path("/home/ian/code/mediator/main/.env")
if env_file.exists():
for line in env_file.read_text().splitlines():
if line.startswith("OPENAI_API_KEY="):
OPENAI_API_KEY = line.split("=", 1)[1].strip()
break
# Peer names storage: ip_hash -> name
peer_names = {}
# Rate limiting: ip_hash -> [timestamp1, timestamp2, ...] (last N changes within window)
name_change_timestamps = {}
NAME_CHANGE_LIMIT = 5 # Max changes per window
NAME_CHANGE_WINDOW = 3600 # 1 hour in seconds
def check_rate_limit(ip_hash: str) -> tuple[bool, int]:
"""Check if peer can change name. Returns (allowed, seconds_until_allowed)."""
now = time.time()
if ip_hash not in name_change_timestamps:
return True, 0
# Filter to only timestamps within the window
recent = [t for t in name_change_timestamps[ip_hash] if now - t < NAME_CHANGE_WINDOW]
name_change_timestamps[ip_hash] = recent
if len(recent) < NAME_CHANGE_LIMIT:
return True, 0
# Find when the oldest one expires
oldest = min(recent)
wait_time = int(NAME_CHANGE_WINDOW - (now - oldest)) + 1
return False, wait_time
def record_name_change(ip_hash: str):
"""Record a name change for rate limiting."""
now = time.time()
if ip_hash not in name_change_timestamps:
name_change_timestamps[ip_hash] = []
name_change_timestamps[ip_hash].append(now)
def load_peer_names():
"""Load peer names from file."""
global peer_names
if PEER_NAMES_FILE.exists():
try:
peer_names = orjson.loads(PEER_NAMES_FILE.read_bytes())
except Exception as e:
print(f"Error loading peer names: {e}")
peer_names = {}
def save_peer_names():
"""Save peer names to file."""
try:
# Use OPT_INDENT_2 for readable output
PEER_NAMES_FILE.write_bytes(orjson.dumps(peer_names, option=orjson.OPT_INDENT_2))
except Exception as e:
print(f"Error saving peer names: {e}")
async def sanitize_name(name: str) -> tuple[str | None, str | None]:
"""
Use OpenAI to check a peer name is appropriate.
Returns (sanitized_name, rejection_reason).
- (name, None) if accepted
- (None, reason) if rejected
"""
if not name or len(name) > 30:
return name[:30] if name else None, "Name too long" if name else "Empty name"
# Basic sanitization
name = name.strip()
if not name:
return None, "Empty name"
if not OPENAI_AVAILABLE or not OPENAI_API_KEY:
# Without OpenAI, just do basic filtering
sanitized = re.sub(r'[^\w\s\-_.!/]', '', name)[:20]
return sanitized, None
try:
print(f"[sanitize_name] Checking name: {name!r}")
client = OpenAI(api_key=OPENAI_API_KEY)
response = await asyncio.to_thread(
client.chat.completions.create,
model="gpt-4o-mini",
messages=[{
"role": "system",
"content": """You are a peer name moderator for a network dashboard.
If the name is acceptable, respond with ONLY: safe
If not, respond with ONLY: reject: <reason>
Where <reason> is one of:
- political (slogans, advocacy, culture-war statements, references to political figures/movements/causes)
- offensive (slurs, hate speech, explicit sexual terms, threats of violence)
- religious (religious or ideological proclamations)
- impersonation (pretending to be a developer, admin, official account, or real person)
- spam (advertising, URLs, product/crypto promotion)
Names should be nicknames or handles, not statements or claims of authority. The dashboard is a technical tool, not a billboard.
SAFE examples: SpaceCowboy, Node42, BadAss, PizzaLord, hell_yeah, Destroyer, user/admin, CryptoKitty
REJECT examples: MAGA2024 (political), TransRights (political), FreePalestine (political), JesusIsLord (religious), Admin (impersonation), FreenetOfficial (impersonation), Ian Clarke (impersonation), BuyBitcoin (spam), visit-my.site (spam)"""
}, {
"role": "user",
"content": f"Username: {name}"
}],
max_tokens=20,
temperature=0.0
)
llm_response = response.choices[0].message.content.strip().lower()
print(f"[sanitize_name] LLM response: {llm_response!r}")
if llm_response.startswith("reject"):
# Parse reason from "reject: political" etc.
reason = llm_response.split(":", 1)[1].strip() if ":" in llm_response else "inappropriate"
print(f"[sanitize_name] Rejected: {name!r} reason={reason}")
return None, reason
else:
print(f"[sanitize_name] Safe, returning: {name[:20]!r}")
return name[:20], None
except Exception as e:
print(f"[sanitize_name] OpenAI error: {e}")
# Fallback to basic filtering
sanitized = re.sub(r'[^\w\s\-_.!/]', '', name)[:20]
return sanitized, None
# Event history buffer (last 2 hours, hard-capped)
MAX_HISTORY_AGE_NS = 2 * 60 * 60 * 1_000_000_000 # 2 hours in nanoseconds
MAX_HISTORY_EVENTS = 50000 # Limit events kept in memory
MAX_INITIAL_EVENTS = 20000 # Events sent to clients on connect (subset of history)
# Hard cap the deque to prevent unbounded growth. Events are appended in
# approximately chronological order so a maxlen deque naturally keeps the
# most recent events.
event_history = deque(maxlen=MAX_HISTORY_EVENTS) # bounded deque of event dicts
# Event types worth keeping in history for time-travel / contract tracking.
# get_request excluded — too noisy at ~3/sec.
# connect_connected/disconnect excluded — too noisy (~83% of all events),
# they flood the buffer and push out contract operations within minutes.
# Connection state is tracked via live peer topology, not history replay.
HISTORY_EVENT_TYPES = {
# Contract operations
"put_request", "put_success",
"get_request", "get_success", "get_not_found", "get_failure",
"update_request", "update_success", "update_failure",
"subscribe_request", "subscribe_success", "subscribe_not_found",
# Update propagation
"update_broadcast_received", "update_broadcast_applied",
"update_broadcast_emitted", "broadcast_emitted",
"update_broadcast_delivery_summary",
# Peer lifecycle
"peer_startup", "peer_shutdown",
# Subscription tree
"seeding_started", "seeding_stopped",
# Subscription completions (needed for timeline SUB lane)
"subscribed",
}
# Broader set sent in the real-time stream — includes noisy types that
# are useful to see live but would flood the history buffer.
REALTIME_EVENT_TYPES = HISTORY_EVENT_TYPES | {
"connect_connected", "connect_rejected", "disconnect",
}
# SQLite database for persistent event/transaction/flow storage
db = TelemetryDB()
# Connected WebSocket clients - now managed via ClientHandler for backpressure
clients = set() # Set of ClientHandler instances
# Per-client send queue size limit. If a slow client's queue fills up,
# oldest messages are dropped to prevent memory bloat.
CLIENT_QUEUE_MAX = 100
# Threshold for logging slow clients (queue fills above this fraction)
SLOW_CLIENT_LOG_THRESHOLD = 0.75
class ClientHandler:
"""Wraps a WebSocket connection with a bounded send queue and sender task.
Instead of sending directly to the websocket (which buffers internally in
the websockets library if the client is slow), we push messages into a
bounded asyncio.Queue. A dedicated sender coroutine drains the queue.
If the queue is full, the oldest message is dropped.
"""
__slots__ = ("ws", "queue", "_sender_task", "client_ip", "ip_hash_str",
"peer_id_str", "dropped_count", "_closed")
def __init__(self, ws, client_ip=None):
self.ws = ws
self.queue = asyncio.Queue(maxsize=CLIENT_QUEUE_MAX)
self._sender_task = None
self.client_ip = client_ip
self.ip_hash_str = ip_hash(client_ip) if client_ip else ""
self.peer_id_str = anonymize_ip(client_ip) if client_ip else ""
self.dropped_count = 0
self._closed = False
def start(self):
"""Start the background sender task."""
self._sender_task = asyncio.create_task(self._sender())
async def _sender(self):
"""Drain the queue and send messages to the WebSocket."""
try:
while not self._closed:
msg = await self.queue.get()
if msg is None:
break # Poison pill - shut down
try:
await self.ws.send(msg)
except websockets.exceptions.ConnectionClosed:
break
except Exception:
break
except asyncio.CancelledError:
pass
def enqueue(self, msg: str):
"""Enqueue a message for sending. Drops oldest if queue is full."""
if self._closed:
return
try:
self.queue.put_nowait(msg)
except asyncio.QueueFull:
# Drop the oldest message to make room
try:
self.queue.get_nowait()
except asyncio.QueueEmpty:
pass
try:
self.queue.put_nowait(msg)
except asyncio.QueueFull:
pass
self.dropped_count += 1
if self.dropped_count % 50 == 1:
print(f"[backpressure] Slow client {self.ip_hash_str or 'unknown'}: "
f"dropped {self.dropped_count} messages total")
async def send_direct(self, msg: str):
"""Send a message directly (bypassing queue), for initial state/history.
Used only during client setup before real-time streaming begins.
"""
try:
await self.ws.send(msg)
except websockets.exceptions.ConnectionClosed:
raise
async def close(self):
"""Shut down the sender task."""
self._closed = True
# Send poison pill to unblock the sender
try:
self.queue.put_nowait(None)
except asyncio.QueueFull:
# Clear one item and try again
try:
self.queue.get_nowait()
except asyncio.QueueEmpty:
pass
try:
self.queue.put_nowait(None)
except asyncio.QueueFull:
pass
if self._sender_task:
self._sender_task.cancel()
try:
await self._sender_task
except asyncio.CancelledError:
pass
def __hash__(self):
return id(self.ws)
def __eq__(self, other):
if isinstance(other, ClientHandler):
return self.ws is other.ws
return NotImplemented
# Network state (current/live)
peers = {} # ip -> {id, location, last_seen, connections: set()}
connections = {} # frozenset({ip1, ip2}) -> timestamp_ns
# Track IP <-> peer_id mappings for liveness tracking
ip_to_peer_id = {} # ip -> peer_id (from body fields like target, this_peer)
peer_id_to_ip = {} # peer_id -> ip (reverse mapping for updating last_seen from any event)
# Track attrs_peer_id (the telemetry emitter) -> ip for lifecycle matching
# This is different from body peer_id - attrs_peer_id is the peer sending telemetry,
# while body peer_id is parsed from fields like "target" which is how OTHER peers see them
attrs_peer_id_to_ip = {} # attrs peer_id -> ip
# Peer presence timeline for historical reconstruction
# ip -> {id, ip_hash, location, first_seen_ns}
peer_presence = {}
# Subscription trees per contract
# contract_key -> {subscribers: set(ip), broadcasts: [(from_ip, to_ip, timestamp)]}
subscriptions = {} # contract_key -> subscription data
# Seeding state per (contract, peer) - tracks each peer's subscription tree position
# contract_key -> {peer_id -> {is_seeding: bool, upstream: peer_str, downstream: [peer_str], downstream_count: int}}
seeding_state = {} # contract_key -> {peer_id -> state}
# Contract state hashes per (contract, peer) - tracks state propagation
# contract_key -> {peer_id -> {hash: str, timestamp: int, event_type: str}}
contract_states = {}
# Contract state sizes - latest known size per contract (from state_size telemetry field)
# contract_key -> {size: int, timestamp: int}
contract_state_sizes = {}
# Contract propagation tracking - tracks how quickly new states spread across peers
# contract_key -> {current_hash, first_seen, peers: {peer_id -> timestamp}, previous: {...}}
contract_propagation = {}
def update_contract_state(contract_key, peer_id, state_hash, timestamp, event_type):
"""Update the known state hash for a (contract, peer) pair."""
if not contract_key or not peer_id or not state_hash:
return
if contract_key not in contract_states:
contract_states[contract_key] = {}
# Only update if this is newer than what we have
existing = contract_states[contract_key].get(peer_id)
if existing and existing["timestamp"] >= timestamp:
return
contract_states[contract_key][peer_id] = {
"hash": state_hash,
"timestamp": timestamp,
"event_type": event_type,
}
# Track propagation timeline - only for UPDATE events that represent state changes spreading
# GET and PUT don't represent propagation - GET is reading existing data, PUT is initial creation
if event_type in ("update_success", "update_broadcast_applied", "update_broadcast_emitted"):
update_propagation_tracking(contract_key, peer_id, state_hash, timestamp)
def update_propagation_tracking(contract_key, peer_id, state_hash, timestamp):
"""Track how a new state hash propagates across peers."""
prop = contract_propagation.setdefault(contract_key, {})
# Propagation window: only count peers that receive state within 5 minutes of first_seen
# Anything after that is likely a peer catching up after being offline, not real propagation
PROPAGATION_WINDOW_NS = 5 * 60 * 1_000_000_000 # 5 minutes in nanoseconds
# Check if this is a new state version
if prop.get("current_hash") != state_hash:
# Archive current state as previous (if exists)
if "current_hash" in prop and prop.get("peers"):
peers = prop["peers"]
prop["previous"] = {
"hash": prop["current_hash"],
"first_seen": prop["first_seen"],
"propagation_ms": (prop.get("last_seen", prop["first_seen"]) - prop["first_seen"]) // 1_000_000,
"peer_count": len(peers),
}
# Start tracking new state
prop["current_hash"] = state_hash
prop["first_seen"] = timestamp
prop["last_seen"] = timestamp
prop["peers"] = {peer_id: timestamp}
else:
# Same hash - record when this peer first got it (if within propagation window)
if peer_id not in prop.get("peers", {}):
first_seen = prop.get("first_seen", timestamp)
# Only count if within propagation window - late arrivals are peers catching up, not propagation
if (timestamp - first_seen) <= PROPAGATION_WINDOW_NS:
prop.setdefault("peers", {})[peer_id] = timestamp
prop["last_seen"] = max(prop.get("last_seen", timestamp), timestamp)
def get_propagation_data():
"""Get propagation timeline data for all contracts."""
result = {}
for contract_key, prop in contract_propagation.items():
if not prop.get("peers"):
continue
peers = prop["peers"]
first_seen = prop["first_seen"]
# Build timeline: sort peers by timestamp, compute cumulative count
sorted_peers = sorted(peers.items(), key=lambda x: x[1])
timeline = []
for i, (pid, ts) in enumerate(sorted_peers, 1):
# Offset in milliseconds from first_seen (timestamps are in nanoseconds)
offset_ms = (ts - first_seen) // 1_000_000
timeline.append({"t": int(offset_ms), "peers": i})
propagation_ms = (prop.get("last_seen", first_seen) - first_seen) // 1_000_000
result[contract_key] = {
"hash": prop["current_hash"],
"first_seen": first_seen,
"propagation_ms": int(propagation_ms),
"peer_count": len(peers),
"timeline": timeline,
"previous": prop.get("previous"),
}
return result
# Operation statistics
op_stats = {
"put": {"requests": 0, "successes": 0, "latencies": []},
"get": {"requests": 0, "successes": 0, "not_found": 0, "latencies": []},
"update": {"requests": 0, "successes": 0, "broadcasts": 0, "latencies": []},
"subscribe": {"requests": 0, "successes": 0},
}
# ── Time-series metrics (4-hour buckets, kept for 8 days) ──
METRICS_BUCKET_NS = 4 * 60 * 60 * 1_000_000_000 # 4 hours
METRICS_MAX_AGE_NS = 8 * 24 * 60 * 60 * 1_000_000_000 # 8 days
METRICS_MIN_SAMPLES = 5 # Minimum ops in a bucket to compute a meaningful rate
# Each bucket: {ts, put_req, put_ok, get_req, get_ok, get_nf, upd_req, upd_ok, sub_ok, peers, latencies_put, latencies_get, latencies_upd}
metrics_buckets = {} # bucket_key -> bucket dict (dict for O(1) lookup by timestamp)
_current_bucket = None # the bucket we're currently filling
# Version/release markers: [(timestamp_ns, version_string), ...]
version_markers = []
_seen_versions = set()
def _bucket_key(timestamp_ns):
"""Round timestamp down to bucket boundary."""
return (timestamp_ns // METRICS_BUCKET_NS) * METRICS_BUCKET_NS
def _get_or_create_bucket(timestamp_ns):
"""Get the current bucket for this timestamp, creating if needed."""
global _current_bucket
key = _bucket_key(timestamp_ns)
if _current_bucket and _current_bucket["ts"] == key:
return _current_bucket
# Prune old buckets
cutoff = timestamp_ns - METRICS_MAX_AGE_NS
stale = [k for k in metrics_buckets if k < cutoff]
for k in stale:
del metrics_buckets[k]
# Look up existing bucket by key
if key in metrics_buckets:
_current_bucket = metrics_buckets[key]
return _current_bucket
# Create new bucket
_current_bucket = {
"ts": key,
"put_req": 0, "put_ok": 0,
"get_req": 0, "get_ok": 0, "get_nf": 0,
"upd_req": 0, "upd_ok": 0,
"sub_ok": 0,
"reporting_peers": set(),
"lat_put": [], "lat_get": [], "lat_upd": [],
}
metrics_buckets[key] = _current_bucket
return _current_bucket
def record_metric(event_type, timestamp_ns, latency_ms=None, peer_id=None):
"""Record an operation into the current time bucket."""
b = _get_or_create_bucket(timestamp_ns)
if peer_id:
b["reporting_peers"].add(peer_id)
if event_type == "put_request":
b["put_req"] += 1
elif event_type == "put_success":
b["put_ok"] += 1
if latency_ms is not None:
b["lat_put"].append(latency_ms)
elif event_type == "get_request":
b["get_req"] += 1
elif event_type == "get_success":
b["get_ok"] += 1
if latency_ms is not None:
b["lat_get"].append(latency_ms)
elif event_type == "get_not_found":
b["get_nf"] += 1
elif event_type == "update_request":
b["upd_req"] += 1
elif event_type == "update_success":
b["upd_ok"] += 1
if latency_ms is not None:
b["lat_upd"].append(latency_ms)
elif event_type == "subscribed":
b["sub_ok"] += 1
def record_version(version_str, timestamp_ns):
"""Track when a new version first appears."""
if version_str and version_str != "unknown" and version_str not in _seen_versions:
_seen_versions.add(version_str)
version_markers.append((timestamp_ns, version_str))
def get_metrics_timeseries():
"""Build the time series payload for clients."""
def p50(lats):
if not lats:
return None
s = sorted(lats)
return s[len(s) // 2]
def rate_or_none(ok, total):
"""Only compute rate if we have enough samples to be meaningful."""
if total < METRICS_MIN_SAMPLES:
return None
return round(ok / total * 100, 1)
series = []
for key in sorted(metrics_buckets):
b = metrics_buckets[key]
put_total = b["put_req"] or b["put_ok"]
get_total = b["get_ok"] + b["get_nf"]
upd_total = b["upd_req"] or b["upd_ok"]
series.append({
"t": b["ts"],
"put_rate": rate_or_none(b["put_ok"], put_total),
"get_rate": rate_or_none(b["get_ok"], get_total),
"upd_rate": rate_or_none(b["upd_ok"], upd_total),
"put_n": put_total,
"get_n": get_total,
"upd_n": upd_total,
"sub_n": b["sub_ok"],
"lat_put": p50(b["lat_put"]),
"lat_get": p50(b["lat_get"]),
"lat_upd": p50(b["lat_upd"]),
})
return {
"series": series,
"versions": [],
}
def get_version_rollout():
"""Build version rollout timeseries from peer lifecycle data.
Merges pre-extracted historical data (from rotated logs) with
live peer_lifecycle data. At each time bucket, counts peers that
are considered active: started before the bucket and either not yet
shut down or shut down after the bucket. Peers without a shutdown
event expire after PEER_TTL_NS.
"""
import time as _time
if not peer_lifecycle and not _version_history:
return {"series": [], "versions": []}
ROLLOUT_BUCKET_NS = 1 * 60 * 60 * 1_000_000_000 # 1 hour
PEER_TTL_NS = 4 * 60 * 60 * 1_000_000_000 # 4 hours - keeps counts closer to actual concurrent peers
# Build list of (version, startup_ns, shutdown_ns_or_None)
peers = []
for entry in _version_history:
v = entry[0]
st = entry[1]
sd = entry[2] if len(entry) > 2 else None
peers.append((v, st, sd))
for pid, data in peer_lifecycle.items():
v = data.get("version", "unknown")
st = data.get("startup_time")
if st:
sd = data.get("shutdown_time")
peers.append((v, st, sd))
if not peers:
return {"series": [], "versions": []}
# Determine time range: last 48 hours
now_ns = int(_time.time() * 1_000_000_000)
WINDOW_NS = 48 * 60 * 60 * 1_000_000_000
min_t = now_ns - WINDOW_NS
max_t = now_ns
# Sort peers by startup time for efficient scanning
peers.sort(key=lambda p: p[1])
# Build time buckets
all_versions = set()
series = []
t = (min_t // ROLLOUT_BUCKET_NS) * ROLLOUT_BUCKET_NS
while t <= max_t + ROLLOUT_BUCKET_NS:
counts = {} # version -> count
for v, st, sd in peers:
if st > t:
break # sorted, no more peers started before this bucket
# Peer is active at time t if:
# - started before t, AND
# - either shut down after t, or no shutdown and within TTL
if sd is not None:
if sd > t:
counts[v] = counts.get(v, 0) + 1
else:
# No shutdown: assume active for PEER_TTL_NS after startup
if st + PEER_TTL_NS > t:
counts[v] = counts.get(v, 0) + 1
if counts:
bucket_data = {"t": t}
for v, c in counts.items():
bucket_data[v] = c
all_versions.add(v)
series.append(bucket_data)
t += ROLLOUT_BUCKET_NS
sorted_versions = sorted(all_versions, key=lambda v: [int(x) if x.isdigit() else 0 for x in v.replace("-", ".").split(".")], reverse=True)
return {
"series": series,
"versions": sorted_versions,
}
# Pre-extracted version history from rotated logs (loaded on startup)
# List of [version, startup_ns, shutdown_ns?] tuples
_version_history = []
def _load_version_history():
"""Load pre-extracted version history from version_history.json."""
global _version_history
history_file = Path(__file__).parent / "version_history.json"
if not history_file.exists():
print("No version_history.json found (run extract_version_history.py to generate)")
return
try:
import json as _json
with open(history_file) as f:
data = _json.load(f)
_version_history = data.get("peers", [])
extracted = data.get("extracted_at", 0)
from datetime import datetime, timezone
ext_str = datetime.fromtimestamp(extracted, tz=timezone.utc).strftime("%Y-%m-%d %H:%M") if extracted else "unknown"
print(f"Loaded version history: {len(_version_history)} peers (extracted {ext_str})")
except Exception as e:
print(f"Failed to load version_history.json: {e}")
# Peer lifecycle tracking
# peer_id -> {version, arch, os, os_version, is_gateway, startup_time, shutdown_time, graceful}
peer_lifecycle = {}
# Track pending operations by transaction ID for latency calculation
# tx_id -> {"op": "put"|"get"|"update", "start_ns": timestamp}
pending_ops = {}
# Transaction tracking - store full event sequences for timeline lanes
# tx_id -> {"op": type, "contract": key, "events": [...], "start_ns": ts, "end_ns": ts, "status": "pending"|"success"|"failed"}
MAX_TRANSACTIONS = 10000 # Keep last N transactions
MAX_INITIAL_TRANSACTIONS = 2000 # Transactions sent to clients on connect
transactions = {} # tx_id -> transaction data
transaction_order = [] # List of tx_ids in order for pruning
# Transfer events (LEDBAT transport_snapshot) for data transfer visualization
# List of {timestamp_ns, bytes_sent, bytes_received, transfers_completed, avg_transfer_time_ms, peak_throughput_bps, ...}
MAX_TRANSFER_EVENTS = 1000
transfer_events = []
# Pattern to parse peer strings like: "PeerId@IP:port (@ location)"
PEER_PATTERN = re.compile(r'(\w+)@(\d+\.\d+\.\d+\.\d+):(\d+)\s*\(@\s*([\d.]+)\)')
def anonymize_ip(ip: str) -> str:
"""Convert IP to anonymous identifier."""
if not ip:
return "unknown"
h = hashlib.sha256(ip.encode()).hexdigest()[:8]
return f"peer-{h}"
def ip_hash(ip: str) -> str:
"""Generate a short hash of IP for user self-identification."""
if not ip:
return ""
return hashlib.sha256(ip.encode()).hexdigest()[:6]
def is_public_ip(ip: str) -> bool:
"""Check if IP is a public (non-test) address."""
if not ip:
return False
if ip.startswith("127.") or ip.startswith("172.") or ip.startswith("10.") or ip.startswith("192.168."):
return False
if ip.startswith("0.") or ip == "localhost":
return False
return True
def cleanup_stale_peer_id(old_peer_id: str):
"""Remove stale data for an old peer_id when a peer reconnects with new ID.
When a peer restarts, it gets a new peer_id but keeps the same IP. The old
peer_id's data in seeding_state and contract_states becomes stale and should
be removed to avoid showing ghost peers in the contracts tab.
"""
# Clean up seeding_state
for contract_key in list(seeding_state.keys()):
if old_peer_id in seeding_state[contract_key]:
del seeding_state[contract_key][old_peer_id]
# Remove empty contracts
if not seeding_state[contract_key]:
del seeding_state[contract_key]
# Clean up contract_states
for contract_key in list(contract_states.keys()):
if old_peer_id in contract_states[contract_key]:
del contract_states[contract_key][old_peer_id]
# Remove empty contracts
if not contract_states[contract_key]:
del contract_states[contract_key]
def parse_peer_string(peer_str):
"""Extract peer_id, IP, and location from peer string."""
if not peer_str:
return None, None, None
match = PEER_PATTERN.search(peer_str)
if match:
peer_id = match.group(1)
ip = match.group(2)
location = float(match.group(4))
return peer_id, ip, location
return None, None, None
def prune_old_events():
"""Remove events older than MAX_HISTORY_AGE_NS."""
now_ns = int(time.time() * 1_000_000_000)
cutoff = now_ns - MAX_HISTORY_AGE_NS
while event_history and event_history[0]["timestamp"] < cutoff:
event_history.popleft()
def prune_old_transactions():
"""Keep only the last MAX_TRANSACTIONS."""
global transaction_order
while len(transaction_order) > MAX_TRANSACTIONS:
old_tx_id = transaction_order.pop(0)
if old_tx_id in transactions:
del transactions[old_tx_id]
# Stale data cleanup threshold (same as topology filtering)
STALE_PEER_THRESHOLD_NS = 30 * 60 * 1_000_000_000 # 30 minutes
STALE_PENDING_OP_NS = 5 * 60 * 1_000_000_000 # 5 minutes (ops should complete quickly)
STALE_PROPAGATION_NS = 2 * 60 * 60 * 1_000_000_000 # 2 hours (match event history)
def cleanup_stale_peers():
"""Remove all data for peers that haven't reported in STALE_PEER_THRESHOLD_NS.
This is the authoritative cleanup: instead of just filtering at read-time,
we delete stale entries from every in-memory data structure to prevent
unbounded memory growth.
Returns list of (anonymized_id, ip) tuples for peers that were removed,
plus list of removed connection pairs, so callers can broadcast removals.
"""
now_ns = int(time.time() * 1_000_000_000)
cutoff = now_ns - STALE_PEER_THRESHOLD_NS
# 1. Find stale peer IPs
stale_ips = set()
for ip, data in peers.items():
if data.get("last_seen", 0) < cutoff:
stale_ips.add(ip)
if not stale_ips:
return [], [], set()
# 2. Collect peer_ids associated with stale IPs (for contract/seeding cleanup)
stale_peer_ids = set()
for ip in stale_ips:
peer_id = ip_to_peer_id.get(ip)
if peer_id:
stale_peer_ids.add(peer_id)
# Also check attrs mapping
peer_data = peers.get(ip)
if peer_data and peer_data.get("peer_id"):
stale_peer_ids.add(peer_data["peer_id"])
stale_anon_ids = set()
for ip in stale_ips:
stale_anon_ids.add(anonymize_ip(ip))
# 3. Remove from peers dict
removed_peers = []
for ip in stale_ips:
data = peers.pop(ip, None)
if data:
removed_peers.append((data["id"], ip))
# 4. Remove from IP <-> peer_id mappings
for ip in stale_ips:
pid = ip_to_peer_id.pop(ip, None)
if pid:
peer_id_to_ip.pop(pid, None)
# 5. Remove from attrs_peer_id_to_ip
stale_attrs_pids = [pid for pid, ip in attrs_peer_id_to_ip.items() if ip in stale_ips]
for pid in stale_attrs_pids:
del attrs_peer_id_to_ip[pid]
stale_peer_ids.add(pid) # Also clean contract data for attrs peer_ids
# 6. Remove from peer_presence
for ip in stale_ips:
peer_presence.pop(ip, None)
# 7. Remove from peer_lifecycle
for pid in stale_peer_ids:
peer_lifecycle.pop(pid, None)
# 8. Remove connections involving stale peers
removed_connections = []
stale_conns = {conn for conn in connections if conn & stale_ips}
for conn in stale_conns:
connections.pop(conn, None)
ips = list(conn)
if len(ips) == 2:
removed_connections.append((anonymize_ip(ips[0]), anonymize_ip(ips[1])))
# Clean up connection sets on the surviving peer
for ip in ips:
if ip not in stale_ips and ip in peers:
peers[ip]["connections"] -= stale_ips
# 9. Remove stale peer_ids from seeding_state
for contract_key in list(seeding_state.keys()):
for pid in stale_peer_ids:
seeding_state[contract_key].pop(pid, None)
if not seeding_state[contract_key]:
del seeding_state[contract_key]
# 10. Remove stale peer_ids from contract_states
# Also remove entries whose own timestamp is older than the cutoff,
# since some peer_ids may never have been mapped to an IP
# (e.g. peers only seen via get_success or broadcast_applied).
for contract_key in list(contract_states.keys()):
for pid in stale_peer_ids:
contract_states[contract_key].pop(pid, None)
# Timestamp-based cleanup for unmapped peers
stale_entries = [
pid for pid, entry in contract_states[contract_key].items()
if entry.get("timestamp", 0) < cutoff
]
for pid in stale_entries:
contract_states[contract_key].pop(pid, None)
if not contract_states[contract_key]:
del contract_states[contract_key]
# 11. Remove stale peers from subscriptions
for contract_key in list(subscriptions.keys()):
sub_data = subscriptions[contract_key]
sub_data["subscribers"] -= stale_anon_ids
# Clean broadcast tree
for sender_id in list(sub_data["tree"].keys()):
if sender_id in stale_anon_ids:
del sub_data["tree"][sender_id]
else:
sub_data["tree"][sender_id] -= stale_anon_ids
if not sub_data["tree"][sender_id]:
del sub_data["tree"][sender_id]
# Remove empty subscription entries
if not sub_data["subscribers"] and not sub_data["tree"]:
del subscriptions[contract_key]
# 12. Remove stale peers from contract_propagation peer lists
for contract_key in list(contract_propagation.keys()):
prop = contract_propagation[contract_key]
prop_peers = prop.get("peers", {})
for pid in stale_peer_ids:
prop_peers.pop(pid, None)
if not prop_peers and "current_hash" in prop:
del contract_propagation[contract_key]
if removed_peers:
print(f"[cleanup] Removed {len(removed_peers)} stale peers, "
f"{len(removed_connections)} connections, "
f"{len(stale_peer_ids)} peer_ids from contract data")
return removed_peers, removed_connections, stale_peer_ids
def cleanup_stale_pending_ops():
"""Remove pending operations that have been stuck for too long.
Operations that never received a success/failure response leak in pending_ops.
This cleans them up after STALE_PENDING_OP_NS.
"""
now_ns = int(time.time() * 1_000_000_000)
cutoff = now_ns - STALE_PENDING_OP_NS
stale_tx_ids = [
tx_id for tx_id, op in pending_ops.items()
if op.get("start_ns", 0) < cutoff
]
for tx_id in stale_tx_ids:
del pending_ops[tx_id]
if stale_tx_ids:
print(f"[cleanup] Removed {len(stale_tx_ids)} stale pending operations")
def cleanup_stale_propagation():
"""Remove old contract propagation tracking data.
Propagation data older than STALE_PROPAGATION_NS is no longer useful
for the dashboard (matches event history window).
"""
now_ns = int(time.time() * 1_000_000_000)
cutoff = now_ns - STALE_PROPAGATION_NS
stale_keys = []
for contract_key, prop in contract_propagation.items():
first_seen = prop.get("first_seen", 0)
last_seen = prop.get("last_seen", first_seen)
if last_seen < cutoff:
stale_keys.append(contract_key)
for key in stale_keys:
del contract_propagation[key]
if stale_keys:
print(f"[cleanup] Removed {len(stale_keys)} stale propagation entries")
def track_transaction(tx_id, event_type, timestamp, peer_id, contract_key=None, body_type=None):
"""Track an event as part of a transaction for timeline lanes.
All events with a valid transaction ID are tracked. Events are grouped by
transaction ID to show related events together in the timeline.
"""