Kafka学习笔记-Java简单操作
Maven依赖包:
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>0.8.2.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>0.8.2.1</version>
- </dependency>
代码如下:
- import java.util.Properties;
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class KafkaProducerTest {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);
- private static Properties properties = null;
- static {
- properties = new Properties();
- properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
- properties.put("producer.type", "sync");
- properties.put("request.required.acks", "1");
- properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
- properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");
- properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- // properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- // properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- }
- public void produce() {
- KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties);
- ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>(
- "test", "kkk".getBytes(), "vvv".getBytes());
- kafkaProducer.send(kafkaRecord, new Callback() {
- public void onCompletion(RecordMetadata metadata, Exception e) {
- if(null != e) {
- LOG.info("the offset of the send record is {}", metadata.offset());
- LOG.error(e.getMessage(), e);
- }
- LOG.info("complete!");
- }
- });
- kafkaProducer.close();
- }
- public static void main(String[] args) {
- KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
- for (int i = 0; i < 10; i++) {
- kafkaProducerTest.produce();
- }
- }
- }
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class KafkaConsumerTest {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
- // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);
- kafkaConsumer.subscribe("test");
- // kafkaConsumer.subscribe("*");
- boolean isRunning = true;
- while(isRunning) {
- Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);
- if (null != results) {
- for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {
- LOG.info("topic {}", entry.getKey());
- ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();
- List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();
- for (int i = 0, len = records.size(); i < len; i++) {
- ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);
- LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());
- try {
- LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
- }
- }
- kafkaConsumer.close();
- }
- }
运行后发现:kafka-clients 0.8.2.1 中 KafkaConsumer 的 poll 方法尚未实现,只是直接返回 null:
- @Override
- public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
- // TODO Auto-generated method stub: not implemented yet, the timeout is ignored and null is always returned
- return null;
- }
因此改用 kafka.javaapi.consumer.SimpleConsumer 实现,可以正常运行:
- import java.nio.ByteBuffer;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import kafka.api.FetchRequest;
- import kafka.api.FetchRequestBuilder;
- import kafka.api.PartitionOffsetRequestInfo;
- import kafka.cluster.Broker;
- import kafka.common.ErrorMapping;
- import kafka.common.TopicAndPartition;
- import kafka.javaapi.FetchResponse;
- import kafka.javaapi.OffsetRequest;
- import kafka.javaapi.OffsetResponse;
- import kafka.javaapi.PartitionMetadata;
- import kafka.javaapi.TopicMetadata;
- import kafka.javaapi.TopicMetadataRequest;
- import kafka.javaapi.TopicMetadataResponse;
- import kafka.javaapi.consumer.SimpleConsumer;
- import kafka.message.MessageAndOffset;
- public class KafkaSimpleConsumerTest {
- private List<String> borkerList = new ArrayList<String>();
- public KafkaSimpleConsumerTest() {
- borkerList = new ArrayList<String>();
- }
- public static void main(String args[]) {
- KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();
- // 最大读取消息数量
- long maxReadNum = Long.parseLong("3");
- // 订阅的topic
- String topic = "test";
- // 查找的分区
- int partition = Integer.parseInt("0");
- // broker节点
- List<String> seeds = new ArrayList<String>();
- seeds.add("centos.master");
- seeds.add("centos.slave1");
- seeds.add("centos.slave2");
- // 端口
- int port = Integer.parseInt("9092");
- try {
- kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);
- } catch (Exception e) {
- System.out.println("Oops:" + e);
- e.printStackTrace();
- }
- }
- public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {
- // 获取指定topic partition的元数据
- PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
- if (metadata == null) {
- System.out.println("can't find metadata for topic and partition. exit");
- return;
- }
- if (metadata.leader() == null) {
- System.out.println("can't find leader for topic and partition. exit");
- return;
- }
- String leadBroker = metadata.leader().host();
- String clientName = "client_" + topic + "_" + partition;
- SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
- long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
- int numErrors = 0;
- while (maxReadNum > 0) {
- if (consumer == null) {
- consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
- }
- FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();
- FetchResponse fetchResponse = consumer.fetch(req);
- if (fetchResponse.hasError()) {
- numErrors++;
- short code = fetchResponse.errorCode(topic, partition);
- System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);
- if (numErrors > 5)
- break;
- if (code == ErrorMapping.OffsetOutOfRangeCode()) {
- readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
- continue;
- }
- consumer.close();
- consumer = null;
- leadBroker = findNewLeader(leadBroker, topic, partition, port);
- continue;
- }
- numErrors = 0;
- long numRead = 0;
- for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
- long currentOffset = messageAndOffset.offset();
- if (currentOffset < readOffset) {
- System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);
- continue;
- }
- readOffset = messageAndOffset.nextOffset();
- ByteBuffer payload = messageAndOffset.message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
- numRead++;
- maxReadNum--;
- }
- if (numRead == 0) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- }
- }
- }
- if (consumer != null)
- consumer.close();
- }
- /**
- * 从活跃的Broker列表中找出指定Topic、Partition中的Leader Broker
- * @param seedBrokers
- * @param port
- * @param topic
- * @param partition
- * @return
- */
- private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
- PartitionMetadata partitionMetadata = null;
- loop: for (String seedBroker : seedBrokers) {
- SimpleConsumer consumer = null;
- try {
- consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");
- List<String> topics = Collections.singletonList(topic);
- TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
- TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);
- List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
- for (TopicMetadata topicMetadata : topicMetadatas) {
- for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {
- if (pMetadata.partitionId() == partition) {
- partitionMetadata = pMetadata;
- break loop;
- }
- }
- }
- } catch (Exception e) {
- System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);
- } finally {
- if (consumer != null)
- consumer.close();
- }
- }
- if (partitionMetadata != null) {
- borkerList.clear();
- for (Broker replica : partitionMetadata.replicas()) {
- borkerList.add(replica.host());
- }
- }
- return partitionMetadata;
- }
- public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
- OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
- OffsetResponse response = consumer.getOffsetsBefore(request);
- if (response.hasError()) {
- System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition));
- return 0;
- }
- long[] offsets = response.offsets(topic, partition);
- return offsets[0];
- }
- private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
- for (int i = 0; i < 3; i++) {
- boolean goToSleep = false;
- PartitionMetadata metadata = findLeader(borkerList, port, topic, partition);
- if (metadata == null) {
- goToSleep = true;
- } else if (metadata.leader() == null) {
- goToSleep = true;
- } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
- goToSleep = true;
- } else {
- return metadata.leader().host();
- }
- if (goToSleep) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- }
- }
- }
- System.out.println("unable to find new leader after broker failure. exit");
- throw new Exception("unable to find new leader after broker failure. exit");
- }
- }
转载于:https://www.cnblogs.com/edison2012/p/5759223.html
Kafka学习笔记-Java简单操作相关推荐
- java算法优化_Java学习笔记---Java简单的代码算法优化(例)
例:用一张1元纸币兑换1分.2分.5分硬币,要求兑换50枚硬币,求出所有组合. package mypackage01; public class demo { public static void ...
- 大数据 -- kafka学习笔记:知识点整理(部分转载)
一 为什么需要消息系统 1.解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多 ...
- JDBC学习笔记——Java语言与数据库的鹊桥
JDBC学习笔记--Java语言与数据库的鹊桥 JDBC(Java DataBase Connectivity):SUN公司提供的 一套操作数据库的标准规范,说白了就是用Java语言来操作数据 ...
- 深入理解Java虚拟机(第3版)学习笔记——JAVA内存区域(超详细)
深入理解Java虚拟机(第3版)学习笔记--JAVA内存区域(超详细) 运行时数据区域 程序计数器 java虚拟机栈 本地方法栈 java堆 方法区 运行时常量池 直接内存 对象的创建 对象的内存布局 ...
- tensorflow学习笔记——使用TensorFlow操作MNIST数据(1)
续集请点击我:tensorflow学习笔记--使用TensorFlow操作MNIST数据(2) 本节开始学习使用tensorflow教程,当然从最简单的MNIST开始.这怎么说呢,就好比编程入门有He ...
- Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover
1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...
- VC学习笔记:简单绘图
VC学习笔记:简单绘图 SkySeraph Oct.29th 2009 HQU Email-zgzhaobo@gmail.com QQ-452728574 Latest Modified Date ...
- Tensorflow2学习笔记:简单灰度图分类
Tensorflow2学习笔记:简单灰度图分类 相关介绍 实验环境 实验步骤 导入相关库 导入数据集 浏览数据 预处理数据 构建模型 设置层 编译模型 训练模型 向模型馈送数据 评估准确率 进行预测 ...
- Kafka学习笔记(一):什么是消息队列?什么是Kafka?
目录 一.消息队列的概述 (一)前置知识点 1.集群和分布式 2.队列(Queue)的含义 3.同步与异步的含义 (二)消息队列的含义与特点 二.Kafka (一) 概述 (二) 常用名词含义 导航栏 ...
最新文章
- Boost TCP serverclient 有回调无发送
- Spring Data JPA 常用注解
- poj 3295 Tautology(经典构造算法题)
- Flex页面跳转的五种实现方式
- python 线程超时设置_python 条件变量Condition(36)
- 2019 .NET China Conf:路一直都在,社区会更好
- php tar.gz文件,PHP解压tar.gz格式文件的方法,_PHP教程
- Qt creator5.7 OpenCV249之均值滤波(含源码下载)
- day-16 jquery的DOM文档操作及bootstrap
- Qone 自动删除说说脚本
- Eclipse中的Git使用之Branch创建,Merge
- delphi与python_python和delphi哪个好
- 国外云服务器有哪些?国外云服务器大全
- 阿里云服务器linux 启动网卡失败,提示does not seem to be present,delaying initialization
- python colorbar刻度_python-如何添加Matplotlib Colorbar刻度
- HTML技巧篇——禁止网页元素被右击、拖动、选中、复制
- java 随机手机验证码_Java随机生成手机短信验证码的方法
- CentOS7使用Yum安装k8s
- ng-alain php,基于阿里出得ng-Alain搭建后台管理系统
- 【java】【kotlin】判断当前日期是星期几、是否为月底;获取当前季度起始时间