The main purpose of Spark SQL is to let users run SQL on Spark, with data sources that can be either RDDs or external sources such as Parquet, Hive, and JSON. One branch of Spark SQL is Spark on Hive, which reuses Hive's HQL parsing, logical-plan generation, and plan optimization; you can roughly think of it as replacing only the physical execution plan, swapping MapReduce jobs for Spark jobs. This article explains how to read existing Hive data through Spark SQL.

  Note, however, that the pre-built Spark assembly package does not support Hive. If you want to use Hive from Spark, you must rebuild Spark with the -Phive option, like so:

[iteblog@www.iteblog.com spark]$ ./make-distribution.sh --tgz -Phadoop-2.2 -Pyarn -DskipTests -Dhadoop.version=2.2.0  -Phive

  When the build finishes, three extra jars appear under SPARK_HOME/lib: datanucleus-api-jdo-3.2.6.jar, datanucleus-core-3.2.10.jar, and datanucleus-rdbms-3.2.9.jar, all of which Hive needs. The steps are described below.
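  If the build succeeded, you can confirm they are in place, e.g.:

[iteblog@www.iteblog.com spark]$ ls lib/datanucleus-*
lib/datanucleus-api-jdo-3.2.6.jar  lib/datanucleus-core-3.2.10.jar  lib/datanucleus-rdbms-3.2.9.jar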

I. Environment Preparation

  For Spark to connect to an existing Hive warehouse, copy Hive's hive-site.xml into Spark's conf directory; through this file Spark can locate Hive's metastore and data.

  If the Hive metastore lives in MySQL, you also need the MySQL JDBC driver, e.g. mysql-connector-java-5.1.22-bin.jar.
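  Concretely, the preparation boils down to two copies; a sketch, assuming Hive is installed under /usr/local/hive and with /path/to/ standing in for wherever your connector jar lives:

[iteblog@www.iteblog.com spark]$ cp /usr/local/hive/conf/hive-site.xml $SPARK_HOME/conf/
[iteblog@www.iteblog.com spark]$ cp /path/to/mysql-connector-java-5.1.22-bin.jar $SPARK_HOME/lib/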

II. Launching spark-shell

  With the environment ready, and for convenience, we will use spark-shell to show how to read Hive data through Spark SQL. Start it with the following command:

[iteblog@www.iteblog.com spark]$  bin/spark-shell --master yarn-client  --jars lib/mysql-connector-java-5.1.22-bin.jar

....

15/08/27 18:21:25 INFO repl.SparkILoop: Created spark context..

Spark context available as sc.

....

15/08/27 18:21:30 INFO repl.SparkILoop: Created sql context (with Hive support)..

SQL context available as sqlContext.

  When spark-shell starts, it first requests resources from the ResourceManager and then initializes a SparkContext and a SQLContext. The sqlContext object is in fact an instance of HiveContext, and it is the entry point into Spark SQL. Next, let's read data from Hive.
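  spark-shell builds this entry point for you; in a standalone application you would construct it yourself. A minimal sketch for Spark 1.x (the app name here is chosen purely for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Reproduces what spark-shell set up above: a SparkContext plus a
// HiveContext, which reads hive-site.xml from Spark's conf directory.
val conf = new SparkConf().setAppName("SparkSQLOnHive")
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)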

scala> sqlContext.sql("CREATE EXTERNAL  TABLE IF NOT EXISTS ewaplog (key STRING, value STRING)

STORED AS INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat' OUTPUTFORMAT

'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/user/iteblog/ewaplog' ")

res0: org.apache.spark.sql.DataFrame = [result: string]

scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/data/test.lzo' INTO TABLE ewaplog")

res1: org.apache.spark.sql.DataFrame = [result: string]

scala> sqlContext.sql("FROM ewaplog SELECT key, value").collect().foreach(println)

[12,wyp]

[23,ry]

[12,wyp]

[23,ry]

  We first created the ewaplog table, then loaded data into it, and finally queried it. All of the SQL is exactly what we would write in Hive; it simply runs on Spark. When executing SQL, the hiveql parser is used by default. You can switch to Spark SQL's built-in parser (sql) through the spark.sql.dialect setting, but the hiveql parser is recommended in most cases: it supports more syntax, including Hive UDFs.
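  For example, the dialect can be switched for the current session via setConf (a sketch; hiveql is already the default on a HiveContext):

scala> sqlContext.setConf("spark.sql.dialect", "sql")    // built-in Spark SQL parser
scala> sqlContext.setConf("spark.sql.dialect", "hiveql") // back to the Hive parser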

  If you see an error like the following while creating the HiveContext:

15/11/20 16:20:07 WARN metadata.Hive: Failed to access metastore. This class should not accessed in runtime.

org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1236)

    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)

    at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)

    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)

    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)

    at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)

    at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)

    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:167)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)

    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)

    at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)

    at $line4.$read$$iwC$$iwC.<init>(<console>:9)

    at $line4.$read$$iwC.<init>(<console>:18)

    at $line4.$read.<init>(<console>:20)

    at $line4.$read$.<init>(<console>:24)

    at $line4.$read$.<clinit>(<console>)

    at $line4.$eval$.<init>(<console>:7)

    at $line4.$eval$.<clinit>(<console>)

    at $line4.$eval.$print(<console>)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:606)

    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)

    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)

    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)

    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)

    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)

    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)

    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)

    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)

    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:132)

    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:124)

    at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)

    at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:124)

    at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974)

    at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:159)

    at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)

    at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:108)

    at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)

    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)

    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)

    at org.apache.spark.repl.Main$.main(Main.scala:31)

    at org.apache.spark.repl.Main.main(Main.scala)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:606)

    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)

    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)

    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)

    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)

    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)

    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)

    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)

    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)

    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)

    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)

    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)

    ... 59 more

Caused by: java.lang.reflect.InvocationTargetException

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)

    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)

    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)

    ... 65 more

Caused by: MetaException(message:Version information not found in metastore. )

    at org.apache.hadoop.hive.metastore.ObjectStore.checkSchema(ObjectStore.java:6664)

    at org.apache.hadoop.hive.metastore.ObjectStore.verifySchema(ObjectStore.java:6645)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:606)

    at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114)

    at com.sun.proxy.$Proxy15.verifySchema(Unknown Source)

    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:572)

    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)

    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)

    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)

    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)

    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)

    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)

    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)

    ... 70 more

15/11/20 16:20:07 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore

15/11/20 16:20:07 INFO metastore.ObjectStore: ObjectStore, initialize called

15/11/20 16:20:07 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY

15/11/20 16:20:07 INFO metastore.ObjectStore: Initialized ObjectStore


If you hit this, first check whether your cluster can actually reach the MySQL metastore. Note also the log line "Using direct SQL, underlying DB is DERBY" above: falling back to an embedded Derby metastore usually means Spark did not pick up hive-site.xml from its conf directory.
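Separately, the root cause MetaException(message:Version information not found in metastore.) is often worked around, assuming the metastore schema itself is intact, by disabling schema verification in hive-site.xml. This is a common workaround, not a guaranteed fix:

<property>
  <name>hive.metastore.schema.verification</name>
  <value>false</value>
</property>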

1 Scenario

In practice, I ran into the following scenario:

Log data is written to HDFS; the ops team runs ETL over it and loads the result into Hive; Spark is then used to analyze the logs, with Spark deployed in Spark on YARN mode.

Hive environment

  • The Hive environment must be configured: when the Spark job is submitted, hive-site.xml has to be shipped along with it, because only then can the job see the metadata of the existing Hive environment;
  • In other words, in the Spark on YARN deployment mode all that is really needed is Hive's configuration file, so that HiveContext can read the metadata stored in MySQL and the Hive table data stored on HDFS.

2 Writing and Packaging the Program

As a test case, the code here is fairly simple:

package cn.xpleaf.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author xpleaf
  */
object _01HiveContextOps {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf()
      // .setMaster("local[2]")
      .setAppName(s"${_01HiveContextOps.getClass.getSimpleName}")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    hiveContext.sql("show databases").show()
    hiveContext.sql("use mydb1")

    // Create the teacher_info table
    val sql1 = "create table teacher_info(\n" +
      "name string,\n" +
      "height double)\n" +
      "row format delimited\n" +
      "fields terminated by ','"
    hiveContext.sql(sql1)

    // Create the teacher_basic table
    val sql2 = "create table teacher_basic(\n" +
      "name string,\n" +
      "age int,\n" +
      "married boolean,\n" +
      "children int)\n" +
      "row format delimited\n" +
      "fields terminated by ','"
    hiveContext.sql(sql2)

    // Load data into the two tables
    hiveContext.sql("load data inpath 'hdfs://ns1/data/hive/teacher_info.txt' into table teacher_info")
    hiveContext.sql("load data inpath 'hdfs://ns1/data/hive/teacher_basic.txt' into table teacher_basic")

    // Step two: join the two tables ('已婚' = married, '未婚' = unmarried)
    val sql3 = "select\n" +
      "b.name,\n" +
      "b.age,\n" +
      "if(b.married,'已婚','未婚') as married,\n" +
      "b.children,\n" +
      "i.height\n" +
      "from teacher_info i\n" +
      "inner join teacher_basic b on i.name=b.name"
    val joinDF: DataFrame = hiveContext.sql(sql3)
    val joinRDD = joinDF.rdd
    joinRDD.collect().foreach(println)
    joinDF.write.saveAsTable("teacher")

    sc.stop()
  }
}

As you can see, it simply creates tables in Hive, loads data into them, joins the two tables, and saves the result back to a Hive table.

Once the code is written, package it; note that the dependencies do not need to be bundled into the jar (see the build sketch below). Then upload the jar to our environment.
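One way to keep the dependencies out of the jar is to mark the Spark artifacts as provided; a sketch of the relevant sbt settings, assuming Spark 1.6.2 as seen in the logs below (the original project may well use Maven instead, where the equivalent is <scope>provided</scope>):

// build.sbt (sketch): Spark and its Hive support are supplied by the
// cluster at runtime, so they are compile-only ("provided") dependencies.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.6.2" % "provided"
)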

3 Deployment

Write a submit script, as follows:

[hadoop@hadoop01 jars]$ cat spark-submit-yarn.sh
/home/hadoop/app/spark/bin/spark-submit \
--class $2 \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
--files $SPARK_HOME/conf/hive-site.xml \
--jars $SPARK_HOME/lib/mysql-connector-java-5.1.39.jar,$SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar \
$1

Note the two crucial options here, --files and --jars, explained below:

--files $SPARK_HOME/conf/hive-site.xml
// adds Hive's configuration file to the classpath of the Driver and the Executors
--jars $SPARK_HOME/lib/mysql-connector-java-5.1.39.jar,….
// adds the jars Hive depends on to the classpath of the Driver and the Executors

Then run the script to submit the job to YARN:

[hadoop@hadoop01 jars]$ ./spark-submit-yarn.sh spark-process-1.0-SNAPSHOT.jar cn.xpleaf.spark.scala.sql.p2._01HiveContextOps

4 Viewing the Results

Note that if you want to monitor the execution, you have to configure the history servers (MapReduce's JobHistoryServer and Spark's HistoryServer); see my earlier article for details.
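For reference, the Spark side of that setup lives in $SPARK_HOME/conf/spark-defaults.conf; a sketch, where hdfs://ns1/spark-logs is an example directory that must already exist and be writable:

spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://ns1/spark-logs
spark.history.fs.logDirectory    hdfs://ns1/spark-logs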

4.1 YARN UI

(screenshot omitted)

4.2 Spark UI

(screenshot omitted)

5 Problems and Solutions

1. User class threw exception: java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

Remember that our deployment mode is Spark on YARN, and the YARN nodes do not have the Spark and Hive dependencies available, so the jar dependencies to upload must be specified explicitly when submitting the job:

--jars $SPARK_HOME/lib/mysql-connector-java-5.1.39.jar,$SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar \

In fact, watch the console output closely when the job is submitted:


18/10/09 10:57:44 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/spark-assembly-1.6.2-hadoop2.6.0.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/spark-assembly-1.6.2-hadoop2.6.0.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/jars/spark-process-1.0-SNAPSHOT.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/spark-process-1.0-SNAPSHOT.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/mysql-connector-java-5.1.39.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/mysql-connector-java-5.1.39.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/datanucleus-api-jdo-3.2.6.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/datanucleus-api-jdo-3.2.6.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/datanucleus-core-3.2.10.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/datanucleus-core-3.2.10.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/datanucleus-rdbms-3.2.9.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/datanucleus-rdbms-3.2.9.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/conf/hive-site.xml -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/hive-site.xml
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/tmp/spark-6f582e5c-3eef-4646-b8c7-0719877434d8/__spark_conf__103916311924336720.zip -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/__spark_conf__103916311924336720.zip

You can see that the Spark-related jars are uploaded to the YARN environment, i.e. to HDFS, before the actual job runs.

2. User class threw exception: org.apache.spark.sql.execution.QueryExecutionException: FAILED: SemanticException [Error 10072]: Database does not exist: mydb1

mydb1 does not exist, which means the metadata of our existing Hive environment was not read. That happens when hive-site.xml is not submitted along with the job, i.e. when the following option is missing:

--files $SPARK_HOME/conf/hive-site.xml \

Source: https://www.jianshu.com/p/8ea827872e7e
