Maven依赖包:

[plain] view plaincopy
  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.8.2.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka_2.11</artifactId>
  9. <version>0.8.2.1</version>
  10. </dependency>

代码如下:

[java] view plaincopy
  1. import java.util.Properties;
  2. import org.apache.kafka.clients.producer.Callback;
  3. import org.apache.kafka.clients.producer.KafkaProducer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.clients.producer.RecordMetadata;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. public class KafkaProducerTest {
  9. private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);
  10. private static Properties properties = null;
  11. static {
  12. properties = new Properties();
  13. properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
  14. properties.put("producer.type", "sync");
  15. properties.put("request.required.acks", "1");
  16. properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
  17. properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");
  18. properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  19. //      properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  20. properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  21. //      properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  22. }
  23. public void produce() {
  24. KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties);
  25. ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>(
  26. "test", "kkk".getBytes(), "vvv".getBytes());
  27. kafkaProducer.send(kafkaRecord, new Callback() {
  28. public void onCompletion(RecordMetadata metadata, Exception e) {
  29. if(null != e) {
  30. LOG.info("the offset of the send record is {}", metadata.offset());
  31. LOG.error(e.getMessage(), e);
  32. }
  33. LOG.info("complete!");
  34. }
  35. });
  36. kafkaProducer.close();
  37. }
  38. public static void main(String[] args) {
  39. KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
  40. for (int i = 0; i < 10; i++) {
  41. kafkaProducerTest.produce();
  42. }
  43. }
  44. }
[java] view plaincopy
  1. import java.util.List;
  2. import java.util.Map;
  3. import java.util.Properties;
  4. import org.apache.kafka.clients.consumer.ConsumerConfig;
  5. import org.apache.kafka.clients.consumer.ConsumerRecord;
  6. import org.apache.kafka.clients.consumer.ConsumerRecords;
  7. import org.apache.kafka.clients.consumer.KafkaConsumer;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. public class KafkaConsumerTest {
  11. private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);
  12. public static void main(String[] args) {
  13. Properties properties = new Properties();
  14. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  15. "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
  16. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
  17. properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");
  18. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  19. properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
  20. //      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
  21. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");
  22. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  23. "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  24. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  25. "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  26. KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);
  27. kafkaConsumer.subscribe("test");
  28. //      kafkaConsumer.subscribe("*");
  29. boolean isRunning = true;
  30. while(isRunning) {
  31. Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);
  32. if (null != results) {
  33. for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {
  34. LOG.info("topic {}", entry.getKey());
  35. ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();
  36. List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();
  37. for (int i = 0, len = records.size(); i < len; i++) {
  38. ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);
  39. LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());
  40. try {
  41. LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));
  42. } catch (Exception e) {
  43. LOG.error(e.getMessage(), e);
  44. }
  45. }
  46. }
  47. }
  48. }
  49. kafkaConsumer.close();
  50. }
  51. }

发现KafkaConsumer的poll方法未实现

[java] view plaincopy
  1. @Override
  2. public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
  3. // TODO Auto-generated method stub
  4. return null;
  5. }

后改为kafka.javaapi.consumer.SimpleConsumer实现,正常运行

[java] view plaincopy
  1. import java.nio.ByteBuffer;
  2. import java.util.ArrayList;
  3. import java.util.Collections;
  4. import java.util.HashMap;
  5. import java.util.List;
  6. import java.util.Map;
  7. import kafka.api.FetchRequest;
  8. import kafka.api.FetchRequestBuilder;
  9. import kafka.api.PartitionOffsetRequestInfo;
  10. import kafka.cluster.Broker;
  11. import kafka.common.ErrorMapping;
  12. import kafka.common.TopicAndPartition;
  13. import kafka.javaapi.FetchResponse;
  14. import kafka.javaapi.OffsetRequest;
  15. import kafka.javaapi.OffsetResponse;
  16. import kafka.javaapi.PartitionMetadata;
  17. import kafka.javaapi.TopicMetadata;
  18. import kafka.javaapi.TopicMetadataRequest;
  19. import kafka.javaapi.TopicMetadataResponse;
  20. import kafka.javaapi.consumer.SimpleConsumer;
  21. import kafka.message.MessageAndOffset;
  22. public class KafkaSimpleConsumerTest {
  23. private List<String> borkerList = new ArrayList<String>();
  24. public KafkaSimpleConsumerTest() {
  25. borkerList = new ArrayList<String>();
  26. }
  27. public static void main(String args[]) {
  28. KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();
  29. // 最大读取消息数量
  30. long maxReadNum = Long.parseLong("3");
  31. // 订阅的topic
  32. String topic = "test";
  33. // 查找的分区
  34. int partition = Integer.parseInt("0");
  35. // broker节点
  36. List<String> seeds = new ArrayList<String>();
  37. seeds.add("centos.master");
  38. seeds.add("centos.slave1");
  39. seeds.add("centos.slave2");
  40. // 端口
  41. int port = Integer.parseInt("9092");
  42. try {
  43. kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);
  44. } catch (Exception e) {
  45. System.out.println("Oops:" + e);
  46. e.printStackTrace();
  47. }
  48. }
  49. public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {
  50. // 获取指定topic partition的元数据
  51. PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
  52. if (metadata == null) {
  53. System.out.println("can't find metadata for topic and partition. exit");
  54. return;
  55. }
  56. if (metadata.leader() == null) {
  57. System.out.println("can't find leader for topic and partition. exit");
  58. return;
  59. }
  60. String leadBroker = metadata.leader().host();
  61. String clientName = "client_" + topic + "_" + partition;
  62. SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
  63. long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
  64. int numErrors = 0;
  65. while (maxReadNum > 0) {
  66. if (consumer == null) {
  67. consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
  68. }
  69. FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();
  70. FetchResponse fetchResponse = consumer.fetch(req);
  71. if (fetchResponse.hasError()) {
  72. numErrors++;
  73. short code = fetchResponse.errorCode(topic, partition);
  74. System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);
  75. if (numErrors > 5)
  76. break;
  77. if (code == ErrorMapping.OffsetOutOfRangeCode()) {
  78. readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
  79. continue;
  80. }
  81. consumer.close();
  82. consumer = null;
  83. leadBroker = findNewLeader(leadBroker, topic, partition, port);
  84. continue;
  85. }
  86. numErrors = 0;
  87. long numRead = 0;
  88. for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
  89. long currentOffset = messageAndOffset.offset();
  90. if (currentOffset < readOffset) {
  91. System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);
  92. continue;
  93. }
  94. readOffset = messageAndOffset.nextOffset();
  95. ByteBuffer payload = messageAndOffset.message().payload();
  96. byte[] bytes = new byte[payload.limit()];
  97. payload.get(bytes);
  98. System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
  99. numRead++;
  100. maxReadNum--;
  101. }
  102. if (numRead == 0) {
  103. try {
  104. Thread.sleep(1000);
  105. } catch (InterruptedException ie) {
  106. }
  107. }
  108. }
  109. if (consumer != null)
  110. consumer.close();
  111. }
  112. /**
  113. * 从活跃的Broker列表中找出指定Topic、Partition中的Leader Broker
  114. * @param seedBrokers
  115. * @param port
  116. * @param topic
  117. * @param partition
  118. * @return
  119. */
  120. private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
  121. PartitionMetadata partitionMetadata = null;
  122. loop: for (String seedBroker : seedBrokers) {
  123. SimpleConsumer consumer = null;
  124. try {
  125. consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");
  126. List<String> topics = Collections.singletonList(topic);
  127. TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
  128. TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);
  129. List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
  130. for (TopicMetadata topicMetadata : topicMetadatas) {
  131. for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {
  132. if (pMetadata.partitionId() == partition) {
  133. partitionMetadata = pMetadata;
  134. break loop;
  135. }
  136. }
  137. }
  138. } catch (Exception e) {
  139. System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);
  140. } finally {
  141. if (consumer != null)
  142. consumer.close();
  143. }
  144. }
  145. if (partitionMetadata != null) {
  146. borkerList.clear();
  147. for (Broker replica : partitionMetadata.replicas()) {
  148. borkerList.add(replica.host());
  149. }
  150. }
  151. return partitionMetadata;
  152. }
  153. public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
  154. TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
  155. Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
  156. requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
  157. OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
  158. OffsetResponse response = consumer.getOffsetsBefore(request);
  159. if (response.hasError()) {
  160. System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition));
  161. return 0;
  162. }
  163. long[] offsets = response.offsets(topic, partition);
  164. return offsets[0];
  165. }
  166. private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
  167. for (int i = 0; i < 3; i++) {
  168. boolean goToSleep = false;
  169. PartitionMetadata metadata = findLeader(borkerList, port, topic, partition);
  170. if (metadata == null) {
  171. goToSleep = true;
  172. } else if (metadata.leader() == null) {
  173. goToSleep = true;
  174. } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
  175. goToSleep = true;
  176. } else {
  177. return metadata.leader().host();
  178. }
  179. if (goToSleep) {
  180. try {
  181. Thread.sleep(1000);
  182. } catch (InterruptedException ie) {
  183. }
  184. }
  185. }
  186. System.out.println("unable to find new leader after broker failure. exit");
  187. throw new Exception("unable to find new leader after broker failure. exit");
  188. }
  189. }

转载于:https://www.cnblogs.com/edison2012/p/5759223.html

Kafka学习笔记-Java简单操作相关推荐

  1. java算法优化_Java学习笔记---Java简单的代码算法优化(例)

    例:用一张1元纸币兑换1分.2分.5分硬币,要求兑换50枚硬币,求出所有组合. package mypackage01; public class demo { public static void ...

  2. 大数据 -- kafka学习笔记:知识点整理(部分转载)

    一 为什么需要消息系统 1.解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多 ...

  3. JDBC学习笔记——Java语言与数据库的鹊桥

    JDBC学习笔记--Java语言与数据库的鹊桥     JDBC(Java DataBase Connectivity):SUN公司提供的 一套操作数据库的标准规范,说白了就是用Java语言来操作数据 ...

  4. 深入理解Java虚拟机(第3版)学习笔记——JAVA内存区域(超详细)

    深入理解Java虚拟机(第3版)学习笔记--JAVA内存区域(超详细) 运行时数据区域 程序计数器 java虚拟机栈 本地方法栈 java堆 方法区 运行时常量池 直接内存 对象的创建 对象的内存布局 ...

  5. tensorflow学习笔记——使用TensorFlow操作MNIST数据(1)

    续集请点击我:tensorflow学习笔记--使用TensorFlow操作MNIST数据(2) 本节开始学习使用tensorflow教程,当然从最简单的MNIST开始.这怎么说呢,就好比编程入门有He ...

  6. Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover

    1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...

  7. VC学习笔记:简单绘图

    VC学习笔记:简单绘图 SkySeraph Oct.29th 2009  HQU Email-zgzhaobo@gmail.com  QQ-452728574 Latest Modified Date ...

  8. Tensorflow2学习笔记:简单灰度图分类

    Tensorflow2学习笔记:简单灰度图分类 相关介绍 实验环境 实验步骤 导入相关库 导入数据集 浏览数据 预处理数据 构建模型 设置层 编译模型 训练模型 向模型馈送数据 评估准确率 进行预测 ...

  9. Kafka学习笔记(一):什么是消息队列?什么是Kafka?

    目录 一.消息队列的概述 (一)前置知识点 1.集群和分布式 2.队列(Queue)的含义 3.同步与异步的含义 (二)消息队列的含义与特点 二.Kafka (一) 概述 (二) 常用名词含义 导航栏 ...

最新文章

  1. Boost TCP serverclient 有回调无发送
  2. Spring Data JPA 常用注解
  3. poj 3295 Tautology(经典构造算法题)
  4. Flex页面跳转的五种实现方式
  5. python 线程超时设置_python 条件变量Condition(36)
  6. 2019 .NET China Conf:路一直都在,社区会更好
  7. php tar.gz文件,PHP解压tar.gz格式文件的方法,_PHP教程
  8. Qt creator5.7 OpenCV249之均值滤波(含源码下载)
  9. day-16 jquery的DOM文档操作及bootstrap
  10. Qone 自动删除说说脚本
  11. Eclipse中的Git使用之Branch创建,Merge
  12. delphi与python_python和delphi哪个好
  13. 国外云服务器有哪些?国外云服务器大全
  14. 阿里云服务器linux 启动网卡失败,提示does not seem to be present,delaying initialization
  15. python colorbar刻度_python-如何添加Matplotlib Colorbar刻度
  16. HTML技巧篇——禁止网页元素被右击、拖动、选中、复制
  17. java 随机手机验证码_Java随机生成手机短信验证码的方法
  18. CentOS7使用Yum安装k8s
  19. ng-alain php,基于阿里出得ng-Alain搭建后台管理系统
  20. 【java】【kotlin】判断当前日期是星期几、是否为月底;获取当前季度起始时间

热门文章

  1. 2019长安大学ACM校赛网络同步赛 L XOR (规律,数位DP)
  2. QT_4_QpushButton的简单使用_对象树
  3. 浏览器的一个请求从发送到返回都经历了什么?
  4. oracle 数据库中数据导出到excel
  5. windows下实现微秒级的延时
  6. 安装mysql数据库要注意的
  7. SQL SERVER查询时间条件式写法
  8. 局部遮罩 shade(二)
  9. SQL:ISNULL
  10. 访问Access数据库需要注意的问题