First, we connect to MySQL with the new API, load the data, and create the DataFrames.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, DataFrame}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.hive.HiveContext
import java.sql.DriverManager
import java.sql.Connection
val sqlContext = new HiveContext(sc)
val mySQLUrl = "jdbc:mysql://10.180.211.100:3306/appcocdb?user=appcoc&password=Asia123"

val CI_MDA_SYS_TABLE = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE").cache()

val CI_MDA_SYS_TABLE_COLUMN = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE_COLUMN").cache()

val CI_LABEL_EXT_INFO = sqlContext.jdbc(mySQLUrl,"CI_LABEL_EXT_INFO").cache()

val CI_LABEL_INFO = sqlContext.jdbc(mySQLUrl,"CI_LABEL_INFO").cache()

val CI_APPROVE_STATUS = sqlContext.jdbc(mySQLUrl,"CI_APPROVE_STATUS").cache()

val DIM_COC_LABEL_COUNT_RULES = sqlContext.jdbc(mySQLUrl,"DIM_COC_LABEL_COUNT_RULES").cache()
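
Note in passing: sqlContext.jdbc is already deprecated as of Spark 1.4 in favor of the DataFrameReader API. The equivalent call looks like this (a sketch reusing the URL and table above; the val name is invented here):

val props = new java.util.Properties()
val CI_MDA_SYS_TABLE_via_reader = sqlContext.read.jdbc(mySQLUrl, "CI_MDA_SYS_TABLE", props).cache()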

Next, join the tables on their shared ID columns:

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN, CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"), "inner").cache()
labels.join(CI_LABEL_EXT_INFO, CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"), "inner").cache()
labels.join(CI_LABEL_INFO, CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"), "inner").cache()
labels.join(CI_APPROVE_STATUS, CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"), "inner").cache()
labels.filter(
  CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === 107 and
  (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2) and
  CI_LABEL_EXT_INFO("COUNT_RULES_CODE").isNotNull and
  CI_MDA_SYS_TABLE("UPDATE_CYCLE") === 1
).cache()

Then everything blew up: the third join failed because the ID column could not be resolved. A very strange problem:

With no better idea, I tried the join method specified in the official Spark 1.4 API instead:

import sqlContext.implicits._  // needed for the $"column" syntax

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN, "TABLE_ID")
labels.join(CI_LABEL_EXT_INFO, "COLUMN_ID")
labels.join(CI_LABEL_INFO, "LABEL_ID")
labels.join(CI_APPROVE_STATUS).where($"LABEL_ID" === $"RESOURCE_ID")

And again it blew up the same way: the ID still could not be found.
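
In hindsight, one likely culprit in both attempts: the result of each join is never assigned back to labels, so labels remains the two-table join from the first line, and the later joins reference columns (for example CI_LABEL_EXT_INFO's LABEL_ID) that the DataFrame being joined simply does not carry. A sketch with every intermediate result carried forward (same tables and join keys as above; untested against this schema):

val t1 = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN, "TABLE_ID")
val t2 = t1.join(CI_LABEL_EXT_INFO, "COLUMN_ID")
val t3 = t2.join(CI_LABEL_INFO, "LABEL_ID")
val t4 = t3.join(CI_APPROVE_STATUS, t3("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"))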

In the end, with no alternative, I went back to the old approach: register temporary-table soft links over JDBC and load the data through them. That worked, which left me even more puzzled...

val CI_MDA_SYS_TABLE_DDL = s"""
  |CREATE TEMPORARY TABLE CI_MDA_SYS_TABLE
  |USING org.apache.spark.sql.jdbc
  |OPTIONS (url '${mySQLUrl}', dbtable 'CI_MDA_SYS_TABLE')""".stripMargin
sqlContext.sql(CI_MDA_SYS_TABLE_DDL)
val CI_MDA_SYS_TABLE = sqlContext.sql("SELECT * FROM CI_MDA_SYS_TABLE").cache()  // previously: sqlContext.jdbc(mySQLUrl, "CI_MDA_SYS_TABLE").cache()

val CI_MDA_SYS_TABLE_COLUMN_DDL = s"""
  |CREATE TEMPORARY TABLE CI_MDA_SYS_TABLE_COLUMN
  |USING org.apache.spark.sql.jdbc
  |OPTIONS (url '${mySQLUrl}', dbtable 'CI_MDA_SYS_TABLE_COLUMN')""".stripMargin
sqlContext.sql(CI_MDA_SYS_TABLE_COLUMN_DDL)
val CI_MDA_SYS_TABLE_COLUMN = sqlContext.sql("SELECT * FROM CI_MDA_SYS_TABLE_COLUMN").cache()  // previously: sqlContext.jdbc(mySQLUrl, "CI_MDA_SYS_TABLE_COLUMN").cache()

.........
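
The same DDL pattern repeats for each remaining table, so a small helper can cut the copy-paste (a sketch; registerJdbcTable is a name invented here, not from the original code):

def registerJdbcTable(tableName: String): DataFrame = {
  // Register a JDBC-backed temporary table, then load and cache it.
  sqlContext.sql(s"""
    |CREATE TEMPORARY TABLE $tableName
    |USING org.apache.spark.sql.jdbc
    |OPTIONS (url '$mySQLUrl', dbtable '$tableName')""".stripMargin)
  sqlContext.sql(s"SELECT * FROM $tableName").cache()
}

val CI_LABEL_INFO = registerJdbcTable("CI_LABEL_INFO")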

The problem was solved in the end, but exactly why the direct load fails still awaits a proper explanation.

One more fix to tack on: if you hit an error like this one:

15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_3_piece0 on cbg6aocdp9:49897 in memory (size: 8.4 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_3_piece0 on cbg6aocdp5:45978 in memory (size: 8.4 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.176.238.11:38968 in memory (size: 8.2 KB, free: 4.7 GB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_2_piece0 on cbg6aocdp4:55199 in memory (size: 8.2 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO ContextCleaner: Cleaned shuffle 0
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.176.238.11:38968 in memory (size: 6.5 KB, free: 4.7 GB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_1_piece0 on cbg6aocdp8:55706 in memory (size: 6.5 KB, free: 1060.3 MB)
TARGET_TABLE_CODE:========================IT03
Exception in thread "main" java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    ... (the same partitions/getPartitions frames repeat up the RDD lineage) ...
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:121)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:331)
    at main.asiainfo.coc.impl.IndexMakerObj$$anonfun$makeIndexsAndLabels$1.apply(IndexMakerObj.scala:218)
    at main.asiainfo.coc.impl.IndexMakerObj$$anonfun$makeIndexsAndLabels$1.apply(IndexMakerObj.scala:137)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at main.asiainfo.coc.impl.IndexMakerObj$.makeIndexsAndLabels(IndexMakerObj.scala:137)
    at main.asiainfo.coc.CocDss$.main(CocDss.scala:23)
    at main.asiainfo.coc.CocDss.main(CocDss.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 71 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
    at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
    at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
    at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
    ... 76 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2018)
    at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
    ... 78 more

The last "Caused by" says it all: the Hadoop data is LZO-compressed, and Spark cannot read it unless the hadoop-lzo jar (which provides com.hadoop.compression.lzo.LzoCodec) is on its classpath.
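
One common way to get the jar onto both the driver and executor classpaths is through spark-submit (a sketch; the jar path is a placeholder for wherever hadoop-lzo is installed on your cluster):

spark-submit \
  --jars /path/to/hadoop-lzo.jar \
  --driver-class-path /path/to/hadoop-lzo.jar \
  ... (your application jar and arguments)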

Reposted from: https://www.cnblogs.com/yangsy0915/p/4978975.html
