java作为kafka生产者实验及Expiring超时问题解决
【README】 java作为生产者,centos 作为消费者;
【1】生产者代码
-- pom.xml
<!-- 依赖 --> <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency></dependencies>
生产者
-- 生产者
public class MyProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092"); /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());System.out.println(props); /* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props); /* 10.发送数据 */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first01", "first01-20201229--A" + i));System.out.printf("写入数据%s \n", "first01-20201229--A" + i);try {System.out.println(future.get().offset());} catch (Exception e) {e.printStackTrace();} }/* 11.关闭资源 */ producer.close();System.out.println("kafka生产者写入数据完成"); }
}
【2】centos 消费者
[root@centos202 kafka-0.11]# kafka-console-consumer.sh --topic first01 --bootstrap-server centos201:9092 --from-beginning
first01-20201229--2
first01-20201229--6
first01-20201229--A0
first01-20201229--A1
first01-20201229--A2
first01-20201229--A3
first01-20201229--A4
first01-20201229--A5
first01-20201229--A6
first01-20201229--A7
first01-20201229--A8
first01-20201229--A9
【3】生产者发送消息超时问题
3.1、问题现场
kafka Expiring 1 record(s) for first01-3: 31539 ms has passed since batch creation plus linger time
3.2、解决方法
修改本地机器的hosts, 如下:
192.168.163.201 centos201
192.168.163.202 centos202
192.168.163.203 centos203
java作为kafka生产者实验及Expiring超时问题解决相关推荐
- java实现Kafka生产者示例
使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 3 ...
- java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者
转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使 ...
- 【项目实战】Java 开发 Kafka 生产者
- java最简单的kafka生产者和消费者,未结合spring
目录 1 最简单的生产者和消费者 1.1 引入maven 1.2 基本的生产者和代码注释 1.3 最简单消费者 2 生产者发送消息的三种方式 2.1 直接send之后就不管了,会自动重试,可能丢失消息 ...
- java客户端作为kafka生产者测试
[README] 1.本文主要对 java客户端作为kafka 生产者进行测试, 消费者由 centos的kafka命令行线程扮演: 2.消息发送: kafka的生产者采用异步发送消息的方式,在消息发 ...
- java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明
本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...
- 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)
1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...
- mysql作为kafka生产者_Kafka之生产者
[TOC] 从编程的角度而言,生产者就是负责向 Kafka 发送消息的应用程序.在 Kafka 的历史变迁 中, 一共有两个大版本的生产者客户端: 第-个是于 Kafka开源之初使用 Scala语言编 ...
- java 集成 kafka 0.8.2.1 适配jdk1.6
文章目录 一.版本说明 二.实战 2.1. 依赖 2.2. 生产者代码 2.3. 消费端代码 2.4. 测试 三.小伙伴疑难解答 3.1. 首先新建一个maven项目 3.2. 把我的依赖和代码复制过 ...
最新文章
- netty源码分析服务器启动 NioEventLoop创建
- 写在方法中的路由跳转
- 【不同的子序列问题】面试官写个字符串要我求有多少个“bigsai“,我懵了
- php mysql随机记录,php – 从MySQL中选择可变数量的随机记录
- 实时获取ccd图像_四元数数控:CCD视觉检测定位系统在玻璃瓶缺陷的检测
- 3 左右_3万左右电动迷你小汽车,3万左右电动迷你小汽车车型推荐
- monkey测试===通过monkey测试检查app内存泄漏和cpu占用
- mysql 去除warning_zabbix监控mysql之去掉烦人的warning告警语句
- 怎样不停请求接口实现实时刷新_快狗打车实时数仓和基于Hologres的数据服务建设...
- sublime用cmd窗口调试python_Sublime Text设置程序输出窗口为dos窗口
- 深入研究微服务架构——第一部分
- TThread类详解转
- 《C++游戏编程入门(第4版)》——1.9 本章小结
- 常见的几种网络抓包及协议分析工具
- MySQL报错:ERROR 3546 (HY000): @@GLOBAL.GTID_PURGED cannot be changed: the new value must be a superset
- 一战托福5个月112分 经验分享 + 备考资料大放送
- BZOJ3876支线剧情
- Aspose.Words 创建表格
- Ubuntu 16.04 解决钉钉、微信等打开chrome时无法打开链接,只能停留在主页的问题
- Ubuntu 无法连接上 cn.archive.ubuntu.com:80
热门文章
- 牛客挑战赛47 D Lots of Edges(最短路+递归枚举子集)
- Codeforces Round #741 (Div. 2)
- Acwing1086. 恨7不成妻(未解决)
- Ybtoj-排列计数【矩阵乘法,分块幂】
- jzoj5057-[GDSOI2017模拟4.13]炮塔【网络流,最大权闭合图】
- [8.21NOIP模拟赛]决战【tarjan】
- ssl提高组周六备考赛【2018.10.27】
- Codeforces Round #673 (Div. 2)——待补 E
- 【状压DP】剑之修炼(jzoj 2130)
- 初一模拟赛总结(2019.5.25)