目录

1.创建Hbase表

2.测试数据文件 test2.txt

3. Spark2.4.6用 bulkload写入Hbase1.3.1


批量写入的优势:

  1. 不使用预写日志(WAL),不会出现flush和split
  2. 更少的垃圾回收

处理流程:

流程:

  1. 把外部数据导入HDFS
  2. 用spark 把数据处理成hbase的文件hfile所需格式,保存入hdfs
  3. 调用spark on hbase 的bulkLoad api 加载入hbase 目标表

1.创建Hbase表

create_namespace 'defaut'

create 'defaut:t_test', 'DATA'

2.测试数据文件 test2.txt

1595638951700,1,1.1939971,1.4677016,1.4034922

1595638951721,1,1.3716854,1.566847,1.4458307

1595638951723,2,1.3352232,1.4566108,1.5208404

1595638951715,1,1.8877013,1.1247256,1.6103745

1595638951696,2,1.2885377,1.7600425,1.4150856

1595638951707,1,1.8486422,1.1446141,1.5813918

1595638951694,3,1.2366319,1.4496765,1.7620823

1595638951740,1,1.9078307,1.7746134,1.337183

1595638951714,3,1.261858,1.2809255,1.4845717

1595638951697,2,1.5660034,1.0154893,1.6899275

3. Spark2.4.6用 bulkload写入Hbase1.3.1

package mySpark;
import org.apache.spark.sql.SparkSession;
import myHDFS.MyHDFS;
import myHbase.MyHbase;
import security.MyLoginCommon;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;public class MySpark3 implements Serializable{private SparkSession sparkSession=null;public static void main(String[] args) throws Exception {String hfilePath = "hdfs://hacluster/user/test2.hfile";String sourcefileName = "hdfs://hacluster/user/test2.txt";Long start=System.currentTimeMillis();MySpark3 mySpark3=new MySpark3();mySpark3. writeHbaseWithBuldLoadMultiCol (sourcefileName,hfilePath);Long end=System.currentTimeMillis();System.out.println("共花費:["+(end-start)/1000+"]秒。");}public void writeHbaseWithBuldLoadMultiCol(String sourceFileName,String hfilePath,String hbaseNamespace,String hbaseTableName,String hbaseFamily) throws Exception{   JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());Log.info("Load file["+sourceFileName+" to spark");JavaRDD<String> originRDD = javaSparkContext.textFile(sourceFileName);Configuration hbaseConf=MyLoginCommon.loginHbase();Connection hbaseConn =MyLoginCommon.getHbaseConn();JavaHBaseContext hbaseContext = new JavaHBaseContext(javaSparkContext, hbaseConf);String tableName="default:t_test";Log.info("sort through rowkey");JavaPairRDD<String, String>  javaPairRDD= originRDD.mapToPair(new PairFunction<String,String,String>(){@Overridepublic Tuple2<String, String> call(String s) throws Exception {String []a=s.split(",");return new Tuple2<String,String>(a[0],s);}    }) ;Log.info("sort the rowkey");JavaPairRDD<String, String>  javaPairRDDSort=  javaPairRDD.sortByKey();
JavaRDD<String> dataRDD=javaPairRDDSort.map(new Function<Tuple2<String,String>,String>(){@Overridepublic String call(Tuple2<String, String> s) throws Exception { TODO Auto-generated method stubreturn s._2;}});Log.info("product hfile formate file");
JavaPairRDD<ImmutableBytesWritable, KeyValue>  javaFlatPairRDD= dataRDD.flatMapToPair(new PairFlatMapFunction<String,ImmutableBytesWritable,KeyValue>(){@Overridepublic Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(String s) throws Exception {List<Tuple2<ImmutableBytesWritable, KeyValue>> list=new ArrayList<Tuple2<ImmutableBytesWritable, KeyValue>>();String []strArr=s.split(",");for(int i=2;i<strArr.length;i++){String rowkey=strArr[0];KeyValue keyValue = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("DATA"), Bytes.toBytes("i"+(i-1)), Bytes.toBytes(strArr[i]));ImmutableBytesWritable rrk=new ImmutableBytesWritable(Bytes.toBytes(rowkey));Tuple2<ImmutableBytesWritable, KeyValue> tuple=new Tuple2<ImmutableBytesWritable, KeyValue>(rrk,keyValue);list.add(tuple);                      }return list.iterator();}    });Log.info("sava hfile to hdfs.");javaFlatPairRDD.saveAsNewAPIHadoopFile(hfilePath, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, hbaseConf);Log.info("Load hfile to table["+tableName+"]");LoadIncrementalHFiles load = new LoadIncrementalHFiles(hbaseConf);load.doBulkLoad( new Path(hfilePath),hbaseConn.getAdmin(),hbaseConn.getTable(TableName.valueOf(tableName)),hbaseConn.getRegionLocator(TableName.valueOf(tableName)));javaSparkContext.close();sparkSession.close();}}

[spark]Spark2.4.6用bulkload写入Hbase1.3.1表的多列相关推荐

  1. [spark]Spark2.4.6用put写入写入Hbase1.3.1

    场景:数据较少,用put写入 1.创建Hbase表 create_namespace 'default' create 'default:t_test1', 'DATA' 2.测试数据文件 test1 ...

  2. spark抽取mysql数据到hive_使用spark将内存中的数据写入到hive表中

    使用spark将内存中的数据写入到hive表中 hive-site.xml hive.metastore.uris thrift://master:9083 Thrift URI for the re ...

  3. 实践数据湖iceberg 第三十七课 kakfa写入iceberg的 icberg表的 enfource ,not enfource测试

    系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql ...

  4. Spark Structured : HIve jdbc方式访问待下划线的表,找不到表的错误

    1.背景 Spark Structured : HIve jdbc方式访问待下划线的表,找不到表的错误 > select * from default._xd_after limit 1; &g ...

  5. Flink窗口+触发器 ,实现定时、定量批量写入Hbase不同的表

    需求案例 消费kafka中的数据,根据业务类型不同批量写入不同的Hbase表. 按照数据延迟和单次写入数据量要求,写入库的时候采用两种模式: 定时:满足指定数据延迟 定量:满足指定数据量 技术点解析 ...

  6. 【Spark】Spark 2.4 Stream 读取kafka 写入kafka

    1.概述 昨天一网友写了一个spark程序 读取kafka写入kafka,结果数据就是无法写入,然后交给我看看,这个程序是spark stream ,这个东东我都没玩过,我用过spark struct ...

  7. python 列表写入excel_Python将MySQL表数据写入excel

    背景:将mysql表查询结果写入excel. 1.使用sqlyog工具将查询结果导出到Excel.xml中,用excel打开发现:因为text字段中有回车换行操作,显示结果行是乱的. 2.用mysql ...

  8. python读取多个sheet文件_PythonPandas excel文件如何一次读取所有工作表,并再次写入多个工作表?,pythonpandasexcel,一次性,全部,sheet,重新...

    关于如何一次性读取全部sheet,可看我之前写的一篇博客 : 本文重点讲述如何对同一个excel文件一次性写入多个sheet数据 代码示例: import pandas as pd writer = ...

  9. postgresql获取表最后更新时间(通过触发器将时间写入另外一张表)

    通过触发器方式获取表最后更新时间,并将时间信息写入到另外一张表 一.创建测试表和表记录更新时间表 CREATE TABLE weather( city varchar(80), temp_lo int ...

最新文章

  1. Exchange Server 2010续定证书系列3-将续定后证书导入服务器
  2. theme vscode 护眼_VS code 豆沙绿护眼主题
  3. 深度解读畅捷通云原生架构转型实战历程
  4. PP模块: 最简单的混合生产方式
  5. windows编写linux脚本,Windows PowerShell:共享您的脚本 - 在脚本中编写 Cmdlet | Microsoft Docs...
  6. bagging和时间序列预测_时间序列的LSTM模型预测——基于Keras
  7. iOS之性能优化·提高App的编译速度
  8. autocad 二次开发 拆分图纸_AutoCAD二次开发技术在工程图纸绘制中的应用
  9. 信息学奥赛一本通C++语言——1060:均值
  10. java实验的总结_java实验总结
  11. uniapp前端处理接口返回一整个html格式
  12. 构建之法1,5,17章学习心得
  13. php报错集合,centos7安装php5.6报错集合
  14. php mysql 手机归属地_PHP 手机号码归属地查询代码 (API 接口 / mysql)
  15. doc 和docx的区别
  16. 如何将html的按钮做成圆角,HTML 圆角按钮的实现备忘
  17. c++中char的用法详解
  18. 亚马逊秒杀活动怎么报名?站斧浏览器亚马逊站内促销
  19. 用js计时器写倒计时
  20. 大学计算机音乐一起学,和学生一起学音乐

热门文章

  1. 通过set赋值,与select赋值的区别
  2. 大道至简java伪代码
  3. PHP面向对象知识点
  4. swift学习笔记《5》- 实用
  5. 十个最佳方法保护Windows文件服务器
  6. 设计模式-建造者模式(Builder Pattern)
  7. leveldb - log格式
  8. SQL光标的基本使用
  9. rpm yum 删除mysql
  10. C++中include头文件使用与的区别