Go语言详解 第7章. 并发

并发介绍

进程和线程

  • 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。

  • 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。

  • 一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。

并发和并行

  • 多线程程序在一个核的cpu上运行,就是并发。

  • 多线程程序在多个核的cpu上运行,就是并行。

并发

并行

协程和线程

协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。

线程:一个线程上可以跑多个协程,协程是轻量级的线程。

goroutine 只是由官方实现的超级”线程池”。

每个实力4~5KB的栈内存占用和由于实现机制而大幅减少的创建和销毁开销是go高并发的根本原因。

并发不是并行:

并发主要由切换时间片来实现”同时”运行,并行则是直接利用多核实现多线程的运行,go可以设置使用核数,以发挥多核计算机的能力。

goroutine 奉行通过通信来共享内存,而不是共享内存来通信。

Goroutine

Golang 在语言层面对并发编程提供支持,一种类似协程,称作 goroutine 的机制。

只需在函数调用语句前添加 go 关键字,就可创建并发执行单元。开发人员无需了解任何执行细节,调度器会自动将其安排到合适的系统线程上执行。goroutine 是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务。

事实上,入口函数 main 就以 goroutine 运行。另有与之配套的 channel 类型,用以实现 “以通讯来共享内存” 的 CSP 模式。

  1. package main
  2. import "time"
  3. func main() {
  4. go func() {
  5. println("Hello, World!")
  6. }()
  7. time.Sleep(1 * time.Second)
  8. }

输出结果:

Hello, World!

调度器不能保证多个 goroutine 执行次序,且进程退出时不会等待它们结束。

默认情况下,进程启动后仅允许一个系统线程服务于 goroutine。可使用环境变量或标准库函数 runtime.GOMAXPROCS 修改,让调度器用多个线程实现多核并行,而不仅仅是并发。

  1. package main
  2. import (
  3. "math"
  4. "sync"
  5. )
  6. func sum(id int) {
  7. var x int64
  8. for i := 0; i < math.MaxUint32; i++ {
  9. x += int64(i)
  10. }
  11. println(id, x)
  12. }
  13. func main() {
  14. wg := new(sync.WaitGroup)
  15. wg.Add(2)
  16. for i := 0; i < 2; i++ {
  17. go func(id int) {
  18. defer wg.Done()
  19. sum(id)
  20. }(i)
  21. }
  22. wg.Wait()
  23. }

命令行输入:

go build main.go

time -p ./main

输出结果:

  1. 0 9223372030412324865
  2. 1 9223372030412324865
  3. real 1.92 // 程序开始到结束时间差 ( CPU 时间)
  4. user 3.80 // 用户态所使用 CPU 时间片 (多核累加)
  5. sys 0.01 // 内核态所使用 CPU 时间片

命令行输入:

GOMAXPROCS=8 time -p ./main

输出结果:

  1. 1 9223372030412324865
  2. 0 9223372030412324865
  3. real 1.89
  4. user 3.76 // 虽然总时间差不多,但由 2 个核并行,real 时间自然少了许多。
  5. sys 0.00

设置golang运行的cpu核数

单核执行如果for前面或者中间不延迟,主线程不会让出CPU,导致异步的线程无法执行,从而无法设置flag的值,从而出现死循环。

  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. )
  6. var (
  7. flag = false
  8. str string
  9. )
  10. func foo() {
  11. flag = true
  12. str = "setup complete!"
  13. }
  14. func main() {
  15. runtime.GOMAXPROCS(1)
  16. go foo()
  17. for {
  18. if flag {
  19. break
  20. }
  21. }
  22. fmt.Println(str)
  23. }

运行的cpu核数设置成2核 runtime.GOMAXPROCS(2)

  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. )
  6. var (
  7. flag = false
  8. str string
  9. )
  10. func foo() {
  11. flag = true
  12. str = "setup complete!"
  13. }
  14. func main() {
  15. runtime.GOMAXPROCS(2)
  16. go foo()
  17. for {
  18. if flag {
  19. break
  20. }
  21. }
  22. fmt.Println(str)
  23. }

输出结果:

setup complete!

调用 runtime.Goexit 将立即终止当前 goroutine 执行,调度器确保所有已注册 defer 延迟调用被执行。

  1. package main
  2. import (
  3. "runtime"
  4. "sync"
  5. )
  6. func main() {
  7. wg := new(sync.WaitGroup)
  8. wg.Add(1)
  9. go func() {
  10. defer wg.Done()
  11. defer println("A.defer")
  12. func() {
  13. defer println("B.defer")
  14. runtime.Goexit() // 终止当前 goroutine
  15. println("B") // 不会执行
  16. }()
  17. println("A") // 不会执行
  18. }()
  19. wg.Wait()
  20. }

输出结果:

B.defer

A.defer

和协程 yield 作用类似,Gosched 让出底层线程,将当前 goroutine 暂停,放回队列等待下次被调度执行。

  1. package main
  2. import (
  3. "runtime"
  4. "sync"
  5. )
  6. func main() {
  7. wg := new(sync.WaitGroup)
  8. wg.Add(1)
  9. go func() {
  10. for i := 0; i < 6; i++ {
  11. println(i)
  12. runtime.Gosched()
  13. }
  14. defer wg.Done()
  15. }()
  16. for i := 0; i < 6; i++ {
  17. wg.Add(1)
  18. go func() {
  19. defer wg.Done()
  20. println("Hello, World!")
  21. }()
  22. }
  23. wg.Wait()
  24. }

输出结果:

  1. Hello, World!
  2. Hello, World!
  3. 0
  4. 1
  5. Hello, World!
  6. Hello, World!
  7. Hello, World!
  8. 2
  9. 3
  10. 4
  11. 5
  12. Hello, World!

每次输出结果都不一样

runtime.Gosched()用于让出CPU时间片。这就像跑接力赛,A跑了一会碰到代码runtime.Gosched()就把接力棒交给B了,A歇着了,B继续跑。

goroutine中使用recover

应用场景,如果某个goroutine panic了,而且这个goroutine里面没有捕获(recover),那么整个进程就会挂掉。所以,好的习惯是每当go产生一个goroutine,就需要写下recover。

  1. package main
  2. import (
  3. "fmt"
  4. // "runtime"
  5. "time"
  6. )
  7. func test() {
  8. defer func() {
  9. if err := recover(); err != nil {
  10. fmt.Println("panic:", err)
  11. }
  12. }()
  13. var m map[string]int
  14. m["stu"] = 100
  15. }
  16. func calc() {
  17. for {
  18. fmt.Println("i'm calc")
  19. time.Sleep(time.Second)
  20. }
  21. }
  22. func main() {
  23. go test()
  24. for i := 0; i < 2; i++ {
  25. go calc()
  26. }
  27. time.Sleep(time.Second * 10)
  28. }

输出结果:

  1. i'm calc
  2. i'm calc
  3. panic: assignment to entry in nil map
  4. i'm calc
  5. i'm calc
  6. i'm calc
  7. i'm calc
  8. i'm calc
  9. i'm calc
  10. i'm calc
  11. i'm calc
  12. i'm calc
  13. i'm calc
  14. i'm calc
  15. i'm calc
  16. i'm calc
  17. i'm calc
  18. i'm calc
  19. i'm calc
  20. i'm calc
  21. i'm calc

Chan

channel 是 CSP 模式的具体实现,用于多个 goroutine 通讯。

其内部实现了同步,确保并发安全。多个goroutine同时访问,不需要加锁。

由于管道容量是5,开启go写入10个数据,再写入5个数据,会阻塞,然而read每秒会读取一个,然后在会写入一个数据。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func write(ch chan int) {
  7. for i := 0; i < 10; i++ {
  8. ch <- i
  9. fmt.Println("put data:", i)
  10. }
  11. }
  12. func read(ch chan int) {
  13. for {
  14. var b int
  15. b = <-ch
  16. fmt.Println(b)
  17. time.Sleep(time.Second)
  18. }
  19. }
  20. func main() {
  21. intChan := make(chan int, 5)
  22. go write(intChan)
  23. go read(intChan)
  24. time.Sleep(10 * time.Second)
  25. }

输出结果:

  1. put data: 0
  2. put data: 1
  3. put data: 2
  4. put data: 3
  5. put data: 4
  6. put data: 5
  7. 0
  8. 1
  9. put data: 6
  10. 2
  11. put data: 7
  12. 3
  13. put data: 8
  14. 4
  15. put data: 9
  16. 5
  17. 6
  18. 7
  19. 8
  20. 9

默认为同步模式,需要发送和接收配对。否则会被阻塞,直到另一方准备好后被唤醒。

  1. package main
  2. import "fmt"
  3. func main() {
  4. data := make(chan int) // 数据交换队列
  5. exit := make(chan bool) // 退出通知
  6. go func() {
  7. for d := range data {
  8. // 从队列迭代接收数据,直到 close 。
  9. fmt.Println(d)
  10. }
  11. fmt.Println("recv over.")
  12. exit <- true // 发出退出通知。
  13. }()
  14. data <- 1 // 发送数据。
  15. data <- 2
  16. data <- 3
  17. close(data) // 关闭队列。
  18. fmt.Println("send over.")
  19. <-exit // 等待退出通知。
  20. }

输出结果:

  1. 1
  2. 2
  3. send over.
  4. 3
  5. recv over.

异步方式通过判断缓冲区来决定是否阻塞。如果缓冲区已满,发送被阻塞;缓冲区为空,接收被阻塞。

通常情况下,异步 channel 可减少排队阻塞,具备更高的效率。但应该考虑使用指针规避大对象拷贝,将多个元素打包,减小缓冲区大小等。

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func main() {
  6. data := make(chan int, 3) // 缓冲区可以存储 3 个元素
  7. exit := make(chan bool)
  8. data <- 1 // 在缓冲区未满前,不会阻塞。
  9. data <- 2
  10. data <- 3
  11. go func() {
  12. for d := range data {
  13. // 在缓冲区未空前,不会阻塞。
  14. fmt.Println(d)
  15. }
  16. exit <- true
  17. }()
  18. data <- 4 // 如果缓冲区已满,阻塞。
  19. data <- 5
  20. close(data)
  21. <-exit
  22. }

输出结果:

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5

channel选择

如果需要同时处理多个 channel,可使用 select 语句。它随机选择一个可用 channel 做收发操作,或执行 default case。

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. )
  6. func main() {
  7. a, b := make(chan int, 3), make(chan int)
  8. go func() {
  9. v, ok, s := 0, false, ""
  10. for {
  11. select {
  12. case v, ok = <-a:
  13. s = "a"
  14. case v, ok = <-b:
  15. s = "b"
  16. }
  17. if ok {
  18. fmt.Println(s, v)
  19. } else {
  20. os.Exit(0)
  21. }
  22. }
  23. }()
  24. for i := 0; i < 5; i++ {
  25. select { // 随机选择可 channel,接收数据。
  26. case a <- i:
  27. case b <- i:
  28. }
  29. }
  30. close(a)
  31. select {} // 没有可用 channel,阻塞 main goroutine。
  32. }

输出结果:

  1. // 每次运行输出结果都不同
  2. b 3
  3. a 0
  4. a 1
  5. a 2
  6. b 4

在循环中使用 select default case 需要小心,避免形成洪水。

模式 :用简单工厂模式打包并发任务和 channel。

  1. package main
  2. import (
  3. "math/rand"
  4. "time"
  5. )
  6. func NewTest() chan int {
  7. c := make(chan int)
  8. rand.Seed(time.Now().UnixNano())
  9. go func() {
  10. time.Sleep(time.Second)
  11. c <- rand.Int()
  12. }()
  13. return c
  14. }
  15. func main() {
  16. t := NewTest()
  17. println(<-t) // 等待 goroutine 结束返回。
  18. }

用 channel 实现信号量 (semaphore)。

  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. )
  7. func main() {
  8. runtime.GOMAXPROCS(2)
  9. wg := sync.WaitGroup{}
  10. wg.Add(3)
  11. sem := make(chan int, 1)
  12. for i := 0; i < 3; i++ {
  13. go func(id int) {
  14. defer wg.Done()
  15. sem <- 1 // 向 sem 发送数据,阻塞或者成功。
  16. for x := 0; x < 3; x++ {
  17. fmt.Println(id, x)
  18. }
  19. <-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据。
  20. }(i)
  21. }
  22. wg.Wait()
  23. }

输出结果:

  1. // 每次运行输出结果都不同
  2. 0 0
  3. 0 1
  4. 0 2
  5. 2 0
  6. 2 1
  7. 2 2
  8. 1 0
  9. 1 1
  10. 1 2

用 closed channel 发出退出通知。

  1. package main
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. func main() {
  7. var wg sync.WaitGroup
  8. quit := make(chan bool)
  9. for i := 0; i < 2; i++ {
  10. wg.Add(1)
  11. go func(id int) {
  12. defer wg.Done()
  13. task := func() {
  14. println(id, time.Now().Nanosecond())
  15. time.Sleep(time.Second)
  16. }
  17. for {
  18. select {
  19. case <-quit: // closed channel 不会阻塞,因此可用作退出通知。
  20. return
  21. default: // 执行正常任务。
  22. task()
  23. }
  24. }
  25. }(i)
  26. }
  27. time.Sleep(time.Second * 5) // 让测试 goroutine 运行一会。
  28. close(quit) // 发出退出通知。
  29. wg.Wait()
  30. }

用 select 实现超时 (timeout)。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. w := make(chan bool)
  8. c := make(chan int, 2)
  9. go func() {
  10. select {
  11. case v := <-c:
  12. fmt.Println(v)
  13. case <-time.After(time.Second * 3):
  14. fmt.Println("timeout.")
  15. }
  16. w <- true
  17. }()
  18. // c <- 1 // 注释掉,引发 timeout。
  19. <-w
  20. }

WaitGroup

WaitGroup在go语言中,用于线程同步,单从字面意思理解,wait等待的意思,group组、团队的意思,WaitGroup就是指等待一组,等待一个系列执行完成后才会继续向下执行。

先说说WaitGroup的用途:它能够一直等到所有的goroutine执行完成,并且阻塞主线程的执行,直到所有的goroutine执行完成。

WaitGroup总共有三个方法:Add(delta int),Done(),Wait()。简单的说一下这三个方法的作用。

  • Add:添加或者减少等待goroutine的数量

  • Done:相当于Add(-1)

  • Wait:执行阻塞,直到所有的WaitGroup数量变成0

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func main() {
  8. wg := sync.WaitGroup{}
  9. for i := 0; i < 10; i++ {
  10. wg.Add(1)
  11. go calc(&wg, i)
  12. }
  13. wg.Wait()
  14. fmt.Println("all goroutine finish")
  15. }
  16. func calc(w *sync.WaitGroup, i int) {
  17. fmt.Println("calc:", i)
  18. time.Sleep(time.Second)
  19. w.Done()
  20. }

运行结果:

  1. calc: 1
  2. calc: 5
  3. calc: 7
  4. calc: 8
  5. calc: 2
  6. calc: 4
  7. calc: 6
  8. calc: 0
  9. calc: 3
  10. calc: 9
  11. all goroutine finish

Context

在Go中http包的Server中,每一个请求在都有一个对应的goroutine去处理。请求处理函数通常会启动额外的goroutine用来访问后端服务,比如数据库和RPC服务。用来处理一个请求的goroutine通常需要访问一些与请求特定的数据,比如终端用户的身份认证信息,验证相关的令牌,请求的截止时间。然后系统才能释放这些goroutine占用的资源。

在Google内部,开发了Context包,专门用来简化对于处理单个请求的多个goroutine之间与请求域的数据,取消信号,截止时间等相关操作,这些操作可能涉及多个API调用。

go get golang.org/x/net/context命令获取这个包。

注意: 使用时遵循context规则

  1. 不要将 Context 放入结构体,Context应该作为第一个参数传 入,命名为ctx。
  2. 即使函数允许,也不要传入nil的 Context。如果不知道用哪种 Context,可以使用context.TODO()。
  3. 使用context的Value相关方法,只应该用于在程序和接口中传递和请求相关数据,不能用它来传递一些可选的参数
  4. 相同的 Context 可以传递给在不同的goroutine;Context 是并发安全的。

Context结构

  1. type Context interface {
  2. Deadline() (deadline time.Time, ok bool)
  3. Done() <-chan struct{}
  4. Err() error
  5. Value(key interface{}) interface{}
  6. }
  7. Deadline() 返回一个time.Time,是当前 Context 的应该结束的时间,ok 表示是否有 deadline
  8. Done() 返回一个struct{}类型的只读 channel
  9. Err() 返回 Context 被取消时的错误
  10. Value(key interface{}) Context 自带的 K-V 存储功能
  11. // Deadline会返回一个超时时间,Goroutine获得了超时时间后,例如可以对某些io操作设定超时时间。
  12. // Done方法返回一个信道(channel),当Context被撤销或过期时,该信道是关闭的,即它是一个表示Context是否已关闭的信号。
  13. // 当Done信道关闭后,Err方法表明Context被撤的原因。
  14. // Value可以让Goroutine共享一些数据,当然获得数据是协程安全的。但使用这些数据的时候要注意同步,比如返回了一个map,而这个map的读写则要加锁。

网络请求超时控制

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "time"
  8. )
  9. type Result struct {
  10. r *http.Response
  11. err error
  12. }
  13. func process() {
  14. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  15. //释放资源
  16. defer cancel()
  17. tr := &http.Transport{}
  18. client := &http.Client{Transport: tr}
  19. resultChan := make(chan Result, 1)
  20. //发起请求
  21. req, err := http.NewRequest("GET", "http://www.baidu.com", nil)
  22. // req, err := http.NewRequest("GET", "http://www.google.com", nil)
  23. if err != nil {
  24. fmt.Println("http request failed, err:", err)
  25. return
  26. }
  27. /*
  28. func (c *Client) Do(req *Request) (*Response, error)
  29. */
  30. go func() {
  31. resp, err := client.Do(req)
  32. pack := Result{r: resp, err: err}
  33. //将返回信息写入管道(正确或者错误的)
  34. resultChan <- pack
  35. }()
  36. select {
  37. case <-ctx.Done():
  38. tr.CancelRequest(req)
  39. er := <-resultChan
  40. fmt.Println("Timeout!", er.err)
  41. case res := <-resultChan:
  42. defer res.r.Body.Close()
  43. out, _ := ioutil.ReadAll(res.r.Body)
  44. fmt.Printf("Server Response: %s", out)
  45. }
  46. return
  47. }
  48. func main() {
  49. process()
  50. }

如果修改下代码:

req, err := http.NewRequest(“GET”, “http://google.com“, nil)

请求超时,输出日志信息如下:

Timeout! Get http://www.google.com: net/http: request canceled while waiting for connection

WithValue 传递元数据:

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. )
  6. func process(ctx context.Context) {
  7. ret,ok := ctx.Value("trace_id").(int)
  8. if !ok {
  9. ret = 21342423
  10. }
  11. fmt.Printf("ret:%d\n", ret)
  12. s , _ := ctx.Value("session").(string)
  13. fmt.Printf("session:%s\n", s)
  14. }
  15. func main() {
  16. ctx := context.WithValue(context.Background(), "trace_id", 13483434)
  17. ctx = context.WithValue(ctx, "session", "sdlkfjkaslfsalfsafjalskfj")
  18. process(ctx)
  19. }

输出结果:

ret:13483434

session:sdlkfjkaslfsalfsafjalskfj

通过Context我们也可以传递一些必须的元数据,这些数据会附加在Context上以供使用。

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. )
  7. var key string = "name"
  8. func main() {
  9. ctx, cancel := context.WithCancel(context.Background())
  10. //附加值
  11. valueCtx := context.WithValue(ctx, key, "【监控1】")
  12. go watch(valueCtx)
  13. time.Sleep(10 * time.Second)
  14. fmt.Println("可以了,通知监控停止")
  15. cancel()
  16. //为了检测监控过是否停止,如果没有监控输出,就表示停止了
  17. time.Sleep(5 * time.Second)
  18. }
  19. func watch(ctx context.Context) {
  20. for {
  21. select {
  22. case <-ctx.Done():
  23. //取出值
  24. fmt.Println(ctx.Value(key), "监控退出,停止了...")
  25. return
  26. default:
  27. //取出值
  28. fmt.Println(ctx.Value(key), "goroutine监控中...")
  29. time.Sleep(2 * time.Second)
  30. }
  31. }
  32. }

输出结果:

  1. 【监控1 goroutine监控中...
  2. 【监控1 goroutine监控中...
  3. 【监控1 goroutine监控中...
  4. 【监控1 goroutine监控中...
  5. 【监控1 goroutine监控中...
  6. 可以了,通知监控停止
  7. 【监控1 监控退出,停止了...

超时控制 WithDeadline

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. )
  7. func main() {
  8. d := time.Now().Add(4 * time.Second)
  9. // d := time.Now().Add(2 * time.Second)
  10. ctx, cancel := context.WithDeadline(context.Background(), d)
  11. defer cancel()
  12. select {
  13. case <-time.After(3 * time.Second):
  14. fmt.Println("overslept")
  15. case <-ctx.Done():
  16. fmt.Println(ctx.Err())
  17. }
  18. }

输出结果:

overslept

如果将上面代码修改为:

d := time.Now().Add(2 * time.Second)

输出结果:

context deadline exceeded

WithCancel

我们来了解一个利用context结束goroutine的demo

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. )
  7. /*
  8. 创建一个管道chan,启动goroutine
  9. for循环存数据
  10. **/
  11. func gen(ctx context.Context) <-chan int {
  12. dst := make(chan int)
  13. n := 1
  14. go func() {
  15. for {
  16. select {
  17. case <-ctx.Done():
  18. //执行defer cancel操作后,就会执行到该select入库
  19. fmt.Println("i exited")
  20. return // returning not to leak the goroutine
  21. case dst <- n:
  22. n++
  23. }
  24. }
  25. }()
  26. return dst
  27. }
  28. func test() {
  29. ctx, cancel := context.WithCancel(context.Background())
  30. //当取数据n == 5时候,执行defer cancel操作
  31. defer cancel()
  32. intChan := gen(ctx)
  33. for n := range intChan {
  34. fmt.Println(n)
  35. if n == 5 {
  36. break
  37. }
  38. }
  39. }
  40. func main() {
  41. test()
  42. time.Sleep(time.Second * 5)
  43. }

输出结果:

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. i exited
本文标签:并发 进程 线程 并行 协程 Goroutine Chan WaitGroup Context