引言

网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结。

本文的实验主机:Mac笔记本。

一、核心概念

kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点。

kafka中涉及的名词:

  • 消息记录(record): 由一个key,一个value和一个时间戳构成,消息最终存储在主题下的分区中, 记录在生产者中称为生产者记录(ProducerRecord), 在消费者中称为消费者记录(ConsumerRecord),Kafka集群保持所有的消息,直到它们过期, 无论消息是否被消费了,在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。
  • 生产者(producer): 生产者用于发布(send)消息。
  • 消费者(consumer): 消费者用于订阅(subscribe)消息。
  • 消费者组(consumer group): 相同的group.id的消费者将视为同一个消费者组, 每个消费者都需要设置一个组id, 每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  • 主题(topic): 消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中。
  • 分区(partition): 消息的一种物理分组, 一个主题被拆成多个分区,每一个分区就是一个顺序的、不可变的消息队列,并且可以持续添加,分区中的每个消息都被分配了一个唯一的id,称之为偏移量(offset),在每个分区中偏移量都是唯一的。每个分区对应一个逻辑log,有多个segment组成。
  • 偏移量(offset): 分区中的每个消息都一个一个唯一id,称之为偏移量,它代表已经消费的位置。可以自动或者手动提交偏移量(即自动或者手动控制一条消息是否已经被成功消费)。
  • 代理(broker): 一台kafka服务器称之为一个broker。
  • 副本(replica):副本只是一个分区(partition)的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。
  • 领导者(leader):Leader 是负责给定分区的所有读取和写入的节点。 每个分区都有一个服务器充当Leader, producer 和 consumer 只跟 leader 交互。
  • 追随者(follower):跟随领导者指令的节点被称为Follower。 如果领导失败,一个追随者将自动成为新的领导者。 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。replica 中的一个角色,从 leader 中复制数据。
  • zookeeper:Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。ZooKeeper用于管理和协调Kafka代理。

kafka功能

  • 发布订阅:生产者(producer)生产消息(数据流), 将消息发送到到kafka指定的主题队列(topic)中,也可以发送到topic中的指定分区(partition)中,消费者(consumer)从kafka的指定队列中获取消息,然后来处理消息。
  • 流处理(Stream Process): 将输入topic转换数据流到输出topic。
  • 连接器(Connector) : 将数据从应用程序(源系统)中导入到kafka,或者从kafka导出数据到应用程序(宿主系统sink system), 例如:将文件中的数据导入到kafka,从kafka中将数据导出到文件中。

kafka中的消息模型

  • 队列:同名的消费者组员瓜分消息。
  • 发布订阅:广播消息给多个消费者组(不同名)。

生产者(producer)将消息记录(record)发送到kafka中的主题中(topic), 一个主题可以有多个分区(partition), 消息最终存储在分区中,消费者(consumer)最终从主题的分区中获取消息。

详细的过程可以参考本文的链接。

二、安装与启动

本文主要针对Mac系统进行的操作。

安装

brew install kafka

如果本机没有 zookeeper,在 kafka 的安装过程中,会自动安装 zookeeper。安装过程中可能会出现失败,原因可能是 Kafka 依赖 zookeeper,而 zookeeper 依赖 JDK。因此需要为电脑配置 JDK,我下载安装的是 JDK1.8,具体的安装过程参考:
https://blog.csdn.net/deliciousion/article/details/78046007
https://blog.csdn.net/Mrljdx/article/details/43412353

kafka的安装目录:/usr/local/Cellar/kafka
kafka的配置文件目录:/usr/local/etc/kafka
kafka服务的配置文件:/usr/local/etc/kafka/server.properties
zookeeper配置文件: /usr/local/etc/kafka/zookeeper.properties

#修改server.properties
vim /usr/local/etc/kafka/server.properties
#增加一行配置
listeners=PLAINTEXT://localhost:9092

启动 zookeeper

 # 新起一个终端启动zookeeperzkserver start

或者

cd  /usr/local/Cellar/kafka/2.1.0
# 新起一个终端启动zookeeper
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

启动 kafka

cd  /usr/local/Cellar/kafka/2.1.0
# 新起一个终端启动zookeeper
./bin/kafka-server-start /usr/local/etc/kafka/server.properties

kafka 服务也可以很优雅的进行关闭,首先要把server配置文件添加如下项:

vim /usr/local/etc/kafka/server.properties
#添加一行
ontrolled.shutdown.enable=true

然后就可以通过bin目录下 zookeeper-server-stop.sh 关闭 kafka 服务了。

当 kafka 在启动过程中出现问题的时候,可以尝试采用以下的操作:

1、到 /usr/local/var/lib 目录下删除 kafka-logs 目录
2、重启 kafka

查看 zookeeper & kafka

当 zookeeper 和 kafka 完成启动后,可以在命令终端输入以下命令:

jps

可以看到如下内容,说明启动成功。

创建 topic

# 创建一个名为“test”的主题,该主题有1个分区
./bin/kafka-topics --create \--zookeeper localhost:2181 \--partitions 1  \--replication-factor 1 \--topic test

如果分区配置错误,可以进行下述操作进行删除:

# 删除分区
./bin/kafka-topics --create \--zookeeper localhost:2181 \--partitions 1  \--replication-factor 1 \--topic test \
--delete-config

删除 topic的时候,首先要把 server 配置文件添加如下项:

vim /usr/local/etc/kafka/server.properties
#添加一行
delete.topic.enable=true
#然后可以执行
./bin/kafka-topics  --delete --topic test

查看 topic

# 创建成功可以通过 list 列举所有的主题
./bin/kafka-topics --list --zookeeper localhost:2181
# 查看某个主题的信息
./bin/kafka-topics --describe --zookeeper localhost:2181 --topic <name>

生产消息(发送消息)

# 新起一个终端,作为生产者,用于发送消息,每一行算一条消息,将消息发送到kafka服务器
cd /usr/local/Cellar/kafka/2.1.0./bin/kafka-console-producer --broker-list localhost:9092 --topic test > This is a message

消费消息(接收消息)

# 新起一个终端作为消费者,接收消息
cd /usr/local/Cellar/kafka/2.1.0
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message

在生产者发送消息

在生产消息(发送消息)中新起的终端属于一条消息(任意字符),输入完回车就算一条消息,可以看到在步骤7中的消费者端就会显示刚才输入的消息。

三、Go 实现消息接收,发送

准备

  • 安装依赖库sarama
    go get github.com/Shopify/sarama
    该库要求kafka版本在0.8及以上,支持kafka定义的high-level API和low-level API,但不支持常用的consumer自动rebalance和offset追踪,所以一般得结合cluster版本使用。
  • sarama-cluster依赖库
    go get github.com/bsm/sarama-cluster
    需要kafka 0.9及以上版本。

生产者代码

producer.go

var Address = []string{"localhost:9092"}func main()  {syncProducer(Address)//SaramaProducer()
}//同步消息模式
func syncProducer(address []string)  {config := sarama.NewConfig()config.Producer.Return.Successes = trueconfig.Producer.Timeout = 5 * time.Secondp, err := sarama.NewSyncProducer(address, config)if err != nil {log.Printf("sarama.NewSyncProducer err, message=%s \n", err)return}defer p.Close()topic := "test"srcValue := "sync: this is a message. index=%d"for i:=0; i<10; i++ {value := fmt.Sprintf(srcValue, i)msg := &sarama.ProducerMessage{Topic:topic,Value:sarama.ByteEncoder(value),}part, offset, err := p.SendMessage(msg)if err != nil {log.Printf("send message(%s) err=%s \n", value, err)}else {fmt.Fprintf(os.Stdout, value + "发送成功,partition=%d, offset=%d \n", part, offset)}time.Sleep(2*time.Second)}
}func SaramaProducer()  {config := sarama.NewConfig()//等待服务器所有副本都保存成功后的响应config.Producer.RequiredAcks = sarama.WaitForAll//随机向partition发送消息config.Producer.Partitioner = sarama.NewRandomPartitioner//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.config.Producer.Return.Successes = trueconfig.Producer.Return.Errors = true//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置//注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息config.Version = sarama.V0_10_0_1fmt.Println("start make producer")//使用配置,新建一个异步生产者producer, e := sarama.NewAsyncProducer([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)if e != nil {fmt.Println(e)return}defer producer.AsyncClose()//循环判断哪个通道发送过来数据.fmt.Println("start goroutine")go func(p sarama.AsyncProducer) {for{select {case  <-p.Successes()://fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)case fail := <-p.Errors():fmt.Println("err: ", fail.Err)}}}(producer)var value stringfor i:=0;;i++ {time.Sleep(500*time.Millisecond)time11:=time.Now()value = "this is a message 0606 "+time11.Format("15:04:05")// 发送的消息,主题。// 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系。msg := &sarama.ProducerMessage{Topic: "0606_test",}//将字符串转化为字节数组msg.Value = sarama.ByteEncoder(value)//fmt.Println(value)//使用通道发送producer.Input() <- msg}
}

消费者代码

consumer.go

func main()  {topic := []string{"test"}var wg = &sync.WaitGroup{}wg.Add(2)//广播式消费:消费者1go clusterConsumer(wg, Address, topic, "group-1")//广播式消费:消费者2go clusterConsumer(wg, Address, topic, "group-2")wg.Wait()
}// 支持brokers cluster的消费者
func clusterConsumer(wg *sync.WaitGroup,brokers, topics []string, groupId string)  {defer wg.Done()config := cluster.NewConfig()config.Consumer.Return.Errors = trueconfig.Group.Return.Notifications = trueconfig.Consumer.Offsets.Initial = sarama.OffsetNewest// init consumerconsumer, err := cluster.NewConsumer(brokers, groupId, topics, config)if err != nil {log.Printf("%s: sarama.NewSyncProducer err, message=%s \n", groupId, err)return}defer consumer.Close()// trap SIGINT to trigger a shutdownsignals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)// consume errorsgo func() {for err := range consumer.Errors() {log.Printf("%s:Error: %s\n", groupId, err.Error())}}()// consume notificationsgo func() {for ntf := range consumer.Notifications() {log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)}}()// consume messages, watch signalsvar successes int
Loop:for {select {case msg, ok := <-consumer.Messages():if ok {fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)consumer.MarkOffset(msg, "")  // mark message as processedsuccesses++}case <-signals:break Loop}}fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
}

流程说明:

  • 启动 zookeeper
  • 启动 kafka
  • 创建 Topic
  • 开新终端,运行 go run producer.go
  • 开新终端,运行 go run consumer.go

注意 topic 的名字要与 producer.go 和 consumer.go 一致。

总结:上述操作经过实际验证,如果大家在操作过程中遇到问题,欢迎及时交流,共同进步。

四、参考文章

https://blog.csdn.net/tflasd1157/article/details/81985722
https://blog.frenlee.com/2017/05/kafka-demo-golang-implementation/

go 实现 kafka 消息发送、接收相关推荐

  1. 带你认识三种kafka消息发送模式

    摘要:在kafka-0.8.2之后,producer不再区分同步(sync)和异步方式(async),所有的请求以异步方式发送,这样提升了客户端效率. 本文分享自华为云社区<kafka消息发送模 ...

  2. akka 消息发送接收_Akka型演员:探索接收器模式

    akka 消息发送接收 在上一篇文章中,我们研究了Akka Typed提供的一些基本功能. 在本文和下一篇文章中,我们将更进一步地了解一些其他功能,并通过查看Akka Typed提供的两种不同模式来做 ...

  3. Golang实现Kafka消息发送、接收

    一:核心概念 kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序.具有横向扩展,容错,wicked fast(变态快)等优点. kafka中涉及的名词: 消息记录(r ...

  4. Kafka入门教程 Golang实现Kafka消息发送、接收

    一:核心概念 kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序.具有横向扩展,容错,wicked fast(变态快)等优点. kafka中涉及的名词: 消息记录(r ...

  5. kafka 消息发送和接收

    发送代码实例 public class KafkaProducerDemo extends Thread{private final KafkaProducer<Integer,String&g ...

  6. RabbitMQ入门学习系列(三).消息发送接收

    快速阅读 用Rabitmq的队列管理,以及如何保证消息在队列中不丢失.通过ack的消息确认和持久化进行操作.以及Rabbit中如何用Web面板进行管理队列.消费者如何处理耗时的任务 生产者代码创建链接 ...

  7. html消息发送接收,在html页面中 如何应用mqtt协议发送/接收消息

    经过前面几篇文章的介绍,在很多场景下利用NodeMCU加持mqtt协议来控制几乎所有需要传感器监控的行业都能极大地简化物联的成本.在这样一个基础上,还能拓展出很多好玩的.实际运用的甚至能够作为商业化运 ...

  8. Kafka消息发送失败解决方案

    防火墙设置 防火墙会屏蔽掉Kafka的访问,如果在内网限制很强的情况下,只能是逐个端口用 telnet 排查Kafka用到哪些端口,用lsof -i:<端口号>排查对应进程. 最粗暴的方式 ...

  9. 深入理解ActiveMQ支持的2类消息发送接收模型queue和topic

    本文已经收录进专栏,谢谢支持.

最新文章

  1. Windows10下安装unbuntu双系统 以及花屏解决办法
  2. 手把手,嘴对嘴,讲解UCOSII嵌入式操作系统的任务调度策略(二)
  3. 【机器学习基础】通俗易懂无监督学习K-Means聚类算法及代码实践
  4. HDU1007 Quoit Design 分治+递归
  5. resources.arsc格式(包-类型-资源项)
  6. 菜鸟,下一代分布式体系架构的设计理念
  7. sniffer 工具
  8. 内容分发网络 CDN 是如何提高网页加载时间的?
  9. android中json解析及使用(上)
  10. JAVA多线程之synchronized和volatile实例讲解
  11. rust大油井频率怎么用_90%的人都不会用电吹风!用不好危害大!1分钟告诉你到底怎么用...
  12. tomcat后台密码爆破脚本(python+字典)_Web中间件漏洞之Tomcat篇
  13. 视觉SLAM十四讲学习笔记——ch10 后端2
  14. java微信企业号接入_java微信企业号接入开发
  15. Qt自定义Combobox实现列表上拉展示
  16. 方正中间件创业大赛南京赛区圆满落幕
  17. 计算机没有显卡驱动,电脑没有显卡怎么办
  18. 职场人士升职加薪必备的工作软件,总有一款适合你
  19. 中控考勤机连服务器显示1007,中控智慧ZK-S1007动态人脸识别考勤门禁终端
  20. 查看iOS手机系统日志,在mac/window电脑上查看

热门文章

  1. 美团把AI搞出一股烟火气
  2. 又是加拿大!连年拒签NeurIPS参会者被指太荒唐,Hinton亲自过问也没辙
  3. GitHub免费支持CI/CD了,开发测试部署高度自动化,支持各种语言,网友:第三方凉凉...
  4. ListT 循环修改其中的数据
  5. 如何在WIN7上添加磁盘
  6. Jquery加载dom元素
  7. python 调用sqldr_sqlldr并发
  8. 启动子级时出错_【本音知识】弹钢琴时如何背谱?
  9. 5G UE — USIM Card — 5G 的 USIM 卡
  10. Go 语言编程 — go-restful RESTful 框架