geotools 读取shp属性过滤_Flink进阶之使用布隆过滤器实现UV统计
关注公众号,回复关键字“Flink”获取更多资料
1
思路分析
在实际应用中,我们往往关注在一段时间内有多少不同的用户访问了网站,也就是网站的独立访客数(Unique Visitor),也就是UV.
在UV不太大的情况下,我们可以把所有数据的userId都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。一般情况下,只要不超出内存的承受范围,这种做法也没什么问题;但如果我们遇到的数据量很大呢?把所有数据暂存放到内存里,显然不是一个好注意。
我们会想到,可以利用redis这种内存级k-v数据库,为我们做一个缓存。但如果我们遇到的情况非常极端,数据大到惊人呢?比如上亿级的用户,要去重计算UV。
如果放到redis中,亿级的用户id(每个20字节左右的话)可能需要几G甚至几十G的空间来存储。当然放到redis中,用集群进行扩展也不是不可以,但明显代价太大了。
一个更好的想法是,其实我们不需要完整地存储用户ID的信息,只要知道他在不在就行了。所以其实我们可以进行压缩处理,用一位(bit)就可以表示一个用户的状态。这个思想的具体实现就是布隆过滤器(BloomFilter)。
本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilisticdatastructure),特点是高效地插入和查询,可以用来告诉你“某样东西一定不存在或者可能存在”。
它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是0,就是1。相比于传统的List、Set、Map等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。
我们的目标就是,利用某种方法(一般是Hash函数)把每个数据,对应到一个位图的某一位上去;如果数据存在,那一位就是1,不存在则为0。
2
具体实现
接下来我们就来具体实现一下。注意这里我们用到了redis连接存取数据,所以需要加入redis客户端的依赖:
<dependencies> <dependency> <groupId>redis.clientsgroupId> <artifactId>jedisartifactId> <version>2.8.1version> dependency>dependencies>
我们准备了一份web服务器的日志数据,这里以apache服务器的一份log为例,每一行日志记录了访问者的IP、userID、访问时间、访问方法以及访问的url.
创建POJO类UserBehavior,用于将原始数据包装成此类输出.
public class UserBehavior { // 定义私有属性 private Long userId; private Long itemId; private Integer categoryId; private String behavior; private Long timestamp; public UserBehavior() { } public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) { this.userId = userId; this.itemId = itemId; this.categoryId = categoryId; this.behavior = behavior; this.timestamp = timestamp; } public Long getUserId() { return userId; } public void setUserId(Long userId) { this.userId = userId; } public Long getItemId() { return itemId; } public void setItemId(Long itemId) { this.itemId = itemId; } public Integer getCategoryId() { return categoryId; } public void setCategoryId(Integer categoryId) { this.categoryId = categoryId; } public String getBehavior() { return behavior; } public void setBehavior(String behavior) { this.behavior = behavior; } public Long getTimestamp() { return timestamp; } public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } @Override public String toString() { return "UserBehavior{" + "userId=" + userId + ", itemId=" + itemId + ", categoryId=" + categoryId + ", behavior='" + behavior + '\'' + ", timestamp=" + timestamp + '}'; }}
创建POJO类PageViewCount,用于将最终的结果数据包装输出。
public class PageViewCount { private String url; private Long windowEnd; private Long count; public PageViewCount() { } public PageViewCount(String url, Long windowEnd, Long count) { this.url = url; this.windowEnd = windowEnd; this.count = count; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public Long getWindowEnd() { return windowEnd; } public void setWindowEnd(Long windowEnd) { this.windowEnd = windowEnd; } public Long getCount() { return count; } public void setCount(Long count) { this.count = count; } @Override public String toString() { return "PageViewCount{" + "url='" + url + '\'' + ", windowEnd=" + windowEnd + ", count=" + count + '}'; }}
在src/main/java下创建UvWithBloomFilter.java文件,具体代码如下:
public class UvWithBloomFilter { public static void main(String[] args) throws Exception { // 1. 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 2. 读取数据,创建DataStream URL resource = UniqueVisitor.class.getResource("/UserBehavior.csv"); DataStream inputStream = env.readTextFile(resource.getPath()); // 3. 转换为POJO,分配时间戳和watermark DataStream dataStream = inputStream .map(line -> { String[] fields = line.split(","); return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4])); }) .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { @Override public long extractAscendingTimestamp(UserBehavior element) { return element.getTimestamp() * 1000L; } }); // 开窗统计uv值 SingleOutputStreamOperator uvStream = dataStream .filter(data -> "pv".equals(data.getBehavior())) .timeWindowAll(Time.hours(1)) .trigger( new MyTrigger() ) .process( new UvCountResultWithBloomFliter() ); uvStream.print(); env.execute("uv count with bloom filter job"); } // 自定义触发器 public static class MyTrigger extends Trigger<UserBehavior, TimeWindow>{ @Override public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { // 每一条数据来到,直接触发窗口计算,并且直接清空窗口 return TriggerResult.FIRE_AND_PURGE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { } } // 自定义一个布隆过滤器 public static class MyBloomFilter { // 定义位图的大小,一般需要定义为2的整次幂 private Integer cap; public MyBloomFilter(Integer cap) { this.cap = cap; } // 实现一个hash函数 public Long hashCode( String value, Integer seed ){ Long result = 0L; for( int i = 0; i < value.length(); i++ ){ result = result * seed + value.charAt(i); } return result & (cap - 1); } } // 实现自定义的处理函数 public static class UvCountResultWithBloomFliter extends ProcessAllWindowFunction<UserBehavior, PageViewCount, TimeWindow>{ // 定义jedis连接和布隆过滤器 Jedis jedis; MyBloomFilter myBloomFilter; @Override public void open(Configuration parameters) throws Exception { jedis = new Jedis("localhost", 6379); myBloomFilter = new MyBloomFilter(1<<29); // 要处理1亿个数据,用64MB大小的位图 } @Override public void process(Context context, Iterable elements, Collector out) throws Exception { // 将位图和窗口count值全部存入redis,用windowEnd作为key Long windowEnd = context.window().getEnd(); String bitmapKey = windowEnd.toString(); // 把count值存成一张hash表 String countHashName = "uv_count"; String countKey = windowEnd.toString(); // 1. 取当前的userId Long userId = elements.iterator().next().getUserId(); // 2. 计算位图中的offset Long offset = myBloomFilter.hashCode(userId.toString(), 61); // 3. 用redis的getbit命令,判断对应位置的值 Boolean isExist = jedis.getbit(bitmapKey, offset); if( !isExist ){ // 如果不存在,对应位图位置置1 jedis.setbit(bitmapKey, offset, true); // 更新redis中保存的count值 Long uvCount = 0L; // 初始count值 String uvCountString = jedis.hget(countHashName, countKey); if( uvCountString != null && !"".equals(uvCountString) ) uvCount = Long.valueOf(uvCountString); jedis.hset(countHashName, countKey, String.valueOf(uvCount + 1)); out.collect(new PageViewCount("uv", windowEnd, uvCount + 1)); } } @Override public void close() throws Exception { jedis.close(); } }}
扫码关注我们
微信号|bigdata_story
B站|大数据那些事
想获取更多更全资料
扫码加好友入群
欢迎各位大佬加入开源共享
共同面对大数据领域疑难问题
来稿请投邮箱:miaochuanhai@126.com
geotools 读取shp属性过滤_Flink进阶之使用布隆过滤器实现UV统计相关推荐
- java使用geotools读取shp文件
java使用geotools读取shp文件 测试shp文件 引入geotools包 压缩包文件处理 shp文件相关信息的读取 运行结果 GeoTools是一个开源的Java GIS工具包,可利用它来开 ...
- GeoJson的生成与解析,JSON解析,Java读写geojson,geotools读取shp文件,Geotools中Geometry对象与GeoJson的相互转换
GeoJson的生成与解析 一.wkt格式的geometry转成json格式 二.json格式转wkt格式 三.json格式的数据进行解析 四.Java读写geojson 五.geotools读取sh ...
- GeoTools读取shp文件中文乱码解决方案汇总
Java在GeoTools组件读取Shp文件属性乱码问题,解决汇总(持续更新,暂时没有完美解决方案) GeoTools组件在读取Shp文件的属性表信息时,当读取到中文字符时,在代码中的显示为乱码. 问 ...
- geotools读取shp文件及shp文件操作工具类代码
geotools读取shp文件及shp文件操作工具类代码.pdf 完整文档下载地址 https://download.csdn.net/download/a772304419/17468931 imp ...
- GeoTools——读取shapefile数据
目录 一.引言 二.代码操作 1.服务端 2.返回数据 3.客户端 三.总结 一.引言 GeoTools在开源gis世界中使用极为常见,地位类比于arcgis中的arcgis engine,当我们要使 ...
- 第六章 商品详情进阶 + redis分布式锁 + redis问题解决 + redisson + 布隆过滤器
一.商品详情页面优化 1.1 思路 虽然咱们实现了页面需要的功能,但是考虑到该页面是被用户高频访问的,所以性能需要优化. 一般一个系统最大的性能瓶颈,就是数据库的io操作.从数据库入手也是调优性价比最 ...
- GDAL C#读取shp中文属性值乱码问题
GDAL的C#版本读取shp中,如果属性值中含有中文,读出来有可能是乱码的问题,根据SWIG生成的C#代码调试发现问题所在,在Ogr.cs文件中有这么一个函数,代码如下: internal stati ...
- 简析服务端通过geotools导入SHP至PG的方法
文章版权由作者李晓晖和博客园共有,若转载请于明显处标明出处:http://www.cnblogs.com/naaoveGIS/ 1.背景 项目中需要在浏览器端直接上传SHP后服务端进行数据的自动入PG ...
- SpringBoot + geotools 操作 shp文件
SpringBoot整合GeoTools 1.GeoTools相关的依赖 2.本文所用到的公共类及实体类 3.本文所用到的数据库表 4.WKT格式怎么转化为GeoJson格式 5.GeoJson格式怎 ...
最新文章
- vue checkbox 默认选中
- python主要运用于-Python的8大主要应用领域,看看哪个是你的菜?
- 你遇到过哪些理工科的实验高手,他们有哪些优秀的思维习惯?
- mysql shell
- 4-6:TCP协议之滑动窗口
- php中显示不出来,图片显示不出来,但是数据库里有显示
- 开奖及送书|《漫画算法:小灰的算法之旅(Python篇)》
- java编写八数码_java实现八数码
- python3带tkinter窗口的ftp服务器,并使用pyinstaller打包成exe
- 虚拟化系列-Citrix XenServer 6.1 XenMotion与HA
- 芒果云 在线代码编辑器
- ospf的五类LSA
- 【备忘】大数据最火爆技术spark之王家林2016最新高清视频教程
- JVM性能调优(一)(JVM参数详解、内存分析等)
- mysql错误码2002_MySQL错误ERROR 2002 (HY000): Can't connect to local MySQL server through socket
- 计算机中通道的基本功能,Photoshop中各个面板的基本功能介绍 -电脑资料
- 如何使用Microsoft PowerPoint制作海报
- [转载]如何在非443端口开https
- FFmpeg[22] - 解决ffmpeg yasm not found, use --disable-yasm for a crippled build
- 符合FDA标准的邮件安全证书(S/MIME)有哪些?
热门文章
- 谷歌再遭反垄断起诉:曾试图“扼杀”三星应用商店!
- Mac、iPad 之间拖拽即可移动文件、iOS 15 来了,这届 WWDC21 精彩内容尽在这里!
- 超过 1 亿 Android 用户的数据遭泄露!
- 为什么 Deno 没有众望所归?超越 Node.js 还要做些什么?
- 实战:基于OpenCV进行长时间曝光
- 动真格!阿里云刚说缺 5000 程序员,今天就来抢人!
- @开发者 争抢技术红利,百度自研 4 款人脸硬件要和大家见面了!
- 科技驰援背后:技术没有假期!
- 你抢的不是春节红包而是云!
- 从事 Android 开发六年,我学到的那些事!