Pyspark + Hbase

环境配置:
Python:3.7.4
Spark:2.4.4
Hbase:2.2.3


前言:
首先,本人建议使用scala来做有关spark的开发,这是和前辈讨论他们给的建议,或者你们可以参考一下这篇文章.
这个真的折磨了我好久(中途还接到需求,断断续续弄了好久,多久就不告诉你们了,免得你们笑我菜),真的辛酸。里面我的做法肯定有很多漏洞,而且还没完全解决,我会持续更新。另外,各位大神,如果友好的建议,评论区提点一下,万分感谢。
另外,本篇可能会有点长,不喜勿喷。。。。


数据格式:
一天一个压缩包(ZIP),然后里面都是很多个TXT文件,分隔符是 “|” ,每个TXT文件大概是9000条数据这样。


前言

我不知道大家有没有这种感觉,反正我是有,就是随着版本的迭代升级,大数据的组件对Python越来越不友好,学习javascala对于大数据开发来说真的很重要。对于我这种只会python的菜鸡还真的是刺激。。。
使用spark写入hbase有两个方法,一个是使用hortonworks的开源框架shc,另一个是使用RDD自带的方法saveAsNewAPIHadoopDataset存入到hbase中。

hortonworks的开源框架shc

先说说这个。
首先你需要下载个shc的依赖包shc-core-1.1.1-2.1-s_2.11.jar:spark hbase connect.
版本的选择自己判断把,这个链接往前翻能找到其他版本。
如果你想自行编译这个包,也可以,从这里找到源码下载自己打包:github源码网址.
我已经忘记我怎么找到的了,应该是这一个没错。
代码:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *conf = SparkConf()
config = (("spark.executor.memory", "8g"),("spark.executor.cores", "4"))
conf.setAll(config)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()def read_txt(sc):# input = sc.textFile("file:///home/data/demodata").map(lambda x: x.split('|'))# a_rdd = sc.textFile("file:///home/data/20200302").map(lambda x: x.split('|'))# df = input.toDF()df = spark.read.csv("file:///home/data/demodata", sep="|")return dfdef write_to_hbase1(df):rename_df = df. \withColumnRenamed("_1", "PLA_INFO").\withColumnRenamed("_2", "DATE_TIME").\withColumnRenamed("_3", "KKJ").\withColumnRenamed("_4", "KKW").\withColumnRenamed("_5", "SPEED").\withColumnRenamed("_6", "SPEED_EX").\withColumnRenamed("_7", "ALTITUDE").\withColumnRenamed("_8", "STATUS").\withColumnRenamed("_9", "COLOUR").\withColumnRenamed("_10", "MILEAGE").\withColumnRenamed("_11", "WAY")id_df = rename_df.withColumn("V_ID", monotonically_increasing_id())id_df = id_df.withColumn("V_ID", id_df.V_ID.cast(StringType()))dep = "org.apache.spark.sql.execution.datasources.hbase"# 创建schemacatalog = """{"table":{"namespace":"default", "name":"test", "tableCoder":"PrimitiveType"},"rowkey":"key","columns":{"V_ID":{"cf":"rowkey", "col":"key", "type":"string"},"PLATE":{"cf":"car_info", "col":"PLA_INFO", "type":"string"},"DATE_TIME":{"cf":"car_info", "col":"DATE_TIME", "type":"int"},"KKJD":{"cf":"car_info", "col":"KKJ","type":"int"},"KKWD":{"cf":"car_info", "col":"KKW","type":"string"},"SPEED":{"cf":"car_info", "col":"SPEED", "type":"string"},"SPEED_EX":{"cf":"car_info", "col":"SPEED_EX","type":"string"},"ALTITUDE":{"cf":"car_info", "col":"ALTITUDE","type":"string"},"STATUS":{"cf":"car_info", "col":"STATUS","type":"string"},"COLOUR":{"cf":"car_info", "col":"COLOUR","type":"string"},"MILEAGE":{"cf":"car_info", "col":"MILEAGE","type":"string"},"WAY":{"cf":"car_info", "col":"WAY","type":"string"}}} """id_df.write.options(catalog=catalog).format(dep).save()print('***************************************************************')print('*******************        写 入 成 功       ******************')print('***************************************************************')def main():sc = spark.sparkContextdf = read_txt(sc)write_to_hbase1(df)sc.stop()
if __name__ == '__main__':main()

然后报错了。。。。

给个文字版吧,主要是这一句:

java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;

简单来说就是在保存的时候,发生了jar包冲突的错,冲突的是org.json4s.jackson.JsonMethods对应的是spark里的json4s-core_2.11-3.5.3.jar
我用上面的网址下载源码,下载的对用的包,pom里只引用了一个包,应该是引用里面又引用了json4s这个包,我在包里搜索了一下还真的有。。我就不截图了,其实也就是说把这个包去掉重新编译就行了。于是乎,我让我同事帮忙在pom里加了个忽略json4s包的代码(因为没成功,我就删了,没能贴出来),但是打包的时候一堆错误,所以我放弃了。
网上遇到我上面这个错的都是建议把shc jar包里的剔除了再重新编译就可以了,建议大家试试,应该是可以成功的。

saveAsNewAPIHadoopDataset

这个我直接上代码吧。
代码:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *conf = SparkConf()
config = (("spark.executor.memory", "8g"),("spark.executor.cores", "4"))
conf.setAll(config)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()def read_txt(sc):# input = sc.textFile("file:///home/data/demodata").map(lambda x: x.split('|'))# a_rdd = sc.textFile("file:///home/data/20200302").map(lambda x: x.split('|'))# df = input.toDF()df = spark.read.csv("file:///home/data/demodata", sep="|")return dfdef write_to_hbase2(df):"""将Spark DataFrame 数据写入Hbase"""rename_df = df. \withColumnRenamed("_1", "PLA_INFO").\withColumnRenamed("_2", "DATE_TIME").\withColumnRenamed("_3", "KKJ").\withColumnRenamed("_4", "KKW").\withColumnRenamed("_5", "SPEED").\withColumnRenamed("_6", "SPEED_EX").\withColumnRenamed("_7", "ALTITUDE").\withColumnRenamed("_8", "STATUS").\withColumnRenamed("_9", "COLOUR").\withColumnRenamed("_10", "MILEAGE").\withColumnRenamed("_11", "WAY")id_df = rename_df.withColumn("V_ID", monotonically_increasing_id())id_df = id_df.withColumn("V_ID", id_df.V_ID.cast(StringType()))host = "192.168.0.111"table = "test"keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"conf = {"hbase.zookeeper.quorum": host, "hbase.mapred.outputtable": table,"zookeeper.znode.parent": "/hbase-unsecure","mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}# DataFrame转换为rdd写入hbasecol_list = id_df.columnsprint(col_list)rdd_data = id_df.rdd.map(lambda x:(x["V_ID"],((i,x[i]) for i in col_list[1:])))rdd_data = rdd_data.flatMapValues(lambda x:x)rrd2 = rdd_data.map(lambda x:(x[0],[x[0],'info',x[1][0],x[1][1]]))print(rrd2.take(10))rrd2.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)def main():sc = spark.sparkContextdf = read_txt(sc)write_to_hbase2(df)sc.stop()
if __name__ == '__main__':main()

这个方法有个值得注意的地方,如果你在配置的代码中没有配置zookeeper.znode.parent,你就会遇到以下错误:

20/04/29 09:28:14 WARN ConnectionImplementation: Retrieve cluster id failed
java.util.concurrent.ExecutionException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/hbaseid

然后你会一头雾水,因为这个问题你可能在使用hbase shell的时候应该遇到过,而且你应该已经解决了,然后再往下拉,你就又看到一句:

Caused by: java.io.IOException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase

如果你百度的这些错的话,你能看到的都不是说spark连接Hbase遇到这个问题。。这就很无语。。
其实这里简单来说就是,找不到hbase在zookeeper的路径(不知道我这样表述有没有问题)。所以你只要在配置里指定好zookeeper.znode.parent就可以了。
这个配置我试过,在scala的代码里不写,也是报一样的错,但是很奇怪的是网上的代码基本都不写这个配置。。可能是我环境配的有问题吧。

好的,配置完之后不报这个错了,又报另一个错:

20/05/07 15:37:18 ERROR Utils: Aborting task
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.add([B[B[B)Lorg/apache/hadoop/hbase/client/Put;

这里简单来讲就是org.apache.hadoop.hbase.client里put.add有问题。。。
那么我上网查过,反正意思就是2.0版本之后这个方法改了
解决办法看看这几篇:
https://blog.csdn.net/sinat_37992109/article/details/98735188.
https://www.cnblogs.com/cssdongl/p/7347167.html.
https://blog.csdn.net/cssdongl/article/details/77750515.

简单来讲就是要不呢,你就换版本(spark降低到2.0版本以下),要不呢,你就重新编译spark-exmaple的jar包(怎么又重新编译。。)

我觉得太过于麻烦。。所以我还是写scala
由于文章太长,所以我再分一篇把。。。

PySpark写入数据到Hbase的辛酸经历相关推荐

  1. CDH集成了Kerberos后写入数据到HBase遇到的几个问题

    目录 环境和场景 问题一:Zookeeper认证错误 解决方法 问题二:HBase权限错误 解决方法 环境和场景 环境:CDH6.3.1+Kerberos 场景:数据经Flink处理后写入到Hbase ...

  2. 读取TXT文件写入数据到Hbase

    背景: 接到需求,需要每天把大概一亿条的数据导入到Hbase里,对标同事的mongoDB(大概半个小时内导完) 环境配置: Python:3.7.4 Hbase:2.2.3 数据格式: 一天一个压缩包 ...

  3. Spark写入数据到Hbase(下)解决scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps

    环境配置: Scala:2.11.12(看上篇,原本是2.12.1) Spark:2.4.4 Hbase:2.2.3 前言: 上一篇没报错,后来改动了一下,就报错了,这个错是版本的问题,而且让我十分疑 ...

  4. [喵咪大数据]Hive+Hbase关联

    在之前的章节中我们已经一同学习的Hive和HBase相关的知识,但是Hive和HBase都存在各自的问题,Hive实时性不强单条写入数据慢,HBase查询能力差不具备复杂查询的能力,但是Hive和HB ...

  5. spark从hbase读取写入数据

    将RDD写入hbase 注意点: 依赖: 将lib目录下的hadoop开头jar包.hbase开头jar包添加至classpath 此外还有lib目录下的:zookeeper-3.4.6.jar.me ...

  6. hbase原理与实践_JAVA连接HBase客户端及HBase写入数据和读取数据原理解析

    JAVA连接HBase客户端 接着上篇文章进行代码的实践,从JAVA 客户端对 HBase的客户端进行一系列操作 工具类:HbaseUtil 静态代码块一次性创建连接对象 并赋值 返回连接对象 Con ...

  7. 大数据查询——HBase读写设计与实践--转

    背景介绍 本项目主要解决 check 和 opinion2 张历史数据表(历史数据是指当业务发生过程中的完整中间流程和结果数据)的在线查询.原实现基于 Oracle 提供存储查询服务,随着数据量的不断 ...

  8. 一条数据的HBase之旅,简明HBase入门教程-开篇

    常见的HBase新手问题: 什么样的数据适合用HBase来存储? 既然HBase也是一个数据库,能否用它将现有系统中昂贵的Oracle替换掉? 存放于HBase中的数据记录,为何不直接存放于HDFS之 ...

  9. 一条数据的HBase之旅,简明HBase入门教程-Write全流程

    如果将上篇内容理解为一个冗长的"铺垫",那么,从本文开始,剧情才开始正式展开.本文基于提供的样例数据,介绍了写数据的接口,RowKey定义,数据在客户端的组装,数据路由,打包分发, ...

最新文章

  1. linux输出重定向%3e退出,Linux学习笔记——第二章:Linux的用户接口与文本编辑器...
  2. Android SQLite数据库增删改查操作
  3. 16 分频 32 分频是啥意思_Verilog 数字分频器的设计及验证
  4. 【渝粤题库】国家开放大学2021春1078复变函数题目
  5. 安装mysql8._安装MySQL8(附详细图文)
  6. java enummap_Java EnumMap get()方法与示例
  7. 彭文华:详解数字化转型的破局之道(附直播视频)
  8. 对于成功的GDUUU商人来说
  9. MySQL 异常有这一篇就够了!
  10. 谁给小鹏P5的勇气?
  11. 做Tiktok如何选择地区?
  12. 文字处理(WORD/WP)中,布局与绘制必然分开
  13. extern dllInport用法
  14. 【UVA-10891】Game of Sum【区间DP】
  15. vbs画动态爱心代码_求助,求一个vbs画心形的代码(程序小白的求助)
  16. 模糊综合评价模型原理及matlab实现
  17. 尔雅 科学通史(吴国盛) 个人笔记及课后习题 2018 第五章 欧洲科技文明的起源
  18. python做一个浏览器_用python做一个简单的浏览器
  19. python贪吃蛇设计思路_Python制作AI贪吃蛇
  20. 字典(python学习)

热门文章

  1. php自动加nofollow,WordPress自动外部链接加上nofollow标签且新窗口打开
  2. word把选择答案弄到题目里_word2003考试选择题及答案
  3. PHP比java好学?_php好学还是java好学,学php有前途吗
  4. python海龟画图(2)五星红旗
  5. 获得安卓手机的相关信息
  6. 逆向 qq 消息撤回
  7. SpringBoot:WebSocket实现消息撤回、图片撤回
  8. windows8.1 ssd优化
  9. 网络安全工程师的学习路线
  10. 非root账号安装nginx1.22.0