java操作Redis:http://blog.csdn.net/xyang81/article/details/51918129

数据order.txt

A 202.106.196.115 手机 iPhone8 8000
B 202.106.0.20 服装 布莱奥尼西服 199
C 202.102.152.3 家具 婴儿床 2000
D 202.96.96.68 家电 电饭锅 1000
F 202.98.0.68 化妆品 迪奥香水 200
H 202.96.75.68 食品 奶粉 600
J 202.97.229.133 图书 Hadoop编程指南 90
A 202.106.196.115 手机 手机壳 200
B 202.106.0.20 手机 iPhone8 8000
C 202.102.152.3 家具 婴儿车 2000
D 202.96.96.68 家具 婴儿车 1000
F 202.98.0.68 化妆品 迪奥香水 200
H 202.96.75.68 食品 婴儿床 600
J 202.97.229.133 图书 spark实战 80

JedisConnectionPool类

import redis.clients.jedis.Jedis
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}object JedisConnectionPool{val config = new JedisPoolConfig()//最大连接数,config.setMaxTotal(20)//最大空闲连接数config.setMaxIdle(10)//当调用borrow Object方法时,是否进行有效性检查 -->config.setTestOnBorrow(true)//10000代表超时时间(10秒)val pool = new JedisPool(config, "192.168.1.207", 6379, 10000, "123")def getConnection(): Jedis = {pool.getResource}def main(args: Array[String]) {val conn = JedisConnectionPool.getConnection()
//    conn.set("income", "1000")
//
//    val r1 = conn.get("xiaoniu")
//
//    println(r1)
//
//    conn.incrBy("xiaoniu", -50)
//
//    val r2 = conn.get("xiaoniu")
//
//    println(r2)
//
//    conn.close()val r = conn.keys("*")import scala.collection.JavaConversions._for (p <- r) {println(p + " : " + conn.get(p))}}}

OrderCount类

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}object OrderCount {def main(args: Array[String]): Unit = {//指定组名val group = "g1"//创建SparkConfval conf = new SparkConf().setAppName("OrderCount").setMaster("local[4]")//创建SparkStreaming,并设置间隔时间val ssc = new StreamingContext(conf, Duration(5000))val broadcastRef = IPUtils.broadcastIpRules(ssc, "/Users/zx/Desktop/temp/spark-24/spark-4/ip/ip.txt")//指定消费的 topic 名字val topic = "orders"//指定kafka的broker地址(sparkStream的Task直连到kafka的分区上,用更加底层的API消费,效率更高)val brokerList = "node-4:9092,node-5:9092,node-6:9092"//指定zk的地址,后期更新消费的偏移量时使用(以后可以使用Redis、MySQL来记录偏移量)val zkQuorum = "node-1:2181,node-2:2181,node-3:2181"//创建 stream 时使用的 topic 名字集合,SparkStreaming可同时消费多个topicval topics: Set[String] = Set(topic)//创建一个 ZKGroupTopicDirs 对象,其实是指定往zk中写入数据的目录,用于保存偏移量val topicDirs = new ZKGroupTopicDirs(group, topic)//获取 zookeeper 中的路径 "/g001/offsets/wordcount/"val zkTopicPath = s"${topicDirs.consumerOffsetDir}"//准备kafka的参数val kafkaParams = Map(//"key.deserializer" -> classOf[StringDeserializer],//"value.deserializer" -> classOf[StringDeserializer],//"deserializer.encoding" -> "GB2312", //配置读取Kafka中数据的编码"metadata.broker.list" -> brokerList,"group.id" -> group,//从头开始读取数据"auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString)//zookeeper 的host 和 ip,创建一个 client,用于跟新偏移量量的//是zookeeper的客户端,可以从zk中读取偏移量数据,并更新偏移量val zkClient = new ZkClient(zkQuorum)//查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)// /g001/offsets/wordcount/0/10001"// /g001/offsets/wordcount/1/30001"// /g001/offsets/wordcount/2/10001"//zkTopicPath  -> /g001/offsets/wordcount/val children = zkClient.countChildren(zkTopicPath)var kafkaStream: InputDStream[(String, String)] = null//如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置var fromOffsets: Map[TopicAndPartition, Long] = Map()//如果保存过 offset//注意:偏移量的查询是在Driver完成的if (children > 0) {for (i <- 0 until children) {// /g001/offsets/wordcount/0/10001// /g001/offsets/wordcount/0val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")// wordcount/0val tp = TopicAndPartition(topic, i)//将不同 partition 对应的 offset 增加到 fromOffsets 中// wordcount/0 -> 10001fromOffsets += (tp -> partitionOffset.toLong)}//Key: kafka的key   values: "hello tom hello jerry"//这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (kafka的key, message) 这样的 tupleval messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())//通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据)//[String, String, StringDecoder, StringDecoder,     (String, String)]//  key    value    key的解码方式   value的解码方式kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)} else {//如果未保存,根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offsetkafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)}//偏移量的范围var offsetRanges = Array[OffsetRange]()//直连方式只有在KafkaDStream的RDD(KafkaRDD)中才能获取偏移量,那么就不能到调用DStream的Transformation//所以只能子在kafkaStream调用foreachRDD,获取RDD的偏移量,然后就是对RDD进行操作了//依次迭代KafkaDStream中的KafkaRDD//如果使用直连方式累加数据,那么就要在外部的数据库中进行累加(用KeyVlaue的内存数据库(NoSQL),Redis)//kafkaStream.foreachRDD里面的业务逻辑是在Driver端执行kafkaStream.foreachRDD { kafkaRDD =>//判断当前的kafkaStream中的RDD是否有数据if(!kafkaRDD.isEmpty()) {//只有KafkaRDD可以强转成HasOffsetRanges,并获取到偏移量offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRangesval lines: RDD[String] = kafkaRDD.map(_._2)//整理数据val fields: RDD[Array[String]] = lines.map(_.split(" "))//计算成交总金额CalculateUtil.calculateIncome(fields)//计算商品分类金额CalculateUtil.calculateItem(fields)//计算区域成交金额CalculateUtil.calculateZone(fields, broadcastRef)//偏移量跟新在哪一端()for (o <- offsetRanges) {//  /g001/offsets/wordcount/0val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"//将该 partition 的 offset 保存到 zookeeper//  /g001/offsets/wordcount/0/20000ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)}}}ssc.start()ssc.awaitTermination()}}

CalculateUtil类

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDDobject CalculateUtil {def calculateIncome(fields: RDD[Array[String]]) = {//将数据计算后写入到Reidsval priceRDD: RDD[Double] = fields.map(arr => {val price = arr(4).toDoubleprice})//reduce是一个Action,会把结果返回到Driver端//将当前批次的总金额返回了val sum: Double = priceRDD.reduce(_+_)//获取一个jedis连接val conn = JedisConnectionPool.getConnection()//将历史值和当前的值进行累加//conn.set(Constant.TOTAL_INCOME, sum.toString)conn.incrByFloat(Constant.TOTAL_INCOME, sum)//释放连接conn.close()}/*** 计算分类的成交金额* @param fields*/def calculateItem(fields: RDD[Array[String]]) = {//对field的map方法是在哪一端调用的呢?Driverval itemAndPrice: RDD[(String, Double)] = fields.map(arr => {//分类val item = arr(2)//金额val parice = arr(4).toDouble(item, parice)})//安装商品分类进行聚合val reduced: RDD[(String, Double)] = itemAndPrice.reduceByKey(_+_)//将当前批次的数据累加到Redis中//foreachPartition是一个Action//现在这种方式,jeids的连接是在哪一端创建的(Driver)//在Driver端拿Jedis连接不好//val conn = JedisConnectionPool.getConnection()reduced.foreachPartition(part => {//获取一个Jedis连接//这个连接其实是在Executor中的获取的//JedisConnectionPool在一个Executor进程中有几个实例(单例)val conn = JedisConnectionPool.getConnection()part.foreach(t => {//一个连接更新多条数据conn.incrByFloat(t._1, t._2)})//将当前分区中的数据跟新完在关闭连接conn.close()})}//根据Ip计算归属地def calculateZone(fields: RDD[Array[String]], broadcastRef: Broadcast[Array[(Long, Long, String)]]) = {val provinceAndPrice: RDD[(String, Double)] = fields.map(arr => {val ip = arr(1)val price = arr(4).toDoubleval ipNum = MyUtils.ip2Long(ip)//在Executor中获取到广播的全部规则val allRules: Array[(Long, Long, String)] = broadcastRef.value//二分法查找val index = MyUtils.binarySearch(allRules, ipNum)var province = "未知"if (index != -1) {province = allRules(index)._3}//省份,订单金额(province, price)})//按省份进行聚合val reduced: RDD[(String, Double)] = provinceAndPrice.reduceByKey(_+_)//将数据跟新到Redisreduced.foreachPartition(part => {val conn = JedisConnectionPool.getConnection()part.foreach(t => {conn.incrByFloat(t._1, t._2)})conn.close()})}
}

Constant(1)类

object Constant {val TOTAL_INCOME = "TOTAL_INCOME"
}

MyUtils类

import java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.streaming.StreamingContextimport scala.io.{BufferedSource, Source}object MyUtils {def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum =  fragments(i).toLong | ipNum << 8L}ipNum}def readRules(path: String): Array[(Long, Long, String)] = {//读取ip规则val bf: BufferedSource = Source.fromFile(path)val lines: Iterator[String] = bf.getLines()//对ip规则进行整理,并放入到内存val rules: Array[(Long, Long, String)] = lines.map(line => {val fileds = line.split("[|]")val startNum = fileds(2).toLongval endNum = fileds(3).toLongval province = fileds(6)(startNum, endNum, province)}).toArrayrules}def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}def data2MySQL(it: Iterator[(String, Int)]): Unit = {//一个迭代器代表一个分区,分区中有多条数据//先获得一个JDBC连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123568")//将数据通过Connection写入到数据库val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")//将分区中的数据一条一条写入到MySQL中it.foreach(tp => {pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()})//将分区中的数据全部写完之后,在关闭连接if(pstm != null) {pstm.close()}if (conn != null) {conn.close()}}def main(args: Array[String]): Unit = {//数据是在内存中val rules: Array[(Long, Long, String)] = readRules("/Users/zx/Desktop/ip/ip.txt")//将ip地址转换成十进制val ipNum = ip2Long("114.215.43.42")//查找val index = binarySearch(rules, ipNum)//根据脚本到rules中查找对应的数据val tp = rules(index)val province = tp._3println(province)}
}

IPUtils类

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContextobject IPUtils {def broadcastIpRules(ssc: StreamingContext, ipRulesPath: String): Broadcast[Array[(Long, Long, String)]] = {//现获取sparkContextval sc = ssc.sparkContextval rulesLines:RDD[String] = sc.textFile(ipRulesPath)//整理ip规则数据val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map(line => {val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)})//将分散在多个Executor中的部分IP规则收集到Driver端val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect()//将Driver端的数据广播到Executor//广播变量的引用(还在Driver端)sc.broadcast(rulesInDriver)}
}

spark streamming + kafka + Redis 实践相关推荐

  1. pythonspark实践_基于Python的Spark Streaming Kafka编程实践

    版权声明:本文为CSDN博主原创文章,未经博主允许不得转载. 说明 Spark Streaming的原理说明的文章很多,这里不做介绍.本文主要介绍使用Kafka作为数据源的编程模型,编码实践,以及一些 ...

  2. spark消费kafka产生数据堆积怎么处理_SparkStreaming读取Kafka的两种方式

    本文主要从以下几个方面介绍SparkStreaming读取Kafka的两种方式: 一.SparkStreaming简介 二.Kafka简介 三.Redis简介(可用于保存历史数据或偏移量数据) 四.S ...

  3. sparkStreaming+kafka+redis小项目实战

    一.项目说明 1.需求 实时更新每个用户走的总步数: 每隔5s统计一次,包括某个用户新统计时的时间.所在地点.新增步数: 这里为了方便只将每个用户以及实时更新的步数总和两个维度保存到redis数据库中 ...

  4. Spark整合Kafka小项目

    SparkStreaming与kafka整合小项目实践含所有代码带详细注释 总流程:自制日志生成器生成含数据日志,使用kafkaAppender直接发送到kafka,SparkStreaming从ka ...

  5. spark把kafka数据写到hive

    写入分区表: 准备工作:先建好分区表 方法一:(使用dataframe) 写数据到数据所在的位置,因为hive分区的本质就是分文件夹,先用spark把数据写到文件夹位置,然后执行sql添加分区 1.写 ...

  6. spark spark streaming + kafka receiver方式消费消息

    2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...

  7. Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费

    一.Spark Streaming Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理.数据可以从许多来源(如Kafka,Flume,Kine ...

  8. 腾讯在线人数统计_PHP + REDIS 实践:统计在线人数的几种方案分析

    (给PHP开发者加星标,提升PHP技能) 转自:饼bing blog.csdn.net/hao508506/article/details/52496656 在线人数统计业务是我们开发web肯定要设计 ...

  9. Strom+Kafka + redis实时计算单词出现频率的案例

    案例要实现的目标 在Kafka的shell 客户端中输入内容,通过Storm实时去kafka中取数据并进行计算单词出现的次数,并且实时把这些数据信息存储到redis中. 代码编写 编写Pom文件,代码 ...

最新文章

  1. 花了3个月整理的超级全面的Python资料和Java面试题,分享给大家!
  2. LeetCode 77. Combinations--回溯法,-Python,Java解法
  3. SmartCode 使用常见问题
  4. VScode中Python的交互式命令环境使用笔记
  5. 还在用递归查询 MySQL 的树形结构吗?教你一种更好的解决方案!
  6. 东北真有“油炸冰溜子”这道菜吗?
  7. android字符串点击事件,Android匹配字符串高亮并设置点击事件
  8. 向云上迁移数据时如何避免停机和中断
  9. AD(PCB)知识总结
  10. 专用计算机数控编程软件有哪些,大家都用什么数控编程软件
  11. usb3.0速度测试软件,主流B75原生SATA3.0和USB3.0速度测试
  12. 法学生民法方面的论文选题,有什么推荐吗?
  13. 坐标求四面体体积_「体积公式」四面体体积公式 - seo实验室
  14. android 11.0 12.0Launcher3去掉默认的google搜索栏
  15. Dism++: 好用的Windows 系统优化工具
  16. 用Windows Media Service打造的流媒体直播系统
  17. 香港服务器需要如何来维护呢?
  18. win7-32位系统SqlServer2014版本下载与安装
  19. php三级分销思路 数据库设计_分销系统的用户关系,用户与推广链接的数据库设计。设计思路...
  20. ShardingSphere实践(1)——ShardingSphere介绍

热门文章

  1. 公园遛狗(小 * 逛公园)
  2. i513500h和r5 5600h选哪个 r55600h和i513500h差多少
  3. 第十三周 项目2第11章习题9
  4. tkinter浏览器组件
  5. Android添加拍照功能,Android开发实现拍照功能的方法实例解析
  6. Type interface com.aiit.mapper.BrandMapper is not known to the MapperRegistry.解决办法
  7. 被遗忘的数学家!曾提出最接地气的数学定理,可以计算男朋友真不真心的那种......
  8. 导航星历的钟差,TGD问题
  9. 测绘界超强工具箱!CADCASS实用插件合集,各种功能应有尽有!断面、高程点、等高线、三角网、拓扑检查...
  10. 硅芯思见:SystemVerilog中的类型转换有哪些