java生产者消费者代码_Java实现Kafka生产者消费者代码实例
Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者。
生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键、值进行保存。
每一个Topic中都包含一个或多个物理分区(Partition),分区维护着消息的内容和索引,它们有可能被保存在不同服务器。
新建一个Maven项目,pom.xml 加入依赖:
org.apache.kafka
kafka-clients
2.3.0
1、编写生产者
将消息投递到Kafka服务器的名称为“topic1”的Topic中
package com.example.kafkatest;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Producer {
public static void main(String[] args) {
//配置信息
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "localhost:9092");
//设置数据key和value的序列化处理类
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
//创建生产者实例
KafkaProducer producer = new KafkaProducer<>(props);
ProducerRecord record = new ProducerRecord("topic1", "userName", "lc");
//发送记录
producer.send(record);
producer.close();
}
}
运行后,可打开命令行工具,进入Kafka目录,执行命令查询服务器的Topic:
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
结果如下:
2、编写消费者
本例中,消费者和生产者在同一个项目中,只是使用不同的启动类。
消费者会为自已指定一个消费者组的标识,每一条发布到Topic的记录,都会被交付给消费者组的一个消费者实例。
如果多个消费者实例有相同的消费者组,则这些记录会分配到各个消费者实例上,以达到负载均衡的目录。
如果所有的消费者有不同的消费者组,则每一条记录都会广播到全部的消费者进行处理。
package com.example.rabbittest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
//配置信息
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "localhost:9092");
//必须指定消费者组
props.put("group.id", "test");
//设置数据key和value的序列化处理类
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
//创建消息者实例
KafkaConsumer consumer = new KafkaConsumer<>(props);
//订阅topic1的消息
consumer.subscribe(Arrays.asList("topic1"));
//到服务器中读取记录
while (true){
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord record : records){
System.out.println("key:" + record.key() + "" + ",value:" + record.value());
}
}
}
}
运行后,IDEA控制台其中输出如下:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。
java生产者消费者代码_Java实现Kafka生产者消费者代码实例相关推荐
- java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明
本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...
- java多线程生产者与消费者问题_java多线程实现生产者与消费者问题
生产者与消费者多线程实现,首先的问题就是同步,就是关于临界资源的访问 我们首先来定义一个临界资源类,这里设为Q class Q { int z=4; } 这个int型的z就是我假设的临界资源的个数 然 ...
- java 与c 运行效率_Java语言与C语言代码运行效率的比较
<Java语言与C语言代码运行效率的比较>由会员分享,可在线阅读,更多相关<Java语言与C语言代码运行效率的比较(2页珍藏版)>请在人人文库网上搜索. 1.Java语言与C语 ...
- java 组织机构代码_JAVA实现社会统一信用代码校验的方法|chu
网上找了几个,写的都不太适合,有的写出来了,也没有给出参考的算法链接.这样就导致了如果产生错误我们无法排查(不懂原理怎么排查对吧). 如果在使用过程中有疑虑,请参考:较验规则 package com. ...
- java 组织机构代码_JAVA实现社会统一信用代码较验
网上找了几个,写的都不太适合,有的写出来了,也没有给出参考的算法链接.这样就导致了如果产生错误我们无法排查(不懂原理怎么排查对吧). 如果在使用过程中有疑虑,请参考:较验规则 代码实现 package ...
- 运行java提示未签名_java – JNLP:在签名代码中加载未签名的代码
我们在克服Java webstart的混合代码错误时遇到了困难.总之,我们有我们的主要JNLP文件,我们已经签署了它直接加载的所有代码.我们已将all-permissions选项添加到主JNLP中.它 ...
- java转换大小写快捷键_Java 大小写最快转换方式实例代码
Java 大小写最快转换方式实例代码 这里直接给出实现代码,在代码中注释都很清楚,不多做介绍. Java代码 package io.mycat; import java.util.stream.Int ...
- java 组织机构代码_JAVA实现社会统一信用代码校验的方法
网上找了几个,写的都不太适合,有的写出来了,也没有给出参考的算法链接.这样就导致了如果产生错误我们无法排查(不懂原理怎么排查对吧). 如果在使用过程中有疑虑,请参考:较验规则 package com. ...
- java结束全部操作代码_Java创建与结束线程代码示例
这篇文章主要介绍了Java创建与结束线程代码示例,小编觉得挺不错的,这里分享给大家,供需要的朋友参考. 本文讲述了在Java中如何创建和结束线程的最基本方法,只针对于Java初学者.一些高级知识如线程 ...
最新文章
- Struts2与Struts的区别
- Media Player控件常用的样式
- 软件构造学习笔记-第十一周
- IO多路复用之poll总结
- InstallShield SdShowMsg未关闭导致安装程序无法停止
- C# XML操作之读取XML数据
- qt输出中文乱码处理(解决方法)
- ThinkPHP—URL的访问以及各种方法的操作
- pandas数据处理实践四(时间序列date_range、数据分箱cut、分组技术GroupBy)
- 早上运动和晚上运动,哪个更利于减肥?
- HTML代码示例和介绍
- var模型eviews操作步骤
- Ubuntu下编译nanomsg库
- 如何查看 Codeigniter 版本号?
- 雨后小故事动态邪恶_当您遇到“邪恶”的问题时,使故事变小
- cmt obm odm 代工模式oem_什么是OEM,ODM与OBM
- 应届大学生入职的时候首要问公司的主要两个问题是?
- 在Silverlight 2 beta1中使用IronPython等动态语言
- 面对境外网络攻击,作为安全技术人员该如何有效防御?
- Shell攻关之shell基础
热门文章
- 2017安徽二级c语言,2017计算机二级C语言测试题及答案
- java 怎么用 string method return数量_java教程之Map应该怎么用
- 杂牌手柄模拟xboxone手柄_震了,Xbox One 精英手柄2代摸了一次就不舍得放下了
- html全局浮窗,Html 实现浮动窗口
- mysql数据库邮箱什么类型_MySQL的数据类型介绍
- mysql ab复制延时_Mysql的AB复制
- OpenXLSX 字段读取问题
- UE4角色Location远距离时动画抖动问题(float精度不够)解决方案
- openssl-1.0.1e for arm
- 让S3c2410里拥有HIVE注册表的 全部步骤