使用kafka-clients操作kafka始终不成功,原因不清楚,下面贴出相关代码及配置,请懂得指点一下,谢谢!

环境及依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.2.0</version>
</dependency>

JDK版本为1.8、Kafka版本为2.12-0.10.2.0,服务器使用CentOS-7构建。

测试代码

  • TestBase.java

public class TestBase {protected Logger log = LoggerFactory.getLogger(this.getClass());protected String kafka_server = "192.168.60.160:9092" ;protected String topic = "zlikun_topic";}
  • ProducerTest.java

public class ProducerTest extends TestBase {protected Properties props = new Properties();@Beforepublic void init() {props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ;}@Testpublic void test() throws InterruptedException {KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.printf("offset = %d ,partition = %d \n", recordMetadata.offset() ,recordMetadata.partition());} else {log.error("send error !" ,e);}}});}TimeUnit.SECONDS.sleep(3);producer.close();}}
  • ConsumerTest.java

public class ConsumerTest extends TestBase {private Properties props = new Properties();@Beforepublic void init() {props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);props.put(ConsumerConfig.GROUP_ID_CONFIG ,"zlikun") ;props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);}@Testpublic void test() {Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));
//        consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}}

问题

# 测试topic为手动创建
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic

控制台输出信息

[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time

Java操作Kafka执行不成功 >> java

这个答案描述的挺清楚的:
http://www.goodpm.net/postreply/java/1010000008863969/Java操作Kafka执行不成功.html

转载于:https://www.cnblogs.com/scrumme/p/7668819.html

Java操作Kafka执行不成功相关推荐

  1. kafka入门(4)-java操作kafka

    kafka入门(4)-java操作kafka 准备工作 创建maven工程 导入Maven Kafka POM依赖 <repositories><!-- 代码库 -->< ...

  2. 云计算大数据之 Java 操作 Kafka

    云计算大数据之 Java 操作 Kafka 版权声明: 本文为博主学习整理原创文章,如有不正之处请多多指教. 未经博主允许不得转载.https://blog.csdn.net/qq_42595261/ ...

  3. JAVA操作REDIS执行原子操作

    JAVA操作REDIS执行原子操作 JAVA操作REDIS执行原子操作 为什么要使用原子操作 JAVA操作REDIS执行原子操作 为什么要使用原子操作 众所周知,redis 作为数据库的前置库,给数据 ...

  4. Java操作Kafka收发消息demo

    通过Java程序来进行Kafka收发消息的演示 Kafka自身提供的Java客户端来演示消息的收发,与Kafka的Java客户端相关的Maven依赖如下: <properties>< ...

  5. Java操作Kafka创建Topic、Producer、Consumer

    环境 JDK 1.8 Zookeeper 3.6.1 Kafka 2.6.0 引入依赖 <dependency><groupId>org.apache.kafka</gr ...

  6. kafka java_Java操作Kafka

    java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用.下面我贴出代码. pom.xml org.apache.kafka kafka-clie ...

  7. kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2

    1.JAVA API操作kafka  修改Windows的Host文件: 目录:C:\Windows\System32\drivers\etc (win10) 内容: 192.168.40.150 k ...

  8. Java 操作SSH2实现远程执行linux命令

    引入依赖 <dependency><groupId>ch.ethz.ganymed</groupId><artifactId>ganymed-ssh2& ...

  9. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

最新文章

  1. spring-data-jpa Repository的基本知识
  2. 数据库查询构建控件集Active Query Builder 控件
  3. tns(thrift 分布式组件)介绍
  4. 设计模式总结 (3)创建内存型模式
  5. 【Flask项目】项目准备之-创建模块的蓝图
  6. vscode安装sftp控制文件自动上传
  7. 控制器对应view生命周期
  8. 在pycharm中导入anaconda的库
  9. UI设计灵感|引人注目的弹窗设计参考
  10. ubuntu安装vasp_用强大的GROMACS分析工具分析VASP的动力学结果
  11. XUtils BitmapUtils 改造以加入drawable支持
  12. python生成一组随机数_python怎么产生不重复的随机数
  13. 内网穿透的一种方式——基于ngrok的小米球
  14. 短信验证码功能(阿里云版)
  15. 硬盘在计算机的内部结构,33.硬盘篇-认识机械硬盘上的固件和内部结构-电脑自学网...
  16. nodejs断言库_断言库的比较
  17. 家乡的春节html,家乡的春节日记
  18. linux下下载fnl数据,使用python直接提取fnl再分析资料的气象因子数据
  19. Super-Resolution Mapping of Impervious Surfaces from Remotely Sensed Imagery with Points-of-Interest
  20. 基于Debezium 1.6和Oracle 11g 的 Debezium-Oracle实战

热门文章

  1. java 抽象工厂工厂_Java设计模式之简单工厂、工厂方法和抽象工厂
  2. linux 内核模块 编写例子,Linux内核模块实例
  3. bootstrap-fileinput 使用
  4. java 著名的应用程序_即刻就业:java的应用程序有哪些
  5. 双系统gazebo闪退_记录Ubuntu16.04下PX4联合Gazebo仿真时遇到的问题与解决方法
  6. linux fcntl注销信号,fcntl · Linux C API 参考手册 · 看云
  7. matlab decomposition filters,MATLAB小波去噪求助(附算法和显示图片)!不知自己哪个地方出了问题,求指点! - 信息科学 - 小木虫 - 学术 科研 互动社区...
  8. linux 网络端口状态,Linux下用netstat查看网络状态、端口状态(转)
  9. 第十六届全国大学智能汽车竞赛全向组沁恒芯片申请统计情况
  10. 为什么单片机通常只有那么小的数据内存?