1. 生产者

import java.util.Properties; import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; public class MyProducer {   public static void main(String[] args) {   Properties props = new Properties();   props.setProperty("metadata.broker.list","localhost:9092");   props.setProperty("serializer.class","kafka.serializer.StringEncoder");   props.put("request.required.acks","1");   ProducerConfig config = new ProducerConfig(props);   //创建生产这对象Producer<String, String> producer = new Producer<String, String>(config);//生成消息KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka","test-kafka");try {   int i =1; while(i < 100){    //发送消息
                    producer.send(data);   } } catch (Exception e) {   e.printStackTrace();   }   producer.close();   }
}

View Code

2. 消费者

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;   import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;  public class MyConsumer extends Thread{ //消费者连接private final ConsumerConnector consumer;   //要消费的话题private final String topic;   public MyConsumer(String topic) {   consumer =kafka.consumer.Consumer   .createJavaConsumerConnector(createConsumerConfig());   this.topic =topic;   }   //配置相关信息private static ConsumerConfig createConsumerConfig() {   Properties props = new Properties();
//        props.put("zookeeper.connect","localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");//配置要连接的zookeeper地址与端口//The ‘zookeeper.connect’ string identifies where to find once instance of Zookeeper in your cluster.//Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by this Consumer Groupprops.put("zookeeper.connect","localhost:2181");//配置zookeeper的组id (The ‘group.id’ string defines the Consumer Group this process is consuming on behalf of.)props.put("group.id", "0");//配置zookeeper连接超时间隔//The ‘zookeeper.session.timeout.ms’ is how many milliseconds Kafka will wait for //ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.props.put("zookeeper.session.timeout.ms","10000"); //The ‘zookeeper.sync.time.ms’ is the number of milliseconds a ZooKeeper ‘follower’ can be behind the master before an error occurs.props.put("zookeeper.sync.time.ms", "200");//The ‘auto.commit.interval.ms’ setting is how often updates to the consumed offsets are written to ZooKeeper. //Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.props.put("auto.commit.interval.ms", "1000");return new ConsumerConfig(props);   }   public void run(){ Map<String,Integer> topickMap = new HashMap<String, Integer>();   topickMap.put(topic, 1);   Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);   KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);   ConsumerIterator<byte[],byte[]> it =stream.iterator();   System.out.println("*********Results********");   while(true){   if(it.hasNext()){ //打印得到的消息   System.err.println(Thread.currentThread()+" get data:" +new String(it.next().message()));   } try {   Thread.sleep(1000);   } catch (InterruptedException e) {   e.printStackTrace();   }   }   }  public static void main(String[] args) {   MyConsumer consumerThread = new MyConsumer("mykafka");   consumerThread.start();   }
}

View Code

3. 消费者的线程执行器实现

  首先建立一个处理消息的类Consumer

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class Consumer implements Runnable {private KafkaStream stream;private int threadNumber;public Consumer(KafkaStream a_stream, int a_threadNumber) {threadNumber = a_threadNumber;stream = a_stream;}public void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator();while (it.hasNext())System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));System.out.println("Shutting down Thread: " + threadNumber);}
}

View Code

  其次实现多线程的调用

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;import java.util.concurrent.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ConsumerGroup {private final ConsumerConnector consumer;private final String topic;private  ExecutorService executor;public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));this.topic = a_topic;}public void shutdown() {if (consumer != null) consumer.shutdown();if (executor != null) executor.shutdown();try {if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");}} catch (InterruptedException e) {System.out.println("Interrupted during shutdown, exiting uncleanly");}}public void run(int a_numThreads) {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(a_numThreads));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);// now launch all the threads//
        executor = Executors.newFixedThreadPool(a_numThreads);// now create an object to consume the messages//
        int threadNumber = 0;for (final KafkaStream stream : streams) {executor.submit(new Consumer(stream, threadNumber));threadNumber++;}}private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {Properties props = new Properties();props.put("zookeeper.connect", a_zookeeper);props.put("group.id", a_groupId);props.put("zookeeper.session.timeout.ms", "400");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");return new ConsumerConfig(props);}public static void main(String[] args) {String zooKeeper = "localhost:2181";String groupId = "0";String topic = "mykafka";int threads = 2;  //启动的线程数
 ConsumerGroup group = new ConsumerGroup(zooKeeper, groupId, topic);group.run(threads);try {Thread.sleep(10000);} catch (InterruptedException ie) {}group.shutdown();}
}

View Code

kafka生产者、消费者java示例相关推荐

  1. Spring Kafka生产者/消费者样本

    我的目的是演示Spring Kafka如何为原始Kafka Producer和Consumer API提供一种易于使用且对具有Spring背景的人熟悉的抽象. 示例场景 示例场景是一个简单的场景,我有 ...

  2. java 生产者消费者同步_经典线程同步问题(生产者消费者)--Java实现

    原创作品,转载请注明出自xelz's blog 生产者-消费者(producer-consumer)问题是一个著名的线程同步问题.它描述的是:有一群生产者线程在生产产品,并将这些产品提供给消费者线程去 ...

  3. 生产者消费者 java实现_Java生产者消费者的三种实现

    Java生产者消费者是最基础的线程同步问题,java岗面试中还是很容易遇到的,之前没写过多线程的代码,面试中被问到很尬啊,面完回来恶补下.在网上查到大概有5种生产者消费者的写法,分别如下. 用sync ...

  4. Kafka生产者消费者模型

    一.Kafka回顾 1.AMQP协议 消息队列中消息交互规范,多数分布式消息中间件基于该协议进行消息传输 2.Broker 对于kafka,将生产者发送的消息,动态的添加到磁盘,一个Broker等同于 ...

  5. java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者

    转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使 ...

  6. Java基础学习——多线程(线程间通信-生产者消费者代码示例)

    JDK 1.5提供了多线程升级方案 将同步synchronized替换成了显示的Lock操作.可以实现唤醒.冻结指定的线程. Lock接口 Lock 实现提供了比使用 synchronized 方法和 ...

  7. java生产者消费者代码_Java实现Kafka生产者消费者代码实例

    Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者. 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键 ...

  8. Java多线程 生产者-消费者问题示例

    奶箱:相当于缓冲区,容量有限,生产者放入牛奶,消费者拿走牛奶 生产者:实现 Runnable 接口,箱子内有牛奶就取出,没有就等着 消费者:实现 Runnable 接口,箱子还有剩余空间就往里放牛奶, ...

  9. 使用Java的BlockingQueue实现生产者-消费者

    BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具. BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类 1.ArrayBl ...

最新文章

  1. 径向基函数插值(3)二维数据的插值
  2. 53.垃圾回收算法的实现原理、启动Java垃圾回收、Java垃圾回收过程、垃圾回收中实例的终结、对象什么时候符合垃圾回收的条件、GC Scope 示例程序、GC OutOfMemoryError的示例
  3. 2017年大白菜系统操作说_为什么操作系统在2017年更重要
  4. 数据结构专题(二):2.5在链表指定位置插入元素
  5. python的书籍推荐_python 书籍推荐
  6. xftp6无法使用处理
  7. android连接service,android连接webservice
  8. 学习java第二天 java体系结构与表面执行流程 (one 大白(●—●))
  9. [转载] 夯实Java基础系列8:深入理解Java内部类及其实现原理
  10. IOS 中的XML解析
  11. IDEA如何导出war包
  12. mysql 密码复杂度要求_MySQL设置密码复杂度
  13. 采用jacob读取并在网页中显示ppt、word、excel
  14. Python开源指南
  15. 【原创】NES第一波:如何用通用型6502宏汇编器,制作NES/FC游戏。
  16. 在uni-app当中引入本地图片注意事项以及阿里矢量图iconfont.css当中文件查找失败:‘./iconfont.eot解决办法
  17. openjpeg:解决静态链接时未定义引用错误:undefined reference to `__imp_opj_xxxxxxx'
  18. 关于IDEA在创建Maven子模块后的pom.xml文件没有parent标签的解决方法。
  19. 极路由饥饿营销引质疑 联合创始人拿数据正面回应
  20. Unity 编辑器ScrollView滚动卡顿优化

热门文章

  1. CentOS5.6下安装Oracle10G软件 【保留报错经验】
  2. swift实现提示框第三方库:MBProgressHUD
  3. 旋转卡壳——模板(对踵点)
  4. weex 阶段总结
  5. 深入理解C语言的define
  6. vue之mapMutaions的使用 vuex中 action 用法示例 api.js的使用
  7. android常见错误与问题
  8. 读书笔记(2) OpenLayers中的图层
  9. TechEd 2012奥兰多!
  10. ADO.NET的连接模式