Golang中使用kafka
golang中比较好用的kafka client有
- sarama
- confluent-kafka-go
- go_kafka_client
- optiopay-kafka
- siesta
其中 sarama的使用者应该是最多的, 然后还有一个sarama的cluster版本 sarama-cluster
本文简单描述下sarama的一些简单使用
生产者接口
func producer_test() {fmt.Printf("producer_test\n")config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = trueconfig.Producer.Return.Errors = trueconfig.Version = sarama.V0_11_0_2producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)if err != nil {fmt.Printf("producer_test create producer error :%s\n", err.Error())return}defer producer.AsyncClose()// send messagemsg := &sarama.ProducerMessage{Topic: "kafka_go_test",Key: sarama.StringEncoder("go_test"),}value := "this is message"for {fmt.Scanln(&value)msg.Value = sarama.ByteEncoder(value)fmt.Printf("input [%s]\n", value)// send to chainproducer.Input() <- msgselect {case suc := <-producer.Successes():fmt.Printf("offset: %d, timestamp: %s", suc.Offset, suc.Timestamp.String())case fail := <-producer.Errors():fmt.Printf("err: %s\n", fail.Err.Error())}}
}
消费者接口
func consumer_test() {fmt.Printf("consumer_test")config := sarama.NewConfig()config.Consumer.Return.Errors = trueconfig.Version = sarama.V0_11_0_2// consumerconsumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)if err != nil {fmt.Printf("consumer_test create consumer error %s\n", err.Error())return}defer consumer.Close()partition_consumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)if err != nil {fmt.Printf("try create partition_consumer error %s\n", err.Error())return}defer partition_consumer.Close()for {select {case msg := <-partition_consumer.Messages():fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))case err := <-partition_consumer.Errors():fmt.Printf("err :%s\n", err.Error())}}}
元数据接口
func metadata_test() {fmt.Printf("metadata test\n")config := sarama.NewConfig()config.Version = sarama.V0_11_0_2client, err := sarama.NewClient([]string{"localhost:9092"}, config)if err != nil {fmt.Printf("metadata_test try create client err :%s\n", err.Error())return}defer client.Close()// get topic settopics, err := client.Topics()if err != nil {fmt.Printf("try get topics err %s\n", err.Error())return}fmt.Printf("topics(%d):\n", len(topics))for _, topic := range topics {fmt.Println(topic)}// get broker setbrokers := client.Brokers()fmt.Printf("broker set(%d):\n", len(brokers))for _, broker := range brokers {fmt.Printf("%s\n", broker.Addr())}
}
转载于:https://www.cnblogs.com/596014054-yangdongsheng/p/10446828.html
Golang中使用kafka相关推荐
- Golang中Buffer高效拼接字符串以及自定义线程安全Buffer
本文原创文章,转载注明出处,博客地址 https://segmentfault.com/u/to... 第一时间看后续精彩文章.觉得好的话,顺手分享到朋友圈吧,感谢支持. Go中可以使用"+ ...
- 如何在golang中关闭bufio.reader_Golang 并发模型系列:1. 轻松入门流水线模型
Go语言中文网,致力于每日分享编码.开源等知识,欢迎关注我,会有意想不到的收获! Golang作为一个实用主义的编程语言,非常注重性能,在语言特性上天然支持并发,它有多种并发模型,通过流水线模型系列文 ...
- go语言的iota是什么意思_关于Golang中的iota
快速一览 iota是Golang中提供的一个简化常量和枚举编程的标识符,合理的使用这个标识符可以让代码变得更简洁,省去大量的不必要的代码. 比如下面的这个常量定义 const ( a = 1 b = ...
- Golang中的panic和recover(捕获异常)
Golang中的panic和recover(捕获异常) 参考文章: (1)Golang中的panic和recover(捕获异常) (2)https://www.cnblogs.com/zhzhlong ...
- golang 中string和int类型相互转换
总结了golang中字符串和各种int类型之间的相互转换方式: string转成int: test_int, err := strconv.Atoi(test_string) if err != ni ...
- golang中并发sync和channel
golang中并发sync和channel chenbaoke · 2014-12-08 13:00:01 · 19151 次点击 · 预计阅读时间 5 分钟 · 不到1分钟之前 开始浏览 这是一个创 ...
- golang中的sync.WaitGroup
golang中的sync.WaitGroup Posted on 2015/04/09刚才看golang的sync的包,看见一个很有用的功能.就是WaitGroup. 先说说WaitGroup的用途: ...
- 初步解读Golang中的接口相关编写方法
初步解读Golang中的接口相关编写方法 概述如果说goroutine和channel是Go并发的两大基石,那么接口是Go语言编程中数据类型的关键.在Go语言的实际编程中,几乎所有的数据结构都围绕接口 ...
- golang中utf8和汉字互转
golang中utf8和汉字互转 package mainimport ("fmt""strconv""strings" )func mai ...
最新文章
- python语音处理工具
- 爬虫python需要什么软件-Python爬虫需要学习那些东西?
- PHP中不用第三个变量交换两个变量的值
- win7xp双系统引导修复工具
- 【Chocolatey】安装python3
- angular5项目端口冲突之解决办法
- 问题:图片怎么保存到数据库, 以及怎么把图片从数据库中取出来使用?(已解决)...
- 2020美国纽约大学计算机科学排名,2020美国纽约大学排名第几
- wordpress home.php,WordPress主题通过function.php来加载js和css文件
- MySQL 后from多个表_MYSQL回顾(多表查询相关)
- 面试大厂应该注意哪些问题?隔壁都馋哭了
- shell输入输出重定向
- matlab中单独存图_matlab中仅保存plot部分(除去空白)和图像的叠加
- java根据种子生成固定值_java固定种子随机数预测
- 论文写作笔记4 期刊选择-医学计算机
- 企业微信开发----H5发送表单请求到企业微信内部审核
- eps在c语言,C语言中eps指的是什么东西?
- 计算机电子极域控制,极域电子教室的反控制实现【无需教师端】
- 成都托普计算机职业技术怎么样学校,成都中职学校前景怎么样
- 太阳的后裔--OST.3 This love这份爱