从clickhouse中查询各维度(SQL里要通过group by 维度)指标,clickhouse通过JavaAPI来实现实时展示。
命令行开启clickhouse:
Eg: Select osname, isNew,count(distinct deviced ) from tb_user_event group by isNew,channel,osname;
优化:

引入ROCKSDB(一种statebackend)的原因:如果把状态全都放入内存中,statede里的数据比较多,而且状态会实时变化,每一个taskmanager里只要一个rocksdb, 将数据以二进制的数组存入到数据库,单独的一个key或者一个value不能超过2G.作用:可以实现增量的checkpoint.
将数据写到hdfs的缺点就是一旦数据变化了要把新的全量的数据通过checkpoint写入到hdfs,如果只把变化的增量checkpoint进去会更好。

FSstatebackend是把数据存储到HDFS里面,再把数据定期持久化到磁盘里面.
merge方法只有在SessionWindow时可能会调用,其他类型的Window不会调用,实时的统计累计关众、实时在线观众 :为了写入数据减少对外部的数据库压力,我们使用窗口,将数据进行增量聚合,这样输出的数据就变少了,对数据库的压力也变少了
1)

import com.doit.live.cn.demoText.POJO.DataBean;
import com.doit.live.cn.demoText.kafka.MyKafkaDeserializationSchema;
import com.doit.live.cn.demoText.udfs.JsonToBeanWithIdFunction;
import com.doit.live.cn.demoText.utils.FlinkUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
//存在一个问题,如果来一条计算一次,并且还要输出一次,这样对外部的数据库压力比较大//怎样改进//划分滚动窗口,滚动窗口进行聚合,仅会累加当前窗口中的数据,必须要累加历史数据//keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).red
public class liveaudiencecount {public static void main(String[] args) throws  Exception{DataStream<Tuple2<String, String>> kafkaStreamWithId = FlinkUtil.createKafkaStreamWithId(args[0], MyKafkaDeserializationSchema.class);SingleOutputStreamOperator<DataBean> jsonbean = kafkaStreamWithId.process(new JsonToBeanWithIdFunction());SingleOutputStreamOperator<DataBean> filterBean = jsonbean.filter(bean -> "liveEnter".equals(bean.getEventId()) || "liveLeave".equals(bean.getEventId()));KeyedStream<DataBean, String> keyedStream = filterBean.keyBy(bean -> bean.getProperties().get("anchor_id").toString());SingleOutputStreamOperator<Tuple4<String, String, Integer, Integer>> res = keyedStream.process(new AudienceCount());res.print();}
}
import com.doit.live.cn.demoText.POJO.DataBean;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class AudienceCount  extends KeyedProcessFunction<String, DataBean, Tuple4<String,String,Integer,Integer>> {private transient MapState<String,Integer> audiencestate;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Integer> audiencestatede = new MapStateDescriptor<>("audiencestate", String.class, Integer.class);audiencestate = getRuntimeContext().getMapState(audiencestatede);}@Overridepublic void processElement(DataBean value, Context ctx, Collector<Tuple4<String, String, Integer, Integer>> out) throws Exception {String eventId = value.getEventId();String room_id = value.getProperties().get("room_id").toString();String totalroomid= room_id+"-total";Integer totalnum= audiencestate.get(totalroomid);if(totalnum==null){totalnum=0;}String onlineNum=room_id+"-online";Integer onlinenum= audiencestate.get(onlineNum);if(onlinenum==null){onlinenum=0;}if("liveEnter".equals(eventId)){totalnum+=1;onlinenum+=1;}else if("liveLeave".equals(eventId)){onlinenum-=1;}audiencestate.put(totalroomid,totalnum);audiencestate.put(onlineNum,onlinenum);out.collect(Tuple4.of(ctx.getCurrentKey(),room_id,totalnum,onlinenum));}
}

2)优化 为什么使用aggregate是因为reduce要求输入输出类型一致必须是DataBean不够灵活

import com.doit.live.cn.demoText.POJO.DataBean;
import com.doit.live.cn.demoText.kafka.MyKafkaDeserializationSchema;
import com.doit.live.cn.demoText.udfs.JsonToBeanWithIdFunction;
import com.doit.live.cn.demoText.utils.FlinkUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class liveaudiencecount2  {public static void main(String[] args) throws Exception{DataStream<Tuple2<String, String>> kafkaStreamWithId = FlinkUtil.createKafkaStreamWithId(args[0], MyKafkaDeserializationSchema.class);SingleOutputStreamOperator<DataBean> jsonbean = kafkaStreamWithId.process(new JsonToBeanWithIdFunction());SingleOutputStreamOperator<DataBean> filterBean = jsonbean.filter(bean -> "liveEnter".equals(bean.getEventId()) || "liveLeave".equals(bean.getEventId()));KeyedStream<DataBean, String> keyedStream = filterBean.keyBy(bean -> bean.getProperties().get("anchor_id").toString());WindowedStream<DataBean, String, TimeWindow> WindowedStream= keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> res = WindowedStream.aggregate(new AudienceCount2());res.print();}}
import com.doit.live.cn.demoText.POJO.DataBean;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;public class AudienceCount2 implements AggregateFunction<DataBean, Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> {@Overridepublic Tuple3<String, Integer, Integer> createAccumulator() {return Tuple3.of(null,0,0);}@Overridepublic Tuple3<String, Integer, Integer> add(DataBean bean, Tuple3<String, Integer, Integer> acc) {String roomId = bean.getProperties().get("room_id").toString();acc.f0=roomId;String eventId = bean.getEventId();if("liveEnter".equals(eventId)){acc.f1+=1;acc.f2+=1;}else if("liveLeave".equals(eventId)){acc.f2-=1;}return acc;}@Overridepublic Tuple3<String, Integer, Integer> getResult(Tuple3<String, Integer, Integer> acc) {return acc;}@Overridepublic Tuple3<String, Integer, Integer> merge(Tuple3<String, Integer, Integer> stringIntegerIntegerTuple3, Tuple3<String, Integer, Integer> acc1) {return null;}
}

直播人数的统计(一)相关推荐

  1. python爬虫:使用selenium、unittest和BeautifulSoup爬取斗鱼tv的当前直播人数

    import unittest from selenium import webdriver from bs4 import BeautifulSoup as bsclass douyu(unitte ...

  2. 直播APP开发:直播人数是否该有限制

    2019独角兽企业重金招聘Python工程师标准>>> 谈及直播,大众率先想到的一定是一对多的秀场直播或游戏直播,直播在中国兴起以来,就是一对多的模式在发展,所以一对多.视频直播等模 ...

  3. OpenCV视频识别检测人数跟踪统计

    Python+OpenCV视频识别检测人数跟踪统计 如需远程调试,可加QQ905733049由专业技术人员远程协助! 运行代码如下: import numpy as np import cv2 imp ...

  4. 大数据之直播平台数据统计

    已知有以下直播平台数据 json格式 {"id":158008900435,"uid":120010010445,"nickname":&q ...

  5. c++ 用类统计不及格人数_统计小课堂13

    Previous review: 1)回归章节小结: 这周开始计数资料统计! -------------------------------------- 当观察数据按照某种属性和类别分组后,计数得到 ...

  6. wow服务器人数最新统计,魔兽世界怀旧服服务器人数统计 魔兽世界怀旧服人数比例查询...

    魔兽世界怀旧服服务器人数统计是游戏每个服务器玩家数量,大家想知道排队人数喔,那么魔兽世界怀旧服服务器人数统计有多少.魔兽世界怀旧服人数比例查询呢,跑跑车游戏网为大家带来介绍. *魔兽世界怀旧服服务器人 ...

  7. mysql查询各专业人数_SQL统计各专业学生人数

    统计"学生"表中学生的总人数的sql语句是: select count(*) from student; 其中select代表查询,count(*)是统计行数量,student是学 ...

  8. c语言如何统计不同分数段学生人数,Excel统计不同班级各个分数段的学生人数的方法...

    有很多人不了解Excel统计不同班级各个分数段的学生人数的方法,那么今天小编就在这里给大家分享一点我的小经验,希望可以给你们带来帮助. Excel统计不同班级各个分数段的学生人数的方法 如图所示,我们 ...

  9. VC++海康威视视频人数流量统计数据库连接

    第一次使用VC++二次开发海康威视设备,记录一下,直接代码 `#include <stdio.h> #include <WinSock.h> #include <mysq ...

最新文章

  1. 基于锚框与无需锚框的通用物体检测算法
  2. 过滤驱动加密文件(代码)
  3. 普华永道2030汽车产业报告 私家车真正Out了!
  4. suse的安装命令zypper,类似apt
  5. selenium自动化测试多条数据选择第一条
  6. win7(旗舰版)下,OleLoadPicture 加载内存中的图片(MagickGetImageBlob),返回值 0
  7. Factory Method(工厂方法)--对象创建型模式
  8. IT职业就业-学长有话说(二)
  9. Bootstrap 工具提示插件Tooltip的方法
  10. 3. 机器学习中为什么需要梯度下降_梯度提升(Gradient Boosting)算法
  11. 附件计算器中的MC、MR、MS、M+作用
  12. 分布式事务及分布式系统一致性解决方案
  13. 有り様、状態、様子 の 区別
  14. Android开发 Facebook取得key-hashes
  15. 软件需求说明书-总务办公管理系统
  16. ps4微软服务器地址,PSN服务更新 终于可以在PC上玩PS4游戏了!
  17. 带你玩转 3D 检测和分割 (二):核心组件分析之坐标系和 Box
  18. 我转行程序员的那一年(八)
  19. 道路中心线提取、河道中心线的提取(ArcScan)
  20. LM7805的特殊使用方法

热门文章

  1. 华为硬件工程师社招机考题库_华为电子软硬件工程师招聘笔试题
  2. 《创客学院嵌入式从入门到精通》笔记--10全面掌握嵌入式系统移植
  3. 谷歌11亿美元买台企手机团队 对硬件有长远计划
  4. java调用百度内容审核接口检查文本
  5. Windows 服务程序(一)
  6. SAP FICO 成本对象控制解析
  7. CSS的鼠标手势实现
  8. teamviewer付费版,授权轻松访问后还是每次电脑重启后还需要输入密码问题。
  9. python实现ID3
  10. Win11 22H2怎么退回之前版本?Win11回滚Win10系统教程(三种方法)