Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### v0.7.x
- fix: allow dynamic header composition in v2 http client ([#493](https://github.com/RWTH-EBC/FiLiP/pull/493))

### v0.7.4
- fix: various bug fixes regarding query string and its regex ([#469](https://github.com/RWTH-EBC/FiLiP/pull/469))
- fix: fixed outdated module overview diagram ([#467](https://github.com/RWTH-EBC/FiLiP/pull/467))
Expand Down
214 changes: 78 additions & 136 deletions examples/ngsi_v2/e13_ngsi_v2_secure_fiware_headers.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
"""
# This example shows how to generate an access token from an authnetication srever in order to access Fiware services
# which are protected behind an authentication/authorisation layer.
Example e13: Obtain FIWARE tokens via python-keycloak and refresh them per request.
The FiwareHeaderSecureKeycloak class uses a KeycloakTokenManager to fetch and cache tokens, refreshing them before they
expire so every outgoing request receives a fresh Authorization header.
"""

import os

import time
from typing import Optional
import requests
import urllib3

from keycloak import KeycloakOpenID
from pydantic import ConfigDict, computed_field, PrivateAttr
from filip.clients.ngsi_v2 import ContextBrokerClient
from filip.config import settings
from filip.models.base import FiwareHeaderSecure
from filip.models.base import FiwareHeader
Comment thread
djs0109 marked this conversation as resolved.

urllib3.disable_warnings()
session = requests.Session()
Expand All @@ -19,151 +22,90 @@
SERVICE = "filip"
# FIWARE-Servicepath
SERVICE_PATH = "/"
# Provide client credentials which are used when generating access token from authentication server
CLIENT_ID = "client_id"
CLIENT_SECRET = "client_secret"
# TODO: Please adapt it according to your authentication server which is generating access token for the service
KEYCLOAK_HOST = "https://keycloak.example.com"


class KeycloakPython:
def __init__(self, keycloak_host=None, client_id=None, client_secret=None):
"""
- Initialze the Keycloak Host , Client ID and Client secret.
- If no parameters are passed .env file is used
- Priority : function parameters > Class Instatiation > .env file
"""
# if keycloak_host == None:
# self.keycloak_host = os.getenv('KEYCLOAK_HOST')
# else:
# self.keycloak_host = keycloak_host
self.keycloak_host = (
os.getenv("KEYCLOAK_HOST") if keycloak_host == None else keycloak_host
)
self.client_id = os.getenv("CLIENT_ID") if client_id == None else client_id
self.client_secret = (
os.getenv("CLIENT_SECRET") if client_secret == None else client_secret
)
# TODO Provide client credentials from environment (fall back to placeholders for demo purposes)
KEYCLOAK_HOST = os.getenv("KEYCLOAK_HOST", "https://keycloak.example.com")
KEYCLOAK_REALM = os.getenv("KEYCLOAK_REALM", "example-realm")
CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "demo-client")
CLIENT_SECRET = os.getenv("KEYCLOAK_CLIENT_SECRET", "demo-secret")

def get_access_token(self, keycloak_host=None, client_id=None, client_secret=None):
"""
- Get access token for a given client id and client secret.
"""
self.keycloak_host = (
keycloak_host if keycloak_host != None else self.keycloak_host
)
self.client_id = client_id if client_id != None else self.client_id
self.client_secret = (
client_secret if client_secret != None else self.client_secret
)
self.data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": "email",
"grant_type": "client_credentials",
}
try:
headers = {"content-type": "application/x-www-form-urlencoded"}
access_data = requests.post(
self.keycloak_host, data=self.data, headers=headers
)
expires_in = access_data.json()["expires_in"]
access_token = access_data.json()["access_token"]
return access_token, expires_in
except requests.exceptions.RequestException as err:
raise KeycloakPythonException(err.args[0])

def get_data(
self,
client_host,
headers={},
keycloak_host=None,
client_id=None,
client_secret=None,
):
"""
- Get data for a given api.
- Mandatory input - Target api, fiware-service and fiware-servicepath headers
- Optional Inmput - Keycloak host, Client ID, Client Secret
"""
access_token, expires_in = self.get_access_token(
keycloak_host=keycloak_host,
client_id=client_id,
client_secret=client_secret,
)
headers["Authorization"] = "Bearer %s" % (access_token)
response = requests.get(client_host, headers=headers)
return response.text

def post_data(
self,
client_host,
data,
headers={},
keycloak_host=None,
client_id=None,
client_secret=None,
):
"""
- Post data for a given api.
- Mandatory input - Target api, headers, request body.
- Optional Inmput - Keycloak host, Client ID, Client Secret.
"""
access_token, expires_in = self.get_access_token(
keycloak_host=keycloak_host,
client_id=client_id,
client_secret=client_secret,
)
headers["Content-Type"] = "application/json"
headers["Authorization"] = "Bearer %s" % (access_token)
response = requests.post(client_host, data=data, headers=headers)
return response
class KeycloakTokenManager:
"""Caches and refreshes client-credential tokens coming from python-keycloak."""

def patch_data(
def __init__(
self,
client_host,
json,
headers={},
keycloak_host=None,
client_id=None,
client_secret=None,
keycloak_host: str,
realm_name: str,
client_id: str,
client_secret: str,
refresh_margin: int = 10,
):
"""
- Patch data for a given api.
- Mandatory input - Target api, headers, request body.
- Optional Inmput - Keycloak host, Client ID, Client Secret.
"""
access_token, expires_in = self.get_access_token(
keycloak_host=keycloak_host,
server_url = keycloak_host.rstrip("/") + "/"
self._client = KeycloakOpenID(
server_url=server_url,
realm_name=realm_name,
client_id=client_id,
client_secret=client_secret,
client_secret_key=client_secret,
)
headers["Content-Type"] = "application/json"
headers["Authorization"] = "Bearer %s" % (access_token)
response = requests.patch(url=client_host, json=json, headers=headers)
return response
self._token: Optional[str] = None
self._expires_at: float = 0.0
self._refresh_margin = refresh_margin

def get_token(self) -> str:
now = time.time()
if not self._token or now >= (self._expires_at - self._refresh_margin):
token_data = self._client.token(grant_type="client_credentials")
self._token = token_data["access_token"]
self._expires_at = now + token_data.get("expires_in", 0)
return self._token


class FiwareHeaderSecureKeycloak(FiwareHeader):
"""Fiware header that fetches Keycloak authorization on demand."""

model_config = ConfigDict(arbitrary_types_allowed=True)
_token_manager: KeycloakTokenManager = PrivateAttr(default=None)

def __init__(self, *, token_manager: KeycloakTokenManager, **data):
super().__init__(**data)
self._token_manager = token_manager

@computed_field(alias="authorization", return_type=str)
def authorization(self) -> str:
return f"Bearer {self._token_manager.get_token()}"

class KeycloakPythonException(Exception):
def __init__(self, message):
self.message = message
super().__init__(self.message)

def demonstrate_dynamic_requests(
client: ContextBrokerClient, header: FiwareHeader
) -> None:
"""Print successive Authorization headers to show automatic refreshes."""

for attempt in range(2):
headers = header.model_dump(by_alias=True)
print(f"Attempt {attempt + 1} Authorization: {headers['authorization']}")
try:
client.get_entity_list(limit=0)
except Exception as exc:
print(f"Call #{attempt + 1} failed in demo environment: {exc}")
time.sleep(2)


if __name__ == "__main__":
# get token from keycloak
token, in_sec = KeycloakPython(
KEYCLOAK_HOST, CLIENT_ID, CLIENT_SECRET
).get_access_token()

# create secure fiware header with authorisation token
fiware_header = FiwareHeaderSecure(
service=SERVICE, service_path=SERVICE_PATH, authorization="Bearer %s" % token
token_manager = KeycloakTokenManager(
keycloak_host=KEYCLOAK_HOST,
realm_name=KEYCLOAK_REALM,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
)

fiware_header = FiwareHeaderSecureKeycloak(
service=SERVICE,
service_path=SERVICE_PATH,
token_manager=token_manager,
)

# create a context broker client
cb_client = ContextBrokerClient(
url=CB_URL, fiware_header=fiware_header, session=session
)
# you don't need to set any extra parameter for requesting the service besides setting session in the client object
entity_list = cb_client.get_entity_list()

demonstrate_dynamic_requests(client=cb_client, header=fiware_header)
Loading
Loading