创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test111
        [root@h5 kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3
    查看Kafka的主题详情
        [root@h5 kafka]# bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111
    查看Kafka所有的主题    
        [root@h5 kafka]# bin/kafka-topics.sh --list --zookeeper h5:2181
    删除Kafka指定的主题    
        [root@h5 kafka]# bin/kafka-topics.sh --delete --zookeeper h5:2181,h6:2181,h7:2181 --topic test111
        如删除时提示
        Topic guowang1 is marked for deletion.
   Note: This will have no impact if delete.topic.enable is not set to true.
      请修改Kafka/config/server.properties新增delete.topic.enable=true
        kafka_2.10-0.8.2.0.jar
        kafka-clients-0.8.2.0.jar
        metrics-core-2.2.0.jar
        scala-library-2.10.4.jar
        zkclient-0.3.jar
        zookeeper-3.4.6.jar
    
    1、生产者

 1 package storm.test.kafka;
 2
 3         import java.util.Properties;
 4
 5         import kafka.javaapi.producer.Producer;
 6         import kafka.producer.KeyedMessage;
 7         import kafka.producer.ProducerConfig;
 8         import kafka.serializer.StringEncoder;
 9
10         public class TestProducer {
11
12             public static void main(String[] args) throws Exception {
13                 Properties prop = new Properties();
14                 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
15                 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
16                 prop.put("serializer.class", StringEncoder.class.getName());
17                 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));
18                 int i = 0;
19                 while(true){
20                     producer.send(new KeyedMessage<String, String>("test111", "msg:"+i++));
21                     Thread.sleep(1000);
22                 }
23             }
24
25         }

2、消费者

 1 package storm.test.kafka;
 2
 3         import java.util.HashMap;
 4         import java.util.List;
 5         import java.util.Map;
 6         import java.util.Properties;
 7
 8         import kafka.consumer.Consumer;
 9         import kafka.consumer.ConsumerConfig;
10         import kafka.consumer.ConsumerIterator;
11         import kafka.consumer.KafkaStream;
12         import kafka.javaapi.consumer.ConsumerConnector;
13         import kafka.serializer.StringEncoder;
14
15         public class TestConsumer {
16
17             static final String topic = "test111";
18
19             public static void main(String[] args) {
20                 Properties prop = new Properties();
21                 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
22                 prop.put("serializer.class", StringEncoder.class.getName());
23                 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
24                 prop.put("group.id", "group1");
25                 ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
26                 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
27                 topicCountMap.put(topic, 1);
28                 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
29                 final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
30                 ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
31                 while (iterator.hasNext()) {
32                     String msg = new String(iterator.next().message());
33                     System.out.println("收到消息:"+msg);
34                 }
35             }
36
37         }

转载于:https://www.cnblogs.com/mengyao/p/4526075.html

使用java创建kafka的生产者和消费者相关推荐

  1. 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

    1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...

  2. kafka中生产者和消费者的分区问题

    本文来书说下kafka中生产者和消费者的分区问题 文章目录 概述 主题的分区数设置 分区与生产者 分区与消费者 range roundrobin(轮询) 本文参考 本文小结 概述 我们知道,生产者发送 ...

  3. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  4. Java基础进阶多线程-生产者和消费者模式

    1.什么是"生产者和消费者模式"? 生产线程负责生产,消费线程负责消费 生产线程和消费线程要达到均衡 这是一种特殊的业务需求,在这种特殊的情况下需要使用wait方法和notify方 ...

  5. 一个使用Java BlockingQueue实现的生产者和消费者

    消费者 package consumer;import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit ...

  6. 基于java多线程来实现生产者和消费者的实例

    声明:本实例是在网上看到,做了很小的修改.所以感谢之前的作者.只是一时忘了哪儿看到,没法加入链接,向原作者道歉,以示尊重.抱歉-^)... 同步栈: 1 class SycnStack { 2 pri ...

  7. Java多线程2.3.生产者与消费者之间的关系2

    生产者与消费者之间的关系 1.线程间通信问题描述图 2.线程的状态转换图及常见执行情况 3.等待唤醒机制思路图解 4.线程的生命周期

  8. kafka(四)生产者和消费者配置优化

    1.生产者producer 1.1 producer 参数acks设置 在消息被认为是"已提交"之前,producer需要leader确认的produce请求的应答数.该参数用于控 ...

  9. java 多线程经典例子——生产者与消费者的问题

    产品名称类: public class Product {//产品名称private String name;public String getName() {return name;}public ...

  10. java实现Kafka生产者示例

    使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 3 ...

最新文章

  1. 参与开源项目,结识技术大牛!CSDN“开源加速器计划”招募志愿者啦!
  2. Qt Creator连接MCU
  3. 微软旗下Maluuba推出看图问答数据集,想让AI看懂图表
  4. atitit.新增编辑功能 跟orm的实现 attilax p31
  5. 第一章 时间序列基础知识
  6. 针对我国——国产数据库进行分析
  7. 【SpringBoot项目No qualifying bean of type ‘×××Mapper‘ available:的错误解决】
  8. configure: error: Package requirements (oniguruma) were not met
  9. linux基础命令之一
  10. 检测屏幕.html,15款html5响应式网站跨屏幕测试工具
  11. 如何查找网络虚假谣言信息?
  12. ctf练习之闯关游戏
  13. python如何生成26个英文字母(包括大小和小写)以及附上英文可见(常用)字符的ASCII码表。
  14. ARM NEON优化3.RGB Packed转RGB Planar
  15. html中符号向下箭头号,html箭头相关符号
  16. MyCat-web 可视化运维管理和监控平台
  17. 梅斯健康再冲刺上市:研发投入远不及营销费用,启明、腾讯为股东
  18. 消息队列 64式 : 2、oslo.messaging消息处理源码分析
  19. 基于Python语言和PyQt5的铁路列车运行图系统
  20. Windows Server 2008在密码策略里禁用复杂密码

热门文章

  1. 【XS128】Link error L1822 symbol _FADD / _FSUB/ _FDIV/ _FMUL.....错误解决的方法
  2. 优秀的基于VUE移动端UI框架合集
  3. smali注入常用代码
  4. C语言精要总结-内存地址对齐与struct大小判断篇
  5. phpcms V9 栏目管理
  6. 20190816 On Java8 第六章 初始化和清理
  7. java day19【File类、递归】
  8. js进阶 14-9 ajax事件有哪些
  9. Git提交空文件夹的技巧
  10. XAMPP浏览器输入localhost跳转localhost/dashboard/