Golang 基础-并发、协程、信道、缓冲信道、工作池(第五章)

1. wili

Go 使用 Go 协程(Goroutine) 和信道(Channel)来处理并发。

2. 协程

2.1. 概念

Go 协程是与其他函数或方法一起并发运行的函数或方法。协程相比于线程的优势:

  • Go 协程的成本极低。堆栈大小只有若干 kb,并且可以根据应用的需求进行增减。而线程必须指定堆栈的大小,其堆栈是固定不变的
  • Go 协程会复用(Multiplex)数量更少的 OS 线程。即使程序有数以千计的 Go 协程,也可能只有一个线程。如果该线程中的某一 Go 协程发生了阻塞(比如说等待用户输入),那么系统会再创建一个 OS 线程,并把其余 Go 协程都移动到这个新的 OS 线程。
  • Go 协程使用信道(Channel)来进行通信。信道用于防止多个协程访问共享内存时发生竞态条件(Race Condition)。信道可以看作是 Go 协程之间通信的管道。

2.2. 声明

调用函数或者方法时,在前面加上关键字 go,可以让一个新的 Go 协程并发地运行。 协程主要特性:

  • 启动一个新的协程时,协程的调用会立即返回。与函数不同,程序控制不会去等待 Go 协程执行完毕。在调用 Go 协程之后,程序控制会立即返回到代码的下一行,忽略该协程的任何返回值。
  • 如果希望运行其他 Go 协程,Go 主协程必须继续运行着。如果 Go 主协程终止,则程序终止,于是其他 Go 协程也不会继续运行。
1
2
3
4
5
6
7
func hello() {
fmt.Println("Hello world goroutine")
}
func main() { // 主协程(Main Goroutine)
go hello() // 启动一个协程,此时,hello() 函数与 main() 函数会并发地执行
fmt.Println("main function") // 此时只会输出 main function
}

可以在 Go 主协程中使用休眠(Sleep),以便等待其他协程执行完毕。可以通过休眠时限顺序输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"time"
)

func numbers() {
for i := 1; i <= 5; i++ {
time.Sleep(250 * time.Millisecond)
fmt.Printf("%d ", i)
}
}
func alphabets() {
for i := 'a'; i <= 'e'; i++ {
time.Sleep(400 * time.Millisecond)
fmt.Printf("%c ", i)
}
}
func main() {
go numbers()
go alphabets()
time.Sleep(3000 * time.Millisecond)
fmt.Println("main terminated")
}

运行结果:1 a 2 3 b 4 c 5 d e main terminated

3. 信道

3.1. 概念

信道可以想像成 Go 协程之间通信的管道。信道只能运输一个类型的数据,

3.2. 声明

chan T 表示 T 类型的信道。

  • 信道的零值为 nil
  • 用 make 来定义信道
  • 发送与接收默认是阻塞的,当把数据发送到信道时,直到有其它 Go 协程从信道读取到数据,才会解除阻塞。反之亦然。如果只有输出或输入,则会触发报错,形成死锁
    1
    2
    3
    a := make(chan int)
    data := <- a // 读取信道 a
    a <- data // 写入信道 a

3.3. 一些例子

信道的例子:计算一个数中每一位的平方和与立方和,然后把平方和与立方和相加并打印出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 在一个单独的 Go 协程计算平方和,而在另一个协程计算立方和,最后在 Go 主协程把平方和与立方和相加
package main

import (
"fmt"
)

func calcSquares(number int, squareop chan int) { // 计算平方和
sum := 0
for number != 0 {
digit := number % 10
sum += digit * digit
number /= 10
}
squareop <- sum
}

func calcCubes(number int, cubeop chan int) { // 计算立方和
sum := 0
for number != 0 {
digit := number % 10
sum += digit * digit * digit
number /= 10
}
cubeop <- sum
}

func main() {
number := 589
sqrch := make(chan int)
cubech := make(chan int)
go calcSquares(number, sqrch)
go calcCubes(number, cubech)
squares, cubes := <-sqrch, <-cubech // 从两个信道获取值
fmt.Println("Final output", squares + cubes) // 打印求和,Final output 1536
}

3.4. 单向信道

单向信道,这种信道只能发送或者接收数据。

  • 唯送(Send Only)信道:只能发送数据,语法 chan<- T
  • 唯收(Receive Only)信道:只能接收数据,语法 <-chan T

可以把双向信道转换成单向信道,但是不能反过来。

1
2
3
4
5
6
7
8
9
10
11
12
13
func sendData(sendch chan<- int) {  // 唯送信道
sendch <- 10
}

func main() {
cha1 := make(chan int) // 双向信道
go sendData(cha1)
fmt.Println(<-cha1) // 10

sendch := make(chan<- int)
go sendData(sendch)
fmt.Println(<-sendch) // 唯送信道接收数据,报错
}

3.5. 关闭和遍历信道

数据发送方可以关闭信道,通知接收方这个信道不再有数据发送过来。当从信道接收数据时,接收方可以多用一个变量来检查信道是否已经关闭。

  • v, ok := <-ch:使用第二个参数来判断信道是否关闭。ok 等于 true,成功接收数据;ok 等于 false,信道关闭,读到的值会是该信道类型的零值
  • for range 循环用于在一个信道关闭之前,从信道接收数据。其会从信道接收数据,直达信道关闭,循环会自动结束
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    func producer(chnl chan int) {  
    for i := 0; i < 10; i++ {
    chnl <- i
    }
    close(chnl) // 关闭信道
    }

    func main() {
    ch1 := make(chan int)
    go producer(ch1)
    for {
    v, ok := <-ch1
    if ok == false { // 如果 ok 等于 false,说明信道已经关闭,于是退出 for 循环
    break
    }
    fmt.Println("Received ", v, ok)
    }

    ch2 := make(chan int)
    go producer(ch2)
    for v := range ch2 {
    fmt.Println("Received ",v) // 一旦关闭了信道 ch2 ,循环会自动结束
    }
    }

4. 缓冲信道

4.1. 概念

在缓冲已满时,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。
语法:无缓冲信道的容量默认为 0

1
ch := make(chan type, capacity)

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
"time"
)

func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch") // 写入 2 个元素时,阻塞
}
close(ch)
}
func main() {
ch := make(chan int, 2) // 容量为 2 的缓冲信道
go write(ch) // 缓冲信道传递给了 write 协程
time.Sleep(2 * time.Second) // 主协程休眠 2 秒,此时 write 协程在并发运行
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)
}
}
// 打印结果
successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch

4.2. 缓冲信道长度和容量

  • 容量:指信道可以存储的值的数量
  • 长度:指信道中当前排队的元素个数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    func main() {  
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch)) // capacity is 3
    fmt.Println("length is", len(ch)) // length is 2
    fmt.Println("read value", <-ch) // read value naveen
    fmt.Println("new length is", len(ch)) // new length is 1
    }

5. 工作池

5.1. WaitGroup

WaitGroup 用于实现工作池,其用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。WaitGroup 使用计数器来工作。

  • Add(int):计数器会加上 Add 的传参
  • Done():计数器减少1
  • Wait():会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"sync"
"time"
)

func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i) // Go 协程的执行顺序不一定,因此输出数字有多种可能
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}

func main() {
no := 3
var wg sync.WaitGroup // 创建变量,初始值为零值
for i := 0; i < no; i++ {
wg.Add(1) // 计数器变为 3
go process(i, &wg) // 创建了 3 个协程,此处必须使用 wg 地址,否则每个 Go 协程将会得到一个 WaitGroup 值的拷贝
}
wg.Wait() // 主协程等待计数器变为 0
fmt.Println("All go routines finished executing")
}

5.2. 工作池

工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。
举例,设计一个工作池,任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}