057 Java中kafka的Producer程序实现
1.需要启动的服务
这里启动的端口是9092。
bin/kafka-console-consumer.sh --topic beifeng --zookeeper linux-hadoop01.ibeifeng.com:2181/kafka
2.producer的程序
1 package com.jun.it; 2 import kafka.javaapi.producer.Producer; 3 import kafka.producer.KeyedMessage; 4 import kafka.producer.ProducerConfig; 5 import java.util.Properties; 6 import java.util.Random; 7 import java.util.concurrent.atomic.AtomicBoolean; 8 public class JavaKafkaProducer { 9 public static final char[] chars = "qazwsxedcrfvtgbyhnujmikolp0123456789".toCharArray(); 10 public static final int charsLength = chars.length; 11 public static final Random random = new Random(System.currentTimeMillis()); 12 private Producer<String, String> producer = null; 13 14 private String topicName = null; 15 private String brokerList = null; 16 private boolean isSync = true; // 默认为同步 17 18 /** 19 * 构造函数 20 * 21 * @param topicName 22 * @param brokerList 23 */ 24 public JavaKafkaProducer(String topicName, String brokerList) { 25 this(topicName, brokerList, true); 26 } 27 28 /** 29 * 构造函数,主要是产生producer 30 * 31 * @param topicName 32 * @param brokerList 33 * @param isSync 34 */ 35 public JavaKafkaProducer(String topicName, String brokerList, boolean isSync) { 36 // 赋值 37 this.topicName = topicName; 38 this.brokerList = brokerList; 39 this.isSync = isSync; 40 41 // 1. 给定配置信息:参考http://kafka.apache.org/082/documentation.html#producerconfigs 42 Properties props = new Properties(); 43 // kafka集群的连接信息 44 props.put("metadata.broker.list", this.brokerList); 45 // kafka发送数据方式 46 if (this.isSync) { 47 // 同步发送数据 48 props.put("producer.type", "sync"); 49 } else { 50 // 异步发送数据 51 props.put("producer.type", "async"); 52 /** 53 * 0: 不等待broker的返回 54 * 1: 表示至少等待1个broker返回结果 55 * -1:表示等待所有broker返回数据接收成功的结果 56 */ 57 props.put("request.required.acks", "0"); 58 } 59 // key/value数据序列化的类 60 /** 61 * 默认是:DefaultEncoder, 指发送的数据类型是byte类型 62 * 如果发送数据是string类型,必须更改StringEncoder 63 */ 64 props.put("serializer.class", "kafka.serializer.StringEncoder"); 65 66 // 2. 构建Kafka的Producer Configuration上下文 67 ProducerConfig config = new ProducerConfig(props); 68 69 // 3. 构建Kafka的生产者:Producerr 70 this.producer = new Producer<String, String>(config); 71 } 72 73 /** 74 * 关闭producer连接 75 */ 76 public void closeProducer() { 77 producer.close(); 78 } 79 80 /** 81 * 提供给外部应用调用的直接运行发送数据代码的方法 82 * 83 * @param threadNumbers 84 * @param isRunning 85 */ 86 public void run(int threadNumbers, final AtomicBoolean isRunning) { 87 for (int i = 0; i < threadNumbers; i++) { 88 new Thread(new Runnable() { 89 public void run() { 90 int count = 0; 91 while (isRunning.get()) { 92 // 只有在运行状态的情况下,才发送数据 93 KeyedMessage<String, String> message = generateMessage(); 94 // 发送数据 95 producer.send(message); 96 count++; 97 // 打印一下 98 if (count % 100 == 0) { 99 System.out.println("Count = " + count + "; message:" + message); 100 } 101 102 // 假设需要休息一下 103 try { 104 Thread.sleep(random.nextInt(100) + 10); 105 } catch (InterruptedException e) { 106 // nothings 107 } 108 } 109 System.out.println("Thread:" + Thread.currentThread().getName() + " send message count is:" + count); 110 } 111 }).start(); 112 } 113 } 114 115 /** 116 * 产生一个随机的Kafka的KeyedMessage对象 117 * 118 * @return 119 */ 120 public KeyedMessage<String, String> generateMessage() { 121 String key = generateString(3) + "_" + random.nextInt(10); 122 StringBuilder sb = new StringBuilder(); 123 int numWords = random.nextInt(5) + 1; // [1,5]单词 124 for (int i = 0; i < numWords; i++) { 125 String word = generateString(random.nextInt(5) + 1); // 单词中字符最少1个最多5个 126 sb.append(word).append(" "); 127 } 128 String message = sb.toString().trim(); 129 return new KeyedMessage(this.topicName, key, message); 130 } 131 132 /** 133 * 随机生产一个给定长度的字符串 134 * 135 * @param numItems 136 * @return 137 */ 138 public static String generateString(int numItems) { 139 StringBuilder sb = new StringBuilder(); 140 for (int i = 0; i < numItems; i++) { 141 sb.append(chars[random.nextInt(charsLength)]); 142 } 143 return sb.toString(); 144 } 145 }
3.测试类
1 package com.jun.it; 2 3 import java.util.concurrent.atomic.AtomicBoolean; 4 5 public class JavaKafkaProducerTest { 6 public static void main(String[] args) { 7 String topicName = "beifeng"; 8 String brokerList = "linux-hadoop01.ibeifeng.com:9092,linux-hadoop01.ibeifeng.com:9093"; 9 int threadNums = 10; 10 AtomicBoolean isRunning = new AtomicBoolean(true); 11 JavaKafkaProducer producer = new JavaKafkaProducer(topicName, brokerList); 12 producer.run(threadNums, isRunning); 13 14 // 停留60秒后,进行关闭操作 15 try { 16 Thread.sleep(1000); 17 } catch (InterruptedException e) { 18 // nothings 19 } 20 isRunning.set(false); 21 22 // 关闭连接 23 producer.closeProducer(); 24 } 25 }
4.效果
二:使用自定义的分区器
1.分区器
1 package com.jun.it; 2 3 import kafka.producer.Partitioner; 4 import kafka.utils.VerifiableProperties; 5 6 public class JavaKafkaPartitioner implements Partitioner { 7 /** 8 * 默认无参构造函数 9 */ 10 public JavaKafkaPartitioner() { 11 this(new VerifiableProperties()); 12 } 13 14 /** 15 * 该构造函数必须给定 16 * 17 * @param properties 初始化producer的时候给定的配置信息 18 */ 19 public JavaKafkaPartitioner(VerifiableProperties properties) { 20 // nothings 21 } 22 23 @Override 24 public int partition(Object key, int numPartitions) { 25 String tmp = (String) key; 26 int index = tmp.lastIndexOf('_'); 27 int number = Integer.valueOf(tmp.substring(index + 1)); 28 return number % numPartitions; 29 } 30 }
2.producer类重新修改
1 package com.jun.it; 2 import kafka.javaapi.producer.Producer; 3 import kafka.producer.KeyedMessage; 4 import kafka.producer.ProducerConfig; 5 import java.util.Properties; 6 import java.util.Random; 7 import java.util.concurrent.atomic.AtomicBoolean; 8 public class JavaKafkaProducer { 9 public static final char[] chars = "qazwsxedcrfvtgbyhnujmikolp0123456789".toCharArray(); 10 public static final int charsLength = chars.length; 11 public static final Random random = new Random(System.currentTimeMillis()); 12 private Producer<String, String> producer = null; 13 14 private String topicName = null; 15 private String brokerList = null; 16 private boolean isSync = true; // 默认为同步 17 private String partitionerClass = null; // 数据分区器class类 18 19 /** 20 * 构造函数 21 * 22 * @param topicName 23 * @param brokerList 24 */ 25 public JavaKafkaProducer(String topicName, String brokerList) { 26 this(topicName, brokerList, true, null); 27 } 28 29 /** 30 * 构造函数 31 * 32 * @param topicName 33 * @param brokerList 34 * @param partitionerClass 35 */ 36 public JavaKafkaProducer(String topicName, String brokerList, String partitionerClass) { 37 this(topicName, brokerList, true, partitionerClass); 38 } 39 40 /** 41 * 构造函数,主要是产生producer 42 * 43 * @param topicName 44 * @param brokerList 45 * @param isSync 46 */ 47 public JavaKafkaProducer(String topicName, String brokerList, boolean isSync, String partitionerClass) { 48 // 赋值 49 this.topicName = topicName; 50 this.brokerList = brokerList; 51 this.isSync = isSync; 52 this.partitionerClass = partitionerClass; 53 54 // 1. 给定配置信息:参考http://kafka.apache.org/082/documentation.html#producerconfigs 55 Properties props = new Properties(); 56 // kafka集群的连接信息 57 props.put("metadata.broker.list", this.brokerList); 58 // kafka发送数据方式 59 if (this.isSync) { 60 // 同步发送数据 61 props.put("producer.type", "sync"); 62 } else { 63 // 异步发送数据 64 props.put("producer.type", "async"); 65 /** 66 * 0: 不等待broker的返回 67 * 1: 表示至少等待1个broker返回结果 68 * -1:表示等待所有broker返回数据接收成功的结果 69 */ 70 props.put("request.required.acks", "0"); 71 } 72 // key/value数据序列化的类 73 /** 74 * 默认是:DefaultEncoder, 指发送的数据类型是byte类型 75 * 如果发送数据是string类型,必须更改StringEncoder 76 */ 77 props.put("serializer.class", "kafka.serializer.StringEncoder"); 78 79 // 给定分区器的class参数 80 if (this.partitionerClass != null && !this.partitionerClass.trim().isEmpty()) { 81 // 默认是:DefaultPartiioner,基于key的hashCode进行hash后进行分区 82 props.put("partitioner.class", this.partitionerClass.trim()); 83 } 84 85 // 2. 构建Kafka的Producer Configuration上下文 86 ProducerConfig config = new ProducerConfig(props); 87 88 // 3. 构建Kafka的生产者:Producerr 89 this.producer = new Producer<String, String>(config); 90 } 91 92 /** 93 * 关闭producer连接 94 */ 95 public void closeProducer() { 96 producer.close(); 97 } 98 99 /** 100 * 提供给外部应用调用的直接运行发送数据代码的方法 101 * 102 * @param threadNumbers 103 * @param isRunning 104 */ 105 public void run(int threadNumbers, final AtomicBoolean isRunning) { 106 for (int i = 0; i < threadNumbers; i++) { 107 new Thread(new Runnable() { 108 public void run() { 109 int count = 0; 110 while (isRunning.get()) { 111 // 只有在运行状态的情况下,才发送数据 112 KeyedMessage<String, String> message = generateMessage(); 113 // 发送数据 114 producer.send(message); 115 count++; 116 // 打印一下 117 if (count % 100 == 0) { 118 System.out.println("Count = " + count + "; message:" + message); 119 } 120 121 // 假设需要休息一下 122 try { 123 Thread.sleep(random.nextInt(100) + 10); 124 } catch (InterruptedException e) { 125 // nothings 126 } 127 } 128 System.out.println("Thread:" + Thread.currentThread().getName() + " send message count is:" + count); 129 } 130 }).start(); 131 } 132 } 133 134 /** 135 * 产生一个随机的Kafka的KeyedMessage对象 136 * 137 * @return 138 */ 139 public KeyedMessage<String, String> generateMessage() { 140 String key = generateString(3) + "_" + random.nextInt(10); 141 StringBuilder sb = new StringBuilder(); 142 int numWords = random.nextInt(5) + 1; // [1,5]单词 143 for (int i = 0; i < numWords; i++) { 144 String word = generateString(random.nextInt(5) + 1); // 单词中字符最少1个最多5个 145 sb.append(word).append(" "); 146 } 147 String message = sb.toString().trim(); 148 return new KeyedMessage(this.topicName, key, message); 149 } 150 151 /** 152 * 随机生产一个给定长度的字符串 153 * 154 * @param numItems 155 * @return 156 */ 157 public static String generateString(int numItems) { 158 StringBuilder sb = new StringBuilder(); 159 for (int i = 0; i < numItems; i++) { 160 sb.append(chars[random.nextInt(charsLength)]); 161 } 162 return sb.toString(); 163 } 164 }
3.测试类
1 package com.jun.it; 2 3 import java.util.concurrent.atomic.AtomicBoolean; 4 5 public class JavaKafkaProducerTest { 6 public static void main(String[] args) { 7 String topicName = "beifeng"; 8 String brokerList = "linux-hadoop01.ibeifeng.com:9092,linux-hadoop01.ibeifeng.com:9093"; 9 String partitionerClass = "com.jun.it.JavaKafkaPartitioner"; 10 int threadNums = 10; 11 AtomicBoolean isRunning = new AtomicBoolean(true); 12 JavaKafkaProducer producer = new JavaKafkaProducer(topicName, brokerList,partitionerClass); 13 producer.run(threadNums, isRunning); 14 15 // 停留60秒后,进行关闭操作 16 try { 17 Thread.sleep(1000); 18 } catch (InterruptedException e) { 19 // nothings 20 } 21 isRunning.set(false); 22 23 // 关闭连接 24 producer.closeProducer(); 25 } 26 }
4.效果
转载于:https://www.cnblogs.com/juncaoit/p/9426106.html
057 Java中kafka的Producer程序实现相关推荐
- java连接kafka api_Kafka-JavaAPI(Producer And Consumer)
Kafka--JAVA API(Producer和Consumer) Kafka 版本2.11-0.9.0.0 producer package com.yzy.spark.kafka; import ...
- java中子类继承父类程序执行顺序问题
为什么80%的码农都做不了架构师?>>> Java中,new一个类的对象,类里面的静态代码块.非静态代码.无参构造方法.有参构造方法.类的一般方法等部分,它们的执行顺序相对来说 ...
- java中如何运行小程序_一起学java(一)——运行第一个小程序
接下来的一段时间内会更新一起学java系列,喜欢的关注一下我吧.微信公众号:什么都不懂的大佬:初学,有错误的地方请大家多多指教. ---------------分割线-------------- 一. ...
- java中的基本小程序_12个用Java编写基础小程序经典案例(收藏篇)
原标题:12个用Java编写基础小程序&经典案例(收藏篇) 如果是刚接触或者刚学习java,练习一些基础的算法还是必须的,可以提升思维和语法的使用. 1.输出两个int数中的最大值 impor ...
- java中画房子的程序_房子用java绘图(控制台)
我必须制作一个程序,要求用户输入高度和长度(均为偶数),然后程序将绘制一个房子.房子的屋顶是(宽度/ 2)行数.示例程序应如下所示: Enter height and width of the hou ...
- java中编写单选按钮的程序_java的单选按钮GUI程序设计
单选按钮在GUI程序设计中的概念来自于老式的电子管汽车收音机的机械按钮:当我们按下一个按钮时,其它的按钮就会弹起.因此它允许我们强制从众多选择中作出单一选择. AWT没有单独的描述单选钮的类;取而代之 ...
- android调用python框架_在Java中从Android应用程序执行Python脚本?
我正试图找到一种在Android中从Java代码执行Python脚本的方法.我对这个问题做了一个研究,但我发现的唯一问题是,如何在APK for android(Kivy e.t.c.)中转换pyth ...
- java中foreach用法_java程序中foreach用法示例
语法 for (Object objectname : preArrayList(一个Object对象的列表)) {} 示例 package com.kuaff.jdk5; import java.u ...
- iw在java中什么意思,请问程序里piw和iw有什么区别
PIW是指模拟量输入信号,IW是数字开关量 回答者: 石头捡到布 - 毕业实践员  第4级 2010-01-28 14:29:32 个人理解:对于没有相应的映像缓冲区的 ...
最新文章
- python selenium xpath_python+selenium十四:xpath和contains模糊匹配
- Javaweb异常提示信息统一处理
- 使用 VirtualBox + Vagrant + 宝塔 Linux 面板搭建本地虚拟开发环境
- c++语言常量,Go语言常量和const关键字
- SecureCRT中使用 rz 上传文件 遇到 rz: command not found 的解决办法
- wxpython中文教程_wxPython中文教程 简单入门加实例
- centos6.5 安装mysql5.6_centos6.5 安装mysql5.6
- HadoopLearning
- 从物理页面的争抢看linux内核内存管理
- Redis开发与运维 笔记一
- 架构方案(9) 如何构建一套高可用的 APP 消息推送平台
- 为你的域名添加子域名(二级域名)并绑定网站
- idea提交项目出现push rejected
- 指针使用入门与 unsafe.Pointer
- joint_state_publisher_gui
- 分步骤详细解说:H5性能优化方案
- 简单解释一下一个项目中的pojo模块
- 创意人像海报故障艺术海报教程故障艺术海报怎么做
- android编辑框最大字数,(转)Android中EditText的输入字数限制
- ssm+mysql+养老院信息管理系统 毕业设计-附源码181550
热门文章
- 查看Mysql实时执行的Sql语句
- 条款34:区分接口继承和实现继承(Different between inheritance of interface and inheritance of implemenation)...
- C#使用System.Data.SQLite操作SQLite
- tnsnames.ora 监听配置文件详解
- CodeProject - 在C#使用SHGetFileInfo获取(管理)文件或者文件夹图标(C#封装Win32函数的一个例子)...
- 自写函数的防抖和节流
- react-native相机
- MySQL8.0.25命令行安装与配置
- 如何向Word中插入代码块
- 美股涨跌幅限制是多少?