前言

公司在用kafka接受和发送数据,自己学习过Rabbitmq,不懂kafka挺不爽的,说干就干!网上找了许多帖子,学习了很多,小小的demo自己也搭建起来了,美滋滋,下面我认为优秀的网站和自己的步骤展现给大家。

一、kafka介绍与原理

我们将消息的发布(publish)称作 producer,将消息的订阅(subscribe)表述为 consumer,将中间的存储阵列称作 broker(代理),这样就可以大致描绘出这样一个场面:

生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。

乍一看返也太简单了,不是说了它是分布式吗,难道把 producer、 broker 和 consumer 放在三台不同的机器上就算是分布式了吗。看 kafka 官方给出的图:

多个 broker 协同合作,producer 和 consumer 部署在各个业务逻辑中被频繁的调用,三者通过 zookeeper管理协调请求和转发。这样一个高性能的分布式消息发布订阅系统就完成了。

图上有个细节需要注意,producer 到 broker 的过程是 push,也就是有数据就推送到 broker,而 consumer 到 broker 的过程是 pull,是通过 consumer 主动去拉数据的,而不是 broker 把数据主懂发送到 consumer 端的。

二、kafka的linux基本搭建

此处还是给大家一个链接很是优秀!kafka的linux搭建
注意:kafka依赖于zookeeper的节点,需要搭建zookeeper,linux安装zookeeper文章中有链接(kafaka高版本也自带zookeeper,其实博主用kafka自带的zookeeper启动,然后启动kafka没成功,用安装的zookeeper就成功了…)

三、springboot整合kafka的简单demo

1.引入依赖

在springboot项目中的pom.xml引入下列依赖:

  <dependency>            <groupId>org.springframework.kafkagroupId>            <artifactId>spring-kafkaartifactId>            <version>2.2.6.RELEASEversion>        dependency>        <dependency>            <groupId>org.apache.kafkagroupId>            <artifactId>kafka-clientsartifactId>            <version>2.1.0version>        dependency>       <dependency>            <groupId>org.projectlombokgroupId>            <artifactId>lombokartifactId>            <optional>trueoptional>        dependency>

2.yml配置文件

配置如下:

spring:  kafka:    bootstrap-servers: 192.168.200.130:9092  #此处是我虚拟机上linux的ip kafak的默认端口为9092    producer:          #生产者      acks: 1      client-id: kafka-producer      batch-size: 5      buffer-memory: 33554432      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:        #消费者      group-id: hello-group      enable-auto-commit: false      auto-offset-reset: earliest      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  application:    name: kafkatestserver:  port: 9192eureka:         #eureka注册中心配置  client:    service-url:      defaultZone: http://127.0.0.1:6868/eureka  instance:    prefer-ip-address: true

是不是依赖没注释看不懂ennnnn,别急嘛,那我来搬运点详细的注释解释(我的demo就是上面的)

producer

#============== kafka ===================# 指定kafka server的地址,集群配多个,中间,逗号隔开spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== provider  =======================# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。spring.kafka.producer.retries=0# 每次批量发送消息的数量,produce积累到一定数据,一次发送spring.kafka.producer.batch-size=16384# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据spring.kafka.producer.buffer-memory=33554432#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。#可以设置的值为:all, -1, 0, 1spring.kafka.producer.acks=1# 指定消息key和消息体的编解码方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

consumer

#=============== consumer  =======================# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名spring.kafka.consumer.group-id=testGroup# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallestspring.kafka.consumer.auto-offset-reset=earliest# enable.auto.commit:true --> 设置自动提交offsetspring.kafka.consumer.enable-auto-commit=true#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。spring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息体的编解码方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.Controller

import lombok.AllArgsConstructor;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@AllArgsConstructor@RequestMapping("Kafka")public class SimpleController {    private final KafkaTemplate<Object,Object> kafkaTemplate;    @GetMapping("/send/{message}")    public  String send(@PathVariable("message") String message){        kafkaTemplate.send("topic1","topic1:"+message);        kafkaTemplate.send("topic2","topic2:"+message);        return message;    }}

4.Listener

import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class SimpleListener {    @KafkaListener(topics = {"topic1","topic2"})    public void listen1(String data){        System.out.println(data);    }}

5.访问

地址栏 输入:http://localhost:9192/Kafka/send/helloKafaka (9192端口是我的boot项目的端口,看官根据自己的项目更改)

6.结果

控制台输出:

四,搭建的坑

我搭建使用的是虚拟机上的linux所以要么开放kafka端口要么关闭防火墙如果不开放项目启动的时候报错:Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
点击解决方案(找了好久还是大佬厉害)
解决方案

总结

以上就是今天要展示的kafka入门,虽然内容不多,但是入门了对吧,更深层次的原理及其使用场景(大数据等等)还等着我们去发掘呢,给我的感觉就是基本的会了,但是牛批的场景使用还是不会,就像高数一样同样学的高数课本但是有的题你不会人家会,而且还能会出花样来,所以多接触优秀的事物,多学习,多总结成就优秀的你,加油,陌生人,越努力越幸运!

kafka原理_kafka入门(原理搭建简单使用)相关推荐

  1. kafka分区与分组原理_Kafka工作原理

    Kafka工作原理 Kafka工作原理 4.1. topic和消息 4.2. Producer 4.3. Consumer 4.4. Kafka核心特性 4.5. consumer.consumer ...

  2. kafka原理_Kafka 架构原理,也就这么回事

    点击"开发者技术前线",选择"星标?" 在看|星标|留言,  真爱 作者:臧远慧 原文链接:https://juejin.im/post/5e217c3fe51 ...

  3. kudu接受kafka消息_Kafka入门详解

    1.1 什么是kafka? Kafka最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的 ...

  4. 理解vue ssr原理,自己搭建简单的ssr框架

    理解vue ssr原理,自己搭建简单的ssr框架 前言 大多数Vue项目要支持SSR应该是为了SEO考虑,毕竟对于WEB应用来说,搜索引擎是一个很大的流量入口.Vue SSR现在已经比较成熟了,但是如 ...

  5. 大数据面试通关手册 | Kylin入门/原理/调优/OLAP解决方案和行业典型应用

    Kylin入门/原理/调优/OLAP解决方案和行业典型应用一网打尽. 一:背景历史和使命 背景和历史 现今,大数据行业发展得如火如荼,新技术层出不穷,整个生态欣欣向荣.作为大数据领域最重要的技术的 A ...

  6. spring原理案例-基本项目搭建 01 spring framework 下载 官网下载spring jar包

    下载spring http://spring.io/ 最重要是在特征下面的这段话,需要注意: All avaible features and modules are described in the ...

  7. Apollo架构体系、Apollo运行原理、Apollo配置中心简单介绍(一)

    笔者在工作中遇到如下问题,随着程序功能越多,配置文件不断增加,一些功能的开关.服务器地址.接口地址.不同环境的一些配置文件不同,这些在每次发布不同环境.更新项目时都比较繁琐,后来学习微服务时接触到了S ...

  8. redis原理快速入门知识点总结

    redis原理快速入门知识点总结 1. 项目中缓存是如何使用的?为什么要用缓存?缓存使用不当会造成什么后果? 为什么用缓存? 1.高性能: 一些需要复杂操作耗时查出来的结果,且确定后面不怎么变化,但是 ...

  9. Kafka(消息队列原理,kafka定义,Kafka架构原理,kafka架构的工作流程)秒懂的kafka

    目录 什么是Kafka? 消息队列原理: 为什么要用Kafka? kafka的架构 kafka工作流程详解: 什么是Kafka? kafka是一个分布式消息队列 这个定义意味深长,记住容易,理解不易. ...

最新文章

  1. Linux之SELinux的基本应用
  2. iOS开发UI篇—模仿ipad版QQ空间登录界面
  3. 有趣的灵魂百里挑一,Linux同学你低下头干嘛,起来说下这个问题。
  4. __property 关键字的使用
  5. fatal: Could not read from remote repository.
  6. C++类中的static数据成员,static成员函数
  7. 二倍精灵图的做法(以firework为例)
  8. 基于Pipeline的CI/CD在趣头条的应用实践
  9. QQ小游戏 微信小游戏 即时通信 IM 删除会话 deleteConversation sdk
  10. 如何把pdf转换成word文档?
  11. 通用权限管理系统项目简单介绍
  12. 超强实用:中国各地特产风味大搜捕!
  13. 在多台终端设备的i茅台应用中,实现同时自动化预约X酒的解决方案
  14. 以太网交换机MAC地址表格式 IVL和SVL
  15. http请求限制和http连接限制
  16. ubuntu 系统声音静音问题
  17. Cadence Allegro PCB 切换上次视图的方法图文教程及视频演示
  18. 基于ssm物业报修管理系统毕业设计源码111024
  19. 破局红海市场?盘点那些传统企业要学会的超级产品战略方法论
  20. AI人体引力报警系统,人体感应报警系统,报警围栏,防攀爬报警围栏

热门文章

  1. 作为一名SAP从业人员,需要专门学习数学么
  2. 从手机App通过WebSocket向浏览器推送数据
  3. S/4HANA Extension field的UI visible checkbox逻辑
  4. GraphQL一些hello world级别的例子
  5. SAP Fiori Launchpad里home按钮的实现原理分析
  6. 禅道 bug状态 open_小工具大帮手,利用 @open-node/antman 实现 node.js 进程线上调试,无须重启...
  7. python二维数组去重复_php二维数组去重,array_unique出除重复数据
  8. python文件之间的相互调用_用Python创建功能模块——截取字符串模块
  9. 门面设计模式php,php设计模式-门面模式(Facade Pattern)
  10. 复旦大学计算机科学院夏令营,2020年复旦大学计算机科学技术学院夏令营接收推免生条件...