spark 车流量项目实战

这里目录标题

  • spark 车流量项目实战
    • 1、车流向项目介绍
      • 1.1. 数据采集
      • 1.2. 模块介绍
        • 1.2.1. 卡扣流量分析模块介绍
      • 1.3. 项目架构介绍
      • 1.4. 数据介绍
        • 1.4.1. 基本概念
        • 1.4.2. 表
      • 1.5. 需求分析
    • 2、数据流程
    • 3、spark任务
      • 如何将Spark任务提交到集群运行?
    • 4、模块功能
      • 4.1、卡口流量分析
        • 4.1.1. 卡口状态监控
          • 4.1.1.1. 统计卡口坏摄像头
          • 4.1.1.2. 统计每个区域车流量
          • 4.1.1.3. 统计每个区域的摄像头
          • 4.1.1.5 卡口流量分析 代码解析
        • 4.1.2. 区域车流量Top3及其速度
        • 4.1.3. 区域中高低速数量
        • 4.1.3. 指定卡口对应卡口车辆轨迹
      • 4.2、行车轨迹
        • 4.2.1. 车辆行车轨迹
        • 4.2.2. 车辆套牌
        • 4.2.3. 车辆抽样--蓄水池抽样算法
        • 4.2.4. 道路转换率
      • 4.3、区域道路流量Top3
        • 4.3.1 RDD解决
        • 4.3.2. Java连接Hive---SQL解决
      • hive中导入模拟数据
      • 4.4、Streaming 实时
        • 4.4.1. 道路实时拥堵情况 --kafka
        • 4.4.2. 动态改变广播变量(布控)

1、车流向项目介绍

1.1. 数据采集

数据从哪儿来?

  1. 我们知道数据来来源,比如网站,APP或者工业设备(比如卡口拍摄设备)实现实时数据采集,它首先有非常重要的一点就是所谓的埋点,也就是说,埋点,在网站的哪个页面哪些操作发生时,前端的代码比如javascript或者app android/ios,就通过网络请求Ajax; socket向后端的服务器发送日志数据。
  2. 如果是卡口信息,那么每次拍摄的信息都会传输到服务器端。
  3. 首先就是说网站或者页面设置埋点,那么就是你要跟前端的开发人员约定好,在哪些页面哪些操作发生的时候,网站的话就通过ajax引擎,APP的话就通过Socket网络请求,向后端的服务器发送指定格式的日志数据。卡口数据的话,是和厂商定制数据格式的,数据以指定的格式向服务器发送实时的数据。
  4. 接着通过Flume监控指定的文件夹,转移到HDFS里面去,实际大多数是放在Hive中因为Hive还有计算的能力,还有另外一条流程,实时数据,通常都是从分布式消息队列集群中读取的,比如Kafka,实时的log,实时的写入消息队列中,然后再由我们后端实时数据处理程序(storm、spark streaming),实时从kafka中读取数据,log日志
  5. 数据除了从Flume中来,也有可能直接使用kafka 的producer角色往kafka中直接生产数据
  6. 接下来就是大数据实时计算系统,比如说用storm、spark streaming开发的,可以实时的从kafka中拉取数据,然后对实时的数据进行处理和计算,这里可以有非常复杂的业务逻辑,甚至调用复杂的机器学习,数据挖掘,智能推荐的算法!然后实现实时的车辆调度,实时推荐等等。

1.2. 模块介绍

  • 卡扣流量分析 Spark Core
  • 卡扣车流量转化率 Spark Core
  • 各区域车流量最高top5的道路统计 SparkSQL
  • 稽查布控,道路实时拥堵统计 SparkStreaming

1.2.1. 卡扣流量分析模块介绍

根据使用者(平台使用者)指定的某些条件,筛选出指定的一批卡扣信息(比如根据区域、时间筛选)

检测卡扣状态,对于筛选出来的所有的卡口(不代表一个摄像头)信息统计

  1. 卡口正常数
  2. 异常数
  3. camera的正常数
  4. camera的异常数
  5. camera的详细信息(monitor_id:camera_id)
  6. 车流量最多的TonN卡扣号,延伸获取每一个卡扣的详细信息(Top5 )
  7. 随机抽取N个车辆信息,对这些数据可以进行多维度分析(因为随机抽取出来的N个车辆信息可以很权威的代表整个区域的车辆)
  8. 计算出经常高速通过的TopN卡口 (查看哪些卡扣经常被高速通过,高速,中速,正常,低速 根据三个速度段进行四次排序,高速通过的车辆数相同就比较中速通过的车辆数,以此来推)

1.3. 项目架构介绍

使用架构

J2EE平台,前端页面,在页面中可以指定任务类型,提交任务的参数(比如时间范围,区域设定)平台会接受到用户的提交请求,会调用底层封装的Spark-submit的shell脚本,怎么调用?运行的作业可以获取到用户指定的筛选条件,然后根据筛选条件进行计算。Spark任务的计算结果会写入到数据库中,比如MySQL,Redis等
最后J2EE平台可以通过前端页面,展示结果(表格或者图表的方式展示数据库中的结果)。

1.4. 数据介绍

1.4.1. 基本概念

卡扣号:在一条道路相同位置会有两个卡扣,这两个卡扣的编号是不同的,分别拍摄不同方向的车辆

摄像头编号:每一个卡扣拍摄的是一个方向的车辆,每一个方向都会有多个不同的车道,每一个车道对应一个摄像头,所以卡扣号与摄像头的对应关系是一对多的关系。

1.4.2. 表

monitor_flow_action表 监控到的车流信息表
date 日期 单位:天
monitor_id 卡口号
camera_id 摄像头编号
car 车牌
action_time 某个摄像头拍摄时间 单位:秒
speed 通过卡扣的速度
road_id 道路id
area_id 区域ID
monitor_camera_info表 每个卡扣对应的摄像头编号(标准表)
monitor_id 卡扣编号
camera_id 摄像头编号

具体内容见建表语句。

1.5. 需求分析

  • 按条件筛选卡扣信息

    • 可以指定 不同的条件,时间范围、区域范围、卡扣号等 可以灵活的分析不同区域的卡扣信息
  • 监测卡扣状态
    • 对符合条件的卡扣信息,可以动态的检查每一个卡扣的状态,查看卡扣是否正常工作,也可以查看摄像头
  • 车流量最多的TonN卡扣
    • 查看哪些卡扣的车流量最高,为什么会出现这么高的车流量。分析原因,例如今天出城的车辆非常多,啥原因,今天进城的车辆非常多,啥原因? 集会还是聚集? 这个功能点里面也会拿到具体的车辆的信息,分析一下本地车牌造成的还是外地车牌?
  • 在符合条件的卡扣信息中随机抽取N个车辆信息
    • 随机抽取N辆车的信息,可以权威的代表整个区域的车辆,这时候可以分析这些车的轨迹,看一下在不同的时间点车辆的流动方向。以便于道路的规划。
  • 计算出经常高速通过的TopN卡口
    • 统计出是否存在飙车现象,或者经常进行超速行驶,可以在此处安装违章拍摄设备

2、数据流程

数据处理流程:

公司有集群没有数据
分布式爬取数据,多节点爬取数据,一般将数据爬取到flume中,或者将数据直接爬取放入HDFS中。
公司有集群有数据
每天每时每刻在产生数据,数据直接清洗放在HBase或者HDFS中。或者日志数据直接使用flum导入分布式文件中。

一般有了数据之后又分为两个大的方向处理数据:

  1. 假设数据放在了HDFS集群中之后,一般下一步就要清洗数据,可以将数据通过Hive清洗,当然这里Hive一般使用外表,这样做的目的是可以将相同的数据只在HDFS中存入一份,避免过多的重复数据。清洗完成的数据一般又会放入Hive表中或者以结构化的数据放在HDFS上。得到清洗后的数据后一般会使用MR或者使用Spark来对数据进行分析处理,也可以对清洗后的数据使用SparkSQL来进行处理分析。之后,将分析完成的数据放入数据库中,如Redis,Mysql,Oracle中,供前端查询展示。
  2. 如果数据放入了flume中,一般将数据sink到kafka中,不同数据的种类放入不同的topic中。然后对打入kafka中的数据进行流式处理,一般可以使用storm或者SparkStreaming对数据进行清洗,分析处理,然后将结果放到数据库中,如Redis,Mysql,Oracle中,以供前端页面来查询展示。

3、spark任务

如何将Spark任务提交到集群运行?

最次也是脚本化执行Spark任务。

平台化提交Spark任务。流程图如下:

submit后先将数据存入Mysql中,task当做唯一主键,这样做是为了简化任务执行失败时,可以直接在数据库中查询之前的提交的业务参数,当任务失败后,下次retry时方便执行。

submit后可以使用java调用liunx系统脚本,通过taskId得到系统中的业务参数数据。

注意: 假如使用tomcat实现平台化,那么tomcat应该部署在客户端。

l java代码中如何执行liunx脚本?

Process proc = Runtime.**getRuntime**().exec(“sh 脚本”);
proc.waitFor();

4、模块功能

4.1、卡口流量分析

  • 全部使用SparkCore实现。

4.1.1. 卡口状态监控

4.1.1.1. 统计卡口坏摄像头
  def main(args: Array[String]): Unit = {//获取数据源val sparkSession = ContextUtils.getSparkSession("Hello01MonitorState")//读取数据MockDataUtil.mock2view(sparkSession)//------------------------------统计卡口摄像头通过的车辆的合计----------------------------import sparkSession.implicits._//开始读取数据val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-19' ")//开始操作val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1) + ":" + ele.getString(2), 1)).rdd//开始进行合并val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)//------------------------------统计卡口所有的摄像头----------------------------val cameraDataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION)val cameraRdd: RDD[(String, Int)] = cameraDataFrame.map(ele => ((ele.getString(0) + ":" + ele.getString(1)), 1)).rdd//------------------------------合并车流量和摄像头RDD----------------------------val allRDD: RDD[(String, (Option[Int], Int))] = flowRdd.rightOuterJoin(cameraRdd).filter(ele => ele._2._1.isEmpty)allRDD.foreach(println)}

4.1.1.2. 统计每个区域车流量
  def main(args: Array[String]): Unit = {//获取数据源val sparkSession = ContextUtils.getSparkSession("Hello02MonitorFlowCount")//读取数据MockDataUtil.mock2view(sparkSession)//------------------------------统计卡口通过的车辆的合计----------------------------import sparkSession.implicits._//开始读取数据val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ")//开始操作val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1), 1)).rdd//开始进行合并val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)flowRdd.foreach(println)}

4.1.1.3. 统计每个区域的摄像头
  def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello03MonitorStateAnalyze")MockDataUtil.mock2view(sparkSession)//---------------------开始操作车流量信息,假设任务编号为1 日期参数为今天val flowInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd.map(row => (row.getString(1), row)).groupByKey().map(ele => {val monitorId: String = ele._1val cameraIdSet = new mutable.HashSet[String]()ele._2.foreach(row => cameraIdSet.add(row.getString(2)))//拼接字符串val info: String = Constants.FIELD_MONITOR_ID + "=" + monitorId + "|" + Constants.FIELD_AREA_ID + "=浦东新区|" + Constants.FIELD_CAMERA_IDS + "=" + cameraIdSet.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + cameraIdSet.size + "|" + Constants.FIELD_CAR_COUNT + "=" + ele._2.size//返回结果(monitorId, info)})//-----------------------开始操作摄像头数据val monitorInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION).rdd.map(row => (row.getString(0), row.getString(1))).groupByKey().map(ele => {val monitorId: String = ele._1//拼接字符串val info: String = Constants.FIELD_CAMERA_IDS + "=" + ele._2.toList.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + ele._2.size//返回结果(monitorId, info)})//-----------------------将数据Join到一起monitorInfo.leftOuterJoin(flowInfo).foreach(println)}

4.1.1.5 卡口流量分析 代码解析

4.1.2. 区域车流量Top3及其速度

  • 区域车流量top3
  def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaTop3Road")MockDataUtil.mock2view(sparkSession)//开始计算val fRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rddfRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {val area_road_random = ele._1val count = ele._2(area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)}).groupByKey().map(ele => {val map = new mutable.HashMap[String, Int]()ele._2.foreach(e => {val key = e.split("_")(0)val value = e.split("_")(1).toIntmap.put(key, map.get(key).getOrElse(0) + value)})"区划【" + ele._1 + "】车辆最多的三条道路分别为:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")}).foreach(println)}

  • 区域各路速度
  def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaTop3Speed")MockDataUtil.mock2view(sparkSession)val sRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rddsRdd.map(e=>{((e.getString(7),e.getString(6)),e.getString(5).toInt)}).groupByKey().map(e=>{val list: List[Int] = e._2.toListval i: Int = list.sum/list.size(e._1._1,(e._1._2,i))}).groupByKey().map(e=>{val tuples = e._2.toList.sortBy(_._2).reverse.take(3)var strBui: StringBuilder = new StringBuilderfor (i <- tuples ){val str: String = i._1 + "-均速度为:" + i._2strBui.append(">>>"+str)}(e._1,strBui)}).foreach(println)}

4.1.3. 区域中高低速数量

object Hello04MonitorTopNSpeed {def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//---------------------开始操作车流量信息,假设任务编号为1 日期参数为今天val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-20' ").rddval monitor2speedRDD: RDD[(String, Iterable[String])] = flowRdd.map(row => (row.getString(1), row.getString(5))).groupByKey()val speedCount2monitorRDD: RDD[(SpeedCount, String)] = monitor2speedRDD.map(ele => {//获取卡口号val monitorId: String = ele._1//声明一个Map[0,60,100,120]var high = 0;var normal = 0;var low = 0;//获取所有的速度的车辆技术ele._2.foreach(speed => {//判断速度if (speed.toInt > 100) {high += 1} else if (speed.toInt > 60) {normal += 1} else {low += 1}})//创建速度对象(SpeedCount(high, normal, low), monitorId)})speedCount2monitorRDD.sortByKey(false).map(x => (x._2, x._1)).foreach(println)}
}case class SpeedCount(high: Int, normal: Int, low: Int) extends Ordered[SpeedCount] with KryoRegistrator {override def compare(that: SpeedCount): Int = {var result = this.high - that.highif (result == 0) {result = this.normal - that.normalif (result == 0) {result = this.low - that.low}}return result}override def registerClasses(kryo: Kryo): Unit = {kryo.register(SpeedCount.getClass)}
}

4.1.3. 指定卡口对应卡口车辆轨迹

def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//获取数据val area01Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '01' ").rddval area02Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '02' ").rddval area01CarRdd = area01Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()val area02CarRdd = area02Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()area01CarRdd.join(area02CarRdd).foreach(println)}

4.2、行车轨迹

4.2.1. 车辆行车轨迹

 def main(args: Array[String]): Unit = {    val sparkSession = ContextUtils.getSparkSession("AreaCar")    MockDataUtil.mock2view(sparkSession)//查询 车子行驶轨迹 跟车分析    val c1Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd    val carRdd: RDD[(String, StringBuilder)] = c1Rdd.map(e => {      (e.getString(3), (e.getString(4), e.getString(6), e.getString(2)))    }).groupByKey()      .map(e => {        val tuples: List[(String, String, String)] = e._2.toList.sortBy(_._1)        val list = new StringBuilder        for (i <- tuples) {          //println(i)          val str: String = i._2 + ":" + i._3          list.append(str + "-")        }        (e._1, list)      })    //carRdd.foreach(println)
}

4.2.2. 车辆套牌

  def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaCar")MockDataUtil.mock2view(sparkSession)
//假设任何的卡口距离都是 10分钟车程 ,如果同一分钟出现在不同的卡口就怀疑是套牌val deckRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdddeckRdd.map(e => {val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")(e.getString(3), (dateFormat.parse(e.getString(4)),e.getString(1)))}).groupByKey(1).map(e => {val list: List[(util.Date, String)] = e._2.toList.sortBy(x=>x._1)var bool = falsevar d: util.Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-23 00:00:00")var mid="?"for (i <- list) {if (d.getTime - i._1.getTime < 600000 && i._2!=mid )bool = trued = i._1mid=i._2}(e._1, bool)}).filter(f => f._2).foreach(println)}

4.2.3. 车辆抽样–蓄水池抽样算法

    def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//获取数据val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-21' ").rdd//yyyy-MM-dd_HH , rowval hourRDD: RDD[(String, Row)] = flowRdd.map(row => (DateUtils.getDateHour(row.getString(4)), row))//车流量的总数,并进行广播val flowAllCount: Long = hourRDD.count()val broadcastFlowAllCount: Broadcast[Long] = sparkSession.sparkContext.broadcast(flowAllCount)//计算每个小时的比例 并进行广播val hourRatio: collection.Map[String, Double] = hourRDD.countByKey().map(e => {(e._1, e._2 * 1.0 / broadcastFlowAllCount.value)})val broadcastHourRatio: Broadcast[collection.Map[String, Double]] = sparkSession.sparkContext.broadcast(hourRatio)//开始进行抽样val sampleRDD: RDD[Row] = hourRDD.groupByKey().flatMap(ele => {val hour: String = ele._1val list: List[Row] = ele._2.iterator.toList//计算本时段要抽样的数据量val sampleRatio: Double = broadcastHourRatio.value.get(hour).getOrElse(0)val sampleNum: Long = Math.round(sampleRatio * 100)//开始进行取样(蓄水池抽样)val sampleList: ListBuffer[Row] = new ListBuffer[Row]()sampleList.appendAll(list.take(sampleNum.toInt))for (i <- sampleNum until list.size) {//随机生成一个数字val num = (Math.random() * list.size).toIntif (num < sampleNum) {sampleList.update(num, list(i.toInt))}}sampleList})sampleRDD.foreach(println)}

4.2.4. 道路转换率

  def main(args: Array[String]): Unit = {//创建会话val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")MockDataUtil.mock2view(sparkSession)//开始计算val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd//计算每个卡口的总通车量val monitorCountMap: collection.Map[String, Long] = flowRdd.map(row => (row.getString(1), row)).countByKey()//计算卡口到卡口的通行率val sortRDD: RDD[(String, List[Row])] = flowRdd.map(row => (row.getString(3), row)).groupByKey().map(ele => (ele._1, ele._2.iterator.toList.sortBy(_.getString(4))))val m2mMap: collection.Map[String, Long] = sortRDD.flatMap(ele => {//存放映射关系val map: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()val list: List[Row] = ele._2.toListfor (i <- 0 until list.size; j <- i + 1 until list.size) {//拼接Keyval key = list(i).getString(1) + "->" + list(j).getString(1)map.put(key, map.get(key).getOrElse(0) + 1);}//返回结果map.toList}).countByKey()//开始进行计算m2mMap.foreach(ele => {println("卡口[" + ele._1 + "]的转换率为:" + ele._2.toDouble / monitorCountMap.get(ele._1.split("->")(0)).get)})
}

4.3、区域道路流量Top3

  • 数据倾斜问题

    • key添加后缀扩组,减小数据倾斜

4.3.1 RDD解决

  def main(args: Array[String]): Unit = {//创建会话val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")MockDataUtil.mock2view(sparkSession)//开始计算val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd//开始计算flowRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {val area_road_random = ele._1val count = ele._2(area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)}).groupByKey().map(ele => {val map = new mutable.HashMap[String, Int]()ele._2.foreach(e => {val key = e.split("_")(0)val value = e.split("_")(1).toIntmap.put(key, map.get(key).getOrElse(0) + value)})"区划【" + ele._1 + "】车辆最多的三条道路分别为:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")}).foreach(println)
}

4.3.2. Java连接Hive—SQL解决

  • hive中导入模拟数据

  1. 打开hive手动创建database 为traffic 。

  2. 生产模拟数据,将模拟数据提交到linux上。

  3. 将data2hive代码 打包,提交到客户端运行。

  4. 查询hive中数据库为traffic中monitor_flow_action和monitor_camera_info两张数据库表是否导入数据。

若出现mysql数据库乱码的问题

在安装mysql的linux节点路径/etc/my.cnf中加入:

  • 在[client]下添加
    default-character-set=utf8
  • 在[mysqld]下添加
    default-character-set=utf8

如图:

运行项目中AreaTop3RoadFlowAnalyze代码

4.4、Streaming 实时

4.4.1. 道路实时拥堵情况 --kafka

  • 生产者
public class MockRealTimeData extends Thread {private static final Random random = new Random();private static final String[] locations = new String[]{"鲁", "沪", "沪", "沪", "沪", "京", "京", "深", "京", "京"};private static final String topic = "RoadRealTimeLog";private KafkaProducer<String, String> producer;public MockRealTimeData() {//创建配置文件列表Properties properties = new Properties();// kafka地址,多个地址用逗号分割properties.put("bootstrap.servers", "192.168.100.101:9092,192.168.100.102:9092,192.168.100.103:9092");//设置写出数据的格式properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//写出的应答方式properties.put("acks", "all");//错误重试properties.put("retries", 1);//批量写出properties.put("batch.size", 16384);//创建生产者对象producer = new KafkaProducer<String, String>(properties);}public void run() {while (true) {String date = DateUtils.getTodayDate();String baseActionTime = date + " " + StringUtils.fullFill(random.nextInt(24) + "");baseActionTime = date + " " + StringUtils.fullFill((Integer.parseInt(baseActionTime.split(" ")[1]) + 1) + "");String actionTime = baseActionTime + ":" + StringUtils.fullFill(random.nextInt(60) + "") + ":" + StringUtils.fullFill(random.nextInt(60) + "");String monitorId = StringUtils.fullFill(4, random.nextInt(9) + "");String car = locations[random.nextInt(10)] + (char) (65 + random.nextInt(26)) + StringUtils.fullFill(5, random.nextInt(99999) + "");String speed = random.nextInt(260) + "";String roadId = random.nextInt(50) + 1 + "";String cameraId = StringUtils.fullFill(5, random.nextInt(9999) + "");String areaId = StringUtils.fullFill(2, random.nextInt(8) + "");//封装消息对象ProducerRecord<String, String> banRecordBlue = new ProducerRecord<>(topic, "traffic_" + monitorId, date + "\t" + monitorId + "\t" + cameraId + "\t" + car + "\t" + actionTime + "\t" + speed + "\t" + roadId + "\t" + areaId);//发送消息producer.send(banRecordBlue);try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}}/*** 启动Kafka Producer** @param args*/public static void main(String[] args) {MockRealTimeData mockRealTimeData = new MockRealTimeData();mockRealTimeData.start();}
}
  • 消费者
object Hello10RealRoadState {def main(args: Array[String]): Unit = {//创建Confval sparkConf = new SparkConf().setAppName("Hello10RealRoadState").setMaster("local[2]")val streamingContext = new StreamingContext(sparkConf, Seconds(2))//创建Kafka读取数据//配置信息val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node01:9092,node02:9092,node03:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "traffic","auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("RoadRealTimeLog")//开始创建Kafkaval linesDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams))//窗口函数linesDStream.map(_.value()).window(Seconds(10), Seconds(10)).map(ele => (ele.split("\t")(1), ele.split("\t")(5).toInt)).groupByKey().map(ele => {(ele._1, ele._2.toList.sum.toDouble / ele._2.size)}).foreachRDD(rdd => {rdd.foreach(ele => {println(ele._1 + "--" + ele._2)})})//启动任务streamingContext.start()streamingContext.awaitTermination()  }
}

4.4.2. 动态改变广播变量(布控)

spark 车流量项目实战相关推荐

  1. Spark Streaming 项目实战 (4) | 得到最近1小时广告点击量实时统计并写入到redis

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  2. Spark大型项目实战、Elasticsearch完整视频(百度网盘链接)

    Elasticsearch顶尖高手系列课程(基础篇+高手篇) (从零开始精通分布式搜索ElasticSearch) #有效期 2019.11.22-29 链接:https://pan.baidu.co ...

  3. Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  4. Spark Core项目实战(3) | 页面单跳转化率统计

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  5. spark 大型项目实战(三十一): --性能调优之在实际项目中使用fastutil优化数据格式

    fastutil介绍: fastutil是扩展了Java标准集合框架(Map.List.Set:HashMap.ArrayList.HashSet)的类库,提供了特殊类型的map.set.list和q ...

  6. spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一)

    spark项目实战:电商分析平台之各个范围Session步长.访问时长占比统计(需求一) 项目基本信息,架构,需要一览 各个范围Session步长.访问时长占比统计概述 各个范围Session步长.访 ...

  7. spark项目实战:电商分析平台之项目概述

    spark项目实战:电商分析平台之项目概述 目录 项目概述 程序架构分析 需求解析 初始代码和完成代码存放在github上面 1. 项目概述 在访问电商网站时,我们的一些访问行为会产生相应的埋点日志( ...

  8. hadoop+Spark实战基于大数据技术之电视收视率企业项目实战

    课程简介 本课程将通过一个电视收视率项目实战驱动讲解,项目案例是国内的一家广电企业作为非洲国家的一个运营商,以用户收视行为数据作为基础数据,通过对频道和节目的分析,采用多维度统计分析的方法挖掘用户的收 ...

  9. 基于大数据技术之电视收视率企业项目实战(hadoop+Spark)

    基于大数据技术之电视收视率企业项目实战(hadoop+Spark) 网盘地址:https://pan.baidu.com/s/1bEeSB1Y9nmjzctnbJMcBkg 密码:dohg 备用地址( ...

  10. Spark项目实战—电商用户行为分析

    文章目录 一.[SparkCore篇]项目实战-电商用户行为分析 前言:数据准备 1.数据规则如下: 2.详细字段说明: 3.样例类 (一)需求1:TOP10热门品类 1.需求说明 2.代码实现方案1 ...

最新文章

  1. 做三年地方网站不如别人打一场麻将
  2. python3 字符串操作总结
  3. python好学嘛-python语言好学吗
  4. MVC2验证(自定义异常)
  5. PHP发送数据到指定方法,php通过header发送自定义数据方法_php技巧
  6. 《华为美》歌曲洗脑惹争议 华为回应:不知情、未参与 感谢厚爱
  7. java三步 网易_Java基础:三步学会Java Socket编程·网易学院·教程
  8. Java继承、封装、多态
  9. 那些机器学习中无法衍生的强规则变量有吗?
  10. python中isupper是什么意思_Python string isupper()用法及代码示例
  11. C语言作业解决,c语言作业9
  12. VM虚拟机下载安装步骤
  13. C# amr转mp3 (ffmpeg)
  14. 3天完成Open CPU开发!7天完成Costdown!满足客户对成本、功耗、安全性等方面的需求!
  15. ps还原上一步快捷键_photoshop恢复上一步操作的快捷键是什么
  16. idea软件控制台Console里没有查找快捷键
  17. android 检查xposed,[原创]利用Xposed躲过Xposed检测
  18. 从瑞银集团看客户导向型财富管理机构如何从资产配置服务中获利
  19. mysql存储图片node_Node.js教程 阿里云mysql如何支持存储emoji表情
  20. 五笔打字:速成手册---半小时学会五笔打字

热门文章

  1. idea代码格式化代码
  2. 音频3A测试 AGC自动增益测试
  3. 从「模拟」理解零知识证明:平行宇宙与时光倒流—— 探索零知识证明系列(二)
  4. UE4像素流送PixelStreaming
  5. html5 easyui 布局,Easyui 在面板中创建复杂布局_EasyUI 插件
  6. 磁盘不见了只剩一个c盘_电脑开机后磁盘都不见了,只剩下C盘了,为什么啊,求大神指教。...
  7. wincc 7.4 sp1在win10 下安装环境搭建
  8. java学习之打印菱形和空心菱形
  9. DCM4CHEE 中worklist 乱码问题 dcmchee 中文乱码
  10. 如何将Eclipse设置为中文版