🔧 High-performance Python rate limiting library with multiple algorithms (Fixed Window, Sliding Window, Token Bucket, Leaky Bucket & GCRA) and storage backends (Redis, In-Memory).
- Supports both synchronous and asynchronous (`async / await`) usage.
- Provides thread-safe storage backends: Redis (Standalone/Sentinel/Cluster) and In-Memory (with support for key expiration and eviction).
- Supports multiple rate limiting algorithms: Fixed Window, Sliding Window, Token Bucket, Leaky Bucket & Generic Cell Rate Algorithm (GCRA).
- Supports configuration of rate limiting algorithms and provides flexible quota configuration.
- Supports immediate response and wait-retry modes, and provides function call, decorator, and context manager modes.
- Supports integration with the MCP Python SDK to provide rate limiting support for model dialog processes.
- Excellent performance: the execution time of a single rate limiting API call is equivalent to (see Benchmarks for details):
  - In-Memory: ~2.5-4.5x `dict[key] += 1` operations.
  - Redis: ~1.06-1.37x `INCRBY key increment` operations.
```shell
$ pip install throttled-py
```

Starting from v2.0.0, only core dependencies are installed by default.

To enable additional features, install optional dependencies as follows (multiple extras can be comma-separated):

```shell
$ pip install "throttled-py[redis]"

$ pip install "throttled-py[redis,in-memory]"
```

| Extra | Description |
|---|---|
| `all` | Install all extras. |
| `in-memory` | Use In-Memory as storage backend. |
| `redis` | Use Redis as storage backend. |
- `limit`: Deduct requests and return `RateLimitResult`.
- `peek`: Check the current rate limit state for a key (returns `RateLimitState`).
```python
from throttled import RateLimiterType, Throttled, rate_limiter, utils

throttle = Throttled(
    # 📈 Use Token Bucket algorithm
    using=RateLimiterType.TOKEN_BUCKET.value,
    # 🪣 Set quota: 1,000 tokens per second (limit), bucket size 1,000 (burst)
    quota=rate_limiter.per_sec(1_000, burst=1_000),
    # 📁 By default, global MemoryStore is used as the storage backend.
)


def call_api() -> bool:
    # 💧 Deduct 1 token for key="/ping"
    result = throttle.limit("/ping", cost=1)
    return result.limited


if __name__ == "__main__":
    # 💻 Python 3.12.10, Linux 5.4.119-1-tlinux4-0009.1, Arch: x86_64, Specs: 2C4G.
    # ✅ Total: 100000, 🕒 Latency: 0.0068 ms/op, 🚀 Throughput: 122513 req/s (--)
    # ❌ Denied: 98000 requests
    benchmark: utils.Benchmark = utils.Benchmark()
    denied_num: int = sum(benchmark.serial(call_api, 100_000))
    print(f"❌ Denied: {denied_num} requests")
```

The core API is the same for synchronous and asynchronous code. Just replace `from throttled import ...` with `from throttled.asyncio import ...` in your code.
For example, the example above can be rewritten as asynchronous code:
```python
import asyncio
from throttled.asyncio import RateLimiterType, Throttled, rate_limiter, utils

throttle = Throttled(
    using=RateLimiterType.TOKEN_BUCKET.value,
    quota=rate_limiter.per_sec(1_000, burst=1_000),
)


async def call_api() -> bool:
    result = await throttle.limit("/ping", cost=1)
    return result.limited


async def main():
    benchmark: utils.Benchmark = utils.Benchmark()
    denied_num: int = sum(await benchmark.async_serial(call_api, 100_000))
    print(f"❌ Denied: {denied_num} requests")


if __name__ == "__main__":
    asyncio.run(main())
```

```python
from throttled import Throttled

# Default: In-Memory storage, Token Bucket algorithm, 60 reqs / min.
throttle = Throttled()

# Deduct 1 request -> RateLimitResult(limited=False,
# state=RateLimitState(limit=60, remaining=59, reset_after=1, retry_after=0))
print(throttle.limit("key", 1))

# Check state -> RateLimitState(limit=60, remaining=59, reset_after=1, retry_after=0)
print(throttle.peek("key"))

# Deduct 60 requests (limited) -> RateLimitResult(limited=True,
# state=RateLimitState(limit=60, remaining=59, reset_after=1, retry_after=60))
print(throttle.limit("key", 60))
```

```python
from throttled import Throttled, rate_limiter, exceptions


@Throttled(key="/ping", quota=rate_limiter.per_min(1))
def ping() -> str:
    return "ping"


ping()
try:
    ping()  # Raises LimitedError
except exceptions.LimitedError as exc:
    print(exc)  # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60
```

You can use the context manager to limit a code block. When access is allowed, it returns a `RateLimitResult`.
If the limit is exceeded or the retry timeout elapses, it raises `LimitedError`.
```python
from throttled import Throttled, exceptions, rate_limiter


def call_api():
    print("doing something...")


throttle: Throttled = Throttled(key="/api/v1/users/", quota=rate_limiter.per_min(1))

with throttle as rate_limit_result:
    print(f"limited: {rate_limit_result.limited}")
    call_api()

try:
    with throttle:
        call_api()
except exceptions.LimitedError as exc:
    print(exc)  # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60
```

By default, rate limiting returns `RateLimitResult` immediately.
You can specify a `timeout` to enable wait-and-retry behavior. The rate limiter then waits according to the `retry_after` value in `RateLimitState` and retries automatically.
It returns the final `RateLimitResult` once the request is allowed or the timeout is reached.
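The wait-and-retry behavior described above can be pictured with a small standard-library sketch. This is an illustration under stated assumptions, not throttled-py's implementation: `limit_with_timeout` is a hypothetical helper, the `(limited, retry_after)` tuple is a simplified stand-in for `RateLimitResult`, and giving up when the next wait would overrun the deadline is an assumed policy.

```python
import time
from typing import Callable, Tuple

# A check returns (limited, retry_after_seconds). This shape is an assumption
# made for the sketch; it is not the library's return type.
Check = Callable[[], Tuple[bool, float]]


def limit_with_timeout(
    check: Check,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Retry `check` until it passes or `timeout` seconds would be exceeded.

    Returns the final `limited` flag (False means the request was allowed).
    """
    deadline = clock() + timeout
    while True:
        limited, retry_after = check()
        if not limited:
            return False
        # Give up if waiting for retry_after would overrun the deadline.
        if clock() + retry_after > deadline:
            return True
        sleep(retry_after)
```

The clock and sleep functions are injected so the loop can be exercised deterministically in tests; in normal use the defaults apply.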
```python
from throttled import RateLimiterType, Throttled, rate_limiter, utils

throttle = Throttled(
    using=RateLimiterType.GCRA.value,
    quota=rate_limiter.per_sec(100, burst=100),
    # ⏳ Set timeout=1 to enable wait-and-retry (max wait 1 second)
    timeout=1,
)


def call_api() -> bool:
    # ⬆️⏳ Function-level timeout overrides global timeout
    result = throttle.limit("/ping", cost=1, timeout=1)
    return result.limited


if __name__ == "__main__":
    # 👇 The actual QPS is close to the preset quota (100 req/s):
    # ✅ Total: 1000, 🕒 Latency: 35.8103 ms/op, 🚀 Throughput: 111 req/s (--)
    # ❌ Denied: 8 requests
    benchmark: utils.Benchmark = utils.Benchmark()
    denied_num: int = sum(benchmark.concurrent(call_api, 1_000, workers=4))
    print(f"❌ Denied: {denied_num} requests")
```

Only minimal configuration is needed to connect to Redis in standalone, sentinel, or cluster mode.
The following example uses Redis as the storage backend. `options` supports all Redis configuration items; see RedisStore Options.
```python
from throttled import RateLimiterType, Throttled, rate_limiter, store


@Throttled(
    key="/api/products",
    using=RateLimiterType.TOKEN_BUCKET.value,
    quota=rate_limiter.per_min(1),
    store=store.RedisStore(
        # Standalone mode
        server="redis://127.0.0.1:6379/0",
        # Sentinel mode
        # server="redis+sentinel://:yourpassword@host1:26379,host2:26379/mymaster"
        # Cluster mode
        # server="redis+cluster://:yourpassword@host1:6379,host2:6379",
        options={}
    ),
)
def products() -> list:
    return [{"name": "iPhone"}, {"name": "MacBook"}]


products()  # Success
products()  # Raises LimitedError
```

By default, a global MemoryStore instance with a maximum capacity of 1024 is used as the storage backend when no storage backend is specified. Therefore, it is usually not necessary to manually create a MemoryStore instance.
Different instances mean different storage spaces. If you want to throttle the same key at different locations in your program, make sure that each `Throttled` receives the same `MemoryStore` and uses a consistent quota.
The following example uses memory as the storage backend and throttles the same Key on ping and pong:
```python
from throttled import Throttled, rate_limiter, store

mem_store = store.MemoryStore()


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
def ping() -> str:
    return "ping"


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
def pong() -> str:
    return "pong"


ping()  # Success
pong()  # Raises LimitedError
```

The rate limiting algorithm is specified by the `using` parameter. The supported algorithms are as follows:
- Fixed window: `RateLimiterType.FIXED_WINDOW.value`
- Sliding window: `RateLimiterType.SLIDING_WINDOW.value`
- Token Bucket: `RateLimiterType.TOKEN_BUCKET.value`
- Leaky Bucket: `RateLimiterType.LEAKING_BUCKET.value`
- Generic Cell Rate Algorithm, GCRA: `RateLimiterType.GCRA.value`
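To make the first algorithm in this list concrete, here is a minimal standard-library sketch of a fixed-window counter. It is only an illustration and not throttled-py's implementation: `FixedWindowSketch` and its `allow` method are hypothetical names, and the clock is injected so the behavior is deterministic.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowSketch:
    """Illustrative fixed-window counter (hypothetical, not the library's code)."""

    def __init__(
        self,
        limit: int,
        period: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = limit
        self.period = period
        self.clock = clock
        # key -> (window index, requests counted in that window)
        self._windows: Dict[str, Tuple[int, int]] = {}

    def allow(self, key: str, cost: int = 1) -> bool:
        """Return True if `cost` requests still fit into the current window."""
        window = int(self.clock() // self.period)
        stored_window, used = self._windows.get(key, (window, 0))
        if stored_window != window:
            used = 0  # a new window started, so the counter resets
        if used + cost > self.limit:
            return False
        self._windows[key] = (window, used + cost)
        return True
```

In this sketch a request whose cost exceeds the per-window limit is always denied, and the counter resets abruptly at each window boundary, which is the trade-off the sliding window and bucket algorithms are meant to smooth out.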
```python
from throttled import RateLimiterType, Throttled, rate_limiter

throttle = Throttled(
    # 🌟 Specify the rate limiting algorithm
    using=RateLimiterType.FIXED_WINDOW.value,
    quota=rate_limiter.per_min(1)
)
assert throttle.limit("key", 2).limited is True
```

```python
from throttled import rate_limiter

rate_limiter.per_sec(60)   # 60 req/sec
rate_limiter.per_min(60)   # 60 req/min
rate_limiter.per_hour(60)  # 60 req/hour
rate_limiter.per_day(60)   # 60 req/day
rate_limiter.per_week(60)  # 60 req/week
```

The `burst` parameter adjusts how much burst traffic the throttling object can absorb. It is valid for the following algorithms:
- `TOKEN_BUCKET`
- `LEAKING_BUCKET`
- `GCRA`
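The relationship between the base rate and `burst` can be sketched with a small token bucket. This is a simplified illustration rather than the library's code: `TokenBucketSketch` is a hypothetical name, it tracks a single key, and it assumes the bucket starts full with a capacity equal to `burst`.

```python
import time
from typing import Callable


class TokenBucketSketch:
    """Illustrative single-key token bucket (hypothetical, not the library's code)."""

    def __init__(
        self,
        limit: int,
        period: float,
        burst: int,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.fill_rate = limit / period  # tokens added per second
        self.capacity = burst            # bucket size, i.e. the largest burst
        self.clock = clock
        self.tokens = float(burst)       # assumption: the bucket starts full
        self.updated_at = clock()

    def allow(self, cost: int = 1) -> bool:
        now = self.clock()
        # Refill for the elapsed time, never above the bucket capacity.
        refill = (now - self.updated_at) * self.fill_rate
        self.tokens = min(self.capacity, self.tokens + refill)
        self.updated_at = now
        if cost > self.tokens:
            return False
        self.tokens -= cost
        return True
```

With `limit=60` per 60 seconds and `burst=120`, the sketch admits 120 back-to-back requests and then settles at one request per second as tokens are refilled.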
```python
from throttled import rate_limiter

# Allow 120 burst requests.
# When burst is not specified, it defaults to the limit passed in.
rate_limiter.per_min(60, burst=120)
```

```python
from datetime import timedelta
from throttled import rate_limiter

# A total of 120 requests are allowed in two minutes, and a burst of 150 requests is allowed.
rate_limiter.per_duration(timedelta(minutes=2), limit=120, burst=150)
```

RateLimitResult represents the result after executing the RateLimiter for the given key.
| Field | Type | Description |
|---|---|---|
| `limited` | bool | Whether this request was rate limited (`True` means the request was denied). |
| `state` | RateLimitState | The state of the RateLimiter for the given key after this request. |
RateLimitState represents the current state of the rate limiter for the given key.
| Field | Type | Description |
|---|---|---|
| `limit` | int | The maximum number of requests allowed to pass in the initial state. |
| `remaining` | int | The maximum number of requests allowed to pass for the given key in the current state. |
| `reset_after` | float | The time in seconds for the RateLimiter to return to its initial state. In the initial state, `limit` equals `remaining`. |
| `retry_after` | float | The time in seconds after which the request can be retried; 0 if the request is allowed. |
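As a rough illustration of how these four fields can be derived, the following standard-library sketch implements a textbook GCRA limiter and reports its state after every call. It is not throttled-py's implementation: `StateSketch` and `GCRASketch` are hypothetical names, the current time is passed in explicitly, and reporting `burst` as the `limit` field is an assumption based on the field description above.

```python
import math
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class StateSketch:
    """Hypothetical stand-in mirroring the four RateLimitState fields."""

    limit: int
    remaining: int
    reset_after: float
    retry_after: float


class GCRASketch:
    """Illustrative GCRA limiter (not throttled-py's implementation)."""

    def __init__(self, limit: int, period: float, burst: int) -> None:
        self.burst = burst
        self.interval = period / limit          # emission interval per request
        self.tolerance = self.interval * burst  # how far ahead of schedule a key may run
        self._tat: Dict[str, float] = {}        # key -> theoretical arrival time (TAT)

    def check(self, key: str, now: float, cost: int = 1) -> Tuple[bool, StateSketch]:
        """Return (limited, state) for `cost` requests arriving at time `now`."""
        tat = max(self._tat.get(key, now), now)
        new_tat = tat + cost * self.interval
        allow_at = new_tat - self.tolerance
        if now < allow_at:
            # Denied: the stored TAT is left untouched.
            remaining = math.floor((self.tolerance - (tat - now)) / self.interval)
            return True, StateSketch(self.burst, remaining, tat - now, allow_at - now)
        self._tat[key] = new_tat
        remaining = math.floor((now - allow_at) / self.interval)
        return False, StateSketch(self.burst, remaining, new_tat - now, 0.0)
```

Here `retry_after` is the time until the request would fit, and `reset_after` is the time until the stored arrival time catches up with the clock, at which point `remaining` is back to `limit`.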
Quota represents the quota limit configuration.
| Field | Type | Description |
|---|---|---|
| `burst` | int | Optional burst capacity that allows exceeding the rate limit momentarily (supported by Token Bucket, Leaky Bucket, and GCRA). |
| `rate` | Rate | The base rate limit configuration. |
Rate represents the rate limit configuration.
| Field | Type | Description |
|---|---|---|
| `period` | datetime.timedelta | The time period for which the rate limit applies. |
| `limit` | int | The maximum number of requests allowed within the specified period. |
| Param | Description | Default |
|---|---|---|
| `server` | Redis connection URL; it can be used to connect to Redis in any deployment mode. | "redis://localhost:6379/0" |
| `options` | Storage-specific configuration. | {} |
RedisStore is built on the Redis API provided by redis-py.
For Redis connection configuration, it largely follows the configuration naming of django-redis to reduce the learning cost.
| Parameter | Description | Default |
|---|---|---|
| `SOCKET_TIMEOUT` | ConnectionPool parameter. | null |
| `SOCKET_CONNECT_TIMEOUT` | ConnectionPool parameter. | null |
| `CONNECTION_POOL_KWARGS` | ConnectionPool construction parameters. | {} |
| `REDIS_CLIENT_KWARGS` | RedisClient construction parameters. | {} |
| `SENTINEL_KWARGS` | Sentinel construction parameters. | {} |
| `CONNECTION_FACTORY_CLASS` | ConnectionFactory is used to create and maintain ConnectionPool. | Automatically selected via the server scheme by default. Standalone: "throttled.store.ConnectionFactory"; Sentinel: "throttled.store.SentinelConnectionFactory"; Cluster: "throttled.store.ClusterConnectionFactory" |
| `REDIS_CLIENT_CLASS` | RedisClient import path. | Automatically selected by sync/async mode by default. Sync (Standalone/Sentinel): "redis.client.Redis"; Async (Standalone/Sentinel): "redis.asyncio.client.Redis"; Sync (Cluster): "redis.cluster.RedisCluster"; Async (Cluster): "redis.asyncio.cluster.RedisCluster" |
| `CONNECTION_POOL_CLASS` | ConnectionPool import path. | Automatically selected via the server scheme and sync/async mode by default. Sync (Standalone): "redis.connection.ConnectionPool"; Async (Standalone): "redis.asyncio.connection.ConnectionPool"; Sync (Sentinel): "redis.sentinel.SentinelConnectionPool"; Async (Sentinel): "redis.asyncio.sentinel.SentinelConnectionPool"; Cluster: "Disabled" |
| `SENTINEL_CLASS` | Sentinel import path. | Automatically selected by sync/async mode by default. Sync: "redis.Sentinel"; Async: "redis.asyncio.Sentinel" |
MemoryStore is essentially an in-memory LRU cache with key expiration.
| Parameter | Description | Default |
|---|---|---|
| `MAX_SIZE` | Maximum capacity. When the number of stored key-value pairs exceeds `MAX_SIZE`, entries are evicted according to the LRU policy. | 1024 |
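The idea of an LRU cache with expiration can be sketched in a few lines with `collections.OrderedDict`. This is an illustration only, not the library's MemoryStore: `ExpiringLRUSketch` is a hypothetical class, it expires entries lazily on read, and it omits the locking a thread-safe store would need.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


class ExpiringLRUSketch:
    """Illustrative LRU cache with per-key expiration (not thread-safe)."""

    def __init__(
        self,
        max_size: int = 1024,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_size = max_size
        self.clock = clock
        # key -> (value, expiry timestamp); the order tracks recency of use
        self._data: OrderedDict = OrderedDict()

    def set(self, key: str, value: Any, ttl: float) -> None:
        self._data[key] = (value, self.clock() + ttl)
        self._data.move_to_end(key)         # mark as most recently used
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used key

    def get(self, key: str) -> Optional[Any]:
        item = self._data.get(key)
        if item is None:
            return None
        value, expires_at = item
        if self.clock() >= expires_at:
            del self._data[key]             # lazily drop the expired entry
            return None
        self._data.move_to_end(key)         # a read also counts as a use
        return value
```

Evicting a key from a rate limiting store effectively resets that key's counters, which is why the capacity should comfortably exceed the number of keys being throttled at once.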
All exceptions inherit from throttled.exceptions.BaseThrottledError.
`LimitedError` is raised when a request is throttled, for example: `Rate limit exceeded: remaining=0, reset_after=60, retry_after=60.`
| Field | Type | Description |
|---|---|---|
| `rate_limit_result` | RateLimitResult | The result after executing the RateLimiter for the given key. |
An exception is also raised when a parameter is invalid, for example: `Invalid key: None, must be a non-empty key.`
- Python Version: 3.13.1 (CPython implementation)
- Operating System: macOS Darwin 23.6.0 (ARM64 architecture)
- Redis Version: 7.x (local connection)
Throughput in req/s, Latency in ms/op.
| Algorithm Type | In-Memory (Single-thread) | In-Memory (16 threads) | Redis (Single-thread) | Redis (16 threads) |
|---|---|---|---|---|
| Baseline [1] | 1,692,307 / 0.0002 | 135,018 / 0.0004 [2] | 17,324 / 0.0571 | 16,803 / 0.9478 |
| Fixed Window | 369,635 / 0.0023 | 57,275 / 0.2533 | 16,233 / 0.0610 | 15,835 / 1.0070 |
| Sliding Window | 265,215 / 0.0034 | 49,721 / 0.2996 | 12,605 / 0.0786 | 13,371 / 1.1923 |
| Token Bucket | 365,678 / 0.0023 | 54,597 / 0.2821 | 13,643 / 0.0727 | 13,219 / 1.2057 |
| Leaky Bucket | 364,296 / 0.0023 | 54,136 / 0.2887 | 13,628 / 0.0727 | 12,579 / 1.2667 |
| GCRA | 373,906 / 0.0023 | 53,994 / 0.2895 | 12,901 / 0.0769 | 12,861 / 1.2391 |
- [1] Baseline: In-Memory - `dict[key] += 1`, Redis - `INCRBY key increment`.
- [2] In-Memory concurrent baseline uses `threading.RLock` for thread safety.
- [3] Performance: In-Memory - ~2.5-4.5x `dict[key] += 1` operations, Redis - ~1.06-1.37x `INCRBY key increment` operations.
- [4] Benchmark code: tests/benchmarks/test_throttled.py.
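For readers who want a feel for the in-memory baseline in note [1], the following standard-library sketch times `dict[key] += 1` with and without a `threading.RLock`. It is not the project's benchmark code (that lives in the file named in note [4]); `measure`, `plain_incr`, and `locked_incr` are hypothetical helpers, the run is single-threaded, and absolute numbers will vary by machine.

```python
import threading
import time
from collections import defaultdict
from typing import Callable, Tuple


def measure(op: Callable[[], None], n: int = 100_000) -> Tuple[float, float]:
    """Run `op` n times serially; return (throughput in req/s, latency in ms/op)."""
    start = time.perf_counter()
    for _ in range(n):
        op()
    elapsed = time.perf_counter() - start
    return n / elapsed, elapsed / n * 1000


counts = defaultdict(int)
lock = threading.RLock()


def plain_incr() -> None:
    counts["key"] += 1


def locked_incr() -> None:
    # Same increment, guarded by a re-entrant lock as in note [2].
    with lock:
        counts["key"] += 1


if __name__ == "__main__":
    for name, op in (("dict", plain_incr), ("dict + RLock", locked_incr)):
        throughput, latency = measure(op)
        print(f"{name}: {throughput:,.0f} req/s, {latency:.4f} ms/op")
```

Dividing a limiter's measured latency by the baseline latency from the same machine gives a ratio comparable to the multipliers quoted in note [3].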