go 操作 kafka 实现发送和订阅
kafka
- 消息队列
- kafka架构
- 安装kafka
- kafak依赖zookeeper 需要先启动zk(集群)
- zookeeper 启动
- 单节点启动kafka
- kafka配置文件 config/server.properties
- kafka启动
- 启动kafka集群
- 配置文件
- 群起脚本示例
- 基本的命令行操作
- 创建topic
- 查看topic
- 删除topic
- 生产消息
- 消费消息
- 普通消费
- 新版本消费
- kafka高级
- 存储
- 生产者
- ack 0 1 -1(ISR)
- ISR (HW LEO) 多退少补
- 消费者
- 高效存储
- 事物 生产者事物(跨回话)
- kafka监控工具
- eagel
- kafka面试题
- xsync 工具
- go操作kafka
- 依赖为github的samara
- 生产消息
- 同步发送消息
- 消费消息
- 普通一个消费者
消息队列
- 一对一
- 一对多 一个生产者多个消费者
- 消息主动推送给消费者(推送能力一样,但是每个消费者的消费能力不一样)
- 消费者主动去拉去消息(需要长期去轮询查询是否有消息) kafak
kafka架构
- leader 消费者找leader要消息
- follwer 用于集群间的数据同步备份
- 同一个分区消息只能被同一个消费组里的某一个消费者消费
安装kafka
kafak依赖zookeeper 需要先启动zk(集群)
zookeeper 启动
cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start 2181 启动
bin/zkCli.sh -server 127.0.0.1:2181 进入终端分布式配置 https://blog.csdn.net/java_66666/article/details/81015302
单节点启动kafka
kafka配置文件 config/server.properties
TODO
log.dir = /xxx 实际上是数据持久化存储的位置// 配置公网访问
advertised.listeners=PLAINTEXT://你的公网ip:9093
advertised.listeners=PLAINTEXT://你的公网i:9092
kafka启动
bin/kafka-server-start.sh config/server.propertiesbin/kafka-server-start.sh -daemon config/server.properties 后台启动
启动kafka集群
配置文件
broker.id=1、broker.id=2 不能重复
群起脚本示例
// 尚硅谷的大数据教程里面的
for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i =========="
ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon
/opt/module/kafka/config/server.properties'
done
基本的命令行操作
创建topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first选项说明:--topic 定义 topic 名--replication-factor 定义副本数 (不能超过集群内zk的数量,否则报错)--partitions 定义分区数
查看topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first- 需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除
生产消息
bin/kafka-console-producer.sh --brokerlist hadoop102:9092 --topic first
消费消息
普通消费
// 消费当前的消息 从zk读bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first// 某台消费者后来才启动,可以从头消费消息bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first - --from-beginning:会把主题中以往所有的数据都读取出来。
新版本消费
// 从kafka读消息bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka高级
存储
生产者
ack 0 1 -1(ISR)
ISR (HW LEO) 多退少补
消费者
高效存储
- 顺序读写
- 零拷贝技术
- 分布式(多个主机多个分区并发读写)
事物 生产者事物(跨回话)
kafka监控工具
eagel
- 1
- 需要加一下环境变量
export KE_HOME=/opt/xxx
export PATN=$PATN:$KE_HOME:bin
- 可能出现问题的配置 我自己没有实际操作过
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"export JMX_PORT="9999"
kafka面试题
https://blog.csdn.net/C_Xiang_Falcon/article/details/100917145#1KafkaISRInSyncRepliOSROutSyncRepliARAllRepli_3
xsync 工具
- https://www.cnblogs.com/Mark-blog/p/11603313.html
- https://www.jianshu.com/p/e74fbb091144
go操作kafka
依赖为github的samara
- 参考
https://github.com/Shopify/sarama
生产消息
同步发送消息
func producer1() {var address = []string{"kafka1:9092"}// creates a new SyncProducer// config := sarama.NewConfig()// 可以根据定制配置producer, err := sarama.NewSyncProducer(address, nil)handleErr()defer producer.Close()// 构建返回值result := map[string]string{"ip": "127.0.0.1", "info": "需要发送的消息",}byteresult, _ := json.Marshal(result)value := string(byteresult)// 发送消息对象msg := &sarama.ProducerMessage{ // ProducerMessage 发送消息的对象Topic: "monitor", Value: sarama.ByteEncoder(value),Key: sarama.ByteEncoder("applicationStop"),}// 发送消息partition, offset, err := producer.SendMessage(msg)fmt.Printf("发送信息成功,topic::%s,partition=%d, offset=%d \n", topic, partition, offset)// fmt.Fprintf(os.Stdout, value+"发送成功,partition=%d, offset=%d \n", partition, offset)
}
消费消息
普通一个消费者
func consumer() {consumer, err := sarama.NewConsumer(address, nil)if err != nil {fmt.Println("consumer()::sarama.NewConsumer::", err)return}// closedefer func() {if err := consumer.Close(); err != nil {fmt.Println("consumer()::consumer.Close()::", err)}}()// 从 topic 消费 // 消费最新数据partitionConsumer, err := consumer.ConsumePartition("monitor", 0, sarama.OffsetNewest)if err != nil {fmt.Println("consumer()::consumer.ConsumePartition::", err)return}// closedefer func() {if err := partitionConsumer.Close(); err != nil {fmt.Println("consumer()::partitionConsumer.Close()::", err)}}()// 监听消息for {select {// ConsumerMessage 接收消息的对象case msg := <-partitionConsumer.Messages(): if string(msg.Key) == "basic" {result := map[string]string{}err := json.Unmarshal(msg.Value, &result)fmt.Println("消息为::",result)}}
}
go 操作 kafka 实现发送和订阅相关推荐
- go 实现 kafka 消息发送、接收
引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...
- java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明
本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...
- Python 操作 Kafka --- kafka-python
kafka-python:https://github.com/dpkp/kafka-python kafka-python 文档:https://kafka-python.readthedocs.i ...
- kafka java_Java操作Kafka
java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用.下面我贴出代码. pom.xml org.apache.kafka kafka-clie ...
- kafka 脚本发送_Apache-Flink深度解析-DataStream-Connectors之Kafka
聊什么 为了满足本系列读者的需求,在完成<Apache Flink 漫谈系列(14) - DataStream Connectors>之前,我先介绍一下Kafka在Apache Flink ...
- kafka实战教程(python操作kafka),kafka配置文件详解
全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...
- kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2
1.JAVA API操作kafka 修改Windows的Host文件: 目录:C:\Windows\System32\drivers\etc (win10) 内容: 192.168.40.150 k ...
- kafka入门(4)-java操作kafka
kafka入门(4)-java操作kafka 准备工作 创建maven工程 导入Maven Kafka POM依赖 <repositories><!-- 代码库 -->< ...
- python使用kafka原理详解_Python操作Kafka原理及使用详解
Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...
最新文章
- 自回归模型PixelCNN 的盲点限制以及如何修复
- 个性化邮件系统用例设计和实现
- uwsgi指定python路径_uWsgi服务器(2)--安装配置
- poj1236 Tarjan算法模板 详解
- 「あるいは」 「もしくは」 「または」 「それとも」的区别
- [python]getopt模块的使用介绍
- 实用的Portraiture滤镜磨皮教程
- maven安装jar包到本地仓库
- 如何避免出现SQL注入漏洞
- 软件测试项目案例.pdf,【精选】最经典软件测试案例.pdf
- python爬虫百度网盘_python爬取百度云网盘资源
- Excel 条件格式实现甘特图
- 12123 上传照片到文件服务器失败,12123软件上传不了照片怎么回事(教你最合理的上传方法)...
- 【WordPress】添加备案信息
- 什么是soft matting方法_建筑师学“交互”有什么意义?零基础如何展开?
- excel中vlookup函数的使用方法_Excel教程:函数VLOOKUP实用技巧
- CAD学习笔记中级课【导入导出】
- [电设训练]幅频特性测试仪
- Linux运维课程 第一阶段 重难点摘要(二)网络基础
- 再次阅读乔布斯的讲演—“好学若饥、谦卑若愚”
热门文章
- 阿里云部署Java网站和微信开发调试心得技巧(下)
- 禁止K8S容器内子进程拥有提升权限的能力
- web 框架的本质及自定义web框架 模板渲染jinja2 mvc 和 mtv框架 Django框架的下载安装 基于Django实现的一个简单示例...
- java计算机毕业设计青岛地区常见昆虫图鉴与论坛源码+数据库+lw文档+系统
- 小学生python编程写游戏_用python教小孩子编程做游戏(上)
- 初出茅庐——利用Python的Turtle库绘制玫瑰花
- Linux:HDMI驅動之HPD
- (C语言)图书管理系统(程序设计)
- 姜小白的python日记Day4 列表和元组
- 文字转语音软件哪个好,这一款值得推荐