Go Concurrency Primitives and Patterns

Discover the different concurrency primitives in Go and learn common patterns to utilize them effectively.

Tuesday, Jun 18, 2024


Concurrency Primitives

A high-level overview of some of the foundational concurrency primitives in Go. If you are unfamiliar with these concepts, I recommend diving further into each one.

Goroutine

Goroutines are independently executing functions, often described as 'lightweight threads', that are managed by the Go runtime rather than by the operating system.

The Go runtime scheduler multiplexes goroutines onto a smaller number of OS threads and decides when each goroutine runs.

Because a goroutine starts with a small stack (a few kilobytes) that grows as needed, a single program can comfortably run thousands of goroutines, and often far more.

main.go
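package main

import "fmt"

func main() {
	done := make(chan struct{})
	go func() {
		fmt.Println("inside goroutine")
		close(done) // signal that the goroutine has finished
	}()
	// wait; otherwise main may return before the goroutine ever runs
	<-done
}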

Read more about goroutines and the Go Runtime Scheduler: Go Runtime Scheduler: High Level Concurrency Overview
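To make the "thousands of goroutines" point concrete, here is a minimal sketch that starts a configurable number of goroutines and waits until every one of them has reported back on a channel. The helper name spawnMany and the count of 10,000 are arbitrary choices for illustration.

```go
package main

import "fmt"

// spawnMany starts n goroutines that each report back once, and waits for all reports.
func spawnMany(n int) int {
	results := make(chan int)
	for i := 0; i < n; i++ {
		go func(id int) {
			results <- id // each goroutine sends exactly one value
		}(i)
	}

	received := 0
	for i := 0; i < n; i++ {
		<-results // blocks until some goroutine has sent
		received++
	}
	return received
}

func main() {
	fmt.Println("goroutines finished:", spawnMany(10000)) // goroutines finished: 10000
}
```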

WaitGroups

A sync.WaitGroup is a synchronization primitive that waits for a collection of goroutines to finish. WaitGroups can be used in place of, or in conjunction with, channels.

main.go
package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // Signal that this goroutine is done
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second) // Simulate work
    fmt.Printf("Worker %d done\n", id)
}
 
func main() {
    var wg sync.WaitGroup
 
    for i := 1; i <= 3; i++ {
        wg.Add(1) // Increment the counter for each new goroutine
        go worker(i, &wg) // Launch the worker goroutine
    }
 
    wg.Wait() // Block until all worker goroutines are done
    fmt.Println("All workers completed")
}
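Since WaitGroups are often combined with channels, here is a minimal sketch of that combination: each goroutine sends a partial result on a channel, and the WaitGroup decides when the channel can safely be closed so the consumer's range loop ends. The function name sumSquares is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares squares each number in its own goroutine and sums the results.
func sumSquares(nums []int) int {
	var wg sync.WaitGroup
	results := make(chan int)

	for _, n := range nums {
		wg.Add(1) // call Add before starting the goroutine, not inside it
		go func(n int) {
			defer wg.Done()
			results <- n * n
		}(n)
	}

	// Close results only after every sender has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 30
}
```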

Channels

Channels let goroutines share data by sending and receiving values, rather than by accessing shared memory directly. A channel can be thought of as a FIFO (First-In-First-Out) queue.

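ch := make(chan string)
// sender: runs in its own goroutine, because a send on an unbuffered channel blocks until a receiver is ready
go func() { ch <- "value" }()
// receiver
fmt.Println(<-ch)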

Behaviors:

  • Types: Unbuffered (default) and Buffered Channels
  • Close a channel with close(ch) when the sender is done sending. Closing signals to receivers that no more values will arrive, which is what ends a for ... range loop over the channel. Only the sender should close a channel: sending on a closed channel panics.
  • Deadlocks can occur when goroutines are waiting on each other to release resources or to communicate over channels. With channels, the easiest way to trigger one is to send a value on an unbuffered channel when no other goroutine is ready to receive it.
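The close semantics above can be seen in a short sketch: a for ... range loop ends once the channel is closed and its buffered values are consumed, and a receive on a closed, empty channel returns immediately with the zero value and ok set to false. The helper name drain is illustrative.

```go
package main

import "fmt"

// drain collects every value from ch until ch is closed.
func drain(ch <-chan int) []int {
	var out []int
	for v := range ch { // exits after close, once buffered values are consumed
		out = append(out, v)
	}
	return out
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // the sender signals that no more values will be sent

	fmt.Println(drain(ch)) // [1 2]

	v, ok := <-ch // a receive on a closed, empty channel never blocks
	fmt.Println(v, ok) // 0 false
}
```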

Unbuffered

An unbuffered channel has no capacity: the sender and receiver must both be ready to communicate at the same time.

  • Behavior: A send blocks until another goroutine receives the value, and a receive blocks until another goroutine sends one.
  • Use Case: Ideal for synchronizing goroutines, since the sender knows the value has been handed off to a receiver before it continues.
main.go
package main
 
import (
    "fmt"
)
 
func main() {
    ch := make(chan int)
 
    go func() {
        ch <- 42 // This blocks until the value is received
    }()
 
    value := <-ch // This blocks until a value is sent
    fmt.Println(value) // Output: 42
}

Buffered

Unlike unbuffered channels, buffered channels have a fixed capacity, so a sender can queue up to that many values without a receiver reading from the channel immediately.

  • Behavior: A send on a buffered channel blocks only when the buffer is full, and a receive blocks only when the buffer is empty.
  • Use Case: Ideal for decoupling producers and consumers, allowing them to operate at different cadences.
main.go
package main
 
import (
    "fmt"
)
 
func main() {
    ch := make(chan int, 3) // Buffered channel with capacity 3
 
    ch <- 1 // This does not block
    ch <- 2 // This does not block
    ch <- 3 // This does not block
 
    fmt.Println(<-ch) // Output: 1
    fmt.Println(<-ch) // Output: 2
    fmt.Println(<-ch) // Output: 3
}
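To see exactly where a buffered channel starts blocking, this sketch inspects the buffer with len and cap and attempts each send through a select with a default case, so a full buffer is reported instead of blocking. trySend is a hypothetical helper name.

```go
package main

import "fmt"

// trySend attempts a send and returns false instead of blocking when the buffer is full.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 3)

	for i := 1; i <= 4; i++ {
		ok := trySend(ch, i)
		fmt.Printf("send %d ok=%v len=%d cap=%d\n", i, ok, len(ch), cap(ch))
	}
	// the fourth send reports ok=false: the buffer already holds 3 values
}
```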

Select

The select statement lets a goroutine wait on multiple channel operations at once. It proceeds with whichever case is ready; if several are ready at the same time, it picks one at random.

Use Cases:

  • Waiting on multiple channels
  • Timeouts and cancellation handling
  • Non-blocking sends and receives, by adding a default case that runs when no other case is ready.
select {
case <-chan1:
    // runs if chan1 is ready for receiving
case <-chan2:
    // runs if chan2 is ready for receiving
case <-time.After(3 * time.Second):
    // runs if neither channel becomes ready within 3 seconds
default:
    // runs immediately if no other case is ready (optional)
    // keeps select from blocking, which can help avoid a deadlock; note that with a
    // default case present, the time.After case above will effectively never be chosen
}

A select statement executes at most one case each time it runs: it blocks until one case can proceed (or takes the default case, if present), executes that case, and then moves on. To keep handling messages, place the select inside a loop.

for {
	select {
	case msg1 := <- chan1:
		fmt.Println(msg1)
	case msg2 := <- chan2:
		fmt.Println(msg2)
	case <- quitChan:
		fmt.Println("quit received")
		return
	}
}

This loop keeps receiving from chan1 and chan2 until it receives from quitChan (or quitChan is closed), at which point it returns.
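A runnable version of that loop, as a minimal sketch: two producer goroutines each send three messages on unbuffered channels, and a quit channel is closed once both producers are done. Because every unbuffered send completes only when the loop receives it, all six messages have been collected before quit is closed. The names collect and run and the message strings are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// collect receives from chan1 and chan2 until quit is closed.
func collect(chan1, chan2 <-chan string, quit <-chan struct{}) []string {
	var got []string
	for {
		select {
		case msg1 := <-chan1:
			got = append(got, msg1)
		case msg2 := <-chan2:
			got = append(got, msg2)
		case <-quit:
			return got
		}
	}
}

func run() []string {
	chan1 := make(chan string) // unbuffered: a send completes only when collect receives it
	chan2 := make(chan string)
	quit := make(chan struct{})

	var wg sync.WaitGroup
	send := func(ch chan<- string, prefix string) {
		defer wg.Done()
		for i := 0; i < 3; i++ {
			ch <- fmt.Sprintf("%s%d", prefix, i)
		}
	}
	wg.Add(2)
	go send(chan1, "a")
	go send(chan2, "b")

	// Once wg.Wait returns, every send has been received, so quitting is safe.
	go func() {
		wg.Wait()
		close(quit)
	}()

	return collect(chan1, chan2, quit)
}

func main() {
	fmt.Println(len(run()), "messages received") // 6 messages received
}
```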

Mutexes and Atomics

In Go, both sync.Mutex (from the sync package) and the sync/atomic package provide ways to manage concurrent access to shared resources. Both prevent data races on shared variables, which you can verify by running the program with the race detector enabled (go run -race main.go).

sync.Mutex

main.go
package main
 
import (
    "fmt"
    "sync"
)
 
var (
    counter int
    mu      sync.Mutex
)
 
func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    // Acquire the lock
    mu.Lock()
    // Access shared resource
    counter++
    // Release the lock
    mu.Unlock()
}
 
func main() {
    var wg sync.WaitGroup
 
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
 
    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

sync/atomic

main.go
package main
 
import (
    "fmt"
    "sync"
    "sync/atomic"
)
 
var counter int32
 
func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    atomic.AddInt32(&counter, 1)
}
 
func main() {
    var wg sync.WaitGroup
 
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
 
    wg.Wait()
    fmt.Println("Final counter value:", counter)
}
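Since Go 1.19, sync/atomic also provides typed values such as atomic.Int64, which make it harder to accidentally mix atomic and non-atomic access to the same variable. A minimal sketch of the same counter using that type (the function name countTo is hypothetical):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countTo increments a shared atomic counter from n goroutines.
func countTo(n int) int64 {
	var counter atomic.Int64 // the zero value is ready to use
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter.Add(1)
		}()
	}

	wg.Wait()
	return counter.Load()
}

func main() {
	fmt.Println("Final counter value:", countTo(1000)) // Final counter value: 1000
}
```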

Comparison

  • sync.Mutex:
    • Use Case: When you need to perform a series of operations atomically or work with non-primitive types.
    • Pro: Flexible; can protect any critical section, including multi-step updates to structs, maps, and slices.
    • Con: Slower due to locking overhead, and risk of deadlocks if not used carefully.
  • sync/atomic:
    • Use Case: When you need to perform simple, fast, lock-free operations on primitive types like integers.
    • Pro: Faster due to the absence of locking.
    • Con: Limited to specific data types and operations.
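The mutex use case (several operations on a non-primitive type) is illustrated by this minimal sketch of a map guarded by a sync.Mutex; there is no atomic operation for incrementing a map entry, so a lock is the natural fit. The SafeCounter type and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCounter counts occurrences per key; mu guards every access to counts.
type SafeCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

// NewSafeCounter returns a counter with an initialized map.
func NewSafeCounter() *SafeCounter {
	return &SafeCounter{counts: make(map[string]int)}
}

// Inc reads and writes the map under one lock, so the update is atomic as a whole.
func (c *SafeCounter) Inc(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[key]++
}

// Value returns the current count for key (0 if the key is absent).
func (c *SafeCounter) Value(key string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[key]
}

func main() {
	c := NewSafeCounter()
	var wg sync.WaitGroup

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i%2 == 0 {
				c.Inc("even")
			} else {
				c.Inc("odd")
			}
		}(i)
	}

	wg.Wait()
	fmt.Println(c.Value("even"), c.Value("odd")) // 50 50
}
```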

Concurrency Patterns

Signal

The Signal pattern is used to notify one or more goroutines that a particular event has occurred, for example to tell them to start, stop, quit, cancel, or perform some other action. Signal channels are usually declared as chan struct{}: an empty struct occupies zero bytes, and the type makes clear that the channel carries no data.

main.go
package main
 
import (
	"fmt"
	"sync"
	"time"
)
 
func worker(id int, wg *sync.WaitGroup, stopChan <-chan struct{}) {
	defer wg.Done()
	fmt.Printf("Worker %d Start\n", id)
	// loop continuously
	for {
		select {
		// until stop signal received
		case <-stopChan:
			fmt.Printf("Worker %d Stop\n", id)
			return
		default:
			fmt.Printf("Worker %d Processing\n", id)
			// simulate work being done
			time.Sleep(time.Second)
		}
	}
}
 
func main() {
	var wg sync.WaitGroup
	stopCh := make(chan struct{})
 
	wg.Add(2) // register both workers before starting them, so Wait() cannot miss a Done()
	go worker(1, &wg, stopCh)
	go worker(2, &wg, stopCh)
	// keep main running while the worker goroutines simulate "processing"
	time.Sleep(7 * time.Second)
 
	// signal to all workers to stop and exit
	close(stopCh)
 
	wg.Wait()
}

This example implements a simple graceful shutdown. Closing stopCh makes every receive on it succeed immediately, so a single close(stopCh) tells both workers to stop, whereas a single send would wake only one receiver.
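The standard library's context package packages the same close-to-broadcast idea: calling the cancel function closes the channel returned by ctx.Done(). Here is a minimal sketch of the Signal example rewritten that way; the shorter sleeps are arbitrary, and the stopped channel exists only so the behavior can be checked.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker simulates processing until ctx is cancelled, then reports its id on stopped.
func worker(ctx context.Context, id int, wg *sync.WaitGroup, stopped chan<- int) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done(): // closed by cancel(), so every worker observes it
			fmt.Printf("Worker %d Stop\n", id)
			stopped <- id
			return
		default:
			time.Sleep(10 * time.Millisecond) // simulate work
		}
	}
}

// runWorkers starts n workers, cancels them, and returns how many stopped.
func runWorkers(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	stopped := make(chan int, n) // buffered so reporting never blocks

	wg.Add(n) // register all workers before starting them
	for i := 1; i <= n; i++ {
		go worker(ctx, i, &wg, stopped)
	}

	time.Sleep(50 * time.Millisecond) // let the workers "process" briefly
	cancel()                          // broadcast the stop signal
	wg.Wait()
	return len(stopped)
}

func main() {
	fmt.Println(runWorkers(2), "workers stopped") // 2 workers stopped
}
```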

Generator

In the Generator pattern, one goroutine produces a stream of data that another goroutine consumes. Typically, a function starts a separate goroutine to generate the data and returns a channel through which the data can be received. This allows the producer (generator) and consumer to operate concurrently.

main.go
package main
 
import (
	"fmt"
	"time"
)
 
func generator() <-chan int {
	ch := make(chan int)
 
	go func() {
		// forever loop
		for i := 0; ; i++ {
			ch <- i
		}
	}()
 
	return ch
}
 
func main() {
	gen := generator()
	timeout := time.After(3 * time.Second)
 
	for {
		select {
		case val := <-gen:
			fmt.Println("Value:", val)
		case <-timeout:
			fmt.Println("timeout")
			return
		}
	}
}
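One caveat with the generator above: once main stops reading, the generator goroutine stays blocked on ch <- i forever. That is harmless when main is about to exit, but in a long-running program it leaks a goroutine. A common remedy, sketched here with a hypothetical done parameter and take helper, is to let the consumer tell the generator to stop.

```go
package main

import "fmt"

// generator sends increasing integers until done is closed, then closes its output.
func generator(done <-chan struct{}) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; ; i++ {
			select {
			case ch <- i:
			case <-done: // the consumer has lost interest; exit instead of blocking forever
				return
			}
		}
	}()
	return ch
}

// take reads the first n values and then stops the generator.
func take(n int) []int {
	done := make(chan struct{})
	defer close(done) // unblocks the generator goroutine so it can return

	gen := generator(done)
	vals := make([]int, 0, n)
	for i := 0; i < n; i++ {
		vals = append(vals, <-gen)
	}
	return vals
}

func main() {
	fmt.Println(take(5)) // [0 1 2 3 4]
}
```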

Multiplexing

Multiplexing is often used interchangeably with "Fan-In".

Multiplexing enables a single goroutine to handle inputs from several sources by using the select statement, which waits on multiple communication operations (such as channel receives) and proceeds with whichever one is ready.

Another approach to achieve multiplexing is by combining multiple input channels into a single output channel.

main.go
package main
 
import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)
 
func signalHandler() <-chan os.Signal {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
 
	return ch
}
 
func main() {
	networkCh := make(chan int)
	systemCh := make(chan int)
	quitCh := signalHandler()
	ctxTimeout, ctxCancel := context.WithTimeout(context.Background(), time.Second*10)
	defer ctxCancel()
 
	// Simulate Network Packets msgs
	go func() {
		time.Sleep(3 * time.Second)
		networkCh <- 1
		networkCh <- 1
	}()
 
	// Simulate System msgs
	go func() {
		time.Sleep(1 * time.Second)
		systemCh <- 2
		systemCh <- 2
	}()
 
	for {
		select {
		// Input Channel Source
		case packet := <-networkCh:
			fmt.Println("Received Packet", packet)
		// Input Channel Source
		case load := <-systemCh:
			fmt.Println("Received Sys", load)
		// Listen for Signal Interrupt
		case <-quitCh:
			fmt.Println("Received Quit Signal")
			return
		// Handle Timeout
		case <-ctxTimeout.Done():
			fmt.Println("Received Timeout")
			return
		default:
			time.Sleep(1 * time.Second)
			fmt.Println("Processing...")
		}
	}
}

Fan-In

Fan-In is often used interchangeably with "Multiplexing".

The Fan-In concurrency pattern merges multiple input channels or sources into a single output channel, which can then be processed by a single consumer. The consumer typically iterates over the merged channel with a for ... range loop, which ends once the merged channel is closed.

This pattern is particularly valuable in scenarios where multiple workers produce results that must be gathered and processed into a unified stream of operations.

In the following example, we demonstrate a Fan-In/Multiplex function that merges multiple input channels into a single output channel for iterative processing:

main.go
package main
 
import (
	"fmt"
	"sync"
	"time"
)
 
// Producers/Workers
func worker(id int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < 5; i++ {
			ch <- id*10 + i
			time.Sleep(time.Millisecond * 100)
		}
	}()
	return ch
}
 
// Merge multiple channels function
func fanIn(inputs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
 
	wg.Add(len(inputs))
 
	for _, in := range inputs {
		go func(ch <-chan int) {
			for m := range ch {
				out <- m
			}
			wg.Done()
		}(in)
	}
 
	go func() {
		defer close(out) // close out once every input channel has been drained
		wg.Wait()
	}()
 
	return out
}
 
func main() {
	// Single merged output channel
	out := fanIn(worker(1), worker(2))

	// Consume and print the values from the merged channel
	for val := range out {
		fmt.Println(val)
	}
}

Fan-In can also be accomplished by having several goroutines send directly to one shared channel, without a separate function that merges channels.

main.go
package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
// Worker function that produces data on a channel
func worker(id int, ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 5; i++ {
        ch <- id*10 + i
        time.Sleep(time.Millisecond * 100) // Simulate work
    }
}
 
func main() {
    var wg sync.WaitGroup
    ch := make(chan int)
    numWorkers := 3
 
    // Start worker goroutines
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, ch, &wg)
    }
 
    // Close the channel once all workers are done
    go func() {
        wg.Wait()
        close(ch)
    }()
 
    // Consume and print the values from the channel
    for val := range ch {
        fmt.Println(val)
    }
}
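With type parameters (Go 1.18 and later), the merge function from the first Fan-In example can be written once for any element type. A minimal sketch; the names Merge and from are illustrative, not from a library.

```go
package main

import (
	"fmt"
	"sync"
)

// Merge forwards every value from each input channel to one output channel,
// and closes the output after all inputs are closed and drained.
func Merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))

	for _, in := range inputs {
		go func(ch <-chan T) {
			defer wg.Done()
			for v := range ch {
				out <- v
			}
		}(in)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// from returns a closed channel pre-filled with vals (a helper for the demo).
func from[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	for s := range Merge(from("a", "b"), from("c")) {
		fmt.Println(s) // order across inputs is not guaranteed
	}
}
```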

Fan-In Sequencing

Fan-In Sequencing is a variant of the Fan-In pattern that involves merging multiple outputs into a single channel while ensuring that the outputs or results are processed in a specific order.

One way to achieve this pattern is by leveraging a signal channel. In the following example, we have a Message struct with a signal channel called "ready." The producer/generator produces the message but then immediately blocks until the consumer/reader signals that it is ready to process the next message.

main.go
package main
 
import (
	"fmt"
	"math/rand"
	"time"
)
 
type Message struct {
	content string
	ready   chan bool
}
 
func fanIn(inputs ...<-chan Message) <-chan Message {
	out := make(chan Message)
 
	for _, in := range inputs {
		go func(ch <-chan Message) {
			for {
				out <- <-ch
			}
		}(in)
	}
 
	return out
}
 
func generator(msg string) <-chan Message {
	ch := make(chan Message)
	rdy := make(chan bool)
 
	go func(ready chan bool) {
		for i := 0; ; i++ {
			ch <- Message{fmt.Sprintf("%s %d", msg, i), ready}
			// Create variance in wait time
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
			// Blocks until consumer/reader signals its ready for the next message
			<-ready // Wait for ready chan message
		}
	}(rdy)
 
	return ch
}
 
func main() {
	batchSize := 3
	out := fanIn(
		generator("A"),
		generator("B"),
		generator("C"),
	)
 
	for i := 0; i < 10; i++ {
		msgs := []Message{}
		for j := 0; j < batchSize; j++ {
			msgs = append(msgs, <-out)
		}
 
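		// Print one batch of batchSize messages, then signal each generator that we're ready for its next message.
		// Because each generator blocks until signalled, and batchSize matches the number of generators,
		// every batch holds exactly one message per generator.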
		for _, msg := range msgs {
			fmt.Println(msg.content)
			msg.ready <- true
		}
	}
 
	fmt.Println("Done!")
}

Another implementation involves leveraging indexed result ordering, where the consumer collects all results in an ordered structure such as a slice or map, allowing for sequential processing.

main.go
package main
 
import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)
 
// Result struct to hold the index and the actual result
type Result struct {
	Index int
	Value int
}
 
// Worker function to perform some work and send the result along with the index
func worker(index int, out chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()
	// Simulate work with sleep,
	// add randomness to show variance in order sent
	// but then processed in sequence
	time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	out <- Result{Index: index, Value: index * index}
}
 
func main() {
	numWorkers := 5
	results := make(chan Result, numWorkers)
	var wg sync.WaitGroup
 
	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, results, &wg)
	}
 
	// Close the results channel once all workers are done
	go func() {
		wg.Wait()
		close(results)
	}()
 
	// Collect results and store them in a slice
	collectedResults := make([]Result, numWorkers)
	for result := range results {
		fmt.Printf("Ordering result idx: %d, with: %d\n", result.Index, result.Value)
		collectedResults[result.Index] = result
	}
 
	// Process results in sequence
	for _, result := range collectedResults {
		fmt.Printf("Result from worker %d: %d\n", result.Index, result.Value)
	}
}
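When the number of tasks is known up front, indexed ordering can also be achieved without a results channel: each goroutine writes only to its own slot of a preallocated slice, so no two goroutines touch the same element and no lock is needed. A minimal sketch under that assumption (squareAll is a hypothetical name):

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// squareAll computes n*n for each input concurrently, preserving input order.
func squareAll(inputs []int) []int {
	results := make([]int, len(inputs)) // one slot per task, indexed like inputs
	var wg sync.WaitGroup

	for i, n := range inputs {
		wg.Add(1)
		go func(i, n int) {
			defer wg.Done()
			// random delay so goroutines finish in an unpredictable order
			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
			results[i] = n * n // each goroutine writes a distinct index
		}(i, n)
	}

	wg.Wait() // after Wait, all writes are visible to this goroutine
	return results
}

func main() {
	fmt.Println(squareAll([]int{0, 1, 2, 3, 4})) // [0 1 4 9 16]
}
```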

Fan-Out

The Fan-Out pattern involves distributing or splitting work from a single source into smaller, independent tasks that can be processed in parallel. This pattern is useful for scenarios such as processing large datasets by dividing them into smaller chunks or handling multiple I/O operations simultaneously.

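In Go, this can be achieved by starting several worker goroutines that all receive from one shared input channel; each value is delivered to exactly one worker. In the example below, each worker also has its own output channel.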

main.go
package main
 
import (
	"fmt"
	"math/rand"
	"time"
)
 
func fanOut(in <-chan int) <-chan int {
	out := make(chan int)
 
	go func() {
		// defer closing out channel
		defer close(out)
 
		for msg := range in {
			// mock processing time
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
			out <- msg
		}
	}()
 
	return out
}
 
func getWork(n int) <-chan int {
	workChan := make(chan int, n)
	for i := 0; i < n; i++ {
		workChan <- i
	}
	// close buffered channel since we're done generating work;
	// workers can still receive the buffered values after the close
	close(workChan)
 
	return workChan
}
 
func main() {
	work := getWork(20)
 
	workerOut1 := fanOut(work)
	workerOut2 := fanOut(work)
	workerOut3 := fanOut(work)
 
	for {
		select {
		case value, ok := <-workerOut1:
			if !ok {
				workerOut1 = nil
			} else {
				fmt.Println("Output Worker 1 received:", value)
			}
		case value, ok := <-workerOut2:
			if !ok {
				workerOut2 = nil
			} else {
				fmt.Println("Output Worker 2 received:", value)
			}
		case value, ok := <-workerOut3:
			if !ok {
				workerOut3 = nil
			} else {
				fmt.Println("Output Worker 3 received:", value)
			}
		}
		// once all channels are closed, break for{} loop
		if workerOut1 == nil && workerOut2 == nil && workerOut3 == nil {
			break
		}
	}
	fmt.Println("Done!!")
}

Pipeline

The Pipeline concurrency pattern in Go enables you to process data in stages, with each stage managed by a separate goroutine. In this pattern, each stage receives data via a channel from the previous stage, processes it, and then sends it to the next stage.

This pattern is particularly useful in scenarios such as image processing for transforming images, stream processing for real-time logs, and general data processing.

main.go
package main
 
import (
	"fmt"
)
 
// Generator function that produces a stream of integers
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}
 
// Stage 1: Doubles each integer
func stage1(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * 2
		}
	}()
	return out
}
 
// Stage 2: Adds 1 to each integer
func stage2(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n + 1
		}
	}()
	return out
}
 
// Stage 3: Converts each integer to a string and appends a message
func stage3(in <-chan int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for n := range in {
			out <- fmt.Sprintf("Result: %d", n)
		}
	}()
	return out
}
 
// Main function
func main() {
	// Create the pipeline
	nums := generator(1, 2, 3, 4, 5)
	stage1Output := stage1(nums)
	stage2Output := stage2(stage1Output)
	stage3Output := stage3(stage2Output)
 
	// Collect and print the results
	for result := range stage3Output {
		fmt.Println(result)
	}
}
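Because stage1, stage2 and stage3 differ only in the transformation they apply, the stages can also be produced by one generic helper. A minimal sketch, assuming Go 1.18+ generics; the names mapStage and pipeline are hypothetical.

```go
package main

import "fmt"

// mapStage starts a goroutine that applies f to every value from in
// and closes its output once in is closed.
func mapStage[In, Out any](in <-chan In, f func(In) Out) <-chan Out {
	out := make(chan Out)
	go func() {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

// generator emits nums and then closes the channel.
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// pipeline doubles, adds 1, and labels each number, like stages 1 to 3 above.
func pipeline(nums ...int) []string {
	doubled := mapStage(generator(nums...), func(n int) int { return n * 2 })
	plusOne := mapStage(doubled, func(n int) int { return n + 1 })
	labeled := mapStage(plusOne, func(n int) string { return fmt.Sprintf("Result: %d", n) })

	var results []string
	for r := range labeled {
		results = append(results, r)
	}
	return results
}

func main() {
	for _, r := range pipeline(1, 2, 3, 4, 5) {
		fmt.Println(r)
	}
}
```

Each stage runs in a single goroutine, so values keep their input order through the whole pipeline.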

Worker Pool

The Worker Pool concurrency pattern processes tasks with a fixed number of worker goroutines. By distributing a large number of tasks among a finite "pool" of workers, it bounds how much work runs at once, which keeps resource usage predictable and prevents the system from being overloaded.

main.go
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	// tell the WaitGroup this worker is finished once the jobs channel is drained
	defer wg.Done()

	for j := range jobs {
		fmt.Println("worker", id, "started  job", j)
		// Work/Processing goes here
		time.Sleep(time.Second)
		fmt.Println("worker", id, "finished job", j)
		// Send results over channel
		results <- j * 2
	}
}

func main() {
	workerCount := 3
	jobCount := 20

	jobs := make(chan int, jobCount)
	results := make(chan int)
	var wg sync.WaitGroup

	// Create a worker pool to process jobs
	for i := 1; i <= workerCount; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}

	// Send jobs to provisioned workers
	for j := 1; j <= jobCount; j++ {
		jobs <- j
	}
	// signal we're done sending jobs to workers
	close(jobs)

	// Close results exactly once, after every worker has returned.
	// (Closing it inside each worker would panic: a channel can only be closed once.)
	go func() {
		wg.Wait()
		close(results)
	}()

	// Receive results from workers
	for msg := range results {
		fmt.Println("Result: ", msg)
	}

	fmt.Println("Done!")
}
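A related way to cap concurrency, when starting one goroutine per task is acceptable but only a fixed number should do work at once, is to use a buffered channel as a counting semaphore. This is a swapped-in alternative to the pool above, not part of it; the helper name runLimited and the atomic peak counter used to observe concurrency are illustrative assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs tasks goroutines but lets at most limit of them work at once.
// It returns the highest number of simultaneously active tasks it observed.
func runLimited(tasks, limit int) int64 {
	sem := make(chan struct{}, limit) // capacity == max concurrent tasks
	var wg sync.WaitGroup
	var active, peak atomic.Int64

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot: blocks while limit tasks are running
			defer func() { <-sem }() // release the slot (runs before wg.Done)

			n := active.Add(1)
			for { // record n as the new peak if it is higher
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulate work
			active.Add(-1)
		}()
	}

	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak concurrency:", runLimited(20, 3))
}
```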

Queue

The Queue concurrency pattern in Go involves using a queue data structure, typically implemented with channels, to coordinate and manage concurrent tasks.

This pattern follows a producer/consumer model where one or more goroutines (producers) add messages to the queue, and other goroutines (consumers) retrieve and process them.

main.go
package main
 
import (
	"fmt"
	"sync"
	"time"
)
 
func producer(id int, queue chan int, wg *sync.WaitGroup, wgClose *sync.WaitGroup) {
	// decrement waitgroup by one
	defer wg.Done()
	// decrement close queue waitgroup by one
	defer wgClose.Done()
	for i := 0; i < 5; i++ {
		fmt.Printf("Producer %d producing item: %d\n", id, i)
		queue <- i                         // Add to the queue
		time.Sleep(time.Millisecond * 500) // Simulate some work
	}
	fmt.Printf("Producer %d DONE\n", id)
}
 
func consumer(id int, queue chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for item := range queue {
		fmt.Printf("Consumer %d consuming item: %d\n", id, item)
		time.Sleep(2 * time.Second) // Simulate processing
	}
	fmt.Printf("Consumer %d DONE\n", id)
}
 
func main() {
	var wg sync.WaitGroup      // synchronize pausing until all producers/consumers complete
	var wgClose sync.WaitGroup // Control when to close queue
	queue := make(chan int, 5) // Buffered channel with capacity 5
 
	// Create producers
	for i := 0; i < 3; i++ {
		wg.Add(1)
		wgClose.Add(1)
		go producer(i, queue, &wg, &wgClose)
	}
 
	// separate anonymous goroutine to wait until all producers are done to close queue channel
	go func() {
		wgClose.Wait()
		close(queue)
	}()
 
	// Create consumers
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go consumer(i, queue, &wg)
	}
 
	// Wait for all producers and consumers to complete
	wg.Wait()
 
	fmt.Println("DONE!")
}

Additional Resources