take,takeAsList是Action操作
limit⽅法获取指定DataFrame的前n⾏记录,得到⼀个新的DataFrame对象。和take与head不同的是,limit⽅法不是Action操作

文章目录

  • 一、准备工作
  • 二、创建SparkSession的三种方法
  • 三、RDD、DataFrame、DataSet之间的相互转换
    • 3.1 RDD转DataFrame
    • 3.2 RDD转DataSet
    • 3.3 Frame转DataSet
    • 3.4 DataSet转DataFrame
  • 四、DSL常用方法
    • 4.1 show
    • 4.2 collect
    • 4.3 describe
    • 4.4 where、filter
    • 4.5 select
    • 4.6 drop
    • 4.7 limit
    • 4.8 sort
    • 4.9 groupBy
    • 4.10 agg
    • 4.11 distinct、dropDuplicates
    • 4.12 union
    • 4.13 join
    • 4.14 intersect、except
    • 4.15 withColumn、withColumnRenamed
  • 五、SQL风格语法
  • 六、Spark读写操作
    • 6.1 读文件
    • 6.2 读MySQL数据库
    • 6.3 写文件
    • 6.4 写入MySQL数据库
  • 七、需要手动导包的方法总结
  • 八、获取Column对象的方式

一、准备工作

导入依赖SparkSQL依赖

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>

后面会用到mysql,所以导入mysql依赖

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>

二、创建SparkSession的三种方法

 * Spark1.x版本的时候,Spark提供了两个查询入口,分别是 SQLContext 和 HiveContext* 在Spark2.0的时候对其进行了整合,形成了SparkSession* SparkSession是Spark2.0版本之后的SparkSQL查询的入口,里面集成了SparkContext* SparkSession兼容SQLContext和HiveContext中的API
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.junit.Testclass _01_SparkSessionCreate {@Test def createSparkSessionTest1(): Unit = {val spark: SparkSession = SparkSession.builder()                          // 用于创建一个SparkSession.Builder对象.master("local")          // 设置Master节点.appName("SparkSession")    // Application的名字.getOrCreate()                      // 获取SparkSession对象// 使用结束后,需要添加stopspark.stop()}@Test def createSparkSessionTest2(): Unit = {val conf: SparkConf = new SparkConf().setMaster("local").setAppName("SparkSession")val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()// 使用结束后,需要添加stopspark.stop()}@Test def createSparkSessionTest3(): Unit = {val spark: SparkSession = SparkSession.builder()                          // 用于创建一个SparkSession.Builder对象.master("local")          // 设置Master节点.appName("SparkSession")    // Application的名字.enableHiveSupport()                // 打开Hive支持,对接hive时需要将hive-site.xml放在resource文件中.getOrCreate()                      // 获取SparkSession对象// 使用结束后,需要添加stopspark.stop()}
}

三、RDD、DataFrame、DataSet之间的相互转换

准备工作

 * DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,* 只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段* 而Dataset中,每一行是什么类型是不一定的,* 在自定义了case class之后可以很自由的获得每一行的信息

person.txt

Michael, 29
Andy, 30
Justin, 19
private val BASE_URL: String = "C:\\Users\\luds\\Desktop\\dataSource\\"

3.1 RDD转DataFrame

方式1:不可以指定表头列名
得到的DataFrame是没有Schema信息的,默认的表头是 _1, _2
val frame1: DataFrame = spark.createDataFrame(rdd)方式2:可以指定表头列名
需要导入如下的隐式转换包,这里要注意,spark表示你的SparkSession对象
import spark.implicits._
val frame2: DataFrame = rdd.toDF(不变参数列名)方式3:可以指定表头列名
在RDD中,使用到了一个样例类对象,此时转成DataFrame的时候,就有了Schema信息
val peopleRDD: RDD[People] = rdd.map(t => People(t._1, t._2))
val frame3: DataFrame = spark.createDataFrame(peopleRDD)方式4:可以指定表头列名
反射方式val peopleRDD: RDD[People] = rdd.map(t => People(t._1, t._2))val frame3: DataFrame = spark.createDataFrame(peopleRDD)方式5:可以指定表头列名
动态加载方式
val rowRDD: RDD[Row] = rdd.map(t => Row(t._1, t._2))//    封装Schema信息val structType: StructType = StructType(Array(StructField("name", StringType),StructField("age", IntegerType)))
// 转成DataFrame
val frame4: DataFrame = spark.createDataFrame(rowRDD, structType)
    @Test def rdd2DataFrame(): Unit = {val conf: SparkConf = new SparkConf().setMaster("local").setAppName("rdd2DataFrame")val sc: SparkContext = new SparkContext(conf)val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()// 1. 构建RDDval rdd: RDD[(String, Int)] = sc.textFile(BASE_URL + "people.txt").map(line => {val parts: Array[String] = line.split(", ")(parts(0), parts(1).toInt)})// 2. RDD转DataFrame//    这种方式的转换,得到的DataFrame是没有Schema信息的,默认的表头是 _1, _2val frame1: DataFrame = spark.createDataFrame(rdd)frame1.show()// 3. RDD转DataFrame//    需要导入如下的隐式转换包,这里要注意,spark表示你的SparkSession对象import spark.implicits._val frame2: DataFrame = rdd.toDF("name", "age")frame2.show()// 4. RDD转DataFrame(反射方式)//    在RDD中,使用到了一个样例类对象,此时转成DataFrame的时候,就有了Schema信息val peopleRDD: RDD[People] = rdd.map(t => People(t._1, t._2))val frame3: DataFrame = spark.createDataFrame(peopleRDD)frame3.show()// 5. RDD转DataFrame(动态加载方式)val rowRDD: RDD[Row] = rdd.map(t => Row(t._1, t._2))//    封装Schema信息val structType: StructType = StructType(Array(StructField("name", StringType),StructField("age", IntegerType)))// 转成DataFrameval frame4: DataFrame = spark.createDataFrame(rowRDD, structType)frame4.show()}case class People(name: String, age: Int)

3.2 RDD转DataSet

方式1:不能指定表头列名
使用createDataset创建
得到的 DataSet[元组],表头信息就是 _1, _2, ...
import spark.implicits._
val ds1: Dataset[(String, Int)] = spark.createDataset(rdd)方式2:指定表头列名
通过一个样例类
import spark.implicits._
val peopleRDD: RDD[People] = rdd.map(t => People(t._1, t._2))
val ds2: Dataset[People] = spark.createDataset(peopleRDD)方式3:指定表头列名
toDS()方法
import spark.implicits._
val peopleRDD: RDD[People] = rdd.map(t => People(t._1, t._2))
val ds3: Dataset[People] = peopleRDD.toDS()
    @Test def rdd2DataSet(): Unit = {val conf: SparkConf = new SparkConf().setMaster("local").setAppName("rdd2DataFrame")val sc: SparkContext = new SparkContext(conf)val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()// 1. 构建RDDval rdd: RDD[(String, Int)] = sc.textFile(BASE_URL + "people.txt").map(line => {val parts: Array[String] = line.split(", ")(parts(0), parts(1).toInt)})// 2. 使用createDataset创建//    得到的 DataSet[元组],表头信息就是 _1, _2, ...import spark.implicits._val ds1: Dataset[(String, Int)] = spark.createDataset(rdd)ds1.show()// 3. 通过一个样例类import spark.implicits._val peopleRDD: RDD[People] = rdd.map(t => People(t._1, t._2))val ds2: Dataset[People] = spark.createDataset(peopleRDD)ds2.show()// 4. toDSimport spark.implicits._val peopleRDD: RDD[People] = rdd.map(t => People(t._1, t._2))val ds3: Dataset[People] = peopleRDD.toDS()ds3.show()}case class People(name: String, age: Int)

3.3 Frame转DataSet

方式1:as[DataSet的类型]
val ds: Dataset[People] = df.as[People]

示例见3.4

3.4 DataSet转DataFrame

方法1:toDF()或toDF(colNames: String*)方法
val df2: DataFrame = ds.toDF()
val df3: DataFrame = ds.toDF("newName", "newAge")
    @Test def dataFrame2DataSet(): Unit = {val conf: SparkConf = new SparkConf().setMaster("local").setAppName("rdd2DataFrame")val sc: SparkContext = new SparkContext(conf)val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()// 1. 构建RDDval rdd: RDD[People] = sc.textFile(BASE_URL + "people.txt").map(line => {val parts: Array[String] = line.split(", ")People(parts(0), parts(1).toInt)})// 2. 构建一个DataFrameimport spark.implicits._val df: DataFrame = rdd.toDF()// 3. 转DataSetval ds: Dataset[People] = df.as[People]// 4. DataSet转DataFrameval df2: DataFrame = ds.toDF()val df3: DataFrame = ds.toDF("newName", "newAge")df.show()ds.show()df2.show()df3.show()}case class People(name: String, age: Int)

四、DSL常用方法

准备工作
Employee.json

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.126}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "male", "salary": 8000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "XiaoFang", "age": 18, "depId": 3, "gender": "female", "salary": 58000}

Department.json

{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}
    private val spark: SparkSession = SparkSession.builder().master("local").appName("DFAction").getOrCreate()import spark.implicits._// 读取json文件,形成DataFrameprivate val employeeDF: DataFrame = spark.read.json("file\\Employee.json")private val departmentDF: DataFrame = spark.read.json("file\\Department.json")

4.1 show

    @Test def showTest(): Unit = {// show: 查看表中的所有信息 select * from xxx// show(): 默认查看20行数据employeeDF.show()                       // 查看20行数据,如果不到20行,全部查看employeeDF.show(2)          // 查看2行数据employeeDF.show(false)       // 是否截断长字符串。如果为true,则超过20个字符的字符串将被截断并且所有单元格都将右对齐employeeDF.show(3, 2)   // 查看3行数据,每一个数据截取2个字符employeeDF.printSchema()    // 输出表结构}

4.2 collect

    @Test def collectTest(): Unit = {val array: Array[Row] = employeeDF.collect()val list: util.List[Row] = employeeDF.collectAsList()       // 返回 java.util.Listimport spark.implicits._val ds: Dataset[Employee] = employeeDF.as[Employee]val array1: Array[Employee] = ds.collect()val list1: util.List[Employee] = ds.collectAsList()println(array.mkString(","))println(list)println(array1.mkString(","))println(list1)}case class Employee(age: Long, depId:Long, gender:String, name:String, salary:Double)

结果如下,通过下面的数据可以清楚看出DataFrame和DataSet的区别

[25,1,male,Leo,20000.126],[30,2,female,Marry,25000.0],[35,1,male,Jack,15000.0],[42,3,male,Tom,18000.0],[21,3,female,Kattie,21000.0],[19,2,male,Jen,8000.0],[30,2,female,Jen,28000.0],[42,3,male,Tom,18000.0],[18,3,female,XiaoFang,58000.0]
[[25,1,male,Leo,20000.126], [30,2,female,Marry,25000.0], [35,1,male,Jack,15000.0], [42,3,male,Tom,18000.0], [21,3,female,Kattie,21000.0], [19,2,male,Jen,8000.0], [30,2,female,Jen,28000.0], [42,3,male,Tom,18000.0], [18,3,female,XiaoFang,58000.0]]
Employee(25,1,male,Leo,20000.126),Employee(30,2,female,Marry,25000.0),Employee(35,1,male,Jack,15000.0),Employee(42,3,male,Tom,18000.0),Employee(21,3,female,Kattie,21000.0),Employee(19,2,male,Jen,8000.0),Employee(30,2,female,Jen,28000.0),Employee(42,3,male,Tom,18000.0),Employee(18,3,female,XiaoFang,58000.0)
[Employee(25,1,male,Leo,20000.126), Employee(30,2,female,Marry,25000.0), Employee(35,1,male,Jack,15000.0), Employee(42,3,male,Tom,18000.0), Employee(21,3,female,Kattie,21000.0), Employee(19,2,male,Jen,8000.0), Employee(30,2,female,Jen,28000.0), Employee(42,3,male,Tom,18000.0), Employee(18,3,female,XiaoFang,58000.0)]

4.3 describe

    @Test def describeTest(): Unit = {// 针对指定的列,进行统计。只针对数值类型的列employeeDF.describe("age", "depId", "salary").show()}

4.4 where、filter

filter和where的效果是一样的

    @Test def whereTest(): Unit = {// 条件: 相当于 select * from xxxx where xxxxxxemployeeDF.where("depId = 3").show()employeeDF.where("depId between 1 and 3").show()employeeDF.where("depId = 3 and salary > 20000").show()employeeDF.filter("depId = 3").show()employeeDF.filter("depId between 1 and 3").show()employeeDF.filter("depId = 3 and salary > 20000").show()// DataSet除了有上面的方法以外,还有下面的方法val ds: Dataset[Employee] = employeeDF.as[Employee]ds.filter(_.depId == 3).show()      // 因为Dataset[Employee]是Employee对象,所以还可以这样干// $"depId" 取这个列,相当于 new Column("depId"),使用的时候需要导包 import spark.implicits._import spark.implicits._employeeDF.where($"salary" >= 10000 and($"salary" < 20000)).show()employeeDF.where(new Column("salary").between(10000, 20000)).show()      // between和or方法都可以使用}case class Employee(age: Long, depId:Long, gender:String, name:String, salary:Double)

4.5 select

    @Test def selectTest(): Unit = {// 查询指定的列employeeDF.select("depId", "name").show()// 列之间的计算, 需要使用Column来完成// 获取Column对象的方式:// new Column("列名")// employeeDF("列名")// $"列名"                // 需要导包 import spark.implicits._// '列名// col("列名")            // 需要导包 import org.apache.spark.sql.functions._employeeDF.select(new Column("name"), new Column("age") + 1).show()employeeDF.select($"name", $"age" + 1).show()employeeDF.select(employeeDF("name"), employeeDF("age") + 1).show()import org.apache.spark.sql.functions._employeeDF.select(col("name"), col("age") + 1).show()employeeDF.select('name, 'age + 1).show()employeeDF.select('name, ('age + 1).as("age_plus_one")).show()// 可以对字段使用UDF函数employeeDF.selectExpr("name as newName", "age + 1 as age_plus_one", "round(salary)").show()}

4.6 drop

    @Test def dropTest(): Unit = {// 查询结果去除指定的列employeeDF.drop("depId").show()employeeDF.drop("depId", "age").show()}

4.7 limit

    @Test def limitTest(): Unit = {// limit 的返回值是DataSet,同时这个方法,也是一个懒加载的方法employeeDF.limit(3).show()// 注意事项: take和takeAsList都会将结果返回到Driver端,在使用的时候注意OOMprintln(employeeDF.first()) // 查询第一行println(employeeDF.head())  // 查询第一行println(employeeDF.head(3)) // 查询前3行println(employeeDF.take(3)) // 查询前3行println(employeeDF.takeAsList(3))// 查询前3行,返回java.util.List集合}

4.8 sort

orderBy和sort按指定字段排序,默认为升序。加个“-”号表示降序排序。只对数字类型和⽇期类型⽣效。sort和orderBy使⽤⽅法相同。

sortWithinPartitions和上⾯的sort⽅法功能类似,区别在于sortWithinPartitions⽅法返回的是按Partition排好序的DataFrame对象。

    @Test def sortTest(): Unit = {// 按照指定的列进行排序,只能升序employeeDF.sort("salary").show()// 如果想要降序,需要使用到column,在前面添加负号即可employeeDF.sort(-employeeDF("salary")).show()employeeDF.sort(-'salary).show()// select * from xxx order by depId, salary descemployeeDF.sort('depId, -'salary).show()    // 按照depId升序,depId相同的情况下,按照salary降序employeeDF.repartition(2).sort("salary").show()// sortWithinPartitions: 分区内的排序,最终的结果输出是按照分区输出的employeeDF.repartition(2).sortWithinPartitions("salary").show()}

4.9 groupBy

    @Test def groupByTest(): Unit = {// select count(*) from xxxx group by depId// groupBy的返回值是: RelationalGroupedDataset// count()  : 获取每一个分组的数量// max(colNames: String*) : 获取分组中指定字段的最大值,只能作用在数字类型的字段上// min(colNames: String*) : 获取分组中指定字段的最小值,只能作用在数字类型的字段上// sum(colNames: String*) : 获取分组中指定字段的和,只能作用在数字类型的字段上// mean(colNames: String*) : 获取分组中指定字段的平均值值,只能作用在数字类型的字段上// avg(colNames: String*) : 获取分组中指定字段的平均值值,只能作用在数字类型的字段上employeeDF.groupBy("depId").mean("salary").show()}

4.10 agg

聚合数据,可以没有使用groupBy

    @Test def aggTest(): Unit = {// 计算所有人的年龄的平均值,所有人的工资总和// select avg(age), sum(salary) from xxxemployeeDF.agg("age" -> "mean", "salary" -> "sum").show()employeeDF.agg(("age", "mean"), ("salary", "sum")).show()employeeDF.groupBy("depId").agg("age" -> "mean", "salary" -> "sum").show()}

4.11 distinct、dropDuplicates

返回一个没有重复元素的DataSet

    @Test def distinctTest(): Unit = {employeeDF.distinct().show()employeeDF.select("depId").distinct().show()        // 只查询depId列,结果去重employeeDF.dropDuplicates("depId").show()          // 按照depId去重,查询的结果包含每一列}

4.12 union

返回一个包含原Dataset和另一个Dataset的新的Dataset

    @Test def unionTest(): Unit = {employeeDF.union(employeeDF.limit(3)).show()}

4.13 join

  inner:内连outer,full,full_outer:全连left, left_outer:左连right,right_outer:右连left_semi:过滤出joinDF1中和joinDF2共有的部分left_anti:过滤出joinDF1中joinDF2没有的部分
    @Test def joinTest(): Unit = {// using字段,进行两张表的连接// 注意事项: using的字段,必须在这两张表中都包含// employeeDF.join(departmentDF, "depId").show()employeeDF.join(employeeDF, "depId").show()// 如果字段名不相同,则可以使用下面的方式employeeDF.join(employeeDF, Seq("depId", "age")).show()// joinType: 连接类型// inner// outer 、 full 、 full_outer// left 、 left_outer// right 、 right_outeremployeeDF.join(employeeDF, Seq("depId"), "inner").show()}
    @Test def join2Test(): Unit = {// 适用于作用两张表中,用来连接的字段名字不一样的情况employeeDF.join(departmentDF, employeeDF("depId") === departmentDF("id")).show()employeeDF.join(departmentDF, employeeDF("depId") === departmentDF("id"), "inner").show()}

4.14 intersect、except

    @Test def intersectAndExceptTest(): Unit = {// intersect: 获取两张表中都存在的数据(交集)// except: 获取的左表存在,右表不存在的数据employeeDF.intersect(employeeDF.limit(2)).show()employeeDF.except(employeeDF.limit(2)).show()employeeDF.limit(2).except(employeeDF).show()}

4.15 withColumn、withColumnRenamed

    @Test def columnNameTest(): Unit = {// 将一个表中的一个字段名,改成指定的名字,如果这个名字不存在,则不会做任何事情val frame: DataFrame = employeeDF.withColumnRenamed("depId", "id")frame.printSchema()// 添加和第二个参数相同的新的一列后返回一个Dataset // 如果第一个参数和第二个参数相同,则不会再添加新的列val frame1: DataFrame = employeeDF.withColumn("id", new Column("depId"))frame1.printSchema()}

方式1结果

方式2结果

五、SQL风格语法

4个重要方法

createGlobalTempView
createOrReplaceGlobalTempView
createTempView
createOrReplaceTempView
import org.apache.spark.sql.{DataFrame, SparkSession}/*** SQL风格的写法*/
object _04_SparkSessionOperation {def main(args: Array[String]): Unit = {// 1. 准备数据val spark: SparkSession = SparkSession.builder().master("local").appName("sql").getOrCreate()val employeeDF: DataFrame = spark.read.json("file\\Employee.json")// 2. SQL风格的写法,首先需要注册一张虚拟表// employeeDF.registerTempTable("employee")    // 注册虚拟表,这个方法已经过时1.6// createGlobalTempView// createOrReplaceGlobalTempView// createTempView// createOrReplaceTempView// 有无Global://      有Global: 在整个Application的生命周期中都可以用//      无Global: 只能在当前的SparkSession中使用,如果在Application中开辟了其他的SparkSession,是不能用的// 有无Replace://      有Replace: 如果创建的虚拟表表名已经存在,则会用这个新的表,替换原来的表//      无Replace: 如果创建的虚拟表表名已经存在,则会异常employeeDF.createOrReplaceTempView("employee")employeeDF.createOrReplaceTempView("employee")employeeDF.createTempView("employee")// 3. 从这个虚拟表中查询val frame: DataFrame = spark.sql("select * from employee")frame.show()}
}

六、Spark读写操作

 * 读取文件*      json:*          spark.read.json(xxx.json)       读取一个json文件*      load:*          spark.read.load(xxx.parquet)                    默认读取一个parquet文件*          spark.read.format("json").load(xxx.json)        读取一个指定格式的文件*      csv: (主要存储MySQL导出的数据)*          spark.read.csv(xxx.csv)*          csv文件,默认的分隔符是逗号,如果想用其他的分隔符分隔每一列,需要添加option("sep", "|")*              spark.read.option("sep", "|").csv(BASE_URL + "student.csv").show()*          如果想让csv文件的第一行作为表头信息,需要添加 option("header", "true")*              spark.read.option("sep", "|").option("header","true").csv(BASE_URL + "student.csv").show()*      orc: (orc是一种列式二进制存储文件,是rc文件的升级版,主要是Hive的存储格式)*          spark.read.orc(xxx.orc)*      text: 读取普通文件,只能将一行的数据直接读取到一个Value中,只能解析为一列** 写文件

准备工作

    // 读取文件的父级目录private val BASE_URL: String = "C:\\Users\\luds\\Desktop\\dataSource\\"private val spark: SparkSession = SparkSession.builder().master("local").appName("file").getOrCreate()

6.1 读文件

    @Test def readFile(): Unit = {spark.read.load(BASE_URL + "users.parquet").show()spark.read.format("json").load(BASE_URL + "account.json").show()spark.read.format("csv").load(BASE_URL + "country.csv").show()spark.read.csv(BASE_URL + "country.csv").show()// sep是separate的缩写,使用sep可以改变读取的文件分隔符spark.read.option("sep", "|").csv(BASE_URL + "student.csv").show()spark.read.option("sep", "|").option("header","true").csv(BASE_URL + "student.csv").show()spark.read.orc(BASE_URL + "student.orc").show()spark.read.text(BASE_URL + "Score.txt").show()import spark.implicits._spark.read.textFile(BASE_URL + "Score.txt").rdd.map(line => {val strings: Array[String] = line.split(",")(strings(0), strings(1).toInt, strings(2).toInt)}).toDF("name", "id", "score").show()}

6.2 读MySQL数据库

如果报空指针异常,那么很可能是jdbc驱动的版本问题,我这里是8.0.17版本的mysql,那么你的jdbc驱动也应该是8.0.17版本

    @Test def readJDBC(): Unit = {// 1. 准备JDBC连接的必要的参数val url: String = "jdbc:mysql://localhost:3306/mydb1?serverTimezone=UTC"val tableName: String = "emp"val properties: Properties = new Properties()properties.setProperty("user", "root")properties.setProperty("password", "123456")// 2. 连接数据库val df: DataFrame = spark.read.jdbc(url, tableName, properties)df.show()}

6.3 写文件

    @Test def writeFile(): Unit = {val PATH: String = "file/"val df: DataFrame = spark.read.json(BASE_URL + "account.json")// 1. 写jsondf.write.json(PATH + "account.json")// 2. 写parquet(默认的格式是parquet,压缩格式采用的snappy)df.write.save(PATH + "account.parquet")df.write.parquet()// 3. 写csvdf.write.csv(PATH + "account.csv1")df.write.option("sep", "|").csv(PATH + "account.csv2")df.write.option("sep", "|").option("header", "true").csv(PATH + "account.csv3")// 4. 写orc(需要整合Hive)df.write.orc()// 5. 写text,只能是DF中只有一列的情况df.write.text(PATH + "account.txt")}

6.4 写入MySQL数据库

    @Test def writeJDBC(): Unit = {// 1. 准备JDBC连接的必要的参数val url: String = "jdbc:mysql://localhost:3306/mydb1?serverTimezone=UTC"val tableName: String = "account"val properties: Properties = new Properties()properties.setProperty("user", "root")properties.setProperty("password", "123456")// 2. 将数据写入到数据库val df: DataFrame = spark.read.json(BASE_URL + "account.json")// df.write.jdbc(url, tableName, properties)// `overwrite`: 如果输出的目标存在,则覆盖之前的数据// `append`: 如果输出的目标存在,则向后追加数据// `ignore`: 如果输出的目标存在,则不作任何事情// `error`: 如果输出的目标存在,则异常df.write.mode("append").jdbc(url, tableName, properties)}

七、需要手动导包的方法总结

需要手动导入:import spark.implicits._

$"列名"   val frame2: DataFrame = rdd.toDF(不变参数列名)val ds1: Dataset[(String, Int)] = spark.createDataset(rdd)val ds3: Dataset[People] = peopleRDD.toDS()val ds: Dataset[Employee] = employeeDF.as[Employee]

需要手动导入:import org.apache.spark.sql.functions._

col("列名")

八、获取Column对象的方式

 1. new Column("列名")2. employeeDF("列名")3. $"列名"                // 需要导包 import spark.implicits._4. '列名5. col("列名")            // 需要导包 import org.apache.spark.sql.functions._

【大数据开发】SparkSQL——RDD、DataFrame、DataSet相互转换、DSL常用方法、SQL风格语法、Spark读写操作、获取Column对象的方式相关推荐

  1. 什么是大数据开发?大数据开发要学什么?一个Java转行过程和经历

    大数据开发是大数据职业发展方向之一,另外一个方向是大数据分析.从工作内容上来说,大数据开发主要是负责大数据挖掘,大数据清洗处理,大数据建模等工作,主要是负责大规模数据的处理和应用,工作主要以开发为主, ...

  2. 接地气,到底什么才是大数据开发工程师?

    最近发现有些同学并不太了解大数据开发工程师这个职位,自己转大数据开发也已经三年了,所以想简单介绍一下什么是大数据开发工程师,当前互联网公司的数据开发到底是什么样子的?和一般的java或者php工程师在 ...

  3. 30岁了,还可以转行大数据开发吗?

    这里有一个半路转行做开发实现年薪40W的故事,也许对你有帮助. 毕业后第一份工作是公司的销售,卖那种app注册量,没错,就是经常在大街上有人拉着你让你扫二维码注册app那种.但是只工作了四个多月我就萌 ...

  4. 大数据之SparkSQL简介及DataFrame的使用

    目录 前言: 1.Spark SQL 1.1.Spark SQL概述 1.2.DataFrames 1.3.DataFrame常用操作 总结: 目录 前言: 本文主要介绍下SparkSQL以及Spar ...

  5. 大数据开发面试知识点复习3

    文章目录 大数据开发复习课程 10.scala 10.1.scala介绍 10.2.scala解释器 10.3.scala的基本语法 10.3.1.声明变量 10.3.2.字符串 10.3.3.数据类 ...

  6. 【Spark】黑马-大数据开发2

    Scala+Spark-大数据开发复习课程 10.scala 10.1.scala介绍 10.2.scala解释器 10.3.scala的基本语法 10.3.1.声明变量 10.3.2.字符串 10. ...

  7. 大数据 - 大数据开发技术课程总结(未完)

    1.课程介绍 大数据开发课程主要从了解大数据概念.特征开始,再介绍大数据Java开发和Hadoop的环境配置,较为全面地讲解了HDFS分布式存储,MapReduce分布式计算框架,Spark平台开发和 ...

  8. 2020“东方国信杯”高校大数据开发大赛最终榜第三名思路分享

    2020"东方国信杯"高校大数据开发大赛最终榜第三名思路分享 2020"东方国信杯"高校大数据开发大赛刚刚结束,我所在的队伍"三人运动团"最 ...

  9. 大数据开发实战教程目录

    大数据开发实战教程目录 一. 课程性质.目的和任务 本课程目的是让学生了解并掌握四个领域 (1)大数据系统的起源及系统特征 (2)大数据系统的架构设计及功能目标设计 (3)大数据系统程序开发.企业大数 ...

最新文章

  1. (转载)android如何在style文件中使用自定义属性
  2. jboss 的debug启动4法
  3. linux diff 远程文件,登录diff命令,以单独的文件输出在linux
  4. (6)学习笔记 ) ASP.NET CORE微服务 Micro-Service ---- AOP框架
  5. PyCharm汉化后无法打开Settings设置
  6. php 空文件夹,使用PHP删除空子文件夹
  7. Win11延迟高怎么办?Win11延迟高的解决方法
  8. Javascript设计模式(二)工厂模式
  9. ubuntu18.04安装CH340和CH341驱动
  10. linux下安装tecplot记录
  11. 清华学生的编程能力有多强?大一学生 C++作业引爆全网,特奖得主、阿里P6:我们也做不到...
  12. caxa发生文件读写异常_文件和异常
  13. 微信小程序直播如何开通
  14. 计算机驱动空间的c盘不足怎么办,C盘磁盘空间不足怎么解决
  15. java 裁剪 pdf_java拆分pdf文档
  16. 基于ArcScene简单实验操作
  17. Spring Aop(十五)——Aop原理之Advised接口
  18. 奥赛 兔子繁殖 c语言,兔子繁殖问题(斐波拉契)
  19. ios logo 启动页大小
  20. Matlab网页交互

热门文章

  1. 基于linux下QT象棋,课内资源 - 基于QT实现的网络象棋游戏
  2. 重要:欢迎光临新版微软中文CRM论坛!
  3. 数字人民币上线红包新功能;高通开始人员优化;第一批AI绘画公司开始倒闭;网易云音乐加码声音社交;统计学课程(2023版);GitHub今日热榜 | ShowMeAI资讯日报
  4. 命名空间“Microsoft.Office”中不存在类型或命名空间名称“Interop”(是缺少程序集引用吗?)...
  5. HarmonyOS应用开发 — HelloWorld应用开发E2E体验
  6. 山东大学2019级软件工程应用与实践——基于人工智能的多肽药物分析问题(十二)
  7. 计算机安装Hp1005打印机,hp1005打印机驱动官方版
  8. 图片放大模糊怎么办?这个方法了解一下
  9. 实战goldengate:安装配置+数据初始化+单向DML复制
  10. 进击3D游戏界!Cocos Creator快速实现骨骼动画交互!