java kafka 消费_java利用kafka生产消费消息
1.producer程序
package com.test.frame.kafka.controller;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
public class KafkaProducer {
private final Producer producer;
public final static String TOPIC = "my-multi-topic";
//构造方法
private KafkaProducer() {
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "-1");
producer = new Producer(new ProducerConfig(props));
}
void produce() {
int messageNo = 90;
final int COUNT = 100;
while (messageNo < COUNT) {
String key = String.valueOf(messageNo);
String data = "hello kafka message" + key;
producer.send(new KeyedMessage(TOPIC, key ,data));
System.out.println(data);
messageNo++;
}
}
public static void main(String[] args) throws Exception {
new KafkaProducer().produce();
}
}
运行结果:
消费方接收到的消息如下:
2.consumer端程序:
package com.test.frame.kafka.controller;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumer {
private final ConsumerConnector consumer;
private KafkaConsumer() {
Properties props = new Properties();
//zookeeper 配置
props.put("zookeeper.connect", "localhost:2181");
//group 代表一个消费组
props.put("group.id", "jd-group");
//zk连接超时
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
//序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig config = new ConsumerConfig(props);
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
}
void consume() {
Map topicCountMap = new HashMap();
topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
Map>> consumerMap =
consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
KafkaStream stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
ConsumerIterator it = stream.iterator();
while (it.hasNext())
System.out.println(it.next().message());
}
public static void main(String[] args) {
new KafkaConsumer().consume();
}
}
运行结果如下:
此时已经联通成功。
java kafka 消费_java利用kafka生产消费消息相关推荐
- Java程序创建Kafka Topic,以及数据生产消费,常用的命令
转自: Java程序创建Kafka Topic,以及数据生产消费,常用的命令_Zyy_z_的博客-CSDN博客_java kafka创建topicKafka简介: Kafka是一个分布式发布--订阅消 ...
- Kafka创建查看topic,生产消费指定topic消息
启动zookeeper和Kafka之后,进入kafka目录(安装/启动kafka参考前面一章:https://www.cnblogs.com/cici20166/p/9425613.html) 1.创 ...
- Kafka 命令之查看topic生产消费数据查看组的消费信息
1.创建 topic: [root@node1 bin]# ./kafka-topics.sh --zookeeper node2:2181,node3:2181,node3:2181 --creat ...
- java多线程 游戏_java利用多线程和Socket实现猜拳游戏
本文实例为大家分享了利用多线程和Socket实现猜拳游戏的具体代码,供大家参考,具体内容如下 实例:猜拳游戏 猜拳游戏是指小时候玩的石头.剪刀.布的游戏.客户端与服务器的"较量", ...
- java jmf获取图像_java利用jmf实现拍照功能
首先到SUN下载最新的JMF,然后安装.http://java.sun.com/products/java-media/jmf/index.jsp http://www.bt285.cn 然后,说一下 ...
- java preferences设置_Java利用Preferences设置个人偏好
Preferences的中文意思即偏好或喜好的意思,也就是说同一个程序在每次运行完后,可以通过Preferences来记录用户的偏好,下次启动时,程序会利用这些信息来了解用户的喜好.而这些信息个人理解 ...
- java jxl包_java利用JXL包操作Excel表
源码 package test; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; im ...
- java 获取温度_Java利用RXTX串口通信工具类获取DS18B20温度传感器的温度值
环境:Windows10,Eclipse4.5.2,JDK1.7 设备:DS18B20温度传感器(4线,485接口),USB转485接口转换器,笔记本电脑 注意点:RTU传输,使用的是字节,那么在程序 ...
- java 字母随机数_Java利用随机数生成字母
RandomStr.java package sample; public class RandomStr { public static void main(String[] args) { //定 ...
最新文章
- 有人说:穷学IT富搞金融!程序员究竟是不是一帮苦孩子在做?
- ssh免密登录linux服务器
- PHP扩展模块Memcache Redis Mssql部署
- python重复执行函数_Python threading 单线程 timer重复调用函数
- 1067: [SCOI2007]降雨量 - BZOJ
- sublime Text3插入参考文献问题
- ​【安全牛学习笔记】操作系统识别
- Linux下C/C++开发工具注意事项
- edas部署需要哪些参数_强夯设计与施工中需要确定的主要技术参数有哪些
- 【JavaScript】数组
- CCIR601和CCIR656标准的区别
- React开发(174):ant design按钮确认删除
- Let‘s Fluent:更顺滑的MyBatis
- 信息学奥赛C++语言:最高分数的学生姓名
- Ext.grid.Panel一定要有renderTo或autoRender属性,不然页面为空
- mysql启动失败 linux_如何解决MySQL内存不足启动失败的问题
- mysql uroot p 报错,MySQL链接错误集。
- 鸿蒙系统可以安装teams吗,鸿蒙致命弱点被曝光!不能装这个软件,80%用户将望而却步!...
- 自动控制原理:一阶系统的时域分析
- 关系数据库(范式判断、函数依赖、无损分解、正则覆盖)
热门文章
- C语言课后习题(47)
- PAT乙级(1019 数字黑洞)
- 分布式事务中间件你知道哪些?
- (赠书福利)2018 Oracle 数据技术嘉年华
- 事件Event:带你体验鸿蒙轻内核中一对多、多对多任务同步
- 【华为云技术分享】小白篇,认识Python最最最常用语重要的库Requests
- 非编程人学Python,要注意哪些隐秘的错误认知?
- 【Python3网络爬虫开发实战】1.6.1-Flask的安装
- 类似于html的语言,其他语言的类似CL-WHO的HTML模板?
- android studio moudel,Android Studio将module变为library