本实例采用Scala开发,实现了RDD数据两种方式入库到HBase,从HBase中读取数据并print输出。

build.sbt

name := "SparkSbt"version := "0.1"scalaVersion := "2.10.5"libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"libraryDependencies+="org.apache.hbase"%"hbase-common"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-client"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-server"%"1.2.0"

先hbase shell执行命令创建表:

create 'account' , 'cf'

create 'account2' , 'cf'

源码

package com.whq.testimport org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._object HBaseTest {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("HBaseTest")val sc = new SparkContext(sparkConf)// please ensure HBASE_CONF_DIR is on classpath of spark driverval conf = HBaseConfiguration.create()//设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置conf.set("hbase.zookeeper.quorum","192.168.91.144")conf.set("hbase.zookeeper.property.clientPort", "2181")入库方式一saveAsHadoopDatasetprintln("————————————入库方式一")var tablename = "account"conf.set(TableInputFormat.INPUT_TABLE, tablename)//初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!val jobConf = new JobConf(conf)jobConf.setOutputFormat(classOf[TableOutputFormat])jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)//待入库数据val indataRDD = sc.makeRDD(Array("11,whq,30","12,wanghongqi,29","13,xiaoming,15"))//数据转换为可入库的RDD[(ImmutableBytesWritable,Put)]val rdd = indataRDD.map(_.split(',')).map{arr=>{/*一个Put对象就是一行记录,在构造方法中指定主键* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换* Put.add方法接收三个参数:列族,列名,数据*/val put = new Put(Bytes.toBytes(arr(0).toInt))put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))//转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset(new ImmutableBytesWritable, put)}}//入库写入rdd.saveAsHadoopDataset(jobConf)入库方式二saveAsNewAPIHadoopDatasetprintln("————————————入库方式二")tablename = "account2"conf.set(TableOutputFormat.OUTPUT_TABLE, tablename)val job2 = Job.getInstance(conf)job2.setOutputKeyClass(classOf[ImmutableBytesWritable])job2.setOutputValueClass(classOf[Result])job2.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])val rdd2 = indataRDD.map(_.split(',')).map{arr=>{val put = new Put(Bytes.toBytes(arr(0)))put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))(new ImmutableBytesWritable, put)}}rdd2.saveAsNewAPIHadoopDataset(job2.getConfiguration())读取数据println("————————————读取数据")conf.set(TableInputFormat.INPUT_TABLE, tablename)//读取数据并转化成rddval hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val count = hBaseRDD.count()println(count)hBaseRDD.collect().foreach{case (_,result) =>{//获取行键val key = Bytes.toString(result.getRow)//通过列族和列名获取列val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))println("Row key:"+key+" Name:"+name+" Age:"+age)}}sc.stop()}
}

执行命令

spark-submit --master yarn --deploy-mode client --class com.whq.test.HBaseTest sparksbt_2.10-0.1.jar

查看数据情况

scan 'account'

scan 'account2'

Spark 连接 HBase 入库及查询操作相关推荐

  1. Hbase的表查询操作

    目录 1.scan 2.get 1.filterBySingleColumnValueFilter(String tablename, String strF, String strC, String ...

  2. Hbase对表进行查询操作详细教程

    创建命名空间 booksystem Create_namespace'booksystem' 创建表 bookinfo Create 'booksystem:bookinfo','info1' 追加列 ...

  3. (一) elasticsearch-dsl查询操作

    目录 一: 连接配置 二: 查询操作 1: 简单的查询操作: 2: 普通查询: 3: 组合查询: 三: 过滤 四: 聚合: 4.1: 聚合案例: 五: 排序: 六: 分页: 官方文档地址: https ...

  4. HBase的安装、写入和查询操作

    实验材料及说明 在Ubuntu系统的/学号(每个人之间的学号)/salesInfo目录下,有买家的购买记录文件Sales,该文件记录了买家的id,购买商品的id以及购买日期,文件为名为Sales.Sa ...

  5. 配置phoenix连接hbase_使用 Phoenix-4.11.0连接 Hbase 集群 ,并使用 JDBC 查询测试

    什么是 Phoenix ? Apache Phoenix 是运行在Hbase之上的高性能关系型数据库,通过Phoenix可以像使用jdbc访问关系型数据库一样访问hbase. Phoenix,操作的表 ...

  6. Spark与Iceberg整合查询操作-查询快照,表历史,data files Manifests 查询快照,时间戳数据...

    1.8.6 Spark与Iceberg整合查询操作 1.8.6.1 DataFrame API加载Iceberg中的数据 Spark操作Iceberg不仅可以使用SQL方式查询Iceberg中的数据, ...

  7. Oracle数据库多表连接查询操作以及查询操作的补充

    文章目录 一.查询语句概述 1.查询语句基本语法格式 2.伪表和伪劣 二.单表查询 1.select子句 2.FROM子句 3.WHERE子句 4.DISTINCT关键字 5.GROUP BY子句与聚 ...

  8. 数据湖(十四):Spark与Iceberg整合查询操作

    文章目录 Spark与Iceberg整合查询操作 一.DataFrame API加载Iceberg中的数据 二.查询表快照

  9. Spark+hadoop+mllib及相关概念与操作笔记

    Spark+hadoop+mllib及相关概念与操作笔记 作者: lw 版本: 0.1 时间: 2016-07-18 1.调研相关注意事项 a) 理解调研 调研的意义在于了解当前情况,挖掘潜在的问题, ...

最新文章

  1. 「镁客·请讲」智加科技刘万千:技术与生态的成熟将推动自动驾驶的落地应用...
  2. 分布式环境下的并发问题
  3. ajaxbootstrap
  4. ASP.NET MVC3 技术(二) WebGrid 的使用方法
  5. 页面加载完成之后,开始显示内容
  6. react中使用构建缓存_完整的React课程:如何使用React构建聊天室应用
  7. 用户注册PHP,PHP制作用户注册系统,php制作用户注册_PHP教程
  8. CCF 2013-12-1 出现次数最多的数
  9. 客户端父进程提前死亡
  10. python自动测试q_阿里大牛教你基于Python的 Selenium自动化测试示例解析
  11. c#和javascript分别轻松实现计算24点
  12. Django 国际化和本地化
  13. 进入bios看了,vt 已经开了,为什么打开模拟器还显示未开启?
  14. php随笔_PHP随笔笔记
  15. Incapsula的全球网络地图
  16. SSD源码解读1-数据层AnnotatedDataLayer
  17. 图纸打印什么时候用蓝图_cad图怎么打印成施工蓝图
  18. win10开机桌面壁纸位置
  19. python清洗文本非法字符_Python 文本字符串清理
  20. ibm服务器维护重点,IBM服务器存储维护基础知识.pptx

热门文章

  1. 工作165:混入调用的时候
  2. 前端学习(2254)team怎么接受到pr
  3. 前端学习(2185):tabberitem和路由结果
  4. 前端学习(1713):前端系列javascript之运行
  5. VMware出现配置文件 .vmx 是由VMware产品创建,但该产品与此版 VMware workstation 不兼容,因此无法使用(VMware版本不兼容问题)
  6. mybatis学习(38):动态sql-foreach
  7. java学习(60):java最终类(了解)
  8. git 本地推送本地仓库到远程
  9. solor mysql_solr 同步 mysql
  10. 5月份 Github 上最热的十个 Python 项目,从Debug工具到AI水军、量化交易系统。