Here we write through the Kafka team's client, but the client's timeout is 30 s and it retries 3 times. With a fairly large number of brokers on top of that, the wait gets even longer. The relevant settings:

- Net.[Dial|Read]Timeout = 30s
- Metadata.Retry.Max = 3
- Metadata.Retry.Backoff = 250ms

For example, take a producer configured with 1 broker address and 6 partitions, which is 7 brokers in total (6 + 1) from the library's point of view. It takes about 7 to 8 minutes for the first message to return an error.

Note:
- With more brokers and partitions, the processing time of a message is multiplied accordingly.
- If the REST API's timeout is shorter than the producer's processing time, the request may end before any success or error message is returned, so nothing shows up in Datadog.
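```go
package main

import (
	"context"
	"log"
	"math/rand"
	"time"
)

// kafkaWrite simulates a write that succeeds about half the time.
func kafkaWrite() bool {
	i := rand.Intn(2)
	log.Println(i)
	return i == 0
}

// tryWrite retries kafkaWrite once per second until it succeeds or ctx is done.
func tryWrite(ctx context.Context) error {
	for {
		if kafkaWrite() {
			return nil
		}
		select {
		case <-ctx.Done():
			log.Println("timeout exit")
			return ctx.Err()
		case <-time.After(1 * time.Second):
			// Wait before the next attempt; unlike a bare time.Sleep,
			// the deadline can still interrupt this wait.
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	if err := tryWrite(ctx); err != nil {
		log.Println(err)
	} else {
		log.Println("processed successfully in time")
	}
}
```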
```
gorutine number is: 2
2022/05/17 20:41:15 1
2022/05/17 20:41:15 sleeping 0
gorutine number is: 3
2022/05/17 20:41:16 sleeping 1
2022/05/17 20:41:17 sleeping 2
gorutine number is: 3
2022/05/17 20:41:18 timeout exit
2022/05/17 20:41:18 context deadline exceeded
gorutine number is: 3
2022/05/17 20:41:18 sleeping 3
gorutine number is: 3
2022/05/17 20:41:19 sleeping 4
gorutine number is: 3
2022/05/17 20:41:20 sleeping 5
gorutine number is: 3
2022/05/17 20:41:21 sleeping 6
gorutine number is: 3
2022/05/17 20:41:22 sleeping 7
gorutine number is: 3
2022/05/17 20:41:23 sleeping 8
2022/05/17 20:41:24 sleeping 9
gorutine number is: 3
2022/05/17 20:41:25 failed to write
gorutine number is: 2
gorutine number is: 2
gorutine number is: 2
```