文章目录

  • POM 依赖
  • 生产者
  • 消费者
  • 测试

POM 依赖

版本请同使用的kafka服务端的版本保持一致

     <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>

生产者

请小伙伴注意一下注释,这里就不做多余的解释啦

package com.artisan.kafka.first;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 19:45* @mark: show me the code , change the world*/
public class ProduceClient {private static final String TOPIC = "artisanTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {// 属性设置 Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.126.140:9092");properties.put(ProducerConfig.ACKS_CONFIG,"1");properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 根据属性实例化KafkaProducerProducer<String,String>  producer = new KafkaProducer<String, String>(properties);// 创建消息  三个参数,分别是 Topic ,消息的 key ,消息的 message 。String message = "mockValue";ProducerRecord<String ,String> producerRecord = new ProducerRecord<>(TOPIC, "mockKey", message);// 发送消息  (同步)Future<RecordMetadata> result = producer.send(producerRecord);// 获取同步发送的结果RecordMetadata recordMetadata = result.get();System.out.println(String.format("Message[ %s ] sent to Topic: %s  || Partition: %s || Offset: %s",message, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));}}

消费者

请小伙伴注意一下注释,这里就不做多余的解释啦

package com.artisan.kafka.first;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 20:09* @mark: show me the code , change the world*/
public class ConsumerClient {private static final String TOPIC = "artisanTopic";public static void main(String[] args) {// 属性设置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG , "192.168.126.140:9092");  // Broker 的地址properties.put(ConsumerConfig.GROUP_ID_CONFIG,"artisan-consumer-group");// 消费者分组properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");  // 设置消费者分组最初的消费进度为 earliestproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 是否自动提交消费进度properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); // 自动提交消费进度频率properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息的 key 的反序列化方式properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 消息的 value 的反序列化方式// 根据设置实例化KafkaConsumerConsumer<String,String>  consumer = new KafkaConsumer<>(properties);// 订阅消息consumer.subscribe(Collections.singleton(TOPIC));// 循环拉取消息while (true){// 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));// 遍历处理消息records.forEach(record -> System.out.println(String.format("接收到消息:Key %s  || 内容 %s" , record.key(),record.value())));}}
}

属性的话,需要结合kafka的特性来讲解,后面的单独介绍


测试

运行Produce

运行消费端

Apache Kafka-生产消费基础篇相关推荐

  1. Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析

    文章目录 概述 Spring-kafka生产者源码流程 Spring-kafka消费者源码流程(`@EnableKafka`和`@KafkaListener` ) Flow KafkaListener ...

  2. kafka生产消费原理笔记

    一.什么是kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性 ...

  3. java利用kafka生产消费消息

    2019独角兽企业重金招聘Python工程师标准>>> 1.producer程序 package com.test.frame.kafka.controller;import kaf ...

  4. java kafka 消费_java利用kafka生产消费消息

    1.producer程序 package com.test.frame.kafka.controller; import kafka.javaapi.producer.Producer; import ...

  5. 本地windows下新建kafka生产消费数据

    Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper 1.安装zookeeper 1.1 下载安装文件: http://mirror.bit.edu. ...

  6. Apache Doris 系列: 基础篇-Routine Load

    简介 Routine Load 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中. 目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CS ...

  7. Apache Doris 系列: 基础篇-Flink SQL写入Doris

    简介 本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分: Flink Doris connector Doris FE 节点配置 Flink SQL 写 Doris ...

  8. KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)

    文章目录 1. 技术选型 2. 导入依赖 3. kafka配置 4. 生产者(同步) 5. 生产者(异步) 6. 消费者 1. 技术选型 软件/框架 版本 jdk 1.8.0_202 springbo ...

  9. MAC搭建kafka客户端以及实现生产消费

    Kafka 部分参数说明 (1)max.in.flight.requests.per.connection Kafka 可以保证同一个分区里的消息是有序的.也就是说,如果生产者按照一定的顺序发送消息, ...

最新文章

  1. if you have something important on the clean my mac
  2. 【字符比较】单字符比较值是否相等
  3. python类的继承super方法_Python类的继承super相关原理解析
  4. 嘉实多RO150合成齿轮油
  5. IDEA开发中,类的头位置生成作者时间信息
  6. 归并排序Merge sort(转)
  7. @SpringBootApplication注解分析
  8. AAAI2020中的四篇推荐系统好文(附论文下载链接)
  9. C++11/14::右值引用
  10. jqueryui时间插件_jQueryUI菜单插件教程示例
  11. 文件和目录属性ls which alias
  12. paip.python错误解决18
  13. 线性代数 第六版 答案
  14. 【Linux环境下C语言编程】
  15. c#调用摄像头进行二维码扫码
  16. emacs下使用google-cpplint
  17. 邮件群发平台_招聘平台挑选邮件群发平台时应该注意什么
  18. STM32 HAL库获取系统时钟与标准库获取系统时钟
  19. 环保设备物联网远程监控维护解决方案
  20. Spring事务的传播机制

热门文章

  1. Linux脚本获取日期,Shell脚本获取格式化日期与时间
  2. python 数组基本用法
  3. tomcat linux dump,Linux下Tomcat常用命令与配置
  4. 学生电脑哪个牌子好_专卖工作服哪个牌子好
  5. pytorch笔记 torch.clamp(截取上下限)
  6. 机器学习笔记:Transformer
  7. R语言实战应用精讲50篇(二十七)-时空数据分析-经验空间/时间均值(latex公式+R代码绘图)
  8. Flink从入门到精通100篇(十五)-Flink SQL FileSystem Connector 分区提交与自定义小文件合并策略 ​
  9. R语言-向量自回归模型VAR的实现
  10. anaconda2-keras安装;keras后端修改