本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题。

于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章。希望能够给大家帮助,少走一些弯路。

一、KafKa的介绍

1.主要功能

根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:

  a.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因。

  b.以容错的方式记录消息流,kafka以文件的方式来存储消息流。

  c.可以再消息发布的时候进行处理。

2.使用场景

a.在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能。

b.构建实时的流数据处理程序来变换或处理数据流,数据处理功能。

3.详细介绍

Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制

消息传输过程:

Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

二、安装

安装包下载地址:http://kafka.apache.org/downloads

找到0.11.0.1版本,如图:

1.下载

wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

2.解压

tar -xzvf kafka_2.11-0.11.0.1.tgz

配置说明:

    consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,此处我们使用默认的即可。

    producer.properties 生产者配置,这个配置文件用于配置开启的生产者,此处我们使用默认的即可。

  server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置。

a.broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可。

b.listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,

例如:listeners=PLAINTEXT:// 192.168.126.143:9092。并确保服务器的9092端口能够访问。

  c.zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,

使用默认配置即可,zookeeper.connect=localhost:2181。

3.运行

首先运行zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

运行成功,显示如图:

然后运行kafka

bin/kafka-server-start.sh config/server.properties

运行成功,显示如图:

三、整合KafKa

1.新建Maven项目导入Maven依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.test</groupId><artifactId>kafka_demo</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.9.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.1.RELEASE</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><!-- 指定编译版本 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins><finalName>${project.artifactId}</finalName></build></project>

2.编写消息实体

package com.springboot.kafka.bean;import java.util.Date;import lombok.Data;@Data
public class Message {private Long id;    //idprivate String msg; //消息private Date sendTime;  //时间戳}

有了lombok,每次编写实体不必要使用快捷键生成seter或geter方法了,代码看起来更加简洁了。

3.编写消息发送者(可以理解为生产者,最好联系详细介绍中的图)

package com.springboot.kafka.producer;import java.util.Date;
import java.util.UUID;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;import lombok.extern.slf4j.Slf4j;@Component
@Slf4j
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private Gson gson = new GsonBuilder().create();//发送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));kafkaTemplate.send("zhisheng", gson.toJson(message));}
}

4.编写消息接收者(可以理解为消费者)

package com.springboot.kafka.producer;import java.util.Date;
import java.util.UUID;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;import lombok.extern.slf4j.Slf4j;@Component
@Slf4j
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private Gson gson = new GsonBuilder().create();//发送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));kafkaTemplate.send("zhisheng", gson.toJson(message));}
}

5.编写启动类

package com.springboot.kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;import com.springboot.kafka.producer.KafkaSender;@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);KafkaSender sender = context.getBean(KafkaSender.class);for (int i = 0; i < 3; i++) {//调用消息发送类中的消息发送方法
            sender.send();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}
}

6.编写application.properties配置文件

#============== kafka ===================
# \u6307\u5B9Akafka \u4EE3\u7406\u5730\u5740\uFF0C\u53EF\u4EE5\u591A\u4E2A
spring.kafka.bootstrap-servers=192.168.126.143:9092#=============== provider  =======================spring.kafka.producer.retries=0
# \u6BCF\u6B21\u6279\u91CF\u53D1\u9001\u6D88\u606F\u7684\u6570\u91CF
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer  =======================
# \u6307\u5B9A\u9ED8\u8BA4\u6D88\u8D39\u8005group id
spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

7.运行结果

示例代码地址:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo

如果按照上述流程没有达到预计的效果可以git clone到本地。

转载于:https://www.cnblogs.com/youcong/p/10216573.html

SpringBoot实战(十四)之整合KafKa相关推荐

  1. SpringBoot入门建站全系列(二十八)整合Kafka做日志监控

    SpringBoot入门建站全系列(二十八)整合Kafka做日志监控 一.概述 Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端 ...

  2. kafka maven 依赖_SpringBoot入门建站全系列(二十八)整合Kafka做日志监控

    SpringBoot入门建站全系列(二十八)整合Kafka做日志监控 一.概述 Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端 ...

  3. 打怪升级之小白的大数据之旅(七十四)<初识Kafka>

    打怪升级之小白的大数据之旅(七十四) 初识Kafka 引言 学完Flume之后,接下来将为大家带来Kafka相关的知识点,在工作中,Kafka和Flume经常会搭配使用,那么Kafka究竟是什么呢?让 ...

  4. SpringBoot实战(四):SpringBoot整合Redis

    强烈推荐一个大神的人工智能的教程:http://www.captainai.net/zhanghan​ [前言] 最近自己在整理过去搭建过的框架,将用到的各个组件进行了梳理并融入自己新建的项目中(ht ...

  5. SpringBoot 实战 (十二) | 整合 thymeleaf

    微信公众号:一个优秀的废人 如有问题或建议,请后台留言,我会尽力解决你的问题. 前言 如题,今天介绍 Thymeleaf ,并整合 Thymeleaf 开发一个简陋版的学生信息管理系统. Spring ...

  6. SpringBoot第二十四篇: springboot整合docker

    这篇文篇介绍,怎么为 springboot程序构建一个docker镜像.docker 是一个开源的应用容器引擎,基于 Go 语言 并遵从Apache2.0协议开源.Docker 可以让开发者打包他们的 ...

  7. 【SpringBoot】十四、常见注解(场景及源码)

    目录 说明 一.引用值的注解 1. @Value 2. @ConfigurationProperties 3. @AliasFor 二.使用功能的注解 1. @Slf4j 2. @EnableSche ...

  8. SpringBoot笔记十四:消息队列

    [TOC] 什么是消息队列 消息队列就是消息存储的容器,Java里面有两种 JMS:Sun公司出品,有两种模式,点对点和发布订阅. AMQP:消息队列的一个协议,其实现有RabbitMQ,stormM ...

  9. SpringBoot实战(三):整合Mybatis配置多数据源

    [前言] 最近接到一个新需求,经过分析后做了相应的设计:其中需要在一个项目中操做不同的数据源:于是进行了相关验证:在此记录一下验证过程. [实战多数据源]          一.Pom中引入相应的Ja ...

最新文章

  1. Linux对用户态的动态内存管理
  2. 海南师范大学计算机设计大赛证书,我校品牌VI设计作品在中国大学生计算机设计大赛海南省赛中获得一等奖...
  3. Java包装类API详解
  4. 传感器信号 如何发送到服务器,传感器如何将消息发送给云服务器
  5. pyhton3 os模块
  6. c# datetime._C#| DateTime.Year属性与示例
  7. java 陷阱,java 中的陷阱。
  8. 小程序遵循的语法_我如何构建一个遵循股市针对freeCodeCamp挑战的应用程序。
  9. UI实用素材|下拉菜单细节设计,分层呈现
  10. Linux 离线安装软件
  11. TableLayout与MigLayout
  12. NameError: name 'reload' is not defined等python版本问题解决方案
  13. 锐捷认证客户端常见问题解决及简介
  14. mybatis if test 之 like concat()函数
  15. 机顶盒显示网关服务器数据下发超时,智能机顶盒网关服务器数据下发超时
  16. iOS简单人脸检测的实现
  17. Android HPSocket SE_SOCKET_CREATE (3)
  18. 批量提取CAD中文字
  19. 输入学生的学习成绩,学习成绩>=90分的同学用A表示,60-89分之间的用B表示,60分以下的用C表示。
  20. 一篇好文章带你走出阴霾

热门文章

  1. Scala:Functions and Closures
  2. 操作系统--内存管理方式
  3. c# 垃圾回收是引用类型而言的
  4. 分享个网盘,个人觉得很不错!
  5. 用js 判断datagrid 中的 checkbox 是否被选中
  6. nginx不同server不同日志文件_招标里的答疑是什么?和澄清文件有何不同?
  7. stringbuilder 拼接语句缺失右括号_Leetcode No.22 括号生成
  8. 计算机基础知识掌握欠缺,《计算机基础知识》实验教学改革探讨.pdf
  9. android数据库降级_Android SQLite (二.数据库创建,升级及降级)
  10. img 在video上面_HTML,img,video无法铺满屏幕解决方法,同视频做网页背景无法全屏的解决方法...