摘要:由于redis是基于内存的数据库,稳定性并不是很高,尤其是standalone模式下的redis。于是工作中在使用Spark-Redis时也会碰到很多问题,尤其是执行海量数据插入与查询的场景中。

海量数据查询

Redis是基于内存读取的数据库,相比其它的数据库,Redis的读取速度会更快。但是当我们要查询上千万条的海量数据时,即使是Redis也需要花费较长时间。这时候如果我们想要终止select作业的执行,我们希望的是所有的running task立即killed。

Spark是有作业调度机制的。SparkContext是Spark的入口,相当于应用程序的main函数。SparkContext中的cancelJobGroup函数可以取消正在运行的job。

/*** Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`* for more information.*/def cancelJobGroup(groupId: String) {assertNotStopped()dagScheduler.cancelJobGroup(groupId)}

按理说取消job之后,job下的所有task应该也终止。而且当我们取消select作业时,executor会throw TaskKilledException,而这个时候负责task作业的TaskContext在捕获到该异常之后,会执行killTaskIfInterrupted。

 // If this task has been killed before we deserialized it, let's quit now. Otherwise,// continue executing the task.val killReason = reasonIfKilledif (killReason.isDefined) {// Throw an exception rather than returning, because returning within a try{} block// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl// exception will be caught by the catch block, leading to an incorrect ExceptionFailure// for the task.throw new TaskKilledException(killReason.get)}
/*** If the task is interrupted, throws TaskKilledException with the reason for the interrupt.*/private[spark] def killTaskIfInterrupted(): Unit

但是Spark-Redis中还是会出现终止作业但是task仍然running。因为task的计算逻辑最终是在RedisRDD中实现的,RedisRDD的compute会从Jedis中取获取keys。所以说要解决这个问题,应该在RedisRDD中取消正在running的task。这里有两种方法:

方法一:参考Spark的JDBCRDD,定义close(),结合InterruptibleIterator。

def close() {if (closed) returntry {if (null != rs) {rs.close()}} catch {case e: Exception => logWarning("Exception closing resultset", e)}try {if (null != stmt) {stmt.close()}} catch {case e: Exception => logWarning("Exception closing statement", e)}try {if (null != conn) {if (!conn.isClosed && !conn.getAutoCommit) {try {conn.commit()} catch {case NonFatal(e) => logWarning("Exception committing transaction", e)}}conn.close()}logInfo("closed connection")} catch {case e: Exception => logWarning("Exception closing connection", e)}closed = true}context.addTaskCompletionListener{ context => close() }
CompletionIterator[InternalRow, Iterator[InternalRow]](new InterruptibleIterator(context, rowsIterator), close())

方法二:异步线程执行compute,主线程中判断task isInterrupted

try{val thread = new Thread() {override def run(): Unit = {try {keys = doCall} catch {case e =>logWarning(s"execute http require failed.")}isRequestFinished = true}}// control the http request for quite if user interrupt the jobthread.start()while (!context.isInterrupted() && !isRequestFinished) {Thread.sleep(GetKeysWaitInterval)}if (context.isInterrupted() && !isRequestFinished) {logInfo(s"try to kill task ${context.getKillReason()}")context.killTaskIfInterrupted()}thread.join()CompletionIterator[T, Iterator[T]](new InterruptibleIterator(context, keys), close)

我们可以异步线程来执行compute,然后在另外的线程中判断是否task isInterrupted,如果是的话就执行TaskContext的killTaskIfInterrupted。防止killTaskIfInterrupted无法杀掉task,再结合InterruptibleIterator:一种迭代器,以提供任务终止功能。通过检查[TaskContext]中的中断标志来工作。

海量数据插入

我们都已经redis的数据是保存在内存中的。当然Redis也支持持久化,可以将数据备份到硬盘中。当插入海量数据时,如果Redis的内存不够的话,很显然会丢失部分数据。这里让使用者困惑的点在于: 当Redis已使用内存大于最大可用内存时,Redis会报错:command not allowed when used memory > ‘maxmemory’。但是当insert job的数据大于Redis的可用内存时,部分数据丢失了,并且还没有任何报错。

因为不管是Jedis客户端还是Redis服务器,当插入数据时内存不够,不会插入成功,但也不会返回任何response。所以目前能想到的解决办法就是当insert数据丢失时,扩大Redis内存。

总结

Spark-Redis是一个应用还不是很广泛的开源项目,不像Spark JDBC那样已经商业化。所以Spark-Redis还是存在很多问题。相信随着commiter的努力,Spark-Redis也会越来越强大。

点击关注,第一时间了解华为云新鲜技术~

如何应对Spark-Redis行海量数据插入、查询作业时碰到的问题相关推荐

  1. sql多行插入insert多行无法分析查询文本_收藏!SQL语法全集合!

    来源:PHP开源社区 本文针对关系型数据库的一般语法.限于篇幅,本文侧重说明用法,不会展开讲解特性.原理. 一.基本概念 数据库术语 数据库(database) - 保存有组织的数据的容器(通常是一个 ...

  2. Redis面试常问2-- 从海量数据里查询某一固定前缀的key? SCAN cursor

    从海量数据里查询某一固定前缀的key? 问题背景: 面试官问,如何从10亿个数据中,找到某一个固定前缀的10万个key? 注意:面试官问的细节 多点问面试官一些细节相关的问题,摸清楚数据规模 问清楚面 ...

  3. 面试突击 004 | 如何排查 Redis 中的慢查询?视频实战篇

    这是我的第 34 篇原创文章 作者 | 老王(javacn666) 1 面试题 如何排查 Redis 中的慢查询? 2 涉及相关问题 Redis 中有没有慢查询排查工具或者相关排查手段? 慢查询日志都 ...

  4. 1000+Redis实例,100+集群,Redis 在海量数据和高并发下的优化实践

    墨墨导读:Redis 对于从事互联网技术工程师来说并不陌生,几乎所有的大中型企业都在使用 Redis 作为缓存数据库. 但是对于绝大多数企业来说只会用到它的最基础的 KV 缓存功能,还有很多 Redi ...

  5. hbase 查询_不用ES也能海量数据复杂查询秒回

    面对海量数据复杂查询场景,目前的主流选择是HBase搭配ElasticSearch或者直接用ElasitcSearch实现,本文提出一个新的解决方案,基于HBase实现更加轻量,无需增加硬件投入,我们 ...

  6. mysql命令行批量添加数据_mysql命令行批量插入100条数据命令

    先介绍一个关键字的使用: delimiter 定好结束符为"$$",(定义的时候需要加上一个空格) 然后最后又定义为";", MYSQL的默认结束符为" ...

  7. 如何在海量数据中查询一个值是否存在?

    一般面试中考察的题目通常是由三类组成的,基础面试题.进阶面试题.开放性面试题,而本文的题目则属于一个开放性的面试题,但对于 Redis 这种以数据为核心的缓存中间件来说,实现在海量数据中查询一个值是否 ...

  8. PHP5: mysqli 插入, 查询, 更新和删除 Insert Update Delete Using mysqli (CRUD)

    原文: PHP5: mysqli 插入, 查询, 更新和删除  Insert Update Delete Using mysqli (CRUD) PHP 5 及以上版本建议使用以下方式连接 MySQL ...

  9. 钱文品 | 《Redis在海量数据和高并发下的优化实践》主题分享

    原文:http://www.enmotech.com/web/detail/1/750/1.html 导读:Redis 对于从事互联网技术工程师来说并不陌生,几乎所有的大中型企业都在使用 Redis ...

最新文章

  1. 关于 iOS 10 中 ATS 的问题
  2. 战神背光键盘如何关系_显瘦又有肌肉 神舟战神Z7MKP5GZ评测
  3. curl不通 k8s_如何利用curl命令访问Kubernetes API server
  4. ERROR java.lang.NoClassDefFoundError
  5. Python菜鸟入门:day02知识分类
  6. python日记----2017.7.20
  7. 使用php glob函数查找文件,遍历文件目录(转)
  8. cisco下模拟Linux防火墙,Cisco防火墙HA实例
  9. 双目测距原理 matlab,双目测距的基本原理
  10. 网络安全与渗透:sql注入,一文详解(九)此生无悔入华夏,男儿何不带吴钩
  11. 29、程序员的面试考题,要求用一个for循环打出乘法表。
  12. 一些javascript内容
  13. 年轻程序员如何快速成长
  14. 判断大学生体侧项目中立定跳远成绩的等级
  15. Tridium niagara N4---JACE 8000恢复出厂报错无法进去
  16. 《Android之大话设计模式》--设计原则 第三章:开放封闭原则 孙悟空任弼马温一职
  17. 团队作业五之旅游行业手机APP分析
  18. 计算机硬件和工作原理,课题计算机硬件和基本工作原理
  19. 人生的追求到底是什么?
  20. 英语四级考前重点词汇【3】

热门文章

  1. 如何检测支付宝接口中notify_url.php有没有返回,微信小程序支付成功,但是notify_url接收不到回调如何排查此问题?...
  2. VS2019C++代码出现cout不明确
  3. 视觉SLAM笔记(27) 非线性最小二乘
  4. CF1060C Maximum Subrectangle
  5. DataTable中Compute计算函数
  6. Nginx http 视频点播服务器搭建操作指南
  7. expect学习笔记及实例详解【转】
  8. 我TM快疯了,在博客园开博短短2个月,经历博客园数次故障。。。
  9. c#连mysql的latin1编码乱码问题
  10. 动态规划——骨牌平铺问题