python快速写入hbase_Python 读写 hbase 数据的正确姿势(一)
title: Python 读写 hbase 数据的正确姿势(一)
tags:
hbase
happybase
python
categories:
�Hbase
comments: true
date: 2017-09-09 19:00:00
之前操作 hbase 大都是用 java 写,或者偶尔用 python 写几个一些简单的 put、get 操作。最近在使用 happybase 库批量向 hbase 导入数据,并通过 java 实现查询的一些复杂的搜索时(scan+filter),遇到了一些有趣的问题。
实验版本
Hbase 版本:1.0.0
Happybase 版本:1.1.0
Python 版本:2.7.13
问题1:filter 过滤失败
问题重现
hbase 的使用场景大概是这样的:
有一个 hbase table,存储一些文章的基本信息,包括创建时间、文章ID、文章类别ID等,同属于一个column family,”article”。
查询的场景则是查找”指定的时间范围”,”文章类型ID为N” 的所有文章数据。
根据以上场景,设计如下 table:
hbase table 为 article 。
rowkey 是 “ARTICLE” + 微秒级时间戳(类似OpenTSDB 的rowkey,便于按时间序列查到某一段时间创建的 articles),即 “ARTICLE1504939752000000″。
family 为 “basic”,包含 “ArticleID”, “ArticleTypeID”, “Created”, 三个 column。
查询时通过指定 rowkey start 和 rowkey stop,可以 scan 某一个时间段的数据(因为 rowkey 中包含数值型的时间戳),通过 hbase filter 实现”ArticleTypeID” == N 的过滤条件。
开始导入数据、准备查询,以下是导入数据部分代码 demo:
def save_batch_events(datas, table=None):
with get_connetion_pool().connection() as conn:
if table is not None:
t = conn.table(table)
else:
t = conn.table(TABLE)
b = t.batch(transaction=False)
for row, data in datas.items():
b.put(row, data)
b.send()
def save_main_v1():
datas = dict()
for i in range(100):
article_type_id = i % 2
timestamp = time.time() + i
rowkey = "ARTICLE" + str(timestamp * 1000000)
data = {
"basic:" + "ArticleID": str(i),
"basic:" + "ArticleTypeID": str(article_type_id),
"basic:" + "Created": str(timestamp),
}
datas[rowkey] = data
save_batch_events(datas)
查看一下 hbase 的数据,100 条数据全部正常导入,其中50条数据 “ArticleTypeID” 为0,50条为1 :
图 1:python-happyhbase 写入的数据
接下来就是用 hbase filter 过滤的过程了,假设查询 “ArticleTypeID” 为 0 的数据,使用 java 客户端实现查询:
public static void test_hbase_filter() throws IOException {
TableName tableName = TableName.valueOf("test_article_1");
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
// Scan python table `test_article_1`
System.out.println("Prepare to scan !");
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("basic"),
Bytes.toBytes("ArticleTypeID"), CompareOp.EQUAL, Bytes.toBytes(1L));
list.addFilter(filter1);
Scan s = new Scan();
s.addFamily(Bytes.toBytes("basic"));
s.setFilter(list);
ResultScanner scanner = table.getScanner(s);
int num = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
num++;
}
System.out.println("Found row: " + num);// 预期 50,结果为 0
问题出现:使用 java 期望的查询结果为 50 条,但是查出的结果却是 0 条!
使用 python 查询却可以得到正确的结果:
def recent_events_v1(start, end, table=None, filter_str=None, limit=2000):
with get_connetion_pool().connection() as conn:
if table is not None:
t = conn.table(table)
else:
t = conn.table(TABLE)
start_row = 'ARTICLE' + str(start * 1000000)
end_row = 'ARTICLE' + str(end * 1000000)
return t.scan(row_start=start_row, row_stop=end_row, filter=filter_str, limit=limit)
if __name__ == '__main__':
filter_str = "SingleColumnValueFilter('basic', 'ArticleTypeID', =, 'binary:1')"
results = recent_events_v1(start=0, end=1505023900, filter_str=filter_str)
print len([i for i in results]) # 期望值为50, 实际值为 50,正确
寻找原因
经过 N 次确认,java 的读操作是没有问题的,python 实现的读写也得到了预期的效果。进一步探究,特意用 java 完整的实现的数据的导入和查询:
public static void test_hbase_filter1() throws IOException {
tableName = TableName.valueOf("test_article_java_1");
table = conn.getTable(tableName);
System.out.println("Prepare create table !");
Admin admin = conn.getAdmin();
if (!admin.tableExists(tableName)) {
HTableDescriptor td = new HTableDescriptor(tableName);
HColumnDescriptor basic = new HColumnDescriptor("basic");
td.addFamily(basic);
admin.createTable(td);
System.out.println("Created !");
}
// Put value to test_article_java_1
System.out.println("Prepare to write data to: " + table.getName().toString());
for (int i = 0; i < 100; i++) {
Put p = new Put(Bytes.toBytes("ARTICLE" + (System.currentTimeMillis() + 1000) * 1000));
p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("ArticleTypeID"), Bytes.toBytes(Long.valueOf(i % 2)));
table.put(p);
}
// scan test_article_java_1
scanner = table.getScanner(s);
num = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
num++;
}
System.out.println("Found row: " + num);// 预期 50,结果为 50
}
可见,用 java 写的数据,用 java 读是没问题的,用 python 写的数据用 python 读也没问题。但 java 读 python 写的数据就存在异常,难道是 python 写的数据和 java 写的数据不一样?为此分别对比一下 python 和 java 写入 hbase 的数据:
图 2:java 写入的数据
仔细观察图 1 和图 2 中的数据可以发现,python 写入的数据中对应的 ArticleTypeID 值为 0 或 1,而 java 则是一串 bytes。突然意识到一个问题,hbase 读写的时候要求传入的数据类型为 bytes,而使用 python 传输的过程中这种整形数据是直接通过 str() 方法转成字符串存储到 hbase 中的,并不是以 bytes 的形式存于 hbase,所以使用 java 用转化成 bytes 的 filter 读才没能得到预期的结果。
正确的 filter 姿势
既然找到了原因,解决问题就比较简单了,存储的时候将整型数据全部都通过 struct.pack 方法转成 bytes 存入,这样就可以被通用的查询了,同时 使用 python 查询的时候也将 filter 中的整型数值替换成 bytes 格式。
使用 struct.pack 方法将整型转成 bytes 时,注意选择使用 big-endian 的 Byte order,即 pack 方法的第一个参数使用 >。因为 java 官方 client 采用这种字节序,下面是 Bytes.toBytes 的实现源码,可见采用的是 big-endian:
/**
* Convert a long value to a byte array using big-endian.
*
* @param val value to convert
* @return the byte array
*/
public static byte[] toBytes(long val) {
byte [] b = new byte[8];
for (int i = 7; i > 0; i--) {
b[i] = (byte) val;
val >>>= 8;
}
b[0] = (byte) val;
return b;
写入的代码:
def save_main_v2():
datas = dict()
for i in range(100):
article_type_id = i % 2
timestamp = time.time() + i
rowkey = "ARTICLE" + str(timestamp * 1000000)
data = {
"basic:" + "ArticleID": str(i),
"basic:" + "ArticleTypeID": struct.pack('>q', article_type_id),
"basic:" + "Created": str(timestamp),
}
datas[rowkey] = data
save_batch_events(datas, table="test_article_2")
查询是的filter:
filter_str = "SingleColumnValueFilter('basic', 'ArticleTypeID', =, 'binary:{value}')".format(value=struct.pack('>q', 1))
这样就没有问题了~
总结
使用 python 读写 hbase 数据,直接传输整型参数时,hbase 的 thrift 接口会抛出 TDecodeException: Field 'value(3)' of 'Mutation' needs type 'STRING' 异常,被告知只接受 string 类型的数据。这时注意将整型数据转化成 bytes 形式的 str,而不要直接使用 str() 方法强转,否则难以避免的会出现一些非预期的结果。
以为这样就没问题了? 请关注看下文~
python快速写入hbase_Python 读写 hbase 数据的正确姿势(一)相关推荐
- python快速写入hbase_Python生成HBase 10w+ 条数据说明
每一个成功人士的背后,必定曾经做出过勇敢而又孤独的决定. 放弃不难,但坚持很酷~版本: Python:3.6.4 与 2.7.3 均适配 一.hbase表介绍表名:people 列族:basic_in ...
- python应用中调用spark_在python中使用pyspark读写Hive数据操作
1.读Hive表数据 pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL语 ...
- 7.读写HBase数据(华为云学习笔记,Spark编程基础,大数据)
读写HBase数据 ① 在hbase-shell中使用命令创建HBase数据库: ② 使用Spark读写HBase数据库中的数据. 实验原理 -> HBase HBase是一个高可靠.高性能.面 ...
- 【原创】大叔经验分享(25)hive通过外部表读写hbase数据
在hive中创建外部表: CREATE EXTERNAL TABLE hive_hbase_table( key string, name string, desc string ) STORED B ...
- python数据处理系列之读写csv数据
python数据处理系列之读写csv数据 导入csv文件 pandas中导入csv数据用的方法是read_csv() import pandas as pd df = pd.read_csv(r'c: ...
- 5个学习大数据的正确姿势
数据科学这个新领域发展迅速的同时也有着较大的人才缺口,还有其可观的薪资吸引了无数人.小编分享学习大数据的5个正确姿势: 一.学习动机 数据科学是一个广泛而模糊的领域,这使得它很难学习.没有动力,你最终 ...
- 作为一名Python程序员,论听歌的正确姿势?
程序员听歌的正确姿势. 这有啥,无非就是跪.趴.躺- 啊呸,说错了,正确姿势可能是? 打开网易云–>找到榜单–>选歌 But!!! 这也太普通太随意了嘛,来看一个Python程序员的打开方 ...
- python如何将数据写入nc_读写nc数据的方法(转载)
本帖最后由 po_po1 于 2014-4-27 20:13 编辑 NetCDFFile.dimensions返回{dimName:size,-}维数与大小的字典.通过获得维数的python字典类型中 ...
- Python数据分析之Pandas读写外部数据文件
点击上方"Datawhale",选择"星标"公众号 第一时间获取价值内容 阅读目录 1 引言 2 文本文件(txt.csv) 2.1 读取数据 2.2 写入数据 ...
- python快速入门第3版 数据_Python 快速入门 第3版
第 一部分 开始篇 第 1章 关于Python 3 1.1 用Python的理由 3 1.2 Python的长处 3 1.2.1 Python易于使用 4 1.2.2 Python富有表现力 4 1. ...
最新文章
- jeecg3.5.2中上传下载文件的示例中的的一个bug
- matlab svd分解
- python用途与前景-Python就业前景如何?三大就业岗位分享
- Python第五课(字典)
- PMCAFF微课堂 | 阿里高级产品专家: 揭秘B类App如何在高速生长期凝结含金量
- 第十八期:闲鱼上哪些商品抢手?Python分析后告诉你
- css 垂直居中_html中div使用CSS实现水平/垂直居中的多种方式
- python cookie池_Python爬虫scrapy框架Cookie池(微博Cookie池)的使用
- java swing 代码_java swing编写gui生命游戏代码,新手上路
- aws rds监控慢sql_将AWS S3存储桶与AWS RDS SQL Server集成
- 性能测试知多少---吞吐量【转】
- Nodejs页面访问加载静态资源
- oracle汉字转拼音
- 用Python 计算t分布的置信区间
- struct inode 结构体详解
- matlab粒子群运动模拟伪代码,基本粒子群优化算法(PSO)的matlab实现
- EtherNET/IP协议基础知识(下)
- Axure视频教程2:制作第一个原型
- 基于OpenCV的火焰检测(二)——RGB颜色判据
- autocad.net 画多段线_AutoCAD2016绘制多段线图文教程
热门文章
- SQL SERVER2000教程-第五章 处理数据 第二节 检索数据
- 22.Linux/Unix 系统编程手册(上) -- 信号:高级特性
- 15.Linux 高性能服务器编程 --- 进程池和线程池
- 6.jQuery appendTo问题解决
- 3.2 Zend_Db_Select
- 74. PHP 计数器
- 如何为Redis中list中的项设置过期时间
- kafka 重新分配partition
- Linux Windows 环境下 RabbitMQ 安装与基本配置
- [UE4]Uniform Grid Panel