Go Concurrency Patterns

Goroutines

The go keyword is used to start a goroutine.

A goroutine is a lightweight thread managed by the Go runtime, used to run functions concurrently.

Unlike OS threads, which have a fixed stack size (often around 1MB), goroutines start with a very small stack, around 2KB, and can grow or shrink dynamically as needed. This makes it possible to run thousands or even millions of goroutines simultaneously, depending on the available memory.

Goroutines are sometimes compared to “green threads,” which are threads that are scheduled in user space rather than by the OS. The problem with green threads is that they may not leverage multiple CPU cores efficiently since they don’t interact directly with the OS’s scheduler.

Goroutines are similar to green threads in that they are scheduled by the Go runtime rather than the OS. However, they differ in a crucial way: Go uses a model called M:N scheduling, where the Go runtime maps multiple goroutines (M) onto multiple OS threads (N). This allows the runtime to distribute goroutines across multiple CPU cores when possible, making Go’s concurrency model more efficient and scalable than traditional green threads.

⚠️ IMPORTANT
The following example uses a time.Sleep to wait for the goroutine to finish.
This is done for simplicity. Do NOT use this approach.
I’ll explain alternative options afterwards.

package main

import (
	"fmt"
	"time"
)

func sayHello() {
	fmt.Println("Hello from a goroutine!")
}

func main() {
	go sayHello() // Launches sayHello in a new goroutine

	fmt.Println("Hello from main!")
	time.Sleep(1 * time.Second) // Wait for the goroutine to complete (only for example)
}

https://play.golang.com/p/SdOdZ90-exI

⚠️ NOTE
In Go, the main function is effectively the “initial” goroutine.
When a Go program starts, the Go runtime creates a goroutine to run main.
This main goroutine can then spawn additional goroutines as needed.
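
To make the M:N model and the “main is a goroutine” point concrete, here’s a minimal sketch (using only the standard runtime package) that inspects how many OS threads can execute Go code simultaneously and how many goroutines currently exist:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) queries (without changing) the number of OS threads
	// that can execute Go code simultaneously; it defaults to the number
	// of available CPU cores.
	fmt.Println("OS threads for Go code:", runtime.GOMAXPROCS(0))

	// NumGoroutine counts all live goroutines, including the main goroutine,
	// so this prints at least 1.
	fmt.Println("Goroutines:", runtime.NumGoroutine())
}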

Channels

Channels in Go are a powerful way to communicate between goroutines and to synchronize them. They allow you to send and receive values across goroutines, and they help avoid race conditions by enabling safe data sharing.

⚠️ IMPORTANT
In the following example the <-ch receive BLOCKS the main goroutine.
This can cause a deadlock if there’s no other goroutine running to send on the channel and unblock it.

package main

import (
	"fmt"
)

func sendMessage(ch chan string) {
	ch <- "Hello from a goroutine!" // Send a message to the channel
}

func main() {
	// Create a new channel of type string
	ch := make(chan string)

	// Start a goroutine to send a message
	go sendMessage(ch)

	// Receive the message from the channel
	msg := <-ch
	fmt.Println(msg) // Output: Hello from a goroutine!
}

https://play.golang.com/p/2Qn_NacVw-0

⚠️ IMPORTANT
The most crucial best practice is to close the channel from the sender side, not the receiver.
The sender is the goroutine that writes data to the channel.
This signals to the receiver that no more values will be sent.
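
To illustrate, here’s a minimal sketch of that best practice: the sending goroutine closes the channel when it’s done, and the receiver uses range, which exits automatically once the channel is closed and drained:

package main

import "fmt"

func produce(ch chan int) {
	defer close(ch) // the sender closes the channel once it's finished sending
	for i := 0; i < 3; i++ {
		ch <- i
	}
}

func main() {
	ch := make(chan int)
	go produce(ch)

	// range keeps receiving until the sender closes the channel
	for v := range ch {
		fmt.Println("Received:", v)
	}
}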

Select Statement

The select statement is used to wait on multiple channel operations. It blocks until one of its cases can proceed, which makes it essential for handling multiple asynchronous tasks.

Use select when you have multiple channels to listen to, and you want to respond to whichever channel receives data first.

⚠️ NOTE
In the following example, the first goroutine uses a time.Sleep.
This is to simulate the operation taking a long time.
It results in the select pulling a value from the second goroutine.

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "result from ch1"
	}()

	go func() {
		ch2 <- "result from ch2"
	}()

	select {
	case msg1 := <-ch1:
		fmt.Println("Received:", msg1)
	case msg2 := <-ch2:
		fmt.Println("Received:", msg2)
	}
}

https://play.golang.com/p/HXe-bZ__EEy

A common use case for select is adding a timeout to an operation that might otherwise block forever:

⚠️ NOTE
In the following example we use time.After to cause a timeout.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Create an unbuffered channel
	ch := make(chan string)

	// Start a goroutine that simulates a delayed send
	go func() {
		time.Sleep(3 * time.Second)   // Simulate a delay
		ch <- "Hello from goroutine!" // Send a message after delay
	}()

	select {
	case msg := <-ch:
		fmt.Println("Received:", msg)
	case <-time.After(2 * time.Second): // Timeout after 2 seconds
		fmt.Println("Timeout! No message received.")
	}
}

https://play.golang.com/p/6BMPeUKdqkg

Wait Groups

A sync.WaitGroup waits for a collection of goroutines to finish. It helps coordinate a group of goroutines and ensures the program waits until all of them have completed before proceeding.

Use a WaitGroup when you need to wait for multiple goroutines to finish before moving on.

In this example, a sync.WaitGroup is used to wait for three goroutines to complete. Each goroutine represents a worker, and each one calls wg.Done() to signal that it’s finished. The main function calls wg.Wait() to block until all workers are done.

package main

import (
	"fmt"
	"sync"
)

func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done() // Decrement counter when goroutine completes
	fmt.Printf("Worker %d starting\n", id)
	fmt.Printf("Worker %d done\n", id)
}

func main() {
	var wg sync.WaitGroup

	for i := range 3 {
		wg.Add(1) // Track each goroutine started
		go worker(i, &wg)
	}

	// Wait for all goroutines to finish
	wg.Wait()

	fmt.Println("All workers done.")
}

https://play.golang.com/p/LhEdSQIPp1R

Mutex

Go’s sync.Mutex provides mutual exclusion, allowing only one goroutine at a time to access a critical section of code. sync.RWMutex is a variant that allows multiple concurrent readers or a single writer, but not both.

Use sync.Mutex or sync.RWMutex when you need fine-grained control over data access and want to protect shared data from race conditions.

In the example below, sync.Mutex ensures that only one goroutine modifies counter.value at a time, preventing race conditions:

package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	mu    sync.Mutex
	value int
}

func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

func main() {
	counter := &Counter{}
	var wg sync.WaitGroup

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter.Increment()
		}()
	}

	wg.Wait()
	
	fmt.Println("Final Counter:", counter.Value())
}

https://play.golang.com/p/VIbNkQaPfZI

⚠️ NOTE
The use of defer in the Increment() method is not strictly necessary.
defer is more useful for long, complex functions with multiple return paths or where panics can occur.
Here I should have just placed the Unlock() call after c.value++.
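
In other words, the short method could simply unlock inline:

func (c *Counter) Increment() {
	c.mu.Lock()
	c.value++
	c.mu.Unlock() // fine here: no early returns or panics between Lock and Unlock
}

And since sync.RWMutex was mentioned above, here’s a minimal sketch of the same Counter using a read/write lock, so multiple goroutines can read the value concurrently while writes remain exclusive:

type RWCounter struct {
	mu    sync.RWMutex
	value int
}

func (c *RWCounter) Increment() {
	c.mu.Lock() // exclusive write lock
	defer c.mu.Unlock()
	c.value++
}

func (c *RWCounter) Value() int {
	c.mu.RLock() // shared read lock: many readers can hold it simultaneously
	defer c.mu.RUnlock()
	return c.value
}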

Conditions

Go’s sync.Cond is used for signaling between goroutines. It lets goroutines wait until they are notified to continue, which is useful when one goroutine needs to wait for a certain condition to be met by another goroutine.

Use sync.Cond when you need goroutines to wait for certain conditions, such as producer-consumer scenarios.

In the following example, cond.Wait() blocks until cond.Signal() is called. It’s useful for waiting on complex conditions where other primitives like chan may not be ideal:

⚠️ IMPORTANT
The call to cond.L.Lock() in the main goroutine just before for !ready is required, otherwise you’ll get the error fatal error: sync: unlock of unlocked mutex. This is because cond.Wait() expects the caller to hold the lock when calling it. Wait() releases the lock while blocked and reacquires it before returning, ensuring the main goroutine can safely re-check ready and exit the loop.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false

	go func() {
		time.Sleep(1 * time.Second)
		cond.L.Lock()
		ready = true
		cond.L.Unlock()
		cond.Signal() // Notify one waiting goroutine
	}()

	cond.L.Lock()
	for !ready {
		cond.Wait() // Wait until condition is met
	}
	fmt.Println("Ready is true, proceeding.")
	cond.L.Unlock()
}

https://play.golang.com/p/n_txZaH7lPA

In the following example we have multiple worker goroutines waiting on a shared condition to be “notified.” We’ll see how both .Signal() and .Broadcast() work when notifying waiting goroutines:

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, cond *sync.Cond) {
	cond.L.Lock() // Lock the condition
	defer cond.L.Unlock()

	fmt.Printf("Worker %d is waiting\n", id)
	cond.Wait() // Wait for a signal or broadcast
	fmt.Printf("Worker %d is proceeding\n", id)
}

func main() {
	lock := &sync.Mutex{}
	cond := sync.NewCond(lock)

	// Start multiple worker goroutines that will wait on the condition
	for i := range 3 {
		go worker(i, cond)
	}

	// Allow time for all workers to start and wait
	time.Sleep(1 * time.Second)

	// Use Signal to wake up one goroutine
	fmt.Println("Notifying one worker")
	cond.Signal() // Notifies one waiting worker
	time.Sleep(1 * time.Second)

	// Use Broadcast to wake up all remaining goroutines
	fmt.Println("Broadcasting to all remaining workers")
	cond.Broadcast() // Notifies all remaining waiting workers

	// Allow time for all goroutines to complete
	time.Sleep(2 * time.Second)
	fmt.Println("Main function exiting.")
}

https://play.golang.com/p/41ibtmUmKaN

Each worker goroutine locks the mutex, calls cond.Wait(), and then waits. As we saw in the earlier IMPORTANT note, Wait() releases the lock while blocked, allowing the other goroutines to call Wait() as well.

The cond.Signal() call in the main function wakes up one of the waiting goroutines, allowing it to proceed.

After a short delay, cond.Broadcast() wakes up all remaining waiting goroutines, allowing them to proceed simultaneously.

This is useful for scenarios where multiple tasks need to wait for a common event or state change to proceed.

⚠️ IMPORTANT
Notifications are not ordered.
Any one of the waiting goroutines can be chosen to proceed first.
Broadcast ensures that all waiting goroutines eventually proceed.

Now you might be thinking “hmm, it looks like I could use channels instead and they’re more idiomatic”.

Well, here are some reasons why you might choose sync.Cond over channels:

  1. Fine-Grained Control: sync.Cond allows precise control over waiting and signaling, suitable for cases where specific conditions must be checked or managed.

  2. Broadcast Capability: Broadcasting to multiple goroutines is straightforward with sync.Cond.Broadcast(), whereas with channels you’d need to signal each receiver individually (or close the channel, which can only be done once), which can be inefficient.

  3. Reduced Complexity for State-Based Waiting: sync.Cond is ideal for situations where goroutines need to wait for specific conditions to be true, rather than for individual values or events passed through a channel.

  4. Avoiding Channel Overhead: Channels introduce buffering and management overhead, especially with many goroutines, whereas sync.Cond relies on a shared mutex with a direct wait/notify mechanism, which is often faster.

In summary, sync.Cond is best suited for use cases that involve waiting for and signaling conditions, especially when you need more control over synchronization and when goroutines are reacting to shared state changes rather than discrete message passing.

Atomic Operations

The sync/atomic package provides low-level atomic operations on simple types like integers and pointers, ensuring operations are performed atomically.

Use atomic operations when you need lock-free synchronization for counters or flags, but only for basic integer or pointer manipulations.

In the following example, atomic.AddInt32 safely increments counter without a lock, making it ideal for high-performance counters or flags:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var counter int32
	var wg sync.WaitGroup

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt32(&counter, 1)
		}()
	}

	wg.Wait()
	
	fmt.Println("Final Counter:", atomic.LoadInt32(&counter))
}

https://play.golang.com/p/qsRPoC4GPNv
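
As an aside (assuming Go 1.19+), the sync/atomic package also provides typed wrappers such as atomic.Int32, atomic.Uint64, and atomic.Bool, which are harder to misuse than the function-based API; the Real Examples section later uses atomic.Uint64. Here’s the same counter as a minimal sketch using the typed API:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var counter atomic.Int32 // typed atomic: no pointer passing required
	var wg sync.WaitGroup

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter.Add(1) // atomically increment
		}()
	}

	wg.Wait()

	fmt.Println("Final Counter:", counter.Load())
}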

Once

Go’s sync.Once ensures that a function only executes once, even if multiple goroutines attempt to run it.

Use sync.Once when you need to perform a one-time initialization, such as setting up a shared resource.

In the following example, even though multiple goroutines call once.Do(initialize), initialize only runs once. This is especially useful for lazy initialization of global resources:

package main

import (
	"fmt"
	"sync"
)

var once sync.Once

func initialize() {
	fmt.Println("Initializing...")
}

func main() {
	var wg sync.WaitGroup
	for range 3 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(initialize)
		}()
	}
	wg.Wait()
}

https://play.golang.com/p/5J1ApCPc1iU
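
As a related aside (assuming Go 1.21+), the sync package also provides sync.OnceFunc and sync.OnceValue, which wrap a function so it runs only once; here’s a minimal sketch of lazy one-time initialization that returns a value:

package main

import (
	"fmt"
	"sync"
)

// getConfig runs the wrapped function exactly once, no matter how many
// goroutines call it; subsequent calls return the cached value.
var getConfig = sync.OnceValue(func() string {
	fmt.Println("Initializing...")
	return "config-data"
})

func main() {
	var wg sync.WaitGroup
	for range 3 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(getConfig())
		}()
	}
	wg.Wait()
}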

Context

Go’s context.Context is not a strict concurrency primitive but is widely used to manage timeouts, cancellations, and deadlines across goroutines.

Use context.Context to signal cancellation or control the lifespan of goroutines, particularly in networked or long-running tasks.

In the following example, context.WithTimeout creates a context that automatically cancels after 1 second, which is useful for controlling tasks that may hang or take too long:

package main

import (
	"context"
	"fmt"
	"time"
)

func process(ctx context.Context) {
	select {
	case <-time.After(2 * time.Second): // use time.After to simulate slow operation
		fmt.Println("Completed work")
	case <-ctx.Done():
		fmt.Println("Work cancelled")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	go process(ctx)

	time.Sleep(2 * time.Second)
}

https://play.golang.com/p/diSmAp0SJkg
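
context.WithTimeout is built on the same cancellation mechanism as context.WithCancel, which lets you cancel manually instead of on a deadline. Here’s a minimal sketch (the final Sleep is only so the example prints before main exits):

package main

import (
	"context"
	"fmt"
	"time"
)

func worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Worker stopped:", ctx.Err())
			return
		default:
			fmt.Println("Working...")
			time.Sleep(200 * time.Millisecond)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go worker(ctx)

	time.Sleep(600 * time.Millisecond)
	cancel() // signal the worker to stop

	time.Sleep(100 * time.Millisecond) // let the worker observe the cancellation (example only)
}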

Map

The sync package has a Map type which you will likely not need to use.

The Go authors even document it as such…

The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.
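
For completeness, here’s a minimal sketch of the sync.Map API (Store, Load, and Range are all safe for concurrent use without any extra locking):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map

	// Store and Load can be called from many goroutines concurrently.
	m.Store("greeting", "hello")

	if v, ok := m.Load("greeting"); ok {
		fmt.Println(v) // Output: hello
	}

	// Range iterates over all entries; return false from the callback to stop early.
	m.Range(func(key, value any) bool {
		fmt.Println(key, value)
		return true
	})
}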

Real Examples

Below is a ‘real world’ example where we need to delete a bunch of keys from a data store.

The API provided does not support bulk-deleting keys.

It does, however, expose an endpoint for paginating the available keys, and we then need to stream that information as quickly as possible using a pool of goroutines coordinated with both channels and wait groups.

It’s a nice example because it brings together several different concurrency primitives (goroutines, channels, select, wait groups, atomic operations).

💡 TIP
Keep reading after the code snippet for a brief breakdown of what the code does.

const (
	// PoolSize is the goroutine/thread-pool size.
	// Each goroutine in the pool takes a 'key' from a channel and issues a DELETE request.
	PoolSize int = 100

	// MaxErrors is the maximum number of errors we'll allow before
	// stopping the goroutines from executing.
	MaxErrors int = 100
)

func DeleteAllKeys(storeID string, out io.Writer) error {
	// Create a 'spinner' which helps visually update the user on the progress.
	spinnerMessage := "Deleting keys"
	var spinner text.Spinner

	var err error
	spinner, err = text.NewSpinner(out)
	if err != nil {
		return err
	}
	err = spinner.Start()
	if err != nil {
		return err
	}
	spinner.Message(spinnerMessage + "...")

	// Create a keys paginator.
	p := fastly.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
		StoreID: storeID,
	})

	// Channel for tracking errors when deleting keys.
	errorsCh := make(chan string, MaxErrors)
	
	// Channel for tracking keys to delete.
	keysCh := make(chan string, 1000) // this number correlates to the pagination 
	                                  // page size defined by the API

	var (
		// Track the number of keys deleted.
		deleteCount atomic.Uint64
		
		// Track which keys failed to be deleted.
		failedKeys []string
		
		// This will help us wait for all goroutines to complete.
		wg sync.WaitGroup
	)

	// We have two separate execution flows happening at once:
	//
	// 1. Pushing keys from pagination data into a key channel.
	// 2. Pulling keys from key channel and issuing API DELETE call.
	//
	// We have a limit on the number of errors. Once that limit is reached we'll
	// stop the second set of goroutines processing the delete operation.

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(keysCh)
		for p.Next() {
			for _, key := range p.Keys() {
				keysCh <- key
			}
		}
	}()

	// Limit the number of goroutines spun up to the specified pool size.
	for range PoolSize {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range keysCh {
				err := fastly.DeleteKVStoreKey(
					&fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key},
				)
				if err != nil {
					select {
					case errorsCh <- key:
					default:
						return // channel is full (i.e. MaxErrors limit reached)
					}
					continue // don't count a failed deletion as deleted
				}
				// Update the TUI (Terminal UI) to reflect the current number of 
				// deleted keys.
				f := strconv.FormatUint(deleteCount.Add(1), 10)
				spinner.Message(spinnerMessage + "..." + f)
			}
		}()
	}

	wg.Wait()

	close(errorsCh)
	for err := range errorsCh {
		failedKeys = append(failedKeys, err)
	}

	spinnerMessage = "Deleted keys: " + strconv.FormatUint(deleteCount.Load(), 10)

	if len(failedKeys) > 0 {
		spinner.StopFailMessage(spinnerMessage)
		err := spinner.StopFail()
		if err != nil {
			return fmt.Errorf("failed to stop spinner: %w", err)
		}
		return fmt.Errorf("failed to delete %d keys", len(failedKeys))
	}

	spinner.StopMessage(spinnerMessage)
	if err := spinner.Stop(); err != nil {
		return fmt.Errorf("failed to stop spinner: %w", err)
	}

	text.Success(out, "\nDeleted all keys from KV Store '%s'", storeID)
	return nil
}

So you can see we have multiple goroutines spun up (and we wait for them using sync.WaitGroup):

  • The first goroutine is iterating over the pagination data and pushing data into a channel.
  • The other goroutines (we have a limit of PoolSize) are pulling data from the channel and issuing key deletion API calls.

We also use the select statement to control when the goroutines processing the deletion operations should stop. We define another channel (errorsCh) with a buffer size of MaxErrors, and every time a deletion fails we push the failed key into that channel. If the channel becomes full (which it will do eventually, because nothing is pulling messages from errorsCh), the select statement falls through to its default case and we return from the goroutine, causing it to stop running.

The last interesting concurrency primitive we use is atomic.Uint64 for accurately tracking the number of deleted keys. We use its Add() method within the goroutine to safely increment the counter, and then at the end of the function we use its Load() method to safely extract the final value.
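
That non-blocking send via select with a default case is a general pattern worth isolating; a minimal sketch:

// tryEnqueue attempts a non-blocking send.
// It returns false (instead of blocking) when the channel's buffer is full.
func tryEnqueue(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false // buffer full: caller decides whether to drop or retry
	}
}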

UPDATE 2024.11.01

Below is a modification to the ‘real world’ example code.

The difference is in how errors are handled. The original code stopped processing key deletions once an error threshold was reached. In the following version we don’t want that to happen; instead we keep processing. But that introduces a new challenge around channel buffer size (we can’t give a channel an unbounded buffer), so at some point the errors channel will fill up and we need to decide what to do.

In this case we can realistically only drop errors. But we can alleviate that a little by concurrently pulling messages out of the errors channel and appending them to the failedKeys slice. This frees space in the channel’s buffer, so we can push more errors into it as they occur.

The problem with pulling errors out of the errors channel concurrently is that we need to range over the channel, and ranging over a channel only terminates when the channel is closed, so we’d deadlock if we never closed it. This means we need to introduce not one sync.WaitGroup but two! See the code comments below for more details…

// deleteAllKVStoreKeys deletes all keys within the specified KV Store.
func deleteAllKVStoreKeys(
	conn *fastly.Client,
	storeID string,
	maxErrors, poolSize int,
) error {
	p := conn.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
		StoreID: storeID,
	})

	errorsCh := make(chan string, maxErrors)
	keysCh := make(chan string, 1000) // number correlates to pagination page size

	var (
		failedKeys   []string
		mu           sync.Mutex
		wgProcessing sync.WaitGroup
		wgErrorCh    sync.WaitGroup
	)

	// We have three separate execution flows happening at once:
	//
	// 1. Pushing keys from pagination data into a key channel.
	// 2. Pulling keys from error channel and appending to failedKeys slice.
	// 3. Pulling keys from key channel and issuing API DELETE call.
	//
	// The second item is problematic, in that ranging over a channel only
	// terminates when the channel is closed. So we need to ensure we close the
	// errorsCh once we've finished processing the deletion of all the keys.
	//
	// To do that we need two sets of wait groups.
	//
	// The first is wgProcessing which keeps track of all goroutines related to
	// processing the pagination data (e.g. the goroutine ranging over the
	// paginator keys, and the goroutine ranging over the keysCh as part of the
	// poolSize loop).
	//
	// The second wait group is wgErrorCh which tracks when the first
	// (wgProcessing) has completed and then closes errorsCh.

	// The following goroutine finishes once all pagination keys have been
	// processed.
	wgProcessing.Add(1)
	go func() {
		defer wgProcessing.Done()
		defer close(keysCh)
		for p.Next() {
			for _, key := range p.Keys() {
				keysCh <- key
			}
		}
	}()

	// The following goroutine finishes once the errorsCh is closed.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		for err := range errorsCh {
			mu.Lock()
			failedKeys = append(failedKeys, err)
			mu.Unlock()
		}
	}()

	// The following goroutines close once they've pulled all data from keysCh.
	for range poolSize {
		wgProcessing.Add(1)
		go func() {
			defer wgProcessing.Done()
			for key := range keysCh {
				err := conn.DeleteKVStoreKey(
					&fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key},
				)
				if err != nil {
					select {
					case errorsCh <- key:
					default:
						continue // the larger we make maxErrors 
						         // the less likely we'll drop errors 
						         // (obviously there's a memory trade-off to be made)
					}
				}
			}
		}()
	}

	// The following goroutine finishes once the 'processing' goroutines are
	// done.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		wgProcessing.Wait() // Wait for all deletion and pagination tasks.
		close(errorsCh)
	}()

	// Wait for the error-handling goroutines to finish processing.
	wgErrorCh.Wait()

	if len(failedKeys) > 0 {
		return fmt.Errorf("failed to delete %d keys", len(failedKeys))
	}

	return nil
}

UPDATE 2025.03.21

I ended up needing to write some pretty complex asynchronous code with multiple layers of fan-out requests.

To explain: I was building an API as a thin wrapper around an ACME client for issuing TLS certificates, with an asynchronous pipeline made up of multiple stages.

In one stage I have an ‘order’ which contains multiple ‘authorization’ objects. Each of these objects represents a domain that needs to be validated (i.e. the server needs to confirm the customer owns the domain).

The server issues multiple challenges for each authorization. There are three challenge types: dns, http, tls. The customer just needs to validate one of them, but we don’t know which one they’ll use, so we try them all. Hence we need to fan out each challenge request concurrently.

We also process the parent authorizations (i.e. the domains) concurrently, because processing them sequentially would take a lot longer.

So, as an example, if I (as the customer) had made an API request for a new TLS certificate that I wanted to have for an apex domain (example.com) and a CNAME (www.example.com), then we would:

  • Have two authorizations (each running concurrently)
  • Each authorization would have three challenges (each running concurrently)

The code for that looks like the following:

💡 The reason I’m sharing this code is that it uses lots of different concurrency mechanisms to ensure efficient use of memory resources (i.e. we need goroutines to finish as quickly as possible, get cleaned up completely, and not leak memory).

package order

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"math"
	"net"
	"net/http"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/mholt/acmez/v3/acme"

	"github.com/fastly/ascerta/internal/httpx"
	"github.com/fastly/ascerta/internal/log"
)

const (
	// backoffMax is the maximum number to backoff by (i.e. 2 hours in minutes).
	backoffMax = 120
	// backoffMultiplier is the multiplier used for exponential backoff.
	backoffMultiplier = 2
	// checkTimeout short-circuits an authorization check if it stalls.
	// We don't expect an individual check to take longer than 5 seconds.
	checkTimeout = time.Duration(5) * time.Second
	// maxRetries is the maximum number of times a message can be re-queued.
	maxRetries = 10
)

var (
	// backoff is the duration to backoff by (in minutes).
	// This is a variable so it can be updated for exponential backoff purposes.
	backoff = 5.0

	// supportedChallengeTypes filters unsupported challenge types.
	supportedChallengeTypes = []string{
		acme.ChallengeTypeDNS01,
		acme.ChallengeTypeHTTP01,
	}
)

// consumerChecks takes new certificate orders from the queue and determines
// what pre-flight checks to perform (avoiding bothering a CA unnecessarily).
func consumerChecks() {
	l := log.New()
	ctx := context.Background()

	// TODO: Consider using pointer to Order struct to reduce value copying.
	for o := range ordersCheck {
		sl := l.With(
			slog.Group("retries",
				slog.Int("current", o.retries),
				slog.Int("max", maxRetries),
			),
		)
		sl.LogAttrs(ctx, slog.LevelInfo, "consumer_check", slog.String("state", "start"))

		if o.retries > maxRetries {
			// TODO: send order to a dead letter queue
			sl.LogAttrs(ctx, slog.LevelWarn, "consumer_check", slog.String("state", "dead"))
			continue
		}

		// We have one 'authorization' per SAN list entry (i.e. per domain).
		// We then have multiple 'challenges' per authorization to check.
		//
		// We process each authorization asynchronously.
		// We also process each authorization's challenges asynchronously.

		var wg sync.WaitGroup

		for i, a := range o.authorizations {
			// If an order has been placed back on the queue, then we want to
			// avoid having to re-validate authorizations that were already
			// validated successfully.
			if !a.authorized {
				wg.Add(1)
				go func() {
					defer wg.Done()
					processAuthorization(ctx, l, i, &o)
				}()
			}
		}

		wg.Wait()

		// We now need to check if any authorizations failed.
		// If so, we need to re-queue the order.
		var (
			requeue bool
			failed  []string
		)
		for _, a := range o.authorizations {
			if !a.authorized {
				failed = append(failed, a.authorization.IdentifierValue())
				requeue = true
			}
		}
		if requeue {
			sl.LogAttrs(ctx, slog.LevelWarn, "consumer_check",
				slog.Any("err", errors.New("not all authorizations were successful")),
				slog.Bool("retry", true),
				slog.Float64("backoff", backoff),
				slog.String("failed", strings.Join(failed, ",")),
				slog.String("state", "re-queued"),
			)
			o.retries++

			// TODO: When implementing NSQ use DeferredPublish.
			// https://pkg.go.dev/github.com/nsqio/go-nsq#DeferredPublish
			time.Sleep(time.Duration(backoff) * time.Minute) // backoff grows exponentially: 5, 10, 20, 40, 80, then capped at 120 minutes (2hrs) until maxRetries
			// TODO: Extend the backoff to once a day requeue.
			backoff = math.Min(backoff*backoffMultiplier, backoffMax)
			go func() {
				ordersCheck <- o // put the order back on the queue for retry.
			}()
			continue
		}

		sl.LogAttrs(ctx, slog.LevelInfo, "consumer_check",
			slog.Any("order", o),
			slog.String("state", "complete"),
		)
		ordersValidate <- o
	}
}

// processAuthorization processes each authorization concurrently. Each
// challenge within the authorization is also processed concurrently.
func processAuthorization(ctx context.Context, l *slog.Logger, i int, o *Order) {
	a := o.authorizations[i]
	sl := l.With(slog.String("authorization", a.authorization.Identifier.Value))

	// We wait for only one goroutine to complete successfully.
	authorized := make(chan acme.Challenge, 1)

	// The use of a cancellable context is to short-circuit running goroutines.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// The WaitGroup allows us to verify that all goroutines exit properly.
	var wg sync.WaitGroup

	for _, c := range a.authorization.Challenges {
		if slices.Contains(supportedChallengeTypes, c.Type) {
			wg.Add(1)
			go func(c acme.Challenge) {
				defer wg.Done()
				if err := challengeCheck(ctx, sl, c); err == nil {
					// The use of select with a cancellable context allows us to
					// solve two issues:
					//
					// 1. Signal goroutines to stop if the channel gets blocked.
					//    This can happen if both dns and http challenges pass.
					//
					// 2. Signal goroutines to stop when one challenge passes.
					//    Allowing us to short-circuit more quickly.
					//    And avoids waiting for the checkTimeout below.
					select {
					case <-ctx.Done():
					case authorized <- c:
					}
				}
			}(c)
		}
	}

	// Block until one of the challenges returns successfully.
	// Otherwise we'll timeout and cause a re-queue.
	select {
	case c := <-authorized:
		cancel()  // short-circuit any still running goroutines.
		wg.Wait() // validate we've cleaned up all goroutines.
		sl.LogAttrs(ctx, slog.LevelInfo, "authorization_check",
			slog.Bool("success", true),
			slog.Int("index", i),
			slog.String("validation_method", c.Type),
		)
		o.authorizations[i].authorized = true
		o.authorizations[i].challengeType = c.Type
	case <-time.After(checkTimeout): // in case no challenges are valid
		cancel()  // short-circuit any still running goroutines.
		wg.Wait() // validate we've cleaned up all goroutines.
		sl.LogAttrs(ctx, slog.LevelWarn, "authorization_check", slog.String("reason", "timeout"))
	}
}

// challengeCheck identifies the challenge type and triggers that check.
func challengeCheck(ctx context.Context, l *slog.Logger, c acme.Challenge) error {
	switch c.Type {
	case acme.ChallengeTypeDNS01:
		return checkDNS(ctx, l, c)
	case acme.ChallengeTypeHTTP01:
		return checkHTTP(ctx, l, c)
	}
	return nil
}

// checkDNS looks up the TXT record to see if it contains the correct value. It
// will return nil if the record exists with the correct value.
func checkDNS(ctx context.Context, l *slog.Logger, c acme.Challenge) error {
	// use a resolver to propagate context
	var r net.Resolver
	txtRecords, err := r.LookupTXT(ctx, c.DNS01TXTRecordName())
	if err != nil {
		err := fmt.Errorf("failed to lookup TXT record: %w", err)
		l.LogAttrs(ctx, slog.LevelWarn, "check_dns", slog.Any("err", err))
		return err
	}
	if slices.Contains(txtRecords, c.DNS01KeyAuthorization()) {
		return nil
	}
	err = fmt.Errorf("record exists but does not contain correct value: %s", strings.Join(txtRecords, ","))
	l.LogAttrs(ctx, slog.LevelWarn, "check_dns", slog.Any("err", err))
	return err
}

// checkHTTP looks up the .well-known/acme-challenge/... path to see if it
// exists at that path location and will return nil if it does.
func checkHTTP(ctx context.Context, l *slog.Logger, c acme.Challenge) error {
	var lastErr error

	for _, protocol := range []string{"http", "https"} {
		endpoint := fmt.Sprintf("%s://%s%s", protocol, c.Identifier.Value, c.HTTP01ResourcePath())
		sl := l.With(slog.String("endpoint", endpoint))

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
		if err != nil {
			lastErr = fmt.Errorf("failed to create a http.Request for '%s': %w", endpoint, err)
			sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
			continue
		}

		res, err := httpx.Client.Do(req)
		if err != nil {
			// Checking context cancelled allows us to return faster.
			// i.e. we're either already authorized or we timed out.
			if errors.Is(err, context.Canceled) {
				err := fmt.Errorf("failed to send http.Request: %w", ctx.Err())
				sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", err))
				return err
			}
			lastErr = fmt.Errorf("failed to send http.Request: %w", err)
			sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
			continue
		}

		// NOTE: deferring inside a loop would delay all the Close calls until the function returns, so I close res.Body manually.
		if res.StatusCode != http.StatusOK {
			res.Body.Close()
			lastErr = fmt.Errorf("invalid http status '%s'", res.Status)
			sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
			continue
		}

		body, err := io.ReadAll(res.Body)
		if err != nil {
			res.Body.Close()
			lastErr = fmt.Errorf("failed to read response body: %w", err)
			sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
			continue
		}
		res.Body.Close()

		if string(body) != c.KeyAuthorization {
			lastErr = fmt.Errorf("invalid response body '%s': expected '%s'", body, c.KeyAuthorization)
			sl.LogAttrs(ctx, slog.LevelWarn, "check_http", slog.Any("err", lastErr))
			continue
		}

		return nil // success
	}

	if lastErr != nil {
		return lastErr
	}

	// WARNING: We should never reach this part of the code.
	// We should have already returned a success or tracked an error.
	err := errors.New("unexpected code path reached")
	l.LogAttrs(ctx, slog.LevelError, "check_http", slog.Any("err", err))
	return err
}

The processAuthorization function specifically might look over-engineered, but it’s critical that we don’t have goroutines hanging around waiting to be unblocked. All these concurrency mechanisms are in place to make the code as robust as possible.

Reference Material