-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathapp.py
More file actions
727 lines (654 loc) · 28.5 KB
/
app.py
File metadata and controls
727 lines (654 loc) · 28.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
# /var/www/schema.backyardbrains.com/app.py
from flask import Flask, request, jsonify, Response, send_from_directory
from flask import redirect, url_for, session
from flask_cors import CORS
from werkzeug.middleware.proxy_fix import ProxyFix
from datetime import datetime
import os, json, fnmatch
import base64, hmac, io, zipfile
from functools import wraps
# Load environment from a local .env when present (useful for dev)
try:
from dotenv import load_dotenv # type: ignore
load_dotenv()
except Exception:
pass
# Optional Auth0 dependencies
try:
from jose import jwt
import requests
except Exception:
jwt = None # type: ignore
requests = None # type: ignore
app = Flask(__name__)
CORS(app)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
UPLOAD_DIRECTORY = os.environ.get('UPLOAD_DIRECTORY', '/var/www/schema.backyardbrains.com/uploads')
RESULTS_PASSWORD = os.environ.get('RESULTS_PASSWORD')
AUTH0_DOMAIN = os.environ.get('AUTH0_DOMAIN') # e.g. backyardbrains.us.auth0.com
AUTH0_AUDIENCE = os.environ.get('AUTH0_AUDIENCE') # e.g. https://schema.backyardbrains.com/api
AUTH0_CLIENT_ID = os.environ.get('AUTH0_CLIENT_ID')
AUTH0_CLIENT_SECRET = os.environ.get('AUTH0_CLIENT_SECRET')
AUTH0_MGMT_CLIENT_ID = os.environ.get('AUTH0_MGMT_CLIENT_ID')
AUTH0_MGMT_CLIENT_SECRET = os.environ.get('AUTH0_MGMT_CLIENT_SECRET')
AUTH0_MGMT_DOMAIN = os.environ.get('AUTH0_MGMT_DOMAIN') # e.g. backyardbrains.us.auth0.com
AUTH0_MGMT_AUDIENCE = os.environ.get('AUTH0_MGMT_AUDIENCE') # e.g. https://backyardbrains.us.auth0.com/api/v2/
AUTH0_READ_RESULTS_ROLE_ID = os.environ.get('AUTH0_READ_RESULTS_ROLE_ID') # optional: role that includes read:results
# Flask session config (required for server-side login)
app.secret_key = os.environ.get('SECRET_KEY', os.environ.get('FLASK_SECRET_KEY', 'dev-insecure'))
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['SESSION_COOKIE_SECURE'] = True
_JWKS_CACHE = None
_MGMT_TOKEN_CACHE = None
_MGMT_TOKEN_EXP = 0
def _get_auth0_jwks():
global _JWKS_CACHE
if _JWKS_CACHE is not None:
return _JWKS_CACHE
if not (requests and AUTH0_DOMAIN):
return None
try:
url = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
resp = requests.get(url, timeout=5)
resp.raise_for_status()
_JWKS_CACHE = resp.json()
return _JWKS_CACHE
except Exception:
return None
def _get_mgmt_token() -> str:
# Fetch a Management API token using client credentials; cache briefly
global _MGMT_TOKEN_CACHE, _MGMT_TOKEN_EXP
import time
now = int(time.time())
if _MGMT_TOKEN_CACHE and now < _MGMT_TOKEN_EXP - 30:
return _MGMT_TOKEN_CACHE
mgmt_domain = AUTH0_MGMT_DOMAIN or AUTH0_DOMAIN
mgmt_audience = AUTH0_MGMT_AUDIENCE or (f"https://{AUTH0_MGMT_DOMAIN}/api/v2/" if AUTH0_MGMT_DOMAIN else f"https://{AUTH0_DOMAIN}/api/v2/")
if not (requests and mgmt_domain and AUTH0_MGMT_CLIENT_ID and AUTH0_MGMT_CLIENT_SECRET):
raise RuntimeError('management api not configured')
token_url = f"https://{mgmt_domain}/oauth/token"
data = {
'grant_type': 'client_credentials',
'client_id': AUTH0_MGMT_CLIENT_ID,
'client_secret': AUTH0_MGMT_CLIENT_SECRET,
'audience': mgmt_audience,
}
# Optionally request specific scopes (app must be authorized for them)
# read:users_by_email for users-by-email; read:users for search; update:users for granting permissions
try:
resp = requests.post(token_url, json=data, timeout=10)
resp.raise_for_status()
except Exception:
app.logger.exception('mgmt token fetch failed')
raise
j = resp.json()
_MGMT_TOKEN_CACHE = j.get('access_token')
_MGMT_TOKEN_EXP = now + int(j.get('expires_in', 300))
return _MGMT_TOKEN_CACHE
def _verify_auth0_jwt(token: str):
if not (jwt and AUTH0_DOMAIN and AUTH0_AUDIENCE):
raise ValueError('auth0 not configured')
unverified_header = jwt.get_unverified_header(token)
jwks = _get_auth0_jwks()
if not jwks:
raise ValueError('jwks unavailable')
rsa_key = {}
for key in jwks.get('keys', []):
if key.get('kid') == unverified_header.get('kid'):
rsa_key = {
'kty': key.get('kty'),
'kid': key.get('kid'),
'use': key.get('use'),
'n': key.get('n'),
'e': key.get('e')
}
break
if not rsa_key:
raise ValueError('no matching jwk')
issuer = f"https://{AUTH0_DOMAIN}/"
payload = jwt.decode(
token,
rsa_key,
algorithms=['RS256'],
audience=AUTH0_AUDIENCE,
issuer=issuer,
)
return payload
def _verify_auth0_id_token(token: str):
if not (jwt and AUTH0_DOMAIN and AUTH0_CLIENT_ID):
raise ValueError('auth0 not configured')
unverified_header = jwt.get_unverified_header(token)
jwks = _get_auth0_jwks()
if not jwks:
raise ValueError('jwks unavailable')
rsa_key = {}
for key in jwks.get('keys', []):
if key.get('kid') == unverified_header.get('kid'):
rsa_key = {
'kty': key.get('kty'),
'kid': key.get('kid'),
'use': key.get('use'),
'n': key.get('n'),
'e': key.get('e')
}
break
if not rsa_key:
raise ValueError('no matching jwk')
issuer = f"https://{AUTH0_DOMAIN}/"
payload = jwt.decode(
token,
rsa_key,
algorithms=['RS256'],
audience=AUTH0_CLIENT_ID,
issuer=issuer,
)
return payload
def ensure_upload_dir():
os.makedirs(UPLOAD_DIRECTORY, exist_ok=True)
def _constant_time_eq(a: str, b: str) -> bool:
try:
return hmac.compare_digest(a, b)
except Exception:
return False
def require_results_auth(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Accept an existing Flask session login
if session.get('user'):
return func(*args, **kwargs)
# Otherwise, require a valid Auth0 JWT (Bearer token)
authz = request.headers.get('Authorization', '')
if authz.startswith('Bearer '):
token = authz.split(' ', 1)[1]
try:
_verify_auth0_jwt(token)
return func(*args, **kwargs)
except Exception:
pass
resp = Response('Unauthorized', 401)
resp.headers['WWW-Authenticate'] = 'Bearer realm="Results"'
return resp
return wrapper
# --------- Auth (server-side session with Auth0) ---------
def _abs_url(path: str) -> str:
# Respect proxy headers for scheme/host
root = request.url_root.rstrip('/')
if not path.startswith('/'):
path = '/' + path
return root + path
@app.get('/api/auth/login')
def auth_login():
if not (AUTH0_DOMAIN and AUTH0_CLIENT_ID and AUTH0_CLIENT_SECRET):
return jsonify({"status":"error","error":"auth0 not configured"}), 500
# Generate state to mitigate CSRF
state = base64.urlsafe_b64encode(os.urandom(24)).decode('ascii')
session['oauth_state'] = state
# Optional nonce for ID token
nonce = base64.urlsafe_b64encode(os.urandom(24)).decode('ascii')
session['oauth_nonce'] = nonce
# Build OIDC scopes; include API permission if audience is configured
scope = 'openid profile email'
if AUTH0_AUDIENCE:
scope += ' read:results'
params = {
'response_type': 'code',
'client_id': AUTH0_CLIENT_ID,
'redirect_uri': _abs_url('/api/auth/callback'),
'scope': scope,
'state': state,
'nonce': nonce,
'prompt': 'login'
}
if AUTH0_AUDIENCE:
params['audience'] = AUTH0_AUDIENCE
q = '&'.join(f"{k}={requests.utils.quote(v)}" for k, v in params.items() if v)
return redirect(f"https://{AUTH0_DOMAIN}/authorize?{q}")
@app.get('/api/auth/callback')
def auth_callback():
if not (AUTH0_DOMAIN and AUTH0_CLIENT_ID and AUTH0_CLIENT_SECRET and requests):
return jsonify({"status":"error","error":"auth0 not configured"}), 500
code = request.args.get('code')
state = request.args.get('state')
if not code or not state or state != session.get('oauth_state'):
return jsonify({"status":"error","error":"invalid state or code"}), 400
# Exchange code for tokens
token_url = f"https://{AUTH0_DOMAIN}/oauth/token"
data = {
'grant_type': 'authorization_code',
'client_id': AUTH0_CLIENT_ID,
'client_secret': AUTH0_CLIENT_SECRET,
'code': code,
'redirect_uri': _abs_url('/api/auth/callback'),
}
try:
resp = requests.post(token_url, json=data, timeout=10)
resp.raise_for_status()
tok = resp.json()
except Exception as e:
return jsonify({"status":"error","error":"token exchange failed"}), 400
id_token = tok.get('id_token')
if not id_token:
return jsonify({"status":"error","error":"missing id_token"}), 400
try:
claims = _verify_auth0_id_token(id_token)
except Exception:
# If verification fails, do not log in
return jsonify({"status":"error","error":"invalid id_token"}), 400
# Store minimal user session
session.pop('oauth_state', None)
session.pop('oauth_nonce', None)
session['user'] = {
'sub': claims.get('sub'),
'email': claims.get('email'),
'name': claims.get('name') or claims.get('nickname'),
}
# Optional: store access_token if present (not sent to client)
if 'access_token' in tok:
session['access_token'] = tok['access_token']
# Redirect back to results
return redirect('/results')
@app.get('/api/auth/logout')
def auth_logout():
session.clear()
# Optional: Log out from Auth0 as well
if AUTH0_DOMAIN and AUTH0_CLIENT_ID:
return_to = _abs_url('/results')
logout_url = (
f"https://{AUTH0_DOMAIN}/v2/logout?client_id={AUTH0_CLIENT_ID}"
f"&returnTo={requests.utils.quote(return_to)}"
)
return redirect(logout_url)
return redirect('/results')
@app.get('/api/auth/me')
def auth_me():
user = session.get('user')
if not user:
return jsonify({"authenticated": False}), 401
permissions = []
token = session.get('access_token')
if token:
try:
payload = _verify_auth0_jwt(token)
permissions = payload.get('permissions', [])
except Exception:
# Token might be expired or invalid; just return empty perms
pass
return jsonify({"authenticated": True, "user": user, "permissions": permissions})
def _has_scope(payload: dict, required_scope: str) -> bool:
try:
# Require RBAC permissions in access token
perms = payload.get('permissions')
if isinstance(perms, list):
return required_scope in perms
# If permissions claim missing, treat as not authorized
return False
except Exception:
return False
def require_results_scope(required_scope: str):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Check Bearer token if provided
authz = request.headers.get('Authorization', '')
if authz.startswith('Bearer '):
token = authz.split(' ', 1)[1]
try:
payload = _verify_auth0_jwt(token)
app.logger.info('authz check: bearer token; perms=%s', payload.get('permissions'))
if _has_scope(payload, required_scope):
app.logger.info('authz allow: bearer with scope %s', required_scope)
return func(*args, **kwargs)
app.logger.info('authz deny: bearer missing permissions or scope %s', required_scope)
return jsonify({"status":"error","error":"forbidden","missing_scope": required_scope}), 403
except Exception:
app.logger.info('authz deny: bearer invalid')
pass
# Check session access token
token = session.get('access_token')
if token:
try:
payload = _verify_auth0_jwt(token)
app.logger.info('authz check: session token; perms=%s', payload.get('permissions'))
if _has_scope(payload, required_scope):
app.logger.info('authz allow: session with scope %s', required_scope)
return func(*args, **kwargs)
app.logger.info('authz deny: session missing permissions or scope %s', required_scope)
return jsonify({"status":"error","error":"forbidden","missing_scope": required_scope}), 403
except Exception:
app.logger.info('authz deny: session token invalid')
return jsonify({"status":"error","error":"unauthorized"}), 401
app.logger.info('authz deny: no auth presented')
return jsonify({"status":"error","error":"unauthorized"}), 401
return wrapper
return decorator
def require_admin_permission(required_permission: str):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
token = session.get('access_token')
if not token:
return jsonify({"status":"error","error":"unauthorized"}), 401
try:
payload = _verify_auth0_jwt(token)
app.logger.info('admin check perms=%s need=%s', payload.get('permissions'), required_permission)
if _has_scope(payload, required_permission):
return func(*args, **kwargs)
return jsonify({"status":"error","error":"forbidden","missing_scope": required_permission}), 403
except Exception:
return jsonify({"status":"error","error":"unauthorized"}), 401
return wrapper
return decorator
# --------- Admin APIs (via Auth0 Management API) ---------
@app.get('/api/admin/search_user')
@require_admin_permission('read:users')
def admin_search_user():
email = request.args.get('email', '').strip()
if not email:
return jsonify({"status":"error","error":"invalid email"}), 400
try:
token = _get_mgmt_token()
mgmt_domain = AUTH0_MGMT_DOMAIN or AUTH0_DOMAIN
headers = {'Authorization': f'Bearer {token}'}
# Always use v3 search to keep required scope to read:users only
url = f"https://{mgmt_domain}/api/v2/users"
# If the input contains '@', prefer an exact email match; otherwise wildcard partial
q = f"email:\"{email}\"" if '@' in email else f"email:*{email}*"
r = requests.get(url, params={'q': q, 'search_engine': 'v3', 'fields': 'user_id,email,name,nickname,identities', 'include_fields': 'true'}, headers=headers, timeout=10)
r.raise_for_status()
users = r.json() or []
out = [{
'user_id': u.get('user_id'),
'email': u.get('email'),
'name': u.get('name') or u.get('nickname'),
'connection': (u.get('identities') or [{}])[0].get('connection')
} for u in users]
return jsonify({"status":"ok","users": out})
except requests.HTTPError as he:
try:
txt = he.response.text
except Exception:
txt = ''
app.logger.error('admin search_user failed %s %s', getattr(he.response, 'status_code', '?'), txt)
code = getattr(he.response, 'status_code', 500) or 500
return jsonify({"status":"error","error":"search failed","upstream_status": code}), 502
except Exception:
app.logger.exception('admin search_user failed')
return jsonify({"status":"error","error":"search failed"}), 500
@app.post('/api/admin/grant_read_results')
@require_admin_permission('write:users')
def admin_grant_read_results():
try:
body = request.get_json(silent=True) or {}
user_id = (body.get('user_id') or '').strip()
email = (body.get('email') or '').strip()
token = _get_mgmt_token()
# Resolve user_id from email if not provided
mgmt_domain = AUTH0_MGMT_DOMAIN or AUTH0_DOMAIN
if not user_id:
if not email or '@' not in email:
return jsonify({"status":"error","error":"invalid email or user_id"}), 400
url = f"https://{mgmt_domain}/api/v2/users-by-email"
r = requests.get(url, params={'email': email}, headers={'Authorization': f'Bearer {token}'}, timeout=10)
r.raise_for_status()
users = r.json() or []
if not users:
return jsonify({"status":"error","error":"user not found"}), 404
user_id = users[0].get('user_id')
# Assign permission directly
perm = {
'permission_name': 'read:results',
'resource_server_identifier': AUTH0_AUDIENCE or ''
}
if not perm['resource_server_identifier']:
return jsonify({"status":"error","error":"AUTH0_AUDIENCE not set"}), 500
purl = f"https://{mgmt_domain}/api/v2/users/{requests.utils.quote(user_id, safe='')}/permissions"
pr = requests.post(purl, json={'permissions': [perm]}, headers={'Authorization': f'Bearer {token}'}, timeout=10)
if pr.status_code not in (200, 201, 204):
app.logger.error('grant failed: %s %s', pr.status_code, pr.text)
return jsonify({"status":"error","error":"grant failed"}), 500
return jsonify({"status":"ok","granted": True, "user_id": user_id})
except Exception:
app.logger.exception('admin grant_read_results failed')
return jsonify({"status":"error","error":"internal error"}), 500
@app.get('/api/admin/users_with_permission')
@require_admin_permission('read:users')
def admin_users_with_permission():
permission = request.args.get('permission', 'read:results')
audience = request.args.get('audience') or (AUTH0_AUDIENCE or '')
role_id = request.args.get('role_id') or (AUTH0_READ_RESULTS_ROLE_ID if permission == 'read:results' else None)
per_page = int(request.args.get('per_page', 50))
page = int(request.args.get('page', 0))
try:
token = _get_mgmt_token()
headers = {'Authorization': f'Bearer {token}'}
mgmt_domain = AUTH0_MGMT_DOMAIN or AUTH0_DOMAIN
users = []
if role_id:
# If a role is provided, list users by role
url = f"https://{mgmt_domain}/api/v2/roles/{requests.utils.quote(role_id, safe='')}/users"
r = requests.get(url, params={'per_page': per_page, 'page': page}, headers=headers, timeout=15)
r.raise_for_status()
users = r.json() or []
else:
# Scan a page of users and filter by permission
url = f"https://{mgmt_domain}/api/v2/users"
r = requests.get(url, params={'per_page': per_page, 'page': page, 'fields': 'user_id,email,name,nickname,identities', 'include_fields': 'true'}, headers=headers, timeout=15)
r.raise_for_status()
candidates = r.json() or []
for u in candidates:
uid = u.get('user_id')
if not uid:
continue
purl = f"https://{mgmt_domain}/api/v2/users/{requests.utils.quote(uid, safe='')}/permissions"
pr = requests.get(purl, headers=headers, timeout=10)
if pr.status_code != 200:
continue
perms = pr.json() or []
has = any((p.get('permission_name') == permission and (not audience or p.get('resource_server_identifier') == audience)) for p in perms)
if has:
users.append(u)
out = [{
'user_id': u.get('user_id'),
'email': u.get('email'),
'name': u.get('name') or u.get('nickname'),
'connection': (u.get('identities') or [{}])[0].get('connection')
} for u in users]
return jsonify({"status":"ok","users": out, "page": page, "per_page": per_page})
except requests.HTTPError as he:
try:
txt = he.response.text
except Exception:
txt = ''
app.logger.error('admin users_with_permission failed %s %s', getattr(he.response, 'status_code', '?'), txt)
code = getattr(he.response, 'status_code', 500) or 500
return jsonify({"status":"error","error":"list failed","upstream_status": code}), 502
except Exception:
app.logger.exception('admin users_with_permission failed')
return jsonify({"status":"error","error":"list failed"}), 500
# ---- POST /data : save one submission ----
@app.post('/data')
def receive_data():
try:
payload = request.get_json(silent=True)
if not isinstance(payload, dict):
return jsonify({"status": "error", "error": "invalid JSON"}), 400
uuid = payload.get('UUID') or 'nouuid'
exp = payload.get('experiment') or 'exp'
ts = datetime.utcnow().strftime('%Y%m%d-%H%M%S') # timestamp in name
fname = f"{exp}_{uuid}_{ts}.json"
final_path = os.path.join(UPLOAD_DIRECTORY, fname)
ensure_upload_dir()
# atomic write to avoid partial files
tmp_path = final_path + '.tmp'
with open(tmp_path, 'w') as f:
json.dump(payload, f, indent=2)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, final_path)
# fsync the dir so the file appears immediately in listings
dir_fd = os.open(UPLOAD_DIRECTORY, os.O_DIRECTORY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
app.logger.info(f"saved {final_path}")
return jsonify({"status": "ok", "saved": final_path}), 200
except PermissionError:
app.logger.exception("permission")
return jsonify({"status":"error","error":"permission denied"}), 500
except Exception as e:
app.logger.exception("save error")
return jsonify({"status":"error","error":str(e)}), 500
# ---- GET /api/uploads : list files (JSON or simple HTML) ----
def _list_files(pattern=None, ext=None, sort='date', order='desc',
limit=200, offset=0, min_size=0, max_size=None,
since=None, until=None):
ensure_upload_dir()
files=[]
since_ts = datetime.fromisoformat(since).timestamp() if since else None
until_ts = datetime.fromisoformat(until).timestamp() if until else None
with os.scandir(UPLOAD_DIRECTORY) as it:
for e in it:
if not e.is_file():
continue
name = e.name
if pattern and not fnmatch.fnmatch(name, pattern):
continue
if ext and not name.lower().endswith(ext.lower()):
continue
st = e.stat()
if st.st_size < int(min_size):
continue
if max_size is not None and st.st_size > int(max_size):
continue
if since_ts and st.st_mtime < since_ts:
continue
if until_ts and st.st_mtime > until_ts:
continue
files.append({
"name": name,
"size": st.st_size,
"mtime": datetime.fromtimestamp(st.st_mtime).isoformat(timespec="seconds"),
"url": f"/uploads/{name}",
})
key = {"date":"mtime","name":"name","size":"size"}.get(sort, "mtime")
reverse = (order == 'desc')
files.sort(key=lambda x: x[key], reverse=reverse)
return files[int(offset): int(offset)+int(limit)]
@app.get('/api/uploads')
def api_uploads():
q = request.args
files = _list_files(
pattern=q.get('pattern'),
ext=q.get('ext'),
sort=q.get('sort','date'),
order=q.get('order','desc'),
limit=q.get('limit',200),
offset=q.get('offset',0),
min_size=q.get('min_size',0),
max_size=q.get('max_size'),
since=q.get('since'),
until=q.get('until'),
)
if q.get('format') == 'html':
rows = "\n".join(
f'<tr><td><a href="{f["url"]}">{f["name"]}</a></td>'
f'<td style="text-align:right">{f["size"]}</td>'
f'<td>{f["mtime"]}</td></tr>' for f in files
)
html = f"""<!doctype html><meta charset="utf-8"><title>Uploads</title>
<style>body{{font-family:system-ui,Arial}} table{{border-collapse:collapse}} td,th{{padding:6px 10px;border-bottom:1px solid #ddd}}</style>
<h1>Uploads</h1>
<table><thead><tr><th>Name</th><th>Size</th><th>Modified</th></tr></thead><tbody>{rows}</tbody></table>"""
return Response(html, mimetype="text/html")
return jsonify({"count": len(files), "files": files})
# ---- RESULTS SPA & API ----
@app.get('/results')
@require_results_auth
def results_page():
# Serve the static SPA
return send_from_directory(os.path.join(app.root_path, 'static', 'results'), 'index.html')
@app.get('/results/<path:filename>')
@require_results_auth
def results_assets(filename):
base_dir = os.path.join(app.root_path, 'static', 'results')
return send_from_directory(base_dir, filename)
@app.get('/api/results/list')
@require_results_scope('read:results')
def results_list():
q = request.args
files = _list_files(
pattern=q.get('pattern'),
ext=q.get('ext', '.json'),
sort=q.get('sort', 'date'),
order=q.get('order', 'desc'),
limit=q.get('limit', 200),
offset=q.get('offset', 0),
min_size=q.get('min_size', 0),
max_size=q.get('max_size'),
since=q.get('since'),
until=q.get('until'),
)
return jsonify({"count": len(files), "files": files})
def _safe_join_uploads(name: str) -> str:
# prevent path traversal; only allow plain filenames within UPLOAD_DIRECTORY
if not name or '/' in name or '\\' in name or '..' in name:
raise ValueError('invalid name')
path = os.path.realpath(os.path.join(UPLOAD_DIRECTORY, name))
base = os.path.realpath(UPLOAD_DIRECTORY)
if not path.startswith(base + os.sep):
raise ValueError('invalid path')
return path
@app.get('/api/results/file/<path:name>')
@require_results_auth
def results_file(name):
try:
if not name.lower().endswith('.json'):
return jsonify({"status": "error", "error": "not a JSON file"}), 400
file_path = _safe_join_uploads(name)
with open(file_path, 'rb') as f:
data = f.read()
return Response(data, mimetype='application/json')
except FileNotFoundError:
return jsonify({"status": "error", "error": "not found"}), 404
except ValueError:
return jsonify({"status": "error", "error": "invalid name"}), 400
except Exception as e:
app.logger.exception('file preview error')
return jsonify({"status": "error", "error": str(e)}), 500
@app.get('/api/results/zip')
@require_results_scope('read:results')
def results_zip():
q = request.args
max_files = int(q.get('max_files', 500))
files = _list_files(
pattern=q.get('pattern'),
ext=q.get('ext', '.json'),
sort=q.get('sort', 'date'),
order=q.get('order', 'desc'),
limit=q.get('limit', 200),
offset=q.get('offset', 0),
min_size=q.get('min_size', 0),
max_size=q.get('max_size'),
since=q.get('since'),
until=q.get('until'),
)
files = files[:max_files]
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
for fmeta in files:
name = fmeta.get('name')
try:
file_path = _safe_join_uploads(name)
zf.write(file_path, arcname=name)
except Exception:
# skip problematic files but continue
app.logger.exception(f"zip add failed for {name}")
continue
buf.seek(0)
ts = datetime.utcnow().strftime('%Y%m%d-%H%M%S')
resp = Response(buf.read(), mimetype='application/zip')
resp.headers['Content-Disposition'] = f'attachment; filename="results-{ts}.zip"'
return resp
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)