kafka java api 生产者 producer 与消费者consumer
c踩坑
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld
将localhost必须和PLAINTEXT配置的地址保持一致,否则的话会无限警告不能接收数据
生产者;
import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaProducerDemo { public static void main(String[] args){ try { //新建一个类这个类用来配置kafka Properties properties = new Properties();//用来配置 kafka的IP地址和端口号 properties.put("bootstrap.servers", "master:9092");//ack 的状态可以分为三种 1,-1,all 1代表当生产者 properties.put("acks", "1");//重要//最多重复几次生产操作 properties.put("retries", 3);//重要 //缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。 properties.put("batch.size", 16384);//监听间隔
properties.put("linger.ms", 1);//控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。 properties.put("buffer.memory", 33554432);//序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//初始化生产者 Producer<String, String> producer = null; producer = new KafkaProducer<String, String>(properties); for (int i = 0; i < 100; i++) { String msg = "Messagea" + i;//topic的名字:HelloWorld producer.send(new ProducerRecord<String, String>("HelloWorld1", msg)); System.out.println("Sent:" + msg); //Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } }}之后再启动消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld --from-beginning //消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays;import java.util.Properties; public class ConsumerDemo { public static void main(String[] args) { Properties properties = new Properties(); properties.put("bootstrap.servers", "master:9092"); properties.put("group.id", "i"); //是否自动提交properties.put("enable.auto.commit", "false"); properties.put("auto.commit.interval.ms", "1000");//偏移量自增properties.put("auto.offset.reset", "earliest");////session保存时间properties.put("session.timeout.ms", "30000");//序列化properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);//topictest2kafkaConsumer.subscribe(Arrays.asList("HelloWorld")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }}最后启动生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HelloWorld
但是以上操作都有可能遇到数据重复或者数据丢失,在kafka0.11后引入了幂等和事务完美的解决了这个问题切忌是0.11版本之后包括0.11,如欲了解详情请看下篇 老规矩:散文欣赏《不老苍穹》今后的日子,我的头发越来越少,我写给你的句子越来越少,我剩下的时间越来越少,但世界永恒,星辰常在,苍穹不老。 最后的日子,我抱起你会很难,我看清你会很难,我记起你会很难,但此心至诚,百年不变,千岁未寒。
转载于:https://www.cnblogs.com/lppz/p/9991993.html
kafka java api 生产者 producer 与消费者consumer相关推荐
- Kafka学习笔记 --- 生产者producer与消费者关系comsumer
生产者:生产者可以将数据发布到所选择的topic(主题)中.生产者负责将记录分配到topic的哪一个 partition(分区)中.可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数( ...
- kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用
常用命令 启动Zookeeper ./zkServer.sh start-foreground 可选参数: ./zkServer.sh {start|start-foreground|stop|res ...
- 【Kafka】kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小
1.概述 kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小 package com.dtwave.kafka.storage;import org. ...
- kafka java api 删除,Kafka:删除闲置的消费者组ID
In some cases, I use Kafka-stream to model a small in memory (hashmap) projection of a topic. The K, ...
- kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...
使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...
- kafka java api 入数据报错:Error: MESSAGE_TOO_LARGE
1.kafka版本:kafka_2.11-2.1.0-kafka-4.0.0.jar 2.server.properties:所有调优参数都是默认 3.topic :null,所有参数默认 4.入库1 ...
- 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)
简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...
- java kafka api_kafka java API的使用
Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...
- java kafka 消费_java编程之Kafka_消费者API详解
1 消息发送 1.异步发送导入依赖 org.apache.kafka kafka-clients 0.11.0.0 编写代码 需要用到的类: KafkaProducer:需要创建一个生产者对象,用来发 ...
最新文章
- Linux NUMA 架构 :基础软件工程师需要知道一些知识
- Kubernetes — kubectl 的基本使用
- endnote初始化数据库支持_5 个免费的在线 SQL 数据库环境,比Navicat 香
- Python UDP聊天器
- asp连接oracle6,asp下用OracleInProcServer完成对Oracle的连接和操作-ASP教程,数据库相关...
- 为什么大学要学一堆纸上谈兵的课程?(转)
- SpringBoot+Mybatis集成搭建
- 【蓝桥杯真题】蓝桥杯真题之旋转
- Java微服务和分布式区别
- 储油罐的变位识别与罐容表标定
- 轻重在平衡:平衡查找树的强大威力
- luogu 5561 [Celeste-B]Mirror Magic 后缀数组+RMQ+multiset
- Win11此应用无法在你的电脑上运行怎么解决
- 贝叶斯与朴素贝叶斯入门及实战
- vue3组件之间通信(二)——子传父属性和方法
- unity webgl打包 苹果12以上机型打开连接后模型黑屏卡帧问题
- Smiditor实现图片上传功能
- Linux 部署开源WAF模块 ModSecurity
- 2. Hibernate目录结构和基础JAR包介绍
- Google在线翻译工具:Translatium for Mac支持big sur
热门文章
- 奇瑞无界Pro 刮起性能机甲风
- C# 直接使用DllImport外部dll的方法
- 单片机里如何使用冒泡法实现数据从大到小排列_单片机实验一冒泡法排序.doc...
- Python量化策略入门1-如何利用聚宽(JoinQuant)下载金融数据
- topic是短语还是句子_英语topic
- 新聘应届生入职培训计划(华为)
- 社区宽带繁忙是什么意思_ISP许可证是什么?办理ISP许可证对企业有什么要求?...
- Linux高性能服务器I/0高级应用:非阻塞connect(15)
- 求助大神们 请问sublime行数左边的黄色竖杠表示什么
- 连载《国培计划》骨干教师的研修日志之六:关于米新江教授谈的授之以渔与渔之以渔