
Let's Go: Golang Concurrency, Part 2

This post is part of a series called Let's Go: Golang Concurrency.
Let's Go: Golang Concurrency, Part 1

Overview

One of the unique features of Go is the use of channels to communicate safely between goroutines. In this article, you'll learn what channels are, how to use them effectively, and some common patterns. 

What Is a Channel?

A channel is a synchronized in-memory queue that goroutines and regular functions can use to send and receive typed values. Communication is serialized through the channel.

You create a channel using make() and specify the type of values the channel accepts:

ch := make(chan int)

Go provides a nice arrow syntax for sending and receiving to/from channels:

// send value to a channel
ch <- 5

// receive value from a channel
x := <-ch

You don't have to use the value. It's fine to receive a value from a channel and discard it:

<-ch
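Discarding the received value is useful when only the arrival of a message matters, not its content. The sketch below uses that as a completion signal; the names runAndWait and done are made up for this illustration.

```go
package main

import "fmt"

// runAndWait starts a worker goroutine and waits for its completion signal.
// The names runAndWait and done are illustrative only.
func runAndWait() []string {
	var log []string
	done := make(chan bool)

	go func() {
		log = append(log, "working")
		done <- true // signal completion; the value itself is irrelevant
	}()

	<-done // receive and discard: only the arrival matters
	log = append(log, "finished")
	return log
}

func main() {
	fmt.Println(runAndWait()) // [working finished]
}
```

The receive also orders the two appends: the goroutine writes to log before sending, and runAndWait() touches log again only after the receive completes.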

Channels are blocking by default. If you send a value to a channel, you'll block until someone receives it. Similarly, if you receive from a channel, you'll block until someone sends a value to the channel.  

The following program demonstrates this. The main() function makes a channel and starts a goroutine that prints "start", receives a value from the channel, and prints it too. Then main() starts another goroutine that just prints a dash ("-") every second. Finally, it sleeps for 2.5 seconds, sends a value to the channel, and sleeps 3 more seconds to let all goroutines finish.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int)

    // Start a goroutine that reads a value from a channel and prints it
    go func(ch chan int) {
        fmt.Println("start")
        fmt.Println(<-ch)
    }(ch)

    // Start a goroutine that prints a dash every second
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Second)
            fmt.Println("-")
        }
    }()

    // Sleep for 2.5 seconds
    time.Sleep(2500 * time.Millisecond)

    // Send a value to the channel
    ch <- 5

    // Sleep three more seconds to let all goroutines finish
    time.Sleep(3 * time.Second)
}

This program demonstrates the blocking nature of the channel well. The first goroutine prints "start" right away, but then blocks trying to receive from the channel until the main() function, which sleeps for 2.5 seconds first, sends the value. The other goroutine just provides a visual indication of the flow of time by printing a dash every second.

Here is the output:

start
-
-
5
-
-
-

Buffered Channels

This behavior tightly couples senders to receivers and sometimes is not what you want. Go provides several mechanisms to address that.

Buffered channels are channels that can hold a certain (predefined) number of values so that senders don't block until the buffer is full, even if no one is receiving. 

To create a buffered channel, just add a capacity as a second argument:

ch := make(chan int, 5)
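As a quick sketch of what the capacity means, the built-in len() and cap() functions report how many values are currently buffered and how many the channel can hold. The fill() helper below is hypothetical and exists only for this illustration.

```go
package main

import "fmt"

// fill sends values until the buffer is full and returns how many were
// accepted. It is a hypothetical helper for illustration. Checking len()
// before sending is only reliable here because a single goroutine uses
// the channel.
func fill(ch chan int) int {
	sent := 0
	for len(ch) < cap(ch) {
		ch <- sent // does not block: there is still room in the buffer
		sent++
	}
	return sent
}

func main() {
	ch := make(chan int, 5)
	fmt.Println(len(ch), cap(ch)) // 0 5

	n := fill(ch)
	fmt.Println(n, len(ch), cap(ch)) // 5 5 5

	v := <-ch
	fmt.Println(v, len(ch)) // 0 4
}
```

One more send after fill() returns would block, because nothing is receiving and the buffer is full.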

The following program illustrates the behavior of buffered channels. The main() program defines a buffered channel with a capacity of 3. Then it starts one goroutine that receives a value from the channel every second and prints it, and another goroutine that just prints a dash every second to give a visual indication of the progress of time. Then, it sends five values to the channel.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 3)

    // Start a goroutine that reads a value from the channel every second and prints it
    go func(ch chan int) {
        for {
            time.Sleep(time.Second)
            fmt.Printf("Goroutine received: %d\n", <-ch)
        }
    }(ch)

    // Start a goroutine that prints a dash every second
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Second)
            fmt.Println("-")
        }
    }()

    // Push values to the channel as fast as possible
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Printf("main() pushed: %d\n", i)
    }

    // Sleep five more seconds to let all goroutines finish
    time.Sleep(5 * time.Second)
}

What happens at runtime? The first three values are buffered by the channel immediately, and the main() function blocks on the fourth send. After a second, a value is received by the goroutine, and the main() function can push another value. Another second goes by, the goroutine receives another value, and the main() function can push the last value. From then on, the goroutine keeps receiving the remaining values from the channel, one per second.

Here is the output from one run (lines printed within the same second may appear in a different order):

main() pushed: 0
main() pushed: 1
main() pushed: 2
-
Goroutine received: 0
main() pushed: 3
-
Goroutine received: 1
main() pushed: 4
-
Goroutine received: 2
-
Goroutine received: 3
-
Goroutine received: 4

Select

Buffered channels (as long as the buffer is big enough) can address the issue of temporary fluctuations where there aren't enough receivers to process all the sent messages. But there is also the opposite problem of blocked receivers waiting for messages to process. Go has got you covered. 

What if you want your goroutine to do something else when there are no messages to process in a channel? A good example is if your receiver is waiting for messages from multiple channels. You don't want to block on channel A if channel B has messages right now. The following program attempts to compute the sum of 3 and 5 using the full power of the machine. 

The idea is to simulate a complex operation (e.g. a remote query to a distributed DB) with redundancy. The sum() function (note how it's defined as a nested function inside main()) accepts two int parameters and returns an int channel. Internally, it starts an anonymous goroutine that sleeps for a random time of up to one second, then writes the sum to the channel and closes it. sum() itself returns the channel right away, without waiting for the goroutine.

Now, main() calls sum(3, 5) four times and stores the resulting channels in variables ch1 to ch4. The four calls to sum() return immediately because the random sleeping happens inside the goroutine that each sum() call starts.

Here comes the cool part. The select statement lets the main() function wait on all channels and respond to the first one that returns. The select statement operates a little like the switch statement.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    r := rand.New(rand.NewSource(time.Now().UnixNano()))

    sum := func(a int, b int) <-chan int {
        // A buffer of one lets a goroutine whose result is never read still exit
        ch := make(chan int, 1)
        // Random time up to one second. It is picked here, before the goroutine
        // starts, because a *rand.Rand is not safe for concurrent use.
        delay := time.Duration(r.Intn(1000)) * time.Millisecond
        go func() {
            time.Sleep(delay)
            ch <- a + b
            close(ch)
        }()
        return ch
    }

    // Call sum 4 times with the same parameters
    ch1 := sum(3, 5)
    ch2 := sum(3, 5)
    ch3 := sum(3, 5)
    ch4 := sum(3, 5)

    // Wait for the first goroutine to write to its channel
    select {
    case result := <-ch1:
        fmt.Printf("ch1: 3 + 5 = %d\n", result)
    case result := <-ch2:
        fmt.Printf("ch2: 3 + 5 = %d\n", result)
    case result := <-ch3:
        fmt.Printf("ch3: 3 + 5 = %d\n", result)
    case result := <-ch4:
        fmt.Printf("ch4: 3 + 5 = %d\n", result)
    }
}
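Each sum() goroutine closes its channel after sending the result. What closing means for a receiver can be sketched separately: a range loop over a channel ends once the channel is closed and drained, and a plain receive from a closed channel returns the zero value right away. The drain() helper is a made-up name for this illustration.

```go
package main

import "fmt"

// drain receives until the channel is closed and returns what it saw.
// It is a hypothetical helper for illustration.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch { // the loop ends when ch is closed and empty
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int)
	go func() {
		ch <- 3 + 5
		close(ch)
	}()

	fmt.Println(drain(ch)) // [8]

	// A receive from a closed channel returns immediately with the zero value.
	// The second result reports whether the value came from a real send.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```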

Sometimes you don't want the main() function to block waiting even for the first goroutine to finish. In this case, you can add a default case that will execute if all channels are blocked.
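A minimal sketch of a select with a default case, wrapped in a hypothetical tryReceive() helper that returns immediately whether or not a value is waiting:

```go
package main

import "fmt"

// tryReceive is a hypothetical helper: it returns a value if one is ready
// and reports false instead of blocking when nothing can be received.
func tryReceive(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		// No value is ready, so the select falls through without blocking
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)

	v, ok := tryReceive(ch)
	fmt.Println(v, ok) // 0 false: nothing has been sent yet

	ch <- 8
	v, ok = tryReceive(ch)
	fmt.Println(v, ok) // 8 true
}
```

In the sum() example, the same default case could be added to the four-way select so that main() does other work while no result is ready.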

A Web Crawler Example

In my previous article, I showed a solution to the web crawler exercise from the Tour of Go that used goroutines and a synchronized map. I also solved the exercise using channels. The complete source code for both solutions is available on GitHub.

Let's look at the relevant parts. First, here is a struct that will be sent to a channel whenever a goroutine parses a page. It contains the current depth and all URLs found on the page.

type links struct {
    urls  []string
    depth int
}

The fetchURL() function accepts a URL, a depth, and an output channel. It uses the fetcher (provided by the exercise) to get the URLs of all the links on the page. It sends the list of URLs as a single message to the candidates channel as a links struct with a decremented depth. The depth represents how much further we should crawl. When the depth reaches 0, no further processing should take place.

func fetchURL(url string, depth int, candidates chan links) {
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Printf("found: %s %q\n", url, body)
    }

    // Always send, even on error, so the receiver's goroutine count stays balanced
    candidates <- links{urls, depth - 1}
}
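To try fetchURL() outside the Tour, a stand-in fetcher is needed. The sketch below assumes the Fetcher interface from the Tour of Go exercise; the page and fakeFetcher types and the example.com URLs are invented for this illustration and are not the author's code.

```go
package main

import "fmt"

// Fetcher follows the interface used by the Tour of Go crawler exercise.
type Fetcher interface {
	Fetch(url string) (body string, urls []string, err error)
}

// page and fakeFetcher are hypothetical stand-ins for a real HTTP fetcher.
type page struct {
	body string
	urls []string
}

type fakeFetcher map[string]page

// Fetch looks the URL up in the map and fails for unknown URLs.
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
	if p, ok := f[url]; ok {
		return p.body, p.urls, nil
	}
	return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is the package-level value that fetchURL() would call.
// The URLs are made up.
var fetcher Fetcher = fakeFetcher{
	"https://example.com/":  {"Home", []string{"https://example.com/a", "https://example.com/b"}},
	"https://example.com/a": {"Page A", []string{"https://example.com/"}},
}

func main() {
	body, urls, err := fetcher.Fetch("https://example.com/")
	fmt.Println(body, len(urls), err) // Home 2 <nil>

	_, _, err = fetcher.Fetch("https://example.com/b")
	fmt.Println(err) // not found: https://example.com/b
}
```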

The ChannelCrawl() function coordinates everything. It keeps track of all the URLs that were already fetched in a map. There is no need to synchronize access to the map because no other function or goroutine touches it. It also defines a candidates channel that all the goroutines will write their results to.

Then, it starts invoking fetchURL() as a goroutine for each new URL. The logic keeps track of how many goroutines are still running by managing a counter. Whenever a value is read from the channel, the counter is decremented (because the sending goroutine exits after sending), and whenever a new goroutine is launched, the counter is incremented. If the depth gets to zero, then no new goroutines will be launched, and ChannelCrawl() will keep reading from the channel until all goroutines are done.

// ChannelCrawl crawls links from a seed url
func ChannelCrawl(url string, depth int, fetcher Fetcher) {
    candidates := make(chan links)
    // Mark the seed as fetched so a link back to it is not crawled again
    fetched := map[string]bool{url: true}
    counter := 1

    // Fetch initial url to seed the candidates channel
    go fetchURL(url, depth, candidates)

    for counter > 0 {
        candidateLinks := <-candidates
        counter--
        depth = candidateLinks.depth
        for _, candidate := range candidateLinks.urls {
            // Already fetched. Continue...
            if fetched[candidate] {
                continue
            }

            // Add to fetched map
            fetched[candidate] = true

            if depth > 0 {
                counter++
                go fetchURL(candidate, depth, candidates)
            }
        }
    }
}
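The counter bookkeeping can be seen in isolation in the following sketch. It is not the crawler itself: the result type, work() and countTasks() are hypothetical names, and every task simply asks for two children until the depth runs out.

```go
package main

import "fmt"

// result is a hypothetical message: the remaining depth and how many
// child tasks the sender would like to have started.
type result struct {
	depth    int
	children int
}

// work sends exactly one message and then exits, like fetchURL().
func work(depth int, out chan<- result) {
	out <- result{depth: depth - 1, children: 2}
}

// countTasks coordinates the workers and returns how many of them ran.
func countTasks(depth int) int {
	out := make(chan result)
	pending := 1 // goroutines started but not yet heard from
	total := 0
	go work(depth, out)

	for pending > 0 {
		r := <-out
		pending-- // the sender exits right after sending
		total++
		if r.depth > 0 {
			for i := 0; i < r.children; i++ {
				pending++ // count the goroutine before starting it
				go work(r.depth, out)
			}
		}
	}
	return total
}

func main() {
	fmt.Println(countTasks(3)) // 7: one root, two children, four grandchildren
}
```

Because only the coordinating loop reads and writes pending, no mutex or atomic counter is needed, which is the same reason the fetched map needs no lock.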

Conclusion

Go's channels provide a lot of options for safe communication between goroutines. The syntax support is both concise and illustrative. It's a real boon for expressing concurrent algorithms. There is much more to channels than I presented here. I encourage you to dive in and get familiar with the various concurrency patterns they enable.
