golang 里面的超时取消

之前读书 《Go 语言并发之道》 里面有讲到 context 的超时取消,最近在工作上也遇到了一个问题和超时取消有关系的。

参考文档

深有同感

公众号那篇文章的有一句话,菜鸡深有同感:我记得我第一次接触context时,同事都说这个用来做并发控制的,可以设置超时时间,超时就会取消往下执行,快速返回,我就单纯的认为只要函数中带着context参数往下传递就可以做到超时取消,快速返回。
我曾经也是这样认为的,哈哈哈。

kafka write 超时

今年从 devops 转 backend 后端开发,遇到一个 kafka write 超时的问题。

1
err = w.writer.Save(castedEvent)

这里调用 kafka team 的 client 写入, 但是 client 的 timeout 是 30 s , 重试 3 次,再加上 broker 比较多,等待的时间就会更久。
Net.[Dial|Read]Timeout = 30s
Metadata.Retry.Max=3
Metadata.Retry.Backoff=250ms
for example: a producer with 1 broker Addr and 6 partitions (intotal 6+1 brokers considered in the library) the processing time is abt 7-8 min for the first msg to return error msg
Note:

  • when there are more brokers and partitions, the processing time of the message will be multiplied
  • if the timeout of the rest api is shorter than the producer processing time, there might not be any success/error msg returned and displayed on datadog

交代完问题的背景,我就开始思考如何才能提前取消,而不是等待 七八分钟。

single gorutine

因为工作上那段代码逻辑只有一个 gorutine, 我就尽量想用一个简化版的例子实现超时取消。
《Go 语言并发之道》 关于 context 超时取消给我印象最深刻的一句话: goroutine 中任何阻塞操作都必须是可抢占的 ,以便它可以被取消。
我就在思考,kafka 在写入的时候,在哪一步可以被抢占呢? 我猜测应该是尝试写之后,如果失败了,就 backoff 在重试,这一段贤者时间,可以被抢占。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"context"
"log"
"math/rand"
"time"
)

func kafkaWrite() bool {
i := rand.Intn(2)
log.Println(i)
if i == 0 {
return true
} else {
return false
}
}

func tryWrite(ctx context.Context) error {
for {
result := kafkaWrite()
if result {
return nil
}
select {
case <-ctx.Done():
log.Println("timeout exit")
return ctx.Err()

default:
time.Sleep(1 * time.Second)
}
}
}

func main() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err := tryWrite(ctx)
if err != nil {
log.Println(err)
} else {
log.Println("Success to process in time")
}

}

这里我们 granular 很细了,也用到了 context。 但是实际上我们在调用 kafka SDK 的时候,并不能把 context 传进去。

multiple gorutine

在咨询大佬 jiahao 后,他给出了另外一种方法,多用一个 channel,监听 write 是否完成。而且也不需要传 context 到 kafka write 代码里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package main

import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"runtime"
"time"
)

func kafkaWrite() error {
i := rand.Intn(2)
log.Println(i)
if i == 0 {
return nil
} else {
for i := 0; i < 10; i++ {
log.Println("sleeping", i)
time.Sleep(1 * time.Second)
}
return errors.New("write error")
}

}

func tryWrite(ctx context.Context) error {
jobDone := make(chan struct{})
go func() {
err := kafkaWrite()
if err != nil {
log.Println("failed to write")
} else {
//jobDone <- struct{}{}
close(jobDone)
}
}()

select {
case <-ctx.Done():
log.Println("timeout exit")
return ctx.Err()
case <-jobDone:
log.Println("job Done")
return nil
}

}

func main() {
go func() {
for i := 0; i < 15; i++ {
fmt.Printf("gorutine number is: %v\n", runtime.NumGoroutine())
time.Sleep(1 * time.Second)
}
}()
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err := tryWrite(ctx)
if err != nil {
log.Println(err)
} else {
log.Println("Success to process in time")
}

time.Sleep(10 * time.Second)
}

这个确实能提前取消,但那个 gorutine 会一直运行,直到 timeout 才消亡。 我们在这里专门用一个 gorutine 去监控当前 gorutine 数量 runtime.NumGoroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
gorutine number is: 2
2022/05/17 20:41:15 1
2022/05/17 20:41:15 sleeping 0
gorutine number is: 3
2022/05/17 20:41:16 sleeping 1
2022/05/17 20:41:17 sleeping 2
gorutine number is: 3
2022/05/17 20:41:18 timeout exit
2022/05/17 20:41:18 context deadline exceeded
gorutine number is: 3
2022/05/17 20:41:18 sleeping 3
gorutine number is: 3
2022/05/17 20:41:19 sleeping 4
gorutine number is: 3
2022/05/17 20:41:20 sleeping 5
gorutine number is: 3
2022/05/17 20:41:21 sleeping 6
gorutine number is: 3
2022/05/17 20:41:22 sleeping 7
gorutine number is: 3
2022/05/17 20:41:23 sleeping 8
2022/05/17 20:41:24 sleeping 9
gorutine number is: 3
2022/05/17 20:41:25 failed to write
gorutine number is: 2
gorutine number is: 2
gorutine number is: 2