-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdz_latency.py
More file actions
347 lines (298 loc) · 12.7 KB
/
dz_latency.py
File metadata and controls
347 lines (298 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
import json
import subprocess
import re
import csv
# --- Configuration ---
# Name of the CSV file that main() writes its results to.
OUTPUT_FILE = 'dz_latency_result.csv'
def is_validator_connected():
    """
    Checks if the validator is connected to the DoubleZero network.

    Runs `doublezero status --json` and inspects the first entry of the
    JSON array it prints: the validator counts as connected only when that
    entry's `doublezero_status.session_status` equals "up".

    Returns:
        bool: True if connected, False otherwise. False is also returned
        when the command cannot be started, times out, exits non-zero, or
        prints JSON that does not have the expected structure.
    """
    command = ['doublezero', 'status', '--json']
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=10)
        if result.returncode != 0:
            print(f"Error running 'doublezero status': {result.stderr}")
            return False
        data = json.loads(result.stdout)
        session_status = None
        if data:
            session_status = data[0].get("doublezero_status", {}).get("session_status")
    except (
        OSError,                    # binary missing or not executable
        subprocess.TimeoutExpired,  # timeout=10 elapsed
        json.JSONDecodeError,
        IndexError,
        KeyError,                   # top-level JSON was an object, not an array
        TypeError,
        AttributeError,             # an entry was null / not an object
    ) as e:
        print(f"An error occurred: {e}")
        return False

    if session_status == "up":
        print("Validator is connected to the DoubleZero network.")
        return True
    print("Validator is not connected to the DoubleZero network.")
    return False
def ping_ip(ip_address):
    """
    Pings an IP address and returns its average round-trip latency.

    Sends three echo requests (`ping -c 3 -i 0.2 -W 0.5`) and extracts the
    average value from the `min/avg/max...` summary line. The summary may
    carry a fourth statistic labelled `mdev` or `stddev`, or none at all;
    all three layouts are accepted. The command-line flags themselves are
    not adapted per platform.

    Args:
        ip_address (str): The IP address to ping.
    Returns:
        str: The average latency in ms exactly as printed by ping, or
        'unreachable' if the ping fails, times out, cannot be started, or
        prints no recognisable summary line.
    """
    command = ['ping', '-c', '3', '-i', '0.2', '-W', '0.5', ip_address]
    # Group 1 is the "avg" field; the trailing deviation field is optional.
    latency_pattern = (
        r"min/avg/max(?:/(?:mdev|stddev))? = "
        r"[\d.]+/([\d.]+)/[\d.]+(?:/[\d.]+)? ms"
    )
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=5)
    except subprocess.TimeoutExpired:
        print(f"Ping timed out for {ip_address}.")
        return 'unreachable'
    except Exception as e:
        # Best-effort probe: any launch problem is reported, never raised.
        print(f"An error occurred during ping for {ip_address}: {e}")
        return 'unreachable'

    match = None
    if result.returncode == 0:
        match = re.search(latency_pattern, result.stdout)
    if match is None:
        print(f"Ping failed for {ip_address}.")
        return 'unreachable'

    latency = match.group(1)
    print(f"Ping successful for {ip_address}: {latency} ms")
    return latency
def get_ips_from_dz_clients():
    """
    Gets a list of Solana Validator IP addresses from `doublezero user list --json`.

    A user entry is kept when the text "SolanaValidator" occurs in its
    "accesspass" value and its "dz_ip" value is non-empty. Entries whose
    "accesspass" is missing or null are skipped.

    Returns:
        list: A list of Solana Validator dz_ip addresses, in the order the
        command printed them. An empty list is returned on any failure.
    """
    # An argv list (no shell) matches the other CLI wrappers in this file
    # and avoids depending on shell parsing of the command string.
    command = ['doublezero', 'user', 'list', '--json']
    command_text = ' '.join(command)
    print(f'Running command to get IPs: "{command_text}"')
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=15)
        if result.returncode != 0:
            print(f"Error running command to get IPs: {result.stderr}")
            return []
        users = json.loads(result.stdout)
        ips = [
            user["dz_ip"]
            for user in users
            # `or ""` keeps a null accesspass from raising TypeError and
            # throwing away every other entry.
            if "SolanaValidator" in (user.get("accesspass") or "") and user.get("dz_ip")
        ]
        print(f"Found {len(ips)} Solana Validator IPs.")
        return ips
    except FileNotFoundError:
        print("Error: 'doublezero' command not found. Make sure it's installed and in your PATH.")
        return []
    except subprocess.TimeoutExpired:
        print("Error: Command to get IPs timed out.")
        return []
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from `{command_text}` command.")
        return []
    except Exception as e:
        print(f"An unexpected error occurred while getting IPs: {e}")
        return []
def load_gossip_data():
    """
    Runs the 'solana gossip' command and loads the output as JSON.

    Returns:
        list or None: A list of gossip entries if successful, otherwise
        None (command missing, failed, timed out, raised another error,
        or printed something that is not a JSON array).
    """
    command = ['solana', 'gossip', '--output=json']
    print(f"Running command to load gossip data: `{' '.join(command)}`")
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            print(f"Error running 'solana gossip': {result.stderr}")
            return None
        gossip_data = json.loads(result.stdout)
    except FileNotFoundError:
        print("Error: 'solana' command not found. Make sure it's installed and in your PATH.")
        return None
    except subprocess.TimeoutExpired:
        print("Error: 'solana gossip' command timed out.")
        return None
    except json.JSONDecodeError:
        print("Error: Could not decode JSON from `solana gossip` command.")
        return None
    except Exception as e:
        # Same catch-all as the other loaders so a launch failure such as
        # PermissionError is reported instead of aborting the script.
        print(f"An unexpected error occurred while loading gossip data: {e}")
        return None

    # Callers iterate the result entry by entry, so anything but a list is
    # treated as a failed load.
    if not isinstance(gossip_data, list):
        print("Error: Unexpected JSON structure from `solana gossip` command.")
        return None
    return gossip_data
def get_identity_from_gossip(ip_address, gossip_data):
    """
    Searches already-loaded gossip data for the identity of an IP address.

    No command is run here; the entries are scanned in order and the first
    one whose "ipAddress" equals `ip_address` and that carries a non-empty
    "identityPubkey" wins.

    Args:
        ip_address (str): The IP address to check.
        gossip_data (list): Gossip entries (dicts) to search, e.g. the
            value returned by load_gossip_data().
    Returns:
        str or None: The identity public key if found, otherwise None.
        None is also returned (after printing the error) when
        `gossip_data` is not iterable or contains a non-dict entry.
    """
    try:
        for entry in gossip_data:
            if entry.get("ipAddress") != ip_address:
                continue
            identity_pubkey = entry.get("identityPubkey")
            if identity_pubkey:
                print(f"Found identity '{identity_pubkey}' for IP: {ip_address}")
                return identity_pubkey
            # Matching IP without a pubkey: keep scanning later entries.
    except (TypeError, AttributeError) as e:
        # TypeError: gossip_data is None / not iterable.
        # AttributeError: an entry is not a dict.
        print(f"An unexpected error occurred: {e}")
        return None

    print(f"IP {ip_address} not found in gossip.")
    return None
def load_active_validators():
    """
    Loads validator identity pubkeys by running the `solana validators`
    command.

    Every entry under the top-level "validators" key that has a non-empty
    "identityPubkey" is included.
    NOTE(review): no further filtering is applied here, so whether every
    listed validator should count as "active" is left to the command's
    output - confirm this matches the intended meaning.

    Returns:
        set: A set of validator identity pubkeys for efficient lookup.
        An empty set is returned on any failure.
    """
    command = ['solana', 'validators', '--output=json']
    print(f"Running command to get active validators: `{' '.join(command)}`")
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=60)
        if result.returncode != 0:
            print(f"Error running command to get active validators: {result.stderr}")
            return set()
        data = json.loads(result.stdout)
        if not isinstance(data, dict):
            print("Error: Unexpected JSON structure from `solana validators` command.")
            return set()
        # `or []` also covers an explicit `"validators": null`.
        validator_list = data.get("validators") or []
        active_identities = set()
        for validator in validator_list:
            identity = validator.get("identityPubkey")
            if identity:
                active_identities.add(identity)
        print(f"Loaded {len(active_identities)} active validator identities.")
        return active_identities
    except FileNotFoundError:
        print("Error: 'solana' command not found. Make sure it's installed and in your PATH.")
        return set()
    except json.JSONDecodeError:
        print("Error: Could not decode JSON from `solana validators` command.")
        return set()
    except subprocess.TimeoutExpired:
        print("Error: `solana validators` command timed out.")
        return set()
    except Exception as e:
        print(f"An unexpected error occurred while loading active validators: {e}")
        return set()
def load_validator_details():
    """
    Loads validator information by running the `solana validator-info get` command
    and creates a mapping from identity public key to validator name.

    An entry contributes to the mapping only when it is a dict with a
    non-empty "identityPubkey" and an "info" dict holding a non-empty
    "name". Entries that do not fit are skipped individually rather than
    failing the whole load.

    Returns:
        dict: A dictionary mapping identity pubkeys to validator names.
        An empty dict is returned on any failure.
    """
    command = ['solana', 'validator-info', 'get', '--output=json']
    print(f"Running command to get validator details: `{' '.join(command)}`")
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=60)
        if result.returncode != 0:
            print(f"Error running command to get validator details: {result.stderr}")
            return {}
        validators_data = json.loads(result.stdout)
        validator_map = {}
        for record in validators_data:
            if not isinstance(record, dict):
                continue
            info = record.get("info")
            # A null or non-object "info" means "no name", not an error.
            name = info.get("name") if isinstance(info, dict) else None
            identity = record.get("identityPubkey")
            if identity and name:
                validator_map[identity] = name
        print(f"Loaded {len(validator_map)} validator details.")
        return validator_map
    except FileNotFoundError:
        print("Error: 'solana' command not found. Make sure it's installed and in your PATH.")
        return {}
    except json.JSONDecodeError:
        print("Error: Could not decode JSON from `solana validator-info` command.")
        return {}
    except subprocess.TimeoutExpired:
        print("Error: `solana validator-info` command timed out.")
        return {}
    except Exception as e:
        print(f"An unexpected error occurred while loading validator details: {e}")
        return {}
def get_ip_location(ip_address):
    """
    Fetches the city and country for a given IP address using ip-api.com.

    The lookup is a single plain-HTTP GET with a 5 second timeout. A field
    that is missing, null or empty in a successful response is reported as
    'Unknown'.

    Args:
        ip_address (str): The IP address to look up.
    Returns:
        tuple: A tuple containing (city, country) or ('Unknown', 'Unknown') if lookup fails.
    """
    try:
        response = requests.get(f"http://ip-api.com/json/{ip_address}", timeout=5)
        response.raise_for_status()  # Raise an exception for HTTP errors
        data = response.json()
        if data.get("status") != "success":
            print(f"Could not get location for {ip_address}: {data.get('message', 'Unknown error')}")
            return 'Unknown', 'Unknown'
        # `or` (rather than a .get default) also replaces empty strings,
        # so blank cells never reach the CSV.
        city = data.get("city") or "Unknown"
        country = data.get("country") or "Unknown"
        print(f"Location for {ip_address}: {city}, {country}")
        return city, country
    except requests.exceptions.RequestException as e:
        print(f"Error fetching location for {ip_address}: {e}")
        return 'Unknown', 'Unknown'
    except Exception as e:
        print(f"An unexpected error occurred during location lookup for {ip_address}: {e}")
        return 'Unknown', 'Unknown'
import requests
def main():
    """
    Main function to orchestrate the validator checking process.

    Steps:
      1. Collect IP addresses, either from the file given with --ip_list
         (one per line, blank lines ignored) or from
         get_ips_from_dz_clients().
      2. Load the active validator set, validator names and gossip data.
         The run stops with a message if any of them comes back empty.
      3. For each IP: ping it, resolve its identity through gossip and
         classify it as 'validator', 'gossip' or 'gossip_not_found'.
         Unless --no_geo is given, also look up city and country.
      4. Write one CSV row per IP to OUTPUT_FILE. The header always has
         the same columns as the rows.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Check validator status and latency.")
    parser.add_argument("--ip_list", help="Path to a file containing a list of IP addresses, one per line.")
    parser.add_argument("--no_geo", action="store_true", help="Disable geolocation lookup.")
    args = parser.parse_args()
    include_geo = not args.no_geo

    print("Starting validator check...")

    if args.ip_list:
        try:
            with open(args.ip_list, 'r') as f:
                ip_addresses = [line.strip() for line in f if line.strip()]
        except OSError as e:
            print(f"Error reading IP list '{args.ip_list}': {e}")
            return
    else:
        ip_addresses = get_ips_from_dz_clients()

    # Checked before the CLI loads below: with nothing to process there is
    # no reason to wait on three slow `solana` commands.
    if not ip_addresses:
        print("No IP addresses to process. Exiting.")
        return

    # Load necessary data by running CLI commands
    active_validator_identities = load_active_validators()
    if not active_validator_identities:
        print("Warning: No active validators loaded. IPs cannot be confirmed as validators.")
        return
    validator_info = load_validator_details()
    if not validator_info:
        print("Warning: No validator details loaded. IPs cannot be confirmed as validators.")
        return
    gossip_data = load_gossip_data()
    if not gossip_data:
        print("Warning: No gossip data loaded. IPs cannot be confirmed as validators.")
        return

    results = []
    # Process each IP address
    for ip in ip_addresses:
        print(f"\n--- Checking IP: {ip} ---")
        latency = ping_ip(ip)
        identity_key = get_identity_from_gossip(ip, gossip_data)
        status = 'gossip_not_found'
        name = ''
        if identity_key:
            # Check if the found identity is in the set of active validators
            if identity_key in active_validator_identities:
                status = 'validator'
                # Look up the validator name from the details map
                name = validator_info.get(identity_key, "Unknown")
                print(f"SUCCESS: IP {ip} with identity {identity_key} is an active validator named '{name}'.")
            else:
                status = 'gossip'
                print(f"INFO: IP {ip} has an identity ({identity_key}) but is NOT in the active validator list.")

        # Commas were stripped from the name only when geolocation was on;
        # doing it in both modes keeps the column identical either way.
        row = [ip, status, str(name).replace(',', ''), latency]
        if include_geo:
            city, country = get_ip_location(ip)
            row += [city, country]
        results.append(row)

    header = ['IP', 'Status', 'Validator Name', 'Latency']
    if include_geo:
        header += ['City', 'Country']

    # Save the results to a CSV file
    try:
        with open(OUTPUT_FILE, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(results)
        print(f"\nProcess complete. Results saved to '{OUTPUT_FILE}'")
    except OSError as e:
        print(f"Error writing to output file '{OUTPUT_FILE}': {e}")
# Run the check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()