
asyncio-concurrency-patterns


Complete guide for asyncio concurrency patterns including event loops, coroutines, tasks, futures, async context managers, and performance optimization




# Asyncio Concurrency Patterns

A comprehensive skill for mastering Python's asyncio library and concurrent programming patterns. This skill covers event loops, coroutines, tasks, futures, synchronization primitives, async context managers, and production-ready patterns for building high-performance asynchronous applications.

## When to Use This Skill

Use this skill when:

- Building I/O-bound applications that need to handle many concurrent operations
- Creating web servers, API clients, or websocket applications
- Implementing real-time systems with event-driven architecture
- Optimizing application performance with concurrent request handling
- Managing multiple async operations with proper coordination and error handling
- Building background task processors or job queues
- Implementing async database operations and connection pooling
- Creating chat applications, real-time dashboards, or notification systems
- Handling parallel HTTP requests efficiently
- Managing websocket connections with multiple event sources
- Building microservices with async communication patterns
- Optimizing resource utilization in network applications

## Core Concepts

### What is Asyncio?

Asyncio is Python's built-in library for writing concurrent code using the async/await syntax. It provides:

- **Event Loop**: The core of asyncio that schedules and runs asynchronous tasks
- **Coroutines**: Functions defined with `async def` that can be paused and resumed
- **Tasks**: Scheduled coroutines that run concurrently
- **Futures**: Low-level objects representing results of async operations
- **Synchronization Primitives**: Locks, semaphores, events for coordination

### Event Loop Fundamentals

The event loop is the central execution mechanism in asyncio:

```python
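import asyncio

async def my_coroutine():
    await asyncio.sleep(0.1)
    return 'done'

# Preferred (Python 3.7+): creates a loop, runs the coroutine, then closes the loop
result = asyncio.run(my_coroutine())

# Low-level equivalent, for code that has to manage the loop itself
loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(my_coroutine())
finally:
    loop.close()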
```

**Key Event Loop Concepts:**

1. **Single-threaded concurrency**: One thread, many tasks
2. **Cooperative multitasking**: Tasks yield control voluntarily
3. **I/O multiplexing**: Efficient handling of many I/O operations
4. **Non-blocking operations**: Don't wait for I/O, do other work
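
The concepts above can be seen in a small sketch. The names `worker` and `events` are illustrative and not part of asyncio. Two coroutines share one thread and hand control back to the loop at each `await`, so neither blocks the other.

```python
import asyncio
import threading

events = []  # (worker name, step, thread id) in execution order

async def worker(name, delay):
    for step in range(2):
        events.append((name, step, threading.get_ident()))
        # Awaiting yields control to the event loop so other tasks can run
        await asyncio.sleep(delay)

async def main():
    await asyncio.gather(worker('A', 0.01), worker('B', 0.01))

asyncio.run(main())

# Steps recorded by each worker, and the number of distinct threads used
print([(name, step) for name, step, _ in events])
print(len({thread_id for _, _, thread_id in events}))
```

Worker B records its first step before worker A records its second, even though both run on a single thread.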

### Coroutines vs Functions

**Regular Function:**
```python
import requests

def fetch_data():
    # Blocks the calling thread until the response arrives
    return requests.get('http://api.example.com').text
```

**Coroutine:**
```python
import aiohttp

async def fetch_data():
    # Yields control to the event loop while waiting for the response
    async with aiohttp.ClientSession() as session:
        async with session.get('http://api.example.com') as resp:
            return await resp.text()
```
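
One practical difference is easy to miss: calling a coroutine function does not run its body. It only creates a coroutine object, which runs once it is awaited or handed to the event loop. A minimal sketch, with `asyncio.sleep(0)` standing in for real I/O and the `calls` list added purely for illustration:

```python
import asyncio
import inspect

calls = []

async def fetch_data():
    calls.append('started')
    await asyncio.sleep(0)  # stand-in for real I/O
    return 'payload'

coro = fetch_data()                      # creates a coroutine object only
was_coroutine = inspect.iscoroutine(coro)
ran_before_await = bool(calls)           # nothing has executed yet

result = asyncio.run(coro)               # the body runs when the loop drives it
print(was_coroutine, ran_before_await, result, calls)
# True False payload ['started']
```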

### Tasks and Futures

**Tasks** wrap coroutines and schedule them on the event loop:

```python
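import asyncio

async def my_coroutine():
    await asyncio.sleep(0.1)
    return 'done'

async def main():
    # Create a task; it starts running the next time main() awaits
    task = asyncio.create_task(my_coroutine())

    # ... do other work while the task runs concurrently ...

    # Wait for the result
    return await task

result = asyncio.run(main())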
```

**Futures** represent eventual results:

```python
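import asyncio

async def main():
    # Low-level future (rarely used directly); create it on the running loop
    future = asyncio.get_running_loop().create_future()

    # Set the result (normally done by a callback or another task)
    future.set_result(42)

    # Awaiting the future returns its result
    return await future

result = asyncio.run(main())  # 42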
```
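
Futures are most useful as a bridge between callback-style code and `await`. The sketch below assumes a hypothetical callback source, simulated here with `loop.call_later`, that delivers a value some time after the future is created:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Simulated callback-based producer: resolves the future after 10 ms
    loop.call_later(0.01, future.set_result, 42)

    # Suspends here until the callback sets the result
    return await future

result = asyncio.run(main())
print(result)  # 42
```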

### Async Context Managers

Manage resources with async setup/teardown:

```python
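import asyncio

class AsyncResource:
    async def connect(self):
        await asyncio.sleep(0)  # placeholder for real connection setup

    async def disconnect(self):
        await asyncio.sleep(0)  # placeholder for real teardown

    async def do_work(self):
        await asyncio.sleep(0)  # placeholder for real work

    async def __aenter__(self):
        # Async setup
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Async cleanup; returning None lets exceptions propagate
        await self.disconnect()

# Usage
async def main():
    async with AsyncResource() as resource:
        await resource.do_work()

asyncio.run(main())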
```
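
For simple cases, the standard library's `contextlib.asynccontextmanager` decorator can replace a class with `__aenter__` and `__aexit__`. A minimal sketch, where `open_resource` and the `log` list are hypothetical names used only to show the order of setup and cleanup:

```python
import asyncio
from contextlib import asynccontextmanager

log = []

@asynccontextmanager
async def open_resource(name):
    await asyncio.sleep(0)                # async setup
    log.append(f'connect {name}')
    try:
        yield name
    finally:
        await asyncio.sleep(0)            # async cleanup, runs even on error
        log.append(f'disconnect {name}')

async def main():
    try:
        async with open_resource('db') as resource:
            log.append(f'using {resource}')
            raise RuntimeError('boom')
    except RuntimeError:
        log.append('error handled')

asyncio.run(main())
print(log)
# ['connect db', 'using db', 'disconnect db', 'error handled']
```

The cleanup step runs before the exception reaches the caller, which is the same guarantee `__aexit__` provides.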

## Concurrency Patterns

### Pattern 1: Gather - Concurrent Execution

Run multiple coroutines concurrently and wait for all to complete:

```python
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        # Run all fetches concurrently
        results = await asyncio.gather(
            fetch(session, 'http://python.org'),
            fetch(session, 'http://docs.python.org'),
            fetch(session, 'http://pypi.org')
        )
        return results

# Results is a list in the same order as inputs
results = asyncio.run(main())
```

**When to use:**
- Need all results
- Order matters
- Want the first exception to propagate immediately (default); the remaining awaitables are not cancelled and keep running
- Can handle partial results with `return_exceptions=True`
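
The `return_exceptions=True` option mentioned above can be sketched without any network access. The helpers `ok` and `fail` are illustrative stand-ins for real requests:

```python
import asyncio

async def ok(value):
    await asyncio.sleep(0.01)
    return value

async def fail():
    await asyncio.sleep(0)
    raise ValueError('bad input')

async def main():
    # Exceptions are returned in place instead of being raised
    results = await asyncio.gather(ok(1), fail(), ok(3), return_exceptions=True)
    values = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    return results, values, errors

results, values, errors = asyncio.run(main())
print(values)                                # [1, 3]
print([type(e).__name__ for e in errors])    # ['ValueError']
```

Results keep their input order, so the exception sits at index 1 of `results`, where the failing call was passed in.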

### Pattern 2: Wait - Flexible Waiting

More control over how to wait for multiple tasks:

```python
import asyncio

async def task_a():
    await asyncio.sleep(2)
    return 'A'

async def task_b():
    await asyncio.sleep(1)
    return 'B'

async def main():
    tasks = [
        asyncio.create_task(task_a()),
        asyncio.create_task(task_b())
    ]

    # Wait for first to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Get first result
    first_result = done.pop().result()

    # Cancel remaining
    for task in pending:
        task.cancel()

    return first_result

result = asyncio.run(main())  # Returns 'B' after 1 second
```

**Wait strategies:**
- `FIRST_COMPLETED`: Return when first task finishes
- `FIRST_EXCEPTION`: Return when any task raises an exception; if none does, it behaves like `ALL_COMPLETED`
- `ALL_COMPLETED`: Wait for all tasks (default)
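
As a rough sketch of `FIRST_EXCEPTION`, the example below uses two illustrative coroutines, `slow` and `broken`. `asyncio.wait` returns as soon as `broken` raises, and the still-running task is cancelled and then awaited so the cancellation completes cleanly:

```python
import asyncio

async def slow():
    await asyncio.sleep(1)
    return 'slow'

async def broken():
    await asyncio.sleep(0.01)
    raise RuntimeError('failed')

async def main():
    tasks = [asyncio.create_task(slow()), asyncio.create_task(broken())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    # Cancel what is still running and wait for the cancellation to finish
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    failed = [t for t in done if t.exception() is not None]
    return len(done), len(pending), str(failed[0].exception())

summary = asyncio.run(main())
print(summary)  # (1, 1, 'failed')
```

Unlike `gather`, `asyncio.wait` never raises the task's exception itself; it has to be read from the finished task with `exception()` or `result()`.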

### Pattern 3: Semaphore - Limit Concurrency

Control maximum number of concurrent operations:

```python
import asyncio
import aiohttp

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        # Only N requests run concurrently
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    # Limit to 5 concurrent requests
    semaphore = asyncio.Semaphore(5)

    urls = [f'http://api.example.com/item/{i}' for i in range(100)]

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)

    return results

asyncio.run(main())
```

**When to use:**
- Capping the number of in-flight API requests (a concurrency cap, not a requests-per-second limit)
- Controlling database connection usage
- Preventing resource exhaustion
- Respecting external service limits
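
To check that a semaphore really caps concurrency, one option is to track how many jobs are inside the guarded section at once. This sketch replaces the HTTP call with `asyncio.sleep`; the `stats` dictionary and `limited_job` are hypothetical helpers:

```python
import asyncio

async def limited_job(semaphore, stats):
    async with semaphore:
        stats['active'] += 1
        stats['peak'] = max(stats['peak'], stats['active'])
        await asyncio.sleep(0.01)  # stand-in for a network call
        stats['active'] -= 1

async def main(limit=5, jobs=20):
    semaphore = asyncio.Semaphore(limit)
    stats = {'active': 0, 'peak': 0}
    await asyncio.gather(*(limited_job(semaphore, stats) for _ in range(jobs)))
    return stats

stats = asyncio.run(main())
print(stats)
```

With 20 jobs and a limit of 5, the recorded peak never exceeds the limit, and the active count returns to zero once all jobs finish.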

### Pattern 4: Lock - Mutual Exclusion

Ensure only one coroutine accesses a resource at a time:

```python
import asyncio

class SharedCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:
            # Critical section - only one coroutine at a time
            current = self.value
            await asyncio.sleep(0)  # Simulate async work
            self.value = current + 1

async def worker(counter):
    for _ in range(100):
        await counter.increment()

async def main():
    counter = SharedCounter()

    # Run 10 workers concurrently
    await asyncio.gather(*[worker(counter) for _ in range(10)])

    print(f"Final count: {counter.value}")  # Always 1000

asyncio.run(main())
```
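
To see why the lock matters, the sketch below runs the same read, yield, write sequence with and without a lock. The helper names `increment` and `count_with` are illustrative. Without the lock, several coroutines read the same value before any of them writes it back, so updates are lost.

```python
import asyncio

async def increment(state, lock=None):
    if lock is None:
        # Unsafe: another coroutine can run between the read and the write
        current = state['value']
        await asyncio.sleep(0)
        state['value'] = current + 1
    else:
        async with lock:
            current = state['value']
            await asyncio.sleep(0)
            state['value'] = current + 1

async def count_with(use_lock, workers=10, rounds=20):
    state = {'value': 0}
    lock = asyncio.Lock() if use_lock else None

    async def worker():
        for _ in range(rounds):
            await increment(state, lock)

    await asyncio.gather(*(worker() for _ in range(workers)))
    return state['value']

unsafe = asyncio.run(count_with(use_lock=False))
safe = asyncio.run(count_with(use_lock=True))
print(unsafe, safe)
```

The locked run reaches the full `workers * rounds` total of 200, while the unlocked run ends up lower because of lost updates.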

### Pattern 5: Event - Signaling

Coordinate multiple coroutines with events:

```python
import asyncio

async def waiter(event, name):
    print(f'{name} waiting for event')
    await event.wait()
    print(f'{name} received event')

async def setter(event):
    await asyncio.sleep(2)
    print('Setting event')
    event.set()

async def main():
    event = asyncio.Event()

    # Multiple waiters
    await asyncio.gather(
        waiter(event, 'Waiter 1'),
        waiter(event, 'Waiter 2'),
        waiter(event, 'Waiter 3'),
        setter(event)
    )

asyncio.run(main())
```

### Pattern 6: Queue - Producer/Consumer

Coordinate work between producers and consumers:

```python
import asyncio

async def producer(queue, n):
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(f'item-{i}')
        print(f'Produced item-{i}')

    # Signal completion
    await queue.put(None)

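async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            # Sentinel from the producer: stop consuming
            break
        print(f'{name} consumed {item}')

# Minimal completion, assuming one producer and one consumer
async def main():
    queue = asyncio.Queue()
    await asyncio.gather(producer(queue, 5), consumer(queue, 'Consumer 1'))

asyncio.run(main())
```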
