1.producer程序

package com.test.frame.kafka.controller;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducer {

private final Producer producer;

public final static String TOPIC = "my-multi-topic";

//构造方法

private KafkaProducer() {

Properties props = new Properties();

props.put("metadata.broker.list", "localhost:9092");

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("key.serializer.class", "kafka.serializer.StringEncoder");

props.put("request.required.acks", "-1");

producer = new Producer(new ProducerConfig(props));

}

void produce() {

int messageNo = 90;

final int COUNT = 100;

while (messageNo < COUNT) {

String key = String.valueOf(messageNo);

String data = "hello kafka message" + key;

producer.send(new KeyedMessage(TOPIC, key ,data));

System.out.println(data);

messageNo++;

}

}

public static void main(String[] args) throws Exception {

new KafkaProducer().produce();

}

}

运行结果:

消费方接收到的消息如下:

2.consumer端程序:

package com.test.frame.kafka.controller;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.serializer.StringDecoder;

import kafka.utils.VerifiableProperties;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

public class KafkaConsumer {

private final ConsumerConnector consumer;

private KafkaConsumer() {

Properties props = new Properties();

//zookeeper 配置

props.put("zookeeper.connect", "localhost:2181");

//group 代表一个消费组

props.put("group.id", "jd-group");

//zk连接超时

props.put("zookeeper.session.timeout.ms", "4000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

props.put("auto.offset.reset", "smallest");

//序列化类

props.put("serializer.class", "kafka.serializer.StringEncoder");

ConsumerConfig config = new ConsumerConfig(props);

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

}

void consume() {

Map topicCountMap = new HashMap();

topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));

StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());

StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

Map>> consumerMap =

consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);

KafkaStream stream = consumerMap.get(KafkaProducer.TOPIC).get(0);

ConsumerIterator it = stream.iterator();

while (it.hasNext())

System.out.println(it.next().message());

}

public static void main(String[] args) {

new KafkaConsumer().consume();

}

}

运行结果如下:

此时已经联通成功。

java kafka 消费_java利用kafka生产消费消息相关推荐

  1. Java程序创建Kafka Topic,以及数据生产消费,常用的命令

    转自: Java程序创建Kafka Topic,以及数据生产消费,常用的命令_Zyy_z_的博客-CSDN博客_java kafka创建topicKafka简介: Kafka是一个分布式发布--订阅消 ...

  2. Kafka创建查看topic,生产消费指定topic消息

    启动zookeeper和Kafka之后,进入kafka目录(安装/启动kafka参考前面一章:https://www.cnblogs.com/cici20166/p/9425613.html) 1.创 ...

  3. Kafka 命令之查看topic生产消费数据查看组的消费信息

    1.创建 topic: [root@node1 bin]# ./kafka-topics.sh --zookeeper node2:2181,node3:2181,node3:2181 --creat ...

  4. java多线程 游戏_java利用多线程和Socket实现猜拳游戏

    本文实例为大家分享了利用多线程和Socket实现猜拳游戏的具体代码,供大家参考,具体内容如下 实例:猜拳游戏 猜拳游戏是指小时候玩的石头.剪刀.布的游戏.客户端与服务器的"较量", ...

  5. java jmf获取图像_java利用jmf实现拍照功能

    首先到SUN下载最新的JMF,然后安装.http://java.sun.com/products/java-media/jmf/index.jsp http://www.bt285.cn 然后,说一下 ...

  6. java preferences设置_Java利用Preferences设置个人偏好

    Preferences的中文意思即偏好或喜好的意思,也就是说同一个程序在每次运行完后,可以通过Preferences来记录用户的偏好,下次启动时,程序会利用这些信息来了解用户的喜好.而这些信息个人理解 ...

  7. java jxl包_java利用JXL包操作Excel表

    源码 package test; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; im ...

  8. java 获取温度_Java利用RXTX串口通信工具类获取DS18B20温度传感器的温度值

    环境:Windows10,Eclipse4.5.2,JDK1.7 设备:DS18B20温度传感器(4线,485接口),USB转485接口转换器,笔记本电脑 注意点:RTU传输,使用的是字节,那么在程序 ...

  9. java 字母随机数_Java利用随机数生成字母

    RandomStr.java package sample; public class RandomStr { public static void main(String[] args) { //定 ...

最新文章

  1. 有人说:穷学IT富搞金融!程序员究竟是不是一帮苦孩子在做?
  2. ssh免密登录linux服务器
  3. PHP扩展模块Memcache Redis Mssql部署
  4. python重复执行函数_Python threading 单线程 timer重复调用函数
  5. 1067: [SCOI2007]降雨量 - BZOJ
  6. sublime Text3插入参考文献问题
  7. ​【安全牛学习笔记】操作系统识别
  8. Linux下C/C++开发工具注意事项
  9. edas部署需要哪些参数_强夯设计与施工中需要确定的主要技术参数有哪些
  10. 【JavaScript】数组
  11. CCIR601和CCIR656标准的区别
  12. React开发(174):ant design按钮确认删除
  13. Let‘s Fluent:更顺滑的MyBatis
  14. 信息学奥赛C++语言:最高分数的学生姓名
  15. Ext.grid.Panel一定要有renderTo或autoRender属性,不然页面为空
  16. mysql启动失败 linux_如何解决MySQL内存不足启动失败的问题
  17. mysql uroot p 报错,MySQL链接错误集。
  18. 鸿蒙系统可以安装teams吗,鸿蒙致命弱点被曝光!不能装这个软件,80%用户将望而却步!...
  19. 自动控制原理:一阶系统的时域分析
  20. 关系数据库(范式判断、函数依赖、无损分解、正则覆盖)

热门文章

  1. C语言课后习题(47)
  2. PAT乙级(1019 数字黑洞)
  3. 分布式事务中间件你知道哪些?
  4. (赠书福利)2018 Oracle 数据技术嘉年华
  5. 事件Event:带你体验鸿蒙轻内核中一对多、多对多任务同步
  6. 【华为云技术分享】小白篇,认识Python最最最常用语重要的库Requests
  7. 非编程人学Python,要注意哪些隐秘的错误认知?
  8. 【Python3网络爬虫开发实战】1.6.1-Flask的安装
  9. 类似于html的语言,其他语言的类似CL-WHO的HTML模板?
  10. android studio moudel,Android Studio将module变为library