关注公众号,回复关键字“Flink”获取更多资料

1

思路分析

在实际应用中,我们往往关注在一段时间内有多少不同的用户访问了网站,也就是网站的独立访客数(Unique Visitor),也就是UV.

在UV不太大的情况下,我们可以把所有数据的userId都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。一般情况下,只要不超出内存的承受范围,这种做法也没什么问题;但如果我们遇到的数据量很大呢?把所有数据暂存放到内存里,显然不是一个好注意。

我们会想到,可以利用redis这种内存级k-v数据库,为我们做一个缓存。但如果我们遇到的情况非常极端,数据大到惊人呢?比如上亿级的用户,要去重计算UV。

如果放到redis中,亿级的用户id(每个20字节左右的话)可能需要几G甚至几十G的空间来存储。当然放到redis中,用集群进行扩展也不是不可以,但明显代价太大了。

一个更好的想法是,其实我们不需要完整地存储用户ID的信息,只要知道他在不在就行了。所以其实我们可以进行压缩处理,用一位(bit)就可以表示一个用户的状态。这个思想的具体实现就是布隆过滤器(BloomFilter)。

本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilisticdatastructure),特点是高效地插入和查询,可以用来告诉你“某样东西一定不存在或者可能存在”。

它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是0,就是1。相比于传统的List、Set、Map等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。

我们的目标就是,利用某种方法(一般是Hash函数)把每个数据,对应到一个位图的某一位上去;如果数据存在,那一位就是1,不存在则为0。

2

具体实现

接下来我们就来具体实现一下。注意这里我们用到了redis连接存取数据,所以需要加入redis客户端的依赖:

<dependencies>    <dependency>        <groupId>redis.clientsgroupId>        <artifactId>jedisartifactId>        <version>2.8.1version>    dependency>dependencies>

我们准备了一份web服务器的日志数据,这里以apache服务器的一份log为例,每一行日志记录了访问者的IP、userID、访问时间、访问方法以及访问的url.

创建POJO类UserBehavior,用于将原始数据包装成此类输出.

public class UserBehavior {    // 定义私有属性    private Long userId;    private Long itemId;    private Integer categoryId;    private String behavior;    private Long timestamp;    public UserBehavior() {    }    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {        this.userId = userId;        this.itemId = itemId;        this.categoryId = categoryId;        this.behavior = behavior;        this.timestamp = timestamp;    }    public Long getUserId() {        return userId;    }    public void setUserId(Long userId) {        this.userId = userId;    }    public Long getItemId() {        return itemId;    }    public void setItemId(Long itemId) {        this.itemId = itemId;    }    public Integer getCategoryId() {        return categoryId;    }    public void setCategoryId(Integer categoryId) {        this.categoryId = categoryId;    }    public String getBehavior() {        return behavior;    }    public void setBehavior(String behavior) {        this.behavior = behavior;    }    public Long getTimestamp() {        return timestamp;    }    public void setTimestamp(Long timestamp) {        this.timestamp = timestamp;    }    @Override    public String toString() {        return "UserBehavior{" +                "userId=" + userId +                ", itemId=" + itemId +                ", categoryId=" + categoryId +                ", behavior='" + behavior + '\'' +                ", timestamp=" + timestamp +                '}';    }}

创建POJO类PageViewCount,用于将最终的结果数据包装输出。

public class PageViewCount {    private String url;    private Long windowEnd;    private Long count;    public PageViewCount() {    }    public PageViewCount(String url, Long windowEnd, Long count) {        this.url = url;        this.windowEnd = windowEnd;        this.count = count;    }    public String getUrl() {        return url;    }    public void setUrl(String url) {        this.url = url;    }    public Long getWindowEnd() {        return windowEnd;    }    public void setWindowEnd(Long windowEnd) {        this.windowEnd = windowEnd;    }    public Long getCount() {        return count;    }    public void setCount(Long count) {        this.count = count;    }    @Override    public String toString() {        return "PageViewCount{" +                "url='" + url + '\'' +                ", windowEnd=" + windowEnd +                ", count=" + count +                '}';    }}

在src/main/java下创建UvWithBloomFilter.java文件,具体代码如下:

public class UvWithBloomFilter {    public static void main(String[] args) throws Exception {        // 1. 创建执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 2. 读取数据,创建DataStream        URL resource = UniqueVisitor.class.getResource("/UserBehavior.csv");        DataStream inputStream = env.readTextFile(resource.getPath());        // 3. 转换为POJO,分配时间戳和watermark        DataStream dataStream = inputStream                .map(line -> {                    String[] fields = line.split(",");                    return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));                })                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {                    @Override                    public long extractAscendingTimestamp(UserBehavior element) {                        return element.getTimestamp() * 1000L;                    }                });        // 开窗统计uv值        SingleOutputStreamOperator uvStream = dataStream                .filter(data -> "pv".equals(data.getBehavior()))                .timeWindowAll(Time.hours(1))                .trigger( new MyTrigger() )                .process( new UvCountResultWithBloomFliter() );        uvStream.print();        env.execute("uv count with bloom filter job");    }    // 自定义触发器    public static class MyTrigger extends Trigger<UserBehavior, TimeWindow>{        @Override        public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {            // 每一条数据来到,直接触发窗口计算,并且直接清空窗口            return TriggerResult.FIRE_AND_PURGE;        }        @Override        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {            return TriggerResult.CONTINUE;        }        @Override        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {            return TriggerResult.CONTINUE;        }        @Override        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {        }    }    // 自定义一个布隆过滤器    public static class MyBloomFilter {        // 定义位图的大小,一般需要定义为2的整次幂        private Integer cap;        public MyBloomFilter(Integer cap) {            this.cap = cap;        }        // 实现一个hash函数        public Long hashCode( String value, Integer seed ){            Long result = 0L;            for( int i = 0; i < value.length(); i++ ){                result = result * seed + value.charAt(i);            }            return result & (cap - 1);        }    }    // 实现自定义的处理函数    public static class UvCountResultWithBloomFliter extends ProcessAllWindowFunction<UserBehavior, PageViewCount, TimeWindow>{        // 定义jedis连接和布隆过滤器        Jedis jedis;        MyBloomFilter myBloomFilter;        @Override        public void open(Configuration parameters) throws Exception {            jedis = new Jedis("localhost", 6379);            myBloomFilter = new MyBloomFilter(1<<29);    // 要处理1亿个数据,用64MB大小的位图        }        @Override        public void process(Context context, Iterable elements, Collector out) throws Exception {            // 将位图和窗口count值全部存入redis,用windowEnd作为key            Long windowEnd = context.window().getEnd();            String bitmapKey = windowEnd.toString();            // 把count值存成一张hash表            String countHashName = "uv_count";            String countKey = windowEnd.toString();            // 1. 取当前的userId            Long userId = elements.iterator().next().getUserId();            // 2. 计算位图中的offset            Long offset = myBloomFilter.hashCode(userId.toString(), 61);            // 3. 用redis的getbit命令,判断对应位置的值            Boolean isExist = jedis.getbit(bitmapKey, offset);            if( !isExist ){                // 如果不存在,对应位图位置置1                jedis.setbit(bitmapKey, offset, true);                // 更新redis中保存的count值                Long uvCount = 0L;    // 初始count值                String uvCountString = jedis.hget(countHashName, countKey);                if( uvCountString != null && !"".equals(uvCountString) )                    uvCount = Long.valueOf(uvCountString);                jedis.hset(countHashName, countKey, String.valueOf(uvCount + 1));                out.collect(new PageViewCount("uv", windowEnd, uvCount + 1));            }        }        @Override        public void close() throws Exception {            jedis.close();        }    }}

扫码关注我们

微信号|bigdata_story

B站|大数据那些事

想获取更多更全资料

扫码加好友入群

欢迎各位大佬加入开源共享

共同面对大数据领域疑难问题

来稿请投邮箱:miaochuanhai@126.com

geotools 读取shp属性过滤_Flink进阶之使用布隆过滤器实现UV统计相关推荐

  1. java使用geotools读取shp文件

    java使用geotools读取shp文件 测试shp文件 引入geotools包 压缩包文件处理 shp文件相关信息的读取 运行结果 GeoTools是一个开源的Java GIS工具包,可利用它来开 ...

  2. GeoJson的生成与解析,JSON解析,Java读写geojson,geotools读取shp文件,Geotools中Geometry对象与GeoJson的相互转换

    GeoJson的生成与解析 一.wkt格式的geometry转成json格式 二.json格式转wkt格式 三.json格式的数据进行解析 四.Java读写geojson 五.geotools读取sh ...

  3. GeoTools读取shp文件中文乱码解决方案汇总

    Java在GeoTools组件读取Shp文件属性乱码问题,解决汇总(持续更新,暂时没有完美解决方案) GeoTools组件在读取Shp文件的属性表信息时,当读取到中文字符时,在代码中的显示为乱码. 问 ...

  4. geotools读取shp文件及shp文件操作工具类代码

    geotools读取shp文件及shp文件操作工具类代码.pdf 完整文档下载地址 https://download.csdn.net/download/a772304419/17468931 imp ...

  5. GeoTools——读取shapefile数据

    目录 一.引言 二.代码操作 1.服务端 2.返回数据 3.客户端 三.总结 一.引言 GeoTools在开源gis世界中使用极为常见,地位类比于arcgis中的arcgis engine,当我们要使 ...

  6. 第六章 商品详情进阶 + redis分布式锁 + redis问题解决 + redisson + 布隆过滤器

    一.商品详情页面优化 1.1 思路 虽然咱们实现了页面需要的功能,但是考虑到该页面是被用户高频访问的,所以性能需要优化. 一般一个系统最大的性能瓶颈,就是数据库的io操作.从数据库入手也是调优性价比最 ...

  7. GDAL C#读取shp中文属性值乱码问题

    GDAL的C#版本读取shp中,如果属性值中含有中文,读出来有可能是乱码的问题,根据SWIG生成的C#代码调试发现问题所在,在Ogr.cs文件中有这么一个函数,代码如下: internal stati ...

  8. 简析服务端通过geotools导入SHP至PG的方法

    文章版权由作者李晓晖和博客园共有,若转载请于明显处标明出处:http://www.cnblogs.com/naaoveGIS/ 1.背景 项目中需要在浏览器端直接上传SHP后服务端进行数据的自动入PG ...

  9. SpringBoot + geotools 操作 shp文件

    SpringBoot整合GeoTools 1.GeoTools相关的依赖 2.本文所用到的公共类及实体类 3.本文所用到的数据库表 4.WKT格式怎么转化为GeoJson格式 5.GeoJson格式怎 ...

最新文章

  1. vue checkbox 默认选中
  2. python主要运用于-Python的8大主要应用领域,看看哪个是你的菜?
  3. 你遇到过哪些理工科的实验高手,他们有哪些优秀的思维习惯?
  4. mysql shell
  5. 4-6:TCP协议之滑动窗口
  6. php中显示不出来,图片显示不出来,但是数据库里有显示
  7. 开奖及送书|《漫画算法:小灰的算法之旅(Python篇)》
  8. java编写八数码_java实现八数码
  9. python3带tkinter窗口的ftp服务器,并使用pyinstaller打包成exe
  10. 虚拟化系列-Citrix XenServer 6.1 XenMotion与HA
  11. 芒果云 在线代码编辑器
  12. ospf的五类LSA
  13. 【备忘】大数据最火爆技术spark之王家林2016最新高清视频教程
  14. JVM性能调优(一)(JVM参数详解、内存分析等)
  15. mysql错误码2002_MySQL错误ERROR 2002 (HY000): Can't connect to local MySQL server through socket
  16. 计算机中通道的基本功能,Photoshop中各个面板的基本功能介绍 -电脑资料
  17. 如何使用Microsoft PowerPoint制作海报
  18. [转载]如何在非443端口开https
  19. FFmpeg[22] - 解决ffmpeg yasm not found, use --disable-yasm for a crippled build
  20. 符合FDA标准的邮件安全证书(S/MIME)有哪些?

热门文章

  1. 谷歌再遭反垄断起诉:曾试图“扼杀”三星应用商店!
  2. Mac、iPad 之间拖拽即可移动文件、iOS 15 来了,这届 WWDC21 精彩内容尽在这里!
  3. 超过 1 亿 Android 用户的数据遭泄露!
  4. 为什么 Deno 没有众望所归?超越 Node.js 还要做些什么?
  5. 实战:基于OpenCV进行长时间曝光
  6. 动真格!阿里云刚说缺 5000 程序员,今天就来抢人!
  7. @开发者 争抢技术红利,百度自研 4 款人脸硬件要和大家见面了!
  8. 科技驰援背后:技术没有假期!
  9. 你抢的不是春节红包而是云!
  10. 从事 Android 开发六年,我学到的那些事!