1.使用pipline的原因

Redis 使用的是客户端-服务器(CS)模型和请求/响应协议的 TCP 服务器。这意味着通常情况下一个请求会遵循以下步骤:
客户端向服务端发送一个查询请求,并监听 Socket 返回,通常是以阻塞模式,等待服务端响应。
服务端处理命令,并将结果返回给客户端。
管道(pipeline)可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline 通过减少客户端与 redis 的通信次数来实现降低往返延时时间,而且 Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。
通俗点:pipeline就是把一组命令进行打包,然后一次性通过网络发送到Redis。同时将执行的结果批量的返回回来
pipelined.sync()表示我一次性的异步发送到redis,不关注执行结果。
pipeline.syncAndReturnAll ();将返回执行过的命令结果返回到List列表中

2.方法

2.1写入redis的方法

2.1.1参数说明

sc:SparkContext Spark上下文
spark:SparkSession 使用Dataset和DataFrame API编程Spark的入口点

def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={// spark读取数据集val df: DataFrame = spark.read.parquet("file:///F://delRedisData//1//delData.snappy.parquet")df.show(1,false)val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String]("r"))// 这个集合写的是2000多万的数据sc.toRedisSET(rdd,"test:task:deplicate")}

2.2读取本地待删除数据的方法

2.2.1参数说明

sc:SparkContext Spark上下文
spark:SparkSession 使用Dataset和DataFrame API编程Spark的入口点

def readParquet(spark: SparkSession,path:String): RDD[String] ={val df: DataFrame = spark.read.parquet(path)val strRDD: RDD[String] = df.rdd.map(_.getAs[String]("r"))// 返回String类型的RDDstrRDD}

2.3调用pipline删除的方法

2.3.1参数说明

collectionName 其中redis set集合的名称
num是要删除的数据量是多少
arr是要删除的数据存放的是set集合的key
jedis是redis的客户端


def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = {try{val pipeline: Pipeline = jedis.pipelined()// 选择数据库  默认为 0pipeline.select(1)for(i <- 0 to (num - 1) ){pipeline.srem(collectionName,arr(i))}//表示我一次性的异步发送到redis,不关注执行结果pipeline.sync()}catch {case e : JedisException => e.printStackTrace()}finally if(jedis !=null) jedis.close()}

3.完整代码

import com.redislabs.provider.redis._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.exceptions.JedisException
import redis.clients.jedis.{Jedis, Pipeline}/*** Date 2022/5/25 17:57*/
object DelRedis {def main(args: Array[String]): Unit = {val conf = new SparkConf()// 驱动进程使用的内核数,仅在集群模式下使用。.set("spark.driver.cores","5")/*** 驱动进程使用的内存数量,也就是SparkContext初始化的地方,* 其格式与JVM内存字符串具有大小单位后缀(“k”,“m”,“g”或“t”)(例如512m, 2g)相同。* 注意:在客户端模式下,不能直接在应用程序中通过SparkConf设置此配置,因为此时驱动程* 序JVM已经启动。相反,请通过——driver-memory命令行选项或在默认属性文件中设置。*/.set("spark.driver.memory","5g")/*** 限制每个Spark操作(例如collect)的所有分区的序列化结果的总大小(以字节为单位)。* 应该至少是1M,或者0表示无限制。如果总大小超过此限制,则作业将被终止。* 过高的限制可能会导致驱动程序内存不足错误(取决于spark.driver.memory和JVM中对象的内存开销)。* 设置适当的限制可以防止驱动程序出现内存不足的错误。*/.set("spark.driver.maxResultSize","10g")/*** 每个执行程序进程使用的内存数量,* 格式与带有大小单位后缀(“k”,“m”,“g”或“t”)的JVM内存字符串相同(例如512m, 2g)。**/.set("spark.executor.memory","5g")/*** 默认 1在YARN模式下,worker上所有可用的内核在standalone和Mesos粗粒度模式下。*/.set("spark.executor.cores","5")val spark: SparkSession = SparkSession.builder().appName("DelRedis").master("local[*]").config("spark.redis.host","192.168.100.201").config("spark.redis.port","6379").config("spark.redis.db","1")     // 可选的数据库编号。避免使用它,尤其是在集群模式下,redisRedis默认支持16个数据库,默认是选择数据库0,这里设置为1。.config("spark.redis.timeout","2000000")   // 连接超时,以毫秒为单位,默认为 2000 毫秒.config(conf).getOrCreate()val sc: SparkContext = spark.sparkContext//1.写入数据集writeRedis(sc,spark)// 2.读取待删除的数据keyval path = "file:///F://delRedisData//test.parquet"val rdd: RDD[String] = readParquet(spark,path)//3.使用redis 中的 pipeline 方法 进行删除操作rdd.foreachPartition(iter=>{// 连接redis客户端val jedis = new Jedis("192.168.100.201",6379)val array: Array[String] = iter.toArrayval length: Int = array.lengthval beginTime: Long = System.currentTimeMillis()delPipleine(collectionName,length,array,jedis)val endTime: Long = System.currentTimeMillis()println("删除:"+length+"条数据,耗时:"+(endTime-beginTime)/1000+"秒")})sc.stop()spark.stop()}
def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = {try{val pipeline: Pipeline = jedis.pipelined()// 选择数据库  默认为 0pipeline.select(1)for(i <- 0 to (num - 1) ){pipeline.srem(collectionName,arr(i))}//表示我一次性的异步发送到redis,不关注执行结果pipeline.sync()}catch {case e : JedisException => e.printStackTrace()}finally if(jedis !=null) jedis.close()}
def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={// spark读取数据集val df: DataFrame = spark.read.parquet("file:///F://delRedisData//1//delData.snappy.parquet")df.show(1,false)val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String]("r"))// 这个集合写的是2000多万的数据sc.toRedisSET(rdd,"test:task:deplicate")}
def readParquet(spark: SparkSession,path:String): RDD[String] ={val df: DataFrame = spark.read.parquet(path)val strRDD: RDD[String] = df.rdd.map(_.getAs[String]("r"))// 返回String类型的RDDstrRDD}}

4.总结

经检测:redis 的 pipeline(管道)方法 ,经单机版的redis测试 ,百万级别数据删除仅需要1分钟左右与硬件有关,还包括读取数据的时长等方面原因

Spark删除redis千万级别set集合数据相关推荐

  1. 千万级别数据表创建索引

    业务背景 最近一个开发维护的公众号管理系统用户表(user_info)数据已经达到15,000k了,而此时有一个业务场景需要将公众号的用户信息重新同步一次,且后台原有过针对单个公众号的用户同步,但是已 ...

  2. mysql千万级数据怎么删除,MySQL 快速删除大量数据(千万级别)的几种实践方案详解...

    这篇文章主要介绍了MySQL 快速删除大量数据(千万级别)的几种实践方案详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 笔者 ...

  3. redis集合数据过期_关于redis性能问题分析和优化

    一.如何查看Redis性能 info命令输出的数据可以分为10个分类,分别是: server,clients,memory,persistence,stats,replication,cpu,comm ...

  4. 千万级别数据查询优化_从千万级数据查询来聊一聊索引结构和数据库原理

    在日常工作中我们不可避免地会遇到慢SQL问题,比如笔者在之前的公司时会定期收到DBA彪哥发来的Oracle AWR报告,并特别提示我某条sql近阶段执行明显很慢,可能要优化一下等.对于这样的问题通常大 ...

  5. Mysql千万级别数据如何 做分页?

    后端开发中为了防止⼀次性加载太多数据导致内存.磁盘IO都开销过⼤,经常需要分⻚展示,这个时候就需要⽤到MySQL的LIMIT关键字.但你以为LIMIT分⻚就万事大吉了么,LIMIT在数据量⼤的时候极可 ...

  6. 菜鸟mysql四分钟导入千万级别的数据

    最近在分析数据时,遇到1000万条csv数据,于是便想着将其导入MySQL进行分析,由于本人比较笨,折腾了一晚上还没搞定,总是遇到各种各样的错误,终于在今天成功导入了这1000万条数据,在此跟大家分享 ...

  7. mysql数据库千万级别数据的查询优化和分页测试

    原文地址:原创 mysql数据库千万级别数据的查询优化和分页测试作者:于堡舰  本文为本人最近利用几个小时才分析总结出的原创文章,希望大家转载,但是要注明出处  http://blog.sina.co ...

  8. mysql插入数据返回主键值_Mysql千万级别数据批量插入只需简单三步!

    第一步:配置my.ini文件 文件中配置 bulk_insert_buffer_size=120M 或者更大 将insert语句的长度设为最大. Max_allowed_packet=1M Net_b ...

  9. B+Tree索引为什么可以支持千万级别数据量的查找——讲讲mysql索引的底层数据结构

    MySQL索引底层数据结构 索引是存储引擎快速找到记录的一种数据结构 一. 有索引与没索引的差距 先来看一张图: 左边是没有索引的情况,右边是作为col2字段 二叉树索引的情况. 假如执行查找(假设表 ...

最新文章

  1. 二叉树中序遍历方法实现
  2. Tensorflow2.0与Tensorflow1.0的理解
  3. linux限制普通账号使用sftp,CentOS6.2使用SFTP限制帐号SSH连接
  4. Android版同步工具豌豆荚实测 电脑给手机按软件 截图
  5. leetcode1296. 划分数组为连续数字的集合(贪心算法)
  6. [js] 你认为es5的设计缺陷有哪些?
  7. 2020顶会指南:征稿截止时间、举办地、举办时间一览
  8. 吉大计算机考研分数线2021,吉林大学2021考研分数线
  9. 读 项亮《推荐系统实践》
  10. Windows 引导修复
  11. dpdk LRO功能总结
  12. ubantu20.04安装PCL
  13. Linux nm命令详解
  14. Python中的内置数据类型
  15. (附源码)springboot大学医学生毕业实习分配系统 毕业设计212 002
  16. Lucene .Net + 盘古分词 学习资料
  17. vba遍历数组_Excel VBA中如何对数组进行去重
  18. 使用 Jupiter Notebook 运行 Delta Lake 入门教程
  19. 关联分析之Apriori学习笔记
  20. MT6735 8.1 Secure Boot 签名

热门文章

  1. 如何让图片保持原比例,占满整个盒子
  2. 【转】c# 图片压缩 (非图片大小变化)----使得显示效果差点,但是图片占用空间需要变小
  3. Win系统 - 如何查看电脑开机了多长时间?
  4. GoogleMap获取地图中心点位置信息
  5. Word自动生成目录页码靠右对齐
  6. Geekon移动电源概念版
  7. Java学生成绩处理
  8. 物联网 DFrobot 掌控版 人工智能测温实验
  9. 机器学习笔记 - IOU、mAP、ROC、AUC、准确率、召回率、F分数
  10. 岳父岳母-12个未接电话