kafka

  • 消息队列
  • kafka架构
  • 安装kafka
    • kafak依赖zookeeper 需要先启动zk(集群)
      • zookeeper 启动
    • 单节点启动kafka
      • kafka配置文件 config/server.properties
      • kafka启动
    • 启动kafka集群
      • 配置文件
      • 群起脚本示例
    • 基本的命令行操作
      • 创建topic
      • 查看topic
      • 删除topic
    • 生产消息
    • 消费消息
      • 普通消费
      • 新版本消费
  • kafka高级
    • 存储
    • 生产者
      • ack 0 1 -1(ISR)
      • ISR (HW LEO) 多退少补
    • 消费者
    • 高效存储
    • 事物 生产者事物(跨回话)
  • kafka监控工具
    • eagel
  • kafka面试题
    • xsync 工具
  • go操作kafka
    • 依赖为github的samara
    • 生产消息
      • 同步发送消息
    • 消费消息
      • 普通一个消费者

消息队列

  • 一对一
  • 一对多 一个生产者多个消费者
    • 消息主动推送给消费者(推送能力一样,但是每个消费者的消费能力不一样)
    • 消费者主动去拉去消息(需要长期去轮询查询是否有消息) kafak

kafka架构

  • leader 消费者找leader要消息
  • follwer 用于集群间的数据同步备份
  • 同一个分区消息只能被同一个消费组里的某一个消费者消费

安装kafka

kafak依赖zookeeper 需要先启动zk(集群)

zookeeper 启动
cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start     2181 启动
bin/zkCli.sh -server 127.0.0.1:2181 进入终端分布式配置 https://blog.csdn.net/java_66666/article/details/81015302

单节点启动kafka

kafka配置文件 config/server.properties
TODO
log.dir = /xxx   实际上是数据持久化存储的位置// 配置公网访问
advertised.listeners=PLAINTEXT://你的公网ip:9093
advertised.listeners=PLAINTEXT://你的公网i:9092
kafka启动
bin/kafka-server-start.sh config/server.propertiesbin/kafka-server-start.sh -daemon  config/server.properties  后台启动

启动kafka集群

配置文件
broker.id=1、broker.id=2 不能重复
群起脚本示例
// 尚硅谷的大数据教程里面的
for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i =========="
ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon
/opt/module/kafka/config/server.properties'
done

基本的命令行操作

创建topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first选项说明:--topic 定义 topic 名--replication-factor 定义副本数 (不能超过集群内zk的数量,否则报错)--partitions 定义分区数
查看topic
  • bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first- 需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除

生产消息

 bin/kafka-console-producer.sh --brokerlist hadoop102:9092 --topic first

消费消息

普通消费
// 消费当前的消息 从zk读bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first// 某台消费者后来才启动,可以从头消费消息bin/kafka-console-consumer.sh --zookeeper hadoop102:2181   --from-beginning --topic first - --from-beginning:会把主题中以往所有的数据都读取出来。
新版本消费
// 从kafka读消息bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

kafka高级

存储

生产者

ack 0 1 -1(ISR)
ISR (HW LEO) 多退少补

消费者

高效存储

  • 顺序读写
  • 零拷贝技术
  • 分布式(多个主机多个分区并发读写)

事物 生产者事物(跨回话)

kafka监控工具

eagel

  • 1
  • 需要加一下环境变量
export KE_HOME=/opt/xxx
export PATN=$PATN:$KE_HOME:bin
  • 可能出现问题的配置 我自己没有实际操作过
 export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"export JMX_PORT="9999"

kafka面试题

  • https://blog.csdn.net/C_Xiang_Falcon/article/details/100917145#1KafkaISRInSyncRepliOSROutSyncRepliARAllRepli_3

xsync 工具

  • https://www.cnblogs.com/Mark-blog/p/11603313.html
  • https://www.jianshu.com/p/e74fbb091144

go操作kafka

依赖为github的samara

  • 参考 https://github.com/Shopify/sarama

生产消息

同步发送消息
func producer1() {var address = []string{"kafka1:9092"}// creates a new SyncProducer// config := sarama.NewConfig()// 可以根据定制配置producer, err := sarama.NewSyncProducer(address, nil)handleErr()defer producer.Close()// 构建返回值result := map[string]string{"ip":      "127.0.0.1", "info":     "需要发送的消息",}byteresult, _ := json.Marshal(result)value := string(byteresult)// 发送消息对象msg := &sarama.ProducerMessage{ // ProducerMessage 发送消息的对象Topic: "monitor", Value: sarama.ByteEncoder(value),Key:   sarama.ByteEncoder("applicationStop"),}// 发送消息partition, offset, err := producer.SendMessage(msg)fmt.Printf("发送信息成功,topic::%s,partition=%d, offset=%d \n", topic, partition, offset)// fmt.Fprintf(os.Stdout, value+"发送成功,partition=%d, offset=%d \n", partition, offset)
}

消费消息

普通一个消费者
func consumer() {consumer, err := sarama.NewConsumer(address, nil)if err != nil {fmt.Println("consumer()::sarama.NewConsumer::", err)return}// closedefer func() {if err := consumer.Close(); err != nil {fmt.Println("consumer()::consumer.Close()::", err)}}()// 从 topic 消费 // 消费最新数据partitionConsumer, err := consumer.ConsumePartition("monitor", 0, sarama.OffsetNewest)if err != nil {fmt.Println("consumer()::consumer.ConsumePartition::", err)return}// closedefer func() {if err := partitionConsumer.Close(); err != nil {fmt.Println("consumer()::partitionConsumer.Close()::", err)}}()// 监听消息for {select {// ConsumerMessage 接收消息的对象case msg := <-partitionConsumer.Messages(): if string(msg.Key) == "basic" {result := map[string]string{}err := json.Unmarshal(msg.Value, &result)fmt.Println("消息为::",result)}}
}

go 操作 kafka 实现发送和订阅相关推荐

  1. go 实现 kafka 消息发送、接收

    引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...

  2. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  3. Python 操作 Kafka --- kafka-python

    kafka-python:https://github.com/dpkp/kafka-python kafka-python 文档:https://kafka-python.readthedocs.i ...

  4. kafka java_Java操作Kafka

    java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用.下面我贴出代码. pom.xml org.apache.kafka kafka-clie ...

  5. kafka 脚本发送_Apache-Flink深度解析-DataStream-Connectors之Kafka

    聊什么 为了满足本系列读者的需求,在完成<Apache Flink 漫谈系列(14) - DataStream Connectors>之前,我先介绍一下Kafka在Apache Flink ...

  6. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  7. kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2

    1.JAVA API操作kafka  修改Windows的Host文件: 目录:C:\Windows\System32\drivers\etc (win10) 内容: 192.168.40.150 k ...

  8. kafka入门(4)-java操作kafka

    kafka入门(4)-java操作kafka 准备工作 创建maven工程 导入Maven Kafka POM依赖 <repositories><!-- 代码库 -->< ...

  9. python使用kafka原理详解_Python操作Kafka原理及使用详解

    Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...

最新文章

  1. 自回归模型PixelCNN 的盲点限制以及如何修复
  2. 个性化邮件系统用例设计和实现
  3. uwsgi指定python路径_uWsgi服务器(2)--安装配置
  4. poj1236 Tarjan算法模板 详解
  5. 「あるいは」 「もしくは」 「または」 「それとも」的区别
  6. [python]getopt模块的使用介绍
  7. 实用的Portraiture滤镜磨皮教程
  8. maven安装jar包到本地仓库
  9. 如何避免出现SQL注入漏洞
  10. 软件测试项目案例.pdf,【精选】最经典软件测试案例.pdf
  11. python爬虫百度网盘_python爬取百度云网盘资源
  12. Excel 条件格式实现甘特图
  13. 12123 上传照片到文件服务器失败,12123软件上传不了照片怎么回事(教你最合理的上传方法)...
  14. 【WordPress】添加备案信息
  15. 什么是soft matting方法_建筑师学“交互”有什么意义?零基础如何展开?
  16. excel中vlookup函数的使用方法_Excel教程:函数VLOOKUP实用技巧
  17. CAD学习笔记中级课【导入导出】
  18. [电设训练]幅频特性测试仪
  19. Linux运维课程 第一阶段 重难点摘要(二)网络基础
  20. 再次阅读乔布斯的讲演—“好学若饥、谦卑若愚”

热门文章

  1. 阿里云部署Java网站和微信开发调试心得技巧(下)
  2. 禁止K8S容器内子进程拥有提升权限的能力
  3. web 框架的本质及自定义web框架 模板渲染jinja2 mvc 和 mtv框架 Django框架的下载安装 基于Django实现的一个简单示例...
  4. java计算机毕业设计青岛地区常见昆虫图鉴与论坛源码+数据库+lw文档+系统
  5. 小学生python编程写游戏_用python教小孩子编程做游戏(上)
  6. 初出茅庐——利用Python的Turtle库绘制玫瑰花
  7. Linux:HDMI驅動之HPD
  8. (C语言)图书管理系统(程序设计)
  9. 姜小白的python日记Day4 列表和元组
  10. 文字转语音软件哪个好,这一款值得推荐