Kafka生产者和消费者
Java实现kafka生产者代码
参考 https://kafka.apache.org/10/documentation.html#consumerapi
有示例
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class MyProducer {/*** 实现生产数据到 kafka test 这个topic里面去* @param args*/public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "ip:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 获取KafkaProducer 这个类KafkaProducer<String, String> stringStringKafkaProducer = new KafkaProducer<>(props);//使用循环发送消息for (int i = 0; i < 20; i++){stringStringKafkaProducer.send(new ProducerRecord<String, String>("test", "message" + i));}//关闭stringStringKafkaProducer.close();}
}
Java实现kafka的消费者
第一种:自动提交offset方式
package edu.hfuu.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class MyConsumer {/*** 自动提交 offset* @param args*/public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "ip:9092");props.put("group.id", "test_group"); //消费组props.put("enable.auto.commit", "true"); //允许自动提交offsetprops.put("auto.commit.interval.ms", "1000"); //每隔多久自动提交offset//指定key、value的反序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//指定消费哪个topic里面的数据consumer.subscribe(Arrays.asList("test"));//使用死循环来消费test这个topic里面的数据while (true){//这里是我们所有拉取的数据 参数是超时时间ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {long offset = consumerRecord.offset();String value = consumerRecord.value();System.out.println("消息的offset值为:" + "消息的value值为:" + value);}}}
}
第二种:手动提交offset方式
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.protocol.types.Field;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;public class ManualConsumer {/*** 实现手动提交* @param args*/public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "ip:9092");props.put("group.id", "test_group"); //消费组props.put("enable.auto.commit", "false"); //手动提交offsetprops.put("auto.commit.interval.ms", "1000"); //每隔多久自动提交offset//指定key、value的反序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("test"));int minBatchSize = 50;List<ConsumerRecord<String, String>> consumerRecordsList = new ArrayList<>();while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {consumerRecordsList.add(consumerRecord);}if (consumerRecordsList.size() >= minBatchSize) {//如果集合当中的数据大于等于20条数据的时候,我们批量进行处理
// insertToDB(consumerRecordsList); //保存到数据库System.out.println("手动提交数据");//提交offset,表示这一批次的数据全部处理完了
// consumer.commitAsync(); //异步提交offset值 ,效率更高,不会阻塞代码的执行consumer.commitSync(); //同步提交offset值consumerRecordsList.clear(); //清空集合中的数据}}}
}
Kafka生产者和消费者相关推荐
- 深入分析Kafka生产者和消费者
深入Kafka生产者和消费者 Kafka生产者 消息发送的流程 发送方式 发送并忘记 同步发送 异步发送 生产者属性配置 序列化器 分区器 自定义分区器 Kafka消费者 消费者属性配置 消费者基础概 ...
- Kafka 生产者、消费者命令行操作
Kafka 生产者.消费者命令行操作 1.查看操作生产者命令参数 bin/kafka-console-producer.sh 参数 --bootstrap-server <String: ser ...
- Jmeter之创建Kafka生产者和消费者进行性能测试
目录 1. A Brief Overview of Apache Kafka 2. Pepper-Box Serialized Config 3. Pepper Box Kafka Sampler 4 ...
- kafka生产者和消费者端的数据不一致
撸了今年阿里.头条和美团的面试,我有一个重要发现.......>>> kafka生产者生产30条数据,而消费者却不一定消费了30条数据,经过探索发现了main线程执行完成了而kafk ...
- java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明
本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...
- java最简单的kafka生产者和消费者,未结合spring
目录 1 最简单的生产者和消费者 1.1 引入maven 1.2 基本的生产者和代码注释 1.3 最简单消费者 2 生产者发送消息的三种方式 2.1 直接send之后就不管了,会自动重试,可能丢失消息 ...
- Kafka生产者与消费者详解
什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...
- Kafka 生产者及消费者详解
一.Kafka 生产者 1.1 分区策略 1)分区的原因 (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群 ...
- pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者
https://pykafka.readthedocs.io/en/latest/api/producer.html 说明文档 </div><h2 class="heade ...
- kafka生产者、消费者java示例
1. 生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.Ke ...
最新文章
- 基础笔记6(exception)
- 2021-10-20 Speaking Class
- 从Java面试官的角度,如何快速判断程序员的能力
- 常见数据结构的 Python 实现(建议收藏)
- Python切片各种情况详解
- iOS 动画基础总结篇
- Kubernetes-基本介绍/核心功能/相关术语(一)
- 3件Unreal Engine 3不得不说的故事
- 第19章 可空值类型
- 6-2 视频分解图片
- 为什么要使用Keil MDK-ARM中间件库?
- 信息系统项目管理笔记
- 如何阅读Java源码?
- FX2N-2DA模拟量输出模块简述
- [Python]通过有道词典API获取单词发音MP3
- 量子计算与通讯的基本原理(量子纠缠)
- No matter what,just do not give up。
- postgresql仅修改时间戳的时分秒写法
- 用java画企鹅_Fireworks绘制简笔QQ企鹅图像
- 单片机-c语言LED灯循环闪烁