目录

​​​​​​​物联网设备数据分析

​​​​​​​设备监控数据准备

​​​​​​​创建Topic

​​​​​​​模拟数据

​​​​​​​SQL风格

​​​​​​​DSL风格


物联网设备数据分析

在物联网时代,大量的感知器每天都在收集并产生着涉及各个领域的数据。物联网提供源源不断的数据流,使实时数据分析成为分析数据的理想工具。

模拟一个智能物联网系统的数据统计分析,产生设备数据发送到Kafka,结构化流Structured Streaming实时消费统计。对物联网设备状态信号数据,实时统计分析:

 1)、信号强度大于30的设备;

 2)、各种设备类型的数量;

 3)、各种设备类型的平均信号强度;

​​​​​​​设备监控数据准备

编写程序模拟生成物联网设备监控数据,发送到Kafka Topic中,此处为了演示字段较少,实际生产项目中字段很多。

​​​​​​​创建Topic

启动Kafka Broker服务,创建Topic【search-log-topic】,命令如下所示:

#查看topic信息/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181#删除topic/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic iotTopic#创建topic/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic iotTopic#模拟生产者/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic iotTopic#模拟消费者/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic iotTopic --from-beginning

​​​​​​​模拟数据

模拟设备监控日志数据,字段信息封装到CaseClass样例类【DeviceData】类:

模拟产生日志数据类【MockIotDatas】具体代码如下:

package cn.itcast.structedstreamingimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Jsonimport scala.util.Randomobject MockIotDatas {def main(args: Array[String]): Unit = {// 发送Kafka Topicval props = new Properties()props.put("bootstrap.servers", "node1:9092")props.put("acks", "1")props.put("retries", "3")props.put("key.serializer", classOf[StringSerializer].getName)props.put("value.serializer", classOf[StringSerializer].getName)val producer = new KafkaProducer[String, String](props)val deviceTypes = Array("db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata")val random: Random = new Random()while (true) {val index: Int = random.nextInt(deviceTypes.length)val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"val deviceType: String = deviceTypes(index)val deviceSignal: Int = 10 + random.nextInt(90)// 模拟构造设备数据val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())// 转换为JSON字符串val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)println(deviceJson)Thread.sleep(100 + random.nextInt(500))val record = new ProducerRecord[String, String]("iotTopic", deviceJson)producer.send(record)}// 关闭连接producer.close()}/*** 物联网设备发送状态数据*/case class DeviceData(device: String, //设备标识符IDdeviceType: String, //设备类型,如服务器mysql, redis, kafka或路由器routesignal: Double, //设备信号time: Long //发送数据时间)}

相当于大机房中各个服务器定时发送相关监控数据至Kafka中,服务器部署服务有数据库db、大数据集群bigdata、消息队列kafka及路由器route等等,数据样本:

{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429}{"device":"device_20","deviceType":"bigdata","signal":17.0,"time":1590660338790}{"device":"device_32","deviceType":"kafka","signal":93.0,"time":1590660338908}{"device":"device_82","deviceType":"bigdata","signal":72.0,"time":1590660339380}{"device":"device_32","deviceType":"kafka","signal":10.0,"time":1590660339972}{"device":"device_30","deviceType":"kafka","signal":81.0,"time":1590660340442}{"device":"device_32","deviceType":"kafka","signal":29.0,"time":1590660340787}{"device":"device_96","deviceType":"bigdata","signal":18.0,"time":1590660343554}

​​​​​​​SQL风格

按照业务需求,从Kafka消费日志数据,提取字段信息,将DataFrame注册为临时视图,其中使用函数get_json_object提取JSON字符串中字段值,编写SQL执行分析,将最终结果打印控制台

代码如下:

package cn.itcast.structedstreamingimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 对物联网设备状态信号数据,实时统计分析,基于SQL编程* 1)、信号强度大于30的设备* 2)、各种设备类型的数量* 3)、各种设备类型的平均信号强度*/
object IotStreamingOnlineSQL {def main(args: Array[String]): Unit = {// 1. 构建SparkSession会话实例对象,设置属性信息val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "3").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import org.apache.spark.sql.functions._import spark.implicits._// 2. 从Kafka读取数据,底层采用New Consumer APIval iotStreamDF: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node1:9092").option("subscribe", "iotTopic")// 设置每批次消费数据最大值.option("maxOffsetsPerTrigger", "100000").load()// 3. 对获取数据进行解析,封装到DeviceData中val etlStreamDF: DataFrame = iotStreamDF// 获取value字段的值,转换为String类型.selectExpr("CAST(value AS STRING)")// 将数据转换Dataset.as[String] // 内部字段名为value// 过滤数据.filter(StringUtils.isNotBlank(_))// 解析JSON数据:{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}.select(get_json_object($"value", "$.device").as("device_id"),get_json_object($"value", "$.deviceType").as("device_type"),get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),get_json_object($"value", "$.time").cast(LongType).as("time"))// 4. 依据业务,分析处理// TODO: signal > 30 所有数据,按照设备类型 分组,统计数量、平均信号强度// 4.1 注册DataFrame为临时视图etlStreamDF.createOrReplaceTempView("t_iots")// 4.2 编写SQL执行查询val resultStreamDF: DataFrame = spark.sql("""|SELECT|  device_type,|  COUNT(device_type) AS count_device,|  ROUND(AVG(signal), 2) AS avg_signal|FROM t_iots|WHERE signal > 30 GROUP BY device_type|""".stripMargin)// 5. 启动流式应用,结果输出控制台val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Complete()).foreachBatch((batchDF: DataFrame, batchId: Long) => {println("===========================================")println(s"BatchId = ${batchId}")println("===========================================")if (!batchDF.isEmpty) {batchDF.coalesce(1).show(20, truncate = false)}}).start()query.awaitTermination()query.stop()}
}

​​​​​​​DSL风格

按照业务需求,从Kafka消费日志数据,基于DataFrame数据结构调用函数分析,代码如下:

package cn.itcast.structedstreamingimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 对物联网设备状态信号数据,实时统计分析:* 1)、信号强度大于30的设备* 2)、各种设备类型的数量* 3)、各种设备类型的平均信号强度*/
object IotStreamingOnlineDSL {def main(args: Array[String]): Unit = {// 1. 构建SparkSession会话实例对象,设置属性信息val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "3").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import org.apache.spark.sql.functions._import spark.implicits._// 2. 从Kafka读取数据,底层采用New Consumer APIval iotStreamDF: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node1:9092").option("subscribe", "iotTopic")// 设置每批次消费数据最大值.option("maxOffsetsPerTrigger", "100000").load()// 3. 对获取数据进行解析,封装到DeviceData中val etlStreamDF: DataFrame = iotStreamDF// 获取value字段的值,转换为String类型.selectExpr("CAST(value AS STRING)")// 将数据转换Dataset.as[String] // 内部字段名为value// 过滤数据.filter(StringUtils.isNotBlank(_))// 解析JSON数据:{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}.select(get_json_object($"value", "$.device").as("device_id"),get_json_object($"value", "$.deviceType").as("device_type"),get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),get_json_object($"value", "$.time").cast(LongType).as("time"))// 4. 依据业务,分析处理// TODO: signal > 30 所有数据,按照设备类型 分组,统计数量、平均信号强度val resultStreamDF: DataFrame = etlStreamDF// 信号强度大于10.filter($"signal" > 30)// 按照设备类型 分组.groupBy($"device_type")// 统计数量、评价信号强度.agg(count($"device_type").as("count_device"),round(avg($"signal"), 2).as("avg_signal"))// 5. 启动流式应用,结果输出控制台val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Complete()).format("console").option("numRows", "10").option("truncate", "false").start()query.awaitTermination()query.stop()}
}

2021年大数据Spark(五十一):Structured Streaming 物联网设备数据分析相关推荐

  1. 2021年大数据Spark(十一):应用开发基于IDEA集成环境

    目录 Spark应用开发-基于IDEA 创建工程 WordCount本地运行 WordCount集群运行 注意 修改代码如下 打成jar包 改名 上传jar包 提交到Yarn WordCount-Ja ...

  2. 2021年大数据Kafka(十一):❤️Kafka的消费者负载均衡机制和数据积压问题❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的消费者负载均衡机制和数据积压问题 一.kafka ...

  3. 2021年大数据HBase(十一):Apache Phoenix的视图操作

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix的视图操作 一.应用场景 ...

  4. 2021年大数据Hive(十一):Hive调优

    全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive调优 一.本地模式 1.空key处理 二.SQL ...

  5. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

  6. 2021年大数据Spark(五十三):Structured Streaming Deduplication

    目录 Streaming Deduplication 介绍 需求 ​​​​​​​代码演示 Streaming Deduplication 介绍 在实时流式应用中,最典型的应用场景:网站UV统计. 1: ...

  7. 2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

    目录 案例一 实时数据ETL架构 准备主题 ​​​​​​​模拟基站日志数据 ​​​​​​​实时增量ETL 案例一 实时数据ETL架构 在实际实时流式项目中,无论使用Storm.SparkStreami ...

  8. 2021年大数据Spark(四十五):Structured Streaming Sources 输入源

    目录 Sources 输入源 Socket数据源-入门案例 需求 编程实现 ​​​​​​​文件数据源-了解 ​​​​​​​需求 ​​​​​​​代码实现 ​​​​​​​Rate source-了解 So ...

  9. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

最新文章

  1. 写给程序员的 HR 面试指南,助你踢好面试的临门一脚!
  2. hadoop之MapReduce框架TaskTracker端心跳机制分析(源码分析第六篇)
  3. 列表和字典之间的相互转换-Python3
  4. 萧山职称计算机考试培训,浙江萧山2017年职称计算机考试时间安排
  5. SVN(TortoiseSVN)提交时忽略bin跟obj目录
  6. asp.net关于倒出excel文件
  7. 2021年中国带锯机市场趋势报告、技术动态创新及2027年市场预测
  8. 电竞大数据平台 FunData 的系统架构演进
  9. HDU2525 Clone Wars【模拟】
  10. Halcon形态学操作、区域处理相关常用API
  11. 【PC】如何让程序开机自启动/如何打开开机自启动文件夹
  12. Android 经典系统 HTC One 802d国行电信专版/最新Viper2.6.0/永久root/强大的功能、高级设置/纯净省电ROM
  13. jquery实现菜单点击左右滑动效果
  14. Epicor ERP
  15. JAVA学习日志 关于调用方法、生成对象的例子。还是用数字卦程序修改
  16. MMD导入unity中使用
  17. 视频号怎么赚钱?4个赚钱小技巧,实现视频号流量变现!
  18. (持续更新)Ubuntu22.04双系统的安装、扩容、重装及配置
  19. C++笔记8:C++提高编程2:STL---标准模板库
  20. 典当行抵押需要什么资料

热门文章

  1. Java | kotlin 手动注入bean,解决lateinit property loginService has not been initialized异常
  2. 科学处理java.lang.StackOverflowError: null异常
  3. 【CentOS】利用Kubeadm部署Kubernetes (K8s)
  4. .net连接mysql数据_.net连接MYSQL数据库的方法及示例!
  5. 科大奥锐干涉法测微小量实验的数据_光学干涉观测精确丈量宇宙 | 赛先生天文...
  6. 算法设计思想(5)— 递归法
  7. GCC 连接器、链接标准库 gcc -l、链接手动创建库(指定目录的库 gcc -L)
  8. Docker 入门系列(2)- Docker 镜像, 免 sudo 使用 docker 命令、获取查看、修改镜像标签、查找删除创建镜像、导入导出镜像
  9. oracle dba_tables各字段含义
  10. LeetCode简单题之增量元素之间的最大差值