多易教育KAFKA实战(3)-java消费者客户端API示例代码
本节目录
- 入门程序
- 消费日志topic
- 滑动窗口统计消费topic
1 入门程序
public class ConsumerDemo {public static void main(String[] args) {Properties props = new Properties();props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("bootstrap.servers", "h1:9092,h2:9092,h3:9092");// groupid是消费者所属的消费组的标识 ,同一个组中的消费者在消费一个topic数据时,会各自消费一部分,互不重复props.setProperty("group.id", "g1");// 消费起始偏移量自动指定,默认值是latestprops.setProperty("auto.offset.reset", "latest");// 是否要把消费者消费到的offset自动提交到kafka去保存 :__consumer_offsets/**** 消费者组在消费数据的时候,可以讲当前消费到的offset位置,自动记录到kafka中一个特别的topic中: __consumer_offsets* 这样一来,万一在某一瞬间,消费者所在机器宕机,崩溃* 那么,将消费者重启后,它能从kafka的__consumer_offsets中找到崩溃前消费到的位置,从而可以从那个地方继续往后消费,避免重复消费*/props.setProperty("enable.auto.commit", "true");// 构造一个kafka消费者客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);ArrayList<String> topics = new ArrayList<>();topics.add("doit14-2");// 消费者在消费数据之前,需要先“订阅”主题consumer.subscribe(topics);while (true) {// ConsumerRecords是一个Iterable,里面有一个迭代器,所以,取数据时可以拿出迭代器iterator来迭代,也可以直接用增强for循环迭代ConsumerRecords<String, String> records = consumer.poll(200);/*Iterator<ConsumerRecord<String, String>> iter = records.iterator();while(iter.hasNext()){ConsumerRecord<String, String> rec = iter.next();String key = rec.key();String value = rec.value();}*/for (ConsumerRecord<String, String> rec : records) {String key = rec.key();String value = rec.value();System.out.println("key: " + key + "\t value: " + value);}}}
}
2 消费日志topic
/**
* --多易教育* web用户行为日志数据统计:行为事件的发生次数*/
public class LogEventCalc {public static void main(String[] args) {Properties = new Properties();p.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");p.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");p.setProperty("bootstrap.servers", "h1:9092,h2:9092,h3:9092");p.setProperty("group.id", "g1");p.setProperty("auto.offset.reset", "latest");p.setProperty("enable.auto.commit", "true");// 构造一个kafka消费者客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);ArrayList<String> topics = new ArrayList<>();topics.add("doit14-log");// 消费者在消费数据之前,需要先“订阅”主题consumer.subscribe(topics);HashMap<String, Integer> cntMap = new HashMap<>();while(true){ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {String line = record.value();//解析这一条json数据JSONObject jsonObject = JSON.parseObject(line);// 提取行为事件类型String eventid = jsonObject.getString("eventid");// 对事件计数cntMap.put(eventid,cntMap.getOrDefault(eventid,0)+1);// 打印统计结果System.out.println(cntMap);}}}
}
3 滑动窗口统计消费topic
/*** -- 多易教育 * 滑动窗口统计* 窗口长度:10s* 滑动距离:5s*/
public class SlideWindowCalc {public static void main(String[] args) {// 构造所需的数据结构ArrayBlockingQueue<ConsumerRecord> queue = new ArrayBlockingQueue<>(1000);HashMap<String, Integer> cntMap = new HashMap<>();// 1. 构造一个线程,来持续不断地从kafka中拉取数据,放入一个阻塞队列new Thread(new Runnable() {@Overridepublic void run() {// 构造一个kafka消费者客户端Properties props = new Properties();props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("bootstrap.servers", "h1:9092,h2:9092,h3:9092");props.setProperty("auto.offset.reset", "latest");props.setProperty("group.id","aa");props.setProperty("enable.auto.commit", "true");// 构造一个kafka消费者客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);// 消费者在消费数据之前,需要先“订阅”主题consumer.subscribe(Arrays.asList("doit14-window"));while(true){ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {try {// 将kafka消息用阻塞方法插入队列,如果队列已满,会一直等待可用空间的到来queue.put(record);} catch (InterruptedException e) {e.printStackTrace();}}}}}).start();// 2. 构造一个定时任务,每隔5s运行一次,从阻塞队列中取最近10s的数据进行统计Timer timer = new Timer();TimerTask task = new TimerTask(){@Overridepublic void run() {long taskStartTs = System.currentTimeMillis();// 从阻塞队列中取数据,进行统计//queue.take(); //阻塞方法,如果队列中有数据,就取到数据并返回,如果队列中没有数据,则等待数据的到来try {while(true) {ConsumerRecord<String,String> rec = queue.poll(10, TimeUnit.MILLISECONDS);if(rec != null && rec.timestamp()>taskStartTs-10000 && rec.timestamp()<taskStartTs ){String word = rec.value();cntMap.put(word,cntMap.getOrDefault(word,0)+1);}else if(rec != null && rec.timestamp()>=taskStartTs){break;}else if(System.currentTimeMillis()-taskStartTs > 2500){break;}}System.out.println("-----------------calc time : " + taskStartTs + " -------------------- ");System.out.println(cntMap);// 清空hashmapcntMap.clear();} catch (InterruptedException e) {e.printStackTrace();}}};timer.schedule(task,0,5000);}}
多易教育KAFKA实战(3)-java消费者客户端API示例代码相关推荐
- 多易教育KAFKA实战(2)-java生产者客户端API示例代码
案例一 入门实例 /*** java客户端模拟生产者生产topic* topic是数据的分类主题*/ public class Producter1 {public static void main ...
- 多易教育KAFKA实战(4)-原理加强
本节目录 数据可靠性 数据一致性 kafka消费者组 1 数据可靠性 Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知.下面要探讨的角度: Producer 往 Broker 发送消息 ...
- 多易教育KAFKA实战(1)-KAFKA集群安装和shell客户端
注意kafka的安装需要依赖Zookeeper集群 ,所以安装kafka之前先安装zookeeper! zookeeper安装 上传安装包 解压 tar -zxvf zookeeper-3.4.6.t ...
- 多易教育17期课堂笔记--Hbase---shell客户端02
免费视频教程 https://www.51doit.com/ 或者联系博主微信 17710299606 1namespace 名称空间 ; 类似于数据库中的database alter_namespa ...
- java 时间api源码,时间API(示例代码)
1. 时间API 我们的时间在java里是long类型的整数,这个整数称之为时间戳(也叫格林威治时间),即从1970-01-01到现在为止所经过的毫秒数,单有这个时间戳是不能准确表达世界各地的时间,还 ...
- java中的年轻态,14、Java垃圾回收机制(示例代码)
垃圾回收原理和算法 ??Java引入了垃圾回收机制,令C++程序员最头疼的内存管理问题迎刃而解.Java程序员可以将更多的精力放到业务逻辑上面,而不是内存管量上面,大大的提高了开发效率.这是因为Jav ...
- java登陆密码验证失败,java用户名密码验证示例代码分享
类:NameII 权限:public 方法:main 权限:public 参数:name,password,denglu,i; 参数介绍: name,数据类型 String ,用来存储一个 ...
- 堆排序java实例_堆排序(示例代码)
前言:网上有很多堆排序的案例,我只想写自己堆排序. 一:堆结构 即:一个父节点最多只能有两个子节点(可以没有),如下图 图1 图2 图3 图4 二: 数组 ...
- idea java api_intellij idea怎么设置java帮助文档(示例代码)
打开idea我引用的jar包都放在 Project Structure-->Modules-->libs文件夹(双击) 双击jar包所在文件夹,跳出对话框. 1.如果api对应的javad ...
最新文章
- Springboot 多文件上传
- 微信小程序实时聊天之WebSocket
- 据说这是程序员为什么改行送外卖的原因
- 计算机原理之程序是怎么运行的
- 02如何抓住重点,系统高效地学习数据结构与算法?
- linux虚拟机备份树莓派,为树莓派做系统备份镜像(for Linux #038; Mac),
- 安编译器错误_centos 安装pcre报c++编译器错误
- springboot 优雅停机_Spring boot 2.3优雅下线,距离生产还有多远?
- 基于C语言EOF与getchar()的使用详解
- 处理The Network Adapter could not establish the connection
- 安装mp4,mp3等媒体解码器
- Linux 将本地文件上传Linux服务器, 即ssh 命令上传本地文件
- MCS-51单片机指令系统总结(自学笔记)
- java注解(Annotation)-小羊的记录本(转)
- 啡鸟集咖啡报告:每天喝3-4杯咖啡有助延年益寿
- win10家庭版如何修改用户名对应的文件夹的名字(中文该成英文字符)
- 对计算机硬件和软件资源进行,网络技术应用下计算机软硬件资源共享的实现
- 很抱歉,EXCEL遇到错误,使其无法正常工作,因此需要关闭EXCEL。是否希望我们立即修复?...
- Ubuntu 下最好用的pdf阅读器okular
- 网络营销实验一 企业网站专业性诊断评价