【README】 java作为生产者,centos 作为消费者;

【1】生产者代码

-- pom.xml
<!-- 依赖 --> <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency></dependencies>

生产者

-- 生产者
public class MyProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");  /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);  /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);  /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());System.out.println(props); /* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);  /* 10.发送数据 */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first01", "first01-20201229--A" + i));System.out.printf("写入数据%s \n", "first01-20201229--A" + i);try {System.out.println(future.get().offset());} catch (Exception e) {e.printStackTrace();} }/* 11.关闭资源 */  producer.close();System.out.println("kafka生产者写入数据完成"); }
}

【2】centos 消费者

[root@centos202 kafka-0.11]# kafka-console-consumer.sh --topic first01 --bootstrap-server centos201:9092 --from-beginning
first01-20201229--2
first01-20201229--6
first01-20201229--A0
first01-20201229--A1
first01-20201229--A2
first01-20201229--A3
first01-20201229--A4
first01-20201229--A5
first01-20201229--A6
first01-20201229--A7
first01-20201229--A8
first01-20201229--A9

【3】生产者发送消息超时问题

3.1、问题现场

kafka Expiring 1 record(s) for first01-3: 31539 ms has passed since batch creation plus linger time

3.2、解决方法

修改本地机器的hosts, 如下:

192.168.163.201 centos201
192.168.163.202 centos202
192.168.163.203 centos203 

java作为kafka生产者实验及Expiring超时问题解决相关推荐

  1. java实现Kafka生产者示例

    使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 3 ...

  2. java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者

    转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使 ...

  3. 【项目实战】Java 开发 Kafka 生产者

  4. java最简单的kafka生产者和消费者,未结合spring

    目录 1 最简单的生产者和消费者 1.1 引入maven 1.2 基本的生产者和代码注释 1.3 最简单消费者 2 生产者发送消息的三种方式 2.1 直接send之后就不管了,会自动重试,可能丢失消息 ...

  5. java客户端作为kafka生产者测试

    [README] 1.本文主要对 java客户端作为kafka 生产者进行测试, 消费者由 centos的kafka命令行线程扮演: 2.消息发送: kafka的生产者采用异步发送消息的方式,在消息发 ...

  6. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  7. 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

    1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...

  8. mysql作为kafka生产者_Kafka之生产者

    [TOC] 从编程的角度而言,生产者就是负责向 Kafka 发送消息的应用程序.在 Kafka 的历史变迁 中, 一共有两个大版本的生产者客户端: 第-个是于 Kafka开源之初使用 Scala语言编 ...

  9. java 集成 kafka 0.8.2.1 适配jdk1.6

    文章目录 一.版本说明 二.实战 2.1. 依赖 2.2. 生产者代码 2.3. 消费端代码 2.4. 测试 三.小伙伴疑难解答 3.1. 首先新建一个maven项目 3.2. 把我的依赖和代码复制过 ...

最新文章

  1. netty源码分析服务器启动 NioEventLoop创建
  2. 写在方法中的路由跳转
  3. 【不同的子序列问题】面试官写个字符串要我求有多少个“bigsai“,我懵了
  4. php mysql随机记录,php – 从MySQL中选择可变数量的随机记录
  5. 实时获取ccd图像_四元数数控:CCD视觉检测定位系统在玻璃瓶缺陷的检测
  6. 3 左右_3万左右电动迷你小汽车,3万左右电动迷你小汽车车型推荐
  7. monkey测试===通过monkey测试检查app内存泄漏和cpu占用
  8. mysql 去除warning_zabbix监控mysql之去掉烦人的warning告警语句
  9. 怎样不停请求接口实现实时刷新_快狗打车实时数仓和基于Hologres的数据服务建设...
  10. sublime用cmd窗口调试python_Sublime Text设置程序输出窗口为dos窗口
  11. 深入研究微服务架构——第一部分
  12. TThread类详解转
  13. 《C++游戏编程入门(第4版)》——1.9 本章小结
  14. 常见的几种网络抓包及协议分析工具
  15. MySQL报错:ERROR 3546 (HY000): @@GLOBAL.GTID_PURGED cannot be changed: the new value must be a superset
  16. 一战托福5个月112分 经验分享 + 备考资料大放送
  17. BZOJ3876支线剧情
  18. Aspose.Words 创建表格
  19. Ubuntu 16.04 解决钉钉、微信等打开chrome时无法打开链接,只能停留在主页的问题
  20. Ubuntu 无法连接上 cn.archive.ubuntu.com:80

热门文章

  1. 牛客挑战赛47 D Lots of Edges(最短路+递归枚举子集)
  2. Codeforces Round #741 (Div. 2)
  3. Acwing1086. 恨7不成妻(未解决)
  4. Ybtoj-排列计数【矩阵乘法,分块幂】
  5. jzoj5057-[GDSOI2017模拟4.13]炮塔【网络流,最大权闭合图】
  6. [8.21NOIP模拟赛]决战【tarjan】
  7. ssl提高组周六备考赛【2018.10.27】
  8. Codeforces Round #673 (Div. 2)——待补 E
  9. 【状压DP】剑之修炼(jzoj 2130)
  10. 初一模拟赛总结(2019.5.25)