Background

  • 不同类型的传感器数据频率不同,低频的有的几分钟一个数,高频的有的一秒几十个数、几百个数。低频数据可以使用传统的mysql进行数据的存储。但数据频率比较高时,对程序的计算能力和数据存储能力要求较高,还好有现成的轮子可以直接拿来使用。
  • 本文介绍高频数据流的实时计算和存储,应用场景选用风电塔筒提升监测为例。
  • 之前的博客中也有探索,这里算是总结下吧,写出来是希望和大家交流,哪里有问题,多多指点哈。
  • 这里给出源码【170-tower-lift-processor】,但是你拿到肯定是起不来的,这里分享出来主要是看大体流程哈。

数据处理流程

private static void execStreamJob() {log.info("****************** 获取 Flink 执行环境");log.info("");// 获取 Flink 任务执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);log.info("****************** 配置 RabbitMQ 数据源并解析");log.info("");// 解析原始数据,把流元素封装成 List<MultiDataEntity>SingleOutputStreamOperator<List<MultiDataEntity>> rawStream = env.setParallelism(1)// 配置 RabbitMQ 数据源.addSource(DefaultConfig.getRMQSource())// 统计数据频率.process(new PrintSpeedFunc())// 过滤掉为 null 和 whitespace 的值.filter(new NullFilter())// 解析数据.process(new RMQPayloadParser());log.info("****************** 原始数据存储【influx】");log.info("");// 存储原始数据到 influxDB 中rawStream.addSink(new RawInfluxSink());log.info("****************** 把需要进行特殊计算的类型分出来成一道侧流【分流】");log.info("");// 其他OutputTag<List<MultiDataEntity>> tag1 = new OutputTag<List<MultiDataEntity>>("stream-other") {};// 应变OutputTag<List<MultiDataEntity>> tag2 = new OutputTag<List<MultiDataEntity>>("stream-yb") {};// 风环境OutputTag<List<MultiDataEntity>> tag3 = new OutputTag<List<MultiDataEntity>>("stream-we") {};// 位移OutputTag<List<MultiDataEntity>> tag4 = new OutputTag<List<MultiDataEntity>>("stream-wy") {};// 振动OutputTag<List<MultiDataEntity>> tag5 = new OutputTag<List<MultiDataEntity>>("stream-zd") {};// 倾角OutputTag<List<MultiDataEntity>> tag6 = new OutputTag<List<MultiDataEntity>>("stream-qj") {};// 拆分流SingleOutputStreamOperator<List<MultiDataEntity>> splitStream = rawStream.process(new TagStreamProcessFunc(tag1, tag2, tag3, tag4, tag5, tag6));// 其它类型流DataStream<List<MultiDataEntity>> otherStream = splitStream.getSideOutput(tag1);// 应变类型流DataStream<List<MultiDataEntity>> ybStream = splitStream.getSideOutput(tag2);// 风环境类型流DataStream<List<MultiDataEntity>> weStream = splitStream.getSideOutput(tag3);// 位移类型流DataStream<List<MultiDataEntity>> wyStream = splitStream.getSideOutput(tag4);// 振动类型流DataStream<List<MultiDataEntity>> zdStream = splitStream.getSideOutput(tag5);// 倾角类型流DataStream<List<MultiDataEntity>> qjStream = splitStream.getSideOutput(tag6);log.info("****************** 振动原始数据推送 mqtt【振动】");log.info("");// 把振动传感器原始监测数据推送到 mqttzdStream.addSink(new RawMqttSink());// 5 秒推送一个最大值和最小值SingleOutputStreamOperator<List<MultiDataEntity>> processedZDStream = zdStream.assignTimestampsAndWatermarks(new OriginFrequencyWatermark()).keyBy(new ProjectIdAndSensorTypeSelector()).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new MaxAndMinProcessor());// 推送大屏展示processedZDStream.addSink(new MaxAndMinMqttSink());log.info("****************** 应变参考点缓存 redis【应变】");log.info("");// 把应变参考点,通讯编号为31、32的传感器数据缓存到 redis 中,实时更新ybStream.addSink(new YBToRedisSink());// 顶固的减去顶固的参考点(31);地锚的减去地锚的参考点(32);计算所得的数值设置成应变类型的【第6个指标】SingleOutputStreamOperator<List<MultiDataEntity>> ybPFStream = ybStream.process(new YBProcessor());// 其他传感器数据【10秒去除一个最大值和一个最小值然后返回一个瞬时值】只推送 不存储SingleOutputStreamOperator<List<MultiDataEntity>> ybPSStream = ybPFStream.assignTimestampsAndWatermarks(new OriginFrequencyWatermark()).keyBy(new ProjectIdAndSensorTypeSelector()).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new RemoveMaxAndMinProcessor());// 推送大屏ybPSStream.addSink(new YBMqttSink());// 对应变类型流进行前十秒均值计算,计算所得的数值设置成应变类型的【第5个指标】SingleOutputStreamOperator<List<MultiDataEntity>> processedYBStream = ybPFStream.assignTimestampsAndWatermarks(new OriginFrequencyWatermark()).keyBy(new ProjectIdAndSensorTypeSelector()).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.milliseconds(500))).apply(new TenSecMeanApplyFunc());log.info("****************** 风/叶夹角实时计算【风环境】");log.info("");// 对风环境类型流进行【风/叶夹角】实时计算SingleOutputStreamOperator<List<MultiDataEntity>> processedWEStream = weStream.process(new WEProcessor());log.info("****************** 位移类型数据处理【位移】");log.info("");// 计算当前轮毂实际高度SingleOutputStreamOperator<List<MultiDataEntity>> processedWYStream = wyStream.process(new WYProcessor());// 处理平整度数据(减去对应的复位值)wyStream.assignTimestampsAndWatermarks(new OriginFrequencyWatermark()).keyBy(new ProjectIdAndSensorTypeSelector()).window(TumblingEventTimeWindows.of(Time.seconds(2))).process(new EvennessProcessor());log.info("****************** 倾角数据处理【倾角】");log.info("");// 1秒内去除一个最大值然后返回一个最大值SingleOutputStreamOperator<List<MultiDataEntity>> processedQJStream = qjStream.assignTimestampsAndWatermarks(new OriginFrequencyWatermark()).keyBy(new ProjectIdAndSensorTypeSelector()).window(TumblingEventTimeWindows.of(Time.seconds(1))).process(new RemoveMaxProcessor());// 把处理后的倾角数据缓存到 redis 进行法兰倾斜计算processedQJStream.addSink(new QJRedisSink());log.info("****************** 合并所有侧流,存储并进行阈值告警判断【合并流】");log.info("");// 合并所有侧流DataStream<List<MultiDataEntity>> unionStream = zdStream.union(processedYBStream).union(processedWEStream).union(processedWYStream).union(processedQJStream);log.info("****************** 计算结果存储【influxDB】");log.info("");// 把均值计算结果写入 influxDBunionStream.addSink(new MeanInfluxSink());log.info("****************** 【阈值告警判断】");log.info("");// 先推送实时数据,再进行阈值判断,告警信息推送 mqtt;同时把告警信息存储 redis,定时任务存储 MySQLunionStream.addSink(new WarnDetector());log.info("****************** 任务配置完毕,流计算开始 . . . ");log.info("");try {// 因为Flink是懒加载的,所以必须调用execute方法,上面的代码才会执行env.execute(DefaultConfig.getJobName());} catch (Exception e) {log.error("流计算任务执行失败!");log.info("errMsg: {}", e.getMessage());e.printStackTrace();}}

传感器采集的数据怎么处理、怎么存储【flink】【influxdb】相关推荐

  1. 传感器采集保存数据与前端实时显示动态曲线图实现想法

    传感器1秒钟采集一次并且前端实时动态显示曲线图实现想法 首先,前端采用1.websocket来实现实时同步实现动态曲线变化.                     使用socket比http连接的次 ...

  2. 传感器采集数据 Python123

    下面是一个传感器采集数据文件 sensor-data.txt 的一部分:‪‬‪‬‪‬‪‬‪‬‮‬‭‬‫‬‪‬‪‬‪‬‪‬‪‬‮‬‪‬‫‬‪‬‪‬‪‬‪‬‪‬‮‬‪‬‮‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‪‬ ...

  3. 给我一个西门子plc采集大数据存储与分析方案

    对于西门子PLC采集大数据存储与分析方案,下面是一个建议: 数据采集: 在PLC中设置数据采集程序,以记录关键数据并定期发送到数据存储仓库. 数据存储: 使用大数据存储技术,例如 Hadoop.Spa ...

  4. Python传感器采集数据文件分析处理实验源码

    前些天发现了一个巨牛的人工智能学习电子书,通俗易懂,风趣幽默,无广告,忍不住分享一下给大家.(点击跳转人工智能学习资料) 一.题目 附件sensor-data.txt是一个传感器采集数据文件,其中,每 ...

  5. android手机传感器坐标系,一种将手机内置传感器采集数据从手机坐标系转换到参考坐标系的处理方法与流程...

    本发明提出了一种将手机内置传感器采集的三轴加速度数据从手机坐标系转换到大地参考坐标系的处理方法,该方法涉及到移动通信.模式识别和情景感知等领域. 背景技术: 目前基于智能手机的行为识别是环境智能中的一 ...

  6. 传感器信号 如何发送到服务器,传感器采集到的数据通过无线传输至监控服务器的四种方式...

    传感器采集到的数据通过无线传输至监控服务器的四种方式 [复制链接] 目前,工业物联网传感器数据采集无线传输至监控服务器比较常用的通讯解决方案主要有GPRS/4G,433MHz和2.4GHz等三种,而以 ...

  7. android surfaceview 大小_Android 使用Camera2 API采集视频数据

    Android 视频数据采集系列的最后一篇出炉了,和前两篇文章想比,这篇文章从系统API层面进行一些探索,涉及到的细节更多.初次接触 Camera2 API 会觉得它的使用有些繁琐,涉及到的类有些多, ...

  8. 全自动振弦监测仪VS101型单通道振弦传感器采集仪工程应用

    VS101型单通道振弦采集仪是专门为振弦式传感器而设计,在精确测量振弦传感器频率的同时,还可以同时采集温度数据,采集的数据可以本地存储,或者通过GSM/GPRS移动网络 进行数据远传. VS101型单 ...

  9. 如何采集工业设备数据?工业数据采集的方法有哪些?

    在21世纪,我们身处的是一个充满数据的世界.对于工业领域,这尤其如此.工业设备数据的采集和分析已经成为提升效率.优化运营.减少浪费和预防设备故障的关键步骤.这篇文章将详细介绍如何采集工业设备数据,以及 ...

最新文章

  1. Linux下批量杀掉筛选进程
  2. pb更新oracle表格,PB自定义retrieve刷新函数、PB导入excel表、打印
  3. IT旅途——程序员面试经验分享
  4. erdas2015几何校正模块在哪_在ERDAS中进行几何校正
  5. 个人对北理工2020级硕士研究生张××一篇学术论文涉嫌抄袭的看法
  6. HDU - 1427 速算24点(dfs)
  7. Visual C++2010的使用
  8. vector的求和用法accumulate
  9. leetcode179. 最大数
  10. C语言scanf函数详解和示例
  11. w10查看端口_win10系统使用dos命令查看端口的解决教程
  12. MATLAB eof用法,经验正交函数分析法(EOF)在matlab上的实现
  13. android 加载第三方so文件,Uni-app 以Module方式开发Android插件,引入第三方资源包so文件,但无法读取...
  14. H5横幅,旗帜飘动动画
  15. MySQL 删除大量数据
  16. 从应用迁移到平台微认证:鲲鹏技术解读
  17. PyObject和PyTypeObject
  18. TADF材料的机制原理;TADF的机理;热活化延迟荧光如何产生?
  19. 使用基于Boost的预处理器元编程实现变长类型列表的参数化
  20. 条码生成软件如何批量生成Code39码

热门文章

  1. linux磁盘快速拷贝,UNIX下用DD做磁盘完整拷贝
  2. 记第一次出差得出的经验
  3. unity:实现八方旅游的动态光影效果
  4. 实验6 存储过程mysql_MySQL数据库实验:任务六 数据库存储过程设计
  5. python手机版iphone-Python实现抢购IPhone手机
  6. 项目实战---模拟凡客网
  7. 程序人生:未来,企业真的只有几个前端工程师吗?
  8. 浏览器好用的插件集合
  9. gateway灰度发布
  10. 绩效考核的5大标准是什么?