本节目录

  1. 入门程序
  2. 消费日志topic
  3. 滑动窗口统计消费topic

1 入门程序

public class ConsumerDemo {public static void main(String[] args) {Properties props = new Properties();props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("bootstrap.servers", "h1:9092,h2:9092,h3:9092");// groupid是消费者所属的消费组的标识  ,同一个组中的消费者在消费一个topic数据时,会各自消费一部分,互不重复props.setProperty("group.id", "g1");// 消费起始偏移量自动指定,默认值是latestprops.setProperty("auto.offset.reset", "latest");// 是否要把消费者消费到的offset自动提交到kafka去保存 :__consumer_offsets/**** 消费者组在消费数据的时候,可以讲当前消费到的offset位置,自动记录到kafka中一个特别的topic中: __consumer_offsets* 这样一来,万一在某一瞬间,消费者所在机器宕机,崩溃* 那么,将消费者重启后,它能从kafka的__consumer_offsets中找到崩溃前消费到的位置,从而可以从那个地方继续往后消费,避免重复消费*/props.setProperty("enable.auto.commit", "true");// 构造一个kafka消费者客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);ArrayList<String> topics = new ArrayList<>();topics.add("doit14-2");// 消费者在消费数据之前,需要先“订阅”主题consumer.subscribe(topics);while (true) {// ConsumerRecords是一个Iterable,里面有一个迭代器,所以,取数据时可以拿出迭代器iterator来迭代,也可以直接用增强for循环迭代ConsumerRecords<String, String> records = consumer.poll(200);/*Iterator<ConsumerRecord<String, String>> iter = records.iterator();while(iter.hasNext()){ConsumerRecord<String, String> rec = iter.next();String key = rec.key();String value = rec.value();}*/for (ConsumerRecord<String, String> rec : records) {String key = rec.key();String value = rec.value();System.out.println("key: " + key + "\t value: " + value);}}}
}

2 消费日志topic

/**
*   --多易教育* web用户行为日志数据统计:行为事件的发生次数*/
public class LogEventCalc {public static void main(String[] args) {Properties  = new Properties();p.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");p.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");p.setProperty("bootstrap.servers", "h1:9092,h2:9092,h3:9092");p.setProperty("group.id", "g1");p.setProperty("auto.offset.reset", "latest");p.setProperty("enable.auto.commit", "true");// 构造一个kafka消费者客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);ArrayList<String> topics = new ArrayList<>();topics.add("doit14-log");// 消费者在消费数据之前,需要先“订阅”主题consumer.subscribe(topics);HashMap<String, Integer> cntMap = new HashMap<>();while(true){ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {String line = record.value();//解析这一条json数据JSONObject jsonObject = JSON.parseObject(line);// 提取行为事件类型String eventid = jsonObject.getString("eventid");// 对事件计数cntMap.put(eventid,cntMap.getOrDefault(eventid,0)+1);// 打印统计结果System.out.println(cntMap);}}}
}

3 滑动窗口统计消费topic

/*** -- 多易教育 * 滑动窗口统计* 窗口长度:10s* 滑动距离:5s*/
public class SlideWindowCalc {public static void main(String[] args) {// 构造所需的数据结构ArrayBlockingQueue<ConsumerRecord> queue = new ArrayBlockingQueue<>(1000);HashMap<String, Integer> cntMap = new HashMap<>();// 1. 构造一个线程,来持续不断地从kafka中拉取数据,放入一个阻塞队列new Thread(new Runnable() {@Overridepublic void run() {// 构造一个kafka消费者客户端Properties props = new Properties();props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("bootstrap.servers", "h1:9092,h2:9092,h3:9092");props.setProperty("auto.offset.reset", "latest");props.setProperty("group.id","aa");props.setProperty("enable.auto.commit", "true");// 构造一个kafka消费者客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);// 消费者在消费数据之前,需要先“订阅”主题consumer.subscribe(Arrays.asList("doit14-window"));while(true){ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {try {// 将kafka消息用阻塞方法插入队列,如果队列已满,会一直等待可用空间的到来queue.put(record);} catch (InterruptedException e) {e.printStackTrace();}}}}}).start();// 2. 构造一个定时任务,每隔5s运行一次,从阻塞队列中取最近10s的数据进行统计Timer timer = new Timer();TimerTask task = new TimerTask(){@Overridepublic void run() {long taskStartTs = System.currentTimeMillis();// 从阻塞队列中取数据,进行统计//queue.take(); //阻塞方法,如果队列中有数据,就取到数据并返回,如果队列中没有数据,则等待数据的到来try {while(true) {ConsumerRecord<String,String> rec = queue.poll(10, TimeUnit.MILLISECONDS);if(rec != null && rec.timestamp()>taskStartTs-10000 && rec.timestamp()<taskStartTs ){String word = rec.value();cntMap.put(word,cntMap.getOrDefault(word,0)+1);}else if(rec != null && rec.timestamp()>=taskStartTs){break;}else if(System.currentTimeMillis()-taskStartTs > 2500){break;}}System.out.println("-----------------calc time : " + taskStartTs + " -------------------- ");System.out.println(cntMap);// 清空hashmapcntMap.clear();} catch (InterruptedException e) {e.printStackTrace();}}};timer.schedule(task,0,5000);}}

多易教育KAFKA实战(3)-java消费者客户端API示例代码相关推荐

  1. 多易教育KAFKA实战(2)-java生产者客户端API示例代码

    案例一  入门实例 /*** java客户端模拟生产者生产topic* topic是数据的分类主题*/ public class Producter1 {public static void main ...

  2. 多易教育KAFKA实战(4)-原理加强

    本节目录 数据可靠性 数据一致性 kafka消费者组 1 数据可靠性 Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知.下面要探讨的角度: Producer 往 Broker 发送消息 ...

  3. 多易教育KAFKA实战(1)-KAFKA集群安装和shell客户端

    注意kafka的安装需要依赖Zookeeper集群 ,所以安装kafka之前先安装zookeeper! zookeeper安装 上传安装包 解压 tar -zxvf zookeeper-3.4.6.t ...

  4. 多易教育17期课堂笔记--Hbase---shell客户端02

    免费视频教程 https://www.51doit.com/ 或者联系博主微信 17710299606 1namespace 名称空间 ; 类似于数据库中的database alter_namespa ...

  5. java 时间api源码,时间API(示例代码)

    1. 时间API 我们的时间在java里是long类型的整数,这个整数称之为时间戳(也叫格林威治时间),即从1970-01-01到现在为止所经过的毫秒数,单有这个时间戳是不能准确表达世界各地的时间,还 ...

  6. java中的年轻态,14、Java垃圾回收机制(示例代码)

    垃圾回收原理和算法 ??Java引入了垃圾回收机制,令C++程序员最头疼的内存管理问题迎刃而解.Java程序员可以将更多的精力放到业务逻辑上面,而不是内存管量上面,大大的提高了开发效率.这是因为Jav ...

  7. java登陆密码验证失败,java用户名密码验证示例代码分享

    类:NameII    权限:public 方法:main    权限:public 参数:name,password,denglu,i; 参数介绍: name,数据类型 String ,用来存储一个 ...

  8. 堆排序java实例_堆排序(示例代码)

    前言:网上有很多堆排序的案例,我只想写自己堆排序. 一:堆结构 即:一个父节点最多只能有两个子节点(可以没有),如下图 图1        图2           图3       图4 二: 数组 ...

  9. idea java api_intellij idea怎么设置java帮助文档(示例代码)

    打开idea我引用的jar包都放在 Project Structure-->Modules-->libs文件夹(双击) 双击jar包所在文件夹,跳出对话框. 1.如果api对应的javad ...

最新文章

  1. Springboot 多文件上传
  2. 微信小程序实时聊天之WebSocket
  3. 据说这是程序员为什么改行送外卖的原因
  4. 计算机原理之程序是怎么运行的
  5. 02如何抓住重点,系统高效地学习数据结构与算法?
  6. linux虚拟机备份树莓派,为树莓派做系统备份镜像(for Linux #038; Mac),
  7. 安编译器错误_centos 安装pcre报c++编译器错误
  8. springboot 优雅停机_Spring boot 2.3优雅下线,距离生产还有多远?
  9. 基于C语言EOF与getchar()的使用详解
  10. 处理The Network Adapter could not establish the connection
  11. 安装mp4,mp3等媒体解码器
  12. Linux 将本地文件上传Linux服务器, 即ssh 命令上传本地文件
  13. MCS-51单片机指令系统总结(自学笔记)
  14. java注解(Annotation)-小羊的记录本(转)
  15. 啡鸟集咖啡报告:每天喝3-4杯咖啡有助延年益寿
  16. win10家庭版如何修改用户名对应的文件夹的名字(中文该成英文字符)
  17. 对计算机硬件和软件资源进行,网络技术应用下计算机软硬件资源共享的实现
  18. 很抱歉,EXCEL遇到错误,使其无法正常工作,因此需要关闭EXCEL。是否希望我们立即修复?...
  19. Ubuntu 下最好用的pdf阅读器okular
  20. 网络营销实验一 企业网站专业性诊断评价

热门文章

  1. 登陆mysql的命令行
  2. python画有权重网络图_Python可视化之NetworkX绘制网络图\节点关系
  3. 数据库系统-存储过程
  4. TVS管的参数理解与选型
  5. 【分享】免费的国际一级域名和100M支持asp、cgi空间
  6. python--敲击木鱼积累功德小项目(更新版(2))
  7. python监控目录变化_如何用python语言监控文件或目录变化
  8. 技术创业者必读:从验证想法到技术产品商业化的全方位解析
  9. 如何使用Kumo Java Word Cloud?
  10. TI公司TMS封装与引脚对应关系