目录

外部数据源

MySQL 数据源

演示代码

HBase 数据源

HBase Sink

​​​​​​​HBase Source


外部数据源

Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如:

 1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析

日志数据:电商网站的商家操作日志

订单数据:保险行业订单数据

 2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL表中

网站基本分析(pv、uv。。。。。)

注意:实际开发中会封装为工具类直接使用

https://github.com/teeyog/blog/issues/22

https://blog.csdn.net/u011817217/article/details/81667115

MySQL 数据源

实际开发中常常将分析结果RDD保存至MySQL表中,使用foreachPartition函数;此外Spark中提供JdbcRDD用于从MySQL表中读取数据。

调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中,保存时考虑降低RDD分区数目和批量插入,提升程序性能。

演示代码

package cn.itcast.coreimport java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{JdbcRDD, RDD}/*** Author itcast* Desc 演示使用Spark将数据写入到MySQL,再从MySQL读取出来*/
object SparkJdbcDataSource {def main(args: Array[String]): Unit = {//1.创建SparkContextval sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")//2.准备数据val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))//3.将RDD中的数据保存到MySQL中去//将每一个分区中的数据保存到MySQL中去,有几个分区,就会开启关闭连接几次//data.foreachPartition(itar=>dataToMySQL(itar))data.foreachPartition(dataToMySQL) //方法即函数,函数即对象//4.从MySQL读取数据/*class JdbcRDD[T: ClassTag](sc: SparkContext,getConnection: () => Connection,sql: String,lowerBound: Long,upperBound: Long,numPartitions: Int,mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)*/val getConnection = ()=> DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")val sql:String = "select id,name,age from t_student where id >= ? and id <= ?"val mapRow = (rs:ResultSet) => {val id: Int = rs.getInt(1)val name: String = rs.getString(2)val age: Int = rs.getInt("age")(id,name,age)}val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,getConnection,sql,4,5,2,mapRow)println(studentRDD.collect().toBuffer)}/*** 将分区中的数据保存到MySQL* @param itar 传过来的每个分区有多条数据*/def dataToMySQL(itar: Iterator[(String, Int)]): Unit = {//0.加载驱动//Class.forName("") //源码中已经加载了//1.获取连接val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")//2.编写sqlval sql:String = "INSERT INTO `t_student` (`name`, `age`) VALUES (?, ?);"//3.获取psval ps: PreparedStatement = connection.prepareStatement(sql)itar.foreach(data=>{//4.设置参数ps.setString(1,data._1)ps.setInt(2,data._2)//5.执行sqlps.addBatch()})ps.executeBatch()ps.close()connection.close()}
}

​​​​​​​HBase 数据源

Spark可以从HBase表中读写(Read/Write)数据,底层采用TableInputFormatTableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输出格式OutputFoamt。

​​​​​​​HBase Sink

回顾MapReduce向HBase表中写入数据,使用TableReducer,其中OutputFormat为TableOutputFormat,读取数据Key:ImmutableBytesWritable(Rowkey),Value:Put(Put对象)

写入数据时,需要将RDD转换为RDD[(ImmutableBytesWritable, Put)]类型,调用saveAsNewAPIHadoopFile方法数据保存至HBase表中。

HBase Client连接时,需要设置依赖Zookeeper地址相关信息及表的名称,通过Configuration设置属性值进行传递。

范例演示:将词频统计结果保存HBase表,表的设计

代码如下:

package cn.itcast.coreimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 将RDD数据保存至HBase表中*/
object SparkWriteHBase {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 构建RDDval list = List(("hadoop", 234), ("spark", 3454), ("hive", 343434), ("ml", 8765))val outputRDD: RDD[(String, Int)] = sc.parallelize(list, numSlices = 2)// 将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数,要求RDD是(key, Value)//  组装RDD[(ImmutableBytesWritable, Put)]/*** HBase表的设计:* 表的名称:htb_wordcount* Rowkey:  word* 列簇:    info* 字段名称: count*/val putsRDD: RDD[(ImmutableBytesWritable, Put)] = outputRDD.mapPartitions { iter =>iter.map { case (word, count) =>// 创建Put实例对象val put = new Put(Bytes.toBytes(word))// 添加列put.addColumn(// 实际项目中使用HBase时,插入数据,先将所有字段的值转为String,再使用Bytes转换为字节数组Bytes.toBytes("info"), Bytes.toBytes("cout"), Bytes.toBytes(count.toString))// 返回二元组(new ImmutableBytesWritable(put.getRow), put)}}// 构建HBase Client配置信息val conf: Configuration = HBaseConfiguration.create()// 设置连接Zookeeper属性conf.set("hbase.zookeeper.quorum", "node1")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set("zookeeper.znode.parent", "/hbase")// 设置将数据保存的HBase表的名称conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")/*def saveAsNewAPIHadoopFile(path: String,// 保存的路径keyClass: Class[_], // Key类型valueClass: Class[_], // Value类型outputFormatClass: Class[_ <: NewOutputFormat[_, _]], // 输出格式OutputFormat实现conf: Configuration = self.context.hadoopConfiguration // 配置信息): Unit*/putsRDD.saveAsNewAPIHadoopFile("datas/spark/htb-output-" + System.nanoTime(), //classOf[ImmutableBytesWritable], //classOf[Put], //classOf[TableOutputFormat[ImmutableBytesWritable]], //conf)// 应用程序运行结束,关闭资源sc.stop()}
}

运行完成以后,使用hbase shell查看数据:

​​​​​​​HBase Source

回顾MapReduce从读HBase表中的数据,使用TableMapper,其中InputFormat为TableInputFormat,读取数据Key:ImmutableBytesWritable,Value:Result

从HBase表读取数据时,同样需要设置依赖Zookeeper地址信息和表的名称,使用Configuration设置属性,形式如下:

此外,读取的数据封装到RDD中,Key和Value类型分别为:ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化,性能要比Java 序列化要好,创建SparkConf对象设置相关属性,如下所示:

范例演示:从HBase表读取词频统计结果,代码如下

package cn.itcast.coreimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 从HBase 表中读取数据,封装到RDD数据集*/
object SparkReadHBase {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 读取HBase Client 配置信息val conf: Configuration = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum", "node1")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set("zookeeper.znode.parent", "/hbase")// 设置读取的表的名称conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")/*def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration,fClass: Class[F],kClass: Class[K],vClass: Class[V]): RDD[(K, V)]*/val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])println(s"Count = ${resultRDD.count()}")resultRDD.take(5).foreach { case (rowKey, result) =>println(s"RowKey = ${Bytes.toString(rowKey.get())}")// HBase表中的每条数据封装在result对象中,解析获取每列的值result.rawCells().foreach { cell =>val cf = Bytes.toString(CellUtil.cloneFamily(cell))val column = Bytes.toString(CellUtil.cloneQualifier(cell))val value = Bytes.toString(CellUtil.cloneValue(cell))val version = cell.getTimestampprintln(s"\t $cf:$column = $value, version = $version")}}// 应用程序运行结束,关闭资源sc.stop()}
}

运行结果:

2021年大数据Spark(二十):Spark Core外部数据源引入相关推荐

  1. 2021年大数据Kafka(十二):❤️Kafka配额限速机制❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka配额限速机制 限制producer端的速率 限制c ...

  2. 2021年大数据HBase(十二):Apache Phoenix 二级索引

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix 二级索引 一.索引分类 ...

  3. 2021年大数据Hive(十二):Hive综合案例!!!

    全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive综合案例 一.需求描述 二.项目表的字段 三.进 ...

  4. 2021年大数据Kafka(十):kafka生产者数据分发策略

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 生产者数据分发策略 策略一:用户指定了partition 策 ...

  5. 2021年大数据HBase(十六):HBase的协处理器(Coprocessor)

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase的协处理器(Coprocessor) 一.起源 二 ...

  6. 2021年大数据HBase(十五):HBase的Bulk Load批量加载操作

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase的Bulk Load批量加载操作 一.Bulk L ...

  7. 2021年大数据HBase(十四):HBase的原理及其相关的工作机制

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase的原理及其相关的工作机制 一.HBase的flus ...

  8. 2021年大数据HBase(十):Apache Phoenix的基本入门操作

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix的基本入门操作 一.Pho ...

  9. 2021年大数据Hadoop(十五):Hadoop的联邦机制 Federation

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 Hadoop的联邦机制 Federation 背景概述 F ...

  10. 2021年大数据Hadoop(十四):HDFS的高可用机制

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 HDFS的高可用机制 HDFS高可用介绍 组件介绍 Nam ...

最新文章

  1. 解密美国五角大楼人工智能中心
  2. TVideoGrabber的使用(简介)
  3. python画柱状图和折线图-Python数据可视化–折线图–柱状图
  4. jQuery-选择器(2)
  5. 数据库 三范式最简单最易记的解释
  6. html5 拖拽上传文件时,屏蔽浏览器默认打开文件
  7. Java学习笔记——反射
  8. word交叉引用插入文献后更新域之后编号未更新
  9. 新天龙官网服务器更新消息,新天龙八部怀旧服太火,增开7组服务器不够用,还得继续扩容...
  10. private-bower
  11. 部署集群linux Oracle VM VirtualBox vagrant
  12. selenium报错TypeError: 'FirefoxWebElement' object is not iterable
  13. 获取datagrid中编辑列combobox的value值与text值
  14. python迅雷下载器_简单的迅雷VIP账号获取器(Python)
  15. linux 怎么在pe下安装驱动程序,【教程】再谈“万能驱动7在PE下安装驱动”功能...
  16. CentOS7.6 部署 Snipe-it 资产管理系统
  17. python输出10行带标号的hello、world_#000 Python 入门第一题通过扩展,学到了更多的知识...
  18. AJP:斯坦福加速智能神经调控疗法治疗难治性抑郁症
  19. 微信公众平台对接C#-普通消息接收
  20. 移动电影院为小众精品影片提供更广大的生存空间

热门文章

  1. Struts2 Cannot create a session after the response has been committed 一个不起眼的错误
  2. Mybatis传递多个参数的4种方式
  3. TensorFlow基础笔记(11) max_pool2D函数 深度学习
  4. PyTorch学习之六个学习率调整策略
  5. MindSpore整体架构介绍
  6. MAML-Tracker: 目标跟踪分析:CVPR 2020(Oral)
  7. Windows系统下安装Thrift的方法
  8. 解决每次git pull需要不用输入用户名信息
  9. Python:Spider
  10. 电商商品模块数据设计与关系图