Java实现kafka生产者代码

参考 https://kafka.apache.org/10/documentation.html#consumerapi
有示例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class MyProducer {/*** 实现生产数据到 kafka test 这个topic里面去* @param args*/public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "ip:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 获取KafkaProducer 这个类KafkaProducer<String, String> stringStringKafkaProducer = new KafkaProducer<>(props);//使用循环发送消息for (int i = 0; i < 20; i++){stringStringKafkaProducer.send(new ProducerRecord<String, String>("test", "message" + i));}//关闭stringStringKafkaProducer.close();}
}

Java实现kafka的消费者

第一种:自动提交offset方式

package edu.hfuu.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class MyConsumer {/*** 自动提交 offset* @param args*/public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "ip:9092");props.put("group.id", "test_group");  //消费组props.put("enable.auto.commit", "true");    //允许自动提交offsetprops.put("auto.commit.interval.ms", "1000"); //每隔多久自动提交offset//指定key、value的反序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//指定消费哪个topic里面的数据consumer.subscribe(Arrays.asList("test"));//使用死循环来消费test这个topic里面的数据while (true){//这里是我们所有拉取的数据 参数是超时时间ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {long offset = consumerRecord.offset();String value = consumerRecord.value();System.out.println("消息的offset值为:" + "消息的value值为:" + value);}}}
}

第二种:手动提交offset方式

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.protocol.types.Field;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;public class ManualConsumer {/*** 实现手动提交* @param args*/public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "ip:9092");props.put("group.id", "test_group");  //消费组props.put("enable.auto.commit", "false");    //手动提交offsetprops.put("auto.commit.interval.ms", "1000"); //每隔多久自动提交offset//指定key、value的反序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("test"));int minBatchSize = 50;List<ConsumerRecord<String, String>> consumerRecordsList = new ArrayList<>();while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {consumerRecordsList.add(consumerRecord);}if (consumerRecordsList.size() >= minBatchSize) {//如果集合当中的数据大于等于20条数据的时候,我们批量进行处理
//                insertToDB(consumerRecordsList);  //保存到数据库System.out.println("手动提交数据");//提交offset,表示这一批次的数据全部处理完了
//                consumer.commitAsync();  //异步提交offset值 ,效率更高,不会阻塞代码的执行consumer.commitSync();  //同步提交offset值consumerRecordsList.clear(); //清空集合中的数据}}}
}

Kafka生产者和消费者相关推荐

  1. 深入分析Kafka生产者和消费者

    深入Kafka生产者和消费者 Kafka生产者 消息发送的流程 发送方式 发送并忘记 同步发送 异步发送 生产者属性配置 序列化器 分区器 自定义分区器 Kafka消费者 消费者属性配置 消费者基础概 ...

  2. Kafka 生产者、消费者命令行操作

    Kafka 生产者.消费者命令行操作 1.查看操作生产者命令参数 bin/kafka-console-producer.sh 参数 --bootstrap-server <String: ser ...

  3. Jmeter之创建Kafka生产者和消费者进行性能测试

    目录 1. A Brief Overview of Apache Kafka 2. Pepper-Box Serialized Config 3. Pepper Box Kafka Sampler 4 ...

  4. kafka生产者和消费者端的数据不一致

    撸了今年阿里.头条和美团的面试,我有一个重要发现.......>>> kafka生产者生产30条数据,而消费者却不一定消费了30条数据,经过探索发现了main线程执行完成了而kafk ...

  5. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  6. java最简单的kafka生产者和消费者,未结合spring

    目录 1 最简单的生产者和消费者 1.1 引入maven 1.2 基本的生产者和代码注释 1.3 最简单消费者 2 生产者发送消息的三种方式 2.1 直接send之后就不管了,会自动重试,可能丢失消息 ...

  7. Kafka生产者与消费者详解

    什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...

  8. Kafka 生产者及消费者详解

    一.Kafka 生产者 1.1 分区策略 1)分区的原因 (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群 ...

  9. pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者

    https://pykafka.readthedocs.io/en/latest/api/producer.html 说明文档 </div><h2 class="heade ...

  10. kafka生产者、消费者java示例

    1. 生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.Ke ...

最新文章

  1. 基础笔记6(exception)
  2. 2021-10-20 Speaking Class
  3. 从Java面试官的角度,如何快速判断程序员的能力
  4. 常见数据结构的 Python 实现(建议收藏)
  5. Python切片各种情况详解
  6. iOS 动画基础总结篇
  7. Kubernetes-基本介绍/核心功能/相关术语(一)
  8. 3件Unreal Engine 3不得不说的故事
  9. 第19章 可空值类型
  10. 6-2 视频分解图片
  11. 为什么要使用Keil MDK-ARM中间件库?
  12. 信息系统项目管理笔记
  13. 如何阅读Java源码?
  14. FX2N-2DA模拟量输出模块简述
  15. [Python]通过有道词典API获取单词发音MP3
  16. 量子计算与通讯的基本原理(量子纠缠)
  17. No matter what,just do not give up。
  18. postgresql仅修改时间戳的时分秒写法
  19. 用java画企鹅_Fireworks绘制简笔QQ企鹅图像
  20. 单片机-c语言LED灯循环闪烁

热门文章

  1. Java使用IP代理突破IP限制进行投票
  2. 信息系统安全实验(一):InterNIC、Nslookup、Sam spade、Nmap、Nessus的使用
  3. 高校青年教师应该怎么提高收入
  4. 修改openssh版本信息
  5. enumerate和iter的使用
  6. 用 Python 实现词云可视化
  7. html网页制作摘要,关于静态HTML网页制作
  8. CSS元素宽度、继承父元素宽度、cale函数计算宽度总结
  9. 氢键H-H的博客目录
  10. 无监督图像分类《SCAN:Learning to Classify Images without》代码分析笔记(1):simclr