案例一  入门实例

/*** java客户端模拟生产者生产topic* topic是数据的分类主题*/
public class Producter1 {public static void main(String[] args) throws InterruptedException {Properties p = new Properties();p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("bootstrap.servers", "lx01:9092,lx02:9092,lx03:9092");p.setProperty("acts", "1");p.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");// 创建一个生产者对象KafkaProducer<String, String> kp = new KafkaProducer<>(p);for (int i = 0; i < 1000; i++) {Thread.sleep(1000);//向 hang=kafka主题中发送消息   主题和消息ProducerRecord pr = new ProducerRecord("hang-kafka", "hang" + i);kp.send(pr);System.out.println("---------------hang" + i + "----------------");}kp.close();}
}

案例一 入门程序(带回调)

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

/*** java客户端模拟生产者生产topic* topic是数据的分类主题*/
public class Producter2 {public static void main(String[] args) throws InterruptedException {Properties p = new Properties();p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("bootstrap.servers", "lx01:9092,lx02:9092,lx03:9092");p.setProperty("acts", "1");p.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");// 创建一个生产者对象KafkaProducer<String, String> kp = new KafkaProducer<>(p);for (int i = 0; i < 1000; i++) {Thread.sleep(1000);//向 hang=kafka主题中发送消息   主题和消息ProducerRecord pr = new ProducerRecord("hang-kafka", "hang" + i);kp.send(pr, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e==null){String topic = recordMetadata.topic();long offset = recordMetadata.offset();int p = recordMetadata.partition();System.out.println(topic+"---offset:  "+offset+"---partittion: "+p);}}});}kp.close();}
}

案例二 将mysql中的数据发送到kafka

/*** 将mysql中的数据  发送到kafka中*/
public class SqlData2KAFKA {public static void main(String[] args) throws Exception {Properties p = new Properties();p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("bootstrap.servers", "lx01:9092,lx02:9092,lx03:9092");p.setProperty("acts", "1");p.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");KafkaProducer<String, String> product = new KafkaProducer<>(p);// 获取数据库连接对象Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_doit15", "root", "root");PreparedStatement ps = conn.prepareStatement("select * from tb_product ");ResultSet rs = ps.executeQuery();while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int price = rs.getInt("price");String category = rs.getString("category");String value = id + "," + name + "," + price + "," + category;ProducerRecord<String, String> msg = new ProducerRecord<>("tb_product", value);product.send(msg);}product.close();rs.close();ps.close();conn.close();}
}

案例三  将日志中的数据发送到kafka

/*** 将日志中的数据发送到kafka中*/
public class Log2KafkKa {public static void main(String[] args) throws Exception {Properties p = new Properties();p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");p.setProperty("bootstrap.servers", "lx01:9092,lx02:9092,lx03:9092");p.setProperty("acts", "1");p.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");// 创建一个生产者KafkaProducer<String, String> producter = new KafkaProducer<>(p);BufferedReader br = new BufferedReader(new FileReader("D:\\data\\flow\\input\\flow.log"));String line = null;//读取数据   每行的数据为一个消息while ((line = br.readLine()) != null) {ProducerRecord<String, String> msg = new ProducerRecord<>("flow.log", line);// 生产者将消息 发送出去producter.send(msg);}// 是放资源br.close();producter.close();}
}

多易教育KAFKA实战(2)-java生产者客户端API示例代码相关推荐

  1. 多易教育KAFKA实战(3)-java消费者客户端API示例代码

    本节目录 入门程序 消费日志topic 滑动窗口统计消费topic 1 入门程序 public class ConsumerDemo {public static void main(String[] ...

  2. 多易教育KAFKA实战(4)-原理加强

    本节目录 数据可靠性 数据一致性 kafka消费者组 1 数据可靠性 Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知.下面要探讨的角度: Producer 往 Broker 发送消息 ...

  3. 多易教育KAFKA实战(1)-KAFKA集群安装和shell客户端

    注意kafka的安装需要依赖Zookeeper集群 ,所以安装kafka之前先安装zookeeper! zookeeper安装 上传安装包 解压 tar -zxvf zookeeper-3.4.6.t ...

  4. 多易教育17期课堂笔记--Hbase---shell客户端02

    免费视频教程 https://www.51doit.com/ 或者联系博主微信 17710299606 1namespace 名称空间 ; 类似于数据库中的database alter_namespa ...

  5. java 时间api源码,时间API(示例代码)

    1. 时间API 我们的时间在java里是long类型的整数,这个整数称之为时间戳(也叫格林威治时间),即从1970-01-01到现在为止所经过的毫秒数,单有这个时间戳是不能准确表达世界各地的时间,还 ...

  6. java中的年轻态,14、Java垃圾回收机制(示例代码)

    垃圾回收原理和算法 ??Java引入了垃圾回收机制,令C++程序员最头疼的内存管理问题迎刃而解.Java程序员可以将更多的精力放到业务逻辑上面,而不是内存管量上面,大大的提高了开发效率.这是因为Jav ...

  7. java登陆密码验证失败,java用户名密码验证示例代码分享

    类:NameII    权限:public 方法:main    权限:public 参数:name,password,denglu,i; 参数介绍: name,数据类型 String ,用来存储一个 ...

  8. 堆排序java实例_堆排序(示例代码)

    前言:网上有很多堆排序的案例,我只想写自己堆排序. 一:堆结构 即:一个父节点最多只能有两个子节点(可以没有),如下图 图1        图2           图3       图4 二: 数组 ...

  9. idea java api_intellij idea怎么设置java帮助文档(示例代码)

    打开idea我引用的jar包都放在 Project Structure-->Modules-->libs文件夹(双击) 双击jar包所在文件夹,跳出对话框. 1.如果api对应的javad ...

最新文章

  1. mybatis-错误记录java.lang.ExceptionInInitializerError
  2. Labview串口通信
  3. 60. Leetcode 面试题 10.03. 搜索旋转数组 (二分查找-局部有序)
  4. python使用工具简介介绍
  5. 《Webservice的应用与开发》学习笔记 ·002【XML进阶、XML Schema】
  6. java生成excel到本地_java 将数据库中的数据导出成Excel文件 并保存到本地 将文件地址返回给前端...
  7. web前端安全机制问题全解析
  8. Android BLE开发之Android手机与BLE终端通信
  9. SameMovie HBOMax Video Downloader for Mac如何在 Mac 上下载 HBO Max 视频?
  10. node.js与npm下载及配置流程
  11. 大数据从入门到实战——MongoDB实验——数据库基本操作
  12. QCC302X/QCC303X蓝牙对讲与蓝牙扩音器
  13. np和tensor转换
  14. pdf转图片怎么清晰?
  15. WebRTC音频降噪使用
  16. 程序员如何找对象(1)
  17. linux下如何重启网卡,linux系统如何重启网卡
  18. Commons-Collections简介
  19. 查看python解释器位置
  20. 他两次都没能感动中国,不知道有没有感动我们[转载]

热门文章

  1. Java--枚举类型(枚举类型介绍 定义枚举类型 枚举类型的使用 枚举类型的注意事项 遍历枚举项)
  2. 学习Android:第一个app《hello word》
  3. 【Vue3】电商网站吸顶功能
  4. TVS 管 和TSS管
  5. 树莓派PICO问题一
  6. 为什么渗透提权这么难
  7. 金士顿固态硬盘不认盘修复_30分钟大法固态硬盘不识别自检修复三十分钟的救盘方法如下...
  8. 乘法表输出及其扩展(附带部分代码书写习惯) C++实现
  9. BSP 工程管理实验
  10. java实现年会微信签到,签到后在大屏中展示,导出签到信息