Spark 连接 HBase 入库及查询操作
本实例采用Scala开发,实现了RDD数据两种方式入库到HBase,从HBase中读取数据并print输出。
build.sbt
name := "SparkSbt"version := "0.1"scalaVersion := "2.10.5"libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"libraryDependencies+="org.apache.hbase"%"hbase-common"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-client"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-server"%"1.2.0"
先hbase shell执行命令创建表:
create 'account' , 'cf'
create 'account2' , 'cf'
源码
package com.whq.testimport org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._object HBaseTest {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("HBaseTest")val sc = new SparkContext(sparkConf)// please ensure HBASE_CONF_DIR is on classpath of spark driverval conf = HBaseConfiguration.create()//设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置conf.set("hbase.zookeeper.quorum","192.168.91.144")conf.set("hbase.zookeeper.property.clientPort", "2181")入库方式一saveAsHadoopDatasetprintln("————————————入库方式一")var tablename = "account"conf.set(TableInputFormat.INPUT_TABLE, tablename)//初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!val jobConf = new JobConf(conf)jobConf.setOutputFormat(classOf[TableOutputFormat])jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)//待入库数据val indataRDD = sc.makeRDD(Array("11,whq,30","12,wanghongqi,29","13,xiaoming,15"))//数据转换为可入库的RDD[(ImmutableBytesWritable,Put)]val rdd = indataRDD.map(_.split(',')).map{arr=>{/*一个Put对象就是一行记录,在构造方法中指定主键* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换* Put.add方法接收三个参数:列族,列名,数据*/val put = new Put(Bytes.toBytes(arr(0).toInt))put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))//转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset(new ImmutableBytesWritable, put)}}//入库写入rdd.saveAsHadoopDataset(jobConf)入库方式二saveAsNewAPIHadoopDatasetprintln("————————————入库方式二")tablename = "account2"conf.set(TableOutputFormat.OUTPUT_TABLE, tablename)val job2 = Job.getInstance(conf)job2.setOutputKeyClass(classOf[ImmutableBytesWritable])job2.setOutputValueClass(classOf[Result])job2.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])val rdd2 = indataRDD.map(_.split(',')).map{arr=>{val put = new Put(Bytes.toBytes(arr(0)))put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))(new ImmutableBytesWritable, put)}}rdd2.saveAsNewAPIHadoopDataset(job2.getConfiguration())读取数据println("————————————读取数据")conf.set(TableInputFormat.INPUT_TABLE, tablename)//读取数据并转化成rddval hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val count = hBaseRDD.count()println(count)hBaseRDD.collect().foreach{case (_,result) =>{//获取行键val key = Bytes.toString(result.getRow)//通过列族和列名获取列val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))println("Row key:"+key+" Name:"+name+" Age:"+age)}}sc.stop()}
}
执行命令
spark-submit --master yarn --deploy-mode client --class com.whq.test.HBaseTest sparksbt_2.10-0.1.jar
查看数据情况
scan 'account'
scan 'account2'
Spark 连接 HBase 入库及查询操作相关推荐
- Hbase的表查询操作
目录 1.scan 2.get 1.filterBySingleColumnValueFilter(String tablename, String strF, String strC, String ...
- Hbase对表进行查询操作详细教程
创建命名空间 booksystem Create_namespace'booksystem' 创建表 bookinfo Create 'booksystem:bookinfo','info1' 追加列 ...
- (一) elasticsearch-dsl查询操作
目录 一: 连接配置 二: 查询操作 1: 简单的查询操作: 2: 普通查询: 3: 组合查询: 三: 过滤 四: 聚合: 4.1: 聚合案例: 五: 排序: 六: 分页: 官方文档地址: https ...
- HBase的安装、写入和查询操作
实验材料及说明 在Ubuntu系统的/学号(每个人之间的学号)/salesInfo目录下,有买家的购买记录文件Sales,该文件记录了买家的id,购买商品的id以及购买日期,文件为名为Sales.Sa ...
- 配置phoenix连接hbase_使用 Phoenix-4.11.0连接 Hbase 集群 ,并使用 JDBC 查询测试
什么是 Phoenix ? Apache Phoenix 是运行在Hbase之上的高性能关系型数据库,通过Phoenix可以像使用jdbc访问关系型数据库一样访问hbase. Phoenix,操作的表 ...
- Spark与Iceberg整合查询操作-查询快照,表历史,data files Manifests 查询快照,时间戳数据...
1.8.6 Spark与Iceberg整合查询操作 1.8.6.1 DataFrame API加载Iceberg中的数据 Spark操作Iceberg不仅可以使用SQL方式查询Iceberg中的数据, ...
- Oracle数据库多表连接查询操作以及查询操作的补充
文章目录 一.查询语句概述 1.查询语句基本语法格式 2.伪表和伪劣 二.单表查询 1.select子句 2.FROM子句 3.WHERE子句 4.DISTINCT关键字 5.GROUP BY子句与聚 ...
- 数据湖(十四):Spark与Iceberg整合查询操作
文章目录 Spark与Iceberg整合查询操作 一.DataFrame API加载Iceberg中的数据 二.查询表快照
- Spark+hadoop+mllib及相关概念与操作笔记
Spark+hadoop+mllib及相关概念与操作笔记 作者: lw 版本: 0.1 时间: 2016-07-18 1.调研相关注意事项 a) 理解调研 调研的意义在于了解当前情况,挖掘潜在的问题, ...
最新文章
- 「镁客·请讲」智加科技刘万千:技术与生态的成熟将推动自动驾驶的落地应用...
- 分布式环境下的并发问题
- ajaxbootstrap
- ASP.NET MVC3 技术(二) WebGrid 的使用方法
- 页面加载完成之后,开始显示内容
- react中使用构建缓存_完整的React课程:如何使用React构建聊天室应用
- 用户注册PHP,PHP制作用户注册系统,php制作用户注册_PHP教程
- CCF 2013-12-1 出现次数最多的数
- 客户端父进程提前死亡
- python自动测试q_阿里大牛教你基于Python的 Selenium自动化测试示例解析
- c#和javascript分别轻松实现计算24点
- Django 国际化和本地化
- 进入bios看了,vt 已经开了,为什么打开模拟器还显示未开启?
- php随笔_PHP随笔笔记
- Incapsula的全球网络地图
- SSD源码解读1-数据层AnnotatedDataLayer
- 图纸打印什么时候用蓝图_cad图怎么打印成施工蓝图
- win10开机桌面壁纸位置
- python清洗文本非法字符_Python 文本字符串清理
- ibm服务器维护重点,IBM服务器存储维护基础知识.pptx
热门文章
- 工作165:混入调用的时候
- 前端学习(2254)team怎么接受到pr
- 前端学习(2185):tabberitem和路由结果
- 前端学习(1713):前端系列javascript之运行
- VMware出现配置文件 .vmx 是由VMware产品创建,但该产品与此版 VMware workstation 不兼容,因此无法使用(VMware版本不兼容问题)
- mybatis学习(38):动态sql-foreach
- java学习(60):java最终类(了解)
- git 本地推送本地仓库到远程
- solor mysql_solr 同步 mysql
- 5月份 Github 上最热的十个 Python 项目,从Debug工具到AI水军、量化交易系统。