Python asyncio: Complete Guide to Async Programming 2026

February 12, 2026 · 22 min read

Python's asyncio library lets you write concurrent code that handles thousands of simultaneous I/O operations on a single thread. If your application spends most of its time waiting for network responses, database queries, or file reads, asyncio can dramatically improve throughput without the complexity of multi-threaded programming.

This guide covers everything from async/await fundamentals to advanced patterns like semaphore-based rate limiting, async database access, and production error handling. Every concept includes runnable code examples.

⚙ Related resources: Set up your project with our Python Virtual Environments Guide, build async APIs with FastAPI Complete Guide, and compare with JavaScript Async/Await.

Table of Contents

  1. What is asyncio and When to Use It
  2. async/await Fundamentals
  3. Event Loop Internals
  4. Tasks and Futures
  5. asyncio.gather, wait, and as_completed
  6. Semaphores and Rate Limiting
  7. Async Context Managers and Iterators
  8. aiohttp for Async HTTP
  9. Async Database Access
  10. Error Handling Patterns
  11. Testing Async Code
  12. asyncio vs Threading vs Multiprocessing
  13. Real-World Patterns and Best Practices
  14. FAQ

1. What is asyncio and When to Use It

asyncio is a standard library module (since Python 3.4) that provides infrastructure for writing single-threaded concurrent code using coroutines. It is built around an event loop that schedules and runs asynchronous tasks, switching between them whenever one is waiting for I/O.

The key insight: most applications spend the majority of their time waiting for HTTP responses, database rows, or file reads. During those waits, a synchronous program sits idle. asyncio lets your program do useful work during those idle periods.
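A minimal sketch of that idea, assuming asyncio.sleep() as a stand-in for real network or disk waits (the wait_for_io helper and its labels are illustrative, not part of asyncio). Three 0.3-second waits overlap on one thread, so the total should land near 0.3s rather than 0.9s:

```python
import asyncio
import threading
import time

async def wait_for_io(label: str, seconds: float) -> tuple[str, int]:
    # asyncio.sleep() stands in for a network or disk wait
    await asyncio.sleep(seconds)
    # Report which OS thread resumed this coroutine
    return label, threading.get_ident()

async def main() -> tuple[float, set[int]]:
    start = time.perf_counter()
    results = await asyncio.gather(
        wait_for_io("http", 0.3),
        wait_for_io("db", 0.3),
        wait_for_io("file", 0.3),
    )
    elapsed = time.perf_counter() - start
    thread_ids = {ident for _, ident in results}
    return elapsed, thread_ids

elapsed, thread_ids = asyncio.run(main())
print(f"3 waits took {elapsed:.2f}s on {len(thread_ids)} thread(s)")
```

The exact timing depends on the machine, but the thread count stays at one because every coroutine is resumed by the same event loop.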

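When to use asyncio

Reach for asyncio when the workload is I/O-bound and involves many concurrent connections: web servers, scrapers, chat applications, and similar services.

When NOT to use asyncio

Avoid asyncio for CPU-bound work such as image processing, data crunching, or ML inference; multiprocessing suits that better. If you depend on synchronous libraries that have no async alternative, threading is usually the simpler fit.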

2. async/await Fundamentals

A coroutine is defined with async def. Calling it returns a coroutine object. To run it, you must await it or schedule it on the event loop.

import asyncio

async def greet(name: str) -> str:
    print(f"Hello, {name}!")
    await asyncio.sleep(1)  # Simulate I/O
    return f"Greeted {name}"

# WRONG: does NOT run the coroutine
result = greet("Alice")  # Returns a coroutine object

# RIGHT: run with asyncio.run()
async def main():
    result = await greet("Alice")
    print(result)

asyncio.run(main())

Sequential vs concurrent execution

import asyncio, time

async def fetch_data(source: str, delay: float) -> str:
    print(f"Fetching from {source}...")
    await asyncio.sleep(delay)
    return f"Data from {source}"

async def main():
    start = time.perf_counter()

    # Sequential: runs one after another (~3.5s)
    a = await fetch_data("API", 2)
    b = await fetch_data("Database", 1)
    c = await fetch_data("Cache", 0.5)
    print(f"Sequential: {time.perf_counter() - start:.1f}s")

    # Concurrent: all three waits overlap on one thread (~2.0s)
    start = time.perf_counter()
    a, b, c = await asyncio.gather(
        fetch_data("API", 2),
        fetch_data("Database", 1),
        fetch_data("Cache", 0.5),
    )
    print(f"Concurrent: {time.perf_counter() - start:.1f}s")

asyncio.run(main())

3. Event Loop Internals

The event loop maintains a queue of ready tasks, executes them until they hit an await, then checks for completed I/O and schedules continuations.

import asyncio

async def show_loop():
    loop = asyncio.get_running_loop()
    print(f"Loop running: {loop.is_running()}")

    # Scheduling callbacks
    loop.call_soon(print, "Runs on next iteration")
    loop.call_later(1.0, print, "Runs after 1 second")
    loop.call_at(loop.time() + 2.0, print, "Runs at loop_time + 2s")
    await asyncio.sleep(3)

asyncio.run(show_loop())
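To make the "run until the next await" behavior visible, here is a small sketch in which two coroutines record each step and then yield with asyncio.sleep(0). The stepper helper and the trace list are illustrative names, not asyncio APIs.

```python
import asyncio

async def stepper(name: str, steps: int, trace: list[str]) -> None:
    for i in range(steps):
        trace.append(f"{name}{i}")
        # sleep(0) suspends this coroutine and lets the loop run other ready tasks
        await asyncio.sleep(0)

async def main() -> list[str]:
    trace: list[str] = []
    # gather() wraps both coroutines in tasks, in argument order
    await asyncio.gather(stepper("a", 3, trace), stepper("b", 3, trace))
    return trace

print(asyncio.run(main()))
```

With CPython's default event loop the trace alternates (a0, b0, a1, b1, a2, b2): each task runs until its await, then goes to the back of the ready queue.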

Running blocking code in an executor

import asyncio, time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(url: str) -> str:
    time.sleep(2)  # Slow synchronous I/O
    return f"Result from {url}"

async def main():
    loop = asyncio.get_running_loop()

    # Default thread pool
    result = await loop.run_in_executor(None, blocking_io, "https://api.example.com")

    # Custom executor for parallel blocking calls
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_io, "url1"),
            loop.run_in_executor(pool, blocking_io, "url2"),
            loop.run_in_executor(pool, blocking_io, "url3"),
        )
    print(results)  # Completes in ~2s instead of ~6s

asyncio.run(main())

4. Tasks and Futures

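A Task wraps a coroutine and schedules it on the event loop right away, so it runs in the background as soon as the current coroutine yields control. Directly awaiting a coroutine, by contrast, runs it to completion before the caller continues.

import asyncio

async def download(url: str) -> str:
    print(f"Starting: {url}")
    await asyncio.sleep(2)
    return f"Content of {url}"

async def main():
    # Tasks are scheduled on creation and start at the next await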
    task1 = asyncio.create_task(download("page1.html"))
    task2 = asyncio.create_task(download("page2.html"))
    task3 = asyncio.create_task(download("page3.html"), name="dl-page3")

    print("Tasks running in background...")
    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(f"Got: {result1}, {result2}, {result3}")

asyncio.run(main())
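One detail worth seeing in action: create_task() only schedules the coroutine; its body first runs when the current coroutine yields to the event loop. The sketch below records the order of events (child and the log list are illustrative names).

```python
import asyncio

async def child(log: list[str]) -> None:
    log.append("child started")

async def main() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(child(log))
    # The task is scheduled, but main() has not yielded yet
    log.append("after create_task")
    await asyncio.sleep(0)  # Yield once so the loop can run the task
    log.append("after first yield")
    await task
    return log

print(asyncio.run(main()))
```

The log shows "after create_task" before "child started", which is why long synchronous stretches after create_task() delay every background task.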

Task cancellation

async def long_running():
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Cancelled! Cleaning up...")
        raise  # Re-raise to mark task as cancelled

async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print(f"Task cancelled: {task.cancelled()}")  # True

asyncio.run(main())

Task groups (Python 3.11+)

async def process_item(item: int) -> int:
    await asyncio.sleep(0.5)
    if item == 3:
        raise ValueError(f"Bad item: {item}")
    return item * 2

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(process_item(1))
            t2 = tg.create_task(process_item(2))
            t3 = tg.create_task(process_item(4))
        print(t1.result(), t2.result(), t3.result())  # 2, 4, 8
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"Caught: {exc}")

asyncio.run(main())

5. asyncio.gather, wait, and as_completed

asyncio.gather

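import asyncio

async def fetch(url: str) -> dict:
    await asyncio.sleep(1)
    return {"url": url, "status": 200}

async def fetch_broken(url: str) -> dict:
    await asyncio.sleep(0.5)
    raise ConnectionError(f"Cannot reach {url}")  # Simulated failure

async def main():
    # Results maintain input order
    results = await asyncio.gather(
        fetch("/users"), fetch("/posts"), fetch("/comments"),
    )
    # return_exceptions=True returns errors as values instead of raising
    # (asyncio.sleep(-1) would not raise; a negative delay just returns at once)
    results = await asyncio.gather(
        fetch("good_url"), fetch_broken("bad_url"),
        return_exceptions=True,
    )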
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"Success: {r}")

asyncio.run(main())

asyncio.wait

async def main():
    tasks = [asyncio.create_task(fetch(f"url_{i}"), name=f"fetch-{i}") for i in range(5)]

    # Wait for first completed
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Done: {len(done)}, Pending: {len(pending)}")

    # Wait with timeout, cancel stragglers
    done2, pending2 = await asyncio.wait(pending, timeout=2.0)
    for task in pending2:
        task.cancel()

asyncio.run(main())

asyncio.as_completed

import asyncio, random

async def fetch_with_delay(url: str) -> dict:
    delay = random.uniform(0.5, 3.0)
    await asyncio.sleep(delay)
    return {"url": url, "delay": round(delay, 2)}

async def main():
    coros = [fetch_with_delay(f"url_{i}") for i in range(5)]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print(f"Got: {result}")  # Prints in completion order

asyncio.run(main())

6. Semaphores and Rate Limiting

import asyncio

async def fetch_url(sem: asyncio.Semaphore, url: str) -> str:
    async with sem:  # Limits concurrent access
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Content of {url}"

async def main():
    sem = asyncio.Semaphore(3)  # Max 3 concurrent requests
    urls = [f"https://api.example.com/page/{i}" for i in range(10)]
    results = await asyncio.gather(*[fetch_url(sem, url) for url in urls])
    print(f"Fetched {len(results)} pages")

asyncio.run(main())

Token bucket rate limiter

import asyncio, time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            self.tokens = min(self.burst, self.tokens + (now - self.last_refill) * self.rate)
            self.last_refill = now
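            if self.tokens < 1:
                # Not enough tokens: sleep until one has accrued, then spend it
                await asyncio.sleep((1 - self.tokens) / self.rate)
                self.tokens = 0
                # Reset the refill clock so the sleep is not counted twice
                self.last_refill = time.monotonic()
            else:
                self.tokens -= 1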

async def call_api(limiter: RateLimiter, endpoint: str) -> str:
    await limiter.acquire()
    print(f"[{time.monotonic():.2f}] Calling {endpoint}")
    await asyncio.sleep(0.1)
    return f"Response from {endpoint}"

async def main():
    limiter = RateLimiter(rate=5, burst=2)  # 5 req/s, burst of 2
    await asyncio.gather(*[call_api(limiter, f"/api/{i}") for i in range(15)])

asyncio.run(main())

7. Async Context Managers and Iterators

Async context managers

import asyncio
from contextlib import asynccontextmanager

class AsyncDBConnection:
    def __init__(self, dsn: str):
        self.dsn = dsn

    async def __aenter__(self):
        print(f"Connecting to {self.dsn}...")
        await asyncio.sleep(0.5)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection...")
        await asyncio.sleep(0.1)
        return False

    async def query(self, sql: str) -> list:
        await asyncio.sleep(0.2)
        return [{"id": 1, "name": "Alice"}]

# Decorator-based alternative
@asynccontextmanager
async def managed_resource(name: str):
    print(f"Acquiring {name}")
    resource = {"name": name, "data": []}
    try:
        yield resource
    finally:
        print(f"Releasing {name}")

async def main():
    async with AsyncDBConnection("postgresql://localhost/db") as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)

    async with managed_resource("cache") as cache:
        cache["data"].append("item1")

asyncio.run(main())

Async generators

async def fetch_pages(base_url: str, total_pages: int):
    """Async generator yielding pages one at a time."""
    for page in range(1, total_pages + 1):
        await asyncio.sleep(0.3)
        yield {"page": page, "items": [f"item_{i}" for i in range(5)]}

async def main():
    async for page_data in fetch_pages("https://api.example.com", 5):
        print(f"Page {page_data['page']}: {len(page_data['items'])} items")

    # Async comprehension
    items = [item async for page in fetch_pages("https://api.example.com", 3)
             for item in page["items"]]
    print(f"Total items: {len(items)}")

asyncio.run(main())

8. aiohttp for Async HTTP

# pip install aiohttp
import asyncio, aiohttp

async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        # Single request
        data = await fetch_json(session, "https://jsonplaceholder.typicode.com/posts/1")
        print(data["title"])

        # Multiple concurrent requests
        urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 6)]
        posts = await asyncio.gather(*[fetch_json(session, url) for url in urls])
        for post in posts:
            print(f"Post {post['id']}: {post['title']}")

asyncio.run(main())

Timeouts and retries with exponential backoff

async def fetch_with_retry(
    session: aiohttp.ClientSession, url: str,
    max_retries: int = 3, backoff: float = 1.0,
) -> str:
    for attempt in range(max_retries):
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, timeout=timeout) as resp:
                resp.raise_for_status()
                return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            wait = backoff * (2 ** attempt)
            print(f"Retry {attempt+1}/{max_retries} after {wait}s: {e}")
            await asyncio.sleep(wait)

Concurrent scraping with semaphore

async def scrape_page(session, sem, url):
    async with sem:
        async with session.get(url) as response:
            text = await response.text()
            return {"url": url, "length": len(text), "status": response.status}

async def main():
    sem = asyncio.Semaphore(10)
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    connector = aiohttp.TCPConnector(limit=20)
    async with aiohttp.ClientSession(connector=connector) as session:
        results = await asyncio.gather(
            *[scrape_page(session, sem, url) for url in urls],
            return_exceptions=True,
        )
    successes = [r for r in results if not isinstance(r, Exception)]
    print(f"Success: {len(successes)}, Failed: {len(results) - len(successes)}")

asyncio.run(main())

9. Async Database Access

asyncpg (PostgreSQL)

# pip install asyncpg
import asyncio, asyncpg

async def main():
    pool = await asyncpg.create_pool("postgresql://user:pass@localhost/mydb", min_size=5, max_size=20)

    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT id, name, email FROM users WHERE id = $1", 1)
        print(f"User: {row['name']} ({row['email']})")

        rows = await conn.fetch("SELECT * FROM users WHERE active = $1 LIMIT $2", True, 100)

        new_id = await conn.fetchval(
            "INSERT INTO users(name, email) VALUES($1, $2) RETURNING id", "Bob", "bob@example.com"
        )

    # Transactions
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100.0, 1)
            await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100.0, 2)

    await pool.close()

asyncio.run(main())

aiosqlite (SQLite)

# pip install aiosqlite
import asyncio, aiosqlite

async def main():
    async with aiosqlite.connect("app.db") as db:
        await db.execute("""CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY, title TEXT NOT NULL, done BOOLEAN DEFAULT 0)""")
        await db.execute("INSERT INTO tasks(title) VALUES(?)", ("Learn asyncio",))
        await db.commit()

        async with db.execute("SELECT * FROM tasks") as cursor:
            async for row in cursor:
                print(f"Task {row[0]}: {row[1]} (done: {row[2]})")

asyncio.run(main())

10. Error Handling Patterns

import asyncio

async def risky_operation(value: int) -> int:
    await asyncio.sleep(0.1)
    if value < 0:
        raise ValueError(f"Negative value: {value}")
    return value * 2

async def main():
    # Basic try/except
    try:
        result = await risky_operation(-1)
    except ValueError as e:
        print(f"Caught: {e}")

    # gather with return_exceptions
    results = await asyncio.gather(
        risky_operation(5), risky_operation(-1), risky_operation(10),
        return_exceptions=True,
    )
    for i, r in enumerate(results):
        print(f"Task {i}: {'FAIL ' + str(r) if isinstance(r, Exception) else r}")

asyncio.run(main())

Timeout handling

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    # Python 3.11+ style
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:
        print("Timed out (asyncio.timeout)")

    # Compatible with all Python versions
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Timed out (wait_for)")

asyncio.run(main())

Graceful shutdown

import asyncio, signal

async def worker(name: str, shutdown: asyncio.Event):
    while not shutdown.is_set():
        print(f"[{name}] Working...")
        try:
            await asyncio.wait_for(shutdown.wait(), timeout=2.0)
        except asyncio.TimeoutError:
            continue
    print(f"[{name}] Stopped")

async def main():
    shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()
    # add_signal_handler is only available on Unix event loops
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown.set)

    workers = [asyncio.create_task(worker(f"w-{i}", shutdown)) for i in range(3)]
    await asyncio.gather(*workers)

asyncio.run(main())

11. Testing Async Code

pytest-asyncio

# pip install pytest pytest-asyncio
import asyncio, pytest

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)
    if user_id <= 0:
        raise ValueError("Invalid user ID")
    return {"id": user_id, "name": f"User {user_id}"}

@pytest.mark.asyncio
async def test_fetch_user():
    user = await fetch_user(1)
    assert user["id"] == 1
    assert user["name"] == "User 1"

@pytest.mark.asyncio
async def test_fetch_user_invalid():
    with pytest.raises(ValueError, match="Invalid user ID"):
        await fetch_user(-1)

@pytest.mark.asyncio
async def test_concurrent_fetch():
    users = await asyncio.gather(fetch_user(1), fetch_user(2), fetch_user(3))
    assert len(users) == 3
    assert all(u["id"] > 0 for u in users)

Mocking async functions

import pytest
from unittest.mock import AsyncMock, patch

async def get_weather(city: str) -> dict:
    pass  # Calls external API

async def get_forecast(city: str) -> str:
    weather = await get_weather(city)
    return "Hot" if weather["temp"] > 30 else "Warm" if weather["temp"] > 15 else "Cold"

# Patch the name in the module where get_forecast looks it up.
# Under pytest that module is not "__main__", so build the target from __name__.
@pytest.mark.asyncio
async def test_forecast_hot():
    with patch(f"{__name__}.get_weather", new_callable=AsyncMock,
               return_value={"temp": 35, "humidity": 60}):
        assert await get_forecast("Phoenix") == "Hot"

@pytest.mark.asyncio
async def test_forecast_cold():
    mock = AsyncMock(return_value={"temp": 5})
    with patch(f"{__name__}.get_weather", mock):
        assert await get_forecast("Oslo") == "Cold"
        mock.assert_called_once_with("Oslo")

12. asyncio vs Threading vs Multiprocessing

Feature         | asyncio                      | threading                  | multiprocessing
Best for        | I/O-bound (many connections) | I/O-bound (sync libraries) | CPU-bound
Concurrency     | Single thread, cooperative   | Multi-thread, preemptive   | Multi-process
GIL affected    | N/A (single thread)          | Yes (limits CPU work)      | No (separate processes)
Memory          | Very low (~KB/task)          | Medium (~MB/thread)        | High (~MB/process)
Scale           | 100K+ tasks                  | Hundreds of threads        | Number of CPU cores
Race conditions | Rare (explicit yields)       | Common (need locks)        | Rare (separate memory)
Debugging       | Easier (predictable)         | Hard (nondeterministic)    | Medium
Ecosystem       | Growing (aiohttp, asyncpg)   | Mature (all libraries)     | Good (shared memory)

# Side-by-side: downloading 100 URLs
import asyncio
import aiohttp
import requests
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool

def fetch_text(url):
    # Module-level so multiprocessing can pickle it (lambdas cannot be pickled)
    return requests.get(url).text

# asyncio (best for this use case)
async def download_async(urls):
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            # Read the body while the response and session are still open
            async with session.get(url) as resp:
                return await resp.text()
        return await asyncio.gather(*[fetch(u) for u in urls])

# threading
def download_threaded(urls):
    with ThreadPoolExecutor(max_workers=20) as pool:
        return list(pool.map(fetch_text, urls))

# multiprocessing (overkill for I/O)
def download_multi(urls):
    with Pool(8) as pool:
        return pool.map(fetch_text, urls)

13. Real-World Patterns and Best Practices

Producer-consumer with asyncio.Queue

import asyncio, random

async def producer(queue: asyncio.Queue, name: str, count: int):
    for i in range(count):
        item = f"{name}-item-{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))
        await queue.put(item)
        print(f"[{name}] Produced: {item}")

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:  # Sentinel: no more work
            queue.task_done()
            break
        print(f"[{name}] Processing: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    producers = [asyncio.create_task(producer(queue, f"P{i}", 5)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(3)]
    await asyncio.gather(*producers)
    # One sentinel per consumer, sent only after every producer has finished,
    # so shutdown does not depend on the producer and consumer counts lining up
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

asyncio.run(main())

Async retry decorator

import asyncio, functools

def async_retry(max_retries=3, backoff=1.0, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exc = e
                    if attempt < max_retries - 1:
                        wait = backoff * (2 ** attempt)
                        await asyncio.sleep(wait)
            raise last_exc
        return wrapper
    return decorator

@async_retry(max_retries=3, backoff=0.5, exceptions=(ConnectionError,))
async def unreliable_api(endpoint: str) -> dict:
    import random
    if random.random() < 0.7:
        raise ConnectionError("Connection refused")
    return {"status": "ok"}

Async caching with TTL

import asyncio
from functools import wraps

def async_cache(ttl: float = 60.0):
    cache = {}
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = str((args, tuple(sorted(kwargs.items()))))
            now = asyncio.get_running_loop().time()  # Monotonic loop clock
            if key in cache and now - cache[key][0] < ttl:
                return cache[key][1]
            result = await func(*args, **kwargs)
            cache[key] = (now, result)
            return result
        wrapper.cache_clear = cache.clear
        return wrapper
    return decorator

@async_cache(ttl=30)
async def get_exchange_rate(currency: str) -> float:
    await asyncio.sleep(1)  # Simulate API call
    return 1.08 if currency == "EUR" else 0.79

Best practices summary

14. FAQ

What is asyncio in Python?

asyncio is Python's built-in library for writing concurrent code using async/await. It provides an event loop that manages coroutines: functions that can pause while waiting for I/O operations like network requests, file reads, or database queries. Unlike threading, asyncio uses a single thread and achieves concurrency by switching between tasks whenever one is waiting, making it highly efficient for I/O-bound workloads. It has been in the standard library since Python 3.4 and matured significantly with Python 3.11+ features like TaskGroup and asyncio.timeout().

When should I use asyncio vs threading vs multiprocessing?

Use asyncio for I/O-bound tasks with many concurrent connections (web servers, scrapers, chat apps). It uses very little memory per task and scales to hundreds of thousands of concurrent operations. Use threading for I/O-bound work that requires synchronous libraries without async alternatives. Use multiprocessing for CPU-bound tasks like image processing, data crunching, or ML inference, because it bypasses the GIL by running separate Python processes.
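When only part of a program depends on a synchronous library, the two models can be mixed. The sketch below assumes Python 3.9+ for asyncio.to_thread; legacy_lookup is a hypothetical blocking function standing in for a sync-only client.

```python
import asyncio
import time

def legacy_lookup(key: str) -> str:
    # Hypothetical blocking call from a sync-only library
    time.sleep(0.3)
    return key.upper()

async def main() -> list[str]:
    # Each call runs in the default thread pool, so the event loop stays free
    return await asyncio.gather(
        *(asyncio.to_thread(legacy_lookup, key) for key in ("a", "b", "c"))
    )

print(asyncio.run(main()))
```

This keeps the rest of the application on the event loop while the blocking calls overlap in worker threads. It does not help CPU-bound functions, which still contend for the GIL.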

How do I run multiple async tasks concurrently?

Use asyncio.gather() to run coroutines concurrently and collect results in order: results = await asyncio.gather(task1(), task2(), task3()). Create Task objects with asyncio.create_task() for background execution. Use asyncio.wait() with return_when=FIRST_COMPLETED for processing tasks as they finish. Use asyncio.as_completed() to iterate results in completion order. In Python 3.11+, use asyncio.TaskGroup() for structured concurrency. Limit concurrency with asyncio.Semaphore.
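Several of these pieces combine naturally. The following is a sketch of a bounded gather, assuming a helper named gather_limited (not part of asyncio) that caps how many coroutines run at once while still returning results in input order.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def gather_limited(
    func: Callable[[T], Awaitable[R]], items: Iterable[T], limit: int
) -> list[R]:
    sem = asyncio.Semaphore(limit)

    async def run_one(item: T) -> R:
        async with sem:  # At most `limit` calls are inside this block at once
            return await func(item)

    # gather() preserves input order regardless of completion order
    return await asyncio.gather(*(run_one(item) for item in items))

async def square(n: int) -> int:
    await asyncio.sleep(0.05)  # Simulated I/O
    return n * n

print(asyncio.run(gather_limited(square, range(6), limit=2)))
```

All coroutines are still created up front; the semaphore only limits how many are past the gate at any moment, which is usually enough to protect a remote service.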
