MCPcopy
hub / github.com/luk4z7/go-concurrency-guide

github.com/luk4z7/go-concurrency-guide @main sqlite

repository ↗ · DeepWiki ↗
58 symbols 100 edges 28 files 6 documented · 10%
README

Go Concurrency Guide

This guide is built on top of the some examples of the book Go Concurrency in Go and Go Programming Language

Race Condition and Data Race

Race condition occur when two or more operations must execute in the correct order, but the program has not been written so that this order is guaranteed to be maintained.

Data race is when one concurrent operation attempts to read a variable while at some undetermined time another concurrent operation is attempting to write to the same variable. The main func is the main goroutine.

func main() {
    var data int
    go func() {
        data++
    }()

    if data == 0 {
        fmt.Printf("the value is %d", data)
    }
}

Memory Access Synchronization

The sync package contains the concurrency primitives that are most useful for low-level memory access synchronization. Critical section is the place in your code that has access to a shared memory

Mutex

Mutex stands for “mutual exclusion” and is a way to protect critical sections of your program.

type Counter struct {
    mu sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

WaitGroup

Call to add a group of goroutines

var wg sync.WaitGroup
for _, salutation := range []string{"hello", "greetings", "good day"} {
    wg.Add(1)
    go func(salutation string) { 
        defer wg.Done()
        fmt.Println(salutation)
    }(salutation) 
}
wg.Wait()

RWMutex

More fine-grained memory control, being possible to request read-only lock

producer := func(wg *sync.WaitGroup, l sync.Locker) { 
    defer wg.Done()
    for i := 5; i > 0; i-- {
        l.Lock()
        l.Unlock()
        time.Sleep(1) 
    }
}

observer := func(wg *sync.WaitGroup, l sync.Locker) {
    defer wg.Done()
    l.Lock()
    defer l.Unlock()
}

test := func(count int, mutex, rwMutex sync.Locker) time.Duration {
    var wg sync.WaitGroup
    wg.Add(count+1)
    beginTestTime := time.Now()
    go producer(&wg, mutex)
    for i := count; i > 0; i-- {
        go observer(&wg, rwMutex)
    }

    wg.Wait()
    return time.Since(beginTestTime)
}

tw := tabwriter.NewWriter(os.Stdout, 0, 1, 2, ' ', 0)
defer tw.Flush()

var m sync.RWMutex
fmt.Fprintf(tw, "Readers\tRWMutex\tMutex\n")

for i := 0; i < 20; i++ {
    count := int(math.Pow(2, float64(i)))
    fmt.Fprintf(
        tw,
        "%d\t%v\t%v\n",
        count,
        test(count, &m, m.RLocker()),
        test(count, &m, &m),
    )
}

Cond

It would be better if there were some kind of way for a goroutine to efficiently sleep until it was signaled to wake and check its condition. This is exactly what the Cond type does for us.

The Cond and the Broadcast is the method that provides for notifying goroutines blocked on Wait call that the condition has been triggered.

type Button struct {
    Clicked *sync.Cond
}

func main() {
    button := Button{
        Clicked: sync.NewCond(&sync.Mutex{}),
    }

    // running on goroutine every function that passed/registered
    // and wait, not exit until that goroutine is confirmed to be running
    subscribe := func(c *sync.Cond, param string, fn func(s string)) {
        var goroutineRunning sync.WaitGroup
        goroutineRunning.Add(1)

        go func(p string) {
            goroutineRunning.Done()
            c.L.Lock() // critical section
            defer c.L.Unlock()

            fmt.Println("Registered and wait ... ")
            c.Wait()

            fn(p)
        }(param)

        goroutineRunning.Wait()
    }

    var clickRegistered sync.WaitGroup

    for _, v := range []string{
        "Maximizing window.",
        "Displaying annoying dialog box!",
        "Mouse clicked."} {

        clickRegistered.Add(1)

        subscribe(button.Clicked, v, func(s string) {
            fmt.Println(s)
            clickRegistered.Done()
        })
    }

    button.Clicked.Broadcast()

    clickRegistered.Wait()
}

cond samples

Once

Ensuring that only one execution will be carried out even among several goroutines

var count int

increment := func() {
    count++
}

var once sync.Once

var increments sync.WaitGroup
increments.Add(100)

for i := 0; i < 100; i++ {
    go func() {
        defer increments.Done()
        once.Do(increment)
    }()
}

increments.Wait()
fmt.Printf("Count is %d\n", count)

Pool

Manager the pool of connections, a quantity

package main

import (
    "fmt"
    "sync"
)

func main() {
    myPool := &sync.Pool{
        New: func() interface{} {
            fmt.Println("Creating new instance.")

            return struct{}{}
        },
    }

    // Get call New function defined in pool if there is no instance started
    myPool.Get()
    instance := myPool.Get()
    fmt.Println("instance", instance)

    // here we put a previously retrieved instance back into the pool, 
    // this increases the number of instances available to one
    myPool.Put(instance)

    // when this call is executed, we will reuse the 
    // previously allocated instance and put it back in the pool
    myPool.Get()

    var numCalcsCreated int
    calcPool := &sync.Pool{
        New: func() interface{} {
            fmt.Println("new calc pool")

            numCalcsCreated += 1
            mem := make([]byte, 1024)

            return &mem
        },
    }

    fmt.Println("calcPool.New", calcPool.New())

    calcPool.Put(calcPool.New())
    calcPool.Put(calcPool.New())
    calcPool.Put(calcPool.New())
    calcPool.Put(calcPool.New())

    calcPool.Get()

    const numWorkers = 1024 * 1024
    var wg sync.WaitGroup
    wg.Add(numWorkers)

    for i := numWorkers; i > 0; i-- {
        go func() {
            defer wg.Done()

            mem := calcPool.Get().(*[]byte)
            defer calcPool.Put(mem)

            // Assume something interesting, but quick is being done with
            // this memory.
        }()
    }

    wg.Wait()
    fmt.Printf("%d calculators were created.", numCalcsCreated)
}

sync samples

Deadlocks, Livelocks, and Starvation

Deadlocks

Deadlocks is a program is one in which all concurrent processes are waiting on one another.

package main

import (
    "fmt"
    "sync"
    "time"
)

type value struct {
    mu    sync.Mutex
    value int
}

func main() {
    var wg sync.WaitGroup
    printSum := func(v1, v2 *value) {
        defer wg.Done()
        v1.mu.Lock()
        defer v1.mu.Unlock()

        // deadlock
        time.Sleep(2 * time.Second)
        v2.mu.Lock()
        defer v2.mu.Unlock()

        fmt.Printf("sum=%v\n", v1.value+v2.value)
    }

    var a, b value
    wg.Add(2)
    go printSum(&a, &b)
    go printSum(&b, &a)

    wg.Wait()
}
package main

func main() {
    message := make(chan string)

    // A goroutine ( main goroutine ) trying to send message to channel
    message <- "message" // fatal error: all goroutines are asleep - deadlock!
}
package main

func main() {
    message := make(chan string)

    // A goroutine ( main goroutine ) trying to receive message from channel
    <-message // fatal error: all goroutines are asleep - deadlock!
}

Livelocks

Livelocks are programs that are actively performing concurrent operations, but these operations do nothing to move the state of the program forward.

package main

import (
    "bytes"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    cadence := sync.NewCond(&sync.Mutex{})
    go func() {
        for range time.Tick(1 * time.Millisecond) {
            cadence.Broadcast()
        }
    }()

    takeStep := func() {
        cadence.L.Lock()
        cadence.Wait()
        cadence.L.Unlock()
    }

    tryDir := func(dirName string, dir *int32, out *bytes.Buffer) bool {
        fmt.Fprintf(out, " %v", dirName)
        atomic.AddInt32(dir, 1)
        takeStep()

        if atomic.LoadInt32(dir) == 1 {
            fmt.Fprint(out, " . Success!")

            return true
        }

        takeStep()
        atomic.AddInt32(dir, -1)

        return false
    }

    var left, right int32
    tryLeft := func(out *bytes.Buffer) bool {
        return tryDir("left", &left, out)
    }

    tryRight := func(out *bytes.Buffer) bool {
        return tryDir("right", &right, out)
    }

    walk := func(walking *sync.WaitGroup, name string) {
        var out bytes.Buffer
        defer func() {
            fmt.Println(out.String())
        }()
        defer walking.Done()

        fmt.Fprintf(&out, "%v is trying to scoot:", name)
        for i := 0; i < 5; i++ {
            if tryLeft(&out) || tryRight(&out) {
                return
            }
        }

        fmt.Fprintf(&out, "\n%v tosses her hands up in exasperation", name)
    }

    var peopleInHallway sync.WaitGroup
    peopleInHallway.Add(2)

    go walk(&peopleInHallway, "Alice")
    go walk(&peopleInHallway, "Barbara")
    peopleInHallway.Wait()
}

Starvation

Starvation is any situation where a concurrent process cannot get all the resources it needs to perform work.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    fmt.Println("vim-go")

    var wg sync.WaitGroup
    var sharedLock sync.Mutex
    const runtime = 1 * time.Second

    greedyWorker := func() {
        defer wg.Done()

        var count int
        for begin := time.Now(); time.Since(begin) <= runtime; {
            sharedLock.Lock()
            time.Sleep(3 * time.Nanosecond)
            sharedLock.Unlock()
            count++
        }

        fmt.Printf("Greedy worker was able to execute %v work loops\n", count)
    }

    politeWorker := func() {
        defer wg.Done()

        var count int
        for begin := time.Now(); time.Since(begin) <= runtime; {
            sharedLock.Lock()
            time.Sleep(1 * time.Nanosecond)
            sharedLock.Unlock()

            sharedLock.Lock()
            time.Sleep(1 * time.Nanosecond)
            sharedLock.Unlock()

            sharedLock.Lock()
            time.Sleep(1 * time.Nanosecond)
            sharedLock.Unlock()

            count++
        }

        fmt.Printf("Polite worker was able to execute %v work loops \n", count)
    }

    wg.Add(2)
    go greedyWorker()
    go politeWorker()
    wg.Wait()
}

Channels

Channels are one of the synchronization primitives in Go derived from Hoare’s CSP. While they can be used to synchronize access of the memory, they are best used to communicate information between goroutines, default value for channel: nil.

To declare a channel to read and send

stream := make(chan interface{})

To declare unidirectional channel that only can read

stream := make(<-chan interface{})

To declare unidirectional channel that only can send

stream := make(chan<- interface{})

is not often see the instantiates channels unidirectional, only in parameters in functions, is common because Go convert them implicity

var receiveChan <-chan interface{}
var sendChan chan<- interface{}
dataStream := make(chan interface{})

// Valid statements:
receiveChan = dataStream
sendChan = dataStream

To receive

<-stream

to send

stream <- "Hello world"

Ranging over a channel the for range break the loop if the channel is closed

intStream := make(chan int)
go func() {
    defer close(intStream) 
    for i := 1; i <= 5; i++ {
        intStream <- i
    }
}()

for integer := range intStream {
    fmt.Printf("%v ", integer)
}

unbuffered channel

A send operation on an unbuffered channel blocks the sending goroutine, until another goroutine performs a corresponding receive on the same channel; at that point, the value is passed, and both goroutines can continue. On the other hand, if a receive operation is attempted beforehand, the receiving goroutine is blocked until another goroutine performs a send on the same channel. Communication over an unbuffered channel makes the sending and receiving goroutines synchronize. Because of this, unbuffered channels are sometimes called synchronous channels. When a value is sent over an unbuffered channel, the reception of the value takes place before the sending goroutine wakes up again. In discussions of concurrency, when we say that x occurs before y, we do not simply mean that x occurs before y in time; we mean that this is guaranteed and that all your previous effects like updates to variables will complete and you can count on them. When x does not occur before y or after y, we say that x is concurrent with y. This is not to say that x and y are necessarily simultaneous; it just means that we can't assume anything about your order

buffered channel

both, read and write of a channel full or empty it will block, on the buffered channel

var dataStream chan interface{}
dataStream = make(chan interface{}, 4)

both, read and send a channel empty cause deadlock

var dataStream chan interface{}
<-dataStream // This panics with: fatal error: all goroutines are asleep - deadlock!
   goroutine 1 [chan receive (nil chan)]:
   main.main()
       /tmp/babel-23079IVB/go-src-23079O4q.go:9 +0x3f
   exit status 2

```go var dataStream chan interface{} dataStream <- struct{}{} // This produces: fatal error

Core symbols most depended-on inside this repo

mockScan
called by 3
patterns/fanoutfanin/samples/003/main.go
mockScan
called by 3
patterns/fanoutfanin/samples/002/main.go
locale
called by 2
patterns/contextpackage/cancel/main.go
fanIn
called by 2
patterns/fanoutfanin/samples/003/main.go
printGreeting
called by 1
patterns/contextpackage/cancel/main.go
printFarewell
called by 1
patterns/contextpackage/cancel/main.go
genGreeting
called by 1
patterns/contextpackage/cancel/main.go
genFarewell
called by 1
patterns/contextpackage/cancel/main.go

Shape

Function 52
Struct 4
TypeAlias 2

Languages

Go100%

Modules by API surface

patterns/fanoutfanin/samples/003/main.go8 symbols
patterns/fanoutfanin/samples/002/main.go7 symbols
patterns/contextpackage/cancel/main.go6 symbols
sync/pool/network.go4 symbols
sync/pool/network_test.go3 symbols
patterns/fanoutfanin/samples/001/main.go3 symbols
patterns/fanoutfanin/main.go3 symbols
sync/broadcast/main.go2 symbols
patterns/heartbeats/main.go2 symbols
patterns/errorhandler/returnerror/main.go2 symbols
sync/rwmutex/main.go1 symbols
sync/pool/main.go1 symbols

For agents

$ claude mcp add go-concurrency-guide \
  -- python -m otcore.mcp_server <graph>

⬇ download graph artifact