Chinese (Traditional) (中文(繁體)) translation by Qiang Ji (you can also view the original English article)
概述
Go獨特功能之一是使用管道(channel)在goroutine之間進行通信。 在此教程中,你將會了解什麼是管道(Channel),怎樣有效地使用它們,和一些常見的模式。
什麼是管道(Channel)?
管道是一個同步的駐在內存中的隊列,goroutine和常規函數能使用此隊列發送和接收帶類型的值。 通信通過管道被序列化。
你使用make()
創建一個管道並且指定此管道可以接受的值的類型。
ch := make(chan int)
Go為管道的發送和接收提供了一個優美的箭頭語法:
// send value to a channel ch <- 5 // receive value from a channel x := <- ch
你不必使用這個值。 只需從管道中拿到一個值就行。
<-ch
管道在默認的情況下是阻礙進程的。 如果你對管道發送一個值,你將阻礙進程直到有人從這個管道接收它。 同樣,如果你從管道接收一個值,你將阻礙進程直到有人通過管道發送一個值。
以下程序演示了這一點。 在main()
函數里定義一個管道,啟動一個goroutine去打印一個“start”字符串,從管道裡讀取一個值,並且也把它打印出來。 然後main()
函數啟動另一個goroutine並每秒打印一個破折號("-")。 它休眠2.5秒後向管道發送一個值,並且再休眠3秒結束goroutine。
import ( "fmt" "time" ) func main() { ch := make(chan int) // Start a goroutine that reads a value from a channel and prints it go func(ch chan int) { fmt.Println("start") fmt.Println(<-ch) }(ch) // Start a goroutine that prints a dash every second go func() { for i := 0; i < 5; i++ { time.Sleep(time.Second) fmt.Println("-") } }() // Sleep for two seconds time.Sleep(2500 * time.Millisecond) // Send a value to the channel ch <- 5 // Sleep three more seconds to let all goroutines finish time.Sleep(3 * time.Second) }
該程序非常好地展示了通道的阻礙性質。 第一個goroutine直接打印“start”字符串,但是當它試圖從管道接收main()
函數將休眠2.5秒後才發送的數值時,它被阻止。 另一個goroutine只是通過每隔一段時間定期打印破折號來提供時間流的視覺指示。
這裡是程序輸出:
start - - 5 - - -
緩存管道
這種行為將發送者與接收者緊密地耦合在一起,有時這不是你想要的。 Go提供了幾個機制來解決這個問題。
緩存管道是一種能保存一定數量值的管道,這樣發送者不會被阻礙直到緩存滿了,即使沒有接收者。
建立一個緩存管道只需要加一個管道容量作為管道生成函數的第二個參數:
ch := make(chan int, 5)
下面的程序說明了緩存管道的行為。 main()
程序定義了一個有3個單位容量的緩存管道。然後它啟動一個從管道緩存中讀取數值的goroutine,每秒讀一個並打印出來。另一個goroutine以每秒打印一個破折號的方式給你一個時間進程的視覺指示。 然後,它向管道發送5個值。
import ( "fmt" "time" ) func main() { ch := make(chan int, 3) // Start a goroutine that reads a value from the channel every second and prints it go func(ch chan int) { for { time.Sleep(time.Second) fmt.Printf("Goroutine received: %d\n", <-ch) } }(ch) // Start a goroutine that prints a dash every second go func() { for i := 0; i < 5; i++ { time.Sleep(time.Second) fmt.Println("-") } }() // Push values to the channel as fast as possible for i := 0; i < 5; i++ { ch <- i fmt.Printf("main() pushed: %d\n", i) } // Sleep five more seconds to let all goroutines finish time.Sleep(5 * time.Second) }
在程序運行時發生了什麼呢? 前三個值立即通過管道被緩存,並且main()
函數被阻礙。 一秒後,一個值通過goroutine被接收,並且main()
函數能發送另一個值。 另一秒過去了,goroutine接收另一個值,並且main()
函數能發送最後一個值。 在這時,goroutine保持每秒從管道接收數值。
這裡是輸出結果:
main() pushed: 0 main() pushed: 1 main() pushed: 2 - Goroutine received: 0 main() pushed: 3 - Goroutine received: 1 main() pushed: 4 - Goroutine received: 2 - Goroutine received: 3 - Goroutine received: 4
Select
緩存管道(只要緩存足夠大)能夠解決暫時波動的問題,就是當沒有足夠的接收者來接收所有被發出的信息。 但是反方向也有問題,就是被阻礙的接收者等待需要被處理的信息。 Go有辦法幫你解決這些問題。
如果你希望你的goroutine在管道中沒有要處理的消息的情況下執行其它操作,該怎麼辦? 一個好例子是你的接收者從多個管道中等待信息的到來。 你不想在管道A中被阻礙當管道B中正好有信息需要處理。 下面的程序試圖用計算機的全部能力計算3和5的和。
這個主意模擬一個帶冗餘的複雜的操作(比如對一個分佈式數據庫進行一個遠程的查詢)。 sum()
函數(注意它是怎樣作為在main()
內的內嵌函數被定義的)接受兩個整數類型的參數並且返回一個整數類型的管道。 一個內部匿名goroutine隨機休眠最多一秒然後對管道寫入總和,關閉它,返回它。
現在main調用sum(3, 5)
四次並且從變量ch1到ch4中保存結果管道。 四個對sum()
的調用立即返回因為隨機休眠在sum()
函數被調用的goroutine裡發生。
這裡是最酷的部分。 select
聲明讓main()
函數等待所有的管道,並且對第一個返回的管道回應。 select
聲明的操作有點像switch
聲明的操作。
func main() { r := rand.New(rand.NewSource(time.Now().UnixNano())) sum := func(a int, b int) <-chan int { ch := make(chan int) go func() { // Random time up to one second delay := time.Duration(r.Int()%1000) * time.Millisecond time.Sleep(delay) ch <- a + b close(ch) }() return ch } // Call sum 4 times with the same parameters ch1 := sum(3, 5) ch2 := sum(3, 5) ch3 := sum(3, 5) ch4 := sum(3, 5) // wait for the first goroutine to write to its channel select { case result := <-ch1: fmt.Printf("ch1: 3 + 5 = %d", result) case result := <-ch2: fmt.Printf("ch2: 3 + 5 = %d", result) case result := <-ch3: fmt.Printf("ch3: 3 + 5 = %d", result) case result := <-ch4: fmt.Printf("ch4: 3 + 5 = %d", result) } }
有時你不想main()
函數因為等待第一個goroutine結束被阻礙。 在這種情況下,你可以添加一個默認情況,如果所有通道都被阻止,將執行此默認的操作。
一個網頁爬蟲的例子
在我之前的文章中,我在Go之旅中展示了一個網頁爬蟲練習的解決方案。 我使用了goroutine和一個同步的map。 我也通過管道解決了此練習。 兩個解決方案的完整源代碼可在GitHub看到。
讓我們看一下相關的部分。 首先,無論什麼時候goroutine解析一個頁面,這是一個將會被發送進一個管道的struct。 它包含當前的網頁內嵌深度和所有在此頁面找到的URL。
type links struct { urls []string depth int }
fetchURL()
函數接受一個URL, 一個深度值,和一個輸出管道。 它使用提取器(在練習中已被提供)去提取在此頁面上所有鏈接的URL。 它對做為一個links
struct的參加者的管道發送一個作為單獨信息的帶一個遞減深度的URL表單。 此深度代表我們應該讓網頁爬蟲爬多遠。 當深度到達0時,不會有更多的處理髮生。
func fetchURL(url string, depth int, candidates chan links) { body, urls, err := fetcher.Fetch(url) fmt.Printf("found: %s %q\n", url, body) if err != nil { fmt.Println(err) } candidates <- links{urls, depth - 1} }
ChannelCrawl()
函數協調所有的東西。 它跟踪所有的我們已經提取並保存在一個map中的URL。 不需要去同步訪問因為沒有其它函數或goroutine在運行。 它也定義了一個所有goroutine將會寫入結果的參加者管道。
然後,它開始調用parseUrl
作為處理每個新URL的goroutine。 此邏輯跟踪有多少goroutine通過管理一個計數器被啟動。 無論什麼時候一個數值從一個管道中被讀取,計數器遞減一位(因為發送goroutine在發送後退出),無論什麼時候一個新的goroutine被啟動,計數器遞增一位。 如果深度到達零那麼沒有新的goroutine將會被啟動,main函數將繼續從管道中讀取數值直到所有goroutine完成任務。
// ChannelCrawl crawls links from a seed url func ChannelCrawl(url string, depth int, fetcher Fetcher) { candidates := make(chan links, 0) fetched := make(map[string]bool) counter := 1 // Fetch initial url to seed the candidates channel go fetchURL(url, depth, candidates) for counter > 0 { candidateLinks := <-candidates counter-- depth = candidateLinks.depth for _, candidate := range candidateLinks.urls { // Already fetched. Continue... if fetched[candidate] { continue } // Add to fetched mapped fetched[candidate] = true if depth > 0 { counter++ go fetchURL(candidate, depth, candidates) } } }
結論
Go管道為在goroutine之間安全通信提供了許多選擇。 在語法上的支持既簡潔又有說服力。 表達並發算法對程序員來說是一個真正的福音。 與我在這裡介紹的相比,還有更多有關管道的知識。 我鼓勵你深入了解它們所能實現的各種並發模式。
Subscribe below and we’ll send you a weekly email summary of all new Code tutorials. Never miss out on learning about the next big thing.
Update me weeklyEnvato Tuts+ tutorials are translated into other languages by our community members—you can be involved too!
Translate this post