原文地址:http://blog.csdn.net/honglei915/article/details/37697655

Kafka视频教程同步首发,欢迎观看!


Kafka Producer APIs

新版的Producer API提供了以下功能:

  1. 可以将多个消息缓存到本地队列里,然后异步的批量发送到broker,可以通过参数producer.type=async做到。缓存的大小可以通过一些参数指定:queue.timebatch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker,也可以通过参数event.handler定制handler,在producer端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。
  2. 自己编写Encoder来序列化消息,只需实现下面这个接口。默认的Encoder是kafka.serializer.DefaultEncoder
    interface Encoder<T> {public Message toMessage(T data);
    }
  3. 提供了基于Zookeeper的broker自动感知能力,可以通过参数zk.connect实现。如果不使用Zookeeper,也可以使用broker.list参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。
  4. 通过分区函数kafka.producer.Partitioner类对消息分区
    interface Partitioner<T> {int partition(T key, int numPartitions);
    }

    分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.如果key是null,就随机的选择一个。可以通过参数partitioner.class定制分区函数。

新的api完整实例如下:

package com.cuicui.kafkademon;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;/*** @author <a href="mailto:leicui001@126.com">崔磊</a>* @date 2015年11月4日 上午11:44:15*/
public class MyProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("metadata.broker.list", KafkaProperties.BROKER_CONNECT);props.put("partitioner.class", "com.cuicui.kafkademon.MyPartitioner");props.put("request.required.acks", "1");ProducerConfig config = new ProducerConfig(props);Producer<String, String> producer = new Producer<String, String>(config);// 单个发送for (int i = 0; i <= 1000000; i++) {KeyedMessage<String, String> message =new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);producer.send(message);Thread.sleep(5000);}// 批量发送List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>(100);for (int i = 0; i <= 10000; i++) {KeyedMessage<String, String> message =new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);messages.add(message);if (i % 100 == 0) {producer.send(messages);messages.clear();}}producer.send(messages);}
}

下面这个是用到的分区函数:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;public class MyPartitioner implements Partitioner {public MyPartitioner(VerifiableProperties props) {}/** @see kafka.producer.Partitioner#partition(java.lang.Object, int)*/@Overridepublic int partition(Object key, int partitionCount) {return Integer.valueOf((String) key) % partitionCount;}
}

KafKa Consumer APIs

Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。

高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。

低级别的API

package com.cuicui.kafkademon;import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;/*** offset自己维护 目标topic、partition均由自己分配* * @author <a href="mailto:leicui001@126.com">崔磊</a>* @date 2015年11月4日 上午11:44:15**/
public class MySimpleConsumer {public static void main(String[] args) {new MySimpleConsumer().consume();}/*** 消费消息*/public void consume() {int partition = 0;// 找到leaderBroker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);// 从leader消费SimpleConsumer simpleConsumer =new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");long startOffet = 1;int fetchSize = 1000;while (true) {long offset = startOffet;// 添加fetch指定目标tipic,分区,起始offset及fetchSize(字节),可以添加多个fetchFetchRequest req =new FetchRequestBuilder().addFetch(KafkaProperties.TOPIC, 0, startOffet, fetchSize).build();// 拉取消息FetchResponse fetchResponse = simpleConsumer.fetch(req);ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);for (MessageAndOffset messageAndOffset : messageSet) {Message mess = messageAndOffset.message();ByteBuffer payload = mess.payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);String msg = new String(bytes);offset = messageAndOffset.offset();System.out.println("partition : " + 3 + ", offset : " + offset + "  mess : " + msg);}// 继续消费下一批startOffet = offset + 1;}}/*** 找到制定分区的leader broker* * @param brokerHosts broker地址,格式为:“host1:port1,host2:port2,host3:port3”* @param topic topic* @param partition 分区* @return*/public Broker findLeader(String brokerHosts, String topic, int partition) {Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();System.out.println(String.format("Leader tor topic %s, partition %d is %s:%d", topic, partition, leader.host(),leader.port()));return leader;}/*** 找到指定分区的元数据* * @param brokerHosts broker地址,格式为:“host1:port1,host2:port2,host3:port3”* @param topic topic* @param partition 分区* @return 元数据*/private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {PartitionMetadata returnMetaData = null;for (String brokerHost : brokerHosts.split(",")) {SimpleConsumer consumer = null;String[] splits = brokerHost.split(":");consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");List<String> topics = Collections.singletonList(topic);TopicMetadataRequest request = new TopicMetadataRequest(topics);TopicMetadataResponse response = consumer.send(request);List<TopicMetadata> topicMetadatas = response.topicsMetadata();for (TopicMetadata topicMetadata : topicMetadatas) {for (PartitionMetadata PartitionMetadata : topicMetadata.partitionsMetadata()) {if (PartitionMetadata.partitionId() == partition) {returnMetaData = PartitionMetadata;}}}if (consumer != null)consumer.close();}return returnMetaData;}/*** 根据时间戳找到某个客户端消费的offset* * @param consumer SimpleConsumer* @param topic topic* @param partition 分区* @param clientID 客户端的ID* @param whichTime 时间戳* @return offset*/public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);OffsetResponse response = consumer.getOffsetsBefore(request);long[] offsets = response.offsets(topic, partition);return offsets[0];}
}

低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。

高级别的API

package com.cuicui.kafkademon;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;/*** offset在zookeeper中记录,以group.id为key 分区和customer的对应关系由Kafka维护* * @author <a href="mailto:leicui001@126.com">崔磊</a>* @date 2015年11月4日 上午11:44:15*/
public class MyHighLevelConsumer {/*** 该consumer所属的组ID*/private String groupid;/*** 该consumer的ID*/private String consumerid;/*** 每个topic开几个线程?*/private int threadPerTopic;public MyHighLevelConsumer(String groupid, String consumerid, int threadPerTopic) {super();this.groupid = groupid;this.consumerid = consumerid;this.threadPerTopic = threadPerTopic;}public void consume() {Properties props = new Properties();props.put("group.id", groupid);props.put("consumer.id", consumerid);props.put("zookeeper.connect", KafkaProperties.ZK_CONNECT);props.put("zookeeper.session.timeout.ms", "60000");props.put("zookeeper.sync.time.ms", "2000");// props.put("auto.commit.interval.ms", "1000");ConsumerConfig config = new ConsumerConfig(props);ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);Map<String, Integer> topicCountMap = new HashMap<String, Integer>();// 设置每个topic开几个线程topicCountMap.put(KafkaProperties.TOPIC, threadPerTopic);// 获取streamMap<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);// 为每个stream启动一个线程消费消息for (KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)) {new MyStreamThread(stream).start();}}/*** 每个consumer的内部线程* * @author cuilei05**/private class MyStreamThread extends Thread {private KafkaStream<byte[], byte[]> stream;public MyStreamThread(KafkaStream<byte[], byte[]> stream) {super();this.stream = stream;}@Overridepublic void run() {ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();// 逐条处理消息while (streamIterator.hasNext()) {MessageAndMetadata<byte[], byte[]> message = streamIterator.next();String topic = message.topic();int partition = message.partition();long offset = message.offset();String key = new String(message.key());String msg = new String(message.message());// 在这里处理消息,这里仅简单的输出// 如果消息消费失败,可以将已上信息打印到日志中,活着发送到报警短信和邮件中,以便后续处理System.out.println("consumerid:" + consumerid + ", thread : " + Thread.currentThread().getName()+ ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : "+ key + " , mess : " + msg);}}}public static void main(String[] args) {String groupid = "myconsumergroup";MyHighLevelConsumer consumer1 = new MyHighLevelConsumer(groupid, "myconsumer1", 3);MyHighLevelConsumer consumer2 = new MyHighLevelConsumer(groupid, "myconsumer2", 3);consumer1.consume();consumer2.consume();}
}

这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。

每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。

漫游Kafka实战篇之客户端编程实例相关推荐

  1. 漫游kafka实战篇之搭建Kafka开发环境

    转载注明出处:http://blog.csdn.net/honglei915/article/details/37563647 Kafka视频教程同步首发,欢迎观看! 上篇文章中我们搭建了kafka的 ...

  2. 漫游Kafka实战篇之搭建Kafka运行环境

    原文地址:http://blog.csdn.net/honglei915/article/details/37564329 Kafka视频教程同步首发,欢迎观看! 接下来一步一步搭建Kafka运行环境 ...

  3. 漫游Kafka实战篇clientAPI

    原文地址:http://blog.csdn.net/honglei915/article/details/37697655 Kafka Producer APIs 旧版的Procuder API有两种 ...

  4. kafka java编程demo_Kafka简单客户端编程实例

    今天,我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章,这篇文章很简单,就是利用Kafka的API创建一个生产者和消费者,生产者不断向Kafka写入消息,消费者则不断消费Kafka的消 ...

  5. kafka实战篇(二):消息消费实战

    写在前面:我是「且听风吟」,目前是某上市游戏公司的大数据开发工程师,热爱大数据开源技术,喜欢分享自己的所学所悟,现阶段正在从头梳理大数据体系的知识,以后将会把时间重点放在Spark和Flink上面. ...

  6. go语言之进阶篇http客户端编程

    1.http客户端编程 示例: http_server.go package mainimport ("fmt""net/http" )//w, 给客户端回复数 ...

  7. 漫游Kafka入门篇之简单介绍

    原文地址:http://blog.csdn.net/honglei915/article/details/37564521 Kafka视频教程同步首发,欢迎观看! 介绍 Kafka是一个分布式的.可分 ...

  8. 漫游Kafka设计篇之性能优化(7)

    Kafka在提高效率方面做了很大努力.Kafka的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作.读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka也 ...

  9. 漫游Kafka设计篇之性能优化

    原文地址:http://blog.csdn.net/honglei915/article/details/37564757 Kafka视频教程同步首发,欢迎观看! Kafka在提高效率方面做了很大努力 ...

最新文章

  1. Maven的setting.xml配置文件详解(中文)
  2. 实现统计一个字符串中的每个单词出现的次数--基于Go语言
  3. MySql模糊查询中特殊字符处理
  4. 创建数据库是列名无效咋办_怎样解决列名无效 - 技术问答 - .Net源码论坛 .net源码,ASP.net|论坛 - Powered by Discuz!NT...
  5. 008_logback配置语法
  6. php抓取新浪新闻,新浪新闻采集程序
  7. 女生心中的理想男生!这些条件你符合几条?
  8. 重装系统后电脑没有计算机也没有网络连接,电脑网络重置以后没有了wifi连接...
  9. enum枚举类型的范例
  10. pyspark报错问题 Exception in thread main java.lang.UnsupportedClassVersionError 成功解决
  11. 经典的哲学家就餐问题
  12. ISO 8601规则
  13. Java小程序post如何传参,微信小程序向Java后台传输参数的方法实现
  14. 一张图片中的分离数据+十进制删改十六进制+连续base32和64编码+okk编码题目--rsa中求d题目
  15. 大数据或成大金融时代的奠基石
  16. OpenStack-M版(Mitaka)搭建基于(Centos7.2)+++十、Openstack对象存储服务(swift)上
  17. Iterator patten 读书笔记
  18. 1108 String复读机 – PAT乙级真题
  19. 不考研也有出路?教育部重要通知,事关第二学士学位!
  20. Java实现贪吃蛇小游戏(附完整源码)

热门文章

  1. ES6新特性之map和reduce方法的使用
  2. screen常用命令以及screen恢复会话时出现There is no screen to be resumed matching
  3. 一个分支强制替代另一个分支
  4. 【转】strcpy溢出的攻击示例
  5. RTCStartupDemo:一款极其简单的 WebRTC 入门项目
  6. build.xml引用其它文件的任务
  7. Capybara 2.14.1 发布,Web 应用验收测试框架
  8. 换个姿势为安装包重签名
  9. SQLServer文件收缩-图形化+命令
  10. IBM公布Kitura 1.0和Bluemix Runtime for Swift 3