文章目录

  • 电子围栏简介和应用场景
  • 电子围栏规则和分析结果数据结构
  • 电子围栏分析步骤
  • 电子围栏分析任务实现
  • 广播状态与实现
  • 电子围栏中的 ConnectStreamed应用
    • connect流说明
    • connect流使用场景
    • 两点之间球面距离的计算——DistanceCaculateUtil
    • 电子围栏中自定义对象将两个数据流合并
  • 设置窗口并计算确定是否在电子围栏内告警
  • 合并分析电子围栏结果
    • 读取电子围栏分析结果并广播
    • 窗口流数据与广播流数据连接
  • 电子围栏分析结果入库
  • 测试电子围栏

电子围栏简介和应用场景

  • 电子围栏简介和意义

地理围栏是一个虚拟的空间围栏,可以帮助开发者检测人或物何时进入或离开预定义区域,并支持实时报警功能。

  • 电子围栏的应用场景
  1. 签到打卡类场景
  2. 共享单车类场景
  3. 线下门店促销场景
  • 创建电子围栏
  • 在此项目中,使用的电子围栏是规则的圆形,判断是否在圆形电子围栏区域内,可以使用车辆位置和中心点球面距离小于等于半径,在电子围栏的区域内。
  • 还有一些不规则的电子围栏,这些可以使用射线取点的个数来判断是否在电子围栏内,如果是偶数在电子围栏外,否则是电子围栏内。

电子围栏规则和分析结果数据结构

  • 电子围栏的定义

  • 电子围栏规则数据结构

  • 字段
  • 数据样本示例

  • 电子围栏分析结果数据结构

  • 字段

电子围栏分析步骤

  • 电子围栏任务8大步骤
  1. 电子围栏分析任务设置、原始数据json解析、过滤异常数据

  2. 读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)

  3. 原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream)

  4. 创建90秒翻滚窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)

  5. 读取电子围栏分析结果表数据并广播

  6. 翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect

  7. 对电子围栏对象模型,添加uuid和inMySQL(车辆是否已存在mysql表中)

  8. 电子围栏分析结果数据落地mysql,也可以选择落地mongo

电子围栏分析任务实现

  • 电子栅栏分析的逻辑图

  • 电子围栏分析主类:ElectricFenceTask

  • 简化 ItcastDataObj 对象:ItcastDataPartObj.java

  • 简化解析 ItcastParseUtil 对象: JsonParsePartUtil.java

  • 测试工具类对象

//实现步骤:
1)初始化flink流处理的运行环境(设置按照事件时间处理数据、设置hadoopHome的用户名、设置checkpoint)
2)读取kafka数据源(调用父类的方法)
3)将字符串转换成javaBean(ItcastDataPartObj)对象
4)过滤出来正常数据
5)读取电子围栏规则数据以及电子围栏规则关联的车辆数据并进行广播
6)将原始数据(消费的kafka数据)与电子围栏规则数据进行关联操作(Connect)并flatMap为 ElectricFenceRulesFuntion
7)对上步数据分配水印(30s)并根据 vin 分组后应用90s滚动窗口,然后对窗口进行自定义函数的开发(计算出来该窗口的数据属于电子围栏外还是电子围栏内)
8)读取电子围栏分析结果表的数据并进行广播
9)对第七步和第八步产生的数据进行关联操作(connect)
10)对第九步的结果进行滚动窗口操作,应用自定义窗口函数(实现添加uuid和inMysql属性赋值)
11)将分析后的电子围栏结果数据实时写入到mysql数据库中
12)运行作业,等待停止

广播状态与实现

  • 回顾广播变量概念

广播变量就是将变量广播到各个 taskmanager的内存中,可以共享数据,一般情况下广播变量的类型是 map 类型 key->value

  • 广播变量的数据格式是——map类型state

  • 如何使用广播变量

HashMap<String,ElectriFenceResultTmp> ,其中String:vin

  • 电子围栏转换临时对象——ElectricFenceResultTmp
@Data
@AllArgsConstructor
public class ElectricFenceResultTmp {//电子围栏idprivate int id;//电子围栏名称private String name;//电子围栏中心地址private String address;//电子围栏半径private float radius;//电子围栏中心点的经度private double longitude;//电子围栏中心点的维度private double latitude;//电子围栏的开始时间private Date startTime;//电子围栏的结束时间private Date endTime;@Overridepublic String toString() {return "ElectricFenceResultTmp{" +"id=" + id +", name='" + name + '\'' +", address='" + address + '\'' +", radius=" + radius +", longitude=" + longitude +", latitude=" + latitude +", startTime=" + startTime +", endTime=" + endTime +'}';}
}
  • 自定义 source 读取 MySQL 的数据源并广播

    • 定义读取电子围栏规则类——MysqlElectricFenceSouce

      返回类型为 HashMap<String,ElectricFenceResultTmp>

//读取mysql存储的电子围栏规则表数据以及电子围栏规则关联的电子围栏规则车辆表数据,根据分析,一个车辆可能适配多个电子围栏规则,所以返回的数据类型定义为HashMap<vin, 电子围栏规则对象>,为了方便处理,我们只处理一个车辆关联一个电子围栏规则的场景(真事的业务开发中一定是一个车辆可能有很多很多对应电子围栏规则的)。
//继承 RichSourceFunction<HashMap<String, ElectricFenceResultTmp>>
//1.重写 open 方法
//1.1 获取上下文中的 parameterTool
//1.2 读取配置文件中,注册驱动 url user password
//1.3 实例化statement
//2.重写 close 方法
//2.1 关闭 statement 和 conn
//3.重写 run 方法
//3.1 每指定时间循环读取 mysql 中的电子围栏规则
//3.2 收集 electricFenceResult 指定休眠时间
//4.重写 cancel 方法
  • 读取数据库中配置信息

    select vins.vin,setting.id,setting.name,setting.address,setting.radius,setting.longitude,setting.latitude,setting.start_time,setting.end_time
    from vehicle_networking.electronic_fence_setting setting
    inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id
    where setting.status=1
    

电子围栏中的 ConnectStreamed应用

connect流说明

connect流使用场景

两点之间球面距离的计算——DistanceCaculateUtil

  • 导入工具jar包坐标

    <!-- geodesy地址位置查询依赖 -->
    <dependency><groupId>org.gavaghan</groupId><artifactId>geodesy</artifactId><version>${geodesy.version}</version>
    </dependency>
    
  • 两点之间球面距离的计算工具类

    /*** TODO 球面距离计算工具类;根据两个点的经纬度,计算出距离*/
    public class DistanceCaculateUtil {/*** @desc:计算地址位置方法,坐标系、经纬度用于计算距离(直线距离)* @param gpsFrom* @param gpsTo* @param ellipsoid* @return 计算距离*/private static Double getDistanceMeter(GlobalCoordinates gpsFrom, GlobalCoordinates gpsTo, Ellipsoid ellipsoid) {//GeodeticCurve geodeticCurve = new GeodeticCalculator().calculateGeodeticCurve(ellipsoid, gpsFrom, gpsTo);return geodeticCurve.getEllipsoidalDistance();}/*** @desc:使用传入的ellipsoidsphere方法计算距离* @param latitude 位置1经度* @param longitude 位置1维度* @param latitude2 位置2经度* @param longitude2 位置2维度* @param ellipsoid 椭圆计算算法* @return*/private static Double ellipsoidMethodDistance(Double latitude, Double longitude, Double latitude2, Double longitude2, Ellipsoid ellipsoid){// todo 位置点经度、维度不为空 位置点2经度、维度不为空 椭圆算法Objects.requireNonNull(latitude, "latitude is not null");Objects.requireNonNull(longitude, "longitude is not null");Objects.requireNonNull(latitude2, "latitude2 is not null");Objects.requireNonNull(longitude2, "longitude2 is not null");Objects.requireNonNull(ellipsoid, "ellipsoid method is not null");// todo 地球坐标对象:封装经度维度坐标对象GlobalCoordinates source = new GlobalCoordinates(latitude, longitude);GlobalCoordinates target = new GlobalCoordinates(latitude2, longitude2);// todo 椭圆范围计算方法return getDistanceMeter(source, target, ellipsoid);}/*** @desc:使用ellipsoidsphere方法计算距离* @param latitude* @param longitude* @param latitude2* @param longitude2* @return distance 单位:m*/public static Double getDistance(Double latitude,Double longitude,Double latitude2,Double longitude2) {// 椭圆范围计算方法:Ellipsoid.Spherereturn ellipsoidMethodDistance(latitude, longitude, latitude2, longitude2, Ellipsoid.Sphere);}
    }
    

电子围栏中自定义对象将两个数据流合并

  • 通过关联两个数据流后CoFlatMap 后生成实体类—— ElectricFenceModel

    /*** 电子围栏规则计算模型*/
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class ElectricFenceModel implements Comparable<ElectricFenceModel> {//车架号private String vin = "";//电子围栏结果表UUIDprivate Long uuid = -999999L;//上次状态 0 里面 1 外面private int lastStatus = -999999;//当前状态 0  里面 1 外面private int nowStatus = -999999;//位置时间 yyyy-MM-dd HH:mm:ssprivate String gpsTime = "";//位置纬度--private Double lat = -999999D;//位置经度--private Double lng = -999999D;//电子围栏IDprivate int eleId = -999999;//电子围栏名称private String eleName = "";//中心点地址private String address = "";//中心点纬度private Double latitude;//中心点经度private Double longitude = -999999D;//电子围栏半径private Float radius = -999999F;//出围栏时间private String outEleTime = null;//进围栏时间private String inEleTime = null;//是否在mysql结果表中private Boolean inMysql = false;//状态报警 0:出围栏 1:进围栏private int statusAlarm = -999999;//报警信息private String statusAlarmMsg = "";//终端时间private String terminalTime = "";// 扩展字段 终端时间private Long terminalTimestamp = -999999L;@Overridepublic int compareTo(ElectricFenceModel o) {if(this.getTerminalTimestamp() > o.getTerminalTimestamp()){return  1;}else if(this.getTerminalTimestamp() < o.getTerminalTimestamp()){return  -1;}else{return 0;}}
    }
    
  • 实现将两个流合并CoFlatMapFunction接口—— ElectricFenceRulesFuntion

    //1.定义返回的 ElectricFenceModel//2.判断如果流数据数据质量(车辆的经纬度不能为0或-999999,车辆GpsTime不能为空)//2.1.获取当前车辆的 vin//2.2.通过vin获取电子围栏的配置信息//2.3.如果电子围栏配置信息不为空//2.3.1.说明当前车辆关联了电子围栏规则,需要判断当前上报的数据是否在电子围栏规则的生效时间内,先获取上报地理位置时间gpsTimestamp//2.3.2.如果当前gpsTimestamp>=开始时间戳并且gpsTimestamp<=结束时间戳,以下内容存入到 ElectricFenceModel//2.3.2.1.上报车辆的数据在电子围栏生效期内 vin gpstime lng lat 终端时间和终端时间戳//2.3.2.2.电子围栏id,电子围栏名称,地址,半径//2.3.2.3.电子围栏经纬度//2.3.2.4.计算经纬度和电子围栏经纬度距离距离,如果两点之间大于半径(单位是千米)的距离,就是存在于圆外,否则反之//2.3.2.5.收集结果数据
    

设置窗口并计算确定是否在电子围栏内告警

  • 设置水印机制

  • 根据 vin 进行分组

  • 创建 90 秒翻滚窗口

  • 自定义电子围栏窗口实现类:ElectricFenceWindowFunction

    //对电子围栏进行自定义窗口操作,处理电子围栏判断逻辑
    //继承 RichWindowFunction<ElectricFenceModel, ElectricFenceModel, String, TimeWindow>
    //1.定义存储历史电子围栏数据的state,<vin,是否在电子围栏内0:内,1:外> MapState<String, Integer>
    //2.重写open方法
    //2.1 定义mapState的描述器(相当于表结构) <String,Integer>
    //2.2 获取 parameterTool,用来读取配置文件参数
    //2.3 读取状态的超时时间 "vehicle.state.last.period" ,构建ttl设置更新类型和状态可见
    //2.4 设置状态描述 StateTtlConfig,开启生命周期时间
    //2.5 获取map状态
    
  • apply 方法步骤如下

         //1.创建返回对象//2.对窗口内的数据进行排序//3.从 state 中获取车辆vin对应的上一次窗口电子围栏lastStateValue标记(车辆上一次窗口是否在电子围栏中)0:电子围栏内 1:电子围栏外//4.如果上次状态为空,初始化赋值//5.判断当前处于电子围栏内还是电子围栏外//5.1.定义当前车辆电子围栏内出现的次数//5.2.定义当前车辆电子围栏外出现的次数//6.定义当前窗口的电子围栏状态//7. 90s内车辆出现在电子围栏内的次数多于出现在电子围栏外的次数,则认为当前处于电子围栏内//8. 将当前窗口的电子围栏状态写入到 state 中,供下次判断//9.如果当前电子围栏状态与上一次电子围栏状态不同//9.1.如果上一次窗口处于电子围栏外,而本次是电子围栏内,则将进入电子围栏的时间写入到数据库中//9.1.1.过滤出来状态为0的第一条数据//9.1.2.拷贝属性给 electricFenceModel 并将进入终端时间赋值,并且将状态告警字段赋值为1 0:出围栏 1:进围栏,将数据collect返回//9.2.如果上一次窗口处于电子围栏内,而本次是电子围栏外,则将出电子围栏的时间写入到数据库中//9.2.1.过滤出来状态倒序为1的第一条数据//9.2.2.拷贝属性给 electricFenceModel 并将出终端时间赋值,并且将状态告警 0:出围栏 1:进围栏,将数据collect返回
    
  • 如果判断为进入到电子围栏,进入到电子围栏的第一条数据的时间会被记录下来

合并分析电子围栏结果

读取电子围栏分析结果并广播

  • 读取mysql的电子围栏结果表的数据——MysqlElectricFenceResultSource

    //读取电子围栏分析结果表的数据,并进行广播
    //继承自 RichSourceFunction<HashMap<String, Long>>
    //1.重写 open 方法,初始化连接
    //1.1 编写sql "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin;"
    //2.重写 close 方法
    //3.重写 run 方法 获取出来vin 和 id 封装成map并返回
    //4.重写 cancel 方法
    
  • 读取电子栅栏的 vin 和 最近id

    select vin,min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null group by vin
    
  • 将读取的电子栅栏信息数据流广播出去

窗口流数据与广播流数据连接

  • 将电子栅栏模型数据流和电子栅栏 获取的<vin,id>流进行关联,并进行 flatMap

  • 实现电子围栏分析结果模型添加 uuid 和 inMysql 字段 —— ElectricFenceModelFunction

    //实现 CoFlatMapFunction<ElectricFenceModel, HashMap<String, Long>, ElectricFenceModel>
    //1.重写flatMap1方法
    //1.1.通过getvin获取配置流中是否存在值
    //2.如果不为 null
    //2.1.设置为当前时间戳
    //2.2.设置库中InMysql是否存在为 true
    //3.否则
    //3.1.设置 uuid 为最大值-当前时间戳
    //3.2 设置库中是否存在为 false
    //4.收集数据
    //5.重写 flatMap2 方法
    //5.1.读取配置数据
    

电子围栏分析结果入库

  • 将电子围栏分析结果数据写入到 mysql 数据库中 —— ElectricFenceMysqlSink

    //继承于 RichSinkFunction<ElectricFenceModel>
    //1. 重写 open 方法,获取参数,创建连接
    //2. 重写 invoke 方法,
    //2.1 出围栏(且能获取到进围栏状态的)则修改进围栏的状态, 否则 进入围栏,转换ElectricFenceModel对象,插入结构数据到电子围栏结果表
    //3. 重写 close 方法
    

测试电子围栏

实时即未来,车联网项目之电子围栏分析【六】相关推荐

  1. 实时即未来,大数据项目车联网之车辆驾驶行程分析【十三】

    文章目录 车辆驾驶行程分析 驾驶行程分析业务逻辑 1 车联网项目数据存储分层设计 2 车辆驾驶行程定义 1 驾驶行程业务简介 2 驾驶行程分析流程 3 驾驶行程分析业务价值 2. 驾驶行程分析任务设置 ...

  2. 实时即未来,大数据项目车联网之项目基石与前瞻【一】

    文章目录 写在前面 车联网项目全新升级 车联网行业背景介绍 车联网技术 汽车行业 新能源汽车 车联网行业技术 车辆网行业产业链与国内知名企业 车联网项目 车联网技术架构和技术选型 车联网项目的架构搭建 ...

  3. b2c项目基础架构分析(一)b2c 大型站点方案简述 已补充名词解释

    b2c项目基础架构分析(一)b2c 大型站点方案简述 已补充名词解释 我最近一直在找适合将来用于公司大型bs,b2b b2c的基础架构. 实际情况是要建立一个bs架构b2b.b2c的网站,当然还包括w ...

  4. b2c项目基础架构分析(二)前端框架 以及补漏的第一篇名词解释

    b2c项目基础架构分析(二)前端框架 以及补漏的第一篇名词解释 继续上篇,上篇里忘记了也很重要的前端部分,今天的网站基本上是以一个启示页,然后少量的整页切换,大量的浏览器后台调用web服务局部.动态更 ...

  5. RFID技术为智能轮胎在未来车联网领域的应用奠定了基础

    4月7日小编向大家公布了工信部7月1日将要实施的关于轮胎用射频识别(RFID)电子标签的4项行业标准,今天向大家分享关于轮胎RFID技术的推广之路. RFID技术的发展,使轮胎拥有"身份证& ...

  6. 中国电子设计自动化(EDA)软件行业未来发展趋势与投资前景分析报告22022-2028年版

    中国电子设计自动化(EDA)软件行业未来发展趋势与投资前景分析报告22022-2028年版 mmmmmm鸿**mmm晟&mmmmm信**mmmmm合&mmmmm研**mmmmmmm究& ...

  7. 实时即未来?一个小微企业心中的流计算

    摘要:本文由墨芷技术团队唐铎老师分享,主要讲述其技术团队内部引入流计算的整个过程,包括最初的决策.期间的取舍以及最终落地,一路走来他们的思考.感悟以及经验分享. 初识 Flink 为什么一定要上 Fl ...

  8. 关于未来车联网的思考与展望

    对于未来车联网的展望   摘要:万物互联的时代,车联网也是至关重要的一环,车联网系统的实现需要依靠强大的通信能力做为依靠和支撑,而5G 技术的逐渐成熟,为车联网提供了低时延.高可靠.大容量的通信设备, ...

  9. 联合国教科文组织联合好未来发布“人工智能与未来学习”项目成果

    为帮助全球各国政府.中小学有效理解并设计AI课程,日前,联合国教科文组织联合好未来正式对外发布"人工智能与未来学习"项目成果<K-12 AI课程:官方认可的AI课程设计指南& ...

最新文章

  1. JauntVR中文版登陆小米商店,首波内容有《五十度黑》
  2. python划分有限元网格_有限元网格划分应该考虑些什么
  3. new ext.toolbar控制按钮间距_按钮规范系列 - 「按钮尺寸」的设计详解
  4. randn函数加噪声_损失函数 (Loss Function)
  5. db2 两个结构相同的表_从两个工作表提取数据记录,并显示相同记录的报告
  6. 单片机c语言控制显示器,单片机实现LCD液晶显示器控制原理..docx
  7. 【转载保存】索引文件锁LockFactory
  8. 注解Annotation的IoC:从@Autowired到@Component
  9. How to: Create and Initialize Trace Listeners
  10. php中提取%3cdiv,cmseasy getshell 0day
  11. mysql 授权 navicat的登录数据库
  12. 前端常用PS技巧总结之更换图片背景颜色
  13. 基础知识之存活探针(Liveness Probe)
  14. 视频网关是什么,视频接入网关技术作用
  15. Gym101194F-Mr. Panda and Fantastic Beasts
  16. 10bit、8bit色彩深度的区别
  17. 【无标题】统计从键盘输入的字符中数字字符的个数,用换行符结束循环-C语言基础
  18. mysql按中文拼音排序_按拼音排序,mysql 按中文拼音顺序排序
  19. 听觉能力类毕业论文文献有哪些?
  20. 数据库设计(一) 需求分析

热门文章

  1. html让视频变形不留黑边,视频去黑边画面不变形|视频无损去黑边 去掉视频黑边且画面比例正常人物不变形...
  2. 青楼残梦-第一次被投诉后感
  3. 我叉,下载个ldc下载半天都搞不定.
  4. 数字世界,企业何以抵御勒索病毒?
  5. css框架bootstrap ie,前端开发必备的10个Bootstrap工具
  6. suma++[代碼分析一]: 主入口visualizer.cpp
  7. 微型计算机第六章课后答案,微型计算机原理及其接口技术第六章部分习题.doc...
  8. vyos my_cli_shell_api 不见了
  9. Java毕设项目大学生创业众筹系统(java+VUE+Mybatis+Maven+Mysql)
  10. 为什么抖音张同学这么火爆?用 Python 分析 1w+条评论数据,我发现了其中的秘密