未经本人同意,严禁转载,徽沪一郎。

概要

本文就 spark-cassandra-connector 的一些实现细节进行探讨,主要集中于如何快速将大量的数据从cassandra 中读取到本地内存或磁盘。

数据分区

存储在 Cassandra 中数据的一般都会比较多,记录数在千万级别或上亿级别是常见的事。如何将这些表中的内容快速加载到本地内存就是一个非常现实的问题。
解决这一挑战的思路从大的方面来说是比较简单的,那就是将整张表中的内容分成不同的区域,然后分区加载,不同的分区可以在不同的线程或进程中加载,利用并行化来减少整体加载时间。

顺着这一思路出发,要问的问题就是 Cassandra 中的数据如何才能分成不同的区域。

不同于 MySQL ,在 Cassandra 中是不存在 Sequence Id 这样的类型的,也就是说无法简单的使用seqId 来指定查询或加载的数据范围。

既然没有 SequenceID,在 Cassandra 中是否就没有办法了呢?答案显然是否定的,如果只是仅仅支持串行读取,Cassandra 早就会被扔进垃圾堆里了。
数据分区的办法在 Cassandra 中至少有两种办法可以达到,一是通过 token range,另一个是 slice range。这里主要讲解利用 token range 来实现目的。

token range

Cassandra将要存储的记录存储在不同的区域中,判断某一记录具体存储在哪个区域的依据是主键的Hash值。

在Cassandra 1.2之前,组成Cassandra集群的所有节点(Node),都需要手工的指定该节点的Hash值范围也就是Token Range。

手工计算Token Range显然是很繁琐,同时也不怎么容易维护,在Cassandra 1.2之后,引进了虚拟节点(vnode)的概念,主要目的是减少不必要的人工指定,同时也将token range的划分变得更为细粒度。比如原先手工指定token range,只能达到10000这样一个精度,而有了vnode之后,默认安装是每一个物理节点上有256个虚拟节点,这样子的话每一个range的范围就是10000/256,这样变的更为精细。

有关token range的信息存储在cassandra的system命名空间(keyspace)下的local和peers两张表中。其中local表示本节点的token range情况,而peers表示集群中其它节点的token range情况。这两张表中的tokens字段就存储有详细的信息。如果集群中只由一台机器组成,那么peers中的就会什么内容都没有。

简单实验,列出本节点的token range>

use system;
desc table local;
select tokens from local;

Thrift接口

Token Range告诉我们Cassandra的记录是分片存储的,也就意味着可以分片读取。现在的问题转换成为如何知道每一个Token Range的起止范围。

Cassandra支持的Thrift接口中describe_ring就是用来获取token range的具体起止范围的。我们常用的nodetool工具使用的就是thrift接口,nodetool 中有一个describering指令使用的就是describe_ring原语。

可以做一个简单的实验,利用nodetool来查看某个keyspace的token range具体情况。

 nodetool -hcassandra_server_addr describering keyspacename

Spark-Cassandra-Connector

在第一节中讲解了Cassandra中Token Range信息的存储位置,以及可以使用哪些API来获取token range信息。

接下来就分析spark-cassandra-connector是如何以cassandra为数据源将数据加载进内存的。

以简单的查询语句为例,假设用户要从demo这个keyspace的tableX表中加载所有数据,用CQL来表述就是

select * from demo.tableX

上述的查询使用spark-cassandra-connector来表述就是

sc.cassandraTable(“demo”,”tableX”)

尽管上述语句没有触发Spark Job的提交,也就是说并不会将数据直正的从Cassandra的tableX表中加载进来,但spark-cassandra-connector还是需要进行一些数据库的操作。要解决的主要问题就是schema相关。

cassandraTable(“demo”,”tableX”)

只是说要从tableX中加载数据,并没有告诉connector有哪些字段,每个字段的类型是什么。这些信息对后面使用诸如get[String](“fieldX”)来说却是非常关键的。

为了获取字段类型信息的元数据,需要读取system.schema_columns表,利用如下语句可以得到schema_columns表结构的详细信息

desc table system.schema_columns

如果在conf/log4j.properties中将日志级别设置为DEBUG, 然后再执行sc.cassandraTable语句就可以看到具体的CQL查询语句是什么。

CassandraRDDPartitioner

Spark-cassandra-connector添加了一种新的RDD实现,即CassandraRDD。我们知道对于一个Spark RDD来说,非常关键的就是确定getPartitions和compute函数。

getPartitions函数会调用CassandraRDDPartitioner来获取分区数目,

override def getPartitions: Array[Partition] = {verify // let's fail fastval tf = TokenFactory.forCassandraPartitioner(cassandraPartitionerClassName)val partitions = new CassandraRDDPartitioner(connector, tableDef, splitSize)(tf).partitions(where)logDebug(s"Created total ${partitions.size} partitions for $keyspaceName.$tableName.")logTrace("Partitions: \n" + partitions.mkString("\n"))partitions}

CassandraRDDPartitioner中的partitions的处理逻辑大致如下:

  1. 首先确定token range,使用describe_ring

  2. 然后根据Cassandra中使用的Partitioner来确定某一个token range中可能的记录条数,这么做的原因就是为进一步控制加载的数据,提高并发度。否则并发度就永远是256了,比如有一个物理节点,其中有256个vnodes,也就是256个token分区。如果每个分区中大致的记录数是20,000,而每次加载最大只允许10,00的话,整个数据就可以分成256x2=512个分区。

  3. 对describeRing返回的token range进一步拆分的话,需要使用splitter,splitter的构建需要根据keyspace中使用了何种Partitioner来决定,Cassandra中默认的Partitioner是Murmur3Partitioner,Murmur3Hash算法可以让Hash值更为均匀的分布到不同节点。

  4. splitter中会利用到配置项spark.cassandra.input.split.size和spark.cassandra.page.row.size,分别表示一个线程最多读取多少记录,另一个表示每次读取多少行。

partitions的源码详见CasssandraRDDParitioner.scala

compute函数就利用确定的token的起止范围来加载内容,这里在理解的时候需要引起注意的就是flatMap是惰性执行的,也就是说只有在真正需要值的时候才会被执行,延迟触发。

数据真正的加载是发生在fetchTokenRange函数,这时使用到的就是Cassandra Java Driver了,平淡无奇。

fetchTokenRange

fetcchTokenRange函数使用Cassandra Java Driver提供的API接口来读取数据,利用Java API读取数据一般遵循以下步骤

val cluster = Cluster.Builder.addContactPoint(“xx.xx.xx.xx”).build
val session = cluster.connect
val stmt = new SimpleStatement(queryCQL)
session.execute(session)
session.close
cluster.close

addContactPoint的参数是cassandra server的ip地址,在后面真正执行cql语句的时候,如果集群有多个节点构成,那么不同的cql就会在不同的节点上执行,自动实现了负载均衡。可以在addContactPoint的参数中设定多个节点的地址,这样可以防止某一节点挂掉,无法获取集群信息的情况发生。

session是线程安全的,在不同的线程使用同一个session是没有问题的,建议针对一个keySpace只使用一个session.

RDD中使用Session

在Spark RDD中是无法使用SparkContext的,否则会形成RDD嵌套的现象,因为利用SparkContext很容易构造出RDD,如果在RDD的函数中如map中调用SparkContext创建一个新的RDD,则形成深度嵌套进而导致Spark Job有嵌套。

但在实际的情况下,我们可以需要根据RDD中的值再去对数据库进行操作,那么有什么办法来打开数据库连接呢?

解决的办法就是直接使用Cassandra Java Driver而不再使用spark-cassandra-connector的高级封装,因为不能像这样子来使用cassandraRDD.

   sc.cassandraRDD(“ks”,”tableX”).map(x=>sc.cassandraRDD(“ks”,”tableX”).where(filter))

如果是直接使用Cassandra Java Driver,为了避免每个RDD中的iterator都需要打开一个session,那么可以使用foreachPartition函数来进行操作,减少打开的session数。

val  rdd1 = sc.cassandraTable(“keyspace”,”tableX”)rdd1.foreachPartition( lst => {val cluster = ClusterBuilder.addContactPoint(“xx.xx.xx.xx”).buildval session = cluster.connectwhile ( iter.hasNext ) {val  elem = iter.next//do something by using session and elem}session.closecluster.close})

其实最好的办法是在外面建立一个session,然后在不同的partition中使用同一个session,但这种方法不行的原因是在执行的时候会需要”Task not Serializable”的错误,于是只有在foreachPartition函数内部新建session.

数据备份

尽管Cassandra号称可以做到宕机时间为零,但为了谨慎起见,还是需要对数据进行备份。

Cassandra提供了几种备份的方法,

  1. 将数据导出成为json格式

  2. 利用copy将数据导出为csv格式

  3. 直接复制sstable文件

导出成为json或csv格式,当表中的记录非常多的时候,这显然不是一个好的选择。于是就只剩下备份sstable文件了。

问题是将sstable存储到哪里呢?放到HDFS当然没有问题,哪有没有可能对放到HDFS上的sstable直接进行读取呢,在没有经过任务修改的情况下,这是不行的。

试想一下,sstable的文件会被拆分为多个块而存储到HDFS中,这样会破坏记录的完整性,HDFS在存储的时候并不知道某一block中包含有完成的记录信息。

为了做到记录信息不会被拆分到多个block中,需要根据sstable的格式自行提取信息,并将其存储到HDFS上。这样存储之后的文件就可以被并行访问。

Cassandra中提供了工具sstablesplit来将大的sstable分割成为小的文件。

DataStax的DSE企业版中提供了和Hadoop及Spark的紧密结合,其一个很大的基础就是先将sstable的内容存储到CFS中,大体的思路与刚才提及的应该差不多。

对sstable存储结构的分析是一个研究的热门,可以参考如下的链接。

  1. https://www.fullcontact.com/blog/cassandra-sstables-offline/

只所以要研究备份策略是想将对数据的分析部分与业务部分相分离开,避免由于后台的数据分析导致Cassandra集群响应变得缓慢而致前台业务不可用,即将OLTP和OLAP的数据源分离开。

复杂查询

通过近乎实时的数据备份,后台OLAP就可以使用Spark来对数据进行分析和处理。

与传统的RDBMS相比,Cassandra所能提供的查询功能实在是弱的可以,如果想到实现非常复杂的查询功能的,需要将Cassandra和Solr进行结合。

DSE企业版提供了该功能,如果想手工搭建的话,可以参考下面的链接

  1. http://www.slideshare.net/planetcassandra/an-introduction-to-distributed-search-with-cassandra-and-solr

  2. https://github.com/Stratio/stratio-cassandra 开源方面的尝试 Cassandra和Lucene的结合

共享SparkContext

SparkContext可以被多个线程使用,这意味着同个Spark Application中的Job可以同时提交到Spark Cluster中,减少了整体的等待时间。

在同一个线程中, Spark只能逐个提交Job,当Job在执行的时候,Driver Application中的提交线程是处于等待状态的。如果Job A没有执行完,Job B就无法提交到集群,就更不要提分配资源真正执行了。

那么如何来减少等待时间呢,比如在读取Cassandra数据的过程中,需要从两个不同的表中读取数据,一种办法就是先读取完成表A与读取表B,总的耗时是两者之和。

如果利用共享SparkContext的技术,在不同的线程中去读取,则耗时只是两者之间的最大值。

在Scala中有多种不同的方式来实现多线程,现仅以Future为例来说明问题

val ll  = (1 to 3 toList).map(x=>sc.makeRDD(1 to 100000 toList, 3))
val futures = ll.map ( x => Future {x.count()})
val fl = Future.sequencce(futures)
Await.result(fl,3600 seconds)

简要说明一下代码逻辑

  1. 创建三个不同的RDD

  2. 在不同的线程(Future)中通过count函数来提交Job

  3. 使用Await来等待Future执行结束

Apache Spark技术实战之7 -- CassandraRDD高并发数据读取实现剖析相关推荐

  1. Apache Spark技术实战之6 -- spark-submit常见问题及其解决

    除本人同意外,严禁一切转载,徽沪一郎. 概要 编写了独立运行的Spark Application之后,需要将其提交到Spark Cluster中运行,一般会采用spark-submit来进行应用的提交 ...

  2. Apache Spark 技术团队开源机器学习平台 MLflow

    开发四年只会写业务代码,分布式高并发都不会还做程序员?   近日,来自 Databricks 的 Matei Zaharia 宣布推出开源机器学习平台 MLflow .Matei Zaharia 是 ...

  3. 猿创征文 | 微服务 Spring Boot 整合Redis 实战开发解决高并发数据缓存

    文章目录 一.什么是 缓存? ⛅为什么用缓存? ⚡如何使用缓存 二.实现一个商家缓存 ⌛环境搭建 ♨️核心源码 ✅测试接口 三.采用 微服务 Spring Boot 注解开启缓存 ✂️@CacheEn ...

  4. Spring Boot实战解决高并发数据入库: Redis 缓存+MySQL 批量入库

    前言 最近在做阅读类的业务,需要记录用户的PV,UV: 项目状况:前期尝试业务阶段: 特点: 快速实现(不需要做太重,满足初期推广运营即可) 快速投入市场去运营 收集用户的原始数据,三要素: 谁 在什 ...

  5. 大数据|Spark技术在京东智能供应链预测的应用案例深度剖析(一)

    大数据|Spark技术在京东智能供应链预测的应用案例深度剖析(一) 2017-03-27 11:58  浏览次数:148 1. 背景 前段时间京东公开了面向第二个十二年的战略规划,表示京东将全面走向技 ...

  6. Spring Cloud Alibaba 分布式微服务高并发数据平台化(中台)思想+多租户saas企业开发架构技术选型和设计方案

    基于Spring Cloud Alibaba 分布式微服务高并发数据平台化(中台)思想+多租户saas设计的企业开发架构,支持源码二次开发.支持其他业务系统集成.集中式应用权限管理.支持拓展其他任意子 ...

  7. SparkSQL高并发:读取存储数据库

    摘要:实践解析如何利用SarkSQL高并发进行读取数据库和存储数据到数据库. 本文分享自华为云社区<SarkSQL高并发读取数据库和存储数据到数据库>,作者:Copy工程师 . 1. Sp ...

  8. TCP高并发数据转接服务器(Ntrip Caster)

    TCP高并发数据转接服务器(Ntrip Caster) 说到NTRIP Caster, 咱们需要首先链接一下什么是Ntrip协议由于这不是本博客的重点,故只做如下简单介绍,,大家可以通过如下链接对nt ...

  9. 处理大客流量高并发数据通讯_云如何处理高流量网站?

    处理大客流量高并发数据通讯 It is often seen that one of the main benefits of the cloud is its perceived ability t ...

  10. 千亿级平台技术架构:为了支撑高并发,我把身份证存到了JS里

    点击上方 IT牧场 ,选择 置顶或者星标 技术干货每日送达 随着时代及互联网的发展,人们对个人隐私越来越重视,但隐私信息泄露及滥用的问题依然屡见不鲜.之前有一份<中国个人信息安全和隐私保护报告& ...

最新文章

  1. 零基础怎么学UI设计
  2. 量子计算机神器,量子计算技术再获神器 科学家开发出新的成像技术
  3. JavaScript中的的面向对象中的一些知识
  4. ethtool如何让接口闪灯_如何解决专业家庭影院与卡拉OK的声学问题?
  5. S3C2440裸奔篇之MMU
  6. zkfc 异常退出问题,报错Received stat error from Zookeeper. code:CONNECTIONLOSS
  7. 渗透实战-guest账户-mimikatz-向日葵-sql提权-离线解密
  8. 量子力学考研书籍介绍
  9. 大数据时代,如何做商业智能产品选型
  10. 【PaddlePaddle论文复现】U-GAT-IT: 基于GAN的新型无监督图像转换
  11. 亚马逊云科技入门资源中心,从0到1轻松上云
  12. 灵格斯(lingoes)词霸 + Acapela破解语音包(法语,德语,西班牙语,美国英语)
  13. 打破思维断层之KMP分析
  14. 【香蕉OI】GCD 和 LCM (莫比乌斯反演)
  15. Springboot+学生作业管理系统 毕业设计-附源码251208
  16. css实现聊天气泡对话框
  17. b 站神器更新,还出了新玩意
  18. DL4J中文文档/调优与训练/可视化
  19. pytorch框架实现老照片修复功能详细演示(GPU版)
  20. 传导干扰测试(0.15~30MHz)

热门文章

  1. 在Linux(Ubuntu)下编写编译C语言
  2. Python弹球游戏(tkinter模块编写)
  3. 《机器学习Python实践》第6章——数据理解
  4. php提取文本数据处理,PHP文件处理—读取文件(一个字符,字串)
  5. 题目:一个数如果恰好等于它的因子之和,这个数就称为“完数”。例如6=1+2+3.编程找出1000以内的所有完数。
  6. Okhttp源码简单解析(一)
  7. 还在搭建传统IT架构的你,正在慢慢被行业淘汰
  8. C++11中内联函数(inline)
  9. java集成agent作用_javaagent 基于 javaagent 开发的 APM 工具,收集方法的执行次数和执行时间,定时输出成 json 格式 @codeKK Android开源站...
  10. 显微镜自动聚焦原理是什么_自动玻璃感应门原理是什么?看看东莞装修网怎么说...