Go 101 - Channel (Part 1)

Go 101 - Channel (Part 1)

·

10 min read

1. Giới thiệu chung

Go là ngôn ngữ hỗ trợ lập trình concurrency hiệu quả và đơn giản với goroutines. Để giao tiếp bộ nhớ giữa các goroutines, ta có một triết lí nổi tiếng trong Go:

Do not communicate by sharing memory; instead, share memory by communicating.
Rob Pike

"Đừng giao tiếp bằng việc chia sẻ bộ nhớ, thay vào đó hãy chia sẻ bộ nhớ thông qua giao tiếp". Việc "giao tiếp" có nghĩa là trong Golang chúng ta sẽ có một kênh giao tiếp dữ liệu được thiết kế riêng. Đó chính là channel.

2. Channel là gì?

Channel là một kênh giao tiếp trung gian giữa các goroutines. Channel giúp các goroutines có thể hoặc nhận được dữ liệu cho nhau một cách an toàn thông qua cơ chế lock-free, tức channel là thread-safe.

image.png

Mặc định, channel là kênh giao tiếp 2 chiều (bidirectional), nghĩa là channel có thể dùng cho cả gửi và nhận dữ liệu.

Zero value của một channel là nil.

2.1 Khai báo channel

Để sử dụng channel, chúng ta dùng keyword chan được hỗ trợ mặc định từ trong chính ngôn ngữ Golang (thường được hiểu là first-class), tức chúng ta không cần phải import thêm bất kì một package nào để sử dụng chan.

Syntax khai báo channel với chan:

var channelName chan Type

hoặc

channelName := make(chan Type)

Vì Go là ngôn ngữ strong typed, tức là phải có kiểu dữ liệu cụ thể cho các biến khi khai báo, việc này cũng không ngoại lệ với channel. Channel phải biết được kiểu dữ liệu gì sẽ đi qua nó.

2.2 Gửi và nhận dữ liệu qua channel

Để gửi và nhận dữ liệu qua channel, chúng ta sẽ dùng toán tử <- [tbd]. Toán tử này hoạt động như một chỉ hướng dữ liệu sẽ đi từ đâu đến đâu. Chỉ hướng này giúp ta xác định đang được gửi đi hay nhận về.

Gửi dữ liệu vào channel

channelName <- valu

Việc gửi dữ liệu vào channel sẽ giống như: "Tôi đã hoàn tất công việc của mình với dữ liệu này và bàn giao chúng cho người khác".

Nhận dữ liệu từ channel

myVar := <- channelName

Ví dụ gửi và nhận dữ liệu với channel:

package main

import "fmt"

func main() {
    myChan := make(chan int)

    go func() {
        myChan <- 1
    }()

    fmt.Println(<-myChan)
}

2.3 Cơ chế block của channel

Việc gửi và nhận dữ liệu thông qua channel sẽ có hỗ trợ cơ chế block. Việc này giúp các goroutines giao tiếp qua channel một cách đồng bộ (synchonization). Về nguyên tắc, channel sẽ block goroutines nếu nó chưa sẵn sàng.

Chúng ta cần hiểu rằng giao tiếp dữ liệu qua chanel giống như việc ta phải "trao tận tay" cho người nhận. Nếu vì lí do nào đó, người nhận chưa sẵn sàng, hoặc ngược lại người trao chưa đến, đôi bên sẽ phải đợi.

![image.png](cdn.hashnode.com/res/hashnode/image/upload/.. align="center")

Minh hoạ "trao tận tay" dữ liệu với channel

Đoạn code dưới đây sẽ khiến main goroutine bị block lại vĩnh viễn, gây lỗi deadlock và chương trình sẽ exit.

package main

func main() {
    myChan := make(chan int)
    myChan <- 1
}

Main goroutine đang gửi giá trị 1 vào channel myChan. Bản thân main gorountine sẽ bị block lại cho tới khi goroutine khác nhận dữ liệu từ myChan. Chương trình bị deadlock vì ngoài main ra chẳng còn goroutine nào lấy dữ liệu ra từ myChan cả.

Các goroutines dù có chạy nhanh chậm như thế nào, khi giao tiếp với channel đều phải bị "dừng" lại để giao tiếp rồi muốn làm gì thì làm. Mình sẽ minh hoạ thêm một ví dụ nữa:

package main

import (
    "fmt"
    "time"
)

func main() {
    myChan := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            myChan <- i
            time.Sleep(time.Second)
        }
    }()

    for i := 0; i < 10; i++ {
        fmt.Println(<-myChan)
    }
}

Trong đoạn code trên chúng ta sẽ có 2 goroutine là main và một anonymous function. Main goroutine sẽ lấy dữ liệu từ myChan với tốc độ cao hơn function gửi vào. Ở mỗi vòng lặp, main goroutine cũng phải đợi cho tới khi myChan có dữ liệu gửi vào. Kết quả ta thấy các con số được in ra sau mỗi giây.

Thực ra đây chính là cơ chế "streaming" rất hữu dụng về sau này. Go đã làm nó trở nên cực kì gọn và dễ dùng.

2.4 Sử dụng channel để gửi hoặc nhận dữ liệu

Sử dụng channel cho cả gửi và nhận dữ liệu

Mặc định, channel sẽ dùng cho cả 2 chiều giao tiếp gửi và nhận như sau:

package main

import (
   "fmt"
)

func receiveAndSend(c chan int) {
   fmt.Printf("Received: %d\n", <-c)
   fmt.Printf("Sending 2...\n")
   c <- 2
}

func main() {
   myChan := make(chan int)

   go receiveAndSend(myChan)
   myChan <- 1

   fmt.Printf("Value from receiveAndSend: %d\n", <-myChan)
}

Trong đoạn code trên, hàm receiveAndSend thực hiện lấy dữ liệu ra và ngay sau đó là gửi dữ liệu vào c. Như vậy channel c dùng cho cả gửi và nhận dữ liệu (hay giao tiếp 2 chiều).

Tuy nhiên chúng ta vẫn có thể giới hạn lại channel chỉ được dùng cho gửi hoặc nhận ở một hàm nào đó.

Sử dụng channel chỉ được phép gửi hoặc nhận dữ liệu

Để giới hạn channel ở một hàm nào đó chỉ được phép nhận dữ liệu, chúng ta sẽ dùng <- chan:

func recieveOnly(c <-chan int) {
    fmt.Printf("Received: %d\n", <-c)
    c <- 2 // error
}

Sẽ có lỗi biên dịch nếu ta cố tình gửi dữ liệu vào channel trong hàm receiveOnly.

Tương tự, nếu cần giới hạn channel ở một hàm nào đó chỉ được dùng để gửi dữ liệu ta sẽ dùng chan<-:

func sendOnly(c chan<- int) {
    c <- 2 // OK
    fmt.Printf("Received: %d\n", <-c) // error
}```


## 2.5 Close channel

Để đóng một channel chúng ta sẽ dùng hàm `close()`. Khi một channel bị close có nghĩa là không còn dữ liệu nào sẽ đi qua nó nữa.

`Close()` cũng là một __first-class__ function của Go. Cú pháp như sau:

```Go
close(chanName)

Các lưu ý khi close channel:

  1. Không thể gửi dữ liệu vào channel đã bị close, sẽ có lỗi runtime nếu ta cố làm việc này.

  2. Khi hàm close() thực thi bản chất sẽ truyền vào channel một tín hiệu để phía sử dụng biết là channel đã close hay chưa.

Kiểm tra một channel đã đóng hay chưa?

Bình thường nếu chỉ lấy dữ liệu từ channel đơn thuần thì chúng ta không biết được channel đã đóng hay vẫn còn hoạt động. Vì thế một syntax bổ sung để làm việc này như sau

value, isAlive := <- chanName

Biến isAlive (bool) được bổ sung để biết channel bị đóng hay chưa.

Ví dụ minh hoạ:

package main

import (
    "fmt"
)

func main() {
    myChan := make(chan int)

    go func() {
        for i := 1; i <= 10; i++ {
            myChan <- i
        }
        close(myChan)
    }()

    for {
        value, isAlive := <-myChan

        if !isAlive {
            fmt.Printf("Value: %d. Channel has been closed.\n", value)
            break
        }

        fmt.Printf("Value: %d\n", value)
    }
}
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 0. Channel has been closed.

Như đã lưu ý ở trên, nếu chúng ta sử dụng chan int và không dùng biến check isAlive thì trường hợp value = 0 sẽ có thể bị hiểu sai là có giá trị 0 được gửi qua channel. Tuy nhiên các biến number trong Go sẽ luôn là zero (0) nếu không được gán giá trị (empty).

3. Common pattern

Phần này sẽ liệt kê một số pattern phổ biến khi làm việc với channel trong Go.

3.1 Sử dụng for-range với channel

Một cách thuận lợi hơn rất nhiều để vừa lấy dữ liệu từ channel vừa có thể biết được channel còn hoạt động hay không với cú pháp for-range như sau:

package main

import (
    "fmt"
)

func main() {
    myChan := make(chan int)

    go func() {
        for i := 1; i < 10; i++ {
            myChan <- i
        }
        close(myChan)
    }()

    for value := range myChan {
        fmt.Printf("Value: %d\n", value)
    }
}

Sử dụng for-range như trên sẽ ngắn gọn và tiện lợi hơn rất đáng kể, hạn chế lỗi logic check channel.

3.2 Sử dụng selectWaitGroup để xử lí nhiều channel

Khi có nhiều hơn 1 channel cùng được sử dụng sẽ bắt đầu phát sinh những vẫn đề:

  1. Làm sao để biết channel nào về dữ liệu trước hoặc sẵn sàng nhận dữ liệu để ưu tiên xử lí?

  2. Làm sao để biết tất cả channel đều đã về dữ liệu hoặc đóng?

Ta sẽ xem xét một ví dụ đơn giản như sau:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    r := rand.New(rand.NewSource(time.Now().Unix()))

    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        time.Sleep(time.Second * time.Duration(r.Intn(5)))
        ch1 <- 1
    }()

    go func() {
        time.Sleep(time.Second * time.Duration(r.Intn(5)))
        ch1 <- 1
    }()

    fmt.Println(<-ch1)
    fmt.Println(<-ch2)
}

Đoạn code trên sẽ có 2 channel đặt trong 2 goroutine thực thi random thời gian nên ta không biết channel nào sẽ có dữ liệu sớm hơn channel còn lại. Cách viết trên sẽ luôn đợi để in ra ch1 rồi đến ch2 dù thực tế có khả năng ch2 nhanh hơn.

3.2.1 Sử dụng select để chọn channel đã sẵn sàng

Theo định nghĩa, select sẽ block cho đến khi có case có thể được thực thi trên các channel. Vì thế ta có thể ứng dụng như sau:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    r := rand.New(rand.NewSource(time.Now().Unix()))

    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        time.Sleep(time.Second * time.Duration(r.Intn(5)))
        ch1 <- 1
    }()

    go func() {
        time.Sleep(time.Second * time.Duration(r.Intn(5)))
        ch2 <- 2
    }()

    select {
    case v1 := <-ch1:
        fmt.Println("Ch1 come first with value:", v1)
        fmt.Println("then ch2 with value:", <-ch2)
    case v2 := <-ch2:
        fmt.Println("Ch2 come first with value:", v2)
        fmt.Println("then ch1 with value:", <-ch1)
    }
}

Trong thực tế, select sẽ hầu như đi với for tạo cặp syntax for-select như trong ví dụ sau:

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

Source: go.dev/tour/concurrency/5

for-selectxuất hiện trong hàm fibonaci để tạo một loop xét trên 2 case select là:

  1. Case 1: channel c có thể tiếp nhận dữ liệu x (không bị block), thực hiện phép toán x = yy = x + y.

  2. Case 2: channel quit có dữ liệu, return thoát hàm luôn.

Tại main goroutine, có một loop 10 lần lấy dữ liệu từ channel c. Việc này khiến case 1 sẽ được thực thi 10 lần. Sau đó channel quit được truyền dữ liệu vào để case 2 thực thi (lúc này case 1 bị block, không thể thực thi được).

Phương thức sử dụng 1 channel riêng để quit, thoát hàm hoặc vòng lặp này trong practice xuất hiện rất phổ biến, có thể xem là một best practice.

3.2.2 Sử dụng Waitgroup để biết các goroutine đã hoàn tất

Đây sẽ là một trường hợp rất phổ biến vì chúng ta sẽ không biết khi nào thì các channel đã hoàn tất. Trong thực tế sẽ không bao giờ biết được là sleep bao lâu. Ví dụ cho vấn đề này như sau:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    r := rand.New(rand.NewSource(time.Now().Unix()))

    go func() {
        time.Sleep(time.Second * time.Duration(r.Intn(5)))
        fmt.Println("Goroutine 1 done.")
    }()

    go func() {
        time.Sleep(time.Second * time.Duration(r.Intn(5)))
        fmt.Println("Goroutine 2 done.")
    }()

    time.Sleep(time.Second * 6)
}

Để sử dụng waitgroup chúng ta cần import package sync. Nguyên tắc hoạt động của waitgroup khá đơn giản là có bao nhiêu goroutines cần đợi thì dùng hàm Add(number). Mỗi khi goroutines chạy xong thì gọi hàm Done(). Hàm Wait() sẽ bị block cho tới khi đã done hết tất cả số lượng goroutines đã khai báo trước đó.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    r := rand.New(rand.NewSource(time.Now().Unix()))
    wc := new(sync.WaitGroup)
    wc.Add(2)

    go func() {
        time.Sleep(time.Second * time.Duration(r.Intn(5)))
        fmt.Println("Goroutine 1 done.")
        wc.Done()
    }()

    go func() {
        time.Sleep(time.Second * time.Duration(r.Intn(5)))
        fmt.Println("Goroutine 2 done.")
        wc.Done()
    }()

    wc.Wait()
    fmt.Println("All Goroutines done")
}

Xem nhiều hơn các common pattern ở đây

Tham khảo

programming-books.io/essential/go/channels-..