Environment Preparation

  1. Start the ZooKeeper cluster and the Kafka cluster, and open a console consumer in the Kafka cluster (the topic first is assumed to already exist; see the topic-creation sketch after the dependency list)
    [hadoop@hadoop-100 kafka]$ zkservers start
    [hadoop@hadoop-100 kafka]$ zkservers status
    [hadoop@hadoop-100 kafka]$ mykafka start
    [hadoop@hadoop-100 kafka]$ myjps
    [hadoop@hadoop-100 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop-100:2181 --topic first
  2. Import the pom dependencies
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>
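
The console consumer above assumes the topic first already exists. As a minimal sketch (not part of the original steps), the topic could also be created programmatically with the AdminClient API included in kafka-clients 0.11.0.0; the broker address, partition count, and replication factor below are placeholder assumptions to adapt to your cluster.

package com.zj.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; adjust to your cluster
        props.put("bootstrap.servers", "hadoop-100:9092");

        AdminClient admin = AdminClient.create(props);
        // Topic "first" with 1 partition and replication factor 1 (placeholder values)
        NewTopic topic = new NewTopic("first", 1, (short) 1);
        admin.createTopics(Collections.singletonList(topic)).all().get();
        admin.close();
    }
}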

Kafka Producer Java API

Creating a producer (deprecated API)

package com.zj.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class OldProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker list
        properties.put("metadata.broker.list", "hadoop-100:9092");
        // Wait for the leader's acknowledgement
        properties.put("request.required.acks", "1");
        // Value serializer
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<Integer, String> producer = new Producer<>(new ProducerConfig(properties));
        KeyedMessage<Integer, String> message =
                new KeyedMessage<Integer, String>("first", "hello api");
        producer.send(message);
    }
}

Creating a producer (new API)

package com.zj.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class NewProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop-100:9092");
        // Wait for acknowledgement from all replicas
        props.put("acks", "all");
        // Maximum number of send retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Linger time before a batch is sent
        props.put("linger.ms", 1);
        // Send buffer memory size
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}
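
send() in the new producer API is asynchronous and returns a Future<RecordMetadata>. As a minimal sketch (not from the original article), calling get() on that future turns the call into a synchronous send that blocks until the broker acknowledges the record; the class name SyncSendSketch is hypothetical.

package com.zj.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class SyncSendSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-100:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // get() blocks until the broker acknowledges the record (or throws on failure)
        RecordMetadata metadata =
                producer.send(new ProducerRecord<String, String>("first", "0", "hello sync")).get();
        System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
        producer.close();
    }
}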

Creating a producer with a callback (new API)

package com.zj.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class NewProducerWithCallBack {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop-100:9092");
        // Wait for acknowledgement from all replicas
        props.put("acks", "all");
        // Maximum number of send retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 1);
        // Linger time before a batch is sent
        props.put("linger.ms", 1);
        // Send buffer memory size
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("second", Integer.toString(i), "hello" + Integer.toString(i)),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            // Called when the send completes; metadata is null if the send failed
                            if (metadata != null) {
                                System.out.println(metadata.partition() + "," + metadata.offset());
                            }
                        }
                    });
        }
        producer.close();
    }
}

Custom-partition producer

Requirement: store all data in partition 0 of the topic.
Define a class that implements the Partitioner interface and override its methods (deprecated API).

package com.zj.producer.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class OldApiMyPartition implements Partitioner {

    public OldApiMyPartition(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // Always write to partition 0
        return 0;
    }
}

Custom partitioner (new API)

package com.zj.producer.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class NewApiMyPartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Always write to partition 0
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
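
The partitioner above pins every record to partition 0, as the requirement asks. As a variant sketch (not part of the original), a partitioner that spreads records across all partitions by hashing the serialized key could look like the following; the class name KeyHashPartition is hypothetical, and records without a key fall back to partition 0.

package com.zj.producer.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

public class KeyHashPartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            // No key: fall back to partition 0
            return 0;
        }
        // murmur2 hash of the serialized key, mapped onto the available partitions
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

To try it, set partitioner.class to com.zj.producer.partition.KeyHashPartition instead of NewApiMyPartition in the producer configuration shown below.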

Using it in code (new API)

package com.zj.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerWithPartition {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop-102:9092");
        // Wait for acknowledgement from all replicas
        props.put("acks", "all");
        // Maximum number of send retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Linger time before a batch is sent
        props.put("linger.ms", 1);
        // Send buffer memory size
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner
        props.put("partitioner.class", "com.zj.producer.partition.NewApiMyPartition");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("second", "1", "hhh555"));
        producer.close();
    }
}

Using it in code (deprecated API)

package com.zj.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class OldApiProducerWithPartition {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker list
        properties.put("metadata.broker.list", "hadoop-100:9092");
        // Wait for the leader's acknowledgement
        properties.put("request.required.acks", "1");
        // Value serializer
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        // Custom partitioner
        properties.put("partitioner.class", "com.zj.producer.partition.OldApiMyPartition");

        Producer<Integer, String> producer = new Producer<>(new ProducerConfig(properties));
        KeyedMessage<Integer, String> message =
                new KeyedMessage<Integer, String>("first", "hello api");
        producer.send(message);
    }
}

Kafka Consumer Java API

High-level API

Creating a consumer (deprecated API)

package com.zj.consumer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class HighOldApiConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "hadoop-100:2181");
        properties.put("group.id", "test1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");

        // Create the consumer connector
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        // Request one stream for the "first" topic
        HashMap<String, Integer> topicCount = new HashMap<>();
        topicCount.put("first", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);

        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
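
For comparison only (this consumer is not in the original write-up), here is a minimal sketch of the same subscription using the newer KafkaConsumer API that ships with kafka-clients 0.11; the class name NewApiConsumerSketch is hypothetical, and the broker and group settings mirror the examples above.

package com.zj.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class NewApiConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-100:9092");
        props.put("group.id", "test1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("first"));
        while (true) {
            // poll(long timeoutMs) is the 0.11-era signature
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition() + "," + record.offset() + ": " + record.value());
            }
        }
    }
}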

Low-level API

Use the low-level API to read data from a specified topic, a specified partition, and a specified offset.

Main steps for a consumer using the low-level API

  1. Find the leader replica for the specified partition from the topic metadata
  2. Get the latest consumption offset for the partition
  3. Fetch the partition's messages from the leader replica
  4. Detect changes of the leader replica and retry

Method descriptions
findLeader(): the client sends a topic metadata request to the seed brokers and adds the partition's replica set to a backup broker list
getLastOffset(): the consumer client sends an offset request to get the partition's latest offset
run(): the main method in which the low-level consumer fetches messages
findNewLeader(): when the partition's leader broker fails, the client finds the new leader

package com.zj.consumer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class LowerConsumer {

    private List<String> mReplicaBrokers = new ArrayList<>();

    public LowerConsumer() {
        mReplicaBrokers = new ArrayList<>();
    }

    public static void main(String[] args) {
        LowerConsumer lowerConsumer = new LowerConsumer();
        // Maximum number of messages to read
        long maxReads = Long.parseLong("3");
        // Topic to subscribe to
        String topic = "first";
        // Partition to look up
        int partition = Integer.parseInt("0");
        // Broker node IPs
        List<String> seeds = new ArrayList<>();
        seeds.add("192.168.114.100");
        seeds.add("192.168.114.101");
        seeds.add("192.168.114.102");
        // Port
        int port = Integer.parseInt("9092");
        try {
            lowerConsumer.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    // Main fetch loop: find the leader, read the starting offset, then pull messages
    public void run(long aMaxReads, String aTopic, int aPartition, List<String> aSeedBrokers, int aPort) throws Exception {
        // Get the metadata for the specified topic partition
        PartitionMetadata metadata = findLeader(aSeedBrokers, aPort, aTopic, aPartition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + aTopic + "_" + aPartition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, aPort, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, aTopic, aPartition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (aMaxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, aPort, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(aTopic, aPartition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(aTopic, aPartition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) {
                    break;
                }
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case,
                    // ask for the last element to reset
                    readOffset = getLastOffset(consumer, aTopic, aPartition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, aTopic, aPartition, aPort);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(aTopic, aPartition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                aMaxReads--;
            }
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) {
            consumer.close();
        }
    }

    // Send an offset request to get the partition's offset at the given time (earliest or latest)
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    // When the leader fails, look for the new leader among the known replica brokers
    private String findNewLeader(String a_oldLeader, String aTopic, int aPartition, int aPort) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(mReplicaBrokers, aPort, aTopic, aPartition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    // Query the seed brokers for topic metadata and remember the partition's replica brokers
    private PartitionMetadata findLeader(List<String> aSeedBrokers, int aPort, String aTopic, int aPartition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : aSeedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, aPort, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(aTopic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == aPartition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + aTopic + ", " + aPartition + "] Reason: " + e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        if (returnMetaData != null) {
            mReplicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                mReplicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
