Python Asyncio Reference

Welcome to your go-to reference for Python’s asyncio! Whether you're just starting out or looking for a detailed refresher, this guide will walk you through the fundamental concepts, key functions, and common patterns. Let’s dive right in.

Introduction to Asyncio

Asyncio is Python’s built-in library for writing concurrent code using the async/await syntax. It’s especially useful for I/O-bound and high-level structured network code. Instead of dedicating a thread to each connection, asyncio runs coroutines on a single-threaded event loop, so one thread can service many connections that spend most of their time waiting on I/O.

To use asyncio, you define coroutines with the async def syntax. Inside these coroutines, you can await other coroutines or asyncio-friendly functions. This enables cooperative multitasking: when one task is waiting (e.g., for a network response), another can run.

Here’s a simple example:

import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

asyncio.run(say_hello())

This program prints "Hello", waits for one second, and then prints "World". The asyncio.sleep(1) call is non-blocking: it suspends say_hello() and lets the event loop run other tasks during the wait.
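To make the "other tasks run during the wait" point concrete, here is a minimal sketch with two coroutines sharing one event loop. The worker() function and the log list are hypothetical names introduced only for this illustration.

```python
import asyncio

async def worker(name, delay, log):
    log.append(f"{name} start")
    await asyncio.sleep(delay)   # suspends this coroutine; the loop runs the other one
    log.append(f"{name} end")

async def main():
    log = []
    # Both workers run on the same thread and take turns at each await.
    await asyncio.gather(worker("A", 0.2, log), worker("B", 0.1, log))
    return log

log = asyncio.run(main())
print(log)  # ['A start', 'B start', 'B end', 'A end']
```

B finishes first even though A started first, because A is still suspended in its longer sleep.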

Core Components

Understanding asyncio requires familiarity with a few key components: the event loop, coroutines, tasks, and futures.

Event Loop

The event loop is the core of every asyncio application. It runs asynchronous tasks and callbacks, performs network I/O operations, and handles subprocesses. You rarely interact with the loop directly; instead, you use high-level functions like asyncio.run().

import asyncio

async def main():
    await asyncio.sleep(1)

# asyncio.run() creates an event loop, runs the coroutine, and closes the loop.
asyncio.run(main())

Coroutines

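Coroutines are the building blocks of asyncio. Declared with async def, they can pause execution using await. Calling a coroutine function does not run its body; it returns a coroutine object, which runs only when you await it from another coroutine, wrap it in a task, or pass it to asyncio.run().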

async def fetch_data():
    return "Data fetched"

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())
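The difference between a coroutine function and the coroutine object it returns is easy to miss, so here is a small sketch of it. It reuses the fetch_data() name from the example above; the inspect check is added only for illustration.

```python
import asyncio
import inspect

async def fetch_data():
    return "Data fetched"

coro = fetch_data()                 # creates a coroutine object; the body has not run yet
print(inspect.iscoroutine(coro))    # True

result = asyncio.run(coro)          # the event loop drives the coroutine to completion
print(result)                       # Data fetched
```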

Tasks

Tasks are used to schedule coroutines concurrently. When you wrap a coroutine in a task, it’s added to the event loop and will run automatically when the loop is active.

async def task_example():
    task = asyncio.create_task(fetch_data())
    # Do other things here while fetch_data runs in the background.
    await task
    print(task.result())
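One way to see that tasks really overlap is to time two of them. The following is a rough sketch; this variant of fetch_data() takes a hypothetical delay parameter that the example above does not have.

```python
import asyncio
import time

async def fetch_data(delay):
    await asyncio.sleep(delay)      # stands in for a slow I/O call
    return f"done after {delay}s"

async def main():
    start = time.perf_counter()
    # create_task() schedules both coroutines immediately.
    t1 = asyncio.create_task(fetch_data(0.3))
    t2 = asyncio.create_task(fetch_data(0.3))
    results = [await t1, await t2]
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

The elapsed time should be close to 0.3 seconds rather than 0.6, because both sleeps run during the same interval.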

Futures

A Future is a low-level awaitable object that represents the eventual result of an asynchronous operation. asyncio.Task is a subclass of asyncio.Future, so a task can be awaited in the same way. You typically use tasks directly; futures matter mostly when bridging callback-based code into async/await.
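As an illustration of that bridging role, the sketch below creates a bare future and lets a scheduled callback fill in its result. The 0.1-second delay and the result string are arbitrary choices for the example.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Pretend a callback-based API delivers its value a little later.
    loop.call_later(0.1, fut.set_result, "value from callback")
    return await fut                # suspends until set_result() is called

value = asyncio.run(main())
print(value)  # value from callback
```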

Common Functions and Patterns

Let’s explore some of the most frequently used asyncio functions and how to apply them.

asyncio.run()

This function runs a coroutine and manages the event loop. It’s the main entry point for asyncio programs.

async def my_coroutine():
    await asyncio.sleep(2)
    print("Done")

if __name__ == "__main__":
    asyncio.run(my_coroutine())

asyncio.gather()

Use gather() to run multiple awaitables concurrently and wait for all to complete. Results come back in the order the awaitables were passed in, not the order in which they finish.

async def func1():
    await asyncio.sleep(1)
    return "Func1 done"

async def func2():
    await asyncio.sleep(2)
    return "Func2 done"

async def main():
    results = await asyncio.gather(func1(), func2())
    print(results)  # ['Func1 done', 'Func2 done']

asyncio.run(main())

asyncio.wait_for()

This function sets a timeout for an awaitable. If the awaitable doesn’t complete in time, wait_for() cancels it and raises asyncio.TimeoutError (an alias of the built-in TimeoutError since Python 3.11).

async def slow_operation():
    await asyncio.sleep(5)
    return "Finished"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("Took too long!")

asyncio.run(main())
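Because a timeout cancels the awaitable, the slow coroutine gets a chance to clean up. Here is a minimal sketch of that sequence; the events list is a hypothetical helper used only to record the order of what happens.

```python
import asyncio

async def slow_operation(events):
    try:
        await asyncio.sleep(5)
        return "Finished"
    except asyncio.CancelledError:
        events.append("cancelled")  # cleanup would go here
        raise                       # re-raise so the cancellation completes

async def main():
    events = []
    try:
        await asyncio.wait_for(slow_operation(events), timeout=0.1)
    except asyncio.TimeoutError:
        events.append("timed out")
    return events

events = asyncio.run(main())
print(events)  # ['cancelled', 'timed out']
```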

asyncio.sleep()

A non-blocking sleep that yields control back to the event loop.

async def counter():
    for i in range(3):
        print(i)
        await asyncio.sleep(1)

Working with Async IO

Asyncio shines when dealing with I/O-bound operations. Here’s how you might use it with network requests. Note: For real HTTP requests, you’d use an async library like aiohttp, but this illustrates the idea.

import asyncio

async def mock_http_request(url):
    print(f"Fetching {url}")
    await asyncio.sleep(2)  # Simulate network delay
    return f"Response from {url}"

async def main():
    urls = ["http://example.com", "http://example.org", "http://example.net"]
    # gather() wraps each coroutine in a task and runs them concurrently.
    coros = [mock_http_request(url) for url in urls]
    responses = await asyncio.gather(*coros)
    for resp in responses:
        print(resp)

asyncio.run(main())

This runs three "HTTP requests" concurrently, taking about 2 seconds total instead of 6.

Synchronization Primitives

When working with shared resources in concurrent code, you may need locks, semaphores, or events. Asyncio provides these too.

Lock

Prevents multiple coroutines from accessing a shared resource simultaneously.

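async def use_resource(lock):
    async with lock:
        print("Resource is locked")
        await asyncio.sleep(1)
    print("Resource released")

async def main():
    # Create the lock inside the running loop. On Python versions before 3.10,
    # a lock created at import time can bind to a different event loop.
    lock = asyncio.Lock()
    await asyncio.gather(use_resource(lock), use_resource(lock))

asyncio.run(main())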

Semaphore

Limits the number of coroutines that can access a resource concurrently.

async def limited_task(task_id, sem):
    async with sem:
        print(f"Task {task_id} acquired semaphore")
        await asyncio.sleep(1)
    print(f"Task {task_id} released")

async def main():
    sem = asyncio.Semaphore(2)  # at most two holders at once
    await asyncio.gather(*(limited_task(i, sem) for i in range(4)))

asyncio.run(main())

At most two tasks hold the semaphore at any moment; the other two wait until a slot frees up.
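Events, the third primitive mentioned at the top of this section, let one coroutine signal others that something has happened. The following is a small sketch; waiter() and the log list are hypothetical names used for illustration.

```python
import asyncio

async def waiter(event, log):
    log.append("waiting")
    await event.wait()              # suspends until event.set() is called
    log.append("go")

async def main():
    event = asyncio.Event()         # created inside the running loop
    log = []
    task = asyncio.create_task(waiter(event, log))
    await asyncio.sleep(0.1)        # by now the waiter is blocked on the event
    log.append("setting")
    event.set()
    await task
    return log

log = asyncio.run(main())
print(log)  # ['waiting', 'setting', 'go']
```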

Error Handling

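Handling exceptions in asyncio is similar to synchronous code: an exception raised inside a coroutine propagates to whoever awaits it. The main nuance is that an exception raised inside a task is stored on the task and surfaces only when the task is awaited or its result is read.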

async def might_fail():
    raise ValueError("Something went wrong")

async def main():
    try:
        await might_fail()
    except ValueError as e:
        print(f"Caught error: {e}")

asyncio.run(main())

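You can also handle errors in gathered tasks. With return_exceptions=True, gather() returns exceptions as ordinary items in the results list instead of raising the first one: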

async def ok():
    return "OK"

async def fail():
    raise RuntimeError("Failed")

async def main():
    results = await asyncio.gather(ok(), fail(), return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"Success: {result}")
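Outside of gather(), an exception raised in a background task does not interrupt the code that created the task; it is raised when the task is eventually awaited. Here is a minimal sketch of that behaviour, reusing the fail() coroutine from the example above.

```python
import asyncio

async def fail():
    raise RuntimeError("Failed")

async def main():
    task = asyncio.create_task(fail())
    await asyncio.wait([task])      # waits for the task to finish without raising
    assert task.done()              # the task has already failed at this point
    try:
        await task                  # the stored exception is raised here
    except RuntimeError as e:
        return f"Caught later: {e}"

message = asyncio.run(main())
print(message)  # Caught later: Failed
```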

Best Practices

Follow these tips to write clean and efficient asyncio code:

  • Always use asyncio.run() as your main entry point instead of managing the event loop manually.
  • Prefer asyncio.create_task() for spawning background tasks rather than lower-level functions.
  • Avoid blocking calls inside coroutines. Use async libraries for I/O.
  • Use timeouts with asyncio.wait_for() to prevent hanging indefinitely.
  • Test your code with different event loop policies and under load.
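When a blocking call cannot be avoided, one option is to move it to a worker thread so the event loop stays responsive. The sketch below assumes Python 3.9 or later for asyncio.to_thread(); blocking_io() and heartbeat() are hypothetical stand-ins for a blocking library call and for other work on the loop.

```python
import asyncio
import time

def blocking_io():
    time.sleep(0.2)                 # stands in for a blocking library call
    return "blocking result"

async def heartbeat(log):
    for _ in range(3):
        log.append("tick")
        await asyncio.sleep(0.05)

async def main():
    log = []
    # to_thread() runs blocking_io in a worker thread; heartbeat keeps running meanwhile.
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_io), heartbeat(log))
    return result, log

result, log = asyncio.run(main())
print(result, log)  # blocking result ['tick', 'tick', 'tick']
```

Calling time.sleep(0.2) directly inside a coroutine would instead freeze every other task for the duration of the call.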

Common Pitfalls

Watch out for these common mistakes:

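  • Forgetting to await a coroutine: the coroutine never runs, and Python only emits a "coroutine ... was never awaited" RuntimeWarning.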
  • Blocking the event loop with CPU-bound or synchronous I/O calls.
  • Creating too many tasks without backpressure can lead to resource exhaustion.
  • Not handling exceptions properly in concurrent tasks.
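For the backpressure pitfall, one common approach is a bounded asyncio.Queue feeding a fixed number of workers, so the producer is paused whenever the workers fall behind. This is a sketch under assumed numbers (queue size 5, three workers, 20 items), not a tuned design.

```python
import asyncio

async def producer(queue, n):
    for i in range(n):
        await queue.put(i)          # suspends while the queue is full

async def worker(queue, results):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.01)   # simulate I/O
        results.append(item * 2)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]
    await producer(queue, 20)
    await queue.join()              # wait until every item has been processed
    for w in workers:
        w.cancel()                  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

processed = asyncio.run(main())
print(processed[:5])  # [0, 2, 4, 6, 8]
```

At most five items wait in the queue and three are in flight at any time, regardless of how many items the producer has to send.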

Comparison with Threading

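While both asyncio and threading handle concurrency, they do so differently. Asyncio uses a single thread and cooperative multitasking, making it lightweight and efficient for I/O-bound tasks. Threading uses preemptive multitasking with multiple threads, which comes with higher overhead and added complexity from locking. In standard CPython builds, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads do not speed up CPU-bound Python code; they are mainly useful for blocking I/O and for libraries with no async support. For CPU-bound work, use multiple processes instead (for example, multiprocessing or concurrent.futures.ProcessPoolExecutor).

Feature            Asyncio                          Threading
Concurrency model  Cooperative multitasking         Preemptive multitasking
Best for           I/O-bound work, many connections Blocking I/O, libraries without async support
Overhead           Low                              Higher
Complexity         Moderate (async/await)           High (locks, race conditions)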

Real-World Example

Let's put it all together with a practical example: a simple async web scraper using aiohttp (you’ll need to install it with pip install aiohttp).

import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/json",
        "https://httpbin.org/xml"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        pages = await asyncio.gather(*tasks)
        for url, content in zip(urls, pages):
            # response.text() returns a str, so len() counts characters, not bytes.
            print(f"{url} -> {len(content)} characters")

asyncio.run(main())

This code fetches three web pages concurrently, demonstrating asyncio’s power for network I/O.

Advanced Topics

Once you’re comfortable with the basics, you might explore:

  • Streams for low-level network I/O.
  • Subprocesses for running external commands asynchronously.
  • Custom event loops for specialized use cases.
  • Async generators and async context managers.

But for most applications, the patterns covered here will serve you well.
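As a taste of the last item in the list above, here is a minimal sketch combining an async generator with an async context manager. The countdown() and resource() names are hypothetical, and the log list exists only to show the order of events.

```python
import asyncio
from contextlib import asynccontextmanager

async def countdown(n):
    # An async generator: it can await between yields.
    while n > 0:
        yield n
        await asyncio.sleep(0.01)
        n -= 1

@asynccontextmanager
async def resource(log):
    log.append("open")              # setup, could await a connection here
    try:
        yield "handle"
    finally:
        log.append("close")         # teardown always runs

async def main():
    log = []
    async with resource(log) as handle:
        log.append(handle)
        async for n in countdown(3):
            log.append(n)
    return log

log = asyncio.run(main())
print(log)  # ['open', 'handle', 3, 2, 1, 'close']
```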

Remember: practice is key. Start small, experiment with the examples, and gradually incorporate asyncio into your projects. Happy coding!