本篇内容介绍了“怎么让Kafka达到最佳吞吐量”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
func main() { // 1. 初始化 pusher := kq.NewPusher([]string{ "127.0.0.1:19092", "127.0.0.1:19092", "127.0.0.1:19092", }, "kq") ticker := time.NewTicker(time.Millisecond) for round := 0; round < 3; round++ { select { case <-ticker.C: count := rand.Intn(100) m := message{ Key: strconv.FormatInt(time.Now().UnixNano(), 10), Value: fmt.Sprintf("%d,%d", round, count), Payload: fmt.Sprintf("%d,%d", round, count), } body, err := json.Marshal(m) if err != nil { log.Fatal(err) } fmt.Println(string(body)) // 2. 写入 if err := pusher.Push(string(body)); err != nil { log.Fatal(err) } } } }
将 kafka cluster
配置以及 topic
传入,你就得到一个操作 kafka
的 push operator
。
至于写入消息,简单的调用 pusher.Push(msg)
就行。是的,就这么简单!
> 当然,目前只支持单个 msg
写入。可能有人会疑惑,那就继续往下看,为什么只能一条一条写入?
一起看看 pusher
初始化哪些步骤:
NewPusher(clusterAddrs, topic, opts...) |- kafka.NewWriter(kfConfig) // 与 kf 之前的连接 |- executor = executors.NewChunkExecutor() // 设置内部写入的executor为字节数定量写入
建立与 kafka cluster
的连接。此处肯定就要传入 kafka config
;
设置内部暂存区的写入函数以及刷新规则。
使用 chunkExecutor
作用不言而喻:将随机写 -> 批量写,减少 I/O 消耗;同时保证单次写入不能超过默认的 1M
或者自己设定的最大写入字节数。
其实再往 chunkExecutor
内部看,其实每次触发插入有两个指标:
maxChunkSize
:单次最大写入字节数
flushInterval
:刷新暂存消息插入的间隔时间
在触发写入,只要满足任意一个指标都会执行写入。同时在 executors
都有设置插入间隔时间,以防暂存区写入阻塞而暂存区内消息一直不被刷新清空。
> 更多关于 executors
可以参看以下:https://zeromicro.github.io/go-zero/executors.html
根据上述初始化对 executors
介绍,插入过程中也少不了它的配合:
func (p *Pusher) Push(v string) error { // 1. 将 msg -> kafka 内部的 Message msg := kafka.Message{ Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), Value: []byte(v), } // 使用 executor.Add() 插入内部的 container // 当 executor 初始化失败或者是内部发生错误,也会将 Message 直接插入 kafka if p.executor != nil { return p.executor.Add(msg, len(v)) } else { return p.produer.WriteMessages(context.Background(), msg) } }
过程其实很简单。那 executors.Add(msg, len(msg))
是怎么把 msg
插入到 kafka
呢?
插入的逻辑其实在初始化中就声明了:
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) { chunk := make([]kafka.Message, len(tasks)) // 1 for i := range tasks { chunk[i] = tasks[i].(kafka.Message) } // 2 if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil { logx.Error(err) } }, newOptions(opts)...)
触发插入时,将暂存区中存储的 []msg
依次拿出,作为最终插入消息集合;
将上一步的消息集合,作为一个批次插入 kafka
的 topic
中
这样 pusher -> chunkExecutor -> kafka
一个链路就出现了。
“怎么让Kafka达到最佳吞吐量”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。