-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmeta_mcp.py
More file actions
133 lines (111 loc) · 4.33 KB
/
meta_mcp.py
File metadata and controls
133 lines (111 loc) · 4.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import jwt
import httpx
from jwt import PyJWKClient
from fastapi import APIRouter, Request
from fastapi.responses import Response, StreamingResponse
# Router collecting the routes declared in this module.
router = APIRouter()

# Upstream MCP endpoint; overridable via the META_MCP_UPSTREAM env var.
META_MCP_UPSTREAM = os.getenv("META_MCP_UPSTREAM", "http://127.0.0.1:8088/mcp")

# Auth0 tenant settings, read once at import time. Both are None when the
# corresponding environment variable is not set.
AUTH0_DOMAIN = os.getenv("AUTH0_DOMAIN")
AUTH0_META_AUDIENCE = os.getenv("AUTH0_META_AUDIENCE")  # set this

# URL advertised in the WWW-Authenticate challenge as resource_metadata.
PRM_META_URL = "https://mcp.backyardbrains.com/.well-known/oauth-protected-resource/meta"

# Lazily created PyJWKClient singleton (see _jwks_client).
_jwks = None
def _jwks_client():
    """Return the module-wide PyJWKClient, creating it on first use.

    The client is built against
    ``https://{AUTH0_DOMAIN}/.well-known/jwks.json`` and cached in the
    module-level ``_jwks`` so later calls reuse the same instance.

    Raises:
        jwt.exceptions.PyJWKClientError: if ``AUTH0_DOMAIN`` is not set.
    """
    global _jwks
    if _jwks is None:
        if not AUTH0_DOMAIN:
            # Without this guard the f-string below would render the host as
            # the literal "None" and that misconfigured client would be
            # cached for the lifetime of the process.
            raise jwt.exceptions.PyJWKClientError(
                "AUTH0_DOMAIN is not set; cannot locate the JWKS endpoint"
            )
        _jwks = PyJWKClient(f"https://{AUTH0_DOMAIN}/.well-known/jwks.json")
    return _jwks
def _challenge_401():
    """Build an empty 401 response carrying the Bearer challenge.

    The ``WWW-Authenticate`` header names the ``mcp`` realm and points the
    client at ``PRM_META_URL`` through the ``resource_metadata`` parameter.
    """
    challenge = f'Bearer realm="mcp", resource_metadata="{PRM_META_URL}"'
    response = Response(status_code=401)
    response.headers["WWW-Authenticate"] = challenge
    return response
def _validate(token: str) -> dict:
    """Decode *token* as an RS256 JWT and return its claims.

    The signing key is looked up through the shared JWKS client. The token
    must match the ``AUTH0_META_AUDIENCE`` audience and the
    ``https://{AUTH0_DOMAIN}/`` issuer. Errors raised by the key lookup or
    by ``jwt.decode`` propagate to the caller.
    """
    signing_key = _jwks_client().get_signing_key_from_jwt(token)
    expected_issuer = f"https://{AUTH0_DOMAIN}/"
    return jwt.decode(
        token,
        signing_key.key,
        algorithms=["RS256"],
        audience=AUTH0_META_AUDIENCE,
        issuer=expected_issuer,
    )
def _has_scope(claims: dict, scope: str) -> bool:
perms = claims.get("permissions") or []
scopes = (claims.get("scope") or "").split()
return scope in perms or scope in scopes
from auth import check_permissions, validate_opaque_token
from fastapi import HTTPException
from utils import logger
# MCP Streamable HTTP transport headers that must survive the proxy hop.
# Request side: relayed to upstream when the client sent them.
_MCP_REQUEST_HEADERS = ("mcp-session-id", "mcp-protocol-version", "last-event-id")
# Response side: relayed back to the client when upstream sent them.
_MCP_RESPONSE_HEADERS = ("mcp-session-id",)


def _bearer_token(request: Request):
    """Return the bearer token from the Authorization header, or None.

    None is returned both when the header is missing / not a Bearer scheme
    and when the token part is empty after stripping whitespace.
    """
    auth_header = request.headers.get("authorization", "")
    if not auth_header.lower().startswith("bearer "):
        return None
    return auth_header.split(" ", 1)[1].strip() or None


def _upstream_request_headers(request: Request) -> dict:
    """Build the header set sent to the upstream MCP endpoint."""
    headers = {
        "Content-Type": request.headers.get("content-type", "application/json"),
        # Forward client's Accept header so upstream knows what to return
        "Accept": request.headers.get("accept") or "application/json, text/event-stream",
    }
    for name in _MCP_REQUEST_HEADERS:
        value = request.headers.get(name)
        if value is not None:
            headers[name] = value
    return headers


def _relayed_response_headers(upstream_response) -> dict:
    """Pick the upstream response headers that are passed back to the client."""
    return {
        name: upstream_response.headers[name]
        for name in _MCP_RESPONSE_HEADERS
        if name in upstream_response.headers
    }


@router.api_route("/meta/", methods=["GET", "POST"])
async def meta_gateway_any(request: Request):
    """Authenticate a GET/POST to ``/meta/`` and proxy it to the upstream MCP.

    Steps:
      1. Require a non-empty ``Authorization: Bearer`` token, else answer
         with the 401 challenge.
      2. Validate the token with ``validate_opaque_token``; any exception is
         logged and answered with the 401 challenge.
      3. Check the claims with ``check_permissions`` against
         ``mcp:read:meta`` / ``mcp:write:meta``; a falsy result yields 403.
         (NOTE(review): whether one or both permissions are required is
         decided inside ``check_permissions`` - confirm there.)
      4. Send the request body to ``META_MCP_UPSTREAM`` and relay the answer:
         ``text/event-stream`` responses are streamed chunk by chunk, anything
         else is read fully and returned in one piece.

    Returns:
        A ``Response`` (401/403/502 or the buffered upstream reply) or a
        ``StreamingResponse`` for SSE.
    """
    # 1) Require bearer
    token = _bearer_token(request)
    if token is None:
        return _challenge_401()

    # 2) Validate Token
    try:
        claims = await validate_opaque_token(token)
    except Exception as e:
        logger.warning(f"Meta token validation failed: {e}")
        return _challenge_401()

    # 3) Enforce scope
    if not check_permissions(claims, ["mcp:read:meta", "mcp:write:meta"]):
        logger.warning(f"Insufficient permissions. Claims: {claims}")
        return Response(status_code=403, content="Insufficient permissions")

    # 4) Proxy to upstream
    body = await request.body()

    # Use a long timeout for SSE
    client = httpx.AsyncClient(timeout=300)
    try:
        # Building the request sits inside the try so the client is closed
        # even if request construction itself fails.
        upstream_req = client.build_request(
            request.method,
            META_MCP_UPSTREAM,
            content=body,
            headers=_upstream_request_headers(request),
        )
        # We must stream the request to inspect headers before deciding response type
        r = await client.send(upstream_req, stream=True)
    except Exception as e:
        await client.aclose()
        logger.error(f"Upstream connection failed: {e}")
        # The exception text stays in the log only; it can contain internal
        # addresses that should not be echoed to the caller.
        return Response(status_code=502, content="Upstream unavailable")

    upstream_content_type = r.headers.get("content-type", "")
    logger.info(f"Upstream response: {r.status_code} type={upstream_content_type}")
    relay_headers = _relayed_response_headers(r)

    if "text/event-stream" in upstream_content_type:
        # Pass through the stream for SSE
        async def iterate_stream():
            try:
                async for chunk in r.aiter_bytes():
                    yield chunk
            except Exception as e:
                logger.error(f"Stream error: {e}")
            finally:
                # Release the upstream response and client once the stream
                # ends, fails, or the generator is closed.
                await r.aclose()
                await client.aclose()

        return StreamingResponse(
            iterate_stream(),
            status_code=r.status_code,
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
                "Connection": "keep-alive",
                **relay_headers,
            },
        )

    # Standard response (JSON/text)
    # Read the whole body so we can release the client
    try:
        content = await r.aread()
    finally:
        await r.aclose()
        await client.aclose()

    logger.info(f"Upstream body len={len(content)}")
    return Response(
        content=content,
        status_code=r.status_code,
        media_type=upstream_content_type or "application/json",
        headers=relay_headers or None,
    )