spark 车流量项目实战
spark 车流量项目实战
这里目录标题
- spark 车流量项目实战
- 1、车流向项目介绍
- 1.1. 数据采集
- 1.2. 模块介绍
- 1.2.1. 卡扣流量分析模块介绍
- 1.3. 项目架构介绍
- 1.4. 数据介绍
- 1.4.1. 基本概念
- 1.4.2. 表
- 1.5. 需求分析
- 2、数据流程
- 3、spark任务
- 如何将Spark任务提交到集群运行?
- 4、模块功能
- 4.1、卡口流量分析
- 4.1.1. 卡口状态监控
- 4.1.1.1. 统计卡口坏摄像头
- 4.1.1.2. 统计每个区域车流量
- 4.1.1.3. 统计每个区域的摄像头
- 4.1.1.5 卡口流量分析 代码解析
- 4.1.2. 区域车流量Top3及其速度
- 4.1.3. 区域中高低速数量
- 4.1.3. 指定卡口对应卡口车辆轨迹
- 4.2、行车轨迹
- 4.2.1. 车辆行车轨迹
- 4.2.2. 车辆套牌
- 4.2.3. 车辆抽样--蓄水池抽样算法
- 4.2.4. 道路转换率
- 4.3、区域道路流量Top3
- 4.3.1 RDD解决
- 4.3.2. Java连接Hive---SQL解决
- hive中导入模拟数据
- 4.4、Streaming 实时
- 4.4.1. 道路实时拥堵情况 --kafka
- 4.4.2. 动态改变广播变量(布控)
1、车流向项目介绍
1.1. 数据采集
数据从哪儿来?
- 我们知道数据来来源,比如网站,APP或者工业设备(比如卡口拍摄设备),实现实时数据采集,它首先有非常重要的一点就是所谓的埋点,也就是说,埋点,在网站的哪个页面哪些操作发生时,前端的代码比如javascript或者app android/ios,就通过网络请求Ajax; socket向后端的服务器发送日志数据。
- 如果是卡口信息,那么每次拍摄的信息都会传输到服务器端。
- 首先就是说网站或者页面设置埋点,那么就是你要跟前端的开发人员约定好,在哪些页面哪些操作发生的时候,网站的话就通过ajax引擎,APP的话就通过Socket网络请求,向后端的服务器发送指定格式的日志数据。卡口数据的话,是和厂商定制数据格式的,数据以指定的格式向服务器发送实时的数据。
- 接着通过Flume监控指定的文件夹,转移到HDFS里面去,实际大多数是放在Hive中因为Hive还有计算的能力,还有另外一条流程,实时数据,通常都是从分布式消息队列集群中读取的,比如Kafka,实时的log,实时的写入消息队列中,然后再由我们后端实时数据处理程序(storm、spark streaming),实时从kafka中读取数据,log日志
- 数据除了从Flume中来,也有可能直接使用kafka 的producer角色往kafka中直接生产数据。
- 接下来就是大数据实时计算系统,比如说用storm、spark streaming开发的,可以实时的从kafka中拉取数据,然后对实时的数据进行处理和计算,这里可以有非常复杂的业务逻辑,甚至调用复杂的机器学习,数据挖掘,智能推荐的算法!然后实现实时的车辆调度,实时推荐等等。
1.2. 模块介绍
- 卡扣流量分析 Spark Core
- 卡扣车流量转化率 Spark Core
- 各区域车流量最高top5的道路统计 SparkSQL
- 稽查布控,道路实时拥堵统计 SparkStreaming
1.2.1. 卡扣流量分析模块介绍
根据使用者(平台使用者)指定的某些条件,筛选出指定的一批卡扣信息(比如根据区域、时间筛选)
检测卡扣状态,对于筛选出来的所有的卡口(不代表一个摄像头)信息统计
- 卡口正常数
- 异常数
- camera的正常数
- camera的异常数
- camera的详细信息(monitor_id:camera_id)
- 车流量最多的TonN卡扣号,延伸获取每一个卡扣的详细信息(Top5 )
- 随机抽取N个车辆信息,对这些数据可以进行多维度分析(因为随机抽取出来的N个车辆信息可以很权威的代表整个区域的车辆)
- 计算出经常高速通过的TopN卡口 (查看哪些卡扣经常被高速通过,高速,中速,正常,低速 根据三个速度段进行四次排序,高速通过的车辆数相同就比较中速通过的车辆数,以此来推)
1.3. 项目架构介绍
使用架构
J2EE平台,前端页面,在页面中可以指定任务类型,提交任务的参数(比如时间范围,区域设定)平台会接受到用户的提交请求,会调用底层封装的Spark-submit的shell脚本,怎么调用?运行的作业可以获取到用户指定的筛选条件,然后根据筛选条件进行计算。Spark任务的计算结果会写入到数据库中,比如MySQL,Redis等
最后J2EE平台可以通过前端页面,展示结果(表格或者图表的方式展示数据库中的结果)。
1.4. 数据介绍
1.4.1. 基本概念
卡扣号:在一条道路相同位置会有两个卡扣,这两个卡扣的编号是不同的,分别拍摄不同方向的车辆
摄像头编号:每一个卡扣拍摄的是一个方向的车辆,每一个方向都会有多个不同的车道,每一个车道对应一个摄像头,所以卡扣号与摄像头的对应关系是一对多的关系。
1.4.2. 表
monitor_flow_action表 | 监控到的车流信息表 |
---|---|
date | 日期 单位:天 |
monitor_id | 卡口号 |
camera_id | 摄像头编号 |
car | 车牌 |
action_time | 某个摄像头拍摄时间 单位:秒 |
speed | 通过卡扣的速度 |
road_id | 道路id |
area_id | 区域ID |
monitor_camera_info表 | 每个卡扣对应的摄像头编号(标准表) |
---|---|
monitor_id | 卡扣编号 |
camera_id | 摄像头编号 |
具体内容见建表语句。
1.5. 需求分析
- 按条件筛选卡扣信息
- 可以指定 不同的条件,时间范围、区域范围、卡扣号等 可以灵活的分析不同区域的卡扣信息
- 监测卡扣状态
- 对符合条件的卡扣信息,可以动态的检查每一个卡扣的状态,查看卡扣是否正常工作,也可以查看摄像头
- 车流量最多的TonN卡扣
- 查看哪些卡扣的车流量最高,为什么会出现这么高的车流量。分析原因,例如今天出城的车辆非常多,啥原因,今天进城的车辆非常多,啥原因? 集会还是聚集? 这个功能点里面也会拿到具体的车辆的信息,分析一下本地车牌造成的还是外地车牌?
- 在符合条件的卡扣信息中随机抽取N个车辆信息
- 随机抽取N辆车的信息,可以权威的代表整个区域的车辆,这时候可以分析这些车的轨迹,看一下在不同的时间点车辆的流动方向。以便于道路的规划。
- 计算出经常高速通过的TopN卡口
- 统计出是否存在飙车现象,或者经常进行超速行驶,可以在此处安装违章拍摄设备
2、数据流程
数据处理流程:
公司有集群没有数据
分布式爬取数据,多节点爬取数据,一般将数据爬取到flume中,或者将数据直接爬取放入HDFS中。
公司有集群有数据
每天每时每刻在产生数据,数据直接清洗放在HBase或者HDFS中。或者日志数据直接使用flum导入分布式文件中。
一般有了数据之后又分为两个大的方向处理数据:
- 假设数据放在了HDFS集群中之后,一般下一步就要清洗数据,可以将数据通过Hive清洗,当然这里Hive一般使用外表,这样做的目的是可以将相同的数据只在HDFS中存入一份,避免过多的重复数据。清洗完成的数据一般又会放入Hive表中或者以结构化的数据放在HDFS上。得到清洗后的数据后一般会使用MR或者使用Spark来对数据进行分析处理,也可以对清洗后的数据使用SparkSQL来进行处理分析。之后,将分析完成的数据放入数据库中,如Redis,Mysql,Oracle中,供前端查询展示。
- 如果数据放入了flume中,一般将数据sink到kafka中,不同数据的种类放入不同的topic中。然后对打入kafka中的数据进行流式处理,一般可以使用storm或者SparkStreaming对数据进行清洗,分析处理,然后将结果放到数据库中,如Redis,Mysql,Oracle中,以供前端页面来查询展示。
3、spark任务
如何将Spark任务提交到集群运行?
最次也是脚本化执行Spark任务。
平台化提交Spark任务。流程图如下:
submit后先将数据存入Mysql中,task当做唯一主键,这样做是为了简化任务执行失败时,可以直接在数据库中查询之前的提交的业务参数,当任务失败后,下次retry时方便执行。
submit后可以使用java调用liunx系统脚本,通过taskId得到系统中的业务参数数据。
注意: 假如使用tomcat实现平台化,那么tomcat应该部署在客户端。
l java代码中如何执行liunx脚本?
Process proc = Runtime.**getRuntime**().exec(“sh 脚本”);
proc.waitFor();
4、模块功能
4.1、卡口流量分析
- 全部使用SparkCore实现。
4.1.1. 卡口状态监控
4.1.1.1. 统计卡口坏摄像头
def main(args: Array[String]): Unit = {//获取数据源val sparkSession = ContextUtils.getSparkSession("Hello01MonitorState")//读取数据MockDataUtil.mock2view(sparkSession)//------------------------------统计卡口摄像头通过的车辆的合计----------------------------import sparkSession.implicits._//开始读取数据val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-19' ")//开始操作val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1) + ":" + ele.getString(2), 1)).rdd//开始进行合并val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)//------------------------------统计卡口所有的摄像头----------------------------val cameraDataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION)val cameraRdd: RDD[(String, Int)] = cameraDataFrame.map(ele => ((ele.getString(0) + ":" + ele.getString(1)), 1)).rdd//------------------------------合并车流量和摄像头RDD----------------------------val allRDD: RDD[(String, (Option[Int], Int))] = flowRdd.rightOuterJoin(cameraRdd).filter(ele => ele._2._1.isEmpty)allRDD.foreach(println)}
4.1.1.2. 统计每个区域车流量
def main(args: Array[String]): Unit = {//获取数据源val sparkSession = ContextUtils.getSparkSession("Hello02MonitorFlowCount")//读取数据MockDataUtil.mock2view(sparkSession)//------------------------------统计卡口通过的车辆的合计----------------------------import sparkSession.implicits._//开始读取数据val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ")//开始操作val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1), 1)).rdd//开始进行合并val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)flowRdd.foreach(println)}
4.1.1.3. 统计每个区域的摄像头
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello03MonitorStateAnalyze")MockDataUtil.mock2view(sparkSession)//---------------------开始操作车流量信息,假设任务编号为1 日期参数为今天val flowInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd.map(row => (row.getString(1), row)).groupByKey().map(ele => {val monitorId: String = ele._1val cameraIdSet = new mutable.HashSet[String]()ele._2.foreach(row => cameraIdSet.add(row.getString(2)))//拼接字符串val info: String = Constants.FIELD_MONITOR_ID + "=" + monitorId + "|" + Constants.FIELD_AREA_ID + "=浦东新区|" + Constants.FIELD_CAMERA_IDS + "=" + cameraIdSet.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + cameraIdSet.size + "|" + Constants.FIELD_CAR_COUNT + "=" + ele._2.size//返回结果(monitorId, info)})//-----------------------开始操作摄像头数据val monitorInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION).rdd.map(row => (row.getString(0), row.getString(1))).groupByKey().map(ele => {val monitorId: String = ele._1//拼接字符串val info: String = Constants.FIELD_CAMERA_IDS + "=" + ele._2.toList.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + ele._2.size//返回结果(monitorId, info)})//-----------------------将数据Join到一起monitorInfo.leftOuterJoin(flowInfo).foreach(println)}
4.1.1.5 卡口流量分析 代码解析
4.1.2. 区域车流量Top3及其速度
- 区域车流量top3
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaTop3Road")MockDataUtil.mock2view(sparkSession)//开始计算val fRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rddfRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {val area_road_random = ele._1val count = ele._2(area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)}).groupByKey().map(ele => {val map = new mutable.HashMap[String, Int]()ele._2.foreach(e => {val key = e.split("_")(0)val value = e.split("_")(1).toIntmap.put(key, map.get(key).getOrElse(0) + value)})"区划【" + ele._1 + "】车辆最多的三条道路分别为:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")}).foreach(println)}
- 区域各路速度
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaTop3Speed")MockDataUtil.mock2view(sparkSession)val sRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rddsRdd.map(e=>{((e.getString(7),e.getString(6)),e.getString(5).toInt)}).groupByKey().map(e=>{val list: List[Int] = e._2.toListval i: Int = list.sum/list.size(e._1._1,(e._1._2,i))}).groupByKey().map(e=>{val tuples = e._2.toList.sortBy(_._2).reverse.take(3)var strBui: StringBuilder = new StringBuilderfor (i <- tuples ){val str: String = i._1 + "-均速度为:" + i._2strBui.append(">>>"+str)}(e._1,strBui)}).foreach(println)}
4.1.3. 区域中高低速数量
object Hello04MonitorTopNSpeed {def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//---------------------开始操作车流量信息,假设任务编号为1 日期参数为今天val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-20' ").rddval monitor2speedRDD: RDD[(String, Iterable[String])] = flowRdd.map(row => (row.getString(1), row.getString(5))).groupByKey()val speedCount2monitorRDD: RDD[(SpeedCount, String)] = monitor2speedRDD.map(ele => {//获取卡口号val monitorId: String = ele._1//声明一个Map[0,60,100,120]var high = 0;var normal = 0;var low = 0;//获取所有的速度的车辆技术ele._2.foreach(speed => {//判断速度if (speed.toInt > 100) {high += 1} else if (speed.toInt > 60) {normal += 1} else {low += 1}})//创建速度对象(SpeedCount(high, normal, low), monitorId)})speedCount2monitorRDD.sortByKey(false).map(x => (x._2, x._1)).foreach(println)}
}case class SpeedCount(high: Int, normal: Int, low: Int) extends Ordered[SpeedCount] with KryoRegistrator {override def compare(that: SpeedCount): Int = {var result = this.high - that.highif (result == 0) {result = this.normal - that.normalif (result == 0) {result = this.low - that.low}}return result}override def registerClasses(kryo: Kryo): Unit = {kryo.register(SpeedCount.getClass)}
}
4.1.3. 指定卡口对应卡口车辆轨迹
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//获取数据val area01Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '01' ").rddval area02Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '02' ").rddval area01CarRdd = area01Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()val area02CarRdd = area02Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()area01CarRdd.join(area02CarRdd).foreach(println)}
4.2、行车轨迹
4.2.1. 车辆行车轨迹
def main(args: Array[String]): Unit = { val sparkSession = ContextUtils.getSparkSession("AreaCar") MockDataUtil.mock2view(sparkSession)//查询 车子行驶轨迹 跟车分析 val c1Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd val carRdd: RDD[(String, StringBuilder)] = c1Rdd.map(e => { (e.getString(3), (e.getString(4), e.getString(6), e.getString(2))) }).groupByKey() .map(e => { val tuples: List[(String, String, String)] = e._2.toList.sortBy(_._1) val list = new StringBuilder for (i <- tuples) { //println(i) val str: String = i._2 + ":" + i._3 list.append(str + "-") } (e._1, list) }) //carRdd.foreach(println)
}
4.2.2. 车辆套牌
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaCar")MockDataUtil.mock2view(sparkSession)
//假设任何的卡口距离都是 10分钟车程 ,如果同一分钟出现在不同的卡口就怀疑是套牌val deckRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdddeckRdd.map(e => {val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")(e.getString(3), (dateFormat.parse(e.getString(4)),e.getString(1)))}).groupByKey(1).map(e => {val list: List[(util.Date, String)] = e._2.toList.sortBy(x=>x._1)var bool = falsevar d: util.Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-23 00:00:00")var mid="?"for (i <- list) {if (d.getTime - i._1.getTime < 600000 && i._2!=mid )bool = trued = i._1mid=i._2}(e._1, bool)}).filter(f => f._2).foreach(println)}
4.2.3. 车辆抽样–蓄水池抽样算法
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//获取数据val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-21' ").rdd//yyyy-MM-dd_HH , rowval hourRDD: RDD[(String, Row)] = flowRdd.map(row => (DateUtils.getDateHour(row.getString(4)), row))//车流量的总数,并进行广播val flowAllCount: Long = hourRDD.count()val broadcastFlowAllCount: Broadcast[Long] = sparkSession.sparkContext.broadcast(flowAllCount)//计算每个小时的比例 并进行广播val hourRatio: collection.Map[String, Double] = hourRDD.countByKey().map(e => {(e._1, e._2 * 1.0 / broadcastFlowAllCount.value)})val broadcastHourRatio: Broadcast[collection.Map[String, Double]] = sparkSession.sparkContext.broadcast(hourRatio)//开始进行抽样val sampleRDD: RDD[Row] = hourRDD.groupByKey().flatMap(ele => {val hour: String = ele._1val list: List[Row] = ele._2.iterator.toList//计算本时段要抽样的数据量val sampleRatio: Double = broadcastHourRatio.value.get(hour).getOrElse(0)val sampleNum: Long = Math.round(sampleRatio * 100)//开始进行取样(蓄水池抽样)val sampleList: ListBuffer[Row] = new ListBuffer[Row]()sampleList.appendAll(list.take(sampleNum.toInt))for (i <- sampleNum until list.size) {//随机生成一个数字val num = (Math.random() * list.size).toIntif (num < sampleNum) {sampleList.update(num, list(i.toInt))}}sampleList})sampleRDD.foreach(println)}
4.2.4. 道路转换率
def main(args: Array[String]): Unit = {//创建会话val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")MockDataUtil.mock2view(sparkSession)//开始计算val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd//计算每个卡口的总通车量val monitorCountMap: collection.Map[String, Long] = flowRdd.map(row => (row.getString(1), row)).countByKey()//计算卡口到卡口的通行率val sortRDD: RDD[(String, List[Row])] = flowRdd.map(row => (row.getString(3), row)).groupByKey().map(ele => (ele._1, ele._2.iterator.toList.sortBy(_.getString(4))))val m2mMap: collection.Map[String, Long] = sortRDD.flatMap(ele => {//存放映射关系val map: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()val list: List[Row] = ele._2.toListfor (i <- 0 until list.size; j <- i + 1 until list.size) {//拼接Keyval key = list(i).getString(1) + "->" + list(j).getString(1)map.put(key, map.get(key).getOrElse(0) + 1);}//返回结果map.toList}).countByKey()//开始进行计算m2mMap.foreach(ele => {println("卡口[" + ele._1 + "]的转换率为:" + ele._2.toDouble / monitorCountMap.get(ele._1.split("->")(0)).get)})
}
4.3、区域道路流量Top3
- 数据倾斜问题
- key添加后缀扩组,减小数据倾斜
4.3.1 RDD解决
def main(args: Array[String]): Unit = {//创建会话val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")MockDataUtil.mock2view(sparkSession)//开始计算val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd//开始计算flowRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {val area_road_random = ele._1val count = ele._2(area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)}).groupByKey().map(ele => {val map = new mutable.HashMap[String, Int]()ele._2.foreach(e => {val key = e.split("_")(0)val value = e.split("_")(1).toIntmap.put(key, map.get(key).getOrElse(0) + value)})"区划【" + ele._1 + "】车辆最多的三条道路分别为:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")}).foreach(println)
}
4.3.2. Java连接Hive—SQL解决
hive中导入模拟数据
打开hive手动创建database 为traffic 。
生产模拟数据,将模拟数据提交到linux上。
将data2hive代码 打包,提交到客户端运行。
查询hive中数据库为traffic中monitor_flow_action和monitor_camera_info两张数据库表是否导入数据。
若出现mysql数据库乱码的问题
在安装mysql的linux节点路径/etc/my.cnf中加入:
- 在[client]下添加
default-character-set=utf8 - 在[mysqld]下添加
default-character-set=utf8
如图:
运行项目中AreaTop3RoadFlowAnalyze代码
4.4、Streaming 实时
4.4.1. 道路实时拥堵情况 --kafka
- 生产者
public class MockRealTimeData extends Thread {private static final Random random = new Random();private static final String[] locations = new String[]{"鲁", "沪", "沪", "沪", "沪", "京", "京", "深", "京", "京"};private static final String topic = "RoadRealTimeLog";private KafkaProducer<String, String> producer;public MockRealTimeData() {//创建配置文件列表Properties properties = new Properties();// kafka地址,多个地址用逗号分割properties.put("bootstrap.servers", "192.168.100.101:9092,192.168.100.102:9092,192.168.100.103:9092");//设置写出数据的格式properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//写出的应答方式properties.put("acks", "all");//错误重试properties.put("retries", 1);//批量写出properties.put("batch.size", 16384);//创建生产者对象producer = new KafkaProducer<String, String>(properties);}public void run() {while (true) {String date = DateUtils.getTodayDate();String baseActionTime = date + " " + StringUtils.fullFill(random.nextInt(24) + "");baseActionTime = date + " " + StringUtils.fullFill((Integer.parseInt(baseActionTime.split(" ")[1]) + 1) + "");String actionTime = baseActionTime + ":" + StringUtils.fullFill(random.nextInt(60) + "") + ":" + StringUtils.fullFill(random.nextInt(60) + "");String monitorId = StringUtils.fullFill(4, random.nextInt(9) + "");String car = locations[random.nextInt(10)] + (char) (65 + random.nextInt(26)) + StringUtils.fullFill(5, random.nextInt(99999) + "");String speed = random.nextInt(260) + "";String roadId = random.nextInt(50) + 1 + "";String cameraId = StringUtils.fullFill(5, random.nextInt(9999) + "");String areaId = StringUtils.fullFill(2, random.nextInt(8) + "");//封装消息对象ProducerRecord<String, String> banRecordBlue = new ProducerRecord<>(topic, "traffic_" + monitorId, date + "\t" + monitorId + "\t" + cameraId + "\t" + car + "\t" + actionTime + "\t" + speed + "\t" + roadId + "\t" + areaId);//发送消息producer.send(banRecordBlue);try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}}/*** 启动Kafka Producer** @param args*/public static void main(String[] args) {MockRealTimeData mockRealTimeData = new MockRealTimeData();mockRealTimeData.start();}
}
- 消费者
object Hello10RealRoadState {def main(args: Array[String]): Unit = {//创建Confval sparkConf = new SparkConf().setAppName("Hello10RealRoadState").setMaster("local[2]")val streamingContext = new StreamingContext(sparkConf, Seconds(2))//创建Kafka读取数据//配置信息val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node01:9092,node02:9092,node03:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "traffic","auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("RoadRealTimeLog")//开始创建Kafkaval linesDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams))//窗口函数linesDStream.map(_.value()).window(Seconds(10), Seconds(10)).map(ele => (ele.split("\t")(1), ele.split("\t")(5).toInt)).groupByKey().map(ele => {(ele._1, ele._2.toList.sum.toDouble / ele._2.size)}).foreachRDD(rdd => {rdd.foreach(ele => {println(ele._1 + "--" + ele._2)})})//启动任务streamingContext.start()streamingContext.awaitTermination() }
}
4.4.2. 动态改变广播变量(布控)
spark 车流量项目实战相关推荐
- Spark Streaming 项目实战 (4) | 得到最近1小时广告点击量实时统计并写入到redis
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...
- Spark大型项目实战、Elasticsearch完整视频(百度网盘链接)
Elasticsearch顶尖高手系列课程(基础篇+高手篇) (从零开始精通分布式搜索ElasticSearch) #有效期 2019.11.22-29 链接:https://pan.baidu.co ...
- Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...
- Spark Core项目实战(3) | 页面单跳转化率统计
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...
- spark 大型项目实战(三十一): --性能调优之在实际项目中使用fastutil优化数据格式
fastutil介绍: fastutil是扩展了Java标准集合框架(Map.List.Set:HashMap.ArrayList.HashSet)的类库,提供了特殊类型的map.set.list和q ...
- spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一)
spark项目实战:电商分析平台之各个范围Session步长.访问时长占比统计(需求一) 项目基本信息,架构,需要一览 各个范围Session步长.访问时长占比统计概述 各个范围Session步长.访 ...
- spark项目实战:电商分析平台之项目概述
spark项目实战:电商分析平台之项目概述 目录 项目概述 程序架构分析 需求解析 初始代码和完成代码存放在github上面 1. 项目概述 在访问电商网站时,我们的一些访问行为会产生相应的埋点日志( ...
- hadoop+Spark实战基于大数据技术之电视收视率企业项目实战
课程简介 本课程将通过一个电视收视率项目实战驱动讲解,项目案例是国内的一家广电企业作为非洲国家的一个运营商,以用户收视行为数据作为基础数据,通过对频道和节目的分析,采用多维度统计分析的方法挖掘用户的收 ...
- 基于大数据技术之电视收视率企业项目实战(hadoop+Spark)
基于大数据技术之电视收视率企业项目实战(hadoop+Spark) 网盘地址:https://pan.baidu.com/s/1bEeSB1Y9nmjzctnbJMcBkg 密码:dohg 备用地址( ...
- Spark项目实战—电商用户行为分析
文章目录 一.[SparkCore篇]项目实战-电商用户行为分析 前言:数据准备 1.数据规则如下: 2.详细字段说明: 3.样例类 (一)需求1:TOP10热门品类 1.需求说明 2.代码实现方案1 ...
最新文章
- 做三年地方网站不如别人打一场麻将
- python3 字符串操作总结
- python好学嘛-python语言好学吗
- MVC2验证(自定义异常)
- PHP发送数据到指定方法,php通过header发送自定义数据方法_php技巧
- 《华为美》歌曲洗脑惹争议 华为回应:不知情、未参与 感谢厚爱
- java三步 网易_Java基础:三步学会Java Socket编程·网易学院·教程
- Java继承、封装、多态
- 那些机器学习中无法衍生的强规则变量有吗?
- python中isupper是什么意思_Python string isupper()用法及代码示例
- C语言作业解决,c语言作业9
- VM虚拟机下载安装步骤
- C# amr转mp3 (ffmpeg)
- 3天完成Open CPU开发!7天完成Costdown!满足客户对成本、功耗、安全性等方面的需求!
- ps还原上一步快捷键_photoshop恢复上一步操作的快捷键是什么
- idea软件控制台Console里没有查找快捷键
- android 检查xposed,[原创]利用Xposed躲过Xposed检测
- 从瑞银集团看客户导向型财富管理机构如何从资产配置服务中获利
- mysql存储图片node_Node.js教程 阿里云mysql如何支持存储emoji表情
- 五笔打字:速成手册---半小时学会五笔打字
热门文章
- idea代码格式化代码
- 音频3A测试 AGC自动增益测试
- 从「模拟」理解零知识证明:平行宇宙与时光倒流—— 探索零知识证明系列(二)
- UE4像素流送PixelStreaming
- html5 easyui 布局,Easyui 在面板中创建复杂布局_EasyUI 插件
- 磁盘不见了只剩一个c盘_电脑开机后磁盘都不见了,只剩下C盘了,为什么啊,求大神指教。...
- wincc 7.4 sp1在win10 下安装环境搭建
- java学习之打印菱形和空心菱形
- DCM4CHEE 中worklist 乱码问题 dcmchee 中文乱码
- 如何将Eclipse设置为中文版