前言

由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境。
(ps:默认您的centos系统可联网,本教程就不教配置ip什么的了)
(ps2:没有wget的先装一下:yum install wget)
(ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local目录下)
(ps4:kafka可能有内置zookeeper,感觉可以越过zookeeper教程,但是这里也配置出来了。我没试过)

一、配置jdk

因为oracle 公司不允许直接通过wget 下载官网上的jdk包。所以你直接wget以下地址下载下来的是一个只有5k的网页文件而已,并不是需要的jdk包。(垄断地位就是任性)。
(请通过java -version判断是否自带jdk,我的没带)

1、官网下载

下面是jdk8的官方下载地址:

https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html

2、上传解压

这里通过xftp上传到服务器指定位置:/usr/local

对压缩文件进行解压:

tar -zxvf jdk-8u221-linux-x64.tar.gz

对解压后的文件夹进行改名:

mv jdk1.8.0_221 jdk1.8

3、配置环境变量

vim /etc/profile
#java environmentexport JAVA_HOME=/usr/local/jdk1.8export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jarexport PATH=$PATH:${JAVA_HOME}/bin

操作之后的界面如下:

运行命令使环境生效

source /etc/profile

二、搭建zookeeper集群

1、下载zookeeper

创建zookeeper目录,在该目录下进行下载:

mkdir /usr/local/zookeeper

这一步如果出现连接被拒绝时可多试几次,我就是第二次请求才成功的。

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

等待下载完成之后解压:

tar -zxvf zookeeper-3.4.6.tar.gz

重命名为zookeeper1

mv zookeeper-3.4.6 zookeeper1cp -r zookeeper1 zookeeper2cp -r zookeeper1 zookeeper3

2、创建data、logs文件夹

在zookeeper1目录下创建

在data目录下新建myid文件。内容为1

3、修改zoo.cfg文件

cd /usr/local/zookeeper/zookeeper1/conf/cp zoo_sample.cfg zoo.cfg

进行过上面两步之后,有zoo.cfg文件了,现在修改内容为:

dataDir=/usr/local/zookeeper/zookeeper1/datadataLogDir=/usr/local/zookeeper/zookeeper1/logsserver.1=192.168.233.11:2888:3888server.2=192.168.233.11:2889:3889server.3=192.168.233.11:2890:3890

4、搭建zookeeper2

首先,复制改名。

cd /usr/local/zookeeper/cp -r zookeeper1 zookeeper2

然后修改具体的某些配置:

vim zookeeper2/conf/zoo.cfg

将下图三个地方1改成2

vim zookeeper2/data/myid

同时将myid中的值改成2

5、搭建zookeeper3

同上,复制改名

cp -r zookeeper1 zookeeper3

vim zookeeper3/conf/zoo.cfg

修改为3

vim zookeeper3/data/myid

修改为3

6、测试zookeeper集群

cd /usr/local/zookeeper/zookeeper1/bin/

由于启动所需代码比较多,这里简单写了一个启动脚本:

vim start

start的内容如下

cd /usr/local/zookeeper/zookeeper1/bin/./zkServer.sh start ../conf/zoo.cfgcd /usr/local/zookeeper/zookeeper2/bin/./zkServer.sh start ../conf/zoo.cfgcd /usr/local/zookeeper/zookeeper3/bin/./zkServer.sh start ../conf/zoo.cfg

下面是连接脚本:

vim login

login内容如下:

./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183

脚本编写完成,接下来启动:

sh startsh login

启动集群成功,如下图:

这里zookeeper就告一段落了,由于zookeeper占用着输入窗口,这里可以在xshell右键标签,新建ssh渠道。然后就可以在新窗口继续操作kafka了!

三、搭建kafka集群

1、下载kafka

首先创建kafka目录:

mkdir /usr/local/kafka

然后在该目录下载

cd /usr/local/kafka/wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

下载成功之后解压:

tar -zxvf kafka_2.11-1.1.0.tgz

2、修改集群配置

首先进入conf目录下:

cd /usr/local/kafka/kafka_2.11-1.1.0/config

修改server.properties
修改内容:

broker.id=0log.dirs=/tmp/kafka-logslisteners=PLAINTEXT://192.168.233.11:9092

复制两份server.properties

cp server.properties server2.propertiescp server.properties server3.properties

修改server2.properties

vim server2.properties

修改主要内容为:

broker.id=1log.dirs=/tmp/kafka-logs1listeners=PLAINTEXT://192.168.233.11:9093

如上,修改server3.properties
修改内容为:

broker.id=2log.dirs=/tmp/kafka-logs2listeners=PLAINTEXT://192.168.233.11:9094

3、启动kafka

这里还是在bin目录编写一个脚本:

cd ../bin/vim start

脚本内容为:

./kafka-server-start.sh ../config/server.properties &./kafka-server-start.sh ../config/server2.properties &./kafka-server-start.sh ../config/server3.properties &

通过jps命令可以查看到,共启动了3个kafka。

4、创建Topic

cd /usr/local/kafka/kafka_2.11-1.1.0bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

kafka打印了几条日志

在启动的zookeeper中可以通过命令查询到这条topic!

ls /brokers/topics

查看kafka状态

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

可以看到此时有三个节点 1 , 2 , 0

Leader 是1 ,
因为分区只有一个 所以在0上面,
Replicas:主从备份是 1,2,0,
ISR(in-sync):现在存活的信息也是 1,2,0

5、启动生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

由于不能按删除,不能按左右键去调整,所以语句有些乱啊。em…

6、启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

可以看出,启动消费者之后就会自动消费。

在生产者又造了一条。

消费者自动捕获成功!

四、集成springboot

先贴一张kafka兼容性目录:

不满足的话启动springboot的时候会抛异常的!!!ps:该走的岔路我都走了o(╥﹏╥)o
(我的kafka-clients是1.1.0,spring-kafka是2.2.2,中间那列暂时不用管)

回归正题,搞了两个小时,终于搞好了,想哭…
遇到的问题基本就是jar版本不匹配。
上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!!

1、pom文件

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0modelVersion><parent><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-parentartifactId><version>2.1.1.RELEASEversion><relativePath/> parent><groupId>com.gzkygroupId><artifactId>studyartifactId><version>0.0.1-SNAPSHOTversion><name>studyname><description>Demo project for Spring Bootdescription>

<properties><java.version>1.8java.version>properties>

<dependencies><dependency><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-webartifactId>dependency>

<dependency><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-testartifactId><scope>testscope><exclusions><exclusion><groupId>org.junit.vintagegroupId><artifactId>junit-vintage-engineartifactId>exclusion>exclusions>dependency>

<dependency><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-redisartifactId><version>1.3.8.RELEASEversion>dependency>

<dependency><groupId>redis.clientsgroupId><artifactId>jedisartifactId>dependency>

<dependency><groupId>org.springframework.kafkagroupId><artifactId>spring-kafkaartifactId><version>2.2.0.RELEASEversion>dependency>

<dependency><groupId>org.apache.kafkagroupId><artifactId>kafka-clientsartifactId>dependency>

dependencies>

<build><plugins><plugin><groupId>org.springframework.bootgroupId><artifactId>spring-boot-maven-pluginartifactId>plugin>plugins>build>

project>

pom文件中,重点是下面这两个版本。

<parent><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-parentartifactId><version>2.1.1.RELEASEversion><relativePath/> parent><dependency><groupId>org.springframework.kafkagroupId><artifactId>spring-kafkaartifactId><version>2.2.0.RELEASEversion>dependency>

2、application.yml

spring:redis:cluster:#设置key的生存时间,当key过期时,它会被自动删除;expire-seconds: 120#设置命令的执行时间,如果超过这个时间,则报错;command-timeout: 5000#设置redis集群的节点信息,其中namenode为域名解析,通过解析域名来获取相应的地址;nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006kafka:# 指定kafka 代理地址,可以多个bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094producer:retries: 0# 每次批量发送消息的数量batch-size: 16384buffer-memory: 33554432# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 指定默认消费者group idgroup-id: test-groupauto-offset-reset: earliestenable-auto-commit: trueauto-commit-interval: 100# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

server:port: 8085servlet:#context-path: /rediscontext-path: /kafka

没有配置Redis的可以把Redis部分删掉,也就是下图:
想学习配置Redis集群的可以参考:《Redis集群redis-cluster的搭建及集成springboot》

3、生产者

package com.gzky.study.utils;

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;

/** * kafka生产者工具类 * * @author biws * @date 2019/12/17 **/@Componentpublic class KfkaProducer {

private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);

@Autowiredprivate KafkaTemplate kafkaTemplate;/**     * 生产数据     * @param str 具体数据     */public void send(String str) {        logger.info("生产数据:" + str);        kafkaTemplate.send("testTopic", str);    }}

4、消费者

package com.gzky.study.utils;

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;

/** * kafka消费者监听消息 * * @author biws * @date 2019/12/17 **/@Componentpublic class KafkaConsumerListener {

private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);

@KafkaListener(topics = "testTopic")public void onMessage(String str){//insert(str);//这里为插入数据库代码        logger.info("监听到:" + str);        System.out.println("监听到:" + str);    }

}

5、对外接口

package com.gzky.study.controller;

import com.gzky.study.utils.KfkaProducer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;

/** * kafka对外接口 * * @author biws * @date 2019/12/17 **/@RestControllerpublic class KafkaController {

@Autowired    KfkaProducer kfkaProducer;

/**     * 生产消息     * @param str     * @return     */@RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET)@ResponseBody    public boolean sendTopic(@RequestParam String str){kfkaProducer.send(str);return true;    }}

6、postman测试

这里首先应该在服务器启动监听器(kafka根目录),下面命令必须是具体的服务器ip,不能是localhost,是我踩过的坑:

推荐此处重启一下集群
关闭kafka命令:

cd /usr/local/kafka/kafka_2.11-1.1.0/bin./kafka-server-stop.sh ../config/server.properties &./kafka-server-stop.sh ../config/server2.properties &./kafka-server-stop.sh ../config/server3.properties &

此处应该jps看一下,等待所有的kafka都关闭(关不掉的kill掉),再重新启动kafka:

./kafka-server-start.sh ../config/server.properties &./kafka-server-start.sh ../config/server2.properties &./kafka-server-start.sh ../config/server3.properties &

等待kafka启动成功后,启动消费者监听端口:

cd /usr/local/kafka/kafka_2.11-1.1.0bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic

曾经我乱输的测试信息全部被监听过来了!

启动springboot服务

然后用postman生产消息:

然后享受成果,服务器端监听成功。

项目中也监听成功!

点个在看 你最好看

往期推荐

腾讯强推Redis成长手册!原理+应用+集群+拓展+源码五飞

阿里要求其程序员必须精通的并发编程笔记:原理+模式+应用

支付宝阿牛整合Netty+Redis+ZK「终极」高并发手册

饿了么架构师发布“绝版”Java并发实现原理:JDK源码剖析

不吹不擂,10年架构师公开分享SQL工作笔记,5字真言总结到位

由浅入深吃透容器云+微服务+K8S+MQ+阿里云内部实施手册

kafka 怎么样连接图形化界面_从零开始搭建Kafka+SpringBoot分布式消息系统相关推荐

  1. kafka 怎么样连接图形化界面_图形化编程有多简单,点亮LED不到一分钟

    Arduino编程在所有单片机当中应该说是最简单的了,但是还可以更加简单. 比如说图形化编程,图形化编程真正让Arduino大众化了,因为谁都可以通过图形化编程方式来制作自己需要的小玩意. 啃萝卜 关 ...

  2. 小红帽怎样装图形化界面_纯技术篇:U盘装系统,不再多花冤枉钱

    U盘装系统,顾名思义就是用U盘安装电脑的操作系统.这里小编用现在最常见的大白菜装机软件来开始教程. 1.使用大白菜装机版制作大白菜U盘启动盘之前,需要准备一个存储空间大于2G的U盘. 2.下载并且安装 ...

  3. Cent os7 _ LINUX虚拟机安装_设置网络及图形化界面_安装教程

    准备工具(可到官网去下载): 1.VMware Workstation 15.5.6 版本 链接:https://pan.baidu.com/s/1o9w1Em91dYms0zR0fF7u3A 提取码 ...

  4. c++图形化界面_还能这样用?Linux下如何编译C程序?

    Windows下常用IDE来编译,Linux下直接使用gcc来编译,编译过程是Linux嵌入式编程的基础,也是嵌入式高频基础面试问题. 一.命令行编译及各个细分编译过程 hello.c示例代码: #i ...

  5. 腾讯云linux服务器怎么使用图形化界面_自己搭建一个自动签到和远程下载的服务器...

    先上效果图: 天天不用为签到发愁了.否则有一天忘记了.后悔死了.所以这里构造自己的签到工具.选一个N1或贝壳云这类的盒子.低功耗,高扩展.想怎么弄就怎么弄. 这里用贝壳云做例子: 使用的镜像是:Arm ...

  6. 从零开始搭建Kafka+SpringBoot分布式消息系统

    前言 由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群.由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境. (ps:默认您的cent ...

  7. 小红帽怎样装图形化界面_红帽linux 怎么设置中文图形界面

    展开全部 设置中文图形界面来的操自作方法和步骤如下: 1.第bai一步,打开duubuntu设置(齿zhi轮位于右上角),或直接dao在左侧找到设置,如下图所示,然后进入下一步. 2.其次,完成上述步 ...

  8. SkyEye图形化界面使用技巧篇(一)

    目录 01.如何最快找到设备? 02.如何进行设备连线? 03.如何查看接口信息? 04.如何查看设备属性 05.如何美化设备布局? 06.如何将搭建成果保存以备下次使用? 本文主要介绍SkyEye图 ...

  9. 【转载】VMware安装CentOS7时忘记装图形化界面——如何补装GNOME

    亲测有用 [转载链接]https://zhuanlan.zhihu.com/p/126601630 一.存在问题 在VMware虚拟机中成功安装centOS 7系统后,如果启动centOS 7系统直接 ...

最新文章

  1. 提取某个符合条件的字符串中的中文字符 例子
  2. 嵌入式学习笔记——SPI协议
  3. ORACLE工作原理小结
  4. 本地如何安装运行多个vue.js项目?
  5. dom vue 加载完 执行_前端面试题Vue
  6. 3D字体海报的这么玩?效果很赞,不得不学!
  7. python远程备份mysql_python远程备份mysql并压缩
  8. form表单无刷新提交文件(iframe)
  9. link 和 style 元素在 HTML 文档中的位置
  10. 开源项目征集 | CSDN “开源加速器计划”之【开源技术栈选型 Show】
  11. c语言代码表白_程序员教你表白:C/C++打造浪漫表白程序,找女朋友从现在开始...
  12. XILINX FPGA数字信号处理——3、数字的表示和运算的实现
  13. 捕捞季节 通达信副图指标公式 源码
  14. 使用Android Studio导入源码
  15. 拟物化设计与扁平化设计
  16. QDockWidget标题栏
  17. ORACLE莫明其妙出错!
  18. java web 怎么实现直播_java web开发直播平台可以实现但有缺陷
  19. 一些五笔不好打出来的字(转)-留作记念
  20. 常用编程工具:VS Code,这款编译工具到底好不好用?

热门文章

  1. 【CSDN】-京东云部署java项目及性能测试
  2. mplab x ide 中文使用手册_中文文档:MPLAB ICD 4在线调试器用户指南
  3. java定义计算机类并模拟其操作
  4. java cpu高_Java中的CPU占用高和内存占用高的问题排查
  5. linux中iconv函数,Linux下编码转换(iconv函数族)
  6. Java 画精美图形
  7. 如何用记事本编写运行java程序?
  8. wps中图片怎么居中_wps图片怎么添加推动声
  9. python实例 79,80
  10. dnn神经网络 缺点_抄近路神经网络如何因找捷径而犯错