先一个一个java程序,读取日志文件中的数据,然后将数据写入到Kafka中,然后写一个SparkSteaming程序,使用直连的方式读取Kafka中的数据,计算如下指标

该文件是一个电商网站某一天用户购买商品的订单成交数据,每一行有多个字段,用空格分割,字段的含义如下
用户ID   ip地址          商品分类   购买明细     商品金额
A        202.106.196.115 手机       iPhone8      8000

0          1                   2        3          4

A 202.106.196.115 手机 iPhone8 8000
B 202.106.0.20 服装 布莱奥尼西服 199
C 202.102.152.3 家具 婴儿床 2000
D 202.96.96.68 家电 电饭锅 1000
F 202.98.0.68 化妆品 迪奥香水 200
H 202.96.75.68 食品 奶粉 600
J 202.97.229.133 图书 Hadoop编程指南 90
A 202.106.196.115 手机 手机壳 200
B 202.106.0.20 手机 iPhone8 8000
C 202.102.152.3 家具 婴儿车 2000
D 202.96.96.68 家具 婴儿车 1000
F 202.98.0.68 化妆品 迪奥香水 200
H 202.96.75.68 食品 婴儿床 600
J 202.97.229.133 图书 spark实战 80

问题1.计算出各个省的成交量总额(结果保存到MySQL中)
问题2.计算每个省城市成交量的top3(结果保存到MySQL中)
问题3.计算每个商品分类的成交总额,并按照从高到低排序(结果保存到MySQL中)
问题4.构建每一个用户的用户画像,就是根据用户购买的具体商品,给用户打上一个标签,为将来的商品推荐系统作数据支撑

说明:如果一个用户购买了一个iPhone8,对应有多个标签:果粉、高端人士、数码一族
请将下面的规则数据保存到MySQL数据库中,并作为标签规则(三个字段分别代表id、商品、对于的标签):

1 iPhone8 果粉
2 iPhone8 高端人士
3 iPhone8 数码一族
4 布莱奥尼西服 高端人士
5 布莱奥尼西服 商务男士
6 婴儿床 育儿中
7 迪奥香水 高端人士
8 迪奥香水 白富美
9 婴儿床 育儿中
10 iPhone8手机壳 果粉
11 iPhone8手机壳 高端人士
12 iPhone8手机壳 数码一族
13 spark实战 IT人士
14 spark实战 屌丝
15 Hadoop编程指南 IT人士
16 Hadoop编程指南 屌丝

用户的行为数据,根据规则打上对应的标签,然后将数据存储到Hbase中,并说明Hbase的注解和列族的设计思想!
由于用户的行为过多,计算过程要对数据进行序列化的压缩,要求使用kryo这种序列化机制,压缩方式自己选择

object OrderCount {def main(args: Array[String]): Unit = {val group = "g1"val conf = new SparkConf().setAppName("OrderCount").setMaster("local[4]")val ssc = new StreamingContext(conf, Duration(5000))val broadcastRef = IPUtils.broadcastIpRules(ssc, "/ip/ip.txt")val topic = "orders"val brokerList = "lj01:9092,lj02:9092,lj03:9092"val zkQuorum = "lj01:2181,lj02:2181,lj03:2181"val topics: Set[String] = Set(topic)val topicDirs = new ZKGroupTopicDirs(group, topic)val zkTopicPath = s"${topicDirs.consumerOffsetDir}"val kafkaParams = Map("metadata.broker.list" -> brokerList,"group.id" -> group,"auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString)val zkClient = new ZkClient(zkQuorum)val children = zkClient.countChildren(zkTopicPath)var kafkaStream: InputDStream[(String, String)] = nullvar fromOffsets: Map[TopicAndPartition, Long] = Map()//如果保存过 offset//注意:偏移量的查询是在Driver完成的if (children > 0) {for (i <- 0 until children) {val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")val tp = TopicAndPartition(topic, i)fromOffsets += (tp -> partitionOffset.toLong)}val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)} else {kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)}var offsetRanges = Array[OffsetRange]()//kafkaStream.foreachRDD里面的业务逻辑是在Driver端执行,RDD在Driver端生成,RDD调算子,算子里得函数的执行是在ExecutorkafkaStream.foreachRDD { kafkaRDD =>if(!kafkaRDD.isEmpty()) {offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRangesval lines: RDD[String] = kafkaRDD.map(_._2)//整理数据val fields: RDD[Array[String]] = lines.map(_.split(" "))//计算成交总金额CalculateUtil.calculateIncome(fields)//计算商品分类金额CalculateUtil.calculateItem(fields)//计算区域成交金额CalculateUtil.calculateZone(fields, broadcastRef)for (o <- offsetRanges) {val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)}}}ssc.start()ssc.awaitTermination()}
}

实现要在redis设置一个key,变量

object Constant {val TOTAL_INCOME = "TOTAL_INCOME"
}

工具类CalculateUtil

object CalculateUtil {//计算成交总金额def calculateIncome(fields: RDD[Array[String]]) = {//将数据计算后写入到Reidsval priceRDD: RDD[Double] = fields.map(arr => {val price = arr(4).toDoubleprice})//reduce是一个Action,会把结果返回到Driver端//将当前批次的总金额返回了val sum: Double = priceRDD.reduce(_+_)//获取一个jedis连接val conn = JedisConnectionPool.getConnection()//将历史值和当前的值进行累加conn.incrByFloat(Constant.TOTAL_INCOME, sum)//释放连接conn.close()}// 计算分类的成交金额def calculateItem(fields: RDD[Array[String]]) = {val itemAndPrice: RDD[(String, Double)] = fields.map(arr => {//分类val item = arr(2)//金额val parice = arr(4).toDouble(item, parice)})//按商品分类进行聚合val reduced: RDD[(String, Double)] = itemAndPrice.reduceByKey(_+_)//将当前批次的数据累加到Redis中//foreachPartition是一个Action//现在这种方式,jeids的连接是在哪一端创建的(Driver)//在Driver端拿Jedis连接不好reduced.foreachPartition(part => {//获取一个Jedis连接//这个连接其实是在Executor中的获取的//JedisConnectionPool在一个Executor进程中有几个实例(单例)val conn = JedisConnectionPool.getConnection()part.foreach(t => {//一个连接更新多条数据conn.incrByFloat(t._1, t._2)})//将当前分区中的数据跟新完在关闭连接conn.close()})}//根据Ip计算归属地def calculateZone(fields: RDD[Array[String]], broadcastRef: Broadcast[Array[(Long, Long, String)]]) = {val provinceAndPrice: RDD[(String, Double)] = fields.map(arr => {val ip = arr(1)val price = arr(4).toDoubleval ipNum = MyUtils.ip2Long(ip)//在Executor中获取到广播的全部规则val allRules: Array[(Long, Long, String)] = broadcastRef.value//二分法查找val index = MyUtils.binarySearch(allRules, ipNum)var province = "未知"if (index != -1) {province = allRules(index)._3}//省份,订单金额(province, price)})//按省份进行聚合val reduced: RDD[(String, Double)] = provinceAndPrice.reduceByKey(_+_)//将数据跟新到Redisreduced.foreachPartition(part => {val conn = JedisConnectionPool.getConnection()part.foreach(t => {conn.incrByFloat(t._1, t._2)})conn.close()})}
}

即Myutilt

object MyUtils {def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum =  fragments(i).toLong | ipNum << 8L}ipNum}def readRules(path: String): Array[(Long, Long, String)] = {//读取ip规则val bf: BufferedSource = Source.fromFile(path)val lines: Iterator[String] = bf.getLines()//对ip规则进行整理,并放入到内存val rules: Array[(Long, Long, String)] = lines.map(line => {val fileds = line.split("[|]")val startNum = fileds(2).toLongval endNum = fileds(3).toLongval province = fileds(6)(startNum, endNum, province)}).toArrayrules}def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}def data2MySQL(it: Iterator[(String, Int)]): Unit = {//一个迭代器代表一个分区,分区中有多条数据//先获得一个JDBC连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123568")//将数据通过Connection写入到数据库val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")//将分区中的数据一条一条写入到MySQL中it.foreach(tp => {pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()})//将分区中的数据全部写完之后,在关闭连接if(pstm != null) {pstm.close()}if (conn != null) {conn.close()}}def main(args: Array[String]): Unit = {//数据是在内存中val rules: Array[(Long, Long, String)] = readRules("/Users/zx/Desktop/ip/ip.txt")//将ip地址转换成十进制val ipNum = ip2Long("114.215.43.42")//查找val index = binarySearch(rules, ipNum)//根据脚本到rules中查找对应的数据val tp = rules(index)val province = tp._3println(province)}
}

Spark系列十七:经典案列使用直连的方式,Kafka,SparkSteaming,Redis相关推荐

  1. 项目案列:银行ATM存款机系统(笔记经典案列)

    第一步数据库设计: 1.明确了解银行业务需求,围绕银行的需求进行分析,确认银行ATM存款机有紧密关系的实体,并得到每个实体的必要属性 2.绘制E-R图 描述: 使用数据库设计工具,把设计数据库第一步的 ...

  2. Java经典案列——模拟双色球(含完整代码)

    今天给大家分享一个Java经典案例--模拟双色球,通过这个案例对之前所学习的循环语句.数组.方法做一个简单的总结,下面先来看题目: 题目 在双色球系统中,其投注号码由六个红球和一个蓝球组成,其中红球从 ...

  3. 卷积神经网络概述以及简单案列

    感谢阅读 CNN概述 卷积层 卷积计算 Padding stride PyTorch 卷积层 API 池化层 经典案列图像分类 CIFAR10 数据集 模型构建 模型训练 预测 CNN概述 卷积神经网 ...

  4. 每日一课 | List和 tuple的13个经典使用案列(完结篇)

    06 大家好,我是营长,上期营长分享了"List和 tuple的13个经典使用案列"的一部分,这期营长接着为大家分享. 本期分享内容:List和 tuple的13个经典使用案列(完 ...

  5. lora模块 A39系列 远距离通信 Arduino 串口案列程序

    lora模块 A39系列 远距离通信 Arduino 串口案列程序 一.器材 Lora通信模块 A39C-T400A22D1a *2 USB转TTL *1 Arduino *1 面包板 *1 杜邦线 ...

  6. Java基础再回首之设计模式系列①-----StrategyPattern 策略者模式(案列教程,附带demo)

    一.前言 自己学java学的也熟悉了.踏入工作的不归路之后,身为一个Android开发工程师,不仅要Java基础扎实,还要对其的设计模式有所掌握,最近在群听到<Head First>这本书 ...

  7. Spark 系列(一)—— Spark 简介

    Spark 系列(一)-- Spark 简介 一.简介 Spark 于 2009 年诞生于加州大学伯克利分校 AMPLab,2013 年被捐赠给 Apache 软件基金会,2014 年 2 月成为 A ...

  8. Web 前端开发精华文章集锦(jQuery、HTML5、CSS3)【系列十七】

    <Web 前端开发精华文章推荐>2013年第五期(总第十七期)和大家见面了.梦想天空博客关注 前端开发 技术,分享各种增强网站用户体验的 jQuery 插件,展示前沿的 HTML5 和 C ...

  9. 深入继承之抽象类和接口综合分析及完整案列解说(一)

    首先感谢园里的一位前辈anytao.cnblogs.com ,他对面向抽象类和接口编程作出比较完善的总结.下面的总结是直接Copy他的原话. 一.相同点 ● 都不能被直接实例化,都可以通过继承实现其抽 ...

最新文章

  1. 执行在一行中组合多个Linux命令
  2. Spring Cloud构建微服务架构:服务消费(Ribbon)【Dalston版】
  3. HTML5 CSS3的新交互特性
  4. 一个基于Mule的企业服务总线的案例(关于JMS)
  5. new arraylist内存_Java内存泄漏分析工具Memory Analyzer Tool
  6. java soap envelope_如何在SOAP请求中关闭Envelope和Body
  7. 玻璃体浑浊的分子原理
  8. 如何找到SAP CRM WebClient UI error message的来源
  9. React 学习笔记 —— Ref Hook
  10. 03. 数组中重复的数字
  11. Android应用删除顶部标题栏
  12. python Django音乐推荐系统(课设、毕设、学习)
  13. After Effects - Bodymovin 插件验货及感受
  14. vue+阿里的G2图表-antv+折线图
  15. 在word中插入参考文献角标
  16. css-绘制平行四边形
  17. 股权、期权和原始股傻傻分不清楚?一文帮你安排的明明白白!
  18. 作家天地杂志作家天地杂志社作家天地编辑部2022年第23期目录
  19. 傻傻分不清?云存储、云计算与分布式存储、分布式计算是一回事吗?
  20. 【HTML基础】第一课、Web前端的概述

热门文章

  1. 独家 | 机器学习解释模型:黑盒VS白盒(附资料链接)
  2. 千万别小看一个面相好的女人!
  3. 面临困难不知道如何抉择怎么办,《大话西游之大圣娶亲》观后感
  4. 网站流量统计(免费的)
  5. 语句摘抄——第20周
  6. android webview使用html5input id=input type=file/ 上传相册、拍照照片
  7. 常见的GC算法(GC的背景与原理)
  8. pycharm激活2099年方法
  9. 策略模式、观察者模式、状态模式原理及实例
  10. 数组最大值和最小值的求法