golang中比较好用的kafka client有

  • sarama
  • confluent-kafka-go
  • go_kafka_client
  • optiopay-kafka
  • siesta

其中 sarama的使用者应该是最多的, 然后还有一个sarama的cluster版本 sarama-cluster

本文简单描述下sarama的一些简单使用

生产者接口

func producer_test() {fmt.Printf("producer_test\n")config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = trueconfig.Producer.Return.Errors = trueconfig.Version = sarama.V0_11_0_2producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)if err != nil {fmt.Printf("producer_test create producer error :%s\n", err.Error())return}defer producer.AsyncClose()// send messagemsg := &sarama.ProducerMessage{Topic: "kafka_go_test",Key:   sarama.StringEncoder("go_test"),}value := "this is message"for {fmt.Scanln(&value)msg.Value = sarama.ByteEncoder(value)fmt.Printf("input [%s]\n", value)// send to chainproducer.Input() <- msgselect {case suc := <-producer.Successes():fmt.Printf("offset: %d,  timestamp: %s", suc.Offset, suc.Timestamp.String())case fail := <-producer.Errors():fmt.Printf("err: %s\n", fail.Err.Error())}}
}

消费者接口

func consumer_test() {fmt.Printf("consumer_test")config := sarama.NewConfig()config.Consumer.Return.Errors = trueconfig.Version = sarama.V0_11_0_2// consumerconsumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)if err != nil {fmt.Printf("consumer_test create consumer error %s\n", err.Error())return}defer consumer.Close()partition_consumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)if err != nil {fmt.Printf("try create partition_consumer error %s\n", err.Error())return}defer partition_consumer.Close()for {select {case msg := <-partition_consumer.Messages():fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))case err := <-partition_consumer.Errors():fmt.Printf("err :%s\n", err.Error())}}}

元数据接口

func metadata_test() {fmt.Printf("metadata test\n")config := sarama.NewConfig()config.Version = sarama.V0_11_0_2client, err := sarama.NewClient([]string{"localhost:9092"}, config)if err != nil {fmt.Printf("metadata_test try create client err :%s\n", err.Error())return}defer client.Close()// get topic settopics, err := client.Topics()if err != nil {fmt.Printf("try get topics err %s\n", err.Error())return}fmt.Printf("topics(%d):\n", len(topics))for _, topic := range topics {fmt.Println(topic)}// get broker setbrokers := client.Brokers()fmt.Printf("broker set(%d):\n", len(brokers))for _, broker := range brokers {fmt.Printf("%s\n", broker.Addr())}
}

  

转载于:https://www.cnblogs.com/596014054-yangdongsheng/p/10446828.html

Golang中使用kafka相关推荐

  1. Golang中Buffer高效拼接字符串以及自定义线程安全Buffer

    本文原创文章,转载注明出处,博客地址 https://segmentfault.com/u/to... 第一时间看后续精彩文章.觉得好的话,顺手分享到朋友圈吧,感谢支持. Go中可以使用"+ ...

  2. 如何在golang中关闭bufio.reader_Golang 并发模型系列:1. 轻松入门流水线模型

    Go语言中文网,致力于每日分享编码.开源等知识,欢迎关注我,会有意想不到的收获! Golang作为一个实用主义的编程语言,非常注重性能,在语言特性上天然支持并发,它有多种并发模型,通过流水线模型系列文 ...

  3. go语言的iota是什么意思_关于Golang中的iota

    快速一览 iota是Golang中提供的一个简化常量和枚举编程的标识符,合理的使用这个标识符可以让代码变得更简洁,省去大量的不必要的代码. 比如下面的这个常量定义 const ( a = 1 b = ...

  4. Golang中的panic和recover(捕获异常)

    Golang中的panic和recover(捕获异常) 参考文章: (1)Golang中的panic和recover(捕获异常) (2)https://www.cnblogs.com/zhzhlong ...

  5. golang 中string和int类型相互转换

    总结了golang中字符串和各种int类型之间的相互转换方式: string转成int: test_int, err := strconv.Atoi(test_string) if err != ni ...

  6. golang中并发sync和channel

    golang中并发sync和channel chenbaoke · 2014-12-08 13:00:01 · 19151 次点击 · 预计阅读时间 5 分钟 · 不到1分钟之前 开始浏览 这是一个创 ...

  7. golang中的sync.WaitGroup

    golang中的sync.WaitGroup Posted on 2015/04/09刚才看golang的sync的包,看见一个很有用的功能.就是WaitGroup. 先说说WaitGroup的用途: ...

  8. 初步解读Golang中的接口相关编写方法

    初步解读Golang中的接口相关编写方法 概述如果说goroutine和channel是Go并发的两大基石,那么接口是Go语言编程中数据类型的关键.在Go语言的实际编程中,几乎所有的数据结构都围绕接口 ...

  9. golang中utf8和汉字互转

    golang中utf8和汉字互转 package mainimport ("fmt""strconv""strings" )func mai ...

最新文章

  1. python语音处理工具
  2. 爬虫python需要什么软件-Python爬虫需要学习那些东西?
  3. PHP中不用第三个变量交换两个变量的值
  4. win7xp双系统引导修复工具
  5. 【Chocolatey】安装python3
  6. angular5项目端口冲突之解决办法
  7. 问题:图片怎么保存到数据库, 以及怎么把图片从数据库中取出来使用?(已解决)...
  8. 2020美国纽约大学计算机科学排名,2020美国纽约大学排名第几
  9. wordpress home.php,WordPress主题通过function.php来加载js和css文件
  10. MySQL 后from多个表_MYSQL回顾(多表查询相关)
  11. 面试大厂应该注意哪些问题?隔壁都馋哭了
  12. shell输入输出重定向
  13. matlab中单独存图_matlab中仅保存plot部分(除去空白)和图像的叠加
  14. java根据种子生成固定值_java固定种子随机数预测
  15. 论文写作笔记4 期刊选择-医学计算机
  16. 企业微信开发----H5发送表单请求到企业微信内部审核
  17. eps在c语言,C语言中eps指的是什么东西?
  18. 计算机电子极域控制,极域电子教室的反控制实现【无需教师端】
  19. 成都托普计算机职业技术怎么样学校,成都中职学校前景怎么样
  20. 太阳的后裔--OST.3 This love这份爱

热门文章

  1. ASP.net 網站和Web Application的區別(轉)
  2. vue.js 前端开发常见问题
  3. ActiveMQ结合Spring收发消息
  4. 【转】ofbiz数据库表结构设计
  5. RPM方式安装MySQL5.6和windows下安装mysql解压版
  6. 页面排序(上下元素对换)
  7. 关系数据库的查询建表
  8. mysql多实例(mysqld_multi方式)
  9. android studio设置Tab为四空格缩进
  10. Android activity-alias 的使用