Python asyncio - Modern concurrent programming with async/await, event loops, tasks, coroutines, primitives, aiohttp, and FastAPI async patterns

# Python asyncio - Async/Await Concurrency

## Overview

Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, and WebSocket connections; ordinary file operations are blocking and are typically offloaded to a thread pool. asyncio interleaves many such operations on a single thread, which avoids much of the complexity of threading or multiprocessing.

**Key Features**:
- async/await syntax for readable concurrent code
- Event loop for managing concurrent operations
- Tasks for running multiple coroutines concurrently
- Primitives: locks, semaphores, events, queues
- HTTP client/server with aiohttp
- Database async support (asyncpg, aiomysql, motor)
- FastAPI async endpoints
- WebSocket support
- Background task management

**Installation**:
```bash
# asyncio is part of the standard library (asyncio.run() requires Python 3.7+)

# Async HTTP client
pip install aiohttp

# Async HTTP requests (alternative)
pip install httpx

# Async database drivers
pip install asyncpg aiomysql motor  # PostgreSQL, MySQL, MongoDB

# FastAPI with async support
pip install fastapi "uvicorn[standard]"  # Quotes keep shells like zsh from expanding the brackets

# Async testing
pip install pytest-asyncio
```

## Basic Async/Await Patterns

### 1. Simple Async Function

```python
import asyncio

async def hello():
    """Basic async function (coroutine)."""
    print("Hello")
    await asyncio.sleep(1)  # Async sleep (non-blocking)
    print("World")
    return "Done"

# Run async function
result = asyncio.run(hello())
print(result)  # "Done"
```

**Key Points**:
- `async def` defines a coroutine function
- `await` suspends the current coroutine until the awaitable completes, letting the event loop run other tasks in the meantime
- `asyncio.run()` is the entry point for async programs
- Coroutines must be awaited or scheduled
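
The last point is easy to miss: calling a coroutine function only creates a coroutine object, and nothing in its body runs until it is awaited or wrapped in a task. A minimal sketch of that behavior follows; `compute` is a hypothetical coroutine introduced only for illustration.

```python
import asyncio
import inspect

async def compute():
    """Hypothetical coroutine used only for illustration."""
    await asyncio.sleep(0.01)
    return 42

async def main():
    coro = compute()                      # Nothing has run yet: this is a coroutine object
    is_coro = inspect.iscoroutine(coro)
    awaited = await coro                  # Awaiting runs the body and returns its value

    task = asyncio.create_task(compute())  # Scheduling wraps the coroutine in a Task
    scheduled = await task
    return is_coro, awaited, scheduled

print(asyncio.run(main()))  # (True, 42, 42)
```

If a coroutine object is created and never awaited or scheduled, Python emits a `RuntimeWarning` saying the coroutine was never awaited.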

### 2. Multiple Concurrent Tasks

```python
import asyncio
import time

async def task(name, duration):
    """Simulate async task."""
    print(f"{name}: Starting (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"{name}: Complete")
    return f"{name} result"

async def run_concurrent():
    """Run multiple tasks concurrently."""
    start = time.time()

    # Sequential (slow) - 6 seconds total
    # result1 = await task("Task 1", 3)
    # result2 = await task("Task 2", 2)
    # result3 = await task("Task 3", 1)

    # Concurrent (fast) - 3 seconds total
    results = await asyncio.gather(
        task("Task 1", 3),
        task("Task 2", 2),
        task("Task 3", 1)
    )

    elapsed = time.time() - start
    print(f"Total time: {elapsed:.2f}s")
    print(f"Results: {results}")

asyncio.run(run_concurrent())
# Output: Total time is about 3s (tasks ran concurrently, bounded by the slowest one)
```
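
One detail of `asyncio.gather()` worth knowing is that results come back in the order the awaitables were passed, not the order in which they finished. The sketch below illustrates this with shortened delays; `fetch` and `tracked` are hypothetical helpers, not part of any library.

```python
import asyncio

async def fetch(label, delay):
    """Hypothetical stand-in for an I/O call: sleeps, then returns its label."""
    await asyncio.sleep(delay)
    return label

async def main():
    finished = []

    async def tracked(label, delay):
        result = await fetch(label, delay)
        finished.append(result)  # Record the completion order
        return result

    results = await asyncio.gather(
        tracked("slow", 0.15),
        tracked("medium", 0.10),
        tracked("fast", 0.05),
    )
    return results, finished

results, finished = asyncio.run(main())
print(results)   # ['slow', 'medium', 'fast']  (argument order)
print(finished)  # ['fast', 'medium', 'slow']  (completion order)
```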

### 3. Task Creation and Management

```python
import asyncio

async def background_task(name):
    """Long-running background task."""
    for i in range(5):
        print(f"{name}: iteration {i}")
        await asyncio.sleep(1)
    return f"{name} complete"

async def main():
    # Create tasks (scheduled right away; they begin running at the next await)
    task1 = asyncio.create_task(background_task("Task-1"))
    task2 = asyncio.create_task(background_task("Task-2"))

    # Do other work while tasks run
    print("Main: doing other work")
    await asyncio.sleep(2)

    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())
```
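
Tasks can also be cancelled before they finish, which matters for background work that would otherwise run indefinitely. The following is a minimal sketch, assuming a hypothetical `worker` coroutine that loops forever and performs cleanup when cancelled.

```python
import asyncio

async def worker(log):
    """Hypothetical long-running worker that cleans up when cancelled."""
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("cleanup")
        raise  # Re-raise so the task is marked as cancelled

async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.05)  # Let the worker run for a while

    task.cancel()              # Request cancellation
    try:
        await task             # Wait for the cancellation to be processed
    except asyncio.CancelledError:
        pass
    return task.cancelled(), log[-1]

print(asyncio.run(main()))  # (True, 'cleanup')
```

Re-raising `CancelledError` after cleanup is the usual choice here; swallowing it would make the task appear to have completed normally.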

### 4. Error Handling in Async Code

```python
import asyncio

async def risky_operation(fail=False):
    """Operation that might fail."""
    await asyncio.sleep(1)
    if fail:
        raise ValueError("Operation failed")
    return "Success"

async def handle_errors():
    # Individual try/except
    try:
        result = await risky_operation(fail=True)
    except ValueError as e:
        print(f"Caught error: {e}")
        result = "Fallback value"

    # Gather with error handling
    results = await asyncio.gather(
        risky_operation(fail=False),
        risky_operation(fail=True),
        risky_operation(fail=False),
        return_exceptions=True  # Return exceptions instead of raising
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(handle_errors())
```
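
Timeouts are another common failure mode alongside raised exceptions. One way to bound how long an awaitable may take is `asyncio.wait_for()`, sketched below; `slow_operation` and `with_timeout` are hypothetical names chosen for the example.

```python
import asyncio

async def slow_operation(delay):
    """Hypothetical operation that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return "finished"

async def with_timeout(delay, timeout):
    """Return the result, or a fallback string if the timeout expires first."""
    try:
        return await asyncio.wait_for(slow_operation(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the inner operation before raising
        return "timed out"

print(asyncio.run(with_timeout(0.01, 0.5)))  # finished
print(asyncio.run(with_timeout(0.5, 0.01)))  # timed out
```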

## Event Loop Fundamentals

### 1. Event Loop Lifecycle

```python
import asyncio

# Modern approach (Python 3.7+)
async def main():
    print("Main coroutine")
    await asyncio.sleep(1)

asyncio.run(main())  # Creates loop, runs main, closes loop

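# Lower-level loop APIs (advanced use cases)
async def some_coroutine():
    """Placeholder coroutine for illustration."""
    await asyncio.sleep(0.1)
    return "coroutine result"

def callback_function():
    """Placeholder callback for illustration."""
    print("Callback executed")

async def manual_example():
    # Inside a coroutine, get_running_loop() is preferred over get_event_loop()
    loop = asyncio.get_running_loop()

    # Schedule coroutine
    task = loop.create_task(some_coroutine())

    # Schedule callback (fires only if the loop is still running in 5 seconds)
    loop.call_later(5, callback_function)

    # Wait for the task to complete
    result = await task

    return result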

# Get current event loop
async def get_current_loop():
    loop = asyncio.get_running_loop()
    print(f"Loop: {loop}")

    # Schedule callback in event loop
    loop.call_soon(lambda: print("Callback executed"))

    await asyncio.sleep(0)  # Let callback execute
```

### 2. Loop Scheduling and Callbacks

```python
import asyncio
from datetime import datetime

def callback(name, loop):
    """Callback function (not async)."""
    print(f"{datetime.now()}: {name} callback executed")

    # Stop loop after callback
    # loop.stop()

async def schedule_callbacks():
    loop = asyncio.get_running_loop()

    # Schedule immediate callback
    loop.call_soon(callback, "Immediate", loop)

    # Schedule callback after delay
    loop.call_later(2, callback, "Delayed 2s", loop)

    # Schedule callback at specific time
    loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)

    # Wait for callbacks to execute
    await asyncio.sleep(5)

asyncio.run(schedule_callbacks())
```
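
The scheduling methods return handles, so a pending callback can be cancelled before it fires. A small sketch, using `list.append` as a stand-in callback:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fired = []

    # call_later returns a TimerHandle for each scheduled callback
    loop.call_later(0.01, fired.append, "kept")
    drop = loop.call_later(0.02, fired.append, "dropped")

    drop.cancel()              # Cancelled before its delay elapses
    await asyncio.sleep(0.05)  # Give the remaining callback time to run
    return fired, drop.cancelled()

print(asyncio.run(main()))  # (['kept'], True)
```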

### 3. Running Blocking Code

```python
import asyncio
import time

def blocking_io():
    """Blocking I/O operation (for CPU-bound work, prefer a ProcessPoolExecutor)."""
    print("Blocking operation started")
    time.sleep(2)  # Blocks thread
    print("Blocking operation complete")
    return "Blocking result"

async def run_in_executor():
    """Run blocking code in thread pool."""
    loop = asyncio.get_running_loop()

    # Run in default executor (thread pool)
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_io
    )

    print(f"Result: {result}")

# Run blocking operations concurrently
async def concurrent_blocking():
    loop = asyncio.get_running_loop()

    # These run in thread pool, don't block event loop
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io)
    )

    print(f"All results: {results}")

asyncio.run(concurrent_blocking())
```
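
On Python 3.9 and later, `asyncio.to_thread()` offers a shorter way to do the same thing: it runs a blocking function in the default thread pool and forwards its arguments. A minimal sketch, assuming a hypothetical `blocking_read` function:

```python
import asyncio
import time

def blocking_read(label):
    """Hypothetical blocking call, simulated with time.sleep."""
    time.sleep(0.05)
    return f"{label} done"

async def main():
    # Each call runs in a worker thread, so the event loop stays responsive
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, "a"),
        asyncio.to_thread(blocking_read, "b"),
    )
    return results

print(asyncio.run(main()))  # ['a done', 'b done']
```

As with `run_in_executor`, this helps with blocking I/O; threads do not speed up CPU-bound Python code because of the GIL.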

## Asyncio Primitives

### 1. Locks for Mutual Exclusion

```python
import asyncio

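# Shared resource
counter = 0
# Note: on Python < 3.10, create the lock inside a coroutine instead;
# a module-level lock may bind to a different loop than asyncio.run() uses
lock = asyncio.Lock()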

async def increment_with_lock(name):
    """Increment counter with lock protection."""
    global counter

    async with lock:
        # Critical section - only one task at a time
        print(f"{name}: acquired lock")
        current = counter
        await asyncio.sleep(0.1)  # Simulate processing
        counter = current + 1
        print(f"{name}: released lock, counter={counter}")

async def increment_without_lock(name):
    """Increment without lock - race condition!"""
    global counter

    current = counter
    await asyncio.sleep(0.1)  # Race condition window
    counter = current + 1
    print(f"{name}: counter={counter}")

async def test_locks():
    global counter

    # Without lock (race condition)
    counter = 0
    await asyncio.gather(
        increment_without_lock("Task-1"),
        increment_without_lock("Task-2"),
        increment_without_lock("Task-3")
    )
    print(f"Without lock: {counter}")  # 1, not 3: every task read 0 before any wrote

    # With lock (correct)
    counter = 0
    await asyncio.gather(
        increment_with_lock("Task-1"),
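        increment_with_lock("Task-2"),
        increment_with_lock("Task-3")
    )
    print(f"With lock: {counter}")  # 3

asyncio.run(test_locks())
```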