---
title: Go Concurrency Patterns
date: 2024-11-15
description: Essential Go concurrency patterns including goroutines, channels, mutexes, and common synchronization techniques.
tags: [go]
---

## Goroutines

The `go` keyword is used to start a goroutine.

A goroutine is a lightweight, managed thread used by the Go runtime to run
functions concurrently.

Unlike OS threads, which have a fixed stack size (often around 1MB), goroutines
start with a very small stack, around 2KB, and can grow or shrink dynamically as
needed. This makes it possible to run thousands or even millions of goroutines
simultaneously, depending on the available memory.

Goroutines are sometimes compared to "green threads," which are threads that are
scheduled in user space rather than by the OS. The problem with green threads is
that they may not leverage multiple CPU cores efficiently since they don't
interact directly with the OS’s scheduler.

Goroutines are similar to green threads in that they are scheduled by the Go
runtime rather than the OS. However, they differ in a crucial way: Go uses a
model called M:N scheduling, where the Go runtime multiplexes many goroutines
(M) onto a smaller number of OS threads (N). This allows the runtime to
distribute goroutines across multiple CPU cores when possible, making Go's
concurrency model more efficient and scalable than traditional green threads.
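
To get a rough feel for the scheduling model described above, here is a minimal sketch that asks the standard `runtime` package how many logical CPUs it sees and how many OS threads are allowed to execute Go code at the same time. The helper name `schedulerInfo` is my own and purely illustrative.

```go
package main

import (
    "fmt"
    "runtime"
)

// schedulerInfo is a hypothetical helper (not part of any library).
// It reports the number of logical CPUs and the current GOMAXPROCS value,
// i.e. the maximum number of OS threads that can execute Go code at once.
func schedulerInfo() (cpus, maxProcs int) {
    // Passing 0 to GOMAXPROCS queries the setting without changing it.
    return runtime.NumCPU(), runtime.GOMAXPROCS(0)
}

func main() {
    cpus, maxProcs := schedulerInfo()
    fmt.Println("Logical CPUs:", cpus)
    fmt.Println("GOMAXPROCS:", maxProcs)
    fmt.Println("Goroutines running right now:", runtime.NumGoroutine())
}
```

The numbers depend on the machine, so no output is shown here. The point is that the count of goroutines is independent of the count of threads the runtime schedules them onto.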

> [!IMPORTANT]
> The following example uses a `time.Sleep` to wait for the goroutine to finish.\
> This is done for simplicity. Do NOT use this approach.\
> I'll explain alternative options afterwards.

```go
package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello from a goroutine!")
}

func main() {
    go sayHello() // Launches sayHello in a new goroutine

    fmt.Println("Hello from main!")
    time.Sleep(1 * time.Second) // Wait for the goroutine to complete (only for example)
}
```

https://play.golang.com/p/SdOdZ90-exI

> [!INFO]
> In Go, the `main` function is effectively the "initial" goroutine.\
> When a Go program starts, the Go runtime creates a goroutine to run `main`.\
> This main goroutine can then spawn additional goroutines as needed.

## Channels

Channels in Go are a powerful way to communicate between goroutines and to
synchronize them. They allow you to send and receive values across goroutines,
and they help avoid race conditions by enabling safe data sharing.

> [!IMPORTANT]
> Sending and receiving channel messages can
> [block](https://go.dev/tour/concurrency/2).\
> In the following example the `<-ch` BLOCKS the `main()` function.\
> While the `ch <-` BLOCKS the `sendMessage()` function.\
> This can cause a deadlock, hence we use a goroutine to unblock things.

```go
package main

import (
    "fmt"
)

func sendMessage(ch chan string) {
    ch <- "Hello from a goroutine!" // Send a message to the channel
                                    // ⚠️ This BLOCKS but as it's running in a goroutine 
                                    // the program is unaffected.
}

func main() {
    // Create a new channel of type string
    ch := make(chan string)

    // Start a goroutine to send a message
    go sendMessage(ch)

    // Receive the message from the channel
    msg := <-ch // ⚠️ This BLOCKS the program (hence sendMessage() runs in a goroutine)
    fmt.Println(msg) // Output: Hello from a goroutine!
}
```

https://play.golang.com/p/2Qn_NacVw-0

> [!IMPORTANT]
> You can `range` over a channel, but the loop will never stop unless the
> channel is closed.\
> So when ranging over a channel, think about how the program will proceed and
> when the channel is going to be closed.

> [!INFO]
> Depending on what your application needs, you can create
> [buffered](https://go.dev/tour/concurrency/3) channels.\
> Sends to a buffered channel block only when the buffer is full.

> [!TIP]
> The most crucial best practice is to close the channel from the sender side, not the receiver.\
> The sender is the goroutine that writes data to the channel.\
> Hence the sender knows when there's no more data to be sent, not the receiver.
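
The three notes above (ranging, buffering, and closing from the sender) can be sketched together. This is a minimal illustration rather than a canonical pattern, and the function names `produce` and `sum` are hypothetical.

```go
package main

import "fmt"

// produce is a hypothetical sender: it writes the values 1..n and then closes
// the channel, because only the sender knows when there is nothing left to send.
func produce(n int, ch chan<- int) {
    defer close(ch)
    for i := 1; i <= n; i++ {
        ch <- i // Blocks only when the buffer is full
    }
}

// sum ranges over the channel. The loop ends once the sender has closed the
// channel and every buffered value has been received.
func sum(ch <-chan int) int {
    total := 0
    for v := range ch {
        total += v
    }
    return total
}

func main() {
    // A buffer of 2 lets the sender run up to two values ahead of the receiver.
    ch := make(chan int, 2)

    go produce(5, ch)

    fmt.Println("Sum:", sum(ch)) // Output: Sum: 15
}
```

If `produce` forgot to call `close`, the `range` loop in `sum` would block forever once the values ran out, and the runtime would report a deadlock.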

## Select Statement

The `select` statement is used to wait on multiple channel operations.\
It blocks until one of its cases can proceed, which makes it essential for
handling multiple asynchronous tasks.

Use `select` when you have multiple channels to listen to, and you want to
respond to whichever channel receives data first.

> [!IMPORTANT]
> In the following example, the first goroutine uses a `time.Sleep`.\
> This is to simulate the operation taking a long time.\
> It results in the `select` pulling a value from the second goroutine.

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "result from ch1"
    }()

    go func() {
        ch2 <- "result from ch2"
    }()

    select {
    case msg1 := <-ch1:
        fmt.Println("Received:", msg1)
    case msg2 := <-ch2:
        fmt.Println("Received:", msg2)
    }
}
```

https://play.golang.com/p/HXe-bZ\_\_EEy

A common use case for `select` is to time out an operation that might otherwise block forever:

> [!IMPORTANT]
> In the following example we use `time.After` to cause a timeout.

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    // Create an unbuffered channel
    ch := make(chan string)

    // Start a goroutine that simulates a delayed send
    go func() {
        time.Sleep(3 * time.Second)   // Simulate a delay
        ch <- "Hello from goroutine!" // Send a message after delay
    }()

    select {
    case msg := <-ch:
        fmt.Println("Received:", msg)
    case <-time.After(2 * time.Second): // Timeout after 2 seconds
        fmt.Println("Timeout! No message received.")
    }
}
```

https://play.golang.com/p/6BMPeUKdqkg

## Wait Groups

A `sync.WaitGroup` waits for a collection of goroutines to finish. It helps
coordinate a group of goroutines and ensures the program waits until all of them
have completed before proceeding.

Use a `WaitGroup` when you need to wait for multiple goroutines to finish before
moving on.

In this example, a `sync.WaitGroup` is used to wait for three goroutines to
complete. Each goroutine represents a worker, and each one calls `wg.Done()` to
signal that it's finished. The `main` function calls `wg.Wait()` to block until
all workers are done.

```go
package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // Decrement counter when goroutine completes
    fmt.Printf("Worker %d starting\n", id)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := range 3 {
        wg.Add(1) // Track each goroutine started
        go worker(i, &wg)
    }

    // Wait for all goroutines to finish
    wg.Wait()

    fmt.Println("All workers done.")
}
```

https://play.golang.com/p/LhEdSQIPp1R

> [!TIP]
> As of Go 1.25 you no longer need to call `wg.Add`/`wg.Done` yourself for simple cases.\
> Instead you can just use `wg.Go()`.

```go
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    wg.Go(func() {
        fmt.Println("go is awesome")
    })

    wg.Go(func() {
        fmt.Println("cats are cute")
    })

    wg.Wait()
    fmt.Println("done")
}
```

https://go.dev/play/p/b_WdOO7IYVM?v=gotip

## Error Groups

The [errgroup](https://pkg.go.dev/golang.org/x/sync/errgroup) package is like a
`sync.WaitGroup` but with added capabilities for error handling and context
cancellation.

While a WaitGroup simply waits for a collection of goroutines to finish, an
errgroup also collects any errors returned by those goroutines and can cancel
all of them if one fails.

In the following example, when one worker fails, it triggers a cancellation in
the other workers (some might complete in time, but those that don't will be
forcefully cancelled):

```go
package main

import (
    "context"
    "fmt"
    "net/http"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    ctx := context.Background()

    // errgroup.WithContext creates a new Group and a derived Context.
    // The errgroup context is cancelled when the first goroutine returns a non-nil
    // error or when the g.Wait function returns.
    g, ctx := errgroup.WithContext(ctx)

    // List of URLs to fetch
    urls := []string{
        "https://http-me.fastly.dev/status=200",
        "https://http-me.fastly.dev/?wait=1000&status=201", // This never completes (i.e. goroutine is cancelled)
        "https://http-me.fastly.dev/?wait=1000&status=202", // This never completes (i.e. goroutine is cancelled)
        "https://http-me.fastly.dev/status=500",            // This one will return an error
    }

    for _, url := range urls {
        u := url

        // Start a goroutine for each URL
        g.Go(func() error {
            fmt.Println("Goroutine for ", u)

            // Create a request with context
            req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
            if err != nil {
                return fmt.Errorf("creating request: %w", err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("request failed: %w", err)
            }
            defer resp.Body.Close()

            // Treat non-2xx status codes (1xx, 3xx, 4xx, 5xx) as errors
            if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
                return fmt.Errorf("non-200 status: %d from %s", resp.StatusCode, u)
            }

            select {
            case <-time.After(time.Millisecond):
                // don't block forever, just enough to check context.Done
            case <-ctx.Done():
                // This block is executed if the context is cancelled before the work is finished.
                fmt.Printf("Goroutine %s cancelled\n", u)
                return ctx.Err()
            }

            fmt.Printf("Successfully fetched: %s\n", u)
            return nil
        })
    }

    // Wait for all goroutines to complete or any to return an error
    if err := g.Wait(); err != nil {
        fmt.Printf("Error: %v\n", err)
    } else {
        fmt.Println("All fetches succeeded.")
    }
}
```

The output of the program is non-deterministic. So, for example, you might see:

```
Goroutine for  https://http-me.fastly.dev/status=500
Goroutine for  https://http-me.fastly.dev/?wait=1000&status=202
Goroutine for  https://http-me.fastly.dev/status=200
Goroutine for  https://http-me.fastly.dev/?wait=1000&status=201
Error: non-200 status: 500 from https://http-me.fastly.dev/status=500
```

Notice in the above output no goroutine is successful. The error is picked up
from the `status=500` URL and so all other goroutines are cancelled immediately.

Whereas if you run the code again you might see this:

```
Goroutine for  https://http-me.fastly.dev/status=500
Goroutine for  https://http-me.fastly.dev/status=200
Goroutine for  https://http-me.fastly.dev/?wait=1000&status=201
Goroutine for  https://http-me.fastly.dev/?wait=1000&status=202
Goroutine https://http-me.fastly.dev/status=200 cancelled
Error: non-200 status: 500 from https://http-me.fastly.dev/status=500
```

Notice in the above output we now see one of the goroutines was explicitly
cancelled, i.e. the `select` statement had time to run the check on the
`ctx.Done()` call.

If we want to see any of these goroutines succeed we need to play around with
the delays to give one of them a chance of completing. So something like this
would do it:

```go
urls := []string{
    "https://http-me.fastly.dev/status=200",
    "https://http-me.fastly.dev/?wait=1000&status=201",
    "https://http-me.fastly.dev/?wait=1000&status=202",
    "https://http-me.fastly.dev/?wait=500&status=500",
}
```

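Notice we've added a delay (`wait=500`) to the `status=500` URL. It's shorter than the `wait=1000` delays, but long enough for the `status=200` request to complete before the failure cancels the group.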

When running the code again we'd see something like this:

```
Goroutine for  https://http-me.fastly.dev/?wait=500&status=500
Goroutine for  https://http-me.fastly.dev/?wait=1000&status=201
Goroutine for  https://http-me.fastly.dev/status=200
Goroutine for  https://http-me.fastly.dev/?wait=1000&status=202
Successfully fetched: https://http-me.fastly.dev/status=200
Error: non-200 status: 500 from https://http-me.fastly.dev/?wait=500&status=500
```

Notice in the above output we now see at least one successful case.

> [!TIP]
> You can limit the number of active goroutines in the group with:\
> `g.SetLimit(10)`\
> Calls to `g.Go` will block until an active goroutine can be added.

## Mutex

Go's `sync.Mutex` provides mutual exclusion, allowing only one goroutine at a
time to access a critical section of code.\
`sync.RWMutex` is a variant that allows either multiple readers or a single
writer, but not both at the same time.

Use `sync.Mutex` or `sync.RWMutex` when you need fine-grained control over data
access and want to protect shared data from race conditions.

In the below example `sync.Mutex` ensures that only one goroutine modifies
`counter.value` at a time, preventing race conditions:

```go
package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &Counter{}
    var wg sync.WaitGroup

    for range 10 {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    wg.Wait()
    
    fmt.Println("Final Counter:", counter.Value())
}
```

https://play.golang.com/p/VIbNkQaPfZI

> [!TIP]
> The use of `defer` in the `Increment()` method is redundant.\
> It's more useful for long complex functions where errors can occur.\
> Here I should have just placed the `Unlock()` call after the `c.value++`.
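
For completeness, here is a minimal sketch of the `sync.RWMutex` variant mentioned at the start of this section. The `Settings` type and its methods are hypothetical names I've made up for illustration. Readers take `RLock()`, so they don't block each other, while a writer takes the exclusive `Lock()`.

```go
package main

import (
    "fmt"
    "sync"
)

// Settings is a hypothetical read-heavy store guarded by a sync.RWMutex.
type Settings struct {
    mu   sync.RWMutex
    data map[string]string
}

// NewSettings returns an empty, ready-to-use Settings.
func NewSettings() *Settings {
    return &Settings{data: make(map[string]string)}
}

// Get takes the read lock, so many readers can run at the same time.
func (s *Settings) Get(key string) (string, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.data[key]
    return v, ok
}

// Set takes the write lock, which excludes all readers and other writers.
func (s *Settings) Set(key, value string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.data[key] = value
}

func main() {
    s := NewSettings()
    s.Set("mode", "fast")

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.Get("mode") // Concurrent reads do not block each other
        }()
    }
    wg.Wait()

    v, _ := s.Get("mode")
    fmt.Println("mode =", v) // Output: mode = fast
}
```

Whether `RWMutex` actually beats a plain `Mutex` depends on the read/write ratio and how long the lock is held, so treat it as something to measure rather than a default.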

## Atomic Operations

<!--alex ignore simple-->

The `sync/atomic` package provides low-level atomic operations on simple types
like integers and pointers, ensuring operations are performed atomically.

Use atomic operations when you need lock-free synchronization for counters or
flags, but only for basic integer or pointer manipulations.

In the following example, `atomic.AddInt32` safely increments `counter` without
a lock, making it ideal for high-performance counters or flags:

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var counter int32
    var wg sync.WaitGroup

    for range 10 {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt32(&counter, 1)
        }()
    }

    wg.Wait()
    
    fmt.Println("Final Counter:", atomic.LoadInt32(&counter))
}
```

https://play.golang.com/p/qsRPoC4GPNv
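
The "flags" use case can be sketched with the typed atomics (`atomic.Bool`, `atomic.Int64`) that were added to `sync/atomic` in Go 1.19. This is an illustrative sketch only, and the function name `raceForFlag` is hypothetical. Several goroutines race to flip a flag, and `CompareAndSwap` guarantees that exactly one of them wins.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// raceForFlag is a hypothetical demo: n goroutines all try to claim a flag.
// It returns how many goroutines ran and how many managed to claim the flag.
func raceForFlag(n int) (calls, winners int64) {
    var (
        claimed  atomic.Bool  // The flag every goroutine tries to set
        callCnt  atomic.Int64 // Number of goroutines that ran
        winnerCt atomic.Int64 // Number of goroutines that flipped the flag
        wg       sync.WaitGroup
    )

    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            callCnt.Add(1)
            // CompareAndSwap flips false -> true atomically, so it can only
            // succeed for one goroutine.
            if claimed.CompareAndSwap(false, true) {
                winnerCt.Add(1)
            }
        }()
    }

    wg.Wait()
    return callCnt.Load(), winnerCt.Load()
}

func main() {
    calls, winners := raceForFlag(10)
    fmt.Println("Calls:", calls, "Winners:", winners) // Output: Calls: 10 Winners: 1
}
```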

## Once

Go's `sync.Once` ensures that a function only executes once, even if multiple
goroutines attempt to run it.

Use `sync.Once` when you need to perform a one-time initialization, such as
setting up a shared resource.

In the following example, even though multiple goroutines call
`once.Do(initialize)`, `initialize` only runs once. This is especially useful
for lazy initialization of global resources:

```go
package main

import (
    "fmt"
    "sync"
)

var once sync.Once

func initialize() {
    fmt.Println("Initializing...")
}

func main() {
    var wg sync.WaitGroup
    for range 3 {
        wg.Add(1)
        go func() {
            defer wg.Done()
            once.Do(initialize)
        }()
    }
    wg.Wait()
}
```

https://play.golang.com/p/5J1ApCPc1iU
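
For lazy initialization that also needs to hand back a value, Go 1.21 added `sync.OnceValue`, which wraps a function so that it runs once and its result is cached for every later call. Below is a minimal sketch. The `config` variable, the `loads` counter, and the map contents are all made up for illustration.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// loads counts how often the expensive initializer actually runs.
var loads atomic.Int32

// config is a hypothetical lazily initialized global. The wrapped function
// runs on the first call to config() and its result is reused afterwards.
var config = sync.OnceValue(func() map[string]string {
    loads.Add(1)
    return map[string]string{"region": "eu-west"}
})

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = config()["region"] // Only the first caller triggers the load
        }()
    }
    wg.Wait()

    fmt.Println("region:", config()["region"], "loads:", loads.Load())
    // Output: region: eu-west loads: 1
}
```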

## Context

Go's `context.Context` is not a strict concurrency primitive but is widely used
to manage timeouts, cancellations, and deadlines across goroutines.

> [!TIP]
> You've seen context used in the [Error Groups](#error-groups) example earlier.

Use `context.Context` to signal cancellation or control the lifespan of
goroutines, particularly in networked or long-running tasks.

In the following example, `context.WithTimeout` creates a context that
automatically cancels after 1 second, which is useful for controlling tasks that
may hang or take too long:

```go
package main

import (
    "context"
    "fmt"
    "time"
)

func process(ctx context.Context) {
    select {
    case <-time.After(2 * time.Second): // use time.After to simulate slow operation
        fmt.Println("Completed work")
    case <-ctx.Done():
        fmt.Println("Work cancelled")
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    go process(ctx)

    time.Sleep(2 * time.Second)
}
```

https://play.golang.com/p/diSmAp0SJkg
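
Timeouts are one way a context ends. The other is explicit cancellation via `context.WithCancel`. The following is a minimal sketch under my own assumptions: the `poll` worker and the `runAndCancel` helper are hypothetical names, and the `time.Sleep` only simulates the caller deciding to stop some time later.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// poll is a hypothetical worker that keeps "working" until ctx is cancelled,
// then reports why it stopped.
func poll(ctx context.Context, stopped chan<- string) {
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            stopped <- ctx.Err().Error()
            return
        case <-ticker.C:
            // Pretend to do one unit of work per tick.
        }
    }
}

// runAndCancel starts poll, cancels it after delayMs milliseconds, and
// returns the reason poll gave for stopping.
func runAndCancel(delayMs int) string {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // Calling cancel more than once is safe

    stopped := make(chan string, 1)
    go poll(ctx, stopped)

    time.Sleep(time.Duration(delayMs) * time.Millisecond)
    cancel() // Explicitly signal the worker to stop

    return <-stopped
}

func main() {
    fmt.Println("Worker stopped:", runAndCancel(50))
    // Output: Worker stopped: context canceled
}
```

Notice the worker reports back over a channel, so `main` waits for it properly instead of sleeping and hoping it has finished.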

## Map

The `sync` package has a [`Map`](https://pkg.go.dev/sync#Map) type which you
will likely not need to use.

The Go authors even document it as such...

> [!CITE]
> The Map type is specialized. Most code should use a plain Go map instead, with
> separate locking or coordination, for better type safety and to make it easier
> to maintain other invariants along with the map content.
>
> The Map type is optimized for two common use cases: (1) when the entry for a
> given key is only ever written once but read many times, as in caches that
> only grow, or (2) when multiple goroutines read, write, and overwrite entries
> for disjoint sets of keys. In these two cases, use of a Map may significantly
> reduce lock contention compared to a Go map paired with a separate Mutex or
> RWMutex.
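
If you do hit the first use case (write once, read many), a sketch might look like the following. The `cache` variable and `lookup` function are hypothetical, and the "expensive" computation is just `len(word)`. Notice the type assertions, which are the loss of type safety the documentation warns about.

```go
package main

import (
    "fmt"
    "sync"
)

// cache is a hypothetical grow-only cache: each key is written once and then
// only read, which is the first use case described in the sync.Map docs.
var cache sync.Map

// lookup returns the cached value for word, computing it on first use.
func lookup(word string) int {
    if v, ok := cache.Load(word); ok {
        return v.(int) // Values are stored as `any`, so we must assert the type
    }
    // LoadOrStore keeps whichever value was stored first, so two goroutines
    // racing on the same key still end up agreeing on one value.
    v, _ := cache.LoadOrStore(word, len(word))
    return v.(int)
}

func main() {
    var wg sync.WaitGroup
    for _, w := range []string{"go", "gopher", "go", "channel"} {
        wg.Add(1)
        go func(w string) {
            defer wg.Done()
            lookup(w)
        }(w)
    }
    wg.Wait()

    entries := 0
    cache.Range(func(_, _ any) bool {
        entries++
        return true // Keep iterating
    })
    fmt.Println("Cached entries:", entries) // Output: Cached entries: 3
}
```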

## Conditions

Go's `sync.Cond` is probably the most confusing and hard to use of all
concurrency tools (hence I've left it till last). It's used for signaling
between goroutines. It lets goroutines wait until they are notified to continue,
which is useful when one goroutine needs to wait for a certain condition to be
met by another goroutine.

Use `sync.Cond` when you need goroutines to wait for certain conditions, such as
producer-consumer scenarios.

In the following example, `cond.Wait()` blocks until `cond.Signal()` is called.
It's useful for waiting on complex conditions where other primitives like `chan`
may not be ideal:

> [!IMPORTANT]
> The call to `cond.L.Lock()` in the main goroutine just before `for !ready` is
> required, otherwise you'll get the error
> `fatal error: sync: unlock of unlocked mutex`. This is because `cond.Wait()`
> expects the caller to hold the lock when it is called (see [this
> video](https://youtu.be/VAV2h1GdgE0?si=cqErfqLXnWOgmcsh) for details).
> `Wait()` releases the lock while the goroutine is suspended and reacquires it
> before returning, ensuring the main goroutine can safely check `ready` and
> exit the loop.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false

    go func() {
        time.Sleep(1 * time.Second)
        cond.L.Lock()
        ready = true
        cond.L.Unlock()
        cond.Signal() // Notify one waiting goroutine
    }()

    cond.L.Lock()
    for !ready {
        cond.Wait() // Wait until condition is met
    }
    fmt.Println("Ready is true, proceeding.")
    cond.L.Unlock()
}
```

https://play.golang.com/p/n_txZaH7lPA

In the following example we have multiple worker goroutines waiting on a shared
condition to be "notified." We'll see how both `.Signal()` and `.Broadcast()`
work when notifying waiting goroutines:

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, cond *sync.Cond) {
    cond.L.Lock() // Lock the condition
    defer cond.L.Unlock()

    fmt.Printf("Worker %d is waiting\n", id)
    cond.Wait() // Wait for a signal or broadcast
    fmt.Printf("Worker %d is proceeding\n", id)
}

func main() {
    lock := &sync.Mutex{}
    cond := sync.NewCond(lock)

    // Start multiple worker goroutines that will wait on the condition
    for i := range 3 {
        go worker(i, cond)
    }

    // Allow time for all workers to start and wait
    time.Sleep(1 * time.Second)

    // Use Signal to wake up one goroutine
    fmt.Println("Notifying one worker")
    cond.Signal() // Notifies one waiting worker
    time.Sleep(1 * time.Second)

    // Use Broadcast to wake up all remaining goroutines
    fmt.Println("Broadcasting to all remaining workers")
    cond.Broadcast() // Notifies all remaining waiting workers

    // Allow time for all goroutines to complete
    time.Sleep(2 * time.Second)
    fmt.Println("Main function exiting.")
}
```

https://play.golang.com/p/41ibtmUmKaN

Each worker goroutine locks the condition, calls `cond.Wait()`, and then waits.
This releases the lock (as we now understand from the earlier IMPORTANT note,
see above if you missed it), allowing other goroutines to call `Wait()` as well.

The `cond.Signal()` call in the `main` function wakes up one of the waiting
goroutines, allowing it to proceed.

After a short delay, `cond.Broadcast()` wakes up all remaining waiting
goroutines, allowing them to proceed simultaneously.

This is useful for scenarios where multiple tasks need to wait for a common
event or state change to proceed.

> [!INFO]
> Notifications are not ordered.\
> Any one of the waiting goroutines can be chosen to proceed first.\
> Broadcast ensures that all waiting goroutines eventually proceed.

Now you might be thinking "hmm, it looks like I could use channels instead and
they're more idiomatic".

Well, here are some reasons why you might need to choose `sync.Cond` over
channels:

1. Fine-Grained Control: `sync.Cond` allows precise control over waiting and
   signaling, suitable for cases where specific conditions must be checked or
   managed.

1. Broadcast Capability: Broadcasting to multiple goroutines is straightforward
   with `sync.Cond` and can be repeated, whereas with channels you either signal
   each goroutine individually, which can be inefficient, or close the channel,
   which can only be done once.

1. Reduced Complexity for State-Based Waiting: `sync.Cond` is ideal for
   situations where goroutines need to wait for specific conditions to be true,
   rather than for individual values or events passed through a channel.

1. Avoiding Channel Overhead: Channels introduce buffering and management
   overhead, especially with many goroutines, whereas `sync.Cond` relies on a
   shared mutex with a direct wait/notify mechanism, which can be faster.

In summary, `sync.Cond` is best suited for use cases that involve waiting for
and signaling conditions, especially when you need more control over
synchronization and when goroutines are reacting to shared state changes rather
than discrete message passing.
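
To make the "state-based waiting" point concrete, here is a minimal producer-consumer sketch built on `sync.Cond`. The `Queue` type and the `consumeAll` helper are hypothetical names of my own. Consumers wait on shared state (an empty slice) rather than on a message, and they re-check that state in a loop every time they wake up.

```go
package main

import (
    "fmt"
    "sync"
)

// Queue is a hypothetical FIFO whose consumers wait on a sync.Cond until an
// item is available.
type Queue struct {
    mu    sync.Mutex
    cond  *sync.Cond
    items []int
}

// NewQueue returns an empty queue with its condition bound to its mutex.
func NewQueue() *Queue {
    q := &Queue{}
    q.cond = sync.NewCond(&q.mu)
    return q
}

// Put appends an item and wakes one waiting consumer (if there is one).
func (q *Queue) Put(v int) {
    q.mu.Lock()
    q.items = append(q.items, v)
    q.mu.Unlock()
    q.cond.Signal()
}

// Get blocks until an item is available and then removes and returns it.
func (q *Queue) Get() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    for len(q.items) == 0 { // Re-check the condition after every wake-up
        q.cond.Wait()
    }
    v := q.items[0]
    q.items = q.items[1:]
    return v
}

// consumeAll is a hypothetical demo: n consumers each take one item while the
// caller produces n items (10, 20, 30, ...). It returns the sum consumed.
func consumeAll(n int) int {
    q := NewQueue()
    results := make([]int, n)

    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(slot int) {
            defer wg.Done()
            results[slot] = q.Get() // Each goroutine writes to its own slot
        }(i)
    }

    for i := 1; i <= n; i++ {
        q.Put(i * 10)
    }
    wg.Wait()

    total := 0
    for _, v := range results {
        total += v
    }
    return total
}

func main() {
    fmt.Println("Total consumed:", consumeAll(3)) // Output: Total consumed: 60
}
```

The `for len(q.items) == 0` loop is the important part. A woken goroutine is not guaranteed that the condition still holds by the time it reacquires the lock, so it has to check again before proceeding.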

## Real Examples

Here is some real-world application code I've had to write.

### Bulk Deletion API

Below is a 'real world' example where we need to delete a bunch of keys from a
data store.

The API that is provided does not support bulk deleting of keys.

The API does provide an endpoint that lets us paginate the available keys, and
we then need to stream that information as quickly as possible using a pool of
goroutines coordinated with both channels and wait groups.

It's a nice example because it brings together several different concurrency
primitives (goroutines, channels, select, wait groups, atomic operations).

> [!TIP]
> Keep reading after the code snippet for a brief breakdown of what the code
> does.

```go
const (
    // PoolSize is the goroutine/thread-pool size.
    // Each worker in the pool will take a 'key' from a channel and issue a
    // DELETE request.
    PoolSize int = 100

    // MaxErrors is the maximum number of errors we'll allow before
    // stopping the goroutines from executing.
    MaxErrors int = 100
)

func DeleteAllKeys(storeID string, out io.Writer) error {
    // Create a 'spinner' which helps visually update the user on the progress.
    spinnerMessage := "Deleting keys"
    var spinner text.Spinner

    var err error
    spinner, err = text.NewSpinner(out)
    if err != nil {
        return err
    }
    err = spinner.Start()
    if err != nil {
        return err
    }
    spinner.Message(spinnerMessage + "...")

    // Create a keys paginator.
    p := fastly.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
        StoreID: storeID,
    })

    // Channel for tracking errors when deleting keys.
    errorsCh := make(chan string, MaxErrors)
    
    // Channel for tracking keys to delete.
    keysCh := make(chan string, 1000) // this number correlates to the pagination 
                                      // page size defined by the API

    var (
        // Track the number of keys deleted.
        deleteCount atomic.Uint64
        
        // Track which keys failed to be deleted.
        failedKeys []string
        
        // This will help us wait for all goroutines to complete.
        wg sync.WaitGroup
    )

    // We have two separate execution flows happening at once:
    //
    // 1. Pushing keys from pagination data into a key channel.
    // 2. Pulling keys from key channel and issuing API DELETE call.
    //
    // We have a limit on the number of errors. Once that limit is reached we'll
    // stop the second set of goroutines processing the delete operation.

    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(keysCh)
        for p.Next() {
            for _, key := range p.Keys() {
                keysCh <- key
            }
        }
    }()

    // Limit the number of goroutines spun up to the specified pool size.
    for range PoolSize {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for key := range keysCh {
                err := fastly.DeleteKVStoreKey(
                    &fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key},
                )
                if err != nil {
                    select {
                    case errorsCh <- key:
                        continue // failure recorded; don't count this key as deleted
                    default:
                        return // channel is full (i.e. MaxErrors limit reached)
                    }
                }
                // Update the TUI (Terminal UI) to reflect the current number of
                // deleted keys.
                f := strconv.FormatUint(deleteCount.Add(1), 10)
                spinner.Message(spinnerMessage + "..." + f)
        }()
    }

    wg.Wait()

    close(errorsCh)
    for err := range errorsCh {
        failedKeys = append(failedKeys, err)
    }

    spinnerMessage = "Deleted keys: " + strconv.FormatUint(deleteCount.Load(), 10)

    if len(failedKeys) > 0 {
        spinner.StopFailMessage(spinnerMessage)
        err := spinner.StopFail()
        if err != nil {
            return fmt.Errorf("failed to stop spinner: %w", err)
        }
        return fmt.Errorf("failed to delete %d keys", len(failedKeys))
    }

    spinner.StopMessage(spinnerMessage)
    if err := spinner.Stop(); err != nil {
        return fmt.Errorf("failed to stop spinner: %w", err)
    }

    text.Success(out, "\nDeleted all keys from KV Store '%s'", storeID)
    return nil
}
```

So you can see we have multiple goroutines spun up (and we wait for them using
`sync.WaitGroup`):

- The first goroutine is iterating over the pagination data and pushing data
  into a channel.
- The other goroutines (we have a limit of `PoolSize`) are pulling data from the
  channel and issuing key deletion API calls.

We also use the `select` statement to control whether we stop the goroutines
processing the deletion operations. The way we do this is to define another
channel (`errorsCh`) with a buffer size of `MaxErrors`, and then every time we
get an error we push the failed key into that channel. If the channel becomes
full (which it will do once `MaxErrors` failures have occurred, because nothing
pulls messages from the `errorsCh` channel while the workers are running), then
the `select` statement will fall through to its `default` case and we'll return
from the goroutine (causing it to stop running).
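
Stripped of the API calls, the `select`/`default` trick on a buffered channel looks like the following minimal sketch. The `record` helper is a hypothetical name, and the buffer size of 2 stands in for `MaxErrors`.

```go
package main

import "fmt"

// record is a hypothetical helper that attempts a non-blocking send.
// It returns false when the channel's buffer is already full.
func record(errorsCh chan<- string, key string) bool {
    select {
    case errorsCh <- key:
        return true
    default:
        return false // Buffer is full; the caller decides what to do next
    }
}

func main() {
    errorsCh := make(chan string, 2) // Stand-in for a MaxErrors-sized buffer

    for _, key := range []string{"a", "b", "c"} {
        fmt.Printf("key %q recorded: %t\n", key, record(errorsCh, key))
    }
    // Output:
    // key "a" recorded: true
    // key "b" recorded: true
    // key "c" recorded: false
}
```

Without the `default` case the third send would block forever, because nothing is receiving from the channel.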

The last interesting concurrency primitive we use is `atomic.Uint64` for
accurately tracking the number of deleted keys. We use its `Add()` method within
the goroutine to safely increment the counter, and then at the end of the
function we use its `Load()` method to safely extract the final value.

#### UPDATE 2024.11.01

Below is a modification to the 'real world' example code.

The difference is in how errors are handled. In the original code we would stop
processing the deletion of keys when an error threshold was reached. In the
following version we don't want that to happen. Instead we want to _keep_
deleting keys and recording errors, but that introduces a new challenge related
to channel buffer size: a channel can't have an unbounded buffer, so at some
point the errors channel is going to fill up and we need to decide what to do.

In this case we can realistically only drop errors. But we can alleviate that a
little bit by trying to pull messages out from the errors channel and appending
them to the `failedKeys` slice concurrently. This will make space in the
channel's buffer and so we can push more errors into it if they occur.

The problem with pulling errors out of the errors channel concurrently is we
need to `range` over the channel, but that will cause a deadlock if we don't
handle it correctly (because ranging over a channel only terminates the loop
when the channel is closed). So we need to introduce not one `sync.WaitGroup`
but _two_! See the code comments below for more details...

<!-- markdownlint-disable -->

```go
// deleteAllKVStoreKeys deletes all keys within the specified KV Store.
func deleteAllKVStoreKeys(
    conn *gofastly.Client,
    storeID string,
    maxErrors, poolSize int,
) error {
    p := conn.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
        StoreID: storeID,
    })

    errorsCh := make(chan string, maxErrors)
    keysCh := make(chan string, 1000) // number correlates to pagination page size

    var (
        failedKeys   []string
        mu           sync.Mutex
        wgProcessing sync.WaitGroup
        wgErrorCh    sync.WaitGroup
    )

    // We have three separate execution flows happening at once:
    //
    // 1. Pushing keys from pagination data into a key channel.
    // 2. Pulling keys from error channel and appending to failedKeys slice.
    // 3. Pulling keys from key channel and issuing API DELETE call.
    //
    // The second item is problematic, in that ranging over a channel only
    // terminates when the channel is closed. So we need to ensure we close the
    // errorsCh once we've finished processing the deletion of all the keys.
    //
    // To do that we need two sets of wait groups.
    //
    // The first is wgProcessing which keeps track of all goroutines related to
    // processing the pagination data (e.g. the goroutine ranging over the
    // paginator keys, and the goroutine ranging over the keysCh as part of the
    // poolSize loop).
    //
    // The second wait group is wgErrorCh which tracks when the first
    // (wgProcessing) has completed and then closes errorsCh.

    // The following goroutine finishes once all pagination keys have been
    // processed.
    wgProcessing.Add(1)
    go func() {
        defer wgProcessing.Done()
        defer close(keysCh)
        for p.Next() {
            for _, key := range p.Keys() {
                keysCh <- key
            }
        }
    }()

    // The following goroutine finishes once the errorsCh is closed.
    wgErrorCh.Add(1)
    go func() {
        defer wgErrorCh.Done()
        for err := range errorsCh {
            mu.Lock()
            failedKeys = append(failedKeys, err)
            mu.Unlock()
        }
    }()

    // The following goroutines close once they've pulled all data from keysCh.
    for i := 1; i <= poolSize; i++ {
        wgProcessing.Add(1)
        go func() {
            defer wgProcessing.Done()
            for key := range keysCh {
                err := conn.DeleteKVStoreKey(
                    &fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key},
                )
                if err != nil {
                    select {
                    case errorsCh <- key:
                    default:
                        // The larger we make maxErrors the less likely we'll
                        // drop errors (obviously there's a memory trade-off
                        // to be made).
                        continue
                    }
                }
            }
        }()
    }

    // The following goroutine exits once the 'processing' goroutines are
    // finished, at which point it's safe to close errorsCh.
    wgErrorCh.Add(1)
    go func() {
        defer wgErrorCh.Done()
        wgProcessing.Wait() // Wait for all deletion and pagination tasks.
        close(errorsCh)
    }()

    // Wait for the error-handling goroutines to finish processing.
    wgErrorCh.Wait()

    if len(failedKeys) > 0 {
        return fmt.Errorf("failed to delete %d keys", len(failedKeys))
    }

    return nil
}
```

<!-- markdownlint-enable -->

### TLS Certificate Asynchronous Pipeline

I ended up needing to write some pretty complex asynchronous code with multiple
layers of fan-out requests.

To explain, I was building an API as a thin wrapper around an ACME client for
issuing TLS certificates, and it had an asynchronous pipeline with multiple
stages.

In one stage I have an 'order' which contains multiple 'authorization' objects.
Each authorization essentially represents a domain that needs to be validated
(i.e. the server needs to confirm the customer owns the domain).

The server issues multiple challenges for each authorization. There are three
challenge types: dns, http, tls. The customer just needs to validate one of
them, but we don't know which one they'll use so we try them all. Hence we need
to fan-out each challenge request concurrently.

We also process the parent authorizations (i.e. the domains) concurrently,
because processing them sequentially would take a lot longer.

So, as an example, if I (as the customer) had made an API request for a new TLS
certificate that I wanted to have for an apex domain (example.com) and a CNAME
(www.example.com), then we would:

- Have two authorizations (each running concurrently)
- Each authorization would have three challenges (each running concurrently)

The code for that looks like the following:

> [!INFO]
> The reason I'm sharing this code is that it uses lots of different
> concurrency mechanisms to ensure efficient use of resources (i.e. we need
> to ensure goroutines finish quickly and actually get cleaned up completely,
> rather than leaking memory).

<!-- markdownlint-disable -->

```go
package order

import (
    "context"
    "errors"
    "fmt"
    "io"
    "log/slog"
    "math"
    "net"
    "net/http"
    "slices"
    "strings"
    "sync"
    "time"

    "github.com/mholt/acmez/v3/acme"

    "github.com/fastly/ascerta/internal/httpx"
    "github.com/fastly/ascerta/internal/log"
)

const (
    // backoffMax is the maximum number to backoff by (i.e. 2 hours in minutes).
    backoffMax = 120
    // backoffMultiplier is the multiplier used for exponential backoff.
    backoffMultiplier = 2
    // checkTimeout short-circuits an authorization check if it stalls.
    // We don't expect an individual check to take longer than 5 seconds.
    checkTimeout = 5 * time.Second
    // maxRetries is the maximum number of times a message can be re-queued.
    maxRetries = 10
)

var (
    // backoff is the duration to backoff by (in minutes).
    // This is a variable so it can be updated for exponential backoff purposes.
    backoff = 5.0

    // supportedChallengeTypes filters unsupported challenge types.
    supportedChallengeTypes = []string{
        acme.ChallengeTypeDNS01,
        acme.ChallengeTypeHTTP01,
    }
)

// consumerChecks takes new certificate orders from the queue and determines
// what pre-flight checks to perform (avoiding bothering a CA unnecessarily).
func consumerChecks() {
    l := log.New()
    ctx := context.Background()

    for o := range ordersCheck {
        sl := l.With(
            slog.Group("retries",
                slog.Int("current", o.retries),
                slog.Int("max", maxRetries),
            ),
        )
        sl.LogAttrs(ctx, slog.LevelInfo, "consumer_check", slog.String("state", "start"))

        if o.retries > maxRetries {
            sl.LogAttrs(ctx, slog.LevelWarn, "consumer_check", slog.String("state", "dead"))
            continue
        }

        // We have one 'authorization' per SAN list entry (i.e. per domain).
        // We then have multiple 'challenges' per authorization to check.
        //
        // We process each authorization asynchronously.
        // We also process each authorization's challenges asynchronously.

        var wg sync.WaitGroup

        for i, a := range o.authorizations {
            // If an order has been placed back on the queue, then we want to
            // avoid having to re-validate authorizations that were already
            // validated successfully.
            if !a.authorized {
                wg.Add(1)
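                // Pass i explicitly so this is also correct before Go 1.22,
                // where the loop variable was shared across iterations.
                go func(i int) {
                    defer wg.Done()
                    processAuthorization(ctx, l, i, &o)
                }(i)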
            }
        }

        wg.Wait()

        // We now need to check if any authorizations failed.
        // If so, we need to re-queue the order.
        var (
            requeue bool
            failed  []string
        )
        for _, a := range o.authorizations {
            if !a.authorized {
                failed = append(failed, a.authorization.IdentifierValue())
                requeue = true
            }
        }
        if requeue {
            sl.LogAttrs(ctx, slog.LevelWarn, "consumer_check",
                slog.Any("err", errors.New("not all authorizations were successful")),
                slog.Bool("retry", true),
                slog.Float64("backoff", backoff),
                slog.String("failed", strings.Join(failed, ",")),
                slog.String("state", "re-queued"),
            )
            o.retries++

            time.Sleep(time.Duration(backoff) * time.Minute) // increases exponentially
            backoff = math.Min(backoff*backoffMultiplier, backoffMax)
            go func() {
                ordersCheck <- o // put the order back on the queue for retry.
            }()
            continue
        }

        sl.LogAttrs(ctx, slog.LevelInfo, "consumer_check",
            slog.Any("order", o),
            slog.String("state", "complete"),
        )
        ordersValidate <- o
    }
}

// processAuthorization processes each authorization concurrently. Each
// challenge within the authorization is also processed concurrently.
func processAuthorization(ctx context.Context, l *slog.Logger, i int, o *Order) {
    a := o.authorizations[i]
    sl := l.With(slog.String("authorization", a.authorization.Identifier.Value))

    // We wait for only one goroutine to complete successfully.
    authorized := make(chan acme.Challenge, 1)

    // The use of a cancellable context is to short-circuit running goroutines.
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // The WaitGroup allows us to verify that all goroutines exit properly.
    var wg sync.WaitGroup

    for _, c := range a.authorization.Challenges {
        if slices.Contains(supportedChallengeTypes, c.Type) {
            wg.Add(1)
            go func(c acme.Challenge) {
                defer wg.Done()
                if err := challengeCheck(ctx, sl, c); err == nil {
                    // The use of select with a cancellable context allows us
                    // to solve two issues:
                    //
                    // 1. Signal goroutines to stop if the channel gets blocked.
                    //    This can happen if both dns and http challenges pass.
                    //
                    // 2. Signal goroutines to stop when one challenge passes.
                    //    Allowing us to short-circuit more quickly.
                    //    And avoids waiting for the checkTimeout below.
                    select {
                    case <-ctx.Done():
                    case authorized <- c:
                    }
                }
            }(c)
        }
    }

    // Block until one of the challenges returns successfully.
    // Otherwise we'll timeout and cause a re-queue.
    select {
    case c := <-authorized:
        cancel()  // short-circuit any still running goroutines.
        wg.Wait() // validate we've cleaned up all goroutines.
        sl.LogAttrs(ctx, slog.LevelInfo, "authorization_check",
            slog.Bool("success", true),
            slog.Int("index", i),
            slog.String("validation_method", c.Type),
        )
        o.authorizations[i].authorized = true
        o.authorizations[i].challengeType = c.Type
    case <-time.After(checkTimeout): // in case no challenges are valid
        cancel()  // short-circuit any still running goroutines.
        wg.Wait() // validate we've cleaned up all goroutines.
        sl.LogAttrs(ctx, slog.LevelWarn, "authorization_check", slog.String("reason", "timeout"))
    }
}

// challengeCheck identifies the challenge type and triggers that check.
func challengeCheck(ctx context.Context, l *slog.Logger, c acme.Challenge) error {
    switch c.Type {
    case acme.ChallengeTypeDNS01:
        return checkDNS(ctx, l, c)
    case acme.ChallengeTypeHTTP01:
        return checkHTTP(ctx, l, c)
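    }
    // Treat an unknown challenge type as a failure rather than a success.
    return fmt.Errorf("unsupported challenge type: %q", c.Type)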
}

// checkDNS looks up the TXT record to see if it contains the correct value. It
// will return nil if the record exists with the correct value.
func checkDNS(ctx context.Context, l *slog.Logger, c acme.Challenge) error {
    // use a resolver to propagate context
    var r net.Resolver
    txtRecords, err := r.LookupTXT(ctx, c.DNS01TXTRecordName())
    if err != nil {
        err := fmt.Errorf("failed to lookup TXT record: %w", err)
        l.LogAttrs(ctx, slog.LevelWarn, "check_dns", slog.Any("err", err))
        return err
    }
    if slices.Contains(txtRecords, c.DNS01KeyAuthorization()) {
        return nil
    }
    err = fmt.Errorf("record exists but does not contain correct value: %s", strings.Join(txtRecords, ","))
    l.LogAttrs(ctx, slog.LevelWarn, "check_dns", slog.Any("err", err))
    return err
}

// checkHTTP looks up the .well-known/acme-challenge/... path to see if it
// exists at that path location and will return nil if it does.
func checkHTTP(ctx context.Context, l *slog.Logger, c acme.Challenge) error {
    var lastErr error

    for _, protocol := range []string{"http", "https"} {
        endpoint := fmt.Sprintf("%s://%s%s", protocol, c.Identifier.Value, c.HTTP01ResourcePath())
        sl := l.With(slog.String("endpoint", endpoint))

        req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
        if err != nil {
            lastErr = fmt.Errorf("failed to create a http.Request for '%s': %w", endpoint, err)
            sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
            continue
        }

        res, err := httpx.Client.Do(req)
        if err != nil {
            // Checking context cancelled allows us to return faster.
            // i.e. we're either already authorized or we timed out.
            if errors.Is(err, context.Canceled) {
                err := fmt.Errorf("failed to send http.Request: %w", ctx.Err())
                sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", err))
                return err
            }
            lastErr = fmt.Errorf("failed to send http.Request: %w", err)
            sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
            continue
        }

        // NOTE: A defer inside a loop only runs when the function returns,
        // so I manually close res.Body on every path instead.
        if res.StatusCode != http.StatusOK {
            res.Body.Close()
            lastErr = fmt.Errorf("invalid http status '%s'", res.Status)
            sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
            continue
        }

        body, err := io.ReadAll(res.Body)
        if err != nil {
            res.Body.Close()
            lastErr = fmt.Errorf("failed to read response body: %w", err)
            sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
            continue
        }
        res.Body.Close()

        if string(body) != c.KeyAuthorization {
            lastErr = fmt.Errorf("invalid response body '%s': expected '%s'", body, c.KeyAuthorization)
            sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
            continue
        }

        return nil // success
    }

    if lastErr != nil {
        return lastErr
    }

    // WARNING: We should never reach this part of the code.
    // We should have already returned a success or tracked an error.
    err := errors.New("unexpected code path reached")
    l.LogAttrs(ctx, slog.LevelError, "check_http", slog.Any("err", err))
    return err
}
```

The `processAuthorization` function specifically might look over-engineered, but
it's critical that we don't have goroutines hanging around waiting to be
unblocked. So all these concurrency mechanisms are in place to ensure the code
is as robust as possible.

<!-- markdownlint-enable -->

## Reference Material

- [Go Style Guide](../go-style-guide/index.html)
- [go.dev/doc/effective_go](https://go.dev/doc/effective_go#concurrency)
