
Go Concurrency Primitives and Patterns
Discover the different concurrency primitives in Go and learn common patterns to utilize them effectively.
Tuesday, Jun 18, 2024
Concurrency Primitives
A high-level overview of some of the foundational concurrency primitives in Go. If you are unfamiliar with these concepts, I recommend diving deeper into each one.
Goroutine
Goroutines are independently executing functions, often called 'lightweight threads,' that are managed by the Go runtime rather than by the operating system.
The Go runtime schedules goroutines onto a smaller set of OS threads and decides when each one runs.
Because goroutines are cheap to create, a single program can comfortably run many thousands of them on one machine.
package main
func main() {
go func() {
// inside goroutine; note that main may return before this runs unless something waits for it
}()
}
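To make the "many goroutines" point concrete, here is a minimal sketch that launches 10,000 goroutines and waits for all of them to finish. The helper name runMany is hypothetical, and it borrows sync.WaitGroup and sync/atomic, both of which are covered later in this article.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runMany (hypothetical helper) starts n goroutines, each of which
// increments a shared counter once, and returns the final count.
func runMany(n int) int64 {
	var (
		wg    sync.WaitGroup
		count int64
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&count, 1)
		}()
	}
	wg.Wait() // without this, we could return before the goroutines run
	return atomic.LoadInt64(&count)
}

func main() {
	fmt.Println(runMany(10000)) // prints 10000
}
```

The wg.Wait() call is what the bare example above is missing: without it, the program may exit before the goroutine body ever executes.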
Read more about goroutines and the Go Runtime Scheduler: Go Runtime Scheduler: High Level Concurrency Overview
WaitGroups
A synchronization primitive that waits for a collection of goroutines to finish. WaitGroups can be used in place of or in conjunction with channels.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // Signal that this goroutine is done
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second) // Simulate work
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1) // Increment the counter for each new goroutine
go worker(i, &wg) // Launch the worker goroutine
}
wg.Wait() // Block until all worker goroutines are done
fmt.Println("All workers completed")
}
Channels
Channels allow goroutines to share data without passing it directly between functions. A channel can also be thought of as a FIFO (First-In-First-Out) queue.
ch := make(chan string)
// sender
ch <- "value"
// receiver
fmt.Println(<-ch)
Behaviors:
- Types: Unbuffered (default) and Buffered channels.
- Close (close(chan)) channels when the sender is done sending messages. This signals to the receivers that they should stop waiting for messages, which is important for receivers when ranging over the channel.
- Deadlocks can occur when goroutines are waiting for each other to release resources or communicate over channels. With respect to channels, this can be easily triggered by sending a value over an unbuffered channel without a separate goroutine to read from it.
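The close and range behavior is easier to see in a small program. The following is a minimal sketch; the helper name drain is hypothetical and exists only for illustration.

```go
package main

import "fmt"

// drain (hypothetical helper) reads from ch until it is closed and
// returns everything it received.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch { // the loop ends once ch is closed and empty
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int)
	go func() {
		defer close(ch) // the sender closes the channel, not the receiver
		for i := 1; i <= 3; i++ {
			ch <- i
		}
	}()
	fmt.Println(drain(ch)) // prints [1 2 3]

	// A receive on a closed channel returns immediately with the zero
	// value; the second result reports whether a value was really sent.
	v, ok := <-ch
	fmt.Println(v, ok) // prints 0 false
}
```

If the sender never called close, the range loop in drain would block forever after the third value, which is one of the deadlock scenarios described above.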
Unbuffered
An unbuffered channel has no capacity, so the sender and receiver must both be ready to communicate at the same time.
- Behavior: Both send and receive block until another goroutine performs the inverse operation.
- Use Case: Ideal for synchronizing goroutines, ensuring the value sent over the channel is processed immediately.
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
go func() {
ch <- 42 // This blocks until the value is received
}()
value := <-ch // This blocks until a value is sent
fmt.Println(value) // Output: 42
}
Buffered
Contrary to Unbuffered channels, Buffered channels do have a specified capacity, allowing the sender to fill up to the limited capacity without requiring a receiver to immediately read from the channel.
- Behavior: A send operation only blocks on a buffered channel if the buffer is full and a receive operation only blocks if the buffer is empty.
- Use Case: Ideal for scenarios requiring producers and consumers to be decoupled, allowing them to operate at different cadences.
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 3) // Buffered channel with capacity 3
ch <- 1 // This does not block
ch <- 2 // This does not block
ch <- 3 // This does not block
fmt.Println(<-ch) // Output: 1
fmt.Println(<-ch) // Output: 2
fmt.Println(<-ch) // Output: 3
}
Select
Used to wait on multiple (known) channel operations. A select lets a goroutine block until one of several channel operations is ready and then proceed with that one. If several are ready at once, one is chosen at random.
Use Cases:
- Waiting on multiple channels
- Timeouts and cancellation handling
- Non-blocking communication, using a default case to perform sends and receives without blocking.
select {
case <- chan1:
// if chan1 is ready for receiving
case <- chan2:
// if chan2 is ready for receiving
case <-time.After(3 * time.Second):
// if timeout exceeds deadline
default:
// code to execute if none of the channels are ready (optional)
// can help prevent a deadlock if none of the channels are ready to communicate
}
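The non-blocking use case can be wrapped in two small helpers. This is a sketch only; trySend and tryRecv are hypothetical names, not part of the standard library.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it succeeded.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // buffer full, or no receiver waiting
		return false
	}
}

// tryRecv attempts a receive without blocking.
func tryRecv(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default: // nothing available right now
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer had room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full
	fmt.Println(tryRecv(ch))    // 1 true
	fmt.Println(tryRecv(ch))    // 0 false
}
```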
The select statement evaluates its cases once and then blocks until one of the cases can proceed. To evaluate it repeatedly, place the select inside a loop.
for {
select {
case msg1 := <- chan1:
fmt.Println(msg1)
case msg2 := <- chan2:
fmt.Println(msg2)
case <- quitChan:
fmt.Println("quit received")
return
}
}
This loop continuously receives from both chan1 and chan2 until a value arrives on quitChan (or quitChan is closed).
Mutexes and Atomics
In Go, both sync.Mutex and the sync/atomic package provide ways to manage concurrent access to shared resources. Both can prevent common race conditions, which you can check for by running your program or tests with the -race flag.
sync.Mutex
package main
import (
"fmt"
"sync"
)
var (
counter int
mu sync.Mutex
)
func increment(wg *sync.WaitGroup) {
defer wg.Done()
// Acquire the lock
mu.Lock()
// Access shared resource
counter++
// Release the lock
mu.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Println("Final counter value:", counter)
}
sync/atomic
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var counter int32
func increment(wg *sync.WaitGroup) {
defer wg.Done()
atomic.AddInt32(&counter, 1)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Println("Final counter value:", counter)
}
Comparison
sync.Mutex:
- Use Case: When you need to perform a series of operations atomically or work with non-primitive types.
- Pro: Flexible; can protect any critical section, including multi-step updates and composite types.
- Con: Slower due to locking overhead, with a risk of deadlocks if not used carefully.
sync/atomic:
- Use Case: When you need to perform simple, fast, lock-free operations on primitive types like integers.
- Pro: Faster due to the absence of locking.
- Con: Limited to specific data types and operations.
Concurrency Patterns
Signal
The Signal pattern in Go is used to notify one or more goroutines that a particular event has occurred. It is often used to tell separate goroutines to start, stop, quit, cancel, or perform some other action. The signal channel typically carries the empty struct{} type, since its values occupy no memory.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup, stopChan <-chan struct{}) {
defer wg.Done()
fmt.Printf("Worker %d Start\n", id)
// loop continuously
for {
select {
// until stop signal received
case <-stopChan:
fmt.Printf("Worker %d Stop\n", id)
return
default:
fmt.Printf("Worker %d Processing\n", id)
// simulate work being done
time.Sleep(time.Second)
}
}
}
func main() {
var wg sync.WaitGroup
stopCh := make(chan struct{})
wg.Add(2) // register both workers before starting them, so Wait() cannot return early
go worker(1, &wg, stopCh)
go worker(2, &wg, stopCh)
// let the main goroutine sleep so the worker goroutines can simulate "processing"
time.Sleep(7 * time.Second)
// signal to all workers to stop and exit
close(stopCh)
wg.Wait()
}
In this example, we implement a graceful shutdown by calling close(chan) on the channel, which signals to both worker goroutines that no more messages will be sent and that they should stop processing.
Generator
The Generator pattern in Go involves generating a stream of data produced by one goroutine and consumed by another. Typically, this pattern employs a function that runs in a separate goroutine to generate data and returns a channel through which the data can be received. This allows the producer (generator) and consumer to operate concurrently.
package main
import (
"fmt"
"time"
)
func generator() <-chan int {
ch := make(chan int)
go func() {
// forever loop
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}
func main() {
gen := generator()
timeout := time.After(3 * time.Second)
for {
select {
case val := <-gen:
fmt.Println("Value:", val)
case <-timeout:
fmt.Println("timeout")
return
}
}
}
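One thing to watch in the example above: the generator goroutine runs forever, which is harmless here only because main exits. In a longer-lived program it would leak. A common way to address this, sketched below under the assumption that the caller owns a done channel, is to let the generator select on that channel so it can stop and close its output.

```go
package main

import "fmt"

// generator emits 0, 1, 2, ... until done is closed, then closes its
// output channel and returns, so the goroutine does not leak.
func generator(done <-chan struct{}) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; ; i++ {
			select {
			case ch <- i:
			case <-done:
				return
			}
		}
	}()
	return ch
}

func main() {
	done := make(chan struct{})
	gen := generator(done)
	for i := 0; i < 3; i++ {
		fmt.Println(<-gen) // prints 0, 1, 2
	}
	close(done) // tell the generator to stop
	for range gen {
		// drain any in-flight values until the generator closes its channel
	}
	fmt.Println("generator stopped")
}
```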
Multiplexing
Multiplexing is often used interchangeably with "Fan-In".
Multiplexing enables a single goroutine to handle multiple inputs from various sources concurrently by using the select statement, which lets it choose between multiple communication operations, such as reading from a channel.
Another approach to achieve multiplexing is by combining multiple input channels into a single output channel.
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
func signalHandler() <-chan os.Signal {
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM) // os.Interrupt is the same signal as syscall.SIGINT
return ch
}
func main() {
networkCh := make(chan int)
systemCh := make(chan int)
quitCh := signalHandler()
ctxTimeout, ctxCancel := context.WithTimeout(context.Background(), time.Second*10)
defer ctxCancel()
// Simulate Network Packets msgs
go func() {
time.Sleep(3 * time.Second)
networkCh <- 1
networkCh <- 1
}()
// Simulate System msgs
go func() {
time.Sleep(1 * time.Second)
systemCh <- 2
systemCh <- 2
}()
for {
select {
// Input Channel Source
case packet := <-networkCh:
fmt.Println("Received Packet", packet)
// Input Channel Source
case load := <-systemCh:
fmt.Println("Received Sys", load)
// Listen for Signal Interrupt
case <-quitCh:
fmt.Println("Received Quit Signal")
return
// Handle Timeout
case <-ctxTimeout.Done():
fmt.Println("Received Timeout")
return
default:
time.Sleep(1 * time.Second)
fmt.Println("Processing...")
}
}
}
Fan-In
Fan-In is often used interchangeably with "Multiplexing".
The Fan-In concurrency pattern in Go enables you to merge multiple input channels or sources into a single output channel, which can then be processed by a single consumer. It commonly uses a for ... range loop to iterate over this consolidated channel.
This pattern is particularly valuable in scenarios where multiple workers produce results that must be gathered and processed into a unified stream of operations.
In the following example, we demonstrate a Fan-In/Multiplex function that merges multiple input channels into a single output channel for iterative processing:
package main
import (
"fmt"
"sync"
"time"
)
// Producers/Workers
func worker(id int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i < 5; i++ {
ch <- id*10 + i
time.Sleep(time.Millisecond * 100)
}
}()
return ch
}
// Merge multiple channels function
func fanIn(inputs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
wg.Add(len(inputs))
for _, in := range inputs {
go func(ch <-chan int) {
for m := range ch {
out <- m
}
wg.Done()
}(in)
}
go func() {
wg.Wait() // wait until every input channel has been drained
close(out) // then signal to the consumer that no more values are coming
}()
return out
}
func main() {
// Single merged output channel
out := fanIn(worker(1), worker(2))
// Consume and Print the values from the merged channel
for val := range out {
fmt.Println(val)
}
}
Fan-In can also be accomplished by combining multiple input sources from separate goroutines directly, without the use of merging channels.
package main
import (
"fmt"
"sync"
"time"
)
// Worker function that produces data on a channel
func worker(id int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
ch <- id*10 + i
time.Sleep(time.Millisecond * 100) // Simulate work
}
}
func main() {
var wg sync.WaitGroup
ch := make(chan int)
numWorkers := 3
// Start worker goroutines
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, ch, &wg)
}
// Close the channel once all workers are done
go func() {
wg.Wait()
close(ch)
}()
// Consume and print the values from the channel
for val := range ch {
fmt.Println(val)
}
}
Fan-In Sequencing
Fan-In Sequencing is a variant of the Fan-In pattern that involves merging multiple outputs into a single channel while ensuring that the outputs or results are processed in a specific order.
One way to achieve this pattern is by leveraging a signal channel. In the following example, we have a Message struct with a signal channel called "ready." The producer/generator produces the message but then immediately blocks until the consumer/reader signals that it is ready to process the next message.
package main
import (
"fmt"
"math/rand"
"time"
)
type Message struct {
content string
ready chan bool
}
func fanIn(inputs ...<-chan Message) <-chan Message {
out := make(chan Message)
for _, in := range inputs {
go func(ch <-chan Message) {
for {
out <- <-ch
}
}(in)
}
return out
}
func generator(msg string) <-chan Message {
ch := make(chan Message)
rdy := make(chan bool)
go func(ready chan bool) {
for i := 0; ; i++ {
ch <- Message{fmt.Sprintf("%s %d", msg, i), ready}
// Create variance in wait time
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
// Block until the consumer/reader signals it's ready for the next message
<-ready
}
}(rdy)
return ch
}
func main() {
batchSize := 3
out := fanIn(
generator("A"),
generator("B"),
generator("C"),
)
for i := 0; i < 10; i++ {
msgs := []Message{}
for j := 0; j < batchSize; j++ {
msgs = append(msgs, <-out)
}
// Grab batchSize messages (one per generator) and then signal to each generator we're ready for the next message.
for _, msg := range msgs {
fmt.Println(msg.content)
msg.ready <- true
}
}
fmt.Println("Done!")
}
Another implementation involves leveraging indexed result ordering, where the consumer collects all results in an ordered structure such as a slice or map, allowing for sequential processing.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Result struct to hold the index and the actual result
type Result struct {
Index int
Value int
}
// Worker function to perform some work and send the result along with the index
func worker(index int, out chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
// Simulate work with sleep,
// add randomness to show variance in order sent
// but then processed in sequence
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
out <- Result{Index: index, Value: index * index}
}
func main() {
numWorkers := 5
results := make(chan Result, numWorkers)
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, results, &wg)
}
// Close the results channel once all workers are done
go func() {
wg.Wait()
close(results)
}()
// Collect results and store them in a slice
collectedResults := make([]Result, numWorkers)
for result := range results {
fmt.Printf("Ordering result idx: %d, with: %d\n", result.Index, result.Value)
collectedResults[result.Index] = result
}
// Process results in sequence
for _, result := range collectedResults {
fmt.Printf("Result from worker %d: %d\n", result.Index, result.Value)
}
}
Fan-Out
The Fan-Out pattern involves distributing or splitting work from a single source into smaller, independent tasks that can be processed in parallel. This pattern is useful for scenarios such as processing large datasets by dividing them into smaller chunks or handling multiple I/O operations simultaneously.
In Go, this can be achieved by spawning multiple workers to process the distributed tasks across multiple channels.
package main
import (
"fmt"
"math/rand"
"time"
)
func fanOut(in <-chan int) <-chan int {
out := make(chan int)
go func() {
// defer closing out channel
defer close(out)
for msg := range in {
// mock processing time
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
out <- msg
}
}()
return out
}
func getWork(n int) <-chan int {
workChan := make(chan int, n)
for i := 0; i < n; i++ {
workChan <- i
}
// close buffered channel since we're done generating work
defer close(workChan)
return workChan
}
func main() {
work := getWork(20)
workerOut1 := fanOut(work)
workerOut2 := fanOut(work)
workerOut3 := fanOut(work)
for {
select {
case value, ok := <-workerOut1:
if !ok {
workerOut1 = nil
} else {
fmt.Println("Output Worker 1 received:", value)
}
case value, ok := <-workerOut2:
if !ok {
workerOut2 = nil
} else {
fmt.Println("Output Worker 2 received:", value)
}
case value, ok := <-workerOut3:
if !ok {
workerOut3 = nil
} else {
fmt.Println("Output Worker 3 received:", value)
}
}
// once all channels are closed, break for{} loop
if workerOut1 == nil && workerOut2 == nil && workerOut3 == nil {
break
}
}
fmt.Println("Done!!")
}
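The example above hard-codes three workers and three output channels. As a variation, the sketch below takes the worker count as a parameter and merges the workers' results back into one channel (borrowing the Fan-In idea from earlier). The signature of fanOut here, including the n and fn parameters, is an assumption made for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut (hypothetical variant) starts n workers that all read from the
// same input channel, applies fn to each value, and merges the results.
func fanOut(in <-chan int, n int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in { // each value goes to exactly one worker
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait() // close out only after every worker has finished
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 10; i++ {
			in <- i
		}
	}()

	sum := 0
	for v := range fanOut(in, 3, func(x int) int { return x * x }) {
		sum += v // arrival order varies, but the sum does not
	}
	fmt.Println(sum) // prints 385
}
```

Merging into a single output channel means the consumer can use a plain range loop instead of a select with one case per worker.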
Pipeline
The Pipeline concurrency pattern in Go enables you to process data in stages, with each stage managed by a separate goroutine. In this pattern, each stage receives data via a channel from the previous stage, processes it, and then sends it to the next stage.
This pattern is particularly useful in scenarios such as image processing for transforming images, stream processing for real-time logs, and general data processing.
package main
import (
"fmt"
)
// Generator function that produces a stream of integers
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Stage 1: Doubles each integer
func stage1(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * 2
}
}()
return out
}
// Stage 2: Adds 1 to each integer
func stage2(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n + 1
}
}()
return out
}
// Stage 3: Converts each integer to a string and appends a message
func stage3(in <-chan int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for n := range in {
out <- fmt.Sprintf("Result: %d", n)
}
}()
return out
}
// Main function
func main() {
// Create the pipeline
nums := generator(1, 2, 3, 4, 5)
stage1Output := stage1(nums)
stage2Output := stage2(stage1Output)
stage3Output := stage3(stage2Output)
// Collect and print the results
for result := range stage3Output {
fmt.Println(result)
}
}
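The three stages above share the same skeleton: create a channel, start a goroutine, range over the input, and close the output. With generics (Go 1.18 or newer), that skeleton can be written once. This is only a sketch; source and stage are hypothetical helper names.

```go
package main

import "fmt"

// source emits the given values and then closes its channel.
func source[T any](vals ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// stage (hypothetical helper) applies fn to every value read from in and
// sends the result downstream, closing its output when in is exhausted.
func stage[T, U any](in <-chan T, fn func(T) U) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

func main() {
	doubled := stage(source(1, 2, 3), func(n int) int { return n * 2 })
	plusOne := stage(doubled, func(n int) int { return n + 1 })
	labeled := stage(plusOne, func(n int) string { return fmt.Sprintf("Result: %d", n) })
	for s := range labeled {
		fmt.Println(s) // Result: 3, Result: 5, Result: 7
	}
}
```

Because each stage runs in a single goroutine, values leave the pipeline in the same order they entered it.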
Worker Pool
The Worker Pool concurrency pattern enables your application to manage and process tasks using a fixed number of worker goroutines. By utilizing a fixed number of workers within a "pool," this pattern efficiently handles a large number of tasks by distributing them among a finite number of workers. This approach improves system resource utilization and prevents overloading.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
// signal that this worker has finished once the jobs channel is drained
defer wg.Done()
for j := range jobs {
fmt.Println("worker", id, "started job", j)
// Work/Processing goes here
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
// Send results over channel
results <- j * 2
}
}
func main() {
workerCount := 3
jobCount := 20
jobs := make(chan int, jobCount)
results := make(chan int)
var wg sync.WaitGroup
// Create a worker pool to process jobs
for i := 1; i <= workerCount; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// Send jobs to provisioned workers
for j := 1; j <= jobCount; j++ {
jobs <- j
}
// signal we're done sending jobs to workers
close(jobs)
// Close results exactly once, after every worker is done;
// closing it from each worker would panic on the second close
go func() {
wg.Wait()
close(results)
}()
// Receive results from workers
for msg := range results {
fmt.Println("Result: ", msg)
}
fmt.Println("Done!")
}
Queue
The Queue concurrency pattern in Go involves using a queue data structure, typically implemented with channels, to coordinate and manage concurrent tasks.
This pattern follows a producer/consumer model where one or more goroutines (producers) add messages to the queue, and other goroutines (consumers) retrieve and process them.
package main
import (
"fmt"
"sync"
"time"
)
func producer(id int, queue chan int, wg *sync.WaitGroup, wgClose *sync.WaitGroup) {
// decrement waitgroup by one
defer wg.Done()
// decrement close queue waitgroup by one
defer wgClose.Done()
for i := 0; i < 5; i++ {
fmt.Printf("Producer %d producing item: %d\n", id, i)
queue <- i // Add to the queue
time.Sleep(time.Millisecond * 500) // Simulate some work
}
fmt.Printf("Producer %d DONE\n", id)
}
func consumer(id int, queue chan int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range queue {
fmt.Printf("Consumer %d consuming item: %d\n", id, item)
time.Sleep(2 * time.Second) // Simulate processing
}
fmt.Printf("Consumer %d DONE\n", id)
}
func main() {
var wg sync.WaitGroup // synchronize: block until all producers and consumers complete
var wgClose sync.WaitGroup // Control when to close queue
queue := make(chan int, 5) // Buffered channel with capacity 5
// Create producers
for i := 0; i < 3; i++ {
wg.Add(1)
wgClose.Add(1)
go producer(i, queue, &wg, &wgClose)
}
// separate anonymous goroutine to wait until all producers are done to close queue channel
go func() {
wgClose.Wait()
close(queue)
}()
// Create consumers
for i := 0; i < 2; i++ {
wg.Add(1)
go consumer(i, queue, &wg)
}
// Wait for all producers and consumers to complete
wg.Wait()
fmt.Println("DONE!")
}