将hive数据导入hbase

  • 一、hive外部表映射hbase表
    • 1. hbase里面创建表
    • 2. 建立hive外部表并映射hbase表
    • 3.测试
  • 二、spark bulkload方式
    • 1.准备事项
    • 2.pom文件
    • 3.代码

一、hive外部表映射hbase表

1. hbase里面创建表

create 'B_TEST_STU', { NAME => 'info', COMPRESSION => 'SNAPPY' }

2. 建立hive外部表并映射hbase表

create database yh_test;
use yh_test;
create external table yh_test.stu(
id String,
name String
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,
info:name,
info:gender
") TBLPROPERTIES("hbase.table.name" ="B_TEST_STU");

3.测试

在hive表插入数据

insert into table stu  values ('001','zhangsan','1');

去hbase表查看数据是否生成

scan 'hb_stu'

二、spark bulkload方式

参考官网
分为两个步骤:

  1. 生成hfile文件格式数据
  2. 将生成的hfile文件移动到hbase对应位置

1.准备事项

  1. hdfs、hive、hbase、zookeeper、spark集群均可正常使用
  2. 将hive的hive-site.xml文件复制到$SPARK_HOME/conf目录、项目的resource目录下(保证spark能够访问到外部hive)
  3. 将对应保存元数据数据库的驱动添加到项目resource目录下

2.pom文件

<properties><hadoop-version>2.6.0</hadoop-version><hive-version>1.1.0</hive-version><hbase-version>1.2.0</hbase-version><spark.version>2.3.3</spark.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.12</version></dependency><!-- Spark Dependencies --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>${hbase-version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase-version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase-version}</version></dependency><!-- 指定hadoop-client API的版本 --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop-version}</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-hbase-handler</artifactId><version>${hive-version}</version></dependency></dependencies><build><pluginManagement><plugins><!-- 编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version></plugin><!-- 编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin></plugins></pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>

3.代码

package com.yhimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSessionimport java.net.URI/***
spark-on-yarn提交方式bin/spark-submit \
--class com.yh.BulkloadTest \
--master yarn \
--executor-memory 1G \
--total-executor-cores 2 \
--num-executors 2 \
hdfs://node01:8020/spark_bulkload_hbase-1.0-SNAPSHOT.jar \*/
object BulkloadTest {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("BulkloadTest").master("local[*]").enableHiveSupport().getOrCreate()//从hive中读取数据,数据是在hdfs上,hive是个外部表,你也可以用内部表,都一样spark.sql("use yh_test")spark.sql("select *,id rowkey from stu").show()val Data = spark.sql("select *,id rowkey from stu")val dataRdd = Data.rdd.flatMap(row => { //cf是列族名,ID、DATA_TYPE、DEVICE_ID为字段名val rowkey = row.getAs[String]("rowkey".toLowerCase)Array((rowkey, ("info", "id", row.getAs[String]("id"))),(rowkey, ("info", "name", row.getAs[String]("name"))))})//要保证行键,列族,列名的整体有序,必须先排序后处理,防止数据异常过滤rowkeyval rdds = dataRdd.filter(x => x._1 != null).sortBy(x => (x._1, x._2._1, x._2._2)).map(x => {//将rdd转换成HFile需要的格式,Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key//KeyValue的实例为valueval rowKey = Bytes.toBytes(x._1)val family = Bytes.toBytes(x._2._1)val colum = Bytes.toBytes(x._2._2)val value = Bytes.toBytes(x._2._3)(new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, colum, value))})//临时文件保存位置,在hdfs上print(" 临时文件保存位置 hdfs://node01:8020/tmp/test2")val tmpdir = "hdfs://node01:8020/tmp/test2"val hconf = new Configuration()hconf.set("fs.defaultFS", "hdfs://node01:8020")val fs = FileSystem.get(new URI("hdfs://node01:8020"), hconf, "hadoop") //hadoop为你的服务器用户名if (fs.exists(new Path(tmpdir))) { //由于生成Hfile文件的目录必须是不存在的,所以我们存在的话就把它删除掉println("删除临时文件夹")fs.delete(new Path(tmpdir), true)}//创建HBase的配置val conf = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum", "node01,node02,node03")conf.set("hbase.zookeeper.property.clientPort", "2181")//为了预防hfile文件数过多无法进行导入,设置该参数值conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 5000)//此处运行完成之后,在tmpdir生成的Hfile文件rdds.saveAsNewAPIHadoopFile(tmpdir,classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat2],conf)//开始HFile导入到Hbaseval load = new LoadIncrementalHFiles(conf)//hbase的表名val tableName = "B_TEST_STU"//创建hbase的链接,利用默认的配置文件,实际上读取的hbase的master地址val conn = ConnectionFactory.createConnection(conf)//根据表名获取表val table = conn.getTable(TableName.valueOf(tableName))try {//获取hbase表的region分布val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))println("获取hbase表的region分布:   "+regionLocator)//创建一个hadoop的mapreduce的jobval job = Job.getInstance(conf)//设置job名称,随便起一个就行job.setJobName("bulkload_stu_test")//此处最重要,需要设置文件输出的key,因为我们要生成HFil,所以outkey要用ImmutableBytesWritablejob.setMapOutputKeyClass(classOf[ImmutableBytesWritable])//输出文件的内容KeyValuejob.setMapOutputValueClass(classOf[KeyValue])print("配置HFileOutputFormat2的信息-----开始导入----------")//配置HFileOutputFormat2的信息HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)print("开始导入----------")//开始导入load.doBulkLoad(new Path(tmpdir), conn.getAdmin, table, regionLocator)print("结束导入----------")} finally {table.close()conn.close()}}
}

上面是基于hive1.x
后面我又基于hive2.x实现了bulkload方式导入。

<properties><hadoop-version>2.6.5</hadoop-version><hive-version>2.3.2</hive-version><hbase-version>1.2.4</hbase-version><spark-version>2.0.0</spark-version><scala-version>2.11.8</scala-version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala-version}</version></dependency><!-- Spark Dependencies --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark-version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark-version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark-version}</version></dependency><!--连接连接hive元数据的服务--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive-thriftserver_2.11</artifactId><version>${spark-version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>${hbase-version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase-version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase-version}</version></dependency><!-- 指定hadoop-client API的版本 --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop-version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop-version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop-version}</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-hbase-handler</artifactId><version>${hive-version}</version></dependency></dependencies><build><pluginManagement><plugins><!-- 编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version></plugin><!-- 编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin></plugins></pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>

整体上差不多。有一个小错误:

解决方式,指定spark.sql.warehouse.dir:

val spark = SparkSession.builder().master("local[*]").appName("SavingDetail").config("spark.sql.warehouse.dir","/spark-warehouse/").enableHiveSupport().getOrCreate()

执行main方法即可将hive数据通过bulkload方式导入数据。

hive数据导入hbase相关推荐

  1. sqoop将hbase数据导入mysql_Sqoop将mysql数据导入hbase的血与泪

    Sqoop将mysql数据导入hbase的血与泪(整整搞了大半天) 版权声明:本文为yunshuxueyuan原创文章. 如需转载请标明出处: https://my.oschina.net/yunsh ...

  2. hive数据导入导出

    Hive数据导入导出的几种方式 一,Hive数据导入的几种方式 首先列出讲述下面几种导入方式的数据和hive表. 导入: 本地文件导入到Hive表: Hive表导入到Hive表; HDFS文件导入到H ...

  3. 数据导入HBase最常用的三种方式及实践分析

    要使用Hadoop,数据合并至关重要,HBase应用甚广.一般而言,需要针对不同情景模式将现有的各种类型的数据库或数据文件中的数据转入至HBase中. 常见方式为:1.使用HBase的API中的Put ...

  4. KUDU数据导入尝试一:TextFile数据导入Hive,Hive数据导入KUDU

    背景 SQLSERVER数据库中单表数据几十亿,分区方案也已经无法查询出结果.故:采用导出功能,导出数据到Text文本(文本>40G)中. 因上原因,所以本次的实验样本为:[数据量:61w条,文 ...

  5. mongodb数据导入hbase,spark读取hbase数据分析

    为什么80%的码农都做不了架构师?>>>    使用mavn管理相关依赖包pom.xml <project xmlns="http://maven.apache.or ...

  6. 使用iServer将udb数据导入HBase

    部署大数据环境 通过iServer将UDB数据导入Hbase需要依赖大数据环境,大家可以选择自行安装部署SuperMap大数据环境需要的spark Hadoop hbase软件,可以参考我们往期的文章 ...

  7. hive常用功能:Hive数据导入导出方式

    作为数据仓库的Hive,存储着海量用户使用的数据.在平常的Hive使用过程中,难免对遇到将外部数据导入到Hive或者将Hive中的数据导出来.今天主要就来学习一下Hive的几种数据导入和导出的方式. ...

  8. hive删除hbase数据_Hive数据导入Hbase

    方案一:Hive关联HBase表方式 适用场景:数据量不大4T以下(走hbase的api导入数据) 一.hbase表不存在的情况 创建hive表hive_hbase_table映射hbase表hbas ...

  9. hive数据导入到hbase

    方式一: ​ hbase中建表,然后hive中建外部表,这样当hive中写入数据后,hbase中的表也会同时更新 创建hbase表 create 'classes','user' --表名是class ...

最新文章

  1. 取消掉Transfer-Encoding:chunked
  2. swift实现ios类似微信输入框跟随键盘弹出的效果
  3. 翻转棋游戏c语言讲解,有没有人懂黑白棋(翻转棋)的核心算法
  4. boost::function模块实现operator()的测试程序
  5. wicket常用控件使用方法 .
  6. mysql审计 社区版有吗_mysql 5.6 社区版上审计功能,不扯皮
  7. linux tcp ip c,Linux下TCP/IP编程--TCP实战(select)
  8. SpringBoot2.0之一 新建项目helloWorld
  9. Oracle 修改表空间文件路径
  10. C语言中全局变量存放在哪个位置?
  11. php消息实时推送技术,基于HTTP协议之WEB消息实时推送技术原理及实现
  12. 【SAE 部署 JavaWeb 项目报 404 错误】
  13. JUCE框架教程(6)——通过AudioProcessorValuetTeeState链接数据和UI
  14. 5个免费、优质视频素材网站,可商用
  15. Docker和Pycharm
  16. 学计算机智商,IQ最高的十大专业公布,考验你们智商的时刻到了!
  17. mysql数据库试用期过了_mysql试用期过了
  18. 通过xsd校验xml文件
  19. [绝对原创] AKM项目轶事之FLYBACK飞机晚点索赔
  20. 生命游戏 Life of Game

热门文章

  1. Python中的时间格式的读取与转换(time模块)
  2. 【无人机】【2007.09】小型无人机通信链路可靠性研究
  3. [开车技巧]开车高手整理了一年
  4. (二)nesting解读simulations>examples
  5. 记第一次C++面试与经验分享
  6. 揭穿几个著名的Windows“伪”优化技(…
  7. Codeforces Round #786 (Div.3) 题解 (CF786) [A-G]
  8. 概率密度函数,概率分布函数,正态分布
  9. flutter如何实现删除线效果
  10. Hive中的常用的日期函数