直播人数的统计(一)
从clickhouse中查询各维度(SQL里要通过group by 维度)指标,clickhouse通过JavaAPI来实现实时展示。
命令行开启clickhouse:
Eg: Select osname, isNew,count(distinct deviced ) from tb_user_event group by isNew,channel,osname;
优化:
引入ROCKSDB(一种statebackend)的原因:如果把状态全都放入内存中,statede里的数据比较多,而且状态会实时变化,每一个taskmanager里只要一个rocksdb, 将数据以二进制的数组存入到数据库,单独的一个key或者一个value不能超过2G.作用:可以实现增量的checkpoint.
将数据写到hdfs的缺点就是一旦数据变化了要把新的全量的数据通过checkpoint写入到hdfs,如果只把变化的增量checkpoint进去会更好。
FSstatebackend是把数据存储到HDFS里面,再把数据定期持久化到磁盘里面.
merge方法只有在SessionWindow时可能会调用,其他类型的Window不会调用,实时的统计累计关众、实时在线观众 :为了写入数据减少对外部的数据库压力,我们使用窗口,将数据进行增量聚合,这样输出的数据就变少了,对数据库的压力也变少了
1)
import com.doit.live.cn.demoText.POJO.DataBean;
import com.doit.live.cn.demoText.kafka.MyKafkaDeserializationSchema;
import com.doit.live.cn.demoText.udfs.JsonToBeanWithIdFunction;
import com.doit.live.cn.demoText.utils.FlinkUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
//存在一个问题,如果来一条计算一次,并且还要输出一次,这样对外部的数据库压力比较大//怎样改进//划分滚动窗口,滚动窗口进行聚合,仅会累加当前窗口中的数据,必须要累加历史数据//keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).red
public class liveaudiencecount {public static void main(String[] args) throws Exception{DataStream<Tuple2<String, String>> kafkaStreamWithId = FlinkUtil.createKafkaStreamWithId(args[0], MyKafkaDeserializationSchema.class);SingleOutputStreamOperator<DataBean> jsonbean = kafkaStreamWithId.process(new JsonToBeanWithIdFunction());SingleOutputStreamOperator<DataBean> filterBean = jsonbean.filter(bean -> "liveEnter".equals(bean.getEventId()) || "liveLeave".equals(bean.getEventId()));KeyedStream<DataBean, String> keyedStream = filterBean.keyBy(bean -> bean.getProperties().get("anchor_id").toString());SingleOutputStreamOperator<Tuple4<String, String, Integer, Integer>> res = keyedStream.process(new AudienceCount());res.print();}
}
import com.doit.live.cn.demoText.POJO.DataBean;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class AudienceCount extends KeyedProcessFunction<String, DataBean, Tuple4<String,String,Integer,Integer>> {private transient MapState<String,Integer> audiencestate;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Integer> audiencestatede = new MapStateDescriptor<>("audiencestate", String.class, Integer.class);audiencestate = getRuntimeContext().getMapState(audiencestatede);}@Overridepublic void processElement(DataBean value, Context ctx, Collector<Tuple4<String, String, Integer, Integer>> out) throws Exception {String eventId = value.getEventId();String room_id = value.getProperties().get("room_id").toString();String totalroomid= room_id+"-total";Integer totalnum= audiencestate.get(totalroomid);if(totalnum==null){totalnum=0;}String onlineNum=room_id+"-online";Integer onlinenum= audiencestate.get(onlineNum);if(onlinenum==null){onlinenum=0;}if("liveEnter".equals(eventId)){totalnum+=1;onlinenum+=1;}else if("liveLeave".equals(eventId)){onlinenum-=1;}audiencestate.put(totalroomid,totalnum);audiencestate.put(onlineNum,onlinenum);out.collect(Tuple4.of(ctx.getCurrentKey(),room_id,totalnum,onlinenum));}
}
2)优化 为什么使用aggregate是因为reduce要求输入输出类型一致必须是DataBean不够灵活
import com.doit.live.cn.demoText.POJO.DataBean;
import com.doit.live.cn.demoText.kafka.MyKafkaDeserializationSchema;
import com.doit.live.cn.demoText.udfs.JsonToBeanWithIdFunction;
import com.doit.live.cn.demoText.utils.FlinkUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class liveaudiencecount2 {public static void main(String[] args) throws Exception{DataStream<Tuple2<String, String>> kafkaStreamWithId = FlinkUtil.createKafkaStreamWithId(args[0], MyKafkaDeserializationSchema.class);SingleOutputStreamOperator<DataBean> jsonbean = kafkaStreamWithId.process(new JsonToBeanWithIdFunction());SingleOutputStreamOperator<DataBean> filterBean = jsonbean.filter(bean -> "liveEnter".equals(bean.getEventId()) || "liveLeave".equals(bean.getEventId()));KeyedStream<DataBean, String> keyedStream = filterBean.keyBy(bean -> bean.getProperties().get("anchor_id").toString());WindowedStream<DataBean, String, TimeWindow> WindowedStream= keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> res = WindowedStream.aggregate(new AudienceCount2());res.print();}}
import com.doit.live.cn.demoText.POJO.DataBean;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;public class AudienceCount2 implements AggregateFunction<DataBean, Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> {@Overridepublic Tuple3<String, Integer, Integer> createAccumulator() {return Tuple3.of(null,0,0);}@Overridepublic Tuple3<String, Integer, Integer> add(DataBean bean, Tuple3<String, Integer, Integer> acc) {String roomId = bean.getProperties().get("room_id").toString();acc.f0=roomId;String eventId = bean.getEventId();if("liveEnter".equals(eventId)){acc.f1+=1;acc.f2+=1;}else if("liveLeave".equals(eventId)){acc.f2-=1;}return acc;}@Overridepublic Tuple3<String, Integer, Integer> getResult(Tuple3<String, Integer, Integer> acc) {return acc;}@Overridepublic Tuple3<String, Integer, Integer> merge(Tuple3<String, Integer, Integer> stringIntegerIntegerTuple3, Tuple3<String, Integer, Integer> acc1) {return null;}
}
直播人数的统计(一)相关推荐
- python爬虫:使用selenium、unittest和BeautifulSoup爬取斗鱼tv的当前直播人数
import unittest from selenium import webdriver from bs4 import BeautifulSoup as bsclass douyu(unitte ...
- 直播APP开发:直播人数是否该有限制
2019独角兽企业重金招聘Python工程师标准>>> 谈及直播,大众率先想到的一定是一对多的秀场直播或游戏直播,直播在中国兴起以来,就是一对多的模式在发展,所以一对多.视频直播等模 ...
- OpenCV视频识别检测人数跟踪统计
Python+OpenCV视频识别检测人数跟踪统计 如需远程调试,可加QQ905733049由专业技术人员远程协助! 运行代码如下: import numpy as np import cv2 imp ...
- 大数据之直播平台数据统计
已知有以下直播平台数据 json格式 {"id":158008900435,"uid":120010010445,"nickname":&q ...
- c++ 用类统计不及格人数_统计小课堂13
Previous review: 1)回归章节小结: 这周开始计数资料统计! -------------------------------------- 当观察数据按照某种属性和类别分组后,计数得到 ...
- wow服务器人数最新统计,魔兽世界怀旧服服务器人数统计 魔兽世界怀旧服人数比例查询...
魔兽世界怀旧服服务器人数统计是游戏每个服务器玩家数量,大家想知道排队人数喔,那么魔兽世界怀旧服服务器人数统计有多少.魔兽世界怀旧服人数比例查询呢,跑跑车游戏网为大家带来介绍. *魔兽世界怀旧服服务器人 ...
- mysql查询各专业人数_SQL统计各专业学生人数
统计"学生"表中学生的总人数的sql语句是: select count(*) from student; 其中select代表查询,count(*)是统计行数量,student是学 ...
- c语言如何统计不同分数段学生人数,Excel统计不同班级各个分数段的学生人数的方法...
有很多人不了解Excel统计不同班级各个分数段的学生人数的方法,那么今天小编就在这里给大家分享一点我的小经验,希望可以给你们带来帮助. Excel统计不同班级各个分数段的学生人数的方法 如图所示,我们 ...
- VC++海康威视视频人数流量统计数据库连接
第一次使用VC++二次开发海康威视设备,记录一下,直接代码 `#include <stdio.h> #include <WinSock.h> #include <mysq ...
最新文章
- 基于锚框与无需锚框的通用物体检测算法
- 过滤驱动加密文件(代码)
- 普华永道2030汽车产业报告 私家车真正Out了!
- suse的安装命令zypper,类似apt
- selenium自动化测试多条数据选择第一条
- win7(旗舰版)下,OleLoadPicture 加载内存中的图片(MagickGetImageBlob),返回值 0
- Factory Method(工厂方法)--对象创建型模式
- IT职业就业-学长有话说(二)
- Bootstrap 工具提示插件Tooltip的方法
- 3. 机器学习中为什么需要梯度下降_梯度提升(Gradient Boosting)算法
- 附件计算器中的MC、MR、MS、M+作用
- 分布式事务及分布式系统一致性解决方案
- 有り様、状態、様子 の 区別
- Android开发 Facebook取得key-hashes
- 软件需求说明书-总务办公管理系统
- ps4微软服务器地址,PSN服务更新 终于可以在PC上玩PS4游戏了!
- 带你玩转 3D 检测和分割 (二):核心组件分析之坐标系和 Box
- 我转行程序员的那一年(八)
- 道路中心线提取、河道中心线的提取(ArcScan)
- LM7805的特殊使用方法