FastAPI Advanced Patterns: Production-Ready Python APIs

FastAPI has become one of the most popular Python frameworks for building high-performance APIs. Its async-first design, automatic OpenAPI generation, and Pydantic-powered validation make it compelling for greenfield projects. But going from a tutorial-level app to a production system requires mastering advanced patterns around project structure, dependency injection, middleware, testing, and deployment. This guide covers fifteen patterns for building FastAPI applications that scale.

1. Project Structure for Large Apps

A flat main.py breaks down fast. Production FastAPI projects need clear separation between routing, business logic, data access, and configuration. Here is a proven layout:

project/
    app/
        __init__.py
        main.py              # create_app factory
        config.py            # Settings with pydantic-settings
        api/
            __init__.py
            deps.py           # Shared dependencies
            routes/
                __init__.py
                users.py
                items.py
                auth.py
        models/               # SQLAlchemy models
            __init__.py
            user.py
            item.py
        schemas/              # Pydantic request/response models
            __init__.py
            user.py
            item.py
        services/             # Business logic
            __init__.py
            user_service.py
            item_service.py
        db/
            __init__.py
            session.py        # Engine and session factory
            migrations/       # Alembic migrations
    tests/
    alembic.ini
    pyproject.toml
    Dockerfile

The factory function in main.py assembles everything:

from fastapi import FastAPI
from app.config import settings
from app.api.routes import users, items, auth
from app.db.session import engine
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create connection pool, warm caches
    async with engine.begin() as conn:
        pass  # Run migrations or health checks
    yield
    # Shutdown: close connections
    await engine.dispose()

def create_app() -> FastAPI:
    app = FastAPI(
        title=settings.PROJECT_NAME,
        version=settings.VERSION,
        lifespan=lifespan,
        docs_url="/docs" if settings.DEBUG else None,
    )
    app.include_router(auth.router, prefix="/api/v1/auth", tags=["auth"])
    app.include_router(users.router, prefix="/api/v1/users", tags=["users"])
    app.include_router(items.router, prefix="/api/v1/items", tags=["items"])
    return app

app = create_app()

2. Advanced Dependency Injection

FastAPI's Depends() is a recursive dependency injection system resolved at request time. Dependencies can be sync or async functions, classes, or generators. They are resolved per request and cached by default, so a dependency used in multiple places only runs once per request (pass use_cache=False to opt out).
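The per-request caching behavior can be illustrated with a toy resolver (a simplified model for intuition, not FastAPI's actual implementation):

```python
import asyncio

async def resolve(dep, cache):
    # Return the cached result if this dependency already ran this request
    if dep in cache:
        return cache[dep]
    result = await dep()
    cache[dep] = result
    return result

async def handle_request():
    calls = []

    async def get_settings():
        calls.append(1)  # record each real execution
        return {"debug": True}

    cache = {}  # one cache per request, like FastAPI's default use_cache=True
    a = await resolve(get_settings, cache)
    b = await resolve(get_settings, cache)  # cache hit: dependency not re-run
    return a is b, len(calls)

same_object, executions = asyncio.run(handle_request())
print(same_object, executions)  # True 1
```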

Generator Dependencies with Cleanup

from collections.abc import AsyncGenerator

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import async_session_maker

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Class-Based Dependencies

class PaginationParams:
    def __init__(self, skip: int = 0, limit: int = 20):
        self.skip = max(0, skip)
        self.limit = min(limit, 100)

# oauth2_scheme and verify_token come from the auth module (see pattern 6)
class CurrentUser:
    def __init__(self, required_role: str = "user"):
        self.required_role = required_role

    async def __call__(self, token: str = Depends(oauth2_scheme),
                       db: AsyncSession = Depends(get_db)):
        user = await verify_token(token, db)
        if self.required_role == "admin" and not user.is_admin:
            raise HTTPException(403, "Admin access required")
        return user

# Usage: parameterized dependency
@router.get("/admin/users")
async def admin_list(user=Depends(CurrentUser(required_role="admin"))):
    ...

Dependency Overrides for Testing

# In tests, override real dependencies with mocks
def get_test_db():
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()

app.dependency_overrides[get_db] = get_test_db

3. Custom Middleware

Middleware wraps every request/response cycle. FastAPI supports both ASGI middleware and its own @app.middleware("http") decorator. Use middleware for cross-cutting concerns like logging, timing, CORS, and request IDs.

import time, uuid
from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Request timing and ID middleware
class RequestContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = str(uuid.uuid4())[:8]
        request.state.request_id = request_id
        start = time.perf_counter()

        response = await call_next(request)

        elapsed = time.perf_counter() - start
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time"] = f"{elapsed:.4f}"
        logger.info(
            "request",
            method=request.method,
            path=request.url.path,
            status=response.status_code,
            duration=round(elapsed, 4),
            request_id=request_id,
        )
        return response

app.add_middleware(RequestContextMiddleware)

4. Background Tasks and Task Queues

FastAPI's built-in BackgroundTasks runs work after the response is sent, which suits quick, non-critical operations; tasks run in-process and are lost if the server crashes. For heavier or must-complete work, offload to Celery or ARQ.

Built-in Background Tasks

from fastapi import BackgroundTasks

async def send_welcome_email(email: str, name: str):
    # Simulate email sending
    await email_client.send(to=email, subject=f"Welcome {name}!")

@router.post("/users", status_code=201)
async def create_user(
    data: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    user = await user_service.create(db, data)
    background_tasks.add_task(send_welcome_email, user.email, user.name)
    return user
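Conceptually, BackgroundTasks is a list of queued calls that the server awaits only after the response has been produced; a minimal stand-in (illustrative, not FastAPI's real class):

```python
import asyncio

class MiniBackgroundTasks:
    """Toy model of fastapi.BackgroundTasks: queue now, run after the response."""
    def __init__(self):
        self.tasks = []

    def add_task(self, func, *args):
        self.tasks.append((func, args))  # queued, not executed yet

    async def run_all(self):
        for func, args in self.tasks:
            await func(*args)

sent = []

async def send_welcome_email(email: str, name: str):
    sent.append((email, name))

async def handle():
    tasks = MiniBackgroundTasks()
    tasks.add_task(send_welcome_email, "a@example.com", "Ada")
    response = {"status": 201}  # the response is produced first...
    await tasks.run_all()       # ...then the queued work runs
    return response

asyncio.run(handle())
print(sent)  # [('a@example.com', 'Ada')]
```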

Celery for Heavy Workloads

# tasks.py
from celery import Celery

celery_app = Celery("worker", broker="redis://redis:6379/0")

@celery_app.task(bind=True, max_retries=3)
def process_report(self, report_id: int):
    try:
        generate_pdf(report_id)
        upload_to_s3(report_id)
    except Exception as exc:
        self.retry(countdown=60, exc=exc)

# In your route handler
@router.post("/reports")
async def create_report(data: ReportCreate):
    report = await save_report(data)
    process_report.delay(report.id)
    return {"id": report.id, "status": "processing"}

ARQ for Async Task Queues

from arq import create_pool
from arq.connections import RedisSettings

async def process_image(ctx, image_id: int):
    """ARQ worker function - fully async."""
    image = await fetch_image(image_id)
    resized = await resize(image, width=800)
    await save_image(resized)

# Enqueue from route handler
@router.post("/images")
async def upload_image(file: UploadFile, redis=Depends(get_redis)):
    image_id = await save_upload(file)
    pool = await create_pool(RedisSettings())
    await pool.enqueue_job("process_image", image_id)
    return {"id": image_id}

5. WebSocket Connections

FastAPI has first-class WebSocket support for real-time features like chat, notifications, and live dashboards.

from fastapi import WebSocket, WebSocketDisconnect

class ConnectionManager:
    def __init__(self):
        self.active: dict[str, list[WebSocket]] = {}

    async def connect(self, room: str, websocket: WebSocket):
        await websocket.accept()
        self.active.setdefault(room, []).append(websocket)

    def disconnect(self, room: str, websocket: WebSocket):
        self.active[room].remove(websocket)
        if not self.active[room]:
            del self.active[room]

    async def broadcast(self, room: str, message: dict):
        for ws in self.active.get(room, []):
            await ws.send_json(message)

manager = ConnectionManager()

@router.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await manager.connect(room, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            data["room"] = room
            await manager.broadcast(room, data)
    except WebSocketDisconnect:
        manager.disconnect(room, websocket)
        await manager.broadcast(room, {"event": "user_left"})
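Because the manager only ever calls accept and send_json on a connection, its room bookkeeping can be exercised with stand-in objects instead of live sockets. A self-contained sketch (the class is restated without the WebSocket type hints so it runs standalone):

```python
import asyncio

class ConnectionManager:
    def __init__(self):
        self.active: dict[str, list] = {}

    async def connect(self, room: str, websocket):
        await websocket.accept()
        self.active.setdefault(room, []).append(websocket)

    def disconnect(self, room: str, websocket):
        self.active[room].remove(websocket)
        if not self.active[room]:
            del self.active[room]

    async def broadcast(self, room: str, message: dict):
        for ws in self.active.get(room, []):
            await ws.send_json(message)

class FakeSocket:
    """Stand-in exposing just the surface the manager touches."""
    def __init__(self):
        self.sent = []
    async def accept(self):
        pass
    async def send_json(self, message):
        self.sent.append(message)

async def demo():
    manager = ConnectionManager()
    a, b = FakeSocket(), FakeSocket()
    await manager.connect("lobby", a)
    await manager.connect("lobby", b)
    await manager.broadcast("lobby", {"msg": "hi"})
    manager.disconnect("lobby", a)          # a stops receiving
    await manager.broadcast("lobby", {"msg": "bye"})
    return len(a.sent), len(b.sent)

print(asyncio.run(demo()))  # (1, 2)
```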

6. Authentication Patterns

Production APIs need layered authentication. Here are the three most common patterns: JWT bearer tokens, OAuth2 password flow, and API key headers.

JWT Authentication

from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")

def create_access_token(data: dict, expires_minutes: int = 30) -> str:
    payload = data.copy()
    # Timezone-aware now; datetime.utcnow() is deprecated since Python 3.12
    payload["exp"] = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
    return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
):
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        user_id = payload.get("sub")  # "sub" is a string per RFC 7519
        if user_id is None:
            raise HTTPException(401, "Invalid token")
    except JWTError:
        raise HTTPException(401, "Invalid token")
    user = await db.get(User, int(user_id))
    if not user or not user.is_active:
        raise HTTPException(401, "User not found or inactive")
    return user
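For intuition about what jwt.encode produces, here is roughly what HS256 signing does under the hood, in pure stdlib (a sketch for understanding, not a substitute for a maintained JWT library):

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # JWT uses unpadded URL-safe base64
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def hs256_encode(payload: dict, secret: str) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url(json.dumps(payload).encode())
    signing_input = f"{header}.{body}".encode()
    sig = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{body}.{b64url(sig)}"

def hs256_verify(token: str, secret: str) -> dict:
    header, body, sig = token.split(".")
    signing_input = f"{header}.{body}".encode()
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(b64url(expected), sig):
        raise ValueError("Invalid signature")
    padded = body + "=" * (-len(body) % 4)  # restore base64 padding
    return json.loads(base64.urlsafe_b64decode(padded))

token = hs256_encode({"sub": "42"}, "secret")
print(hs256_verify(token, "secret"))  # {'sub': '42'}
```

A real library adds the pieces this sketch omits: exp/nbf validation, algorithm allow-lists, and constant-time handling throughout.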

API Key Authentication

from sqlalchemy import select
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")

async def verify_api_key(
    key: str = Depends(api_key_header),
    db: AsyncSession = Depends(get_db),
):
    result = await db.execute(
        select(APIKey).where(APIKey.key == key, APIKey.is_active.is_(True))
    )
    api_key = result.scalar_one_or_none()
    if not api_key:
        raise HTTPException(403, "Invalid API key")
    return api_key

7. Rate Limiting and Throttling

Protect your API from abuse with per-client rate limiting backed by Redis. The simplest scheme is a fixed-window counter: increment a key per client and path, and let it expire when the window ends.

import redis.asyncio as redis
from fastapi import Request, HTTPException

redis_client = redis.from_url("redis://redis:6379/1")

async def rate_limit(
    request: Request,
    max_requests: int = 100,
    window_seconds: int = 60,
):
    key = f"rate:{request.client.host}:{request.url.path}"
    current = await redis_client.incr(key)
    if current == 1:
        await redis_client.expire(key, window_seconds)
    if current > max_requests:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Try again later.",
            headers={"Retry-After": str(window_seconds)},
        )

# Apply as dependency
@router.get("/search", dependencies=[Depends(rate_limit)])
async def search(q: str):
    return await perform_search(q)

For more granular control, create a parameterized rate limiter:

def rate_limiter(max_requests: int = 60, window: int = 60):
    async def _limiter(request: Request):
        await rate_limit(request, max_requests, window)
    return _limiter

# 10 requests per minute for expensive endpoints
@router.post("/generate", dependencies=[Depends(rate_limiter(10, 60))])
async def generate_report(data: ReportRequest):
    ...

8. Database Patterns

Modern FastAPI apps use SQLAlchemy 2.0 with async sessions. The repository pattern keeps database queries out of route handlers.

Async SQLAlchemy Setup

from sqlalchemy.ext.asyncio import (
    create_async_engine, async_sessionmaker, AsyncSession
)
from sqlalchemy.orm import DeclarativeBase

engine = create_async_engine(
    settings.DATABASE_URL,  # postgresql+asyncpg://...
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

Repository Pattern

from sqlalchemy import select, func
from typing import Generic, TypeVar, Type

ModelType = TypeVar("ModelType", bound=Base)

class BaseRepository(Generic[ModelType]):
    def __init__(self, model: Type[ModelType], db: AsyncSession):
        self.model = model
        self.db = db

    async def get(self, id: int) -> ModelType | None:
        return await self.db.get(self.model, id)

    async def list(self, skip: int = 0, limit: int = 20):
        stmt = select(self.model).offset(skip).limit(limit)
        result = await self.db.execute(stmt)
        return result.scalars().all()

    async def create(self, data: dict) -> ModelType:
        obj = self.model(**data)
        self.db.add(obj)
        await self.db.flush()
        return obj

    async def count(self) -> int:
        stmt = select(func.count()).select_from(self.model)
        result = await self.db.execute(stmt)
        return result.scalar_one()

class UserRepository(BaseRepository[User]):
    async def get_by_email(self, email: str) -> User | None:
        stmt = select(User).where(User.email == email)
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()

9. Testing Strategies

FastAPI testing combines httpx.AsyncClient with pytest-asyncio for full async coverage. Override dependencies to isolate tests from external services.

import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

TEST_DB_URL = "sqlite+aiosqlite:///./test.db"

@pytest.fixture
async def db_session():
    engine = create_async_engine(TEST_DB_URL)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    session_maker = async_sessionmaker(engine, expire_on_commit=False)
    async with session_maker() as session:
        yield session
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()

@pytest.fixture
async def client(db_session):
    def override_db():
        yield db_session
    app.dependency_overrides[get_db] = override_db
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()

@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
    resp = await client.post("/api/v1/users", json={
        "email": "test@example.com", "name": "Test User"
    })
    assert resp.status_code == 201
    assert resp.json()["email"] == "test@example.com"

@pytest.mark.asyncio
async def test_get_user_not_found(client: AsyncClient):
    resp = await client.get("/api/v1/users/999")
    assert resp.status_code == 404

10. Error Handling and Custom Exceptions

Define a hierarchy of domain exceptions and register global handlers so every error returns consistent JSON.

from fastapi import Request
from fastapi.responses import JSONResponse

class AppException(Exception):
    def __init__(self, status_code: int, detail: str, error_code: str | None = None):
        self.status_code = status_code
        self.detail = detail
        self.error_code = error_code or "UNKNOWN_ERROR"

class NotFoundError(AppException):
    def __init__(self, resource: str, id: int):
        super().__init__(404, f"{resource} #{id} not found", "NOT_FOUND")

class ConflictError(AppException):
    def __init__(self, detail: str):
        super().__init__(409, detail, "CONFLICT")

class ValidationError(AppException):
    def __init__(self, detail: str):
        super().__init__(422, detail, "VALIDATION_ERROR")

@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.error_code,
            "detail": exc.detail,
            "path": str(request.url.path),
        },
    )

# Usage in service layer
async def get_user(db, user_id: int) -> User:
    user = await db.get(User, user_id)
    if not user:
        raise NotFoundError("User", user_id)
    return user

11. Pagination and Filtering

Standardize pagination across all list endpoints with a reusable dependency and response schema.

from pydantic import BaseModel
from typing import Generic, TypeVar, Sequence

T = TypeVar("T")

class PageResponse(BaseModel, Generic[T]):
    items: Sequence[T]
    total: int
    page: int
    per_page: int
    pages: int

class PaginationParams:
    def __init__(self, page: int = 1, per_page: int = 20):
        self.page = max(1, page)
        self.per_page = min(per_page, 100)
        self.skip = (self.page - 1) * self.per_page

async def paginate(db, stmt, pagination: PaginationParams, schema):
    # Count total
    count_stmt = select(func.count()).select_from(stmt.subquery())
    total = (await db.execute(count_stmt)).scalar()
    # Fetch page
    result = await db.execute(
        stmt.offset(pagination.skip).limit(pagination.per_page)
    )
    items = [schema.model_validate(r) for r in result.scalars().all()]
    return PageResponse(
        items=items,
        total=total,
        page=pagination.page,
        per_page=pagination.per_page,
        pages=(total + pagination.per_page - 1) // pagination.per_page,
    )

@router.get("/items", response_model=PageResponse[ItemOut])
async def list_items(
    p: PaginationParams = Depends(),
    db: AsyncSession = Depends(get_db),
    category: str | None = None,
):
    stmt = select(Item)
    if category:
        stmt = stmt.where(Item.category == category)
    return await paginate(db, stmt, p, ItemOut)
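The clamping and ceiling-division page math can be checked in isolation; this mirrors the PaginationParams clamps and the pages formula above:

```python
def page_math(total: int, page: int, per_page: int) -> dict:
    page = max(1, page)                  # floor page at 1
    per_page = min(per_page, 100)        # cap page size, as in PaginationParams
    skip = (page - 1) * per_page
    pages = (total + per_page - 1) // per_page  # ceiling division
    return {"skip": skip, "pages": pages}

print(page_math(total=45, page=2, per_page=20))  # {'skip': 20, 'pages': 3}
print(page_math(total=0, page=1, per_page=20))   # {'skip': 0, 'pages': 0}
```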

12. File Uploads and Streaming

Handle file uploads with validation and stream large responses without loading everything into memory.

from fastapi import UploadFile, File, HTTPException
from fastapi.responses import StreamingResponse
import aiofiles

ALLOWED_TYPES = {"image/jpeg", "image/png", "image/webp"}
MAX_SIZE = 10 * 1024 * 1024  # 10MB

@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(400, f"Type {file.content_type} not allowed")
    contents = await file.read()
    if len(contents) > MAX_SIZE:
        raise HTTPException(400, "File too large (max 10MB)")
    path = f"/data/uploads/{file.filename}"  # sanitize filename in production
    async with aiofiles.open(path, "wb") as f:
        await f.write(contents)
    return {"filename": file.filename, "size": len(contents)}

# Streaming large files
@router.get("/download/{filename}")
async def download_file(filename: str):
    # Sanitize filename in production to prevent path traversal
    path = f"/data/uploads/{filename}"
    async def stream():
        async with aiofiles.open(path, "rb") as f:
            while chunk := await f.read(8192):
                yield chunk
    return StreamingResponse(
        stream(),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )

13. Caching Strategies

Cache expensive computations and database queries with Redis or in-memory caches to reduce latency.

Redis Caching

import json
import redis.asyncio as redis
from functools import wraps

redis_client = redis.from_url("redis://redis:6379/0")

def cache(ttl_seconds: int = 300):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = f"cache:{func.__name__}:{args}:{kwargs}"
            cached = await redis_client.get(key)
            if cached:
                return json.loads(cached)
            result = await func(*args, **kwargs)
            await redis_client.setex(key, ttl_seconds, json.dumps(result))
            return result
        return wrapper
    return decorator

@cache(ttl_seconds=600)
async def get_popular_items() -> list[dict]:
    # Expensive aggregation query
    return await fetch_popular_from_db()

In-Memory Caching with TTL

from cachetools import TTLCache
from asyncio import Lock

_cache = TTLCache(maxsize=1000, ttl=300)
_lock = Lock()

async def get_config(key: str) -> dict:
    async with _lock:
        if key in _cache:
            return _cache[key]
    value = await fetch_config_from_db(key)
    async with _lock:
        _cache[key] = value
    return value
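For completeness, the Redis decorator has a process-local analogue built from a plain dict and time.monotonic (a sketch; cachetools or Redis remain the production choices):

```python
import asyncio
import time
from functools import wraps

def ttl_cache(ttl_seconds: float = 300.0):
    """Cache an async function's results in-process with per-entry expiry."""
    def decorator(func):
        store: dict = {}  # args -> (expires_at, value)

        @wraps(func)
        async def wrapper(*args):
            now = time.monotonic()
            entry = store.get(args)
            if entry is not None and entry[0] > now:  # fresh hit
                return entry[1]
            value = await func(*args)
            store[args] = (now + ttl_seconds, value)
            return value
        return wrapper
    return decorator

calls = 0

@ttl_cache(ttl_seconds=60)
async def expensive(x: int) -> int:
    global calls
    calls += 1          # count real executions
    return x * 2

async def demo():
    return await expensive(3), await expensive(3)  # second call is a hit

print(asyncio.run(demo()), calls)  # (6, 6) 1
```

Unlike the Redis version, entries here are per worker process and vanish on restart, which is fine for small, recomputable data.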

14. Production Deployment

A production FastAPI deployment combines Gunicorn with Uvicorn workers, Docker for packaging, and Kubernetes for orchestration.

Gunicorn Configuration

# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 120
keepalive = 5
accesslog = "-"
errorlog = "-"
loglevel = "info"

Multi-Stage Dockerfile

# Stage 1: Build dependencies
FROM python:3.12-slim AS builder
WORKDIR /app
COPY pyproject.toml poetry.lock ./
RUN pip install poetry && \
    poetry config virtualenvs.create false && \
    poetry install --only main --no-interaction --no-root

# Stage 2: Production image
FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.12 /usr/local/lib/python3.12
COPY --from=builder /usr/local/bin /usr/local/bin
COPY app/ ./app/
COPY gunicorn.conf.py .

RUN useradd -m appuser && chown -R appuser /app
USER appuser

EXPOSE 8000
# curl is not in the slim image; use the stdlib instead
HEALTHCHECK CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
CMD ["gunicorn", "app.main:app", "-c", "gunicorn.conf.py"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-app
spec:
  replicas: 3
  selector:
    matchLabels: { app: fastapi }
  template:
    metadata:
      labels: { app: fastapi }
    spec:
      containers:
      - name: api
        image: registry.example.com/fastapi-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef: { name: app-secrets, key: db-url }
        livenessProbe:
          httpGet: { path: /health, port: 8000 }
          initialDelaySeconds: 10
          periodSeconds: 30
        readinessProbe:
          httpGet: { path: /health, port: 8000 }
          initialDelaySeconds: 5
          periodSeconds: 10
        resources:
          requests: { cpu: 250m, memory: 256Mi }
          limits: { cpu: 1000m, memory: 512Mi }

15. Performance Optimization

FastAPI is already fast, but these patterns squeeze out more throughput and lower latency.

Connection Pooling and Keep-Alive

import httpx

# Reuse a single client across requests
http_client = httpx.AsyncClient(
    timeout=30.0,
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)

# Note: @app.on_event is deprecated in recent FastAPI; in new code, close
# the client in the lifespan handler from pattern 1 instead.
@app.on_event("shutdown")
async def shutdown():
    await http_client.aclose()

Response Model Optimization

from pydantic import BaseModel

class ItemOut(BaseModel):
    model_config = {"from_attributes": True}
    id: int
    name: str
    price: float

# Use response_model_exclude_unset for sparse responses
@router.get("/items/{id}", response_model=ItemOut)
async def get_item(id: int, db: AsyncSession = Depends(get_db)):
    item = await db.get(Item, id)
    if not item:
        raise NotFoundError("Item", id)
    return item

Health Check Endpoint

@app.get("/health")
async def health_check():
    checks = {}
    try:
        async with async_session_maker() as db:
            await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception:
        checks["database"] = "error"
    try:
        await redis_client.ping()
        checks["redis"] = "ok"
    except Exception:
        checks["redis"] = "error"
    healthy = all(v == "ok" for v in checks.values())
    return JSONResponse(
        status_code=200 if healthy else 503,
        content={"status": "healthy" if healthy else "degraded", **checks},
    )

Quick Reference: Advanced Patterns

Pattern                  Implementation
Project structure        Factory function + routers + service layer
Dependency injection     Depends() with classes, generators, sub-dependencies
Middleware               BaseHTTPMiddleware for logging, timing, request IDs
Background tasks         Built-in BackgroundTasks, Celery, or ARQ for heavy work
WebSockets               Connection manager pattern with room-based broadcast
Authentication           JWT + OAuth2PasswordBearer, API key header
Rate limiting            Redis fixed-window counter with parameterized dependency
Database                 Async SQLAlchemy 2.0 + repository pattern
Testing                  httpx.AsyncClient + dependency overrides + pytest-asyncio
Error handling           Custom exception hierarchy + global handler
Pagination               Reusable PaginationParams + generic PageResponse
Caching                  Redis with TTL decorator, in-memory TTLCache
Deployment               Gunicorn + Uvicorn workers + Docker + Kubernetes

Frequently Asked Questions

How do I structure a large FastAPI project?

Use a modular structure with an app/ directory containing api/, models/, schemas/, services/, and db/ subdirectories, plus a config module. Each API domain gets its own router module under api/routes/. Use a factory function (create_app) to assemble the application, register routers with prefixes, and configure middleware. Keep business logic in service classes separate from route handlers, and use dependency injection to wire everything together. See the Project Structure section for a complete layout.

How does FastAPI dependency injection work?

FastAPI's Depends() system lets you declare callable dependencies as function parameters. When a request arrives, FastAPI calls the dependency function, resolves its own sub-dependencies recursively, and injects the results. Dependencies can be sync or async functions, classes, or generators (using yield for cleanup). They are cached per-request by default, so the same dependency used in multiple places only executes once. This pattern is ideal for database sessions, authentication, configuration, and shared services. See the Dependency Injection section for examples.

How do I deploy FastAPI in production?

Deploy FastAPI with Uvicorn workers managed by Gunicorn: gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker. Containerize with a multi-stage Docker build using a slim Python base image. Use environment variables for all secrets and configuration. Place behind a reverse proxy like Nginx or Traefik for TLS termination, and deploy to Kubernetes with liveness and readiness probes hitting your /health endpoint. See the Deployment section for complete Docker and Kubernetes configs.