hbase-day05

1、bulkLoad实现批量导入

优点:

  1. 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。

  2. 它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。

限制:

  1. 仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。
  2. HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群

代码编写:

提前在Hbase中创建好表

生成Hfile基本流程:

  1. 设置Mapper的输出KV类型:

    K: ImmutableBytesWritable(代表行键)

    V: KeyValue (代表cell)

​ 2. 开发Mapper

​ 读取你的原始数据,按你的需求做处理

​ 输出rowkey作为K,输出一些KeyValue(Put)作为V

​ 3. 配置job参数

​ a. Zookeeper的连接地址

​ b. 配置输出的OutputFormat为HFileOutputFormat2,并为其设置参数

​ 4. 提交job

​ 导入HFile到RegionServer的流程

​ 构建一个表描述对象

​ 构建一个region定位工具

​ 然后用LoadIncrementalHFiles来doBulkload操作

pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>hadoop-bigdata17</artifactId><groupId>com.shujia</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>had-hbase-demo</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId></dependency><dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-core</artifactId></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId></dependency></dependencies><build><plugins><!-- compiler插件, 设定JDK版本 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><encoding>UTF-8</encoding><source>1.8</source><target>1.8</target><showWarnings>true</showWarnings></configuration></plugin><!-- 带依赖jar 插件--><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

代码如下:

package com.bulkloadingdemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/*** @author WangTao* @date 2022/6/14 19:53*/class BulkLoadingMapper  extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context) throws IOException, InterruptedException {//按照tab对每行数据进行分割String[] strings = value.toString().split("\t");if(strings.length>7){String phoneNum = strings[0];String wg = strings[1];String city = strings[2];String qx = strings[3];String stayTime = strings[4];String startTime = strings[5];String endTime = strings[6];String dateTime = strings[7];//设计rowkeyString rowkey = phoneNum+"_"+startTime;KeyValue wg2 = new KeyValue(rowkey.getBytes(), "info".getBytes(), "wg".getBytes(), wg.getBytes());KeyValue city2 = new KeyValue(rowkey.getBytes(), "info".getBytes(), "city".getBytes(), city.getBytes());KeyValue qx2 = new KeyValue(rowkey.getBytes(), "info".getBytes(), "qx".getBytes(), qx.getBytes());KeyValue stayTime2 = new KeyValue(rowkey.getBytes(), "info".getBytes(), "stayTime".getBytes(), stayTime.getBytes());KeyValue endTime2 = new KeyValue(rowkey.getBytes(), "info".getBytes(), "endTime".getBytes(), endTime.getBytes());KeyValue dateTime2 = new KeyValue(rowkey.getBytes(), "info".getBytes(), "dateTime".getBytes(), dateTime.getBytes());ImmutableBytesWritable rowkey2 = new ImmutableBytesWritable(rowkey.getBytes());context.write(rowkey2, wg2);context.write(rowkey2, city2);context.write(rowkey2, qx2);context.write(rowkey2, stayTime2);context.write(rowkey2, endTime2);context.write(rowkey2, dateTime2);}}
}public class BulkLoadDemo {public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");//创建一个Job作业Job job = Job.getInstance(conf);job.setJobName("BulkLoadDemo  dianxin_data");job.setJarByClass(BulkLoadDemo.class);//这个可以进行设置,但是不会生效,reduce的任务是有region的数量决定的job.setNumReduceTasks(4);//配置Map相关的内容job.setMapperClass(BulkLoadingMapper.class);job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(KeyValue.class);job.setPartitionerClass(SimpleTotalOrderPartitioner.class);//配置Reduce//保证分区内部是有序的job.setReducerClass(KeyValueSortReducer.class);//配置输入路径FileInputFormat.addInputPath(job,new Path("/data/DIANXIN/input/dianxin_data"));//配置输出路径FileOutputFormat.setOutputPath(job,new Path("/data/DIANXIN/output"));Connection conn = ConnectionFactory.createConnection(conf);Admin admin = conn.getAdmin();Table dianxin_bulk = conn.getTable(TableName.valueOf("dianxin_bulk"));RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf("dianxin_bulk"));HFileOutputFormat2.configureIncrementalLoad(job,dianxin_bulk,regionLocator);//开始执行MapReduce任务boolean b = job.waitForCompletion(true);System.out.println("========dianxin_data生成HFile文件成功,对应HBASE表:dianxin_bulk========");if(b){//第二步,加载Hfile文件到 Hbase 中LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(conf);loadIncrementalHFiles.doBulkLoad(new Path("/data/DIANXIN/output"),admin,dianxin_bulk,regionLocator);}else {System.out.println("数据导入hbase失败。。。");}}
}/*** 1、将数据传入到HDFS上:/data/DIANXIN/input/dianxin_data* 2、在Hbase中建表 create 'dianxin_bulk','info'* 3、打包上传* 4、执行*   hadoop jar had-hbase-demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.shujia.hbaseapi.bulkloadingdemo.BulkLoadDemo*/

电信数据

手机号,网格编号,城市编号,区县编号,停留时间,进入时间,离开时间,时间分区
D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C,117210031795040,83401,8340104,301,20180503190539,20180503233517,20180503手机号和进入时间

说明

  1. 最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。

  2. 最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。

  3. MR例子中HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。

  4. MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了,但不能直接使用mv命令移动,因为直接移动不能更新HBase的元数据。

  5. HFile入库到HBase通过HBase中 LoadIncrementalHFiles的doBulkLoad方法,对生成的HFile文件入库

2、HBase中rowkey的设计(重点!!面试题)

HBase的RowKey设计

HBase是三维有序存储的,通过rowkey(行键),column key(column family和qualifier)和TimeStamp(时间戳)这个三个维度可以对HBase中的数据进行快速定位。

HBase中rowkey可以唯一标识一行记录,在HBase查询的时候,有两种方式:

通过get方式,指定rowkey获取唯一一条记录

通过scan方式,设置startRow和stopRow参数进行范围匹配

全表扫描,即直接扫描整张表中所有行记录

rowkey长度原则

rowkey是一个二进制码流,可以是任意字符串,最大长度 64kb ,实际应用中一般为10-100bytes,以 byte[] 形式保存,一般设计成定长。

建议越短越好,不要超过16个字节,原因如下:

数据的持久化文件HFile中是按照KeyValue存储的,如果rowkey过长,比如超过100字节,1000w行数据,光rowkey就要占用100*1000w=10亿个字节,将近1G数据,这样会极大影响HFile的存储效率;

MemStore将缓存部分数据到内存,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。

目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。

rowkey散列原则

如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。

rowkey唯一原则

必须在设计上保证其唯一性,rowkey是按照字典顺序排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。

什么是热点

HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。然而糟糕的rowkey设计是热点的源头。 热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。 设计良好的数据访问模式以使集群被充分,均衡的利用。

为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。

下面是一些常见的避免热点的方法以及它们的优缺点:

加盐

这里所说的加盐不是密码学中的加盐,而是在rowkey的前面增加随机数,具体就是给rowkey分配一个随机前缀以使得它和之前的rowkey的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的region的数量一致。加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。

哈希

哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据

反转

第三种防止热点的方法时反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。

反转rowkey的例子以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,这样的就避免了以手机号那样比较固定开头导致热点问题

时间戳反转

一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如 [key]reverse_timestamp , [key] 的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。

比如需要保存一个用户的操作记录,按照操作时间倒序排序,在设计rowkey的时候,可以这样设计

[userId反转]Long.Max_Value - timestamp,在查询用户的所有操作记录数据的时候,直接指定反转后的userId,startRow是[userId反转]000000000000,stopRow是[userId反转]Long.Max_Value - timestamp

如果需要查询某段时间的操作记录,startRow是[user反转]Long.Max_Value - 起始时间,stopRow是[userId反转]Long.Max_Value - 结束时间

其他一些建议

尽量减少行和列的大小在HBase中,value永远和它的key一起传输的。当具体的值在系统间传输时,它的rowkey,列名,时间戳也会一起传输。如果你的rowkey和列名很大,甚至可以和具体的值相比较,那么你将会遇到一些有趣的问题。HBase storefiles中的索引(有助于随机访问)最终占据了HBase分配的大量内存,因为具体的值和它的key很大。可以增加block大小使得storefiles索引再更大的时间间隔增加,或者修改表的模式以减小rowkey和列名的大小。压缩也有助于更大的索引。

列族尽可能越短越好,最好是一个字符

冗长的属性名虽然可读性好,但是更短的属性名存储在HBase中会更好

# 原数据:以时间戳_user_id作为rowkey
# 时间戳高位变化不大,太连续,最终可能会导致热点问题
1638584124_user_id
1638584135_user_id
1638584146_user_id
1638584157_user_id
1638584168_user_id
1638584179_user_id# 解决方案:加盐、反转、哈希# 加盐
# 加上随即前缀,随机的打散
# 该过程无法预测 前缀时随机的
00_1638584124_user_id
05_1638584135_user_id
03_1638584146_user_id
04_1638584157_user_id
02_1638584168_user_id
06_1638584179_user_id# 反转
# 适用于高位变化不大,低位变化大的rowkey
4214858361_user_id
5314858361_user_id
6414858361_user_id
7514858361_user_id
8614858361_user_id
9714858361_user_id# 散列 md5、sha1、sha256......
25531D7065AE158AAB6FA53379523979_user_id
60F9A0072C0BD06C92D768DACF2DFDC3_user_id
D2EFD883A6C0198DA3AF4FD8F82DEB57_user_id
A9A4C265D61E0801D163927DE1299C79_user_id
3F41251355E092D7D8A50130441B58A5_user_id
5E6043C773DA4CF991B389D200B77379_user_id# 时间戳"反转"
# rowkey:时间戳_user_id
# rowkey是字典升序的,那么越新的记录会被排在最后面,不容易被获取到
# 需求:让最新的记录排在最前面# 大数:9999999999
# 大数-小数1638584124_user_id => 8361415875_user_id
1638584135_user_id => 8361415864_user_id
1638584146_user_id => 8361415853_user_id
1638584157_user_id => 8361415842_user_id
1638584168_user_id => 8361415831_user_id
1638584179_user_id => 8361415820_user_id1638586193_user_id => 8361413806_user_id

合理设计rowkey实战(电信)

手机号,网格编号,城市编号,区县编号,停留时间,进入时间,离开时间,时间分区
D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C,117210031795040,83401,8340104,301,20180503190539,20180503233517,20180503将用户位置数据保存到hbase查询需求1、通过手机号查询用户最近10条位置记录2、获取用户某一天在一个城市中的所有位置怎么设计hbase表1、rowkey2、时间戳

3、二级索引

二级索引的本质就是建立各列值与行键之间的映射关系

Hbase的局限性:

  HBase本身只提供基于行键和全表扫描的查询,而行键索引单一,对于多维度的查询困难。

所以我们引进一个二级索引的概念

常见的二级索引:

HBase的一级索引就是rowkey,我们只能通过rowkey进行检索。如果我们相对hbase里面列族的列列进行一些组合查询,就需要采用HBase的二级索引方案来进行多条件的查询。

  1. MapReduce方案
  2. ITHBASE(Indexed-Transanctional HBase)方案
  3. IHBASE(Index HBase)方案
  4. Hbase Coprocessor(协处理器)方案
  5. Solr+hbase方案 redis+hbase 方案

  6. CCIndex(complementalclustering index)方案

二级索引的种类

1、创建单列索引2、同时创建多个单列索引3、创建联合索引(最多同时支持3个列)4、只根据rowkey创建索引

单表建立二级索引

1.首先disable ‘表名’
2.然后修改表alter 'LogTable',METHOD=>'table_att','coprocessor'=>'hdfs:///写好的Hbase协处理器(coprocessor)的jar包名|类的绝对路径名|1001'3. enable '表名'

二级索引的设计思路

二级索引的本质就是建立各列值与行键之间的映射关系如上图1,当要对F:C1这列建立索引时,只需要建立F:C1各列值到其对应行键的映射关系,如C11->RK1等,这样就完成了对F:C1列值的二级索引的构建,当要查询符合F:C1=C11对应的F:C2的列值时(即根据C1=C11来查询C2的值,图1青色部分)其查询步骤如下:1. 根据C1=C11到索引数据中查找其对应的RK,查询得到其对应的RK=RK12. 得到RK1后就自然能根据RK1来查询C2的值了 这是构建二级索引大概思路,其他组合查询的联合索引的建立也类似。

Mapreduce的方式创建二级索引

使用整合MapReduce的方式创建hbase索引。主要的流程如下:

1.1扫描输入表,使用hbase继承类TableMapper

1.2获取rowkey和指定字段名称和字段值

1.3创建Put实例, value=” “, rowkey=班级,column=学号

1.4使用IdentityTableReducer将数据写入索引表

案例:

1、在hbase中创建索引表 student_index

create 'student_index','info'

2、编写mapreduce代码

package com.shujia.hbaseapi.hbaseindexdemo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/***  编写整个mapreduce程序建立索引表*/class IndexMapper extends TableMapper<Text, NullWritable>{@Overrideprotected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {String id = Bytes.toString(key.get());String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));String key1 = id+"_"+clazz;context.write(new Text(key1),NullWritable.get());}
}/**** reduce端获取map端传过来的key*/class IndexReduce extends TableReducer<Text,NullWritable,NullWritable>{@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {String[] strings = key.toString().split("_");String id = strings[0];String clazz = strings[1];//索引表也是属于hbase的表,需要使用put实例添加数据Put put = new Put(clazz.getBytes());put.add("info".getBytes(),id.getBytes(),"".getBytes());context.write(NullWritable.get(),put);}
}public class HbaseIndex {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");Job job = Job.getInstance(conf);job.setJobName("建立学生索引表");job.setJarByClass(HbaseIndex.class);Scan scan = new Scan();scan.addFamily("info".getBytes());//指定对哪张表建立索引,以及指定需要建索引的列所属的列簇TableMapReduceUtil.initTableMapperJob("students",scan,IndexMapper.class,Text.class,NullWritable.class,job);TableMapReduceUtil.initTableReducerJob("student_index",IndexReduce.class,job);job.waitForCompletion(true);}}

3、打成jar包上传到hadoop中运行

hadoop jar had-hbase-demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.shujia.hbaseapi.hbaseindexdemo.HbaseIndex

4、编写查询代码,测试结果(先查询索引表,在查数据)

package com.shujia.hbaseapi.hbaseindexdemo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class HbaseIndexToStudents {private HConnection conn;private HBaseAdmin hAdmin;@Beforepublic void connect() {try {//1、获取Hadoop的相关配置环境Configuration conf = new Configuration();//2、获取zookeeper的配置conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");//获取与Hbase的连接,这个连接是将来可以用户获取hbase表的conn = HConnectionManager.createConnection(conf);//将来我们要对表做DDL相关操作,而对表的操作在hbase架构中是有HMasterhAdmin = new HBaseAdmin(conf);System.out.println("建立连接成功:" + conn + ", HMaster获取成功:" + hAdmin);} catch (IOException e) {e.printStackTrace();}}/*** 通过索引表进行查询数据* <p>* 需求:获取理科二班所有的学生信息,不适用过滤器,使用索引表查询*/@Testpublic void scanData() {try {long start = System.currentTimeMillis();//创建一个集合存放查询到的学号ArrayList<Get> gets = new ArrayList<>();//获取到索引表HTableInterface student_index = conn.getTable("student_index");//创建Get实例Get get = new Get("理科二班".getBytes());Result result = student_index.get(get);List<Cell> cells = result.listCells();for (Cell cell : cells) {//每一个单元格的列名byte[] bytes = CellUtil.cloneQualifier(cell);String id = Bytes.toString(bytes);Get get1 = new Get(id.getBytes());//将学号添加到集合中gets.add(get1);}//获取真正的学生数据表 studentsHTableInterface students = conn.getTable("students");Result[] results = students.get(gets);for (Result result1 : results) {String id = Bytes.toString(result1.getRow());String name = Bytes.toString(result1.getValue("info".getBytes(), "name".getBytes()));String age = Bytes.toString(result1.getValue("info".getBytes(), "age".getBytes()));String gender = Bytes.toString(result1.getValue("info".getBytes(), "gender".getBytes()));String clazz = Bytes.toString(result1.getValue("info".getBytes(), "clazz".getBytes()));System.out.println("学号:" + id + ", 姓名:" + name + ", 年龄:" + age + ", 性别:" + gender + ", 班级:" + clazz);}long endtime = System.currentTimeMillis();System.out.println("=========================================");System.out.println((endtime - start) + "毫秒");} catch (IOException e) {e.printStackTrace();}}@Testpublic void getData() {try {long start = System.currentTimeMillis();//获取真正的学生数据表 studentsHTableInterface students = conn.getTable("students");Scan scan = new Scan();SubstringComparator substringComparator = new SubstringComparator("理科二班");SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("info".getBytes(), "clazz".getBytes(), CompareFilter.CompareOp.EQUAL, substringComparator);scan.setFilter(singleColumnValueFilter);ResultScanner scanner = students.getScanner(scan);Result rs = null;while ((rs = scanner.next()) != null) {String id = Bytes.toString(rs.getRow());String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));String age = Bytes.toString(rs.getValue("info".getBytes(), "age".getBytes()));String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));System.out.println("学号:" + id + ", 姓名:" + name + ", 年龄:" + age + ", 性别:" + gender + ", 班级:" + clazz);}long endtime = System.currentTimeMillis();System.out.println("=========================================");System.out.println((endtime - start) + "毫秒");} catch (IOException e) {e.printStackTrace();}}@Afterpublic void close() {if (conn != null) {try {conn.close();} catch (IOException e) {e.printStackTrace();}System.out.println("conn连接已经关闭.....");}if (hAdmin != null) {try {hAdmin.close();} catch (IOException e) {e.printStackTrace();}System.out.println("HMaster已经关闭......");}}
}

4、Phoenix二级索引

对于Hbase,如果想精确定位到某行记录,唯一的办法就是通过rowkey查询。如果不通过rowkey查找数据,就必须逐行比较每一行的值,对于较大的表,全表扫描的代价是不可接受的。

1、开启索引支持

# 关闭hbase集群
stop-hbase.sh# 在/usr/local/soft/hbase-1.4.6/conf/hbase-site.xml中增加如下配置<property><name>hbase.regionserver.wal.codec</name><value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property><name>hbase.rpc.timeout</name><value>60000000</value>
</property>
<property><name>hbase.client.scanner.timeout.period</name><value>60000000</value>
</property>
<property><name>phoenix.query.timeoutMs</name><value>60000000</value>
</property># 同步到所有节点
scp hbase-site.xml node1:`pwd`
scp hbase-site.xml node2:`pwd`# 修改phoenix目录下的bin目录中的hbase-site.xml
<property><name>hbase.rpc.timeout</name><value>60000000</value>
</property>
<property><name>hbase.client.scanner.timeout.period</name><value>60000000</value>
</property>
<property><name>phoenix.query.timeoutMs</name><value>60000000</value>
</property># 启动hbase
start-hbase.sh
# 重新进入phoenix客户端
sqlline.py master,node1,node2

2、创建索引

2.1、全局索引

全局索引适合读多写少的场景。如果使用全局索引,读数据基本不损耗性能,所有的性能损耗都来源于写数据。数据表的添加、删除和修改都会更新相关的索引表(数据删除了,索引表中的数据也会删除;数据增加了,索引表的数据也会增加)

注意: 对于全局索引在默认情况下,在查询语句中检索的列如果不在索引表中,Phoenix不会使用索引表将,除非使用hint。

手机号 进入网格的时间 离开网格的时间 区县编码 经度 纬度 基站标识 网格编号 业务类型# 创建DIANXIN.sql --脚本,通过命令执行
CREATE TABLE IF NOT EXISTS DIANXIN (mdn VARCHAR ,start_date VARCHAR ,end_date VARCHAR ,county VARCHAR,x DOUBLE ,y  DOUBLE,bsid VARCHAR,grid_id  VARCHAR,biz_type VARCHAR, event_type VARCHAR , data_source VARCHAR ,CONSTRAINT PK PRIMARY KEY (mdn,start_date)
) column_encoded_bytes=0;# 上传数据DIANXIN.csv# 导入数据
psql.py master,node1,node2 DIANXIN.sql DIANXIN.csv# 创建全局索引
CREATE INDEX DIANXIN_INDEX ON DIANXIN ( end_date );# 查询数据 ( 索引未生效)
select * from DIANXIN where end_date = '20180503154014';# 强制使用索引 (索引生效) hint
select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = '20180503154014';select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = '20180503154014'  and start_date = '20180503154614';# 取索引列,(索引生效)
select end_date from DIANXIN where end_date = '20180503154014';# 创建多列索引
CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY );# 多条件查询 (索引生效)
select end_date,MDN,COUNTY from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';# 查询所有列 (索引未生效)
select  * from DIANXIN where end_date = '20180503154014'  and COUNTY = '8340104';# 查询所有列 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX1) */ * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';# 单条件  (索引未生效)
select end_date from DIANXIN where  COUNTY = '8340103';
# 单条件  (索引生效) end_date 在前
select COUNTY from DIANXIN where end_date = '20180503154014';# 删除索引
drop index DIANXIN_INDEX on DIANXIN;

2.2、本地索引

本地索引适合写多读少的场景,或者存储空间有限的场景。和全局索引一样,Phoenix也会在查询的时候自动选择是否使用本地索引。本地索引因为索引数据和原数据存储在同一台机器上,避免网络数据传输的开销,所以更适合写多的场景。由于无法提前确定数据在哪个Region上,所以在读数据的时候,需要检查每个Region上的数据从而带来一些性能损耗。

注意:对于本地索引,查询中无论是否指定hint或者是查询的列是否都在索引表中,都会使用索引表。

# 创建本地索引
CREATE LOCAL INDEX DIANXIN_LOCAL_IDEX ON DIANXIN(grid_id);# 索引生效
select grid_id from dianxin where grid_id='117285031820040';# 索引生效
select * from dianxin where grid_id='117285031820040';

2.3、覆盖索引

覆盖索引是把原数据存储在索引数据表中,这样在查询时不需要再去HBase的原表获取数据就,直接返回查询结果。

注意:查询是 select 的列和 where 的列都需要在索引中出现。

# 创建覆盖索引
CREATE INDEX DIANXIN_INDEX_COVER ON DIANXIN ( x,y ) INCLUDE ( county );# 查询所有列 (索引未生效)
select * from DIANXIN where x=117.288 and y =31.822;# 强制使用索引 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ * from DIANXIN where x=117.288 and y =31.822;# 查询索引中的列 (索引生效) mdn是DIANXIN表的RowKey中的一部分
select x,y,county from DIANXIN where x=117.288 and y =31.822;
select mdn,x,y,county from DIANXIN where x=117.288 and y =31.822;# 查询条件必须放在索引中  select 中的列可以放在INCLUDE (将数据保存在索引中)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ x,y,count(*) from DIANXIN group by x,y;

5、Phoenix JDBC

# 导入依赖
<dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-core</artifactId><version>4.15.0-HBase-1.4</version>
</dependency>
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version>
</dependency>
Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");PreparedStatement ps = conn.prepareStatement("select /*+ INDEX(DIANXIN DIANXIN_INDEX) */ * from DIANXIN where end_date=?");ps.setString(1, "20180503212649");ResultSet rs = ps.executeQuery();while (rs.next()) {String mdn = rs.getString("mdn");String start_date = rs.getString("start_date");String end_date = rs.getString("end_date");String x = rs.getString("x");String y = rs.getString("y");String county = rs.getString("county");System.out.println(mdn + "\t" + start_date + "\t" + end_date + "\t" + x + "\t" + y + "\t" + county);}ps.close();conn.close();

Hbase-day05_bulkLoad实现批量导入_HBase中rowkey的设计_二级索引_Phoenix二级索引相关推荐

  1. HBase结合MapReduce批量导入

    Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapR ...

  2. solr使用网页浏览器批量导入数据库中数据(本案例是mysql)

    如果想要知道如何安装solr,集成IKAnalyzer中文分词器,批量导入数据库数据,java使用参照以下本博主博文: 安装solr https://blog.csdn.net/u013294097/ ...

  3. hbase中的row key_hbase中RowKey的设计规则

    在关心到hbase中rowkey设计的时候,说明hbase基本的知识已经了解了.就直接上干货(如果不了解的可以参考我上面一片关于hbase的自我总结的文章,我觉得总结的还是很好的).如果文章中有错误或 ...

  4. Habse中Rowkey的设计原则——通俗易懂篇

    Hbase的Rowkey设计原则 一. Hbase介绍 HBase -> Hadoop Database,HBase是Apache的Hadoop项目的子项目.HBase不同于一般的关系数据库,它 ...

  5. ui设计中的版式设计_设计中的版式-第3部分

    ui设计中的版式设计 and how not to suck at it 以及如何不吸吮它 This is the 3rd and last part of the series. Here we t ...

  6. 小程序中的权限设计_低代码布道师的博客-CSDN博客

    日常我们开发小程序的时候,经常需要考虑权限如何设计,比如在我的页面,管理员可以看到一些菜单,而普通用户可以看到另外一些菜单.那如何设计这种带权限的功能呢?本文就以低代码工具为例,看看低代码中是如何设计 ...

  7. Hbase中RowKey的设计原则和热点问题

    Rowkey设计需要遵循三个原则,即长度原则.散列原则.唯一原则. 1. 长度原则 由于Rowkey是一个二进制码流,可以是任意字符串,最大长度64kb,实际应用中一般为10- 100bytes,以b ...

  8. python导入类中函数不能用_我可以使用本地类中导入模块中的函数吗?(Python)...

    需要在某个地方导入声明:import import_module class local_class(): def local_function(): action = raw_input() if ...

  9. python中扑克牌类设计_一摞Python风格的纸牌

    一摞Python风格的纸牌 接下来我会用一个非常简单的例子来展示如何实现 getitme 和 len 这两个特殊方法,通过这个例子我们也能见识到特殊方法的强 大. 示例 1-1 里的代码建立了一个纸牌 ...

最新文章

  1. Android应用程序键盘(Keyboard)消息处理机制分析(20)
  2. Anaconda 默认环境
  3. 16位代码段与32位代码段的区别
  4. 卸载CentOS 5.4自带的OpenJDK,配置新的Java环境
  5. D. Cut and Stick(Codeforces Round #716 (Div. 2))
  6. 1006 换个格式输出整数 (15 分)—PAT (Basic Level) Practice (中文)
  7. css横向排列_前端初学者李不白,html+css的角度,带你分析蘑菇街官网!!!
  8. 四菱天线怎么加强_白话天线(2)---什么是天线?
  9. Java 用Myeclipse部署项目基础坏境搭建
  10. 从音箱入门到高手必看知识(一)—— 音箱初级知识
  11. 激活visio(2019)
  12. 学单片机有什么用?单片机自学网有哪些?
  13. 搭建内网BT服务器(转)
  14. marve register license
  15. 如何向Google提交网站?(转)
  16. python中fact()是什么意思_python中fact函数是什么及如何使用?
  17. 基于禁忌搜索算法的三维装箱问题
  18. BTC源码分析 交易(一)
  19. hotspot源码角度看OOP之类属性的底层实现(一)
  20. 整数规划、混合整数规划基础知识

热门文章

  1. 流水的技术,铁打的Java!非一线城市也能拿20K+
  2. 乌镇世界区块链大会回顾:从Defi到MOV
  3. Python 常用笔记
  4. excel一列数据两两组合(excel一列的数据等于另一列)
  5. Linux用户进程高精度定时器去抖动
  6. 工业机器人码垛教学实施_工业机器人码垛方案设计.doc
  7. 斗破苍穹文字页游php_浅谈斗破影视化下的网络文学二次创作
  8. 晓莲说-何不原创:java 实现二维数组冒泡排序
  9. Redis源码阅读01-读了一下redis启动流程涉及的源码我都读了个啥
  10. 像Google一样开会 【http://www.isweetriver.com/2010/google-meetings】