使用java创建kafka的生产者和消费者
创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test111
[root@h5 kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3
查看Kafka的主题详情
[root@h5 kafka]# bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111
查看Kafka所有的主题
[root@h5 kafka]# bin/kafka-topics.sh --list --zookeeper h5:2181
删除Kafka指定的主题
[root@h5 kafka]# bin/kafka-topics.sh --delete --zookeeper h5:2181,h6:2181,h7:2181 --topic test111
如删除时提示
Topic guowang1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
请修改Kafka/config/server.properties新增delete.topic.enable=true
kafka_2.10-0.8.2.0.jar
kafka-clients-0.8.2.0.jar
metrics-core-2.2.0.jar
scala-library-2.10.4.jar
zkclient-0.3.jar
zookeeper-3.4.6.jar
1、生产者
1 package storm.test.kafka; 2 3 import java.util.Properties; 4 5 import kafka.javaapi.producer.Producer; 6 import kafka.producer.KeyedMessage; 7 import kafka.producer.ProducerConfig; 8 import kafka.serializer.StringEncoder; 9 10 public class TestProducer { 11 12 public static void main(String[] args) throws Exception { 13 Properties prop = new Properties(); 14 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181"); 15 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092"); 16 prop.put("serializer.class", StringEncoder.class.getName()); 17 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop)); 18 int i = 0; 19 while(true){ 20 producer.send(new KeyedMessage<String, String>("test111", "msg:"+i++)); 21 Thread.sleep(1000); 22 } 23 } 24 25 }
2、消费者
1 package storm.test.kafka; 2 3 import java.util.HashMap; 4 import java.util.List; 5 import java.util.Map; 6 import java.util.Properties; 7 8 import kafka.consumer.Consumer; 9 import kafka.consumer.ConsumerConfig; 10 import kafka.consumer.ConsumerIterator; 11 import kafka.consumer.KafkaStream; 12 import kafka.javaapi.consumer.ConsumerConnector; 13 import kafka.serializer.StringEncoder; 14 15 public class TestConsumer { 16 17 static final String topic = "test111"; 18 19 public static void main(String[] args) { 20 Properties prop = new Properties(); 21 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181"); 22 prop.put("serializer.class", StringEncoder.class.getName()); 23 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092"); 24 prop.put("group.id", "group1"); 25 ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop)); 26 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 27 topicCountMap.put(topic, 1); 28 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); 29 final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0); 30 ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); 31 while (iterator.hasNext()) { 32 String msg = new String(iterator.next().message()); 33 System.out.println("收到消息:"+msg); 34 } 35 } 36 37 }
转载于:https://www.cnblogs.com/mengyao/p/4526075.html
使用java创建kafka的生产者和消费者相关推荐
- 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)
1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...
- kafka中生产者和消费者的分区问题
本文来书说下kafka中生产者和消费者的分区问题 文章目录 概述 主题的分区数设置 分区与生产者 分区与消费者 range roundrobin(轮询) 本文参考 本文小结 概述 我们知道,生产者发送 ...
- 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例
从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...
- Java基础进阶多线程-生产者和消费者模式
1.什么是"生产者和消费者模式"? 生产线程负责生产,消费线程负责消费 生产线程和消费线程要达到均衡 这是一种特殊的业务需求,在这种特殊的情况下需要使用wait方法和notify方 ...
- 一个使用Java BlockingQueue实现的生产者和消费者
消费者 package consumer;import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit ...
- 基于java多线程来实现生产者和消费者的实例
声明:本实例是在网上看到,做了很小的修改.所以感谢之前的作者.只是一时忘了哪儿看到,没法加入链接,向原作者道歉,以示尊重.抱歉-^)... 同步栈: 1 class SycnStack { 2 pri ...
- Java多线程2.3.生产者与消费者之间的关系2
生产者与消费者之间的关系 1.线程间通信问题描述图 2.线程的状态转换图及常见执行情况 3.等待唤醒机制思路图解 4.线程的生命周期
- kafka(四)生产者和消费者配置优化
1.生产者producer 1.1 producer 参数acks设置 在消息被认为是"已提交"之前,producer需要leader确认的produce请求的应答数.该参数用于控 ...
- java 多线程经典例子——生产者与消费者的问题
产品名称类: public class Product {//产品名称private String name;public String getName() {return name;}public ...
- java实现Kafka生产者示例
使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 3 ...
最新文章
- 参与开源项目,结识技术大牛!CSDN“开源加速器计划”招募志愿者啦!
- Qt Creator连接MCU
- 微软旗下Maluuba推出看图问答数据集,想让AI看懂图表
- atitit.新增编辑功能 跟orm的实现 attilax p31
- 第一章 时间序列基础知识
- 针对我国——国产数据库进行分析
- 【SpringBoot项目No qualifying bean of type ‘×××Mapper‘ available:的错误解决】
- configure: error: Package requirements (oniguruma) were not met
- linux基础命令之一
- 检测屏幕.html,15款html5响应式网站跨屏幕测试工具
- 如何查找网络虚假谣言信息?
- ctf练习之闯关游戏
- python如何生成26个英文字母(包括大小和小写)以及附上英文可见(常用)字符的ASCII码表。
- ARM NEON优化3.RGB Packed转RGB Planar
- html中符号向下箭头号,html箭头相关符号
- MyCat-web 可视化运维管理和监控平台
- 梅斯健康再冲刺上市:研发投入远不及营销费用,启明、腾讯为股东
- 消息队列 64式 : 2、oslo.messaging消息处理源码分析
- 基于Python语言和PyQt5的铁路列车运行图系统
- Windows Server 2008在密码策略里禁用复杂密码
热门文章
- 【XS128】Link error L1822 symbol _FADD / _FSUB/ _FDIV/ _FMUL.....错误解决的方法
- 优秀的基于VUE移动端UI框架合集
- smali注入常用代码
- C语言精要总结-内存地址对齐与struct大小判断篇
- phpcms V9 栏目管理
- 20190816 On Java8 第六章 初始化和清理
- java day19【File类、递归】
- js进阶 14-9 ajax事件有哪些
- Git提交空文件夹的技巧
- XAMPP浏览器输入localhost跳转localhost/dashboard/