Claude
Skills
Sign in
Back

golang-concurrency-patterns

Included with Lifetime
$97 forever

Go concurrency patterns for production services: context cancellation, errgroup, worker pools, bounded parallelism, fan-in/fan-out, and common race/deadlock pitfalls

toolchaingolangconcurrencycontexterrgroupchannelsmutexrace-detectorworker-pool

What this skill does


# Go Concurrency Patterns (Production)

## Overview

Go concurrency scales when goroutine lifetimes are explicit, cancellation is propagated with `context.Context`, and shared state is protected (channels or locks). Apply these patterns to build reliable services and avoid common failure modes: goroutine leaks, deadlocks, and data races.

## Quick Start

**Default building blocks**
- Use `context` to drive cancellation and deadlines.
- Use `errgroup.WithContext` for fan-out/fan-in with early abort.
- Bound concurrency (avoid unbounded goroutines) with a semaphore or worker pool.
- Prefer immutable data; otherwise protect shared state with a mutex or make a single goroutine the owner.

**Avoid**
- Fire-and-forget goroutines in request handlers.
- `time.After` inside hot loops.
- Closing channels from the receiver side.
- Sharing mutable variables across goroutines without synchronization.

## Core Concepts

### Goroutine lifecycle

Treat goroutines as resources with a clear owner and shutdown condition.

✅ **Correct: stop goroutines via context**
```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
    ticker := time.NewTicker(250 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // do work
        }
    }
}()
```

❌ **Wrong: goroutine without a stop condition**
```go
go func() {
    for {
        doWork() // leaks forever
    }
}()
```

### Channels vs mutexes (choose intentionally)

- Use **channels** to model ownership/serialization of state or to pipeline work.
- Use **mutexes** to protect shared in-memory state with simple read/write patterns.

✅ **Correct: one goroutine owns the map**
```go
type req struct {
    key   string
    reply chan<- int
}

func mapOwner(ctx context.Context, in <-chan req) {
    m := map[string]int{}
    for {
        select {
        case <-ctx.Done():
            return
        case r := <-in:
            r.reply <- m[r.key]
        }
    }
}
```

✅ **Correct: mutex protects shared map**
```go
type SafeMap struct {
    mu sync.RWMutex
    m  map[string]int
}

func (s *SafeMap) Get(k string) (int, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.m[k]
    return v, ok
}
```

## Patterns

### 1) Fan-out/fan-in with cancellation (`errgroup`)

Use `errgroup.WithContext` to run concurrent tasks, cancel siblings on error, and wait for completion.

✅ **Correct: cancel on first error**
```go
g, ctx := errgroup.WithContext(ctx)

for _, id := range ids {
    id := id // capture
    g.Go(func() error {
        return process(ctx, id)
    })
}

if err := g.Wait(); err != nil {
    return err
}
```

❌ **Wrong: WaitGroup loses the first error and does not propagate cancellation**
```go
var wg sync.WaitGroup
for _, id := range ids {
    wg.Add(1)
    go func() {
        defer wg.Done()
        _ = process(context.Background(), id) // ignores caller ctx + captures id
    }()
}
wg.Wait()
```

### 2) Bounded concurrency (semaphore pattern)

Bound parallelism to prevent CPU/memory exhaustion and downstream overload.

✅ **Correct: bounded fan-out**
```go
limit := make(chan struct{}, 8) // max 8 concurrent
g, ctx := errgroup.WithContext(ctx)

for _, id := range ids {
    id := id
    g.Go(func() error {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case limit <- struct{}{}:
        }
        defer func() { <-limit }()

        return process(ctx, id)
    })
}

return g.Wait()
```

### 3) Worker pool (durable throughput)

Use a fixed number of workers for stable throughput and predictable resource usage.

✅ **Correct: worker pool with context stop**
```go
type Job struct{ ID string }

func runPool(ctx context.Context, jobs <-chan Job, workers int) error {
    g, ctx := errgroup.WithContext(ctx)

    for i := 0; i < workers; i++ {
        g.Go(func() error {
            for {
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case j, ok := <-jobs:
                    if !ok {
                        return nil
                    }
                    if err := handleJob(ctx, j); err != nil {
                        return err
                    }
                }
            }
        })
    }

    return g.Wait()
}
```

### 4) Pipeline stages (fan-out between stages)

Prefer one-directional channels and close only from the sending side.

✅ **Correct: sender closes**
```go
func stageA(ctx context.Context, out chan<- int) {
    defer close(out)
    for i := 0; i < 10; i++ {
        select {
        case <-ctx.Done():
            return
        case out <- i:
        }
    }
}
```

❌ **Wrong: receiver closes**
```go
func stageB(in <-chan int) {
    close(in) // compile error in<-chan; also wrong ownership model
}
```

### 5) Periodic work without leaks (`time.Ticker` vs `time.After`)

Use `time.NewTicker` for loops; avoid `time.After` allocations in hot paths.

✅ **Correct: ticker**
```go
t := time.NewTicker(1 * time.Second)
defer t.Stop()

for {
    select {
    case <-ctx.Done():
        return
    case <-t.C:
        poll()
    }
}
```

❌ **Wrong: time.After in loop**
```go
for {
    select {
    case <-ctx.Done():
        return
    case <-time.After(1 * time.Second):
        poll()
    }
}
```

## Decision Trees

### Channel vs Mutex

- Need **ownership/serialization** (single writer, message passing) → use **channel + owner goroutine**
- Need **shared cache/map** with many readers and simple updates → use **RWMutex**
- Need **simple counter** with low contention → use **atomic**

### WaitGroup vs errgroup

- Need **error propagation + sibling cancellation** → use **`errgroup.WithContext`**
- Need **only wait** and errors are handled elsewhere → use **`sync.WaitGroup`**

### Buffered vs unbuffered channel

- Need **backpressure** and synchronous handoff → use **unbuffered**
- Need **burst absorption** up to a known size → use **buffered** (size with intent)
- Unsure → start unbuffered and measure; add buffer only to remove known bottleneck

## Testing & Verification

### Race detector and flake control

Run targeted tests with the race detector and disable caching during debugging:

```bash
go test -race ./...
go test -run TestName -race -count=1 ./...
```

### Timeouts to prevent hanging tests

✅ **Correct: test-level timeout via context**
```go
func TestSomething(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    if err := doThing(ctx); err != nil {
        t.Fatal(err)
    }
}
```

## Troubleshooting

### Symptom: deadlock (test hangs, goroutines blocked)

Actions:
- Add timeouts (`context.WithTimeout`) around blocking operations.
- Verify channel ownership: only the sender closes; receivers stop on `ok == false`.
- Check for missing `<-limit` release in semaphore patterns.

### Symptom: data race (`go test -race` reports)

Actions:
- Identify shared variables mutated by multiple goroutines.
- Add a mutex or convert to ownership model (single goroutine owns state).
- Avoid writing to captured loop variables.

### Symptom: goroutine leak (memory growth, slow shutdown)

Actions:
- Ensure every goroutine selects on `ctx.Done()`.
- Ensure `time.Ticker` is stopped and channels are closed by senders.
- Avoid `context.Background()` inside request paths; propagate caller context.

## Resources

- Go Blog: Concurrency patterns: https://go.dev/blog/pipelines
- `errgroup`: https://pkg.go.dev/golang.org/x/sync/errgroup
- Go memory model: https://go.dev/ref/mem

Related in toolchain