Kafka Cluster Programming Guide

@(KAFKA)[kafka, big data]

  • Kafka Cluster Programming Guide
  • I. Overview
    • (I) Main contents
    • (II) A note on Scala and Java
  • II. Producer API
    • (I) Scala version (deprecated)
      • 1. A simple example
      • 2. A producer with a custom partitioner
      • 3. Analysis of KeyedMessage
    • (II) Java version
  • III. Consumer API
    • (I) High level consumer
    • (II) Simple (low level) consumer
      • 1. How it works
        • Why SimpleConsumer
        • Drawbacks of SimpleConsumer
        • Steps for using SimpleConsumer
      • 2. Detailed steps
      • 3. Analysis of some methods in the code
        • (1) Finding the leader broker for the topic and partition
        • (2) Finding the offset to start reading from
        • (3) Error handling
        • (4) Reading the data
    • (III) Specifying the offset
  • IV. Consuming messages from a Kafka cluster with Storm
  • V. Consuming messages from a Kafka cluster with Spark Streaming
  • VI. Integration with Hadoop

Java API: http://kafka.apache.org/082/javadoc/index.html

I. Overview

(I) Main contents

This article covers the following four topics:

(1) The producer Java API for sending messages to a Kafka cluster

(2) The consumer Java API for consuming messages from a Kafka cluster

(3) Consuming messages from a Kafka cluster with Storm

(4) Consuming messages from a Kafka cluster with Spark Streaming

(II) A note on Scala and Java

Kafka itself is written in Scala, but most users of Kafka clusters are used to Java, so Kafka provides a Java API (implemented in Scala) that currently covers both the producer and the consumer.

Starting with version 0.8.2, Kafka has rewritten the producer API in Java, and plans to rewrite the consumer API in Java in 0.8.3 (officially described only as "the next release", with no concrete version given). The new producer API is officially recommended over the old Scala one.

In summary:

(1) kafka 0.8.2 has two producer APIs, a Scala version and a Java version; the former lives under the core directory of the source tree, the latter under the clients directory. The Java version is the recommended one, and the Scala version is no longer maintained.

(2) kafka 0.8.2 currently has only the Scala consumer API; a Java version is planned for the next release.

II. Producer API

This section first gives a brief introduction to the Scala API, then looks at the Java API.

(I) Scala version (deprecated)

1. A simple example

The basic steps are: create the producer -> send messages with it -> close the producer. A complete sketch combining the three steps follows the individual snippets below.
(1) Create the producer

Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list","192.168.172.117:9092");
props.put("producer.type", "sync");
Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));

(2) Send messages with the producer

producer.send(new KeyedMessage<Integer, String>("topic", "message"));

The two arguments are the topic name and the message payload, both of type String.

(3) Close the producer

producer.close();
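
Putting the three snippets together, a minimal runnable sketch looks like this (the broker address, topic name, and sync producer type are taken from the snippets above and are placeholders for your own cluster):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.172.117:9092");
        props.put("producer.type", "sync");

        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
        // Two-argument form: topic name and message payload.
        producer.send(new KeyedMessage<Integer, String>("topic", "message"));
        producer.close();
    }
}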

2. A producer with a custom partitioner

Sometimes you need to control which partition each message goes to, either to balance load better or to meet specific analysis requirements. In the example below, messages carrying the same IP address are sent to the same partition.

Some notes first, then the complete code.

(1) partitioner.class

This producer configuration property specifies which partitioner class is used to assign messages to partitions. The default is kafka.producer.DefaultPartitioner, which partitions by the hash of the message key.

(2) Specifying the partitioner class

We can define our own Partitioner class and then tell the producer to use it.
The class must implement this trait:

trait Partitioner {
  /**
   * Uses the key to calculate a partition bucket id for routing
   * the data to the appropriate broker partition
   * @return an integer between 0 and numPartitions-1
   */
  def partition(key: Any, numPartitions: Int): Int
}

Then point the producer at the class in code:

    props.put("partitioner.class", "com.lujinhong.demo.kafka.producer.SimplePartitioner");

(3) The complete code:

SimplePartitioner.java

package com.lujinhong.demo.kafka.producer;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * @author lujinhong
 * @date 2015-08-11 18:47
 * @Description:
 */
// Partitioning code (partition function)
public class SimplePartitioner implements Partitioner {
    public SimplePartitioner(VerifiableProperties props) {
    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String sKey = key.toString();
        int offset = sKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(sKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}

PartitionerProducer.java

package com.lujinhong.demo.kafka.producer;

/**
 * @author lujinhong
 * @date 2015-08-11 18:35
 * @Description:
 */
import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerProducer {
    public static void main(String[] args) {
        // long events = Long.parseLong(args[0]);
        Random rnd = new Random();
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.172.98:9092,192.168.172.111:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.lujinhong.demo.kafka.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (long nEvents = 0; nEvents < 1000; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("ljh_test", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

3. Analysis of KeyedMessage

Whether or not you use a custom partitioner, the producer always sends messages as KeyedMessage objects. KeyedMessage has three forms:

(1) The most complete form is:

KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V)

That is, four parameters: the topic name, the message key, the key used for partitioning, and the message payload.

(2) Omitting the partition key

 def this(topic: String, key: K, message: V) = this(topic, key, key, message)

In this case the message key also serves as the partition key.

(3) Omitting both keys

      def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

In this case the key is set to null.
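
For illustration (reusing the ip and msg variables from the partitioner example above; the topic name is a placeholder), the three forms are constructed like this:

KeyedMessage<String, String> m1 = new KeyedMessage<String, String>("ljh_test", msg);         // no key at all
KeyedMessage<String, String> m2 = new KeyedMessage<String, String>("ljh_test", ip, msg);     // key doubles as the partition key
KeyedMessage<String, String> m3 = new KeyedMessage<String, String>("ljh_test", ip, ip, msg); // explicit partition key (not stored)

For reference, here is the KeyedMessage definition from the Kafka source: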

package kafka.producer

/**
 * A topic, key, and value.
 * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
 */
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")

  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null
  }

  def hasKey = key != null
}

(II) Java version

To be added.
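
As a placeholder until this section is written, here is a minimal sketch of the new Java producer introduced in 0.8.2 (org.apache.kafka.clients.producer); the broker address and topic name are illustrative, and the full configuration options are in the Javadoc linked at the top of this article:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list and topic name below are placeholders.
        props.put("bootstrap.servers", "192.168.172.117:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // send() is asynchronous and returns a Future<RecordMetadata>.
        producer.send(new ProducerRecord<String, String>("ljh_test", "key", "message"));
        producer.close();
    }
}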

III. Consumer API

The high level consumer can handle most consumption needs; it is a high-level wrapper that makes it very easy to read data.

But if you need lower level operations, such as reading the same data multiple times or specifying the offset, you need the simple consumer.

(I) High level consumer

It provides a higher level of abstraction. For details see: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

The two classes in this demo do the following:
(1) Build one or more KafkaStream objects and submit them to a thread pool.
(2) Process those KafkaStreams.

Let's first walk through the key steps of the first class, HighLevelConsumerDemo:
(1) Create a ConsumerConfig specifying the consumer's configuration.

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(props);
}

(2) Use the ConsumerConfig above to create a ConsumerConnector.

  consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));

(3) Create a list of KafkaStreams

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

We first build a map of (topic name, thread count) and pass it to consumer.createMessageStreams, which creates a map of (topic name, list of KafkaStreams), where each list has as many KafkaStreams as the thread count specified earlier. Finally, get() retrieves the list of KafkaStreams for the topic.

First we create a Map that tells Kafka how many threads we are providing for which topics. The consumer.createMessageStreams is how we pass this information to Kafka. The return is a map of KafkaStream to listen on for each topic. (Note here we only asked Kafka for a single Topic but we could have asked for multiple by adding another element to the Map.)

In other words, you can actually ask for several topics here, though it is usually cleaner not to; a small illustration follows.
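
For illustration only (the second topic name is made up), asking for two topics from the same connector just means adding another entry to the map:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("ljh_test", 2);       // 2 threads for this topic
topicCountMap.put("another_topic", 1);  // hypothetical second topic, 1 thread
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams1 = consumerMap.get("ljh_test");
List<KafkaStream<byte[], byte[]>> streams2 = consumerMap.get("another_topic");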

(4) Create a thread pool

    executor = Executors.newFixedThreadPool(a_numThreads);

This creates a fixed-size thread pool using the java.util.concurrent package.

(5) Submit a task to the pool for each stream

int threadNumber = 0;
for (final KafkaStream stream : streams) {
    executor.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
}

(6) After 10 seconds, shut down the consumer and the thread pool.

public void shutdown() {
    if (consumer != null) consumer.shutdown();
    if (executor != null) executor.shutdown();
    try {
        if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
            System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
        }
    } catch (InterruptedException e) {
        System.out.println("Interrupted during shutdown, exiting uncleanly");
    }
}

Processing the contents of the KafkaStream objects (see the ConsumerTest class below):
(1) Keep fetching messages through KafkaStream.iterator().
(2) Read each message's payload through it.next().message().

On the relationship between thread count and partition count:
(1) If there are more threads than partitions, some threads will sit idle and never receive any messages.
(2) If there are fewer threads than partitions, some threads will read from multiple partitions. There is then no guarantee about ordering between partitions, although messages within each partition are still consumed in order; for example, a thread might fetch 100 messages from partition 1 and then 10 messages from partition 2.
So in general, make the thread count equal to the partition count, or slightly larger for redundancy.

Notes on ZooKeeper when shutting down the consumer

Because the consumer only commits offsets to ZooKeeper periodically, do not shut down the executor immediately; wait a little while first.
Kafka does not update Zookeeper with the message offset last read after every read, instead it waits a short period of time. Due to this delay it is possible that your logic has consumed a message and that fact hasn’t been synced to zookeeper. So if your client exits/crashes you may find messages being replayed next time to start.

Also note that sometimes the loss of a Broker or other event that causes the Leader for a Partition to change can also cause duplicate messages to be replayed.

To help avoid this, make sure you provide a clean way for your client to exit instead of assuming it can be ‘kill -9’d.

As an example, the main here sleeps for 10 seconds, which allows the background consumer threads to consume data from their streams 10 seconds. Since auto commit is on, they will commit offsets every second. Then, shutdown is called, which calls shutdown on the consumer, then on the ExecutorService, and finally tries to wait for the ExecutorService to finish all outsanding work. This gives the consumer threads time to finish processing the few outstanding messages that may remain in their streams. Shutting down the consumer causes the iterators for each stream to return false for hasNext() once all messages already received from the server are processed, so the other threads should exit gracefully. Additionally, with auto commit enabled, the call to consumer.shutdown() will commit the final offsets.

The complete code:
(1) HighLevelConsumerDemo

package com.lujinhong.demo.kafka.consumer;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HighLevelConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public HighLevelConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        HighLevelConsumerDemo example = new HighLevelConsumerDemo(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}

(2) ConsumerTest

package com.lujinhong.demo.kafka.consumer;

/**
 * @author lujinhong
 * @date 2015-08-07 10:43
 * @Description:
 */
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

(II) Simple (low level) consumer

1. How it works

For most applications, the high level API is enough. If you need lower level features (such as setting the starting offset at startup), you need the simple (low level) consumer.
For details see: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
For most applications, the high level consumer Api is good enough. Some applications want features not exposed to the high level consumer yet (e.g., set initial offset when restarting the consumer). They can instead use our low level SimpleConsumer Api. The logic will be a bit more complicated and you can follow the example in here.

Why SimpleConsumer

The reason to use SimpleConsumer is that you need more control over partitions than a consumer group gives you. For example:

  1. Read a message multiple times
  2. Consume only a subset of a topic's partitions, rather than all of them
  3. Manage transactions so that each message is processed once and only once.

Drawbacks of SimpleConsumer

SimpleConsumer requires significantly more work than a consumer group. Note that SimpleConsumer has no notion of a group.

  1. You must track the offsets yourself so you know where your application stopped consuming.
  2. You must find out which broker is the leader for the topic and partition.
  3. You must handle leader changes.

Steps for using SimpleConsumer

  1. Find an active broker and determine which one is the leader for your topic and partition.
  2. Determine which brokers are the replica brokers for your topic and partition.
  3. Build a request defining the data you are interested in.
  4. Fetch the data.
  5. Identify leader changes and recover from them automatically.

2. Detailed steps

See the comments in the code.

public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
    // find the metadata about the topic and partition we are interested in
    // 1. Find the leader
    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    if (metadata == null) {
        System.out.println("Can't find metadata for Topic and Partition. Exiting");
        return;
    }
    if (metadata.leader() == null) {
        System.out.println("Can't find Leader for Topic and Partition. Exiting");
        return;
    }
    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;

    // 2. Build the consumer object
    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    // 3. Determine the starting offset
    long readOffset = getLastOffset(consumer, a_topic, a_partition, 101000000000000L, clientName); // kafka.api.OffsetRequest.LatestTime()
    int numErrors = 0;
    while (a_maxReads > 0) {
        if (consumer == null) {
            consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        }
        // 4. Build the FetchRequest object
        FetchRequest req = new FetchRequestBuilder()
                .clientId(clientName)
                .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                .build();
        // 5. Issue the fetch through the consumer; a FetchResponse comes back
        FetchResponse fetchResponse = consumer.fetch(req);

        // 6. If something went wrong, handle the error
        if (fetchResponse.hasError()) {
            numErrors++;
            // Something went wrong!
            short code = fetchResponse.errorCode(a_topic, a_partition);
            System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
            if (numErrors > 5) break;
            if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                // If the requested time is beyond the current time, fall back to the latest messages.
                // We asked for an invalid offset. For simple case ask for the last element to reset
                readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                continue;
            }
            consumer.close();
            consumer = null;
            leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
            continue;
        }
        numErrors = 0;

        long numRead = 0;
        // 7. Process every fetched message
        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
            long currentOffset = messageAndOffset.offset();
            // Skip messages whose offset is smaller than the requested starting offset.
            if (currentOffset < readOffset) {
                System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                continue;
            }
            readOffset = messageAndOffset.nextOffset();
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            // Print the offset and the message payload
            System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
            numRead++;
            a_maxReads--;
        }

        if (numRead == 0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
            }
        }
    }
    if (consumer != null) consumer.close();
}

3. Analysis of some methods in the code

(1) Finding the leader broker for the topic and partition

The simplest way is to pass a few brokers you already know about to the program, via a configuration file, the command line, or whatever you prefer. This does not have to be all of the brokers; a subset is enough.

The program works as follows:
  • First create a SimpleConsumer object; the parameters mean the following:

    new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize, clientId)
  • Create a TopicMetadataRequest object and send the request through the consumer

    List<String> topics = Collections.singletonList(a_topic);
    TopicMetadataRequest req = new TopicMetadataRequest(topics);
    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
    
  • Get the topic metadata, then the metadata for the specific partition

    List<TopicMetadata> metaData = resp.topicsMetadata();
    for (TopicMetadata item : metaData) {
        for (PartitionMetadata part : item.partitionsMetadata()) {
            if (part.partitionId() == a_partition) {
                returnMetaData = part;
                break loop;
            }
        }
    }
    

    As soon as the matching partition metadata is found, the whole loop is exited immediately.

  • Finally, collect the replica brokers and store them in a List

    if (returnMetaData != null) {
        m_replicaBrokers.clear();
        for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
            m_replicaBrokers.add(replica.host());
        }
    }
    

    The complete code:

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
    

(2) Finding the offset to start reading from

Now we need to decide which offset to start reading from. Kafka provides two constants for this:
* kafka.api.OffsetRequest.EarliestTime(): start from the earliest data currently in the cluster
* kafka.api.OffsetRequest.LatestTime(): start from the latest data, i.e. only messages that enter the cluster from now on.

Don't assume that 0 is the starting offset, because messages expire and are eventually deleted.

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
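
As an illustration (reusing the helper above and the variables from the run() method), the two constants are passed as the whichTime argument:

// Start from the oldest message still retained on the brokers:
long earliestOffset = getLastOffset(consumer, a_topic, a_partition,
        kafka.api.OffsetRequest.EarliestTime(), clientName);

// Start right after the newest message, i.e. consume only data produced from now on:
long latestOffset = getLastOffset(consumer, a_topic, a_partition,
        kafka.api.OffsetRequest.LatestTime(), clientName);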

(3) Error handling

Since SimpleConsumer does not handle errors automatically, we have to handle them ourselves.

if (fetchResponse.hasError()) {
    numErrors++;
    // Something went wrong!
    short code = fetchResponse.errorCode(a_topic, a_partition);
    System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
    if (numErrors > 5) break;
    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
        // We asked for an invalid offset. For simple case ask for the last element to reset
        readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
        continue;
    }
    consumer.close();
    consumer = null;
    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
    continue;
}

When an error comes back, we log the reason, close the consumer, and then look for a new leader.

private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
        boolean goToSleep = false;
        PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            goToSleep = true;
        } else if (metadata.leader() == null) {
            goToSleep = true;
        } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
            // first time through if the leader hasn't changed give ZooKeeper a second to recover
            // second time, assume the broker did recover before failover, or it was a non-Broker issue
            goToSleep = true;
        } else {
            return metadata.leader().host();
        }
        if (goToSleep) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
            }
        }
    }
    System.out.println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
}

This method uses the findLeader() logic we defined earlier to find the new leader, except here we only try to connect to one of the replicas for the topic/partition. This way if we can’t reach any of the Brokers with the data we are interested in we give up and exit hard.
Since it may take a short time for ZooKeeper to detect the leader loss and assign a new leader, we sleep if we don’t get an answer. In reality ZooKeeper often does the failover very quickly so you never sleep.

(4) Reading the data

// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    // See code in previous section
}
numErrors = 0;

long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}

if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}

Note that the ‘readOffset’ asks the last read message what the next Offset would be. This way when the block of messages is processed we know where to ask Kafka where to start the next fetch.
Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn’t the beginning of the compressed block. Thus a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned.
Finally, we keep track of the # of messages read. If we didn’t read anything on the last request we go to sleep for a second so we aren’t hammering Kafka when there is no data

(III) Specifying the offset

(1) Define the request object

    FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000) .build();

Here readOffset is the offset we are about to read from.

(2) Send the request through the consumer

FetchResponse fetchResponse = consumer.fetch(req);

(3) Iterate over each message in the response

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    // Skip messages whose offset is smaller than the requested starting offset.
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    // Print the offset and the message payload
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}

(4) About the offset value
First, distinguish two concepts: the offset itself, and the value specified in code, which is EarliestTime(), LatestTime(), or a 13-digit UNIX timestamp in milliseconds.

The former is a number: when a topic is created the offset starts at 0 and keeps increasing. Note, however, that 0 does not always refer to the earliest message still in the cluster, because data older than the retention period gets deleted. In Kafka's data directory, each log file is named after the offset of the first message it contains.

The latter are convenience values defined for application code: the earliest message, the latest message, or a specific point in time. A user usually cannot say from which offset to start consuming, but can say from which time to start consuming.

Now let's see how to determine an offset by specifying a time:

    long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);

Concretely, it works like this:

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

Aside: what is clientName actually used for?

IV. Consuming messages from a Kafka cluster with Storm

A simple example:
Core Spout

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
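
To actually run the spout it has to be wired into a topology. A minimal sketch, assuming the pre-Apache backtype.storm package names of that era (TopologyBuilder, Config, StormSubmitter) and a hypothetical PrintBolt that simply logs each tuple:

// Assumed imports: backtype.storm.Config, backtype.storm.StormSubmitter,
// backtype.storm.topology.TopologyBuilder, plus the storm.kafka classes used above.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 1);
// PrintBolt is a placeholder for your own bolt.
builder.setBolt("print-bolt", new PrintBolt(), 1).shuffleGrouping("kafka-spout");

Config conf = new Config();
StormSubmitter.submitTopology("kafka-demo-topology", conf, builder.createTopology());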

Trident Spout

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

For details, see the storm-kafka programming guide.

V. Consuming messages from a Kafka cluster with Spark Streaming

1. Define the processing you want to perform

Here we take log filtering as an example:

public class FiltersFuntion implements Function<Tuple2<String, String>, String>,
        akka.japi.Function<Tuple2<String, String>, String>

The key is to override the call() method of Function, which defines the processing applied to each message; a fuller sketch follows the fragment below.

    @Override
    public String call(Tuple2<String, String> v1) throws Exception {
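
A minimal sketch of what the complete class might look like, assuming a simple filter that keeps only lines containing "ERROR" (the filter condition is an illustrative assumption, and the akka.japi.Function interface is dropped here for brevity):

import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

public class FiltersFuntion implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> v1) throws Exception {
        // v1 is the (key, value) pair received from Kafka; only the value (the log line) is inspected.
        String line = v1._2();
        // Illustrative filter: keep lines containing "ERROR", map everything else to an empty string.
        if (line != null && line.contains("ERROR")) {
            return line;
        }
        return "";
    }
}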

2. Define the application structure

public final class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("ljh_JavaKafkaWordCount");
        // Create the context with a 2 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(new FiltersFuntion());

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

VI. Integration with Hadoop

This is implemented through a third-party project called Camus; use it with caution.
Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).

For details, see the project: https://github.com/linkedin/camus/
