FastAPI Advanced Patterns: Production-Ready Python APIs
FastAPI has become the dominant Python framework for building high-performance APIs. Its async-first design, automatic OpenAPI generation, and Pydantic-powered validation make it compelling for greenfield projects. But going from a tutorial-level app to a production system requires mastering advanced patterns around project structure, dependency injection, middleware, testing, and deployment. This guide covers the fifteen patterns you need to build FastAPI applications that scale.
Table of Contents
- 1. Project Structure for Large Apps
- 2. Advanced Dependency Injection
- 3. Custom Middleware
- 4. Background Tasks and Task Queues
- 5. WebSocket Connections
- 6. Authentication Patterns
- 7. Rate Limiting and Throttling
- 8. Database Patterns
- 9. Testing Strategies
- 10. Error Handling and Custom Exceptions
- 11. Pagination and Filtering
- 12. File Uploads and Streaming
- 13. Caching Strategies
- 14. Production Deployment
- 15. Performance Optimization
- Frequently Asked Questions
1. Project Structure for Large Apps
A flat main.py breaks down fast. Production FastAPI projects need clear separation between routing, business logic, data access, and configuration. Here is a proven layout:
project/
    app/
        __init__.py
        main.py              # create_app factory
        config.py            # Settings with pydantic-settings
        api/
            __init__.py
            deps.py          # Shared dependencies
            routes/
                __init__.py
                users.py
                items.py
                auth.py
        models/              # SQLAlchemy models
            __init__.py
            user.py
            item.py
        schemas/             # Pydantic request/response models
            __init__.py
            user.py
            item.py
        services/            # Business logic
            __init__.py
            user_service.py
            item_service.py
        db/
            __init__.py
            session.py       # Engine and session factory
    migrations/              # Alembic migrations
    tests/
    alembic.ini
    pyproject.toml
    Dockerfile
The factory function in main.py assembles everything:
from fastapi import FastAPI
from app.config import settings
from app.api.routes import users, items, auth
from app.db.session import engine
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: create connection pool, warm caches
async with engine.begin() as conn:
pass # Run migrations or health checks
yield
# Shutdown: close connections
await engine.dispose()
def create_app() -> FastAPI:
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.VERSION,
lifespan=lifespan,
docs_url="/docs" if settings.DEBUG else None,
)
app.include_router(auth.router, prefix="/api/v1/auth", tags=["auth"])
app.include_router(users.router, prefix="/api/v1/users", tags=["users"])
app.include_router(items.router, prefix="/api/v1/items", tags=["items"])
return app
app = create_app()
2. Advanced Dependency Injection
FastAPI's Depends() is a recursive dependency injection system resolved at request time. Dependencies can be sync or async functions, classes, or generators. They are resolved per request and cached by default, so a dependency shared by several others runs only once per request.
Generator Dependencies with Cleanup
from collections.abc import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import async_session_maker
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Class-Based Dependencies
class PaginationParams:
def __init__(self, skip: int = 0, limit: int = 20):
self.skip = max(0, skip)
self.limit = min(limit, 100)
# oauth2_scheme and verify_token come from the auth module (see Section 6)
class CurrentUser:
def __init__(self, required_role: str = "user"):
self.required_role = required_role
async def __call__(self, token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)):
user = await verify_token(token, db)
if self.required_role == "admin" and not user.is_admin:
raise HTTPException(403, "Admin access required")
return user
# Usage: parameterized dependency
@router.get("/admin/users")
async def admin_list(user=Depends(CurrentUser(required_role="admin"))):
...
Dependency Overrides for Testing
# In tests, override real dependencies with mocks
def get_test_db():
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = get_test_db
3. Custom Middleware
Middleware wraps every request/response cycle. FastAPI supports both ASGI middleware and its own @app.middleware("http") decorator. Use middleware for cross-cutting concerns like logging, timing, CORS, and request IDs.
import time, uuid
from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Request timing and ID middleware
class RequestContextMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = str(uuid.uuid4())[:8]
request.state.request_id = request_id
start = time.perf_counter()
response = await call_next(request)
elapsed = time.perf_counter() - start
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = f"{elapsed:.4f}"
logger.info(
"request",
method=request.method,
path=request.url.path,
status=response.status_code,
duration=round(elapsed, 4),
request_id=request_id,
)
return response
app.add_middleware(RequestContextMiddleware)
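BaseHTTPMiddleware is convenient but routes every response through a wrapper. The other style mentioned above, raw ASGI middleware, operates on scope/receive/send directly and adds less overhead. A minimal sketch (the security header is just an illustrative choice):

```python
class SecurityHeadersMiddleware:
    """Raw ASGI middleware: wraps send() instead of buffering the response."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass websocket/lifespan traffic through untouched
            await self.app(scope, receive, send)
            return

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                # ASGI headers are (name, value) byte pairs
                message.setdefault("headers", []).append(
                    (b"x-content-type-options", b"nosniff")
                )
            await send(message)

        await self.app(scope, receive, send_wrapper)
```

Register it exactly like the class above: app.add_middleware(SecurityHeadersMiddleware).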
4. Background Tasks and Task Queues
FastAPI's built-in BackgroundTasks runs work after the response is sent, perfect for fire-and-forget operations. For heavier work, offload to Celery or ARQ.
Built-in Background Tasks
from fastapi import BackgroundTasks
async def send_welcome_email(email: str, name: str):
# Simulate email sending
await email_client.send(to=email, subject=f"Welcome {name}!")
@router.post("/users", status_code=201)
async def create_user(
data: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db),
):
user = await user_service.create(db, data)
background_tasks.add_task(send_welcome_email, user.email, user.name)
return user
Celery for Heavy Workloads
# tasks.py
from celery import Celery
celery_app = Celery("worker", broker="redis://redis:6379/0")
@celery_app.task(bind=True, max_retries=3)
def process_report(self, report_id: int):
try:
generate_pdf(report_id)
upload_to_s3(report_id)
    except Exception as exc:
        raise self.retry(countdown=60, exc=exc)
# In your route handler
@router.post("/reports")
async def create_report(data: ReportCreate):
report = await save_report(data)
process_report.delay(report.id)
return {"id": report.id, "status": "processing"}
ARQ for Async Task Queues
from arq import create_pool
from arq.connections import RedisSettings
async def process_image(ctx, image_id: int):
"""ARQ worker function - fully async."""
image = await fetch_image(image_id)
resized = await resize(image, width=800)
await save_image(resized)
# Enqueue from route handler
@router.post("/images")
async def upload_image(file: UploadFile, redis=Depends(get_redis)):
image_id = await save_upload(file)
    pool = await create_pool(RedisSettings())  # in production, create the pool once at startup and reuse it
await pool.enqueue_job("process_image", image_id)
return {"id": image_id}
5. WebSocket Connections
FastAPI has first-class WebSocket support for real-time features like chat, notifications, and live dashboards.
from fastapi import WebSocket, WebSocketDisconnect
class ConnectionManager:
def __init__(self):
self.active: dict[str, list[WebSocket]] = {}
async def connect(self, room: str, websocket: WebSocket):
await websocket.accept()
self.active.setdefault(room, []).append(websocket)
def disconnect(self, room: str, websocket: WebSocket):
self.active[room].remove(websocket)
if not self.active[room]:
del self.active[room]
async def broadcast(self, room: str, message: dict):
        # Iterate over a copy so concurrent disconnects can't mutate the list mid-loop
        for ws in list(self.active.get(room, [])):
await ws.send_json(message)
manager = ConnectionManager()
@router.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
await manager.connect(room, websocket)
try:
while True:
data = await websocket.receive_json()
data["room"] = room
await manager.broadcast(room, data)
except WebSocketDisconnect:
manager.disconnect(room, websocket)
await manager.broadcast(room, {"event": "user_left"})
6. Authentication Patterns
Production APIs need layered authentication. Here are the three most common patterns: JWT bearer tokens, OAuth2 password flow, and API key headers.
JWT Authentication
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
def create_access_token(data: dict, expires_minutes: int = 30) -> str:
    payload = data.copy()
    # datetime.utcnow() is deprecated; use an aware UTC timestamp
    payload["exp"] = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
    return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
):
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        sub = payload.get("sub")  # per the JWT spec, "sub" is a string claim
        if sub is None:
            raise HTTPException(401, "Invalid token")
        user_id = int(sub)
    except (JWTError, ValueError):
        raise HTTPException(401, "Invalid token")
user = await db.get(User, user_id)
if not user or not user.is_active:
raise HTTPException(401, "User not found or inactive")
return user
API Key Authentication
from sqlalchemy import select
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(
key: str = Depends(api_key_header),
db: AsyncSession = Depends(get_db),
):
    result = await db.execute(
        select(APIKey).where(APIKey.key == key, APIKey.is_active.is_(True))
    )
    api_key = result.scalar_one_or_none()
    if not api_key:
        raise HTTPException(403, "Invalid API key")
    return api_key
7. Rate Limiting and Throttling
Protect your API from abuse with per-client rate limiting. The simplest approach uses Redis as a fixed-window counter: increment one key per client and path, and let it expire when the window rolls over.
import redis.asyncio as redis
from fastapi import Request, HTTPException
redis_client = redis.from_url("redis://redis:6379/1")
async def rate_limit(
request: Request,
max_requests: int = 100,
window_seconds: int = 60,
):
key = f"rate:{request.client.host}:{request.url.path}"
current = await redis_client.incr(key)
if current == 1:
await redis_client.expire(key, window_seconds)
if current > max_requests:
raise HTTPException(
status_code=429,
detail="Rate limit exceeded. Try again later.",
headers={"Retry-After": str(window_seconds)},
)
# Apply as dependency
@router.get("/search", dependencies=[Depends(rate_limit)])
async def search(q: str):
return await perform_search(q)
For more granular control, create a parameterized rate limiter:
def rate_limiter(max_requests: int = 60, window: int = 60):
async def _limiter(request: Request):
await rate_limit(request, max_requests, window)
return _limiter
# 10 requests per minute for expensive endpoints
@router.post("/generate", dependencies=[Depends(rate_limiter(10, 60))])
async def generate_report(data: ReportRequest):
...
8. Database Patterns
Modern FastAPI apps use SQLAlchemy 2.0 with async sessions. The repository pattern keeps database queries out of route handlers.
Async SQLAlchemy Setup
from sqlalchemy.ext.asyncio import (
create_async_engine, async_sessionmaker, AsyncSession
)
from sqlalchemy.orm import DeclarativeBase
engine = create_async_engine(
settings.DATABASE_URL, # postgresql+asyncpg://...
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
Repository Pattern
from sqlalchemy import select, func
from typing import Generic, TypeVar, Type
ModelType = TypeVar("ModelType", bound=Base)
class BaseRepository(Generic[ModelType]):
def __init__(self, model: Type[ModelType], db: AsyncSession):
self.model = model
self.db = db
async def get(self, id: int) -> ModelType | None:
return await self.db.get(self.model, id)
async def list(self, skip: int = 0, limit: int = 20):
stmt = select(self.model).offset(skip).limit(limit)
result = await self.db.execute(stmt)
return result.scalars().all()
async def create(self, data: dict) -> ModelType:
obj = self.model(**data)
self.db.add(obj)
await self.db.flush()
return obj
async def count(self) -> int:
stmt = select(func.count()).select_from(self.model)
        result = await self.db.execute(stmt)
        return result.scalar_one()
class UserRepository(BaseRepository[User]):
async def get_by_email(self, email: str) -> User | None:
stmt = select(User).where(User.email == email)
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
9. Testing Strategies
FastAPI testing combines httpx.AsyncClient with pytest-asyncio for full async coverage (set asyncio_mode = "auto" in your pytest config so async fixtures and tests run without extra boilerplate). Override dependencies to isolate tests from external services.
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
TEST_DB_URL = "sqlite+aiosqlite:///./test.db"
@pytest.fixture
async def db_session():
engine = create_async_engine(TEST_DB_URL)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
session_maker = async_sessionmaker(engine, expire_on_commit=False)
async with session_maker() as session:
yield session
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
@pytest.fixture
async def client(db_session):
def override_db():
yield db_session
app.dependency_overrides[get_db] = override_db
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
app.dependency_overrides.clear()
@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
resp = await client.post("/api/v1/users", json={
"email": "test@example.com", "name": "Test User"
})
assert resp.status_code == 201
assert resp.json()["email"] == "test@example.com"
@pytest.mark.asyncio
async def test_get_user_not_found(client: AsyncClient):
resp = await client.get("/api/v1/users/999")
assert resp.status_code == 404
10. Error Handling and Custom Exceptions
Define a hierarchy of domain exceptions and register global handlers so every error returns consistent JSON.
from fastapi import Request
from fastapi.responses import JSONResponse
class AppException(Exception):
    def __init__(self, status_code: int, detail: str, error_code: str | None = None):
self.status_code = status_code
self.detail = detail
self.error_code = error_code or "UNKNOWN_ERROR"
class NotFoundError(AppException):
def __init__(self, resource: str, id: int):
super().__init__(404, f"{resource} #{id} not found", "NOT_FOUND")
class ConflictError(AppException):
def __init__(self, detail: str):
super().__init__(409, detail, "CONFLICT")
class ValidationError(AppException):
def __init__(self, detail: str):
super().__init__(422, detail, "VALIDATION_ERROR")
@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.error_code,
"detail": exc.detail,
"path": str(request.url.path),
},
)
# Usage in service layer
async def get_user(db, user_id: int) -> User:
user = await db.get(User, user_id)
if not user:
raise NotFoundError("User", user_id)
return user
11. Pagination and Filtering
Standardize pagination across all list endpoints with a reusable dependency and response schema.
from pydantic import BaseModel
from typing import Generic, TypeVar, Sequence
T = TypeVar("T")
class PageResponse(BaseModel, Generic[T]):
items: Sequence[T]
total: int
page: int
per_page: int
pages: int
class PaginationParams:
def __init__(self, page: int = 1, per_page: int = 20):
self.page = max(1, page)
self.per_page = min(per_page, 100)
self.skip = (self.page - 1) * self.per_page
async def paginate(db, stmt, pagination: PaginationParams, schema):
# Count total
count_stmt = select(func.count()).select_from(stmt.subquery())
    total = (await db.execute(count_stmt)).scalar_one()
# Fetch page
result = await db.execute(
stmt.offset(pagination.skip).limit(pagination.per_page)
)
items = [schema.model_validate(r) for r in result.scalars().all()]
return PageResponse(
items=items,
total=total,
page=pagination.page,
per_page=pagination.per_page,
pages=(total + pagination.per_page - 1) // pagination.per_page,
)
@router.get("/items", response_model=PageResponse[ItemOut])
async def list_items(
p: PaginationParams = Depends(),
db: AsyncSession = Depends(get_db),
category: str | None = None,
):
stmt = select(Item)
if category:
stmt = stmt.where(Item.category == category)
return await paginate(db, stmt, p, ItemOut)
12. File Uploads and Streaming
Handle file uploads with validation and stream large responses without loading everything into memory.
from fastapi import UploadFile, File, HTTPException
from fastapi.responses import StreamingResponse
import aiofiles
ALLOWED_TYPES = {"image/jpeg", "image/png", "image/webp"}
MAX_SIZE = 10 * 1024 * 1024 # 10MB
@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
if file.content_type not in ALLOWED_TYPES:
raise HTTPException(400, f"Type {file.content_type} not allowed")
contents = await file.read()
if len(contents) > MAX_SIZE:
raise HTTPException(400, "File too large (max 10MB)")
    # NB: sanitize file.filename in production — client-supplied names can contain path traversal
    path = f"/data/uploads/{file.filename}"
async with aiofiles.open(path, "wb") as f:
await f.write(contents)
return {"filename": file.filename, "size": len(contents)}
# Streaming large files
@router.get("/download/{filename}")
async def download_file(filename: str):
    path = f"/data/uploads/{filename}"  # validate the name to block "../" traversal
    async def stream():
        async with aiofiles.open(path, "rb") as f:
            while chunk := await f.read(8192):
                yield chunk
    return StreamingResponse(
        stream(),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
13. Caching Strategies
Cache expensive computations and database queries with Redis or in-memory caches to reduce latency.
Redis Caching
import json
import redis.asyncio as redis
from functools import wraps
redis_client = redis.from_url("redis://redis:6379/0")
def cache(ttl_seconds: int = 300):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
key = f"cache:{func.__name__}:{args}:{kwargs}"
cached = await redis_client.get(key)
if cached:
return json.loads(cached)
result = await func(*args, **kwargs)
await redis_client.setex(key, ttl_seconds, json.dumps(result))
return result
return wrapper
return decorator
@cache(ttl_seconds=600)
async def get_popular_items() -> list[dict]:
# Expensive aggregation query
return await fetch_popular_from_db()
In-Memory Caching with TTL
from cachetools import TTLCache
from asyncio import Lock
_cache = TTLCache(maxsize=1000, ttl=300)
_lock = Lock()
async def get_config(key: str) -> dict:
async with _lock:
if key in _cache:
return _cache[key]
value = await fetch_config_from_db(key)
async with _lock:
_cache[key] = value
return value
14. Production Deployment
A production FastAPI deployment combines Gunicorn with Uvicorn workers, Docker for packaging, and Kubernetes for orchestration.
Gunicorn Configuration
# gunicorn.conf.py
import multiprocessing
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 120
keepalive = 5
accesslog = "-"
errorlog = "-"
loglevel = "info"
Multi-Stage Dockerfile
# Stage 1: Build dependencies
FROM python:3.12-slim AS builder
WORKDIR /app
COPY pyproject.toml poetry.lock ./
# --no-dev was removed in newer Poetry; --only main is the replacement
RUN pip install poetry && \
    poetry config virtualenvs.create false && \
    poetry install --only main --no-interaction
# Stage 2: Production image
FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.12 /usr/local/lib/python3.12
COPY --from=builder /usr/local/bin /usr/local/bin
COPY app/ ./app/
COPY gunicorn.conf.py .
RUN useradd -m appuser && chown -R appuser /app
USER appuser
EXPOSE 8000
# curl is not included in slim images; use the Python stdlib instead
HEALTHCHECK CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
CMD ["gunicorn", "app.main:app", "-c", "gunicorn.conf.py"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastapi-app
spec:
replicas: 3
selector:
matchLabels: { app: fastapi }
template:
metadata:
labels: { app: fastapi }
spec:
containers:
- name: api
image: registry.example.com/fastapi-app:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef: { name: app-secrets, key: db-url }
livenessProbe:
httpGet: { path: /health, port: 8000 }
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet: { path: /health, port: 8000 }
initialDelaySeconds: 5
periodSeconds: 10
resources:
requests: { cpu: 250m, memory: 256Mi }
limits: { cpu: 1000m, memory: 512Mi }
15. Performance Optimization
FastAPI is already fast, but these patterns squeeze out more throughput and lower latency.
Connection Pooling and Keep-Alive
import httpx
# Reuse a single client across requests
http_client = httpx.AsyncClient(
timeout=30.0,
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)
# on_event is deprecated in newer FastAPI; prefer closing the client in the
# lifespan handler from Section 1
@app.on_event("shutdown")
async def shutdown():
    await http_client.aclose()
Response Model Optimization
from pydantic import BaseModel
class ItemOut(BaseModel):
model_config = {"from_attributes": True}
id: int
name: str
price: float
# Use response_model_exclude_unset for sparse responses
@router.get("/items/{id}", response_model=ItemOut)
async def get_item(id: int, db: AsyncSession = Depends(get_db)):
item = await db.get(Item, id)
if not item:
raise NotFoundError("Item", id)
return item
Health Check Endpoint
from sqlalchemy import text
from fastapi.responses import JSONResponse
@app.get("/health")
async def health_check():
checks = {}
try:
async with async_session_maker() as db:
await db.execute(text("SELECT 1"))
checks["database"] = "ok"
except Exception:
checks["database"] = "error"
try:
await redis_client.ping()
checks["redis"] = "ok"
except Exception:
checks["redis"] = "error"
healthy = all(v == "ok" for v in checks.values())
return JSONResponse(
status_code=200 if healthy else 503,
content={"status": "healthy" if healthy else "degraded", **checks},
)
Quick Reference: Advanced Patterns
| Pattern | Implementation |
|---|---|
| Project structure | Factory function + routers + service layer |
| Dependency injection | Depends() with classes, generators, sub-dependencies |
| Middleware | BaseHTTPMiddleware for logging, timing, request IDs |
| Background tasks | Built-in BackgroundTasks, Celery, or ARQ for heavy work |
| WebSockets | Connection manager pattern with room-based broadcast |
| Authentication | JWT + OAuth2PasswordBearer, API key header |
| Rate limiting | Redis fixed-window counter with parameterized dependency |
| Database | Async SQLAlchemy 2.0 + repository pattern |
| Testing | httpx.AsyncClient + dependency overrides + pytest-asyncio |
| Error handling | Custom exception hierarchy + global handler |
| Pagination | Reusable PaginationParams + generic PageResponse |
| Caching | Redis with TTL decorator, in-memory TTLCache |
| Deployment | Gunicorn + Uvicorn workers + Docker + Kubernetes |
Frequently Asked Questions
How do I structure a large FastAPI project?
Use a modular structure with an app/ directory containing core/, api/, models/, schemas/, services/, and db/ subdirectories. Each API domain gets its own router module under api/routes/. Use a factory function (create_app) to assemble the application, register routers with prefixes, and configure middleware. Keep business logic in service classes separate from route handlers, and use dependency injection to wire everything together. See the Project Structure section for a complete layout.
How does FastAPI dependency injection work?
FastAPI's Depends() system lets you declare callable dependencies as function parameters. When a request arrives, FastAPI calls the dependency function, resolves its own sub-dependencies recursively, and injects the results. Dependencies can be sync or async functions, classes, or generators (using yield for cleanup). They are cached per-request by default, so the same dependency used in multiple places only executes once. This pattern is ideal for database sessions, authentication, configuration, and shared services. See the Dependency Injection section for examples.
How do I deploy FastAPI in production?
Deploy FastAPI with Uvicorn workers managed by Gunicorn: gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker. Containerize with a multi-stage Docker build using a slim Python base image. Use environment variables for all secrets and configuration. Place behind a reverse proxy like Nginx or Traefik for TLS termination, and deploy to Kubernetes with liveness and readiness probes hitting your /health endpoint. See the Deployment section for complete Docker and Kubernetes configs.