Python asyncio: Complete Guide to Async Programming 2026
Python's asyncio library lets you write concurrent code that handles thousands of simultaneous I/O operations on a single thread. If your application spends most of its time waiting for network responses, database queries, or file reads, asyncio can dramatically improve throughput without the complexity of multi-threaded programming.
This guide covers everything from async/await fundamentals to advanced patterns like semaphore-based rate limiting, async database access, and production error handling. Every concept includes runnable code examples.
Table of Contents
- What is asyncio and When to Use It
- async/await Fundamentals
- Event Loop Internals
- Tasks and Futures
- asyncio.gather, wait, and as_completed
- Semaphores and Rate Limiting
- Async Context Managers and Iterators
- aiohttp for Async HTTP
- Async Database Access
- Error Handling Patterns
- Testing Async Code
- asyncio vs Threading vs Multiprocessing
- Real-World Patterns and Best Practices
- FAQ
1. What is asyncio and When to Use It
asyncio is a standard library module (since Python 3.4) that provides infrastructure for writing single-threaded concurrent code using coroutines. It is built around an event loop that schedules and runs asynchronous tasks, switching between them whenever one is waiting for I/O.
The key insight: most applications spend the majority of their time waiting for HTTP responses, database rows, or file reads. During those waits, a synchronous program sits idle. asyncio lets your program do useful work during those idle periods.
When to use asyncio
- Web servers and API backends handling many simultaneous requests (FastAPI, Starlette, aiohttp)
- Web scrapers and crawlers fetching hundreds of pages concurrently
- Chat applications and WebSocket servers with thousands of open connections
- Microservice clients calling multiple downstream APIs in parallel
When NOT to use asyncio
- CPU-bound tasks like image processing or ML inference (use multiprocessing)
- Simple scripts making a few sequential API calls (synchronous code is simpler)
- Libraries without async support, where everything needs run_in_executor
2. async/await Fundamentals
A coroutine is defined with async def. Calling it returns a coroutine object. To run it, you must await it or schedule it on the event loop.
import asyncio

async def greet(name: str) -> str:
    print(f"Hello, {name}!")
    await asyncio.sleep(1)  # Simulate I/O
    return f"Greeted {name}"

# WRONG: does NOT run the coroutine
result = greet("Alice")  # Returns a coroutine object

# RIGHT: run with asyncio.run()
async def main():
    result = await greet("Alice")
    print(result)

asyncio.run(main())
Sequential vs concurrent execution
import asyncio, time

async def fetch_data(source: str, delay: float) -> str:
    print(f"Fetching from {source}...")
    await asyncio.sleep(delay)
    return f"Data from {source}"

async def main():
    start = time.perf_counter()
    # Sequential: runs one after another (~3.5s)
    a = await fetch_data("API", 2)
    b = await fetch_data("Database", 1)
    c = await fetch_data("Cache", 0.5)
    print(f"Sequential: {time.perf_counter() - start:.1f}s")

    # Concurrent: the waits overlap (~2.0s, the longest single delay)
    start = time.perf_counter()
    a, b, c = await asyncio.gather(
        fetch_data("API", 2),
        fetch_data("Database", 1),
        fetch_data("Cache", 0.5),
    )
    print(f"Concurrent: {time.perf_counter() - start:.1f}s")

asyncio.run(main())
3. Event Loop Internals
The event loop maintains a queue of ready tasks, executes them until they hit an await, then checks for completed I/O and schedules continuations.
import asyncio

async def show_loop():
    loop = asyncio.get_running_loop()
    print(f"Loop running: {loop.is_running()}")
    # Scheduling callbacks
    loop.call_soon(print, "Runs on next iteration")
    loop.call_later(1.0, print, "Runs after 1 second")
    loop.call_at(loop.time() + 2.0, print, "Runs at loop_time + 2s")
    await asyncio.sleep(3)

asyncio.run(show_loop())
Running blocking code in an executor
import asyncio, time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(url: str) -> str:
    time.sleep(2)  # Slow synchronous I/O
    return f"Result from {url}"

async def main():
    loop = asyncio.get_running_loop()
    # Default thread pool
    result = await loop.run_in_executor(None, blocking_io, "https://api.example.com")
    # Custom executor for parallel blocking calls
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_io, "url1"),
            loop.run_in_executor(pool, blocking_io, "url2"),
            loop.run_in_executor(pool, blocking_io, "url3"),
        )
    print(results)  # Completes in ~2s instead of ~6s

asyncio.run(main())
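Since Python 3.9, asyncio.to_thread offers a shorter spelling of the same idea, wrapping run_in_executor with the default thread pool. A minimal sketch; legacy_fetch and the url strings are made up stand-ins for a blocking, sync-only library call:

```python
import asyncio
import time

def legacy_fetch(url: str) -> str:
    time.sleep(0.2)  # blocking call, e.g. requests.get
    return f"body of {url}"

async def main() -> list:
    # asyncio.to_thread runs each sync call in the default thread pool,
    # so the three 0.2s sleeps overlap instead of adding up
    return await asyncio.gather(
        asyncio.to_thread(legacy_fetch, "url1"),
        asyncio.to_thread(legacy_fetch, "url2"),
        asyncio.to_thread(legacy_fetch, "url3"),
    )

results = asyncio.run(main())
print(results)
```

Prefer to_thread for one-off blocking calls; reach for run_in_executor when you need a custom or process-based executor.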
4. Tasks and Futures
A Task wraps a coroutine and schedules it on the event loop right away, so it runs in the background. A bare coroutine, by contrast, does nothing until you await it.
import asyncio

async def download(url: str) -> str:
    print(f"Starting: {url}")
    await asyncio.sleep(2)
    return f"Content of {url}"

async def main():
    # Tasks start running as soon as they are created
    task1 = asyncio.create_task(download("page1.html"))
    task2 = asyncio.create_task(download("page2.html"))
    task3 = asyncio.create_task(download("page3.html"), name="dl-page3")
    print("Tasks running in background...")
    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(f"Got: {result1}, {result2}, {result3}")

asyncio.run(main())
Task cancellation
import asyncio

async def long_running():
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Cancelled! Cleaning up...")
        raise  # Re-raise to mark the task as cancelled

async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print(f"Task cancelled: {task.cancelled()}")  # True

asyncio.run(main())
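A related tool is asyncio.shield(), which protects an inner operation from an outer cancellation, useful when a database commit or payment call must finish even if the caller gives up. A minimal sketch; commit_transaction is a made-up stand-in for the critical work:

```python
import asyncio

async def commit_transaction() -> str:
    await asyncio.sleep(0.1)  # critical work that should not be interrupted
    return "committed"

async def main() -> str:
    inner = asyncio.create_task(commit_transaction())

    async def outer_op() -> str:
        # shield: cancelling outer_op does not cancel `inner`
        return await asyncio.shield(inner)

    outer = asyncio.create_task(outer_op())
    await asyncio.sleep(0)  # let both tasks start
    outer.cancel()          # cancels only the outer await
    try:
        await outer
    except asyncio.CancelledError:
        print("outer await cancelled")
    return await inner      # the shielded work still completes

result = asyncio.run(main())
print(result)
```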
Task groups (Python 3.11+)
import asyncio

async def process_item(item: int) -> int:
    await asyncio.sleep(0.5)
    if item == 3:
        raise ValueError(f"Bad item: {item}")
    return item * 2

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(process_item(1))
            t2 = tg.create_task(process_item(2))
            t3 = tg.create_task(process_item(4))
        # Results are only available after the async with block exits
        print(t1.result(), t2.result(), t3.result())  # 2 4 8
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"Caught: {exc}")

asyncio.run(main())
5. asyncio.gather, wait, and as_completed
asyncio.gather
import asyncio

async def fetch(url: str) -> dict:
    await asyncio.sleep(1)
    return {"url": url, "status": 200}

async def fetch_broken(url: str) -> dict:
    raise ConnectionError(f"Cannot reach {url}")

async def main():
    # Results maintain input order
    results = await asyncio.gather(
        fetch("/users"), fetch("/posts"), fetch("/comments"),
    )
    # return_exceptions=True returns errors as values instead of raising
    results = await asyncio.gather(
        fetch("good_url"), fetch_broken("bad_url"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"Success: {r}")

asyncio.run(main())
asyncio.wait
async def main():
    tasks = [asyncio.create_task(fetch(f"url_{i}"), name=f"fetch-{i}") for i in range(5)]
    # Wait for the first task to complete
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Done: {len(done)}, Pending: {len(pending)}")
    # Wait with a timeout, cancel stragglers
    done2, pending2 = await asyncio.wait(pending, timeout=2.0)
    for task in pending2:
        task.cancel()

asyncio.run(main())
asyncio.as_completed
import asyncio, random

async def fetch_with_delay(url: str) -> dict:
    delay = random.uniform(0.5, 3.0)
    await asyncio.sleep(delay)
    return {"url": url, "delay": round(delay, 2)}

async def main():
    coros = [fetch_with_delay(f"url_{i}") for i in range(5)]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print(f"Got: {result}")  # Prints in completion order

asyncio.run(main())
6. Semaphores and Rate Limiting
import asyncio

async def fetch_url(sem: asyncio.Semaphore, url: str) -> str:
    async with sem:  # Limits concurrent access
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Content of {url}"

async def main():
    sem = asyncio.Semaphore(3)  # Max 3 concurrent requests
    urls = [f"https://api.example.com/page/{i}" for i in range(10)]
    results = await asyncio.gather(*[fetch_url(sem, url) for url in urls])
    print(f"Fetched {len(results)} pages")

asyncio.run(main())
Token bucket rate limiter
import asyncio, time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            self.tokens = min(self.burst, self.tokens + (now - self.last_refill) * self.rate)
            self.last_refill = now
            if self.tokens < 1:
                await asyncio.sleep((1 - self.tokens) / self.rate)
                # Reset the refill clock so the sleep isn't counted twice
                self.last_refill = time.monotonic()
                self.tokens = 0
            else:
                self.tokens -= 1

async def call_api(limiter: RateLimiter, endpoint: str) -> str:
    await limiter.acquire()
    print(f"[{time.monotonic():.2f}] Calling {endpoint}")
    await asyncio.sleep(0.1)
    return f"Response from {endpoint}"

async def main():
    limiter = RateLimiter(rate=5, burst=2)  # 5 req/s, burst of 2
    await asyncio.gather(*[call_api(limiter, f"/api/{i}") for i in range(15)])

asyncio.run(main())
7. Async Context Managers and Iterators
Async context managers
import asyncio
from contextlib import asynccontextmanager

class AsyncDBConnection:
    def __init__(self, dsn: str):
        self.dsn = dsn

    async def __aenter__(self):
        print(f"Connecting to {self.dsn}...")
        await asyncio.sleep(0.5)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection...")
        await asyncio.sleep(0.1)
        return False

    async def query(self, sql: str) -> list:
        await asyncio.sleep(0.2)
        return [{"id": 1, "name": "Alice"}]

# Decorator-based alternative
@asynccontextmanager
async def managed_resource(name: str):
    print(f"Acquiring {name}")
    resource = {"name": name, "data": []}
    try:
        yield resource
    finally:
        print(f"Releasing {name}")

async def main():
    async with AsyncDBConnection("postgresql://localhost/db") as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)
    async with managed_resource("cache") as cache:
        cache["data"].append("item1")

asyncio.run(main())
Async generators
import asyncio

async def fetch_pages(base_url: str, total_pages: int):
    """Async generator yielding pages one at a time."""
    for page in range(1, total_pages + 1):
        await asyncio.sleep(0.3)
        yield {"page": page, "items": [f"item_{i}" for i in range(5)]}

async def main():
    async for page_data in fetch_pages("https://api.example.com", 5):
        print(f"Page {page_data['page']}: {len(page_data['items'])} items")
    # Async comprehension
    items = [item
             async for page in fetch_pages("https://api.example.com", 3)
             for item in page["items"]]
    print(f"Total items: {len(items)}")

asyncio.run(main())
8. aiohttp for Async HTTP
# pip install aiohttp
import asyncio, aiohttp

async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        # Single request
        data = await fetch_json(session, "https://jsonplaceholder.typicode.com/posts/1")
        print(data["title"])
        # Multiple concurrent requests
        urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 6)]
        posts = await asyncio.gather(*[fetch_json(session, url) for url in urls])
        for post in posts:
            print(f"Post {post['id']}: {post['title']}")

asyncio.run(main())
Timeouts and retries with exponential backoff
async def fetch_with_retry(
    session: aiohttp.ClientSession, url: str,
    max_retries: int = 3, backoff: float = 1.0,
) -> str:
    for attempt in range(max_retries):
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, timeout=timeout) as resp:
                resp.raise_for_status()
                return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            wait = backoff * (2 ** attempt)
            print(f"Retry {attempt + 1}/{max_retries} after {wait}s: {e}")
            await asyncio.sleep(wait)
Concurrent scraping with semaphore
async def scrape_page(session, sem, url):
    async with sem:
        async with session.get(url) as response:
            text = await response.text()
            return {"url": url, "length": len(text), "status": response.status}

async def main():
    sem = asyncio.Semaphore(10)
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    connector = aiohttp.TCPConnector(limit=20)
    async with aiohttp.ClientSession(connector=connector) as session:
        results = await asyncio.gather(
            *[scrape_page(session, sem, url) for url in urls],
            return_exceptions=True,
        )
    successes = [r for r in results if not isinstance(r, Exception)]
    print(f"Success: {len(successes)}, Failed: {len(results) - len(successes)}")

asyncio.run(main())
9. Async Database Access
asyncpg (PostgreSQL)
# pip install asyncpg
import asyncio, asyncpg

async def main():
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb", min_size=5, max_size=20
    )
    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT id, name, email FROM users WHERE id = $1", 1)
        print(f"User: {row['name']} ({row['email']})")
        rows = await conn.fetch("SELECT * FROM users WHERE active = $1 LIMIT $2", True, 100)
        new_id = await conn.fetchval(
            "INSERT INTO users(name, email) VALUES($1, $2) RETURNING id",
            "Bob", "bob@example.com",
        )
    # Transactions
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100.0, 1)
            await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100.0, 2)
    await pool.close()

asyncio.run(main())
aiosqlite (SQLite)
# pip install aiosqlite
import asyncio, aiosqlite

async def main():
    async with aiosqlite.connect("app.db") as db:
        await db.execute("""CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY, title TEXT NOT NULL, done BOOLEAN DEFAULT 0)""")
        await db.execute("INSERT INTO tasks(title) VALUES(?)", ("Learn asyncio",))
        await db.commit()
        async with db.execute("SELECT * FROM tasks") as cursor:
            async for row in cursor:
                print(f"Task {row[0]}: {row[1]} (done: {row[2]})")

asyncio.run(main())
10. Error Handling Patterns
import asyncio

async def risky_operation(value: int) -> int:
    await asyncio.sleep(0.1)
    if value < 0:
        raise ValueError(f"Negative value: {value}")
    return value * 2

async def main():
    # Basic try/except
    try:
        result = await risky_operation(-1)
    except ValueError as e:
        print(f"Caught: {e}")
    # gather with return_exceptions
    results = await asyncio.gather(
        risky_operation(5), risky_operation(-1), risky_operation(10),
        return_exceptions=True,
    )
    for i, r in enumerate(results):
        print(f"Task {i}: {'FAIL ' + str(r) if isinstance(r, Exception) else r}")

asyncio.run(main())
Timeout handling
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    # Python 3.11+ style
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:
        print("Timed out (asyncio.timeout)")
    # Works on older Python versions too
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Timed out (wait_for)")

asyncio.run(main())
Graceful shutdown
import asyncio, signal

async def worker(name: str, shutdown: asyncio.Event):
    while not shutdown.is_set():
        print(f"[{name}] Working...")
        try:
            await asyncio.wait_for(shutdown.wait(), timeout=2.0)
        except asyncio.TimeoutError:
            continue
    print(f"[{name}] Stopped")

async def main():
    shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()
    # add_signal_handler is only available on Unix event loops
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown.set)
    workers = [asyncio.create_task(worker(f"w-{i}", shutdown)) for i in range(3)]
    await asyncio.gather(*workers)

asyncio.run(main())
11. Testing Async Code
pytest-asyncio
# pip install pytest pytest-asyncio
import asyncio, pytest

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)
    if user_id <= 0:
        raise ValueError("Invalid user ID")
    return {"id": user_id, "name": f"User {user_id}"}

@pytest.mark.asyncio
async def test_fetch_user():
    user = await fetch_user(1)
    assert user["id"] == 1
    assert user["name"] == "User 1"

@pytest.mark.asyncio
async def test_fetch_user_invalid():
    with pytest.raises(ValueError, match="Invalid user ID"):
        await fetch_user(-1)

@pytest.mark.asyncio
async def test_concurrent_fetch():
    users = await asyncio.gather(fetch_user(1), fetch_user(2), fetch_user(3))
    assert len(users) == 3
    assert all(u["id"] > 0 for u in users)
Mocking async functions
from unittest.mock import AsyncMock, patch

async def get_weather(city: str) -> dict:
    raise NotImplementedError  # Calls an external API in production

async def get_forecast(city: str) -> str:
    weather = await get_weather(city)
    return "Hot" if weather["temp"] > 30 else "Warm" if weather["temp"] > 15 else "Cold"

@pytest.mark.asyncio
async def test_forecast_hot():
    with patch("__main__.get_weather", new_callable=AsyncMock,
               return_value={"temp": 35, "humidity": 60}):
        assert await get_forecast("Phoenix") == "Hot"

@pytest.mark.asyncio
async def test_forecast_cold():
    mock = AsyncMock(return_value={"temp": 5})
    with patch("__main__.get_weather", mock):
        assert await get_forecast("Oslo") == "Cold"
    mock.assert_called_once_with("Oslo")
12. asyncio vs Threading vs Multiprocessing
| Feature | asyncio | threading | multiprocessing |
|---|---|---|---|
| Best for | I/O-bound (many connections) | I/O-bound (sync libraries) | CPU-bound |
| Concurrency | Single thread, cooperative | Multi-thread, preemptive | Multi-process |
| GIL affected | N/A (single thread) | Yes (limits CPU work) | No (separate processes) |
| Memory | Very low (~KB/task) | Medium (~MB/thread) | High (~MB/process) |
| Scale | 100K+ tasks | Hundreds of threads | Number of CPU cores |
| Race conditions | Rare (explicit yields) | Common (need locks) | Rare (separate memory) |
| Debugging | Easier (predictable) | Hard (nondeterministic) | Medium |
| Ecosystem | Growing (aiohttp, asyncpg) | Mature (all libraries) | Good (shared memory) |
# Side-by-side: downloading 100 URLs

# asyncio (best for this use case)
import asyncio, aiohttp

async def download_async(urls):
    async def get(session, u):
        async with session.get(u) as resp:
            return await resp.text()
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get(session, u) for u in urls])

# threading
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_sync(u):
    return requests.get(u).text

def download_threaded(urls):
    with ThreadPoolExecutor(max_workers=20) as pool:
        return list(pool.map(fetch_sync, urls))

# multiprocessing (overkill for I/O; the worker must be a module-level
# function because lambdas can't be pickled)
from multiprocessing import Pool

def download_multi(urls):
    with Pool(8) as pool:
        return pool.map(fetch_sync, urls)
13. Real-World Patterns and Best Practices
Producer-consumer with asyncio.Queue
import asyncio, random

async def producer(queue: asyncio.Queue, name: str, count: int):
    for i in range(count):
        item = f"{name}-item-{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))
        await queue.put(item)
        print(f"[{name}] Produced: {item}")
    await queue.put(None)  # Sentinel

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"[{name}] Processing: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    producers = [asyncio.create_task(producer(queue, f"P{i}", 5)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(3)]
    await asyncio.gather(*producers)
    await queue.put(None)  # Extra sentinel so every consumer gets one
    await asyncio.gather(*consumers)

asyncio.run(main())
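An alternative to sentinel values is to wait on queue.join(), which returns once task_done() has been called for every item, and then cancel the idle consumers. A minimal sketch, assuming all items are enqueued up front; handle() and its doubling step are made up for illustration:

```python
import asyncio

async def handle(q: asyncio.Queue, out: list):
    while True:
        item = await q.get()
        out.append(item * 2)  # process the item
        q.task_done()         # mark it done only after processing

async def main() -> list:
    q: asyncio.Queue = asyncio.Queue()
    out: list = []
    consumers = [asyncio.create_task(handle(q, out)) for _ in range(3)]
    for i in range(9):
        q.put_nowait(i)
    await q.join()             # returns once every item is processed
    for c in consumers:        # consumers are now idle; cancel them
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return sorted(out)

results = asyncio.run(main())
print(results)
```

This avoids sentinel bookkeeping when producer and consumer counts differ, at the cost of an explicit cancellation step.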
Async retry decorator
import asyncio, functools, random

def async_retry(max_retries=3, backoff=1.0, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exc = e
                    if attempt < max_retries - 1:
                        wait = backoff * (2 ** attempt)
                        await asyncio.sleep(wait)
            raise last_exc
        return wrapper
    return decorator

@async_retry(max_retries=3, backoff=0.5, exceptions=(ConnectionError,))
async def unreliable_api(endpoint: str) -> dict:
    if random.random() < 0.7:
        raise ConnectionError("Connection refused")
    return {"status": "ok"}
Async caching with TTL
import asyncio
from functools import wraps

def async_cache(ttl: float = 60.0):
    cache = {}
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = str((args, tuple(sorted(kwargs.items()))))
            now = asyncio.get_running_loop().time()
            if key in cache and now - cache[key][0] < ttl:
                return cache[key][1]
            result = await func(*args, **kwargs)
            cache[key] = (now, result)
            return result
        wrapper.cache_clear = cache.clear
        return wrapper
    return decorator

@async_cache(ttl=30)
async def get_exchange_rate(currency: str) -> float:
    await asyncio.sleep(1)  # Simulate API call
    return 1.08 if currency == "EUR" else 0.79
Best practices summary
- Always use asyncio.run() as the entry point. Avoid manually managing event loops.
- Never call blocking code directly. Use run_in_executor() for synchronous I/O.
- Reuse sessions and pools. Share one aiohttp.ClientSession or connection pool across tasks.
- Use semaphores to limit concurrency when hitting external services.
- Handle cancellation. Catch CancelledError in long-running tasks for cleanup.
- Prefer TaskGroup (3.11+) over gather for structured concurrency.
- Use asyncio.timeout() (3.11+) instead of wait_for().
- Name your tasks with create_task(coro, name="x") for debugging.
- Test with pytest-asyncio and mock with AsyncMock.
Frequently Asked Questions
What is asyncio in Python?
asyncio is Python's built-in library for writing concurrent code using async/await. It provides an event loop that manages coroutines -- functions that can pause while waiting for I/O operations like network requests, file reads, or database queries. Unlike threading, asyncio uses a single thread and achieves concurrency by switching between tasks whenever one is waiting, making it highly efficient for I/O-bound workloads. It has been in the standard library since Python 3.4 and matured significantly with Python 3.11+ features like TaskGroup and asyncio.timeout().
When should I use asyncio vs threading vs multiprocessing?
Use asyncio for I/O-bound tasks with many concurrent connections (web servers, scrapers, chat apps). It uses very little memory per task and scales to hundreds of thousands of concurrent operations. Use threading for I/O-bound work that requires synchronous libraries without async alternatives. Use multiprocessing for CPU-bound tasks like image processing, data crunching, or ML inference, because it bypasses the GIL by running separate Python processes.
How do I run multiple async tasks concurrently?
Use asyncio.gather() to run coroutines concurrently and collect results in order: results = await asyncio.gather(task1(), task2(), task3()). Create Task objects with asyncio.create_task() for background execution. Use asyncio.wait() with return_when=FIRST_COMPLETED for processing tasks as they finish. Use asyncio.as_completed() to iterate results in completion order. In Python 3.11+, use asyncio.TaskGroup() for structured concurrency. Limit concurrency with asyncio.Semaphore.