Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者。

生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键、值进行保存。

每一个Topic中都包含一个或多个物理分区(Partition),分区维护着消息的内容和索引,它们有可能被保存在不同服务器。

新建一个Maven项目,pom.xml 加入依赖:

org.apache.kafka

kafka-clients

2.3.0

1、编写生产者

将消息投递到Kafka服务器的名称为“topic1”的Topic中

package com.example.kafkatest;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {

public static void main(String[] args) {

//配置信息

Properties props = new Properties();

//kafka服务器地址

props.put("bootstrap.servers", "localhost:9092");

//设置数据key和value的序列化处理类

props.put("key.serializer", StringSerializer.class);

props.put("value.serializer", StringSerializer.class);

//创建生产者实例

KafkaProducer producer = new KafkaProducer<>(props);

ProducerRecord record = new ProducerRecord("topic1", "userName", "lc");

//发送记录

producer.send(record);

producer.close();

}

}

运行后,可打开命令行工具,进入Kafka目录,执行命令查询服务器的Topic:

bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

结果如下:

2、编写消费者

本例中,消费者和生产者在同一个项目中,只是使用不同的启动类。

消费者会为自已指定一个消费者组的标识,每一条发布到Topic的记录,都会被交付给消费者组的一个消费者实例。

如果多个消费者实例有相同的消费者组,则这些记录会分配到各个消费者实例上,以达到负载均衡的目录。

如果所有的消费者有不同的消费者组,则每一条记录都会广播到全部的消费者进行处理。

package com.example.rabbittest;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;

import java.util.Arrays;

import java.util.Properties;

public class Consumer {

public static void main(String[] args) {

//配置信息

Properties props = new Properties();

//kafka服务器地址

props.put("bootstrap.servers", "localhost:9092");

//必须指定消费者组

props.put("group.id", "test");

//设置数据key和value的序列化处理类

props.put("key.deserializer", StringDeserializer.class);

props.put("value.deserializer", StringDeserializer.class);

//创建消息者实例

KafkaConsumer consumer = new KafkaConsumer<>(props);

//订阅topic1的消息

consumer.subscribe(Arrays.asList("topic1"));

//到服务器中读取记录

while (true){

ConsumerRecords records = consumer.poll(Duration.ofMillis(100));

for(ConsumerRecord record : records){

System.out.println("key:" + record.key() + "" + ",value:" + record.value());

}

}

}

}

运行后,IDEA控制台其中输出如下:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

java生产者消费者代码_Java实现Kafka生产者消费者代码实例相关推荐

  1. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  2. java多线程生产者与消费者问题_java多线程实现生产者与消费者问题

    生产者与消费者多线程实现,首先的问题就是同步,就是关于临界资源的访问 我们首先来定义一个临界资源类,这里设为Q class Q { int z=4; } 这个int型的z就是我假设的临界资源的个数 然 ...

  3. java 与c 运行效率_Java语言与C语言代码运行效率的比较

    <Java语言与C语言代码运行效率的比较>由会员分享,可在线阅读,更多相关<Java语言与C语言代码运行效率的比较(2页珍藏版)>请在人人文库网上搜索. 1.Java语言与C语 ...

  4. java 组织机构代码_JAVA实现社会统一信用代码校验的方法|chu

    网上找了几个,写的都不太适合,有的写出来了,也没有给出参考的算法链接.这样就导致了如果产生错误我们无法排查(不懂原理怎么排查对吧). 如果在使用过程中有疑虑,请参考:较验规则 package com. ...

  5. java 组织机构代码_JAVA实现社会统一信用代码较验

    网上找了几个,写的都不太适合,有的写出来了,也没有给出参考的算法链接.这样就导致了如果产生错误我们无法排查(不懂原理怎么排查对吧). 如果在使用过程中有疑虑,请参考:较验规则 代码实现 package ...

  6. 运行java提示未签名_java – JNLP:在签名代码中加载未签名的代码

    我们在克服Java webstart的混合代码错误时遇到了困难.总之,我们有我们的主要JNLP文件,我们已经签署了它直接加载的所有代码.我们已将all-permissions选项添加到主JNLP中.它 ...

  7. java转换大小写快捷键_Java 大小写最快转换方式实例代码

    Java 大小写最快转换方式实例代码 这里直接给出实现代码,在代码中注释都很清楚,不多做介绍. Java代码 package io.mycat; import java.util.stream.Int ...

  8. java 组织机构代码_JAVA实现社会统一信用代码校验的方法

    网上找了几个,写的都不太适合,有的写出来了,也没有给出参考的算法链接.这样就导致了如果产生错误我们无法排查(不懂原理怎么排查对吧). 如果在使用过程中有疑虑,请参考:较验规则 package com. ...

  9. java结束全部操作代码_Java创建与结束线程代码示例

    这篇文章主要介绍了Java创建与结束线程代码示例,小编觉得挺不错的,这里分享给大家,供需要的朋友参考. 本文讲述了在Java中如何创建和结束线程的最基本方法,只针对于Java初学者.一些高级知识如线程 ...

最新文章

  1. Struts2与Struts的区别
  2. Media Player控件常用的样式
  3. 软件构造学习笔记-第十一周
  4. IO多路复用之poll总结
  5. InstallShield SdShowMsg未关闭导致安装程序无法停止
  6. C# XML操作之读取XML数据
  7. qt输出中文乱码处理(解决方法)
  8. ThinkPHP—URL的访问以及各种方法的操作
  9. pandas数据处理实践四(时间序列date_range、数据分箱cut、分组技术GroupBy)
  10. 早上运动和晚上运动,哪个更利于减肥?
  11. HTML代码示例和介绍
  12. var模型eviews操作步骤
  13. Ubuntu下编译nanomsg库
  14. 如何查看 Codeigniter 版本号?
  15. 雨后小故事动态邪恶_当您遇到“邪恶”的问题时,使故事变小
  16. cmt obm odm 代工模式oem_什么是OEM,ODM与OBM
  17. 应届大学生入职的时候首要问公司的主要两个问题是?
  18. 在Silverlight 2 beta1中使用IronPython等动态语言
  19. 面对境外网络攻击,作为安全技术人员该如何有效防御?
  20. Shell攻关之shell基础

热门文章

  1. 2017安徽二级c语言,2017计算机二级C语言测试题及答案
  2. java 怎么用 string method return数量_java教程之Map应该怎么用
  3. 杂牌手柄模拟xboxone手柄_震了,Xbox One 精英手柄2代摸了一次就不舍得放下了
  4. html全局浮窗,Html 实现浮动窗口
  5. mysql数据库邮箱什么类型_MySQL的数据类型介绍
  6. mysql ab复制延时_Mysql的AB复制
  7. OpenXLSX 字段读取问题
  8. UE4角色Location远距离时动画抖动问题(float精度不够)解决方案
  9. openssl-1.0.1e for arm
  10. 让S3c2410里拥有HIVE注册表的 全部步骤