文章目录

  • 一 独立访客数量计算
  • 二 布隆过滤器
    • 1 什么是布隆过滤器
    • 2 实现原理
      • (1)HashMap 的问题
      • (2)布隆过滤器数据结构
    • 3 使用布隆过滤器去重

一 独立访客数量计算

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.readTextFile("E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\UserBehavior.csv").map(new MapFunction<String, Example7.UserBehavior>() {@Overridepublic Example7.UserBehavior map(String value) throws Exception {String[] arr = value.split(",");return new Example7.UserBehavior(arr[0],arr[1],arr[2],arr[3],Long.parseLong(arr[4]) * 1000L);}}).keyBy(r -> r.behavior.equals("pv")).assignTimestampsAndWatermarks(WatermarkStrategy.<Example7.UserBehavior>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Example7.UserBehavior>() {@Overridepublic long extractTimestamp(Example7.UserBehavior element, long recordTimestamp) {return element.timestamp;}})).keyBy(r -> true).window(TumblingEventTimeWindows.of(Time.hours(1))).aggregate(new CountAgg(),new WindowResult()).print();env.execute();
}public static class WindowResult extends ProcessWindowFunction<Long,String,Boolean, TimeWindow>{@Overridepublic void process(Boolean s, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {String windowStart = new Timestamp(context.window().getStart()).toString();String windowEnd = new Timestamp(context.window().getEnd()).toString();Long count = elements.iterator().next();out.collect("窗口" + windowStart + "~" + windowEnd + "的统计值是:" + count);}
}// 使用hashset对用户id进行去重,输出访客数量
public static class CountAgg implements AggregateFunction<Example7.UserBehavior, HashSet<String>,Long>{@Overridepublic HashSet<String> createAccumulator() {return new HashSet<>();}@Overridepublic HashSet<String> add(Example7.UserBehavior value, HashSet<String> accumulator) {accumulator.add(value.userId);return accumulator;}@Overridepublic Long getResult(HashSet<String> accumulator) {return (long)accumulator.size();}@Overridepublic HashSet<String> merge(HashSet<String> a, HashSet<String> b) {return null;}
}

此程序有一个隐患:现将所有数据keyBy到了同一条流,每一个小时取一次uv,添加到hashSet去重,如果程序的用户向很大,如1亿个独立访客,一个用户的用户id为100个字符,那么一个窗口中的独立访客就要占用10G的内存。

想要优化这种使用了增量聚合与全窗口聚合的程序,就需要使用一种新的数据结构 – 布隆过滤器。

二 布隆过滤器

1 什么是布隆过滤器

本质上布隆过滤器是一种数据结构,是一种比较巧妙的概率型数据结构(probabilistic datastructure),特点是高效地插入和查询,可以用来告诉你“某样东西一定不存在或者可能存在”。

相比于传统的 List、Set、Map 等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。

2 实现原理

(1)HashMap 的问题

通常我们判断某个元素是否存在用的是什么?

HashMap 可以将值映射到 HashMap 的 Key,然后可以在 O(1) 的时间复杂度内返回结果,效率奇高。但是 HashMap 的实现也有缺点,例如存储容量占比高,考虑到负载因子的存在,通常空间是不能被用满的,而一旦值很多,例如上亿的时候,那 HashMap 占据的内存大小就变得很可观了。

再加入我们的数据集存储在远程服务器上,本地服务接受输入,而数据集非常大不可能一次性读进内存构建 HashMap 的时候,也会存在问题。

(2)布隆过滤器数据结构

布隆过滤器是一个 bit 向量或者说 bit 数组,如下图:

如果要映射一个值到布隆过滤器中,需要使用多个不同的哈希函数生成多个哈希值,并对每个生成的哈希值指向的 bit 位重置为 1,例如针对值“baidu”和三个不同的哈希函数分别生成了哈希值 1、4、7,之后“baidu”字符串就被丢弃,则上图转变为:

那么此时现在再存一个值“tencent”,如果哈希函数返回 3、4、8 的话,图继续变为:

值得注意的是,4 这个 bit 位由于两个值的哈希函数都返回了这个 bit 位,因此它被覆盖了。

现在如果想查询“dianping”这个值是否存在,哈希函数返回了 1、5、8 三个值,结果发现 5 这个 bit 位上的值为 0,说明没有任何一个值映射到这个 bit 位上,因此可以很确定地说“dianping”这个值不存在。

而当需要查询“baidu”这个值是否存在的话,那么哈希函数必然会返回 1、4、7,然后检查发现这三个 bit 位上的
值均为 1,那么不可以说“baidu”存在,只能是“baidu”这个值可能存在,不确定1、4、7这三位没有被其他数据覆盖过。

3 使用布隆过滤器去重

使用org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter包下的布隆过滤器。

// 使用布隆过滤器对用户id进行去重,输出访客数量
public static class CountAgg implements AggregateFunction<Example7.UserBehavior, Tuple2<Long,BloomFilter<String>>,Long>{@Overridepublic Tuple2<Long, BloomFilter<String>> createAccumulator() {// 参数依次为要去重的元素类型,预估有多少人,误判率return Tuple2.of(0L,BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000,0.01));}@Overridepublic Tuple2<Long, BloomFilter<String>> add(Example7.UserBehavior value, Tuple2<Long, BloomFilter<String>> accumulator) {if(!accumulator.f1.mightContain(value.userId)){accumulator.f1.put(value.userId);accumulator.f0 += 1L;}return accumulator;}@Overridepublic Long getResult(Tuple2<Long, BloomFilter<String>> accumulator) {return accumulator.f0;}@Overridepublic Tuple2<Long, BloomFilter<String>> merge(Tuple2<Long, BloomFilter<String>> a, Tuple2<Long, BloomFilter<String>> b) {return null;}

【Flink】需求实现之独立访客数量的计算 和 布隆过滤器的原理及使用相关推荐

  1. java基础巩固-宇宙第一AiYWM:为了维持生计,做项目经验之~【多用户关注共同的参数的统计功能】开发总结、再来个独立访客(Unique Visitor,简称UV)统计番外篇~整起

    实验室项目来了一个新需求:因为我们那个系统主要是人家用下位机把一些路面参数.空气湿度等参数测出来之后发送到我们阿里云服务器上的pg数据库中,然后我们对数据进行页面上的展示.然后公司那边说有个需求,就是 ...

  2. 网站统计中的PV(访问量):UV(独立访客):IP(独立IP)的定义与区别

    --------首先来看看ip.uv和pv的定义---------- PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次. UV(独立访客):即Unique Vis ...

  3. 独立IP、特产浏览量(PV)、访问次数(VV)、独立访客(UV)有什么区别?

    转自 http://blog.sina.com.cn/s/blog_a5fc76bb0101073a.html 访问次数(VV):记录所有访客1天内访问了多少次您的特产,相同的访客有可能多次访问您的特 ...

  4. 网站独立访客数UV的统计--海量数据去重

    问题描述:统计每一小时的网站独立访客数UV(Unique Visitor) 问题背景:这是尚硅谷大数据技术之电商用户行为数据分析的一道例题,武晟然老师讲授的方法是,自定义布隆过滤器进行UV统计.受到老 ...

  5. 《Splunk智能运维实战》——3.4 显示唯一访客数量

    本节书摘来自华章计算机<Splunk智能运维实战>一书中的第3章,第3.4节,作者 [美]乔史·戴昆(Josh Diakun),保罗R.约翰逊(Paul R. Johnson),德莱克·默 ...

  6. 唯一身份访问者(独立访客)与访问次数的区别

    唯一身份访问者(独立访客)与访问次数的区别 在进行网站分析前,对网站分析的基本度量的了解是非常必要的,这样才会深入理解网站分析,否则就会不知所云. 定义: 1.唯一身份访问者(Unique Visit ...

  7. 如何查看微信小程序的累计独立访客(UV)

    查看方式: 累计独立访客(UV)不低于 1000 点击左侧菜单栏的统计模块 3.选择数据看板-访问核心数据-,累计用户数就可以看到了 开通条件 累计独立访客(UV)不低于 1000 存在刷粉行为或有严 ...

  8. 浏览页面 访客 数量 统计

    int n=0;String counter=(String)application.getAttribute("counter");if(counter!=null){n=Int ...

  9. 怎么快速达到微信小程序累计独立访客(UV)不低于 1000

    大家看看上面这个小程序也许能找到答案

最新文章

  1. hadoop 安全模式
  2. 计算机存储器可分两类,计算机存储器可分为几类?它们的主要区别是什么?
  3. Jscript中window.setInterval和window.setTimeout的区别
  4. DM365 color space
  5. linux用户操作的日志,linux 用户操作记录并录入日志
  6. 工作224:当前函数造成
  7. Eclipse 控制console
  8. Stanford_NLP_TOOLS:CRFClassifier
  9. PHP中register_globals参数为OFF和ON的区别
  10. 提高(微)服务安全的非完全攻略
  11. 福师计算机在线作业在每个w,16春季福师《计算机应用基础》在线作业二
  12. java框架常见面试题_java框架面试题总结
  13. vue3动态加载组件
  14. 视频处理中各个分辨率/数字电视系统显示格式 的介绍(QCIF,CIF,4CIF,D1,720P,1080I,1080P等)
  15. 3dmax材质编辑器模糊字有重影怎么解决?
  16. 玩转Vagrant之工作环境的迁移(box的导入与导出)
  17. MSSQL 负载均衡(Moebius)
  18. Resolver error Error Downloading VS Code Server failed - please install either curl or wget on the
  19. 毕业生的商业软件开发之路 --- 现代商业软件开发概况
  20. Android对现有的apk进行修改(汉化,修改QQ尾巴)

热门文章

  1. 苹果6s连上wifi上不了网络连接服务器未响应,6s连不上网了怎么解决
  2. ruby_对象的比较_等于号_3个等于号_equal_eql
  3. 电脑显示请检查映像服务器,该任务映像已损坏或已篡改的解决方法
  4. Docker 安装 SRS
  5. MySQL查询行记录关键字_MySQL数据库~~~~~查询行(文件的内容)
  6. Suggestion: use tools:overrideLibrary=xxx.xxx.xxx to force usage
  7. python 对数函数_使用Python玩转高等数学(4):对数函数
  8. dismiss和remove_Dialog的dismiss和cancel 区别 (转)
  9. 【Win8自带微软输入法删除图解】
  10. 阿里云 mysql 修改root密码修改_设置及修改MySQL root用户密码 - MySQL中文参考手册...