kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者
一、Kafka安装与使用 ( kafka介绍 )
1. 下载Kafka
2. 安装
Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka基本上是运行在linux服务器上,(也可以运行在windows上)因此我们这里也使用linux来开始今天的实战。首先确保你的机器上安装了jdk,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用。说是安装,如果只需要进行最简单的尝试的话我们只需要解压到任意目录即可,这里我们将kafka压缩包解压到/home目录
Kafka目录如下:
1 其中bin是执行文件目录,包括linux下的执行文件,以及bin/window目录下包含windows执行的批处理命令;2 config中包含kafka的配置文件;3 libs中是kafka的各种依赖包。
配置Kafka文件(不配置也能在本地机上执行,不配置默认主机是localhost )
命令行输入: vi server.properties #编辑修改相应的参数
1 broker.id=0
2
3 port=9092#端口号4
5 host.name=192.168.0.11#服务器IP地址,修改为自己的服务器IP6
7 log.dirs=/usr/local/kafka/log/kafka #日志存放路径,上面创建的目录 (改成自己的目录)8
9 zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
4. 命令行运行
4.1 启动zookeeper
cd进入kafka解压目录,输入
bin/zookeeper-server-start.sh config/zookeeper.properties
启动zookeeper成功后会看到如下的输出
4.2 启动kafka
cd进入kafka解压目录,输入
bin/kafka-server-start.sh config/server.properties
启动kafka成功后会看到如下的输出
5. 第一个消息(Linux)
5.1 创建一个topic
Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷
在kafka解压目录打开终端,输入
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topictest
创建一个名为test的topic
在创建topic后可以通过输入
bin/kafka-topics.sh --list --zookeeper localhost:2181
来查看已经创建的topic
5.2创建一个消息消费者
在kafka解压目录打开终端,输入
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topictest --from-beginning
可以创建一个用于消费topic为test的消费者
消费者创建完成之后,因为还没有发送任何数据,因此这里在执行后没有打印出任何数据
不过别着急,不要关闭这个终端,打开一个新的终端,接下来我们创建第一个消息生产者
5.3 创建一个消息生产者
在kafka解压目录打开一个新的终端,输入
bin/kafka-console-producer.sh --broker-list localhost:9092 --topictest
在执行完毕后会进入的编辑器页面
在发送完消息之后,可以回到我们的消息消费者终端中,可以看到,终端中已经打印出了我们刚才发送的消息
第4步可以通过脚本文件进行实现:
1) 创建启动脚本,假设我们的Kafka在/usr/local/目录下
cd /usr/local/kafka #创建启动脚本
vi kafkastart.sh #编辑,添加以下代码
1 #!/bin/sh
2 #创建启动脚本3 #启动zookeeper4 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
5 sleep 3#等3秒后执行6
7 #启动kafka8 /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
2) 创建关闭脚本
vi kafkastop.sh #编辑,添加以下代码
1 #!/bin/sh
2 #创建关闭脚本3 #关闭kafka4 /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
5 sleep 3#等3秒后执行6
7 #关闭zookeeper8 /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &
3)命令行添加执行权限
1 #添加脚本执行权限2 chmod +x kafkastart.sh3 chmod +x kafkastop.sh
4)命令行执行脚本
1 sh /usr/local/kafka/kafkastart.sh #启动kafka2
3 sh /usr/local/kafka/kafkastop.sh #关闭kafka
进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:
6.1 启动zookeeper
.\bin\windows\zookeeper-server-start.bat .\config\server.properties
6.2 启动Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
注意:注意:不要关了这个窗口,启用Kafka前请确保ZooKeeper实例已经准备好并开始运行
6.3 创建主题
进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
注意:不要关了这个窗口
查看主题输入:
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
6.4 创建生产者
进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
注意:不要关了这个窗口
6.5 创建消费者
进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
其中6.1和6.2可以使用批处理文件
1) 创建启动脚本,假设我们的Kafka在D:\Kafka\kafka_2.12-0.11.0.0目录下
切换到 D:\Kafka\kafka_2.12-0.11.0.0目录下 #创建启动脚本
用文本编辑器编辑kafkastart.bat #编辑,添加以下代码
#创建启动脚本
# ...自己添加
vi kafkastop.sh #编辑,添加以下代码
#创建关闭脚本
# 自己添加
双击即可运行。
7. 使用Java程序(模拟真实生产环境;生产者在Kafka服务器上,消费者在客户端; 可以推广到分布式环境中)
如果是生产者以及消费者都在本机进行测试,则Kafka中配置文件不需要改变;且生产者和消费者都在同一台机器上。
否则:
7.1 创建Topic
7.2 生产者
eclipse中创建一个名为KafkaProduce的Java Project;接着右击该项目new一个名为lib的Folder;然后将我们部署的kafka的libs中的所有Jar包拷贝到该项目的lib目录下;接着右击该项目,build Path,然后选择configure build path中的Libraries,接着Add Jars;将本项目lib下的所有Jar包添加进来。
packagecom.zc.kafka.producer.main;importjava.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerRecord;/*** Kafka生产者
* 先启动生产者,发送消息到broker,这里简单发送了10条从0-9的消息,再启动消费者,控制台输出如下:*/
public classSimpleKafkaProducer {public static voidmain(String[] args) {//TODO Auto-generated method stub
Properties props= newProperties();//broker地址
props.put("bootstrap.servers", "192.168.0.11:9092"); // "localhost:9092"//请求时候需要验证
props.put("acks", "all");//请求失败时候需要重试
props.put("retries", 0);//内存缓存区大小
props.put("buffer.memory", 33554432);//指定消息key序列化方式
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");//指定消息本身的序列化方式
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) { //i < 10//生产一条消息的时间有点长
producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));//System.out.println(i);
}
System.out.println("Message sent successfully");
producer.close();
}
}
7.3 消费者
eclipse中创建一个名为KafkaConsumer的Java Project;接着右击该项目new一个名为lib的Folder;然后将我们部署的kafka的libs中的所有Jar包拷贝到该项目的lib目录下;接着右击该项目,build Path,然后选择configure build path中的Libraries,接着Add Jars;将本项目lib下的所有Jar包添加进来。
packagecom.zc.kafka.consumer.main;importjava.util.Collections;importjava.util.Properties;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;/*** kafka消费者*/
public classSimpleKafkaConsumer {
@SuppressWarnings({"deprecation", "resource"})public static voidmain(String[] args) {//TODO Auto-generated method stub
Properties props = newProperties();
props.put("bootstrap.servers", "192.168.0.11:9092"); // "localhost:9092"//每个消费者分配独立的组号
props.put("group.id", "test");//如果value合法,则自动提交偏移量
props.put("enable.auto.commit", "true");//设置多久一次更新被消费消息的偏移量
props.put("auto.commit.interval.ms", "1000");//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
props.put("session.timeout.ms", "30000");//
//props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test")); //核心函数1:订阅topic
System.out.println("Subscribed to topic " + "test");//int i = 0;
while (true) {//System.out.println(i++);//核心函数2:long poll,一次拉取回来多个消息
/*读取数据,读取超时时间为100ms*/ConsumerRecords records = consumer.poll(100);//System.out.println(records.count());
for (ConsumerRecordrecord : records)//print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
7.4 打Jar包执行
1)打Jar包
右击该项目,选择Export;之后选择Runnable JAR file,接着next; 然后在Launch configuration中选择主类(含main方法),如果没有,则需要先运行该主类,接着Export destination选择Jar包的存放位置和名称,接着Library handling 选择第二个,Finish;会生成相应Jar包。
通过 java -jar XXX.jar 运行该Jar包。
2)执行
将生产者与消费者都打成相应Jar包;都可以在服务器(有Kafka环境)和客户机(没有Kafka环境)上执行;并且生产者和消费者可以在不同客户机上也可以在相同客户机上执行。
就是我们编程以及运行的kafka项目,跟有没有Kafka环境是无关的。
1. 服务器上先启动Kafka
2. 服务器或者客户机上启动生产者 java -jar KafkaProducer.jar
3. 服务器或者客户机上启动消费者 java -jar KafkaConsumer.jar
kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者相关推荐
- kafka消费者如何读同一生产者消息_Kafka系列3:深入理解Kafka消费者
上面两篇聊了Kafka概况和Kafka生产者,包含了Kafka的基本概念.设计原理.设计核心以及生产者的核心原理.本篇单独聊聊Kafka的消费者,包括如下内容:消费者和消费者组 如何创建消费者 如何消 ...
- kafka消费者如何读同一生产者消息_Kafka消费者生产者实例
为了更为直观展示Kafka的消息生产消费的过程,我会从基于Console和基于Application两个方面介绍使用实例.Kafka是一个分布式流处理平台,具体来说有三层含义: 它允许发布和订阅记录流 ...
- kudu接受kafka消息_Kafka入门详解
1.1 什么是kafka? Kafka最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的 ...
- Kafka消息队列 入门到精通 看这一篇就够了
文章目录 第一章 概述 1.1 Kafka 的定义及特点 1.2 消息队列的介绍 1.3 Kafka 的基础架构 第二章 入门 2.1 Kafka 的安装部署 2.2 Kafka 命令行操作 第三章 ...
- windows安装kafka 2.8.1以及创建主题(topic)生产者和消费者消息接收和发送
windows 安装kafka 2.8.1 电脑上的前置的条件为 1.在电脑上有安装jdk 2.2.8.1中的kafka版本里面有包含了zookeeper,故此本版本不需要安装zookeeper 第一 ...
- kafka consumer配置拉取速度慢_Kafka消费者的使用和原理
这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public static void main(String[] args) ...
- Kafka生产者——消息发送流程,同步、异步发送API
生产者消息发送流程 发送原理 Kafka的Producer发送消息采用的是异步发送的方式. 在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAc ...
- RocketMQ的消费者消息重试和生产者消息重投
详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分. 文章目录 1 生产者重试 2 消费者重试 2.1 异常重试 2.1.1 并发消费的重试 2 ...
- RabbitMQ:消费者ACK机制、生产者消息确认
文章目录 基础案例环境搭建: 环境: 1. 生产者发送消息确认 1.1 confirm 确认模式 1.2 return 退回模式 源代码 1.1.3 小结 2. 消费者签收消息(ACK) 2.1 代码 ...
最新文章
- vue点击增加class_Vuevbind动态绑定class
- IT人的学习方法论-续集 关于英语的学习
- spring_Spring MVC控制器的单元测试:REST API
- 动态规划入门_数塔问题
- wchar_t * 与 char * 互相转换小记
- 使用Mockito时遇到的一些问题
- [恢]hdu 1019
- 华为徐直军:2020年将末位淘汰10%主管,生存是第一要务
- ReactNative绑定函数中的this
- ubuntu重置root密码
- Java运行时的子类识别
- Bash中单引号和双引号之间的区别
- centos 和 radhat 配置epel仓库
- oneNote笔记名不同步
- 欢迎来到咆哮2020:人工智能时代
- 物研究所做一位科研人员
- js 正则例子 验证美国电话号码
- High-speed Charting Control 控件使用
- java任务系统设计_任务调度系统-任务依赖的设计
- Linux应用编程和网络编程(3)------- Linux中文件的属性
热门文章
- canvas实现的喜羊羊图像效果
- Dancing_Links总结 【by AbandonZHANG】
- 项目版本控制器SVN的环境建立(Subversion,subclipse)
- 模态对话框阻塞主线程的话不影响其他线程操作主线程控件(不阻塞)
- mysql数据库提示本地无法连接远程服务器(Host is not allowed to connect to this MySQL server)解决办法
- qt中Qtableview的用法
- QT操作sqlite概念
- 代码区,初始化全局数据区,BSS,堆区,栈区,程序环境变量区简介
- std::bind绑定成员函数,为什么第二个参数必须绑定对象地址
- Android开发之虹软人脸识别活体检测SDK包Bitmap转NV21方法