I. Project Overview

1. Requirements

Update the total number of steps walked by each user in real time.

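A new count is produced every 5 s. Each count for a user contains the time of the count, the user's current location, and the number of newly added steps.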
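To make the record format concrete, here is a small sketch that models one count as a Scala case class and serializes it with the same four keys the producer below writes (`user`, `count_time`, `walk_place`, `new_walkNum`). The names `WalkEventSketch`, `WalkEvent` and `toJson` are hypothetical and not part of the project: the real producer builds its JSON with jettison's `JSONObject`, and this hand-written serializer only escapes quotes and backslashes.

```scala
object WalkEventSketch {
  // One simulated count; field names mirror the JSON keys written by the producer
  final case class WalkEvent(user: String, countTime: Long, walkPlace: String, newWalkNum: Int)

  // Wrap a value in double quotes, escaping backslashes and quotes
  // (sketch only: control characters are not escaped)
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Serialize with the same key names and key order as the producer's console output
  def toJson(e: WalkEvent): String =
    "{\"user\":" + quote(e.user) +
      ",\"count_time\":" + e.countTime +
      ",\"walk_place\":" + quote(e.walkPlace) +
      ",\"new_walkNum\":" + e.newWalkNum + "}"

  def main(args: Array[String]): Unit =
    println(toJson(WalkEvent("zhangSan", 1582689943955L, "操场北门", 5)))
}
```

Running `main` prints the same line as the first record in the simulated data output of Part III.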

For simplicity, only two dimensions are saved to Redis: the user, and that user's continuously updated total step count.
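The running total relies on the semantics of the Redis `HINCRBY` command, which the business code below calls on the hash `user_walknum`: a missing hash field is treated as 0, and the command returns the value after the increment. A minimal in-memory stand-in, assuming a single writer (the class `StepTotals` is hypothetical and exists only for illustration), looks like this:

```scala
import scala.collection.mutable

// In-memory stand-in for the Redis hash "user_walknum" (hypothetical, single-threaded)
final class StepTotals {
  private val hash = mutable.Map.empty[String, Long]

  // Mirrors HINCRBY: a missing field starts at 0; returns the value after the increment
  def hincrBy(field: String, increment: Long): Long = {
    val updated = hash.getOrElse(field, 0L) + increment
    hash(field) = updated
    updated
  }

  // Immutable copy of all fields and their totals, similar to HGETALL
  def snapshot: Map[String, Long] = hash.toMap
}
```

Unlike this sketch, Redis executes each `HINCRBY` atomically on the server, which is what allows several Spark tasks to update the same user's field without losing increments.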

2. Business Flow

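First, simulated data is generated and streamed into a Kafka topic in real time. Spark Streaming then reads the simulated data from Kafka in real time, analyzes it, and finally stores the results in Redis.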
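The producer sends one record every 5 s, while the streaming job below uses a 2 s batch interval, so many batches are simply empty. The following idealized sketch shows how send times fall into batches. It assumes every record is visible in Kafka the instant it is sent and that batch `i` covers the half-open interval `[start + i*batch, start + (i+1)*batch)`; real scheduling delays will shift records between neighbouring batches. `MicroBatchSketch` and `eventsPerBatch` are hypothetical names.

```scala
object MicroBatchSketch {
  // Count how many events fall into each of `numBatches` consecutive batches
  def eventsPerBatch(eventTimesMs: Seq[Long], startMs: Long, batchMs: Long, numBatches: Int): List[Int] = {
    val counts = Array.fill(numBatches)(0)
    for (t <- eventTimesMs if t >= startMs) {
      val i = (t - startMs) / batchMs // index of the batch this event lands in
      if (i < numBatches) counts(i.toInt) += 1
    }
    counts.toList
  }

  def main(args: Array[String]): Unit = {
    val sendTimes = (0 until 5).map(_ * 5000L) // one record every 5 s: 0 s, 5 s, ..., 20 s
    println(eventsPerBatch(sendTimes, startMs = 0L, batchMs = 2000L, numBatches = 10))
  }
}
```

In this idealization, 6 of the first 10 batches carry no records, which is expected and harmless: `foreachRDD` simply processes an empty RDD.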

3. Big Data Components

kafka: kafka_2.10-0.10.2.1

spark: spark-2.2.0-bin-hadoop2.7

redis: redis-3.0.0

4. Development Tool

IDEA: IntelliJ IDEA 2018.3.2 x64

II. Code

1. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.cn</groupId>
  <artifactId>sparkSysLearn</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>2.11.8</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>2.11.8</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.1</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.2.1</version>
    </dependency>
    <!-- Spark Streaming Kafka (old 0.8 API, built for Scala 2.10). Not used by the code below,
         which relies on spark-streaming-kafka-0-10_2.11 only; keeping it would put Scala 2.10
         Spark jars on the classpath next to the _2.11 ones.
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.3</version>
    </dependency>
    -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.2.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.gavaghan</groupId>
      <artifactId>geodesy</artifactId>
      <version>1.1.3</version>
    </dependency>
    <dependency>
      <groupId>com.github.scopt</groupId>
      <artifactId>scopt_2.11</artifactId>
      <version>3.7.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.2.4</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.codehaus.jettison/jettison -->
    <dependency>
      <groupId>org.codehaus.jettison</groupId>
      <artifactId>jettison</artifactId>
      <version>1.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/net.sf.json-lib/json-lib -->
    <dependency>
      <groupId>net.sf.json-lib</groupId>
      <artifactId>json-lib</artifactId>
      <version>2.4</version>
      <classifier>jdk15</classifier>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.4.2</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>telecomeAnalysis-1.0.0</finalName>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.0</version>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.12.4</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <!-- <descriptor>src/main/resources/assembly.xml</descriptor> -->
          <appendAssemblyId>false</appendAssemblyId>
        </configuration>
      </plugin>
      <!-- Copy the dependency jars into the lib directory -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>copy</id>
            <phase>package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/lib</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

2. Simulated Data

package com.cn.util

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.codehaus.jettison.json.JSONObject

import scala.util.Random

/**
 * A producer that sends simulated data to the Kafka cluster.
 * Simulated scenario:
 * track the real-time total steps walked by a set of users. One count is produced every 5 s,
 * containing the user, the time of the count, the user's location, and the newly added steps.
 */
object KafkaEventProducer {
  // Users (note: "guanYu" appears twice, so it is picked twice per cycle)
  private val users = Array("zhangSan", "liSi", "wangWu", "xiaoQiang", "zhangFei",
    "liuBei", "guanYu", "maChao", "caoCao", "guanYu")
  private var pointer = -1

  // Pick the next user in round-robin order (not at random)
  def getUser(): String = {
    pointer = (pointer + 1) % users.length
    users(pointer)
  }

  // Newly added steps: a random integer in [0, users.length), i.e. 0 to 9
  val random = new Random()
  def getNewStepNum(): Int = {
    random.nextInt(users.length)
  }

  // Time of the count, in epoch milliseconds
  def getTime(): Long = {
    System.currentTimeMillis()
  }

  // Walking location (a gate of the sports field), chosen at random
  val walkPlace = Array("操场南门", "操场东门", "操场北门", "操场西门", "操场东南门", "操场西北门", "操场西南门", "操场东南北门")
  def getWalkPlace(): String = {
    walkPlace(random.nextInt(walkPlace.length))
  }

  def main(args: Array[String]): Unit = {
    val topic = "topic_walkCount"
    val brokers = "master:6667,slaves1:6667,slaves2:6667"

    // Producer configuration (the old 0.8 setting "metadata.broker.list" is not used by this client)
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Create the producer and close it (flushing pending records) when the JVM exits
    val producer = new KafkaProducer[String, String](props)
    sys.addShutdownHook(producer.close())

    // Send one event every 5 s
    while (true) {
      val event = new JSONObject()
      event.put("user", getUser())
        .put("count_time", getTime())
        .put("walk_place", getWalkPlace())
        .put("new_walkNum", getNewStepNum())
      println(event.toString())
      producer.send(new ProducerRecord[String, String](topic, event.toString))
      Thread.sleep(5000)
    }
  }
}
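The user selection in `getUser` is round-robin, not random, which is why the sample output cycles through the users in array order. The same logic as a small generic helper (the class `RoundRobin` is hypothetical, not part of the project) makes this easy to check in isolation:

```scala
// Generic round-robin picker with the same pointer arithmetic as KafkaEventProducer.getUser
final class RoundRobin[T](items: IndexedSeq[T]) {
  require(items.nonEmpty, "items must not be empty")
  private var pointer = -1

  // Advance the pointer and wrap around at the end of the sequence
  def next(): T = {
    pointer = (pointer + 1) % items.length
    items(pointer)
  }
}
```

For example, `new RoundRobin(users.toVector)` over the producer's user list returns "guanYu" twice in every cycle of 10 picks, because the name is listed twice; if that duplicate is unintended, replacing the last entry with another name fixes it.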

3. Redis Utility Class

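package com.cn.util

import redis.clients.jedis.JedisPool

object RedisUtils {
  private val host = "master"
  private val port = 6379
  //private val poolConfig = new GenericObjectPoolConfig()
  lazy val pool = new JedisPool(host, port)

  // Shutdown hook that destroys the connection pool when the JVM exits
  lazy val hooks = new Thread() {
    override def run(): Unit = {
      println("Execute hook thread: " + this)
      pool.destroy()
    }
  }
  // Register the hook; without this call the thread above is never run
  Runtime.getRuntime.addShutdownHook(hooks)
}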
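A connection borrowed from `RedisUtils.pool` must always be given back, even if processing throws. A common way to guarantee this in Scala is the loan pattern, sketched below under the assumption that the resource implements `java.io.Closeable` (the name `LoanSketch.withResource` is hypothetical):

```scala
import java.io.Closeable

object LoanSketch {
  // Acquire a resource, apply `use` to it, and close it even if `use` throws
  def withResource[R <: Closeable, A](acquire: => R)(use: R => A): A = {
    val resource = acquire
    try use(resource)
    finally resource.close()
  }
}
```

With Jedis 2.9.x, `Jedis` implements `Closeable` and, for a connection obtained from a `JedisPool`, `close()` returns it to the pool, so a call could look like `LoanSketch.withResource(RedisUtils.pool.getResource) { jedis => jedis.hincrBy("user_walknum", user, steps) }`.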

4. Business Logic

package com.cn.sparkStreaming

import com.cn.util.RedisUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.codehaus.jettison.json.JSONObject

/**
 * Track the real-time total steps walked by a set of users. The producer emits one count every 5 s,
 * containing the user, the time of the count, the user's location, and the newly added steps.
 * Each user's running step total is saved to Redis.
 */
object kafka2sparkStreaming2redis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka2sparkStreaming2redis").setMaster("local[1]")
    //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // Batch interval of the stream: 2 s
    val ssc = new StreamingContext(conf, Seconds(2))
    // Log level (WARN, INFO, DEBUG)
    ssc.sparkContext.setLogLevel("WARN")
    ssc.checkpoint("checkpoint")

    val topic = "topic_walkCount"
    val groupId = "t03"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:6667,slaves1:6667,slaves2:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest", // on first start, consume from the earliest offset
      "enable.auto.commit" -> (false: java.lang.Boolean) // auto-commit off; offsets are committed manually below
    )

    val topics = Array(topic)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // distribute partitions evenly across executors
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    val dbIndex = 3
    stream.foreachRDD(rdd => {
      // Offset ranges consumed by each partition in this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(records => {
        // Borrow one Redis connection per partition instead of one per record
        val jedis = RedisUtils.pool.getResource
        try {
          jedis.auth("123456") // Redis password
          jedis.select(dbIndex) // select the Redis database
          records.foreach(record => {
            val json = new JSONObject(record.value())
            val user = json.getString("user")
            val countTime = json.getLong("count_time") // parsed but not stored
            val walkPlace = json.getString("walk_place") // parsed but not stored
            val newWalkNum = json.getInt("new_walkNum")
            val count = jedis.hincrBy("user_walknum", user, newWalkNum)
            println(count)
          })
        } finally {
          jedis.close() // returns the connection to the pool
        }
      })
      // Commit the offsets manually after the batch has been written to Redis
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
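The job above issues one `HINCRBY` per record. One optional refinement, not used in the original job, is to sum the new steps per user inside each partition first and then issue one `HINCRBY` per user. The final totals are the same because addition is associative; only the number of Redis round trips changes. The helper name below is hypothetical:

```scala
object PartitionAggregationSketch {
  // Sum new steps per user within one partition, so each user needs only one HINCRBY call
  def sumByUser(records: Iterator[(String, Int)]): Map[String, Long] =
    records.foldLeft(Map.empty[String, Long]) { case (acc, (user, steps)) =>
      acc.updated(user, acc.getOrElse(user, 0L) + steps)
    }
}
```

Inside `foreachPartition`, the iterator of Kafka records would first be mapped to `(user, new_walkNum)` pairs and the resulting map written with `totals.foreach { case (u, n) => jedis.hincrBy("user_walknum", u, n) }`. Either way the delivery guarantee is at-least-once: offsets are committed only after the writes, so a batch that fails between the write and the commit is replayed and its steps are added again.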

III. Running and Results

1. Simulated data (producer console output)

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{"user":"zhangSan","count_time":1582689943955,"walk_place":"操场北门","new_walkNum":5}
{"user":"liSi","count_time":1582689956236,"walk_place":"操场东门","new_walkNum":0}
{"user":"wangWu","count_time":1582689961236,"walk_place":"操场东南北门","new_walkNum":2}
{"user":"xiaoQiang","count_time":1582689966239,"walk_place":"操场东门","new_walkNum":6}
{"user":"zhangFei","count_time":1582689971240,"walk_place":"操场东门","new_walkNum":8}
{"user":"liuBei","count_time":1582689976240,"walk_place":"操场西南门","new_walkNum":5}
{"user":"guanYu","count_time":1582689981240,"walk_place":"操场东南门","new_walkNum":9}
{"user":"maChao","count_time":1582689986240,"walk_place":"操场北门","new_walkNum":6}
{"user":"caoCao","count_time":1582689991245,"walk_place":"操场东南北门","new_walkNum":2}
{"user":"guanYu","count_time":1582689996245,"walk_place":"操场西门","new_walkNum":0}
{"user":"zhangSan","count_time":1582690001246,"walk_place":"操场东门","new_walkNum":3}
{"user":"liSi","count_time":1582690006247,"walk_place":"操场北门","new_walkNum":2}
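As a worked example, the per-user totals implied by the twelve records above can be computed directly. This assumes the Redis hash started empty and exactly these records were consumed; the values actually stored will usually differ, because `HINCRBY` accumulates across runs and, with `auto.offset.reset` set to `earliest`, a new consumer group replays every record still retained in the topic. `SampleTotals` is a hypothetical name, and the regex only handles the flat JSON lines printed by the producer.

```scala
object SampleTotals {
  // Extracts "user" and "new_walkNum" from one flat JSON line printed by the producer
  private val Event = """\{"user":"([^"]+)",.*"new_walkNum":(\d+)\}""".r

  val sampleLines: Seq[String] = Seq(
    """{"user":"zhangSan","count_time":1582689943955,"walk_place":"操场北门","new_walkNum":5}""",
    """{"user":"liSi","count_time":1582689956236,"walk_place":"操场东门","new_walkNum":0}""",
    """{"user":"wangWu","count_time":1582689961236,"walk_place":"操场东南北门","new_walkNum":2}""",
    """{"user":"xiaoQiang","count_time":1582689966239,"walk_place":"操场东门","new_walkNum":6}""",
    """{"user":"zhangFei","count_time":1582689971240,"walk_place":"操场东门","new_walkNum":8}""",
    """{"user":"liuBei","count_time":1582689976240,"walk_place":"操场西南门","new_walkNum":5}""",
    """{"user":"guanYu","count_time":1582689981240,"walk_place":"操场东南门","new_walkNum":9}""",
    """{"user":"maChao","count_time":1582689986240,"walk_place":"操场北门","new_walkNum":6}""",
    """{"user":"caoCao","count_time":1582689991245,"walk_place":"操场东南北门","new_walkNum":2}""",
    """{"user":"guanYu","count_time":1582689996245,"walk_place":"操场西门","new_walkNum":0}""",
    """{"user":"zhangSan","count_time":1582690001246,"walk_place":"操场东门","new_walkNum":3}""",
    """{"user":"liSi","count_time":1582690006247,"walk_place":"操场北门","new_walkNum":2}"""
  )

  // Sum new steps per user; lines that are not events (e.g. log4j warnings) are ignored
  def totals(lines: Seq[String]): Map[String, Long] =
    lines.foldLeft(Map.empty[String, Long]) { (acc, line) =>
      line match {
        case Event(user, steps) => acc.updated(user, acc.getOrElse(user, 0L) + steps.toLong)
        case _                  => acc
      }
    }

  def main(args: Array[String]): Unit =
    totals(sampleLines).toSeq.sortBy(_._1).foreach { case (u, n) => println(s"$u -> $n") }
}
```

Under that assumption, zhangSan would hold 5 + 3 = 8, guanYu 9 + 0 = 9 and liSi 0 + 2 = 2, and the nine users would total 48 steps.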

2. Data stored in Redis
