1.业务概况(显示总订单量、订单成功量、总金额、花费时间)
2.业务详细概述(每小时的充值订单量、每小时的充值成功订单量)
3.业务质量(每个省份的充值成功订单量)
4.实时统计每分钟的充值金额和订单量

整体步骤:
提取数据库中存储的偏移量–>广播省份映射关系–>获取kafka的数据–>数据处理(JSON对象解析,省份、时间、结果、费用)
–>计算业务概况(显示总订单量、订单成功量、总金额、花费时间)–>业务概述(每小时的充值总订单量,每小时的成功订单量)
—>业务质量(每个省份的成功订单量)—>实时统计每分钟的充值金额和订单量

下面是代码封装的包


项目需求实现:
1)用flume收集数据,放入到kafka,下面是详细配置。

#定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 描述和配置source组件:r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datas/flume
a1.sources.r1.fileHeader = true# 描述和配置sink组件:k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumeLogs
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2)用SparkStreaming去消费kafka里面的数据前,做一些Kafka参数的配置以及放入Redis数据库所需要的配置。

(1)在IDEA中配置kafka和Redis相关参数,方便获取kafka里面的数据并且存储到redis里面


import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializerobject AppParams {/*** 解析application.conf配置文件* 加载resource下面的配置文件,默认规则:application.conf->application.json->application.properties*/private lazy val config: Config = ConfigFactory.load()/*** 返回订阅的主题*/val topic = config.getString("kafka.topic").split(",")/*** kafka集群所在的主机和端口*/val borkers = config.getString("kafka.broker.list")/*** 消费者的ID*/val groupId = config.getString("kafka.group.id")/*** kafka的相关参数*/val kafkaParams = Map[String, Object]("bootstrap.servers" -> borkers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> groupId,"auto.offset.reset" -> "earliest","enable.auto.commit" -> "false")/*** redis服务器地址*/val redisHost = config.getString("redis.host")/*** 将数据写入到哪个库*/val selectDBIndex = config.getInt("redis.db.index")/*** 省份code和省份名称的映射关系*/import scala.collection.JavaConversions._val pCode2PName  = config.getObject("pcode2pname").unwrapped().toMap
}

(2)方便计算订单完成所需要的时间,封装了一个类


import org.apache.commons.lang3.time.FastDateFormatobject CaculateTools {// 非线程安全的//private val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")// 线程安全的DateFormatprivate val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")/*** 计算时间差*/def caculateTime(startTime:String,endTime:String):Long = {val start = startTime.substring(0,17)format.parse(endTime).getTime - format.parse(start).getTime}}

(3)做一个Redis池去操作Redis中的数据


import com.alibaba.fastjson.JSON
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import redis.clients.jedis.JedisPoolobject Jpools {private val poolConfig = new GenericObjectPoolConfig()poolConfig.setMaxIdle(5)      //最大的空闲连接数,连接池中最大的空闲连接数,默认是8poolConfig.setMaxTotal(2000)  //只支持最大的连接数,连接池中最大的连接数,默认是8//连接池是私有的不能对外公开访问private lazy val jedisPool = new JedisPool(poolConfig, AppParams.redisHost)def getJedis={val jedis = jedisPool.getResourcejedis.select(AppParams.selectDBIndex)jedis}}

(4)每次放入Redis前需要判断偏移量,防止数据重复以及消耗资源


import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBsobject OffsetManager {DBs.setup()/*** 获取自己存储的偏移量信息*/def getMydbCurrentOffset: Unit ={DB.readOnly(implicit session =>SQL("select * from streaming_offset where groupId=?").bind(AppParams.groupId).map(rs =>(new TopicPartition(rs.string("topicName"),rs.int("partitionId")),rs.long("offset"))).list().apply().toMap)}/*** 持久化存储当前批次的偏移量*/def saveCurrentOffset(offsetRanges: Array[OffsetRange]) = {DB.localTx(implicit session =>{offsetRanges.foreach(or =>{SQL("replace into streaming_offset values (?,?,?,?)").bind(or.topic,or.partition,or.untilOffset,AppParams.groupId).update().apply()})})}
}

(5)设置自己的kafka、mysql(存储偏移量)、redis的配置

kafka.topic = "flumeLog"
kafka.broker.list = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
kafka.group.id = "day2_001"
# MySQL example
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://localhost/bigdata?characterEncoding=utf-8"
db.default.user="root"
db.default.password="926718"
# redis
redis.host="hadoop02"
redis.db.index=10# 映射配置
pcode2pname {100="北京"200="广东"210="上海"220="天津"230="重庆"240="辽宁"250="江苏"270="湖北"280="四川"290="陕西"311="河北"351="山西"371="河南"431="吉林"451="黑龙江"471="内蒙古"531="山东"551="安徽"571="浙江"591="福建"731="湖南"771="广西"791="江西"851="贵州"871="云南"891="西藏"898="海南"931="甘肃"951="宁夏"971="青海"991="新疆"
}

3)做好一系列配置之后就开始SparkStreaming数据处理的核心
先说明一下日志文件中字段的含义

(1)下面是SparkStreaming核心代码


import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDDobject KpiTools {/*** 业务概况(总订单量、成功订单量、总金额、花费时间** @param baseData*/def kpi_general(baseData: RDD[(String, String, List[Double], String, String)]): Unit = {baseData.map(tp => (tp._1, tp._3)).reduceByKey((list1, list2) => {//将所有的元素拉链为一个列表之后进行相加计算list1.zip(list2).map(tp => tp._1 + tp._2)}).foreachPartition(partition => {val jedis = Jpools.getJedispartition.foreach(tp => {//所有的数据都计算完成之后,显示在数据库中jedis.hincrBy("A-" + tp._1, "total", tp._2(0).toLong)jedis.hincrBy("A-" + tp._1, "succ", tp._2(1).toLong)jedis.hincrByFloat("A-" + tp._1, "money", tp._2(2))jedis.hincrBy("A-" + tp._1, "cost", tp._2(3).toLong)// key的有效期jedis.expire("A-" + tp._1, 48 * 60 * 60)})jedis.close()})}/*** 业务概述:每小时的充值总订单量,每小时的成功订单量* 日期、时间、LIST(总订单量、成功订单量、充值成功总金额、时长)、** @param baseData*/def kpi_general_hour(baseData: RDD[(String, String, List[Double], String, String)]): Unit = {baseData.map(tp => ((tp._1, tp._2), List(tp._3(0), tp._3(1)))).reduceByKey((list1, list2) => {//将所有的元素拉链为一个列表之后进行相加计算list1.zip(list2).map(tp => tp._1 + tp._2)}).foreachPartition(partition => {val jedis = Jpools.getJedispartition.foreach(tp => {//所有的数据都计算完成之后,显示在数据库中jedis.hincrBy("B-" + tp._1._1, "T:" + tp._1._2, tp._2(0).toLong)jedis.hincrBy("B-" + tp._1._1, "S" + tp._1._2, tp._2(1).toLong)// key的有效期jedis.expire("B-" + tp._1, 48 * 60 * 60)})jedis.close()})}/*** 业务质量* 总的充值成功订单量*/def kpi_quality(baseData: RDD[(String, String, List[Double], String, String)], p2p: Broadcast[Map[String, AnyRef]]) = {baseData.map(tp => ((tp._1,tp._4),tp._3(1))).reduceByKey(_+_).foreachPartition(partition => {val jedis = Jpools.getJedispartition.foreach(tp => {//总的充值成功和失败订单数量jedis.hincrBy("C-" + tp._1._1,p2p.value.getOrElse(tp._1._2,tp._1._2).toString,tp._2.toLong)jedis.expire("C-" + tp._1._1, 48 * 60 * 60)})jedis.close()})}/*** 实时统计每分钟的充值金额和订单量* // (日期, 小时, Kpi(订单,成功订单,订单金额,订单时长),省份Code,分钟数)*/def kpi_realtime_minute(baseData: RDD[(String, String, List[Double], String, String)]) = {baseData.map(tp => ((tp._1,tp._2,tp._5),List(tp._3(1),tp._3(2)))).reduceByKey((list1,list2)=>{list1.zip(list2).map(tp => tp._1+tp._2)}).foreachPartition(partition => {val jedis = Jpools.getJedispartition.foreach(tp => {//每分钟充值成功的笔数和充值金额jedis.hincrBy("D-" + tp._1._1,"C:"+ tp._1._2+tp._1._3,tp._2(0).toLong)jedis.hincrByFloat("D-" + tp._1._1,"M"+tp._1._2+tp._1._3,tp._2(1))jedis.expire("D-" + tp._1._1, 48 * 60 * 60)})jedis.close()})}/*** 整理基础数据*/def baseDataRDD(rdd: RDD[ConsumerRecord[String, String]]): RDD[(String, String, List[Double], String, String)] = {rdd // ConsumerRecord => JSONObject.map(cr => JSON.parseObject(cr.value())) // 过滤出充值通知日志.filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).map(obj => {// 判断该条日志是否是充值成功的日志val result = obj.getString("bussinessRst")val fee = obj.getDouble("chargefee")// 充值发起时间和结束时间val requestId = obj.getString("requestId")// 数据当前日期val day = requestId.substring(0, 8)val hour = requestId.substring(8, 10)val minute = requestId.substring(10, 12)val receiveTime = obj.getString("receiveNotifyTime")//省份Codeval provinceCode = obj.getString("provinceCode")val costTime = CaculateTools.caculateTime(requestId, receiveTime)val succAndFeeAndTime: (Double, Double, Double) = if (result.equals("0000")) (1, fee, costTime) else (0, 0, 0)// (日期, 小时, Kpi(订单,成功订单,订单金额,订单时长),省份Code,分钟数)(day, hour, List[Double](1, succAndFeeAndTime._1, succAndFeeAndTime._2, succAndFeeAndTime._3), provinceCode, minute)}).cache()}
}

(2)将封装好的方法调用

import cn.sheep.utils.{AppParams, KpiTools, OffsetManager}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 中国移动实时监控平台(优化版)* Created by zhangjingcun on 2018/10/16 16:34.*/
object BootStarpAppV2 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf()sparkConf.setAppName("中国移动运营实时监控平台-Monitor") //如果在集群上运行的话,需要去掉:sparkConf.setMaster("local[*]")sparkConf.setMaster("local[*]") //将rdd以序列化格式来保存以减少内存的占用//默认采用org.apache.spark.serializer.JavaSerializer//这是最基本的优化sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //rdd压缩sparkConf.set("spark.rdd.compress", "true") //batchSize = partitionNum * 分区数量 * 采样时间sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000") //优雅的停止sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")val ssc = new StreamingContext(sparkConf, Seconds(2))/*** 提取数据库中存储的偏移量*/val currOffset = OffsetManager.getMydbCurrentOffset/*** 广播省份映射关系*/val pcode2PName: Broadcast[Map[String, AnyRef]] = ssc.sparkContext.broadcast(AppParams.pCode2PName)/** 获取kafka的数据* LocationStrategies:位置策略,如果kafka的broker节点跟Executor在同一台机器上给一种策略,不在一台机器上给另外一种策略* 设定策略后会以最优的策略进行获取数据* 一般在企业中kafka节点跟Executor不会放到一台机器的,原因是kakfa是消息存储的,Executor用来做消息的计算,* 因此计算与存储分开,存储对磁盘要求高,计算对内存、CPU要求高* 如果Executor节点跟Broker节点在一起的话使用PreferBrokers策略,如果不在一起的话使用PreferConsistent策略* 使用PreferConsistent策略的话,将来在kafka中拉取了数据以后尽量将数据分散到所有的Executor上 */val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams))/*** 数据处理*/stream.foreachRDD(rdd=>{val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval baseData = KpiTools.baseDataRDD(rdd)/*** 计算业务概况*/KpiTools.kpi_general(baseData)KpiTools.kpi_general_hour(baseData)/*** 业务质量*/KpiTools.kpi_quality(baseData, pcode2PName)/*** 实时充值情况分析*/KpiTools.kpi_realtime_minute(baseData)/*** 存储偏移量*/OffsetManager.saveCurrentOffset(offsetRanges)})ssc.start()ssc.awaitTermination()}
}

pom文件

<?xml version="1.0" encoding="UTF-8"?>

4.0.0

<groupId>cn.sheep</groupId>
<artifactId>cmcc_monitor</artifactId>
<version>1.0-SNAPSHOT</version><properties><spark.version>2.2.1</spark.version><mysql.version>5.1.40</mysql.version><jedis.version>2.9.0</jedis.version><config.version>1.3.3</config.version><fastjson.version>1.2.51</fastjson.version><scalikejdbc.version>3.3.1</scalikejdbc.version>
</properties><dependencies><!-- 导入spark streaming依赖包--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version></dependency><!-- 导入streaming kafka依赖包--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency><!-- 导入mysql数据库驱动--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><!-- 导入redis客户端依赖包--><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>${jedis.version}</version></dependency><!-- 导入config配置文件依赖包--><dependency><groupId>com.typesafe</groupId><artifactId>config</artifactId><version>${config.version}</version></dependency><!-- 导入json依赖包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc_2.11</artifactId><version>3.3.1</version></dependency><dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc-core_2.11</artifactId><version>3.3.1</version></dependency><dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc-config_2.11</artifactId><version>3.3.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>compile</scope></dependency>
</dependencies>

链接:https://pan.baidu.com/s/1kjK9XK0yhbojUUexu3oFXQ
提取码:rrow

大数据实战项目------中国移动运营分析实时监控平台 || 项目需求实现(文章最后有数据文件)相关推荐

  1. 大数据实战项目------中国移动运营分析实时监控平台 || 项目背景

    中国移动运营分析实时监控平台 项目背景 中国移动公司旗下拥有很多的子机构,基本可以按照省份划分. 而各省份旗下的充值机构也非常的多. 目前要想获取整个平台的充值情况,需要先以省为单元,进行省份旗下的机 ...

  2. 大数据·实战个例“宏”分析

    大数据·实战个例"宏"分析 MBA教育体系最成功之处,就在于导入了科学的个案分析. Ps,二战最伟大的技术成功,不是原子弹.导弹.喷气机,而是流水线.流水线提供的生产力,比二战所有 ...

  3. 大数据实战:用户流量分析系统

    ---------------------------------------------------------------------------------------------------- ...

  4. Splunk—云计算大数据时代的超级日志分析和监控利器

          信息科技的不断进步,一方面使得银行业信息和数据逻辑集中程度不断得到提高,另一方面又成为银行业稳健运行的一大安全隐患.Splunk作为智能的IT管理运维平台,能够帮助银行业积极迎接.应对和解 ...

  5. 大数据实战项目--中国移动运行分析

    1.项目背景 中国移动公司旗下拥有很多的子机构,基本可以按照省份划分. 而各省份旗下的充值机构也非常的多. 目前要想获取整个平台的充值情况,需要先以省为单元,进行省份旗下的机构统计,然后由下往上一层一 ...

  6. 大数据实战之用户画像概念、项目概述及环境搭建

    下面跟着我一起来学习大数据获取用户画像: 项目Profile课程安排 : 用户画像概念 1.用户画像概述 1.1.产生背景 早期的用户画像起源于交互设计之父Alan Cooper提出的"Pe ...

  7. 大数据实战项目之电商数仓(一)

    大数据实战项目之电商数仓(一) 项目介绍 数据仓库概念 ​ 数据仓库是为企业所有决策制定过程,提供所有系统数据支持的战略集合.通过对数据仓库中数据的分析,可以帮助企业改进业务流程,控制成本,提高产品质 ...

  8. 大数据实战-callLog项目(通话记录数据分析)之数据生产

    文章目录 前言 生成基础数据 Maven依赖 姓名-手机号列表 时间维度表 自动随机生成 生成时间戳 生成一定范围内的long数据 代码 调用演示 获得随机的时间戳 代码 调用示例 生成一行记录 打包 ...

  9. 《OD大数据实战》驴妈妈旅游网大型离线数据电商分析平台

    一.环境搭建 1. <OD大数据实战>Hadoop伪分布式环境搭建 2. <OD大数据实战>Hive环境搭建 3. <OD大数据实战>Sqoop入门实例 4. &l ...

最新文章

  1. [转]Eclipse中的Web项目自动部署到Tomcat
  2. C#中统计程序运行时间
  3. qt连接错误ip的sqlserver超时时间_参数设置导致请求超时案例
  4. android 跳转到小米手机神隐模式
  5. pandas 第一行_Pandas数据预处理相关经验
  6. eclipse 使用 maven 无法编译 jsp 文件的问题
  7. 领航机器人广告段子_教育机器人广告宣传语_段子网收录最新段子
  8. Matplotlib作业3
  9. (step4.3.1) hdu 1010(Tempter of the Bone——DFS)
  10. WPF自定义控件与样式-自定义按钮(Button)
  11. java学习笔记宋红康版本-01
  12. Linux系统编程:入门篇视频教程-王利涛-专题视频课程
  13. 10款超炫HTML5游戏 附游戏源码
  14. 关于 PDCA 的个人理解
  15. LTE-5G学习笔记17--COMP技术讲解
  16. python中math的ln_Python math库 ln(x)运算的实现及原理
  17. 计算机专业的学生简历范文,计算机专业学生的简历范文精选
  18. 很遗憾,你们的问题我无法回答--Leo谈应届生求职 10
  19. 安卓定时获得wifi强度编程
  20. Windows 是最安全的操作系统

热门文章

  1. Photoshop制作水印文字
  2. c语言写扫雷新手详解
  3. 装修前必看系列|冰箱也有大学问,万元级冰箱选购攻略
  4. 视频会议的昨天,今天和明天
  5. IDEA+Java控制台实现房屋信息管理系统
  6. WebMagic使用阿布云HTTP代理ip
  7. ❤️blotemJ陆小马学习资源分享❤️
  8. Linux系统定时重启开关机如何设置?
  9. 单目图像深度估计 - 应用篇:Learning to be a Depth Camera
  10. Only one ConfirmCallback is supported by each RabbitTemplate 解决办法