作业描述:

针对当前空⽓质量监测数据,环保部门需要根据监测数据实时发布预警信息,需要我们在实时到达的六种污染物监测数据中,根据每⼀种数据的监测值进⾏报警检测。

输⼊数据:“空⽓质量监控数据”,格式如下:

Time, id, PM2.5, PM10, SO2, CO, NO2, O3
例如:2017-10-26 10:10:10.666,1,160,260,400,50,1000,300

输出结果格式如下:

Time | id | Type | Value
⽰例 :Time| id | PM2.5 | 400

报警规则:

  • PM2.5>=150
  • PM10>=350
  • SO2>=800
  • CO>=60
  • NO2>=1200
  • O3>=400

开发环境:

  • Spark Streaming
  • Scala
  • IDEA + Maven
  • Mac

实现思路:

我简单的说一下我的实现思路:在这之前,你必须要知道什么流处理,因为Spark Streaming就是基于流处理的,如果你对流处理还不清楚的,请先百度学习一下什么是流处理。如果你想百度,我简单的解释一下流处理的意思。大概意思就是数据像水一样,是时时流动的,时时产生的;不像HDFS,数据是提前放到文件夹下,然后再去读写。例如sokect中的数据就是这样的。因此,在Spark Streaming中只少需要开启一个线程去检测是否有数据流进来。清楚了流处理方式后,下面就很简单了。使用Spark Streamng开启一个线程去时时检测socket的某个端口(我检测的端口是是9999),然后将获取到的数据转化成DStream对象,在通过FlatMap操作将数据进行切分生成新的DStream,再通过foreachRDD遍历所有切分好的DStream将他们转化成Array类型赋值给AirQuality对象,最后在AirQuality对象中做空气质量的判断。以上就是整体的实现思路了。还是很简单的。

开启scoket端口

nc -lk 9999

下面看一下maven的pom.xml配置:

    <dependencies><dependency><groupId>org.apache.spark</groupId><artifactId> spark-core_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.0</version></dependency></dependencies>

下面是具体的实现:

class AirQuality(time:String,id:Int,PM2_5:Int,PM10:Int,SO2:Int,CO:Int,NO2:Int,O3:Int) {def calculateAirQuality:Unit = {if (this.id == 0){return}//空气质量检测if (this.PM2_5 >= 150) {this.printResult("PM2.5",this.PM2_5)}if (this.PM10 >= 350) {this.printResult("PM10",this.PM10)}if(this.SO2 >= 800){this.printResult("SO2",this.SO2)}if(this.CO >= 60){this.printResult("CO",this.CO)}if(this.NO2 >= 1200){this.printResult("NO2",this.NO2)}if(this.O3 >= 400) {this.printResult("O3", this.O3)}}def printResult(key:String,value:Int):Unit = {val result = this.time + " | " + this.id + " | " + key + " | " + valueprintln(result)}
}
object AirQuality {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[2]").setAppName("AirQuality")val ssc = new StreamingContext(conf,Seconds(1))//从socket端口读取数据,返回的是ReceiverInputDStreamval receiverInputStream = ssc.socketTextStream("localhost",9999);//对返回的是ReceiverInputDStream操作,进行切分数据,生成DStreamval words = receiverInputStream.flatMap(line => line.split(","))words.foreachRDD(rdd => {val arr = rdd.collect()if (arr.length > 7) {val airQuality = new AirQuality(arr(0),arr(1).trim.toInt,arr(2).trim.toInt,arr(3).trim.toInt,arr(4).trim.toInt,arr(5).trim.toInt,arr(6).trim.toInt,arr(7).trim.toInt)airQuality.calculateAirQuality}})ssc.start()ssc.awaitTermination()}}

输入数据
2017-10-26 10:10:10.666,1,160,260,400,50,1000,300
2017-10-26 10:10:10.666,1,140,360,400,50,1000,300
2017-10-26 10:10:10.666,1,140,260,900,50,1000,300
2017-10-26 10:10:10.666,1,140,260,400,70,1000,300
2017-10-26 10:10:10.666,1,140,260,400,50,1200,300
2017-10-26 10:10:10.666,1,140,260,400,50,1000,500
2017-10-26 10:10:10.666,1,160,360,900,70,1200,500

输出数据
2017-10-26 10:10:10.666 | 1 | PM2.5 | 160
…….

附上工程github地址

Spark Streaming通过Socket检测空气质量相关推荐

  1. 非暂态计算机可读存储介质是什么,检测空气质量方法及装置、空气净化器控制方法及装置与流程...

    本发明涉及电器技术领域,具体涉及一种检测空气质量的方法及装置.可移动空气净化器的控制方法及装置.可移动空气净化器.非暂态计算机可读存储介质. 背景技术: 随着现在空气污染状况的加剧和消费者对居住环境质 ...

  2. STM32CubMX_MQ135检测空气质量

    一.MQ135简介 MQ135是测量空气污染情况常用的一个传感器,具有代表性,价格低,寿命长,敏感度也OK,主要用于测量空气中二氧化碳,氮氧化物,氨气,酒精,苯类等.这几样气体可以说都属于家用空气污染 ...

  3. DStream实战之Spark Streaming接收socket数据实现WordCount 31

    需求 现在想要通过socket发送数据, 然后Spark Streaming接收数据并且统计socket发送的每个单词的个数. 1. 架构图 2. 实现流程 安装并启动生产者 首先在linux服务器上 ...

  4. Spark Streaming处理Socket流简单实例

    在本文中我将在IDEA工具中开发一个SparkStream程序用于监听本机9999端口所接收的数据 首先,我们将Spark Streaming类的名称以及从StreamingContext进行的一些隐 ...

  5. 【进阶实战】用PaddlePaddle检测空气质量

    In[1] #请先运行此代码块获得代码 !unzip -o /home/aistudio/data/data2365/testfluid.zip -d /home/aistudio/ !NEW_NAM ...

  6. 基于51单片机PM2.5空气质量检测系统(源程序+仿真+原理图+PCB+论文)

    资料编号:208 功能介绍:(全套毕设资料齐全) 本电路是由51单片机为控制核心,另外主要通过5个模块的电路设计实现功能,他们分别是LCD显示模块.粉尘传感器.A/D转换.蜂鸣器电路.LED指示电路. ...

  7. 创新“芯”引擎 | 国民技术N32G457 RT-Thread设计大赛 -基于RT-Thread的远程智能空气质量检测系统

    基于RT-Thread系统的一款远程智能空气质量检测系统:硬件部分由ESP8266WIFI模块,MQ135空气质量检测模块,0.96oled屏幕,N32G457主控板等组成:软件部分实现实时检测空气质 ...

  8. grafana计算不同时间的差值_大数据时代!如何基于Spark Streaming构建实时计算平台...

    随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台 ...

  9. eolinker开源版5_开源土豪沉金版空气质量检测仪AirWolf

    题主从17年-19年开源了几个空气质量检测仪项目. 17年, AirMini 开源项目:市面上有没有靠谱的pm2.5检测仪?回答 18年,AirDog 空气质量检测仪开源项目 19年,AirWolf ...

最新文章

  1. 分享是程序员的必备素质
  2. Linux grep命令——文本搜索工具
  3. http 协议上传文件multipart form-data boundary 说明--转载
  4. 多媒体技术是指以计算机为手段来获取,计算机应用基础win7课件第八章.ppt
  5. 19、任务十八——事件委托、数组处理
  6. 编写的windows程序,崩溃时产生crash dump文件的办法
  7. 配置环境JDK Tomcat Maven
  8. guid怎么做到唯一_怎么做成为一个好女人呢?
  9. [k8s]kubeadm k8s免费实验平台labs.play-with-k8s.com,k8s在线测试
  10. python代码实现文件复制txt文件_工具类(1.1)
  11. 11个前端开发者必备的网站
  12. 沈伟华:图神经网络的三连问
  13. [20170203]dg磁盘空间不足的处理.txt
  14. 半导体上下游最核心供应商名单(建议收藏)
  15. 怎么做简单版ps碎片飞溅效果
  16. Horizon8桌面虚拟化(一)
  17. 只允许微信浏览器访问
  18. 手动删除oem 13c
  19. justify/align-content/items使用区别
  20. 练习:人人网注册页面

热门文章

  1. 云演CTF——php4fun
  2. 串口通信——蓝牙模块使用(HC-08为例)
  3. Java:浅谈InputStream的close方法
  4. 爬取新浪社会新闻源代码
  5. 场景化学习——适应这个时代的学习方式
  6. whatsup gold snmp安装脚本
  7. Cause: java.lang.IllegalArgumentException
  8. 杭州记忆 | 科达为G20做的三件事
  9. S120怎么样实现本地远程控制切换?
  10. 18.java 容器都有哪些?