Skip to content
54 changes: 54 additions & 0 deletions docs/api-tokens.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# API Tokens

## Overview

Opaque, randomly generated tokens that authenticate API requests via the `X-API-KEY` header. Each user can have multiple active tokens. The plaintext is returned only once at creation.

Tokens are stored as HMAC-SHA256 hashes in a dedicated `api_token` MongoDB collection. Revoked tokens are kept for audit (soft-delete via `revoked_at`).

## Design decisions

### HMAC-SHA256 instead of plain SHA-256

A plain hash would let an attacker who obtains a database dump verify token candidates offline. HMAC-SHA256 with a server-side secret (`API_TOKEN_SECRET`) means the DB alone is not enough — the attacker also needs the secret.

A slow hash (bcrypt/argon2) is unnecessary here: tokens have 48 bytes of entropy from `secrets.token_urlsafe`, making brute-force infeasible regardless of hash speed.

### Separate `API_TOKEN_SECRET` instead of reusing `SECRET_KEY`

`SECRET_KEY` is used by Flask for session signing and by Flask-Security for password reset tokens. If a token hash secret needs to be rotated (e.g. suspected leak), rotating `SECRET_KEY` would invalidate all sessions and pending password resets. A dedicated secret allows independent rotation.

### `API_TOKEN_PREFIX` and display prefix

A configurable prefix (default: `udata_`) is prepended to every generated token. This serves two purposes: secret scanning tools (GitHub, GitLeaks, etc.) can detect leaked tokens via pattern matching, and different prefixes per environment (e.g. `udata_prod_`, `udata_demo_`) let users identify which environment a token belongs to.

Since the plaintext is never stored, we also save a `token_prefix`: the global prefix followed by the first 8 characters of the random part (e.g. `udata_abc123xy`). This lets users match the beginning of their token as they see it with what's displayed in the UI.

### Scope defaults to `admin`

Tokens currently grant full access matching the user's permissions. The scope is `admin` to reflect this reality. When restricted scopes are added (phase 2), new values like `normal` will be introduced with the corresponding permission checks.

### Revocation as soft-delete

Deleting a token marks it as revoked (`revoked_at` timestamp) rather than removing the record from the database. This keeps an audit trail (who created what, when it was revoked). No cleanup job for now (see Future work).

## Configuration

| Setting | Default | Description |
|---------|---------|-------------|
| `API_TOKEN_PREFIX` | `udata_` | Prefix for secret scanning tool detection. |
| `API_TOKEN_SECRET` | *(empty — must be set)* | HMAC key for token hashing. The app refuses to start without it. |

## Migration

The migration `2026-01-28-migrate-apikeys-to-api-tokens.py` hashes each existing `User.apikey` with HMAC-SHA256 into the new collection, then removes the `apikey` field from all user documents.

The migration is idempotent (checks for existing hashes before inserting). Existing API keys continue to work after migration because the same plaintext produces the same HMAC hash.

## Future work

- **Restricted scopes (phase 2)**: add `normal` scope with permission enforcement, so that admin-only endpoints require `scope="admin"` + `is_admin`
- **Refresh tokens**: add `refresh` kind for short-lived JWT access tokens in the SPA
- **Bulk revocation**: `DELETE /api/1/me/tokens/` (all my tokens), admin endpoint for revoking a user's tokens
- **Default expiration**: configurable duration applied when no explicit `expires_at` is provided
- **Token cleanup**: background job to purge revoked and/or expired tokens after a retention period (similar to GitLab's daily cleanup)
15 changes: 10 additions & 5 deletions udata/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,25 @@ def authentify(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
from udata.api.oauth2 import check_credentials
from udata.core.user.models import User

if current_user.is_authenticated:
return func(*args, **kwargs)

apikey = request.headers.get(HEADER_API_KEY)
if apikey:
try:
user = User.objects.get(apikey=apikey)
except User.DoesNotExist:
from udata.core.user.api_tokens import ApiToken

api_token, error = ApiToken.authenticate(apikey)
if api_token is None:
if error == "revoked":
self.abort(401, "Revoked API Key")
elif error == "expired":
self.abort(401, "Expired API Key")
self.abort(401, "Invalid API Key")

if not login_user(user, False):
if not login_user(api_token.user, False):
self.abort(401, "Inactive user")
api_token.update_usage(request.headers.get("User-Agent"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to update usage in authenticate directly?

else:
check_credentials()
return func(*args, **kwargs)
Expand Down
4 changes: 4 additions & 0 deletions udata/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ def send_mail(


def init_app(app):
from udata.errors import ConfigError
from udata.models import datastore

if not app.config.get("API_TOKEN_SECRET"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed it may be better to raise instead of leaving it by default. However, we should do the same for a list of settings for coherence?

for config in ["SECRET_KEY", "API_TOKEN_SECRET"]:
...

And I would add a API_TOKEN_SECRET in the different udata.cfg samples in docs.

raise ConfigError("API_TOKEN_SECRET must be set to a unique, random value.")

from .forms import (
ExtendedForgotPasswordForm,
ExtendedLoginForm,
Expand Down
70 changes: 50 additions & 20 deletions udata/core/user/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
from udata.models import CommunityResource, Dataset, Reuse, User

from .api_fields import (
apikey_fields,
me_fields,
me_metrics_fields,
user_fields,
user_page_fields,
user_role_fields,
user_suggestion_fields,
)
from .api_tokens import ApiToken, apitoken_created_fields
from .forms import UserProfileAdminForm, UserProfileForm

DEFAULT_SORTING = "-created_at"
Expand Down Expand Up @@ -62,15 +61,15 @@ class UserApiParser(ModelApiParser):
class MeAPI(API):
@api.secure
@api.doc("get_me")
@api.marshal_with(me_fields)
@api.marshal_with(user_fields)
def get(self):
"""Fetch the current user (me) identity"""
return current_user._get_current_object()

@api.secure
@api.doc("update_me")
@api.expect(me_fields)
@api.marshal_with(me_fields)
@api.expect(user_fields)
@api.marshal_with(user_fields)
@api.response(400, "Validation error")
def put(self, **kwargs):
"""Update my profile"""
Expand Down Expand Up @@ -197,25 +196,56 @@ def get(self):
return list(discussions)


@me.route("/apikey/", endpoint="my_apikey")
class ApiKeyAPI(API):
token_parser = api.parser()
token_parser.add_argument("name", type=str, help="A label for this token", location="json")
token_parser.add_argument(
"expires_at", type=str, help="Expiration date (ISO 8601)", location="json"
)


@me.route("/api_tokens/", endpoint="my_api_tokens")
class ApiTokenListAPI(API):
@api.secure
@api.doc("generate_apikey")
@api.marshal_with(apikey_fields)
@api.response(201, "API Key generated")
@api.doc("list_api_tokens")
@api.marshal_list_with(ApiToken.__read_fields__)
def get(self):
"""List all my active API tokens"""
return list(ApiToken.objects(user=current_user.id, revoked_at=None))

@api.secure
@api.doc("create_api_token")
@api.expect(token_parser)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use ApiToken.__write_fields__?

@api.marshal_with(apitoken_created_fields, code=201)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering, could we use ApiToken.__read_fields__ with a mask?

def post(self):
"""(Re)Generate my API Key"""
current_user.generate_api_key()
current_user.save()
return current_user, 201
"""Create a new API token. The plaintext token is returned only once."""
args = token_parser.parse_args()
user = current_user._get_current_object()
expires_at = None
if args.get("expires_at"):
expires_at = datetime.fromisoformat(args["expires_at"])
token, plaintext = ApiToken.generate(user, name=args.get("name"), expires_at=expires_at)
token._plaintext = plaintext
return token, 201


@me.route("/api_tokens/<string:id>/", endpoint="my_api_token")
class ApiTokenAPI(API):
@api.secure
@api.doc("clear_apikey")
@api.response(204, "API Key deleted/cleared")
def delete(self):
"""Clear/destroy an apikey"""
current_user.apikey = None
current_user.save()
@api.doc("revoke_api_token")
@api.response(204, "Token revoked")
@api.response(404, "Token not found")
@api.response(410, "Token already revoked")
def delete(self, id):
"""Revoke an API token"""
token = ApiToken.objects(
id=id,
user=current_user.id,
).first()
if not token:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we could differentiate between 404 not found and 410 token already revoked?

api.abort(404, "Token not found")
if token.revoked_at is not None:
api.abort(410, "Token already revoked")
token.revoke()
return "", 204


Expand Down
15 changes: 0 additions & 15 deletions udata/core/user/api_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,6 @@
},
)

me_fields = api.inherit(
"Me",
user_fields,
{
"apikey": fields.String(description="The user API Key", readonly=True),
},
)

me_metrics_fields = api.model(
"MyMetrics",
{
Expand All @@ -108,13 +100,6 @@
},
)

apikey_fields = api.model(
"ApiKey",
{
"apikey": fields.String(description="The user API Key", readonly=True),
},
)

user_page_fields = api.model("UserPage", fields.pager(user_fields))

user_suggestion_fields = api.model(
Expand Down
141 changes: 141 additions & 0 deletions udata/core/user/api_tokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import hashlib
import hmac
import secrets
from datetime import datetime, timezone

from flask import current_app

from udata.api import api, fields
from udata.api_fields import field, generate_fields
from udata.models import db

TOKEN_BYTE_LENGTH = 48
PREFIX_DISPLAY_LENGTH = 8
MAX_USER_AGENTS = 20


def _hash_token(plaintext):
key = current_app.config["API_TOKEN_SECRET"].encode()
return hmac.new(key, plaintext.encode(), hashlib.sha256).hexdigest()


@generate_fields()
class ApiToken(db.Document):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather have this model in user/models.py or a dedicated apikey/models.py?

token_hash = db.StringField(required=True, unique=True)
token_prefix = field(
db.StringField(required=True),
readonly=True,
description="First characters of the token for identification",
)
user = db.ReferenceField("User", required=True, reverse_delete_rule=db.CASCADE)
name = field(
db.StringField(max_length=255),
description="User-given label for this token",
)
scope = field(
db.StringField(choices=["admin"], default="admin"),
readonly=True,
description="Token scope",
)
kind = field(
db.StringField(choices=["api_key"], default="api_key"),
readonly=True,
description="Token type",
)
created_at = field(
db.DateTimeField(default=lambda: datetime.now(timezone.utc), required=True),
readonly=True,
description="Token creation date",
)
last_used_at = field(
db.DateTimeField(),
readonly=True,
description="Last time this token was used",
)
user_agents = field(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool to track this!
I was wondering if our target would be to track apikey usage through activities? In order to have the information (timestamp, activity, useragent)?

db.ListField(db.StringField()),
readonly=True,
description="User agents that have used this token",
)
revoked_at = field(
db.DateTimeField(),
readonly=True,
description="When this token was revoked",
)
expires_at = field(
db.DateTimeField(),
description="When this token expires",
)

meta = {
"collection": "api_token",
"indexes": [
"token_hash",
"user",
("user", "-created_at"),
],
"ordering": ["-created_at"],
}

@classmethod
def generate(cls, user, name=None, expires_at=None):
"""Create a new token. Returns (ApiToken, plaintext_token)."""
prefix = current_app.config.get("API_TOKEN_PREFIX", "udata_")
raw = secrets.token_urlsafe(TOKEN_BYTE_LENGTH)
plaintext = f"{prefix}{raw}"
token_hash = _hash_token(plaintext)
display_prefix = plaintext[: len(prefix) + PREFIX_DISPLAY_LENGTH]
token = cls(
token_hash=token_hash,
token_prefix=display_prefix,
user=user,
name=name,
expires_at=expires_at,
)
token.save()
return token, plaintext

@classmethod
def authenticate(cls, plaintext_token):
"""Lookup a token by hashing the plaintext.

Returns (ApiToken, None) on success, or (None, error_reason) on failure.
error_reason is one of: "invalid", "revoked", "expired".
"""
token_hash = _hash_token(plaintext_token)
token = cls.objects(token_hash=token_hash).first()
if token is None:
return None, "invalid"
if token.revoked_at is not None:
return None, "revoked"
if token.expires_at:
expires_at = token.expires_at
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=timezone.utc)
if expires_at < datetime.now(timezone.utc):
return None, "expired"
return token, None

def revoke(self):
self.revoked_at = datetime.now(timezone.utc)
self.save()

def update_usage(self, user_agent=None):
update_kwargs = {"set__last_used_at": datetime.now(timezone.utc)}
agents = self.user_agents or []
if user_agent and user_agent not in agents and len(agents) < MAX_USER_AGENTS:
update_kwargs["push__user_agents"] = user_agent
type(self).objects(id=self.id).update_one(**update_kwargs)


apitoken_created_fields = api.inherit(
"ApiTokenCreated",
ApiToken.__read_fields__,
{
"token": fields.String(
attribute="_plaintext",
readonly=True,
description="The plaintext token (shown only once at creation)",
),
},
)
Loading