1 配置

1.1 开发环境:

  • HBase:hbase-1.0.0-cdh5.4.5.tar.gz
  • Hadoop:hadoop-2.6.0-cdh5.4.5.tar.gz
  • ZooKeeper:zookeeper-3.4.5-cdh5.4.5.tar.gz
  • Spark:spark-2.1.0-bin-hadoop2.6

1.2 Spark的配置

  • Jar包:需要HBase的Jar如下(经过测试,正常运行,但是是否存在冗余的Jar并未证实,若发现多余的jar可自行进行删除)

  • spark-env.sh
    添加以下配置:export SPARK_CLASSPATH=/home/hadoop/data/lib1/*
    注:如果使用spark-shell的yarn模式进行测试的话,那么最好每个NodeManager节点都有配置jars和hbase-site.xml
  • spark-default.sh
spark.yarn.historyServer.address=slave11:18080
spark.history.ui.port=18080
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///tmp/spark/events
spark.history.fs.logDirectory=hdfs:///tmp/spark/events
spark.driver.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer

1.3 数据

1)格式: barCode@item@value@standardValue@upperLimit@lowerLimit

01055HAXMTXG10100001@KEY_VOLTAGE_TEC_PWR@1.60@1.62@1.75@1.55
01055HAXMTXG10100001@KEY_VOLTAGE_T_C_PWR@1.22@1.24@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_BC_PWR@1.16@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_11@1.32@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_RC_PWR@1.24@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_VCC_5V@1.93@1.90@1.95@1.65
01055HAXMTXG10100001@KEY_VOLTAGE_T_VDD3V3@1.59@1.62@1.75@1.55

2 代码演示

2.1 准备动作

1)既然是与HBase相关,那么首先需要使用hbase shell来创建一个表

创建表格:create ‘data’,’v’,create ‘data1’,’v’

2)使用spark-shell进行操作,命令如下:

bin/spark-shell --master yarn --deploy-mode client --num-executors 5 --executor-memory 1g --executor-cores 2

3)import 各种类

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64,Bytes}
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.commons.codec.digest.DigestUtils

2.2 代码实战

创建conf和table

val conf= HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,"data1")
val table = new HTable(conf,"data1")

2.2.1 数据写入

格式:

val put = new Put(Bytes.toBytes("rowKey"))
put.add("cf","q","value")

使用for来插入5条数据

for(i <- 1 to 5){ var put= new Put(Bytes.toBytes("row"+i));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes("value"+i));table.put(put)}

到hbase shell中查看结果

2.2.2 数据读取

val hbaseRdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])

1)take

hbaseRdd take 1

2)scan

var scan = new Scan();
scan.addFamily(Bytes.toBytes(“v”));
var proto = ProtobufUtil.toScan(scan)
var scanToString = Base64.encodeBytes(proto.toByteArray());
conf.set(TableInputFormat.SCAN,scanToString)val datas = hbaseRdd.map( x=>x._2).map{result => (result.getRow,result.getValue(Bytes.toBytes("v"),Bytes.toBytes("value")))}.map(row => (new String(row._1),new String(row._2))).collect.foreach(r => (println(r._1+":"+r._2)))

2.3 批量插入

2.3.1 普通插入

1)代码

val rdd = sc.textFile("/data/produce/2015/2015-03-01.log")
val data = rdd.map(_.split("@")).map{x=>(x(0)+x(1),x(2))}
val result = data.foreachPartition{x => {val conf= HBaseConfiguration.create();conf.set(TableInputFormat.INPUT_TABLE,"data");conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");conf.set("hbase.zookeeper.property.clientPort","2181");conf.addResource("/home/hadoop/data/lib/hbase-site.xml");val table = new HTable(conf,"data");table.setAutoFlush(false,false);table.setWriteBufferSize(3*1024*1024); x.foreach{y => {
var put= new Put(Bytes.toBytes(y._1));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)};table.flushCommits}}}

2)执行时间如下:7.6 min

2.3.2 Bulkload

1) 代码:

val conf = HBaseConfiguration.create();
val tableName = "data1"
val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table)val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}}rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration())
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)

2) 执行时间:7s


3)执行结果:
到hbase shell 中查看 list “data1”

通过对比我们可以发现bulkload批量导入所用时间远远少于普通导入,速度提升了60多倍,当然我没有使用更大的数据量测试,但是我相信导入速度的提升是非常显著的,强烈建议使用BulkLoad批量导入数据到HBase中。

关于Spark与Hbase之间操作就写到这里,如果有什么地方写得不对或者运行不了,欢迎指出,谢谢

转载于:https://www.cnblogs.com/simple-focus/p/6879971.html

Spark实战之读写HBase相关推荐

  1. Spark读写HBase(主要讲解SHC的使用)

    前言 Spark读写HBase本身来说是没啥可以讲的,最早之前都是基于RDD的,网上的资料就太多了,可以参考: 参考链接1 参考链接2 其实都一样,后来有了Hortonworks公司的研发人员研发了一 ...

  2. Spark读写Hbase的二种方式对比

    作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 一.传统方式 这种方式就是常用的TableInputFormat和TableOutputForm ...

  3. spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)

    使用 saveAsHadoopDataset 写入数据 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, Ta ...

  4. 7.读写HBase数据(华为云学习笔记,Spark编程基础,大数据)

    读写HBase数据 ① 在hbase-shell中使用命令创建HBase数据库: ② 使用Spark读写HBase数据库中的数据. 实验原理 -> HBase HBase是一个高可靠.高性能.面 ...

  5. Spark读写HBase:处理纽约出租车数据

    一.数据及部分代码来源: 解析geojson数据:https://github.com/jwills/geojson 纽约出租车数据:http://www.andresmh.com/nyctaxitr ...

  6. 使用java开发spark实战

    课程内容 使用java开发spark 实战 一:环境搭建 安装jdk 和maven. 1. 安装jdk并配置环境变量 系统变量→新建 JAVA_HOME 变量 . 变量值填写jdk的安装目录(本人是 ...

  7. 大数据Spark实战第一集 导学

    开篇词:学了就能用的 Spark? 你好,很高兴我们在<即学即用的 Spark 实战 44 讲>这个课程中相遇,我是范东来,Spark Contributor 和 Superset Con ...

  8. 大数据Spark实战视频教程-张长志-专题视频课程

    大数据Spark实战视频教程-33364人已学习 课程介绍         大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装.Spark表配置.平台搭建.快学Scala入门.Sp ...

  9. 【原创】大叔经验分享(25)hive通过外部表读写hbase数据

    在hive中创建外部表: CREATE EXTERNAL TABLE hive_hbase_table( key string, name string, desc string ) STORED B ...

最新文章

  1. Mysql搭建主从服务器
  2. 在几何画板中如何制作圆柱的侧面展开动画_倒计时与时钟演示 | 几何画板
  3. 简单纯文字浮动信息-Tooltip
  4. Codeforece E. Anton and Permutation
  5. 每天一学——VAB RANGE
  6. github 建立博客
  7. linux里终端安转视频播放器的操作及显示
  8. win8这台计算机到桌面上,Win8如何在桌面上显示“我的电脑”图标,小编教你Win8如何在桌面上显示我的电脑...
  9. jQuery按住滑块拖动验证插件
  10. Cross-speaker Style Transfer with Prosody Bottleneck in Neural Speech Synthesis
  11. 手机上的环境光传感器
  12. “5G+”发展论坛暨“金帽子”年度盛典圆满结束,共同探讨5G背景下网安技术发展和前沿趋势
  13. 公众号引流好困难,这个病该怎么治?
  14. Ubuntu下如何使用编译使用john-1.9.0源码
  15. ArcGIS的运行许可文件ecp如何打开?
  16. C语言:小写字母与大写字母的转换
  17. python3爬取教务系统的个人学期课程表(无头谷歌浏览模拟登录)
  18. SSRNet:用于大规模点云表面重建的深度学习网络(CVPR2020)
  19. 【逻辑与计算理论】λ演算、组合子逻辑的历史背景
  20. 不用爬虫,简单JQuery获取磁力链接,方便下载

热门文章

  1. Linux C 中断言assert()使用简介
  2. Python中的元类及元类实现的单例模式
  3. python向服务器请求压缩数据及解压缩数据
  4. 使用UltraEdit32编辑器格式化源码功能 XML、Java、C/C++、C#
  5. makefile常用讲解(2)
  6. bash删除文件中的空行
  7. 读入源文件,并在每行前加上行号和[Tab]
  8. 前端学习(2979):vue-element-admin结构always:true始终显示
  9. [html] android手机的微信H5弹出的软键盘挡住了文本框,如何解决?
  10. [html] 你知道著名的3像素Bug指的是什么吗?怎么解决呢?