
@(KAFKA)[kafka, big data]

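  • Kafka cluster programming guide
  • I. Overview
    • (I) Main content
    • (II) A note on Scala and Java
  • II. Producer API
    • (I) Scala version (deprecated)
      • 1. A simple example
      • 2. A producer with a custom partitioner
      • Analysis of KeyedMessage
    • (II) Java version
  • III. Consumer API
    • (I) High level consumer
    • (II) Simple (low level) consumer
      • 1. How it works
        • Why use SimpleConsumer
        • Drawbacks of using SimpleConsumer
        • Steps for using SimpleConsumer
      • 2. Detailed steps
      • 3. Analysis of some methods in the code
        • (1) Finding the leader broker for a topic and partition
        • (2) Finding the offset to start reading from
        • (3) Error handling
        • (4) Reading data
    • 3. Specifying the offset
  • IV. Consuming messages from a Kafka cluster with Storm
  • V. Consuming messages from a Kafka cluster with Spark Streaming
  • VI. Integration with Hadoop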

Java API: http://kafka.apache.org/082/javadoc/index.html




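(1) The producer Java API for sending messages to a Kafka cluster

(2) The consumer Java API for consuming messages from a Kafka cluster

(3) Consuming messages from a Kafka cluster with Storm

(4) Consuming messages from a Kafka cluster with Spark Streaming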



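Starting with version 0.8.2, Kafka rewrote the producer API in Java and plans to rewrite the consumer API in Java in 0.8.3 (the official documentation only says "the next release" without naming a version). The project recommends the new producer API over the original one written in Scala.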


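(1) kafka_0.8.2 ships two producer APIs, a Scala version and a Java version. The former lives in the core directory of the source tree, the latter in the clients directory. The Java version is the recommended one; the Scala version is no longer updated.

(2) kafka_0.8.2 currently has only a Scala consumer API; a Java version is planned for the next release.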
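As a rough sketch of what configuring the Java-version producer looks like, the helper below builds its settings using only java.util.Properties, so it runs without the Kafka jars. The class name `NewProducerConfigSketch` and the address `localhost:9092` are placeholders for illustration, not part of the original notes; the commented lines show how the properties would typically be handed to `org.apache.kafka.clients.producer.KafkaProducer`.

```java
import java.util.Properties;

// Hypothetical helper: collects the settings the Java producer reads.
final class NewProducerConfigSketch {

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Brokers used to discover the cluster (the role played by
        // "metadata.broker.list" in the Scala producer).
        props.put("bootstrap.servers", bootstrapServers);
        // Serializers take the place of the Scala producer's "serializer.class".
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for the partition leader to acknowledge each write.
        props.put("acks", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092"); // placeholder address
        System.out.println(props.getProperty("bootstrap.servers"));
        // With kafka-clients on the classpath, the properties would be used roughly as:
        //   KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        //   producer.send(new ProducerRecord<String, String>("topic", "key", "message"));
        //   producer.close();
    }
}
```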






    Properties props = new Properties();
    // "metadata.broker.list" must also be set to the brokers' host:port list before sending
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("producer.type", "sync");
    Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));

    producer.send(new KeyedMessage<Integer, String>("topic", "message"));











    trait Partitioner {
      /**
       * Uses the key to calculate a partition bucket id for routing
       * the data to the appropriate broker partition
       * @return an integer between 0 and numPartitions-1
       */
      def partition(key: Any, numPartitions: Int): Int
    }


    props.put("partitioner.class", "com.lujinhong.demo.kafka.producer.SimplePartitioner");



    package com.lujinhong.demo.kafka.producer;

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    /**
     * @author lujinhong
     * @date 2015-08-11 18:47:02
     * @Description:
     */
    // Partitioning code (the partition function)
    public class SimplePartitioner implements Partitioner {

        public SimplePartitioner(VerifiableProperties props) {
        }

        public int partition(Object key, int a_numPartitions) {
            int partition = 0;
            String sKey = key.toString();
            // Route on the digits after the last '.', e.g. the last octet of an IP address
            int offset = sKey.lastIndexOf('.');
            if (offset > 0) {
                partition = Integer.parseInt(sKey.substring(offset + 1)) % a_numPartitions;
            }
            return partition;
        }
    }
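To make the routing rule concrete, here is a stand-alone copy of the same key-to-partition logic that runs without the Kafka jars. The class name `PartitionRuleSketch` and the sample keys are made up for illustration.

```java
// Stand-alone copy of the rule used by SimplePartitioner above;
// it takes the key as a String instead of an Object.
final class PartitionRuleSketch {

    static int partitionFor(String key, int numPartitions) {
        int partition = 0;
        int dot = key.lastIndexOf('.');
        if (dot > 0) {
            // Use the digits after the last '.' (the last octet of an IP address).
            partition = Integer.parseInt(key.substring(dot + 1)) % numPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("192.168.2.37", 4)); // 37 % 4 = 1
        System.out.println(partitionFor("192.168.2.8", 4));  // 8 % 4 = 0
        System.out.println(partitionFor("no-dot-key", 4));   // no '.', so partition 0
    }
}
```

With this rule, every message keyed by the same IP address lands in the same partition, and keys without a dot all fall back to partition 0.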


    package com.lujinhong.demo.kafka.producer;

    /**
     * @author lujinhong
     * @date 2015-08-11 18:35:07
     * @Description:
     */
    import java.util.*;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    public class PartitionerProducer {
        public static void main(String[] args) {
            //long events = Long.parseLong(args[0]);
            Random rnd = new Random();

            Properties props = new Properties();
            // Comma-separated host:port list of brokers (values omitted here)
            props.put("metadata.broker.list", ",");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("partitioner.class", "com.lujinhong.demo.kafka.producer.SimplePartitioner");
            props.put("request.required.acks", "1");

            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);

            for (long nEvents = 0; nEvents < 1000; nEvents++) {
                long runtime = new Date().getTime();
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + ",www.example.com," + ip;
                // The IP address is the key, so SimplePartitioner routes on its last octet
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("ljh_test", ip, msg);
                producer.send(data);
            }
            producer.close();
        }
    }




    KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V)



    def this(topic: String, key: K, message: V) = this(topic, key, key, message)



    def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)


    package kafka.producer

    /**
     * A topic, key, and value.
     * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
     */
    case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
      if(topic == null)
        throw new IllegalArgumentException("Topic cannot be null.")

      def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

      def this(topic: String, key: K, message: V) = this(topic, key, key, message)

      def partitionKey = {
        if(partKey != null)
          partKey
        else if(hasKey)
          key
        else
          null
      }

      def hasKey = key != null
    }
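The constructor chaining above decides which value the partitioner eventually sees. The following is a hypothetical Java mirror of that logic, written only to show the three cases; it is not the Kafka class, and `KeyedMessageSketch` is an invented name.

```java
// Hypothetical Java mirror of KeyedMessage's constructors and partitionKey.
final class KeyedMessageSketch<K, V> {
    final String topic;
    final K key;
    final Object partKey;
    final V message;

    KeyedMessageSketch(String topic, K key, Object partKey, V message) {
        if (topic == null) {
            throw new IllegalArgumentException("Topic cannot be null.");
        }
        this.topic = topic;
        this.key = key;
        this.partKey = partKey;
        this.message = message;
    }

    // Key only: the key doubles as the partition key.
    KeyedMessageSketch(String topic, K key, V message) {
        this(topic, key, key, message);
    }

    // No key at all: both key and partition key are null.
    KeyedMessageSketch(String topic, V message) {
        this(topic, null, null, message);
    }

    // An explicit partition key wins; otherwise fall back to the key (possibly null).
    Object partitionKey() {
        return partKey != null ? partKey : key;
    }

    public static void main(String[] args) {
        System.out.println(new KeyedMessageSketch<String, String>("t", "m").partitionKey());
        System.out.println(new KeyedMessageSketch<String, String>("t", "k", "m").partitionKey());
        System.out.println(new KeyedMessageSketch<String, String>("t", "k", "p", "m").partitionKey());
    }
}
```

The three calls in `main` yield a null partition key, the key itself, and the explicit partition key, in that order.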




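The high level consumer handles most data consumption work. It is heavily encapsulated, which makes reading data easy.

However, if you need lower-level operations, such as reading data more than once or specifying the offset, you need the simple consumer.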

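(I) High level consumer

It provides a higher level of abstraction. For details, see: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

(1) Build one or more KafkaStream objects and submit them to a thread pool.

Let's first look at the key steps of the first class, HighLevelConsumerDemo:
(1) Create a ConsumerConfig, which holds the consumer's configuration.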

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

(2) Use the ConsumerConfig above to create a ConsumerConnector object.

  consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));

(3) Create a list of KafkaStream objects.

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);


First we create a Map that tells Kafka how many threads we are providing for which topics. The consumer.createMessageStreams is how we pass this information to Kafka. The return is a map of KafkaStream to listen on for each topic. (Note here we only asked Kafka for a single Topic but we could have asked for multiple by adding another element to the Map.)
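For the multi-topic case mentioned in the note, a minimal sketch of the map might look like the following. The topic names `page_views` and `clicks` and the class `TopicCountMapSketch` are invented for illustration; the point is only that the thread pool would then need one thread per requested stream across all topics.

```java
import java.util.HashMap;
import java.util.Map;

final class TopicCountMapSketch {

    // One entry per topic; the value is the number of streams (threads) requested for it.
    static Map<String, Integer> topicCountMap() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("page_views", 2); // hypothetical topic
        topicCountMap.put("clicks", 1);     // hypothetical topic
        return topicCountMap;
    }

    // Total number of streams requested, i.e. the thread pool size to provision.
    static int totalStreams(Map<String, Integer> topicCountMap) {
        int total = 0;
        for (int count : topicCountMap.values()) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalStreams(topicCountMap())); // 2 + 1 = 3
    }
}
```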



    executor = Executors.newFixedThreadPool(a_numThreads);

This uses the java.util.concurrent package to create a fixed-size thread pool.


    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }


    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }


(1) Use KafkaStream.iterator() to keep fetching messages.
(2) Use it.next().message() to read the message content.



Kafka does not update ZooKeeper with the offset of the last message read after every read; instead it waits a short period of time. Because of this delay, it is possible that your logic has consumed a message and that fact hasn't been synced to ZooKeeper. So if your client exits or crashes, you may find messages being replayed the next time it starts.

Also note that sometimes the loss of a Broker or other event that causes the Leader for a Partition to change can also cause duplicate messages to be replayed.

To help avoid this, make sure you provide a clean way for your client to exit instead of assuming it can be ‘kill -9’d.

As an example, the main here sleeps for 10 seconds, which allows the background consumer threads to consume data from their streams for 10 seconds. Since auto commit is on, they will commit offsets every second. Then shutdown is called, which calls shutdown on the consumer, then on the ExecutorService, and finally waits for the ExecutorService to finish all outstanding work. This gives the consumer threads time to finish processing the few outstanding messages that may remain in their streams. Shutting down the consumer causes the iterators for each stream to return false for hasNext() once all messages already received from the server are processed, so the other threads should exit gracefully. Additionally, with auto commit enabled, the call to consumer.shutdown() will commit the final offsets.
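The shutdown ordering described above can be tried out without a Kafka cluster. In this sketch an in-memory queue stands in for a KafkaStream and a flag stands in for consumer.shutdown(); the class `GracefulShutdownSketch` and the method `drainThenShutdown` are hypothetical names, and only java.util.concurrent is used.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the consumer threads: a queue plays the role of a KafkaStream.
final class GracefulShutdownSketch {

    static int drainThenShutdown(int messageCount, int threads) {
        final Queue<String> stream = new ConcurrentLinkedQueue<String>();
        for (int i = 0; i < messageCount; i++) {
            stream.add("message-" + i); // messages already received from the server
        }
        final AtomicBoolean shuttingDown = new AtomicBoolean(false);
        final AtomicInteger processed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        for (int t = 0; t < threads; t++) {
            executor.submit(new Runnable() {
                public void run() {
                    while (true) {
                        String msg = stream.poll();
                        if (msg != null) {
                            processed.incrementAndGet(); // "process" the message
                        } else if (shuttingDown.get()) {
                            return; // like hasNext() returning false after shutdown
                        } else {
                            Thread.yield(); // nothing buffered right now
                        }
                    }
                }
            });
        }

        shuttingDown.set(true); // plays the role of consumer.shutdown()
        executor.shutdown();    // stop accepting new tasks
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(drainThenShutdown(100, 3));
    }
}
```

Because the workers only exit once the flag is set and their queue is empty, every buffered message is processed before the pool terminates, which is the behaviour a clean exit is meant to give you instead of a `kill -9`.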


    package com.lujinhong.demo.kafka.consumer;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class HighLevelConsumerDemo {
        private final ConsumerConnector consumer;
        private final String topic;
        private ExecutorService executor;

        public HighLevelConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
        }

        public void shutdown() {
            if (consumer != null) consumer.shutdown();
            if (executor != null) executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted during shutdown, exiting uncleanly");
            }
        }

        public void run(int a_numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(a_numThreads));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

            // now launch all the threads
            //
            executor = Executors.newFixedThreadPool(a_numThreads);

            // now create an object to consume the messages
            //
            int threadNumber = 0;
            for (final KafkaStream stream : streams) {
                executor.submit(new ConsumerTest(stream, threadNumber));
                threadNumber++;
            }
        }

        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }

        public static void main(String[] args) {
            String zooKeeper = args[0];
            String groupId = args[1];
            String topic = args[2];
            int threads = Integer.parseInt(args[3]);

            HighLevelConsumerDemo example = new HighLevelConsumerDemo(zooKeeper, groupId, topic);
            example.run(threads);

            try {
                Thread.sleep(10000);
            } catch (InterruptedException ie) {
            }
            example.shutdown();
        }
    }


    package com.lujinhong.demo.kafka.consumer;

    /**
     * @author lujinhong
     * @date 2015-08-07 10:43:58
     * @Description:
     */
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class ConsumerTest implements Runnable {
        private KafkaStream m_stream;
        private int m_threadNumber;

        public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext())
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

(II) Simple (low level) consumer

1. How it works

For most applications, the high level consumer API is good enough. Some applications need features that the high level consumer does not yet expose (for example, setting the initial offset when restarting the consumer). They can use the low level SimpleConsumer API instead. The logic is somewhat more complicated.

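Why use SimpleConsumer

The reason to use SimpleConsumer is that you need more control over partition consumption than a Consumer Group gives you. For example:

  1. Read a message multiple times.
  2. Read only a subset of the messages in a partition, rather than all of them.
  3. Manage transactions to make sure each message is processed once and only once.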


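Drawbacks of using SimpleConsumer

Using SimpleConsumer takes considerably more work than using a Consumer Group. Note that SimpleConsumer has no notion of a group.

  1. You must track the offsets yourself so that the application knows where it left off consuming.
  2. You must figure out which broker is the leader for a given partition.
  3. You must handle leader changes.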


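Steps for using SimpleConsumer

  1. Find an active broker and determine which broker is the leader for the topic and partition.
  2. Determine which brokers are the replica brokers for the topic and partition.
  3. Build a request that specifies the data you are interested in.
  4. Fetch the data.
  5. Detect leader changes and recover from them automatically.

2. Detailed steps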


    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        // 1. Find the leader
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        // 2. Build the consumer object
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        // 3. Choose the starting offset
        long readOffset = getLastOffset(consumer, a_topic, a_partition, 101000000000000L, clientName); // kafka.api.OffsetRequest.LatestTime()

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            // 4. Build the FetchRequest object
            // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000)
                    .build();
            // 5. Send the fetch request through the consumer; it returns a FetchResponse object
            FetchResponse fetchResponse = consumer.fetch(req);

            // 6. If an error occurred, handle it
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // If the requested time value is later than the current time, the latest messages are returned.
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            // 7. Process each message that was fetched
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                // Skip messages whose offset is smaller than the requested start offset
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                // Print the offset and the message content
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }


(1) Finding the leader broker for a topic and partition


  • First create a SimpleConsumer object; its constructor parameters are:

    new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize, clientId)
  • Create a TopicMetadataRequest object, then send the request through the consumer:

    List<String> topics = Collections.singletonList(a_topic);
    TopicMetadataRequest req = new TopicMetadataRequest(topics);
    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
  • Get the topic metadata, then pick out the metadata of the partition in question:

    List<TopicMetadata> metaData = resp.topicsMetadata();
    for (TopicMetadata item : metaData) {
        for (PartitionMetadata part : item.partitionsMetadata()) {
            if (part.partitionId() == a_partition) {
                returnMetaData = part;
                // leaves the outer loop over seed brokers, which must carry the label "loop"
                break loop;
            }
        }
    }


  • Finally, get the replica brokers and save them in a List:

    if (returnMetaData != null) {
        m_replicaBrokers.clear();
        for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
            m_replicaBrokers.add(replica.host());
        }
    }


    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        // The label lets the inner loops stop the whole search once the partition is found
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }


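* kafka.api.OffsetRequest.EarliestTime(): start reading from the earliest data currently in the cluster.
* kafka.api.OffsetRequest.LatestTime(): start reading from the latest data, that is, only messages that arrive in the cluster from now on.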


    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }



    if (fetchResponse.hasError()) {
        numErrors++;
        // Something went wrong!
        short code = fetchResponse.errorCode(a_topic, a_partition);
        System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
        if (numErrors > 5) break;
        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
            // We asked for an invalid offset. For simple case ask for the last element to reset
            readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
            continue;
        }
        consumer.close();
        consumer = null;
        leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
        continue;
    }


    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

This method uses the findLeader() logic we defined earlier to find the new leader, except here we only try to connect to one of the replicas for the topic/partition. This way if we can’t reach any of the Brokers with the data we are interested in we give up and exit hard.
Since it may take a short time for ZooKeeper to detect the leader loss and assign a new leader, we sleep if we don’t get an answer. In reality ZooKeeper often does the failover very quickly so you never sleep.
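The retry-and-sleep pattern can be isolated from the Kafka classes. In the sketch below a `Supplier<String>` stands in for the metadata lookup (returning a host name, or null when no leader is known yet); the names `LeaderRetrySketch`, `maxTries` and `sleepMs` are assumptions made for the example, and the sketch throws an unchecked exception where the original throws `Exception`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

final class LeaderRetrySketch {

    // lookup stands in for findLeader(...).leader().host(); null means "no leader yet".
    static String findNewLeader(Supplier<String> lookup, String oldLeader, int maxTries, long sleepMs) {
        for (int i = 0; i < maxTries; i++) {
            String leader = lookup.get();
            if (leader != null && !(leader.equalsIgnoreCase(oldLeader) && i == 0)) {
                // A different leader, or the old one still reported on a later pass
                // (assume it recovered before failover).
                return leader;
            }
            try {
                Thread.sleep(sleepMs); // give the cluster time to elect a leader
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        throw new IllegalStateException("Unable to find new leader after Broker failure. Exiting");
    }

    public static void main(String[] args) {
        // First lookup finds no leader, second one reports the new leader.
        Iterator<String> answers = Arrays.asList(null, "broker2").iterator();
        System.out.println(findNewLeader(answers::next, "broker1", 3, 10L));
    }
}
```

Tracing `main`: the first pass gets null and sleeps, the second pass gets "broker2" and returns it.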

    // When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
    // Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
    FetchRequest req = new FetchRequestBuilder()
            .clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, 100000)
            .build();
    FetchResponse fetchResponse = consumer.fetch(req);

    if (fetchResponse.hasError()) {
        // See code in previous section
    }
    numErrors = 0;

    long numRead = 0;
    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
            System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
            continue;
        }
        readOffset = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
        numRead++;
        a_maxReads--;
    }

    if (numRead == 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
    }

Note that ‘readOffset’ is taken from the last message read, which reports what the next offset would be. This way, once the block of messages is processed, we know which offset to ask Kafka for in the next fetch.
Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn’t the beginning of the compressed block. Thus a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned.
Finally, we keep track of the number of messages read. If we didn’t read anything on the last request, we go to sleep for a second so we aren’t hammering Kafka when there is no data.
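The offset bookkeeping in these notes can be exercised on its own. In the sketch below, `Msg` is a minimal stand-in for MessageAndOffset (with `nextOffset()` being the offset plus one), and `OffsetTrackingSketch` is an invented class; it shows how a batch that starts before the requested offset, as can happen with compressed blocks, is filtered and how the read position advances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OffsetTrackingSketch {

    // Minimal stand-in for MessageAndOffset.
    static final class Msg {
        final long offset;
        final String payload;

        Msg(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }

        long nextOffset() {
            return offset + 1;
        }
    }

    long readOffset; // the next offset we want to read

    OffsetTrackingSketch(long startOffset) {
        this.readOffset = startOffset;
    }

    // Returns the payloads that are new; an empty result is the caller's cue to sleep.
    List<String> handleBatch(List<Msg> batch) {
        List<String> delivered = new ArrayList<String>();
        for (Msg m : batch) {
            if (m.offset < readOffset) {
                continue; // already seen: part of a block returned from its beginning
            }
            readOffset = m.nextOffset();
            delivered.add(m.payload);
        }
        return delivered;
    }

    public static void main(String[] args) {
        OffsetTrackingSketch tracker = new OffsetTrackingSketch(7);
        List<Msg> batch = Arrays.asList(new Msg(5, "m5"), new Msg(6, "m6"), new Msg(7, "m7"), new Msg(8, "m8"));
        System.out.println(tracker.handleBatch(batch)); // [m7, m8]
        System.out.println(tracker.readOffset);         // 9
    }
}
```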



    FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000) .build();



FetchResponse fetchResponse = consumer.fetch(req);

(3) Iterate over each message in the response

    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
        long currentOffset = messageAndOffset.offset();
        // Skip messages whose offset is smaller than the requested start offset
        if (currentOffset < readOffset) {
            System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
            continue;
        }
        readOffset = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        // Print the offset and the message content
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
        numRead++;
        a_maxReads--;
    }

First, distinguish two concepts: an offset, and the time value passed in the code, such as EarliestTime(), LatestTime(), or a 13-digit UNIX timestamp (in milliseconds).




    long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);


    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
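To illustrate the difference between a time value and an offset, here is a simplified model of how the time argument might be turned into an offset when a single offset is requested. This is not broker code: the segment-based lookup is an assumption about 0.8-era behaviour, the sample segment data is invented, and returning -1 when no segment is old enough is a choice made only for this sketch. The constants -1 and -2 are the values of LatestTime() and EarliestTime().

```java
final class OffsetLookupSketch {
    static final long LATEST_TIME = -1L;   // value of kafka.api.OffsetRequest.LatestTime()
    static final long EARLIEST_TIME = -2L; // value of kafka.api.OffsetRequest.EarliestTime()

    /**
     * baseOffsets[i] is the first offset of log segment i and lastModifiedMs[i] its
     * last-modified time; both arrays are in ascending order (assumed model).
     */
    static long resolve(long whichTime, long[] baseOffsets, long[] lastModifiedMs,
                        long logEndOffset, long nowMs) {
        if (whichTime == LATEST_TIME) {
            return logEndOffset;
        }
        if (whichTime == EARLIEST_TIME) {
            return baseOffsets[0];
        }
        // The log end counts as a final candidate stamped with the current time,
        // so a timestamp in the future resolves to the latest offset.
        if (nowMs <= whichTime) {
            return logEndOffset;
        }
        // Otherwise pick the newest segment last modified at or before the timestamp.
        for (int i = baseOffsets.length - 1; i >= 0; i--) {
            if (lastModifiedMs[i] <= whichTime) {
                return baseOffsets[i];
            }
        }
        return -1L; // sketch-only marker: nothing is old enough
    }

    public static void main(String[] args) {
        long[] base = {0L, 100L, 200L};
        long[] modified = {1000L, 2000L, 3000L};
        System.out.println(resolve(LATEST_TIME, base, modified, 250L, 4000L));      // 250
        System.out.println(resolve(EARLIEST_TIME, base, modified, 250L, 4000L));    // 0
        System.out.println(resolve(2500L, base, modified, 250L, 4000L));            // 100
        System.out.println(resolve(101000000000000L, base, modified, 250L, 4000L)); // 250
    }
}
```

Under this model a timestamp resolves to a segment boundary rather than to the exact message written at that time, which is why the time value and the resulting offset should be kept apart.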



Core Spout

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Trident Spout

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);


V. Consuming messages from a Kafka cluster with Spark Streaming



public class FiltersFuntion implements Function<Tuple2<String, String>, String>,akka.japi.Function<Tuple2<String, String>, String>


    @Override
    public String call(Tuple2<String, String> v1) throws Exception {


    public final class JavaKafkaWordCount {
        private static final Pattern SPACE = Pattern.compile(" ");

        private JavaKafkaWordCount() {
        }

        public static void main(String[] args) {
            if (args.length < 4) {
                System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
                System.exit(1);
            }

            SparkConf sparkConf = new SparkConf().setAppName("ljh_JavaKafkaWordCount");
            // Create the context with a 2 second batch size
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

            int numThreads = Integer.parseInt(args[3]);
            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            String[] topics = args[2].split(",");
            for (String topic : topics) {
                topicMap.put(topic, numThreads);
            }

            JavaPairReceiverInputDStream<String, String> messages =
                    KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

            JavaDStream<String> lines = messages.map(new FiltersFuntion());

            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String x) {
                    return Lists.newArrayList(SPACE.split(x));
                }
            });

            JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String s) {
                            return new Tuple2<String, Integer>(s, 1);
                        }
                    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer i1, Integer i2) {
                            return i1 + i2;
                        }
                    });

            wordCounts.print();
            jssc.start();
            jssc.awaitTermination();
        }
    }
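To see what the streaming job computes for a single batch, the same split, pair and reduce-by-key steps can be written with plain collections. This is only an illustration of the per-batch logic without Spark; `WordCountBatchSketch` and the sample lines are made up.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

final class WordCountBatchSketch {
    private static final Pattern SPACE = Pattern.compile(" ");

    // Split each line on spaces and sum a count of 1 per word, like mapToPair + reduceByKey.
    static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String line : lines) {
            for (String word : SPACE.split(line)) {
                Integer old = counts.get(word);
                counts.put(word, old == null ? 1 : old + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // One "batch" of message values as they would arrive from Kafka.
        System.out.println(countWords(Arrays.asList("a b a", "b c"))); // {a=2, b=2, c=1}
    }
}
```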


Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).



