spark从hbase读取写入数据
将RDD写入hbase
注意点:
依赖:
将lib目录下的hadoop开头jar包、hbase开头jar包添加至classpath
此外还有lib目录下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少会提示hbase RpcRetryingCaller: Call exception不断尝试重连hbase,不报错)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar
$SPARK_HOME/lib目录下的 spark-assembly-1.6.1-hadoop2.4.0.jar
不同的package中可能会有相同名称的类,不要导错
连接集群:
spark应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper:
第一种是将hbase-site.xml文件加入classpath
第二种是在HBaseConfiguration实例中设置
如果不设置,默认连接的是localhost:2181会报错:connection refused
本文使用的是第二种方式。
hbase创建表:
虽然可以在spark应用中创建hbase表,但是不建议这样做,最好在hbase shell中创建表,spark写或读数据
使用saveAsHadoopDataset写入数据
package com.test
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object TestHBase {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
//设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
//设置zookeeper连接端口,默认2181
conf.set("hbase.zookeeper.property.clientPort", "2181")
val tablename = "account"
//初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
val rdd = indataRDD.map(_.split(',')).map{arr=>{
/*一个Put对象就是一行记录,在构造方法中指定主键
* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
* Put.add方法接收三个参数:列族,列名,数据
*/
val put = new Put(Bytes.toBytes(arr(0).toInt))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
//转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
(new ImmutableBytesWritable, put)
}}
rdd.saveAsHadoopDataset(jobConf)
sc.stop()
}
}
使用saveAsNewAPIHadoopDataset写入数据
package com.test
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
object TestHBase3 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
val sc = new SparkContext(sparkConf)
val tablename = "account"
sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
val rdd = indataRDD.map(_.split(',')).map{arr=>{
val put = new Put(Bytes.toBytes(arr(0)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
(new ImmutableBytesWritable, put)
}}
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
}
}
从hbase读取数据转化成RDD
本例基于官方提供的例子
package com.test
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark._
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io._
object TestHBase2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
val sc = new SparkContext(sparkConf)
val tablename = "account"
val conf = HBaseConfiguration.create()
//设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
//设置zookeeper连接端口,默认2181
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, tablename)
// 如果表不存在则创建表
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tablename)) {
val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
admin.createTable(tableDesc)
}
//读取数据并转化成rdd
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count = hBaseRDD.count()
println(count)
hBaseRDD.foreach{case (_,result) =>{
//获取行键
val key = Bytes.toString(result.getRow)
//通过列族和列名获取列
val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Age:"+age)
}}
sc.stop()
admin.close()
}
}
spark从hbase读取写入数据相关推荐
- 用多态来实现U盘,Mp3,移动硬盘和电脑的对接,读取写入数据。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.T ...
- vb6 串口同时读取写入数据怎么避免冲突_实例:S7-200 SMART通过Modbus-RTU读取温湿度传感器数据...
本实例我们介绍下西门子S7-200 SMART PLC如何通过Modbus-RTU协议读取温湿度传感器的数值.实例使用的硬件如下: S7-200 SMART CPU ST20: 温湿度传感器(支持Mo ...
- vb6 串口同时读取写入数据怎么避免冲突_分布式场景下的数据复制究竟怎么做...
主从复制 集群中有一个主节点,写操作都必须经过主节点完成,读操作主从节点都可以处理. 同步复制 数据在副本上落盘才返回. 优点:保证在副本上的数据是最新数据. 缺点:延迟高,响应慢. 异步复制 数据不 ...
- Springboot + Easyexcel读取写入数据,多头行数,多sheet,复杂表头简单实现
Springboot + Easyexcel 读取数据 简单读取excel文件 读取下图的 excel 数据 导入依赖,阿里的easyexcel插件 <dependency><gro ...
- 嵌入式C语言STM32在FLASH中读取写入数据
STM32F4XX向指定FLASH地址读写 向FLASH中写入数据的主体思想就是先解锁,然后清标志位,然后找到要写入的地址,然后改变标志准备写入,然后在按已有的函数按地址一字节一字节的写入,最后要将F ...
- C# 操作地址 从内存中读取写入数据(初级)
本示例以植物大战僵尸为例, 实现功能为 每1秒让阳光刷新为 9999.本示例使用的游戏版本为 [植物大战僵尸2010年度版], 使用的辅助查看内存地址的工具是 CE. 由于每次启动游戏, 游戏中阳光 ...
- HBase批量写入数据
一.HBase安装 1.上传解压 2.修改环境变量 vi /etc/profile export HBASE_HOME=/home/hadoop/hbase export PATH=$PATH:$HB ...
- spark保存到mysql_Spark写入数据到MySQL
以下是本人做的测试,如果有错误请及时指正,有问题欢迎一起讨论. 情景: 需要读取HDFS上的数据,处理之后,写入到MySQL数据库里面去. 实现: 1.版本 spark版本:1.2.1 MySQL版本 ...
- java spark读写hdfs_Spark读取HDFS数据输出到不同的文件
最近有一个需求是这样的:原来的数据是存储在MySQL,然后通过Sqoop将MySQL的数据抽取到了HDFS集群上,抽取到HDFS上的数据都是纯数据,字段值之间以\t分隔,现在需要将这部分数据还原为js ...
最新文章
- 2019年企业云呈现五大技术发展趋势
- 剑指 offer 链表倒数的第k个数
- cisco无线网络实施方案
- 在unity 中,使用http请求,下载文件到可读可写路径
- 第一款Micropython图形化编辑器—Python Editor
- 大家不要催!雷军的螺丝刀已经准备好了...
- 【Java】Java StreamCorruptedException: invalid stream header: EFBFBDEF
- android 涂鸦之图片叠加,android图像处理系列之七--图片涂鸦,水印-图片叠加...
- windows 2003 server右键菜单没有共享选项的解决办法
- [渝粤教育] 信阳农林学院 鱼类学 参考 资料
- Win10专业版启用.NET FrameWork 3.5
- 01超精美渐变色动态背景完整示例【CSS动效实战(纯CSS与JS动效)】
- 生活记录:压抑暂时解脱
- 4.1 数据仓库基础与Apache Hive入门
- xp系统桌面没有计算机,在xp系统中,为什么桌面所有图标都消失?
- 数据可视化笔记1 数据可视化简介(简史、分类、功能、目标)
- 市场监管新规下Android接入的友盟Umeng移动统计/推送/分享SDK过程问题总结
- 闭关修炼,看了老大的博客,才发现自己是多么的技术低,原来我就达到06年的他
- 超声波测距仪编程_51单片机控制的超声波测距仪程序
- 人生需要规划——感受徐小平 (转自徐小平博客)
热门文章
- java tomcat jndi,Tomcat JNDI 资源
- c语言学生成绩删除功能,c语言学生成绩管理系统程序设计,有添加,查找,删除,输出,修改,排序等功能!!!...
- 为什么c语言要定义变量,C语言为什么要规定对所用到的变量要“先定义,后使用”...
- gen文件下有两个R.java_gen目录无法更新,或者gen目录下的R.JAVA文件无法生成
- 小程序引入的echarts过大如何解决_解决微信小程序引用echarts视图模糊的问题
- android 定制ui,AndroidSDK-UI定制
- 液晶弹性自由能计算_自由能方法应用(一)开放计算平台BRIDGE的介绍及使用案例...
- python文件实时同步_python文件自动同步备份v1.2【运维必备】2020/12/31
- linux查看文件列表内存地址ll,linux指令之文件查看 ls
- python读取pdf文档书签 bookmark_Python利用PyPDF2库获取PDF文件总页码实例