Apache Kafka-生产消费基础篇
文章目录
- POM 依赖
- 生产者
- 消费者
- 测试
POM 依赖
版本请同使用的kafka服务端的版本保持一致
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>
生产者
请小伙伴注意一下注释,这里就不做多余的解释啦
package com.artisan.kafka.first;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 19:45* @mark: show me the code , change the world*/
public class ProduceClient {private static final String TOPIC = "artisanTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {// 属性设置 Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.126.140:9092");properties.put(ProducerConfig.ACKS_CONFIG,"1");properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 根据属性实例化KafkaProducerProducer<String,String> producer = new KafkaProducer<String, String>(properties);// 创建消息 三个参数,分别是 Topic ,消息的 key ,消息的 message 。String message = "mockValue";ProducerRecord<String ,String> producerRecord = new ProducerRecord<>(TOPIC, "mockKey", message);// 发送消息 (同步)Future<RecordMetadata> result = producer.send(producerRecord);// 获取同步发送的结果RecordMetadata recordMetadata = result.get();System.out.println(String.format("Message[ %s ] sent to Topic: %s || Partition: %s || Offset: %s",message, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));}}
消费者
请小伙伴注意一下注释,这里就不做多余的解释啦
package com.artisan.kafka.first;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 20:09* @mark: show me the code , change the world*/
public class ConsumerClient {private static final String TOPIC = "artisanTopic";public static void main(String[] args) {// 属性设置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG , "192.168.126.140:9092"); // Broker 的地址properties.put(ConsumerConfig.GROUP_ID_CONFIG,"artisan-consumer-group");// 消费者分组properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); // 设置消费者分组最初的消费进度为 earliestproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 是否自动提交消费进度properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); // 自动提交消费进度频率properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息的 key 的反序列化方式properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 消息的 value 的反序列化方式// 根据设置实例化KafkaConsumerConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 订阅消息consumer.subscribe(Collections.singleton(TOPIC));// 循环拉取消息while (true){// 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));// 遍历处理消息records.forEach(record -> System.out.println(String.format("接收到消息:Key %s || 内容 %s" , record.key(),record.value())));}}
}
属性的话,需要结合kafka的特性来讲解,后面的单独介绍
测试
运行Produce
运行消费端
Apache Kafka-生产消费基础篇相关推荐
- Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析
文章目录 概述 Spring-kafka生产者源码流程 Spring-kafka消费者源码流程(`@EnableKafka`和`@KafkaListener` ) Flow KafkaListener ...
- kafka生产消费原理笔记
一.什么是kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性 ...
- java利用kafka生产消费消息
2019独角兽企业重金招聘Python工程师标准>>> 1.producer程序 package com.test.frame.kafka.controller;import kaf ...
- java kafka 消费_java利用kafka生产消费消息
1.producer程序 package com.test.frame.kafka.controller; import kafka.javaapi.producer.Producer; import ...
- 本地windows下新建kafka生产消费数据
Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper 1.安装zookeeper 1.1 下载安装文件: http://mirror.bit.edu. ...
- Apache Doris 系列: 基础篇-Routine Load
简介 Routine Load 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中. 目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CS ...
- Apache Doris 系列: 基础篇-Flink SQL写入Doris
简介 本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分: Flink Doris connector Doris FE 节点配置 Flink SQL 写 Doris ...
- KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)
文章目录 1. 技术选型 2. 导入依赖 3. kafka配置 4. 生产者(同步) 5. 生产者(异步) 6. 消费者 1. 技术选型 软件/框架 版本 jdk 1.8.0_202 springbo ...
- MAC搭建kafka客户端以及实现生产消费
Kafka 部分参数说明 (1)max.in.flight.requests.per.connection Kafka 可以保证同一个分区里的消息是有序的.也就是说,如果生产者按照一定的顺序发送消息, ...
最新文章
- if you have something important on the clean my mac
- 【字符比较】单字符比较值是否相等
- python类的继承super方法_Python类的继承super相关原理解析
- 嘉实多RO150合成齿轮油
- IDEA开发中,类的头位置生成作者时间信息
- 归并排序Merge sort(转)
- @SpringBootApplication注解分析
- AAAI2020中的四篇推荐系统好文(附论文下载链接)
- C++11/14::右值引用
- jqueryui时间插件_jQueryUI菜单插件教程示例
- 文件和目录属性ls which alias
- paip.python错误解决18
- 线性代数 第六版 答案
- 【Linux环境下C语言编程】
- c#调用摄像头进行二维码扫码
- emacs下使用google-cpplint
- 邮件群发平台_招聘平台挑选邮件群发平台时应该注意什么
- STM32 HAL库获取系统时钟与标准库获取系统时钟
- 环保设备物联网远程监控维护解决方案
- Spring事务的传播机制
热门文章
- Linux脚本获取日期,Shell脚本获取格式化日期与时间
- python 数组基本用法
- tomcat linux dump,Linux下Tomcat常用命令与配置
- 学生电脑哪个牌子好_专卖工作服哪个牌子好
- pytorch笔记 torch.clamp(截取上下限)
- 机器学习笔记:Transformer
- R语言实战应用精讲50篇(二十七)-时空数据分析-经验空间/时间均值(latex公式+R代码绘图)
- Flink从入门到精通100篇(十五)-Flink SQL FileSystem Connector 分区提交与自定义小文件合并策略 ​
- R语言-向量自回归模型VAR的实现
- anaconda2-keras安装;keras后端修改