多易教育KAFKA实战(2)-java生产者客户端API示例代码
案例一 入门实例
/*** java客户端模拟生产者生产topic* topic是数据的分类主题*/
public class Producter1 {public static void main(String[] args) throws InterruptedException {Properties p = new Properties();p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("bootstrap.servers", "lx01:9092,lx02:9092,lx03:9092");p.setProperty("acts", "1");p.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");// 创建一个生产者对象KafkaProducer<String, String> kp = new KafkaProducer<>(p);for (int i = 0; i < 1000; i++) {Thread.sleep(1000);//向 hang=kafka主题中发送消息 主题和消息ProducerRecord pr = new ProducerRecord("hang-kafka", "hang" + i);kp.send(pr);System.out.println("---------------hang" + i + "----------------");}kp.close();}
}
案例一 入门程序(带回调)
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
/*** java客户端模拟生产者生产topic* topic是数据的分类主题*/
public class Producter2 {public static void main(String[] args) throws InterruptedException {Properties p = new Properties();p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("bootstrap.servers", "lx01:9092,lx02:9092,lx03:9092");p.setProperty("acts", "1");p.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");// 创建一个生产者对象KafkaProducer<String, String> kp = new KafkaProducer<>(p);for (int i = 0; i < 1000; i++) {Thread.sleep(1000);//向 hang=kafka主题中发送消息 主题和消息ProducerRecord pr = new ProducerRecord("hang-kafka", "hang" + i);kp.send(pr, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e==null){String topic = recordMetadata.topic();long offset = recordMetadata.offset();int p = recordMetadata.partition();System.out.println(topic+"---offset: "+offset+"---partittion: "+p);}}});}kp.close();}
}
案例二 将mysql中的数据发送到kafka
/*** 将mysql中的数据 发送到kafka中*/
public class SqlData2KAFKA {public static void main(String[] args) throws Exception {Properties p = new Properties();p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("bootstrap.servers", "lx01:9092,lx02:9092,lx03:9092");p.setProperty("acts", "1");p.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");KafkaProducer<String, String> product = new KafkaProducer<>(p);// 获取数据库连接对象Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_doit15", "root", "root");PreparedStatement ps = conn.prepareStatement("select * from tb_product ");ResultSet rs = ps.executeQuery();while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int price = rs.getInt("price");String category = rs.getString("category");String value = id + "," + name + "," + price + "," + category;ProducerRecord<String, String> msg = new ProducerRecord<>("tb_product", value);product.send(msg);}product.close();rs.close();ps.close();conn.close();}
}
案例三 将日志中的数据发送到kafka
/*** 将日志中的数据发送到kafka中*/
public class Log2KafkKa {public static void main(String[] args) throws Exception {Properties p = new Properties();p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("bootstrap.servers", "lx01:9092,lx02:9092,lx03:9092");p.setProperty("acts", "1");p.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");// 创建一个生产者KafkaProducer<String, String> producter = new KafkaProducer<>(p);BufferedReader br = new BufferedReader(new FileReader("D:\\data\\flow\\input\\flow.log"));String line = null;//读取数据 每行的数据为一个消息while ((line = br.readLine()) != null) {ProducerRecord<String, String> msg = new ProducerRecord<>("flow.log", line);// 生产者将消息 发送出去producter.send(msg);}// 是放资源br.close();producter.close();}
}
多易教育KAFKA实战(2)-java生产者客户端API示例代码相关推荐
- 多易教育KAFKA实战(3)-java消费者客户端API示例代码
本节目录 入门程序 消费日志topic 滑动窗口统计消费topic 1 入门程序 public class ConsumerDemo {public static void main(String[] ...
- 多易教育KAFKA实战(4)-原理加强
本节目录 数据可靠性 数据一致性 kafka消费者组 1 数据可靠性 Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知.下面要探讨的角度: Producer 往 Broker 发送消息 ...
- 多易教育KAFKA实战(1)-KAFKA集群安装和shell客户端
注意kafka的安装需要依赖Zookeeper集群 ,所以安装kafka之前先安装zookeeper! zookeeper安装 上传安装包 解压 tar -zxvf zookeeper-3.4.6.t ...
- 多易教育17期课堂笔记--Hbase---shell客户端02
免费视频教程 https://www.51doit.com/ 或者联系博主微信 17710299606 1namespace 名称空间 ; 类似于数据库中的database alter_namespa ...
- java 时间api源码,时间API(示例代码)
1. 时间API 我们的时间在java里是long类型的整数,这个整数称之为时间戳(也叫格林威治时间),即从1970-01-01到现在为止所经过的毫秒数,单有这个时间戳是不能准确表达世界各地的时间,还 ...
- java中的年轻态,14、Java垃圾回收机制(示例代码)
垃圾回收原理和算法 ??Java引入了垃圾回收机制,令C++程序员最头疼的内存管理问题迎刃而解.Java程序员可以将更多的精力放到业务逻辑上面,而不是内存管量上面,大大的提高了开发效率.这是因为Jav ...
- java登陆密码验证失败,java用户名密码验证示例代码分享
类:NameII 权限:public 方法:main 权限:public 参数:name,password,denglu,i; 参数介绍: name,数据类型 String ,用来存储一个 ...
- 堆排序java实例_堆排序(示例代码)
前言:网上有很多堆排序的案例,我只想写自己堆排序. 一:堆结构 即:一个父节点最多只能有两个子节点(可以没有),如下图 图1 图2 图3 图4 二: 数组 ...
- idea java api_intellij idea怎么设置java帮助文档(示例代码)
打开idea我引用的jar包都放在 Project Structure-->Modules-->libs文件夹(双击) 双击jar包所在文件夹,跳出对话框. 1.如果api对应的javad ...
最新文章
- mybatis-错误记录java.lang.ExceptionInInitializerError
- Labview串口通信
- 60. Leetcode 面试题 10.03. 搜索旋转数组 (二分查找-局部有序)
- python使用工具简介介绍
- 《Webservice的应用与开发》学习笔记 ·002【XML进阶、XML Schema】
- java生成excel到本地_java 将数据库中的数据导出成Excel文件 并保存到本地 将文件地址返回给前端...
- web前端安全机制问题全解析
- Android BLE开发之Android手机与BLE终端通信
- SameMovie HBOMax Video Downloader for Mac如何在 Mac 上下载 HBO Max 视频?
- node.js与npm下载及配置流程
- 大数据从入门到实战——MongoDB实验——数据库基本操作
- QCC302X/QCC303X蓝牙对讲与蓝牙扩音器
- np和tensor转换
- pdf转图片怎么清晰?
- WebRTC音频降噪使用
- 程序员如何找对象(1)
- linux下如何重启网卡,linux系统如何重启网卡
- Commons-Collections简介
- 查看python解释器位置
- 他两次都没能感动中国,不知道有没有感动我们[转载]