一. The routine approach:

1. Table join and data filtering:

  1. SQL:
select stock.area,goods.smallLei,goods.typeColorId, weekofyear(to_date(stock.sellDate,'yyyyMMdd')) weekofyear,stock.numBian
from lifeCycleGoods goods,lifeCycleStock stock
where goods.typeId=stock.kuanId and goods.color=stock.color
  2. Result:
    jointable ( area, smallLei, typeColorId, weekofyear, numBian )

2. Register the temp view: jointable

3. Compute the number of partitions: the number of distinct areas

val dfArea: DataFrame = spark.sql("(select count(distinct(area)) cnt from jointable)")
val areaNum: Int = dfArea.collect()(0).get(0).toString.toInt
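The same count can also be obtained through the DataFrame API instead of SQL; a minimal sketch, reusing the jointable view registered in step 2 (areaNumApi is just an illustrative name):

//  equivalent to the SQL count(distinct area) above
val areaNumApi: Int = spark.table("jointable").select("area").distinct().count().toInt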

4. Group by week and sum:

  1. SQL:
select area,smallLei,typeColorId,weekofyear,sum(numBian) sumstock
from jointable
group by area,smallLei,typeColorId,weekofyear
order by area,smallLei,typeColorId,weekofyear
  2. Result (a DataFrame-API sketch follows below):
    groupTable ( area, smallLei, typeColorId, weekofyear, sumstock )
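For reference, the same grouping can be written with the DataFrame API; a sketch, assuming the jointable view is registered (dfGroupApi is an illustrative name):

//  group by (area, smallLei, typeColorId, weekofyear) and sum numBian
import org.apache.spark.sql.functions.sum
val dfGroupApi = spark.table("jointable")
  .groupBy("area", "smallLei", "typeColorId", "weekofyear")
  .agg(sum("numBian").as("sumstock"))
  .orderBy("area", "smallLei", "typeColorId", "weekofyear")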

5. Register the temp view: groupTable

6. Running total (cumulative sum):

  1. Requirement:
    Partition by: area, smallLei, typeColorId
    Order by: smallLei, typeColorId, weekofyear
  2. Code:
package com.lifecycle.demo01

import java.io.InputStream
import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql._

import scala.collection.mutable.ListBuffer

object Demo12 {

  def main(args: Array[String]): Unit = {
    //  goods("typeColorId","typeId","bigLei","midLei","smallLei","gendar","yearSeason","goodsYear","goodsSeason","color","sellMonth","sellTpt","dingScore","yunYingScore","boDuan","sellPrice","serialId","banType")
    //  stock("area","stockId","sellMonth","sellDate","kuanId","color","reasonBian","numBian","owner","stockType")
    //  result set
    val resList: ListBuffer[(String, String)] = new ListBuffer[Tuple2[String, String]]
    val spark: SparkSession = getSpark()

    //  join the two tables: (area, smallLei, typeColorId, weekofyear, numBian); the sell date is converted to a week-of-year number
    val dfJoin: DataFrame = spark.sql("select stock.area,goods.smallLei,goods.typeColorId,weekofyear(to_date(stock.sellDate,'yyyyMMdd')) weekofyear,stock.numBian from lifeCycleGoods goods,lifeCycleStock stock where goods.typeId=stock.kuanId and goods.color=stock.color")
    dfJoin.cache()
    //  register the view jointable ( area,smallLei,typeColorId,weekofyear,numBian )
    dfJoin.createOrReplaceTempView("jointable")

    //  number of areas
    val dfArea: DataFrame = spark.sql("(select count(distinct(area)) cnt from jointable)")
    val areaNum: Int = dfArea.collect()(0).get(0).toString.toInt

    //  weekly aggregation ( group by area,smallLei,typeColorId,weekofyear ; sum numBian )
    val dfGroup: DataFrame = spark.sql("select area,smallLei,typeColorId,weekofyear,sum(numBian) sumstock from jointable group by area,smallLei,typeColorId,weekofyear order by area,smallLei,typeColorId,weekofyear")
    dfGroup.cache()
    //  register the view groupTable ( area,smallLei,typeColorId,weekofyear,sumstock )
    dfGroup.createOrReplaceTempView("groupTable")

    //  weekly running total
    val sql = "select area,smallLei,typeColorId,weekofyear,sumstock,sum(sumstock) over(partition by area,smallLei,typeColorId order by weekofyear) sumoverstock from groupTable"
    val dfRes: DataFrame = spark.sql(sql)

    //  repartition by area
    val ds: Dataset[Row] = dfRes.repartition(areaNum, new Column("area"))

    //  write out
    ds.write.option("header", "true").option("delimiter", ",").csv("s3a://lifecyclebigdata/test/data/lifecycle/res12")

    //  test
    //  dfJoin.groupBy("area").count().repartition(1).write.option("header","true").option("delimiter",",").csv("s3a://lifecyclebigdata/test/data/lifecycle/res11")
    spark.close()
  }

  //  build the SparkSession
  //  register table goods : lifeCycleGoods ( 18 columns )
  //  register table stock : lifeCycleStock ( 10 columns )
  def getSpark(): SparkSession = {
    //  milliseconds
    val timer1: Long = System.currentTimeMillis()

    //  1. Spark session
    val spark = SparkSession.builder()
      //  for the web UI
      .config("spark.eventLog.enabled", "false")
      //  driver memory
      .config("spark.driver.memory", "2g")
      .appName("SparkDemoFromS3")
      .getOrCreate()

    //  1. log level
    spark.sparkContext.setLogLevel("WARN")

    //  2. read the resource file
    val properties = new Properties()
    val stream: InputStream = Demo01.getClass.getClassLoader.getResourceAsStream("s3.properties")
    properties.load(stream)

    //  3. configure the data source ( S3 )
    val sc: SparkContext = spark.sparkContext
    sc.hadoopConfiguration.set("fs.s3a.access.key", properties.getProperty("fs.s3a.access.key"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", properties.getProperty("fs.s3a.secret.key"))
    sc.hadoopConfiguration.set("fs.s3a.endpoint", properties.getProperty("fs.s3a.endpoint"))

    //  4. implicit conversions (not needed here)

    //  5. goods table, 18 columns ( style-color id, style id, big category, mid category, small category, gender, year-season, goods year, goods season, color, launch month, launch temperature, ordering score, operations score, wave, selling price, series, cut )
    //  5. note: the file has a header row
    val dfSourceGoods: DataFrame = spark.read.option("header", "true").option("delimiter", ",").csv("s3a://lifecyclebigdata/test/data/lifecycle/goods.csv")
    val dfGoods: DataFrame = dfSourceGoods.toDF("typeColorId", "typeId", "bigLei", "midLei", "smallLei", "gendar", "yearSeason", "goodsYear", "goodsSeason", "color", "sellMonth", "sellTpt", "dingScore", "yunYingScore", "boDuan", "sellPrice", "serialId", "banType")
    dfGoods.cache().createOrReplaceTempView("lifeCycleGoods")

    //  6. stock table, 10 columns ( area, store id, sell month, sell date, style id, color, change reason, stock change, owner, store type )
    //  6. note: the file has a header row
    val dfSourceStock: DataFrame = spark.read.option("header", "true").option("delimiter", ",").csv("s3a://lifecyclebigdata/test/data/lifecycle/stock.csv")
    val dfStock: DataFrame = dfSourceStock.toDF("area", "stockId", "sellMonth", "sellDate", "kuanId", "color", "reasonBian", "numBian", "owner", "stockType").withColumn("numBian", col("numBian").cast(IntegerType))
    dfStock.cache().createOrReplaceTempView("lifeCycleStock")

    spark
  }
}
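A quick sanity check on the repartition step, reusing ds and areaNum from the code above (a sketch, not part of the original job): repartition(areaNum, new Column("area")) always yields exactly areaNum partitions, but the mapping from area to partition is hash based, so two areas may share a partition while another stays empty; this is exactly the puzzle examined in part 二.

//  prints areaNum; how many of those partitions are non-empty is a different question
println(ds.rdd.getNumPartitions)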

7. Run from the shell:

spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 4g --class com.lifecycle.demo01.Demo12 s3://lifecyclebigdata/test/jar/demo12/demo1007-1.0-SNAPSHOT.jar

8. Performance monitoring:

  1. Run:
spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 4g --class com.lifecycle.demo01.Demo12 s3://lifecyclebigdata/test/jar/demo12/demo1007-1.0-SNAPSHOT.jar

9. Partitioning by a column:

  1. Run:
spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 4g --class com.lifecycle.demo02.Demo01_partition s3://lifecyclebigdata/test/jar/demo1007-1.0-SNAPSHOT.jar

二. A baffling problem: partitioning

1. Raw data: 36 lines, one value per line

2F00
2A00
2700
2200
J001
J002
J032
1300
2100
2E00
J018
2000
2G00
2300
J028
J003
2900
J005
1000
2500
J010
2C00
2600
2400
J025
J006
2.0
J011
J016
200A
J020
J031
2D00
J017
2800
J030

2. Repartition into 36 partitions: the code

package mytest

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object MyStringTest {

  def main(args: Array[String]): Unit = {
    //  1. Spark SQL session
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .appName("SparkDemoFromS3")
      .getOrCreate()

    //  2. log level
    spark.sparkContext.setLogLevel("ERROR")

    //  3. implicit conversions and helpers
    import spark.implicits._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    val df: DataFrame = spark.read.option("delimiter", ",").csv("par.csv")
    val df01: DataFrame = df.toDF("area")

    //  repartition into 36 partitions by the area column
    val df02: Dataset[Row] = df01.repartition(36, col("area"))

    //  4. register the view
    df02.createOrReplaceTempView("area")
    val dfRes: DataFrame = spark.sql("select * from area")
    println(dfRes.rdd.getNumPartitions)

    dfRes.write.option("header", "true").option("delimiter", ",").csv("res01")
    spark.close()
  }
}

3. Result: many partitions went missing
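To see which area values actually landed in which partition, a quick inspection can be run against df02 from the code above (a sketch; fine for 36 rows, not for large data because it collects to the driver):

//  list (partitionId, area) pairs for every row
df02.rdd
  .mapPartitionsWithIndex((idx, it) => it.map(row => (idx, row.getString(0))))
  .collect()
  .sortBy(_._1)
  .foreach(println)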

4. The problematic data:

2G00,1
J011,1
J028,1
2500,1
J002,1
2900,1
2E00,1
2600,1
2400,1
J016,1
2F00,1
2.0,1
J032,1
J003,1
J010,1
J020,1
2200,1
2D00,1
2C00,1
J031,1
2A00,1
2700,1
J001,1
2100,1
J006,1

5. Cause: several rows with different keys were placed together

Multiple distinct area values ended up in the same partition, leaving other partitions empty (see the sketch below).
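This is expected behaviour for repartition(36, col("area")): Spark uses hash partitioning, so each row goes to partition pmod(hash(area), 36), and nothing prevents two different area values from hashing to the same number. A sketch that should reproduce the assignment, reusing df01 from the code above (hash and pmod are the built-in Spark SQL functions; "pid" is an illustrative column name):

import org.apache.spark.sql.functions.{col, hash, lit, pmod}
//  the partition id each row would be assigned by hash partitioning into 36 partitions
df01.withColumn("pid", pmod(hash(col("area")), lit(36)))
    .orderBy("pid")
    .show(36, truncate = false)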

6. Repartition when writing instead: this achieves the goal, but I don't like it:

dfRes.write.partitionBy("area").option("header","true").option("delimiter",",").csv("res01")

三. Custom partitioning:

1. The custom partitioner:

  1. Idea:
    1. it partitions key-value (k-v) data
    2. the area column is used as the key
    3. there are arr.length partitions in total (one partition per area)
  2. Code:
package com.lifecycle.sparkUtil

import org.apache.spark.Partitioner

//  takes one argument: an array of keys (here, all the area values)
class StockPartitioner(val arr: Array[String]) extends Partitioner {

  //  background:
  //      1. partition ids start at 0
  //      2. with num partitions, the ids run from 0 to num-1

  //  1. number of partitions (one per area)
  override def numPartitions: Int = arr.length

  //  2. map a key to its partition id
  override def getPartition(key: Any): Int = {
    arr.indexOf(key.toString)
  }
}
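A minimal usage sketch of StockPartitioner on a toy key-value RDD, assuming a SparkSession named spark is in scope (the sample keys are made up; note that getPartition returns -1 for a key that is not in arr, so every key must appear in the array):

val areas = Array("A01", "B02", "C03")                      //  hypothetical area codes
val toy = spark.sparkContext
  .parallelize(Seq(("A01", 1), ("C03", 2), ("A01", 3), ("B02", 4)))
  .partitionBy(new StockPartitioner(areas))
println(toy.getNumPartitions)                               //  3: one partition per area
//  show which partition each record landed in
toy.mapPartitionsWithIndex((i, it) => it.map(kv => (i, kv))).collect().foreach(println)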

2. The computation code: partitioning about 20 million rows takes roughly 2 minutes

package com.lifecycle.demo02

import java.io.InputStream
import java.util.Properties

import com.lifecycle.sparkUtil.StockPartitioner
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

import scala.collection.mutable.ListBuffer

object Demo01_partition {

  def main(args: Array[String]): Unit = {
    //  goods("typeColorId","typeId","bigLei","midLei","smallLei","gendar","yearSeason","goodsYear","goodsSeason","color","sellMonth","sellTpt","dingScore","yunYingScore","boDuan","sellPrice","serialId","banType")
    //  stock("area","stockId","sellMonth","sellDate","kuanId","color","reasonBian","numBian","owner","stockType")
    //  result set
    val resList: ListBuffer[(String, String)] = new ListBuffer[Tuple2[String, String]]
    val spark: SparkSession = getSpark()

    //  0. implicit conversions
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import spark.implicits._

    //  一. join the two tables ( goods, stock )
    //  1. join: (area, smallLei, typeColorId, weekofyear, numBian); the sell date is converted to a week-of-year number
    val dfJoin: DataFrame = spark.sql("select stock.area,goods.smallLei,goods.typeColorId,weekofyear(to_date(stock.sellDate,'yyyyMMdd')) weekofyear,stock.numBian from lifeCycleGoods goods,lifeCycleStock stock where goods.typeId=stock.kuanId and goods.color=stock.color")
    //  2. cache
    val joinTable: DataFrame = dfJoin.cache()
    //  3. register: joinTable ( area,smallLei,typeColorId,weekofyear,numBian )
    joinTable.createOrReplaceTempView("joinTable")

    //  二. weekly aggregation: dfZhou ( area,smallLei,typeColorId,weekofyear,sumstock )
    //  1. SQL
    val dfZhou: DataFrame = spark.sql("select area,smallLei,typeColorId,weekofyear,sum(numBian) sumstock from joinTable group by area,smallLei,typeColorId,weekofyear order by area,smallLei,typeColorId,weekofyear")

    //  三. repartition; idea: df -> rdd -> repartition -> rdd -> df ( partition by area )
    //  1. all the areas
    val dfAreaRes: DataFrame = spark.sql("select distinct(area) area from joinTable")
    val arr: Array[String] = dfAreaRes.rdd.map(row => row.get(0).toString).collect()
    //  2. the custom partitioner
    val areaPartitioner: StockPartitioner = new StockPartitioner(arr)
    //  3. partition the aggregated data
    //      convert to RDD
    val rddRow: RDD[Row] = dfZhou.rdd
    //      key by area : area,smallLei,typeColorId,weekofyear,sumstock
    val rddColumn: RDD[(String, (String, String, String, String, String))] = rddRow.map(row => {
      val area: String = row.get(0).toString
      val smallLei: String = row.get(1).toString
      val typeColorId: String = row.get(2).toString
      val weekofyear: String = row.get(3).toString
      val sumstock: String = row.get(4).toString
      (area, (area, smallLei, typeColorId, weekofyear, sumstock))
    })
    //      partition
    val rddPar: RDD[(String, (String, String, String, String, String))] = rddColumn.partitionBy(areaPartitioner)
    //  4. rdd -> df, cast the sum to a number
    val rddZhou = rddPar.map(e => e._2)
    val dfParZhou: DataFrame = rddZhou.toDF("area", "smallLei", "typeColorId", "weekofyear", "sumstock")
      .withColumn("sumstock", col("sumstock").cast(IntegerType))

    //  四. cache and register the dfParZhou table
    //  1. cache
    val zhouTable: DataFrame = dfParZhou.cache()
    //  2. register: ( area,smallLei,typeColorId,weekofyear,sumstock )
    zhouTable.createOrReplaceTempView("zhouTable")

    //  五. weekly running total:
    //  var sqlLeiJia = "select area,smallLei,typeColorId,weekofyear,sumstock,sum(sumstock) over(partition by area order by smallLei,typeColorId) sumoverstock from groupTable"

    //  partition test:
    println(8888888)
    println(8888888)
    println(8888888)
    println(8888888)
    //  spark.sql("select * from zhouTable").write.option("header","true").option("delimiter",",").csv("s3a://lifecyclebigdata/test/data/lifecycle/res")
    spark.sql("select * from zhouTable").write.option("header", "true").option("delimiter", ",").csv("s3a://lifecyclebigdata/test/data/lifecycle/res")
    println(8888888)
    println(8888888)
    println(8888888)
    println(8888888)
    spark.stop()
  }

  //  build the SparkSession
  //  register table goods : lifeCycleGoods ( 18 columns )
  //  register table stock : lifeCycleStock ( 10 columns )
  def getSpark(): SparkSession = {
    //  milliseconds
    val timer1: Long = System.currentTimeMillis()

    //  1. Spark session
    val spark = SparkSession.builder()
      //  for the web UI
      .config("spark.eventLog.enabled", "false")
      //  driver memory
      .config("spark.driver.memory", "2g")
      //  number of shuffle partitions
      .config("spark.sql.shuffle.partitions", "100")
      .appName("SparkDemoFromS3")
      .getOrCreate()

    //  1. log level
    spark.sparkContext.setLogLevel("WARN")

    //  2. read the resource file
    val properties = new Properties()
    val stream: InputStream = Demo01_partition.getClass.getClassLoader.getResourceAsStream("s3.properties")
    properties.load(stream)

    //  3. configure the data source ( S3 )
    val sc: SparkContext = spark.sparkContext
    sc.hadoopConfiguration.set("fs.s3a.access.key", properties.getProperty("fs.s3a.access.key"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", properties.getProperty("fs.s3a.secret.key"))
    sc.hadoopConfiguration.set("fs.s3a.endpoint", properties.getProperty("fs.s3a.endpoint"))

    //  4. implicit conversions (not needed here)

    //  5. goods table, 18 columns ( style-color id, style id, big category, mid category, small category, gender, year-season, goods year, goods season, color, launch month, launch temperature, ordering score, operations score, wave, selling price, series, cut )
    //  5. note: the file has a header row
    val dfSourceGoods: DataFrame = spark.read.option("header", "true").option("delimiter", ",").csv("s3a://lifecyclebigdata/test/data/lifecycle/goods.csv")
    val dfGoods: DataFrame = dfSourceGoods.toDF("typeColorId", "typeId", "bigLei", "midLei", "smallLei", "gendar", "yearSeason", "goodsYear", "goodsSeason", "color", "sellMonth", "sellTpt", "dingScore", "yunYingScore", "boDuan", "sellPrice", "serialId", "banType")
    dfGoods.cache().createOrReplaceTempView("lifeCycleGoods")

    //  6. stock table, 10 columns ( area, store id, sell month, sell date, style id, color, change reason, stock change, owner, store type )
    //  6. note: the file has a header row
    val dfSourceStock: DataFrame = spark.read.option("header", "true").option("delimiter", ",").csv("s3a://lifecyclebigdata/test/data/lifecycle/stock.csv")
    val dfStock: DataFrame = dfSourceStock.toDF("area", "stockId", "sellMonth", "sellDate", "kuanId", "color", "reasonBian", "numBian", "owner", "stockType").withColumn("numBian", col("numBian").cast(IntegerType))
    dfStock.cache().createOrReplaceTempView("lifeCycleStock")

    spark
  }
}
//  case class JoinGoods: stock.area, smallLei, typeColorId, weekofyear, numBian
//  case class JoinGoods(var area:String,var smallLei:String,var typeColorId:String,var weekofyear:String,var numBian:String)

3. Run:

spark-submit --master yarn --deploy-mode client --num-executors 20 --executor-cores 2 --executor-memory 4g --class com.lifecycle.demo02.Demo01_partition s3://lifecyclebigdata/test/jar/demo1007-1.0-SNAPSHOT.jar

4. Result: the partitioning works as intended

四. Final version:

1. Approach:

  1. Run the SQL first: the grouped running total
  2. Then post-process the result: df -> rdd -> partitionBy (to split the output into files) -> df -> write

2. Why run the SQL first and partition afterwards, rather than partition first and then run the SQL:

  1. Because of how Spark SQL executes: the window's partition by requires the data to be redistributed by the partition keys, i.e. it triggers a shuffle (a repartition)
  2. If we applied our custom partitioning first, the partition by in the SQL would reshuffle the data afterwards and our partitioning plan would be wasted (see the sketch below)
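One way to observe this, reusing dfParZhou (the custom-partitioned DataFrame from the part 三 listing) and sqlLeiJia (the window query from the listing below); a sketch only, and the exact number after the shuffle depends on spark.sql.shuffle.partitions (100 in getSpark):

//  the custom partitioner gave one partition per area ...
println(dfParZhou.rdd.getNumPartitions)                 //  arr.length
//  ... but the window's "partition by" still shuffles the data again,
//  because the rdd -> DataFrame conversion hides the partitioner from Catalyst
dfParZhou.createOrReplaceTempView("zhouTable")
println(spark.sql(sqlLeiJia).rdd.getNumPartitions)      //  typically 100, not arr.length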

3. Code listing:

package com.lifecycle.demo02

import java.io.InputStream
import java.util.Properties

import com.lifecycle.sparkUtil.StockPartitioner
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

import scala.collection.mutable.ListBuffer

object Demo01_partition {

  def main(args: Array[String]): Unit = {
    //  goods("typeColorId","typeId","bigLei","midLei","smallLei","gendar","yearSeason","goodsYear","goodsSeason","color","sellMonth","sellTpt","dingScore","yunYingScore","boDuan","sellPrice","serialId","banType")
    //  stock("area","stockId","sellMonth","sellDate","kuanId","color","reasonBian","numBian","owner","stockType")
    //  result set
    val resList: ListBuffer[(String, String)] = new ListBuffer[Tuple2[String, String]]
    val spark: SparkSession = getSpark()

    //  0. implicit conversions
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import spark.implicits._

    //  一. join the two tables ( goods, stock )
    //  1. join: (area, smallLei, typeColorId, weekofyear, numBian); the sell date is converted to a week-of-year number
    val dfJoin: DataFrame = spark.sql("select stock.area,goods.smallLei,goods.typeColorId,weekofyear(to_date(stock.sellDate,'yyyyMMdd')) weekofyear,stock.numBian from lifeCycleGoods goods,lifeCycleStock stock where goods.typeId=stock.kuanId and goods.color=stock.color")
    //  2. cache
    val joinTable: DataFrame = dfJoin.cache()
    //  3. register: joinTable ( area,smallLei,typeColorId,weekofyear,numBian )
    joinTable.createOrReplaceTempView("joinTable")

    //  二. weekly aggregation: dfZhou ( area,smallLei,typeColorId,weekofyear,sumstock )
    //  1. weekly SQL
    val dfZhou: DataFrame = spark.sql("select area,smallLei,typeColorId,weekofyear,sum(numBian) sumstock from joinTable group by area,smallLei,typeColorId,weekofyear order by area,smallLei,typeColorId,weekofyear")
    //  2. cache
    val zhouTable: DataFrame = dfZhou.cache()
    //  3. register: ( area,smallLei,typeColorId,weekofyear,sumstock )
    zhouTable.createOrReplaceTempView("zhouTable")

    //  三. weekly running total with a sorted result (the columns in the over()'s order by determine the accumulation order)
    //  1. running-total SQL
    var sqlLeiJia = "select " +
      "area,smallLei,typeColorId,weekofyear,sumstock," +
      "sum(sumstock) over(partition by area,smallLei,typeColorId order by area,smallLei,typeColorId,weekofyear) sumoverstock " +
      "from zhouTable " +
      "order by area,smallLei,typeColorId,weekofyear"
    //  2. execute and cache the result ( area,smallLei,typeColorId,weekofyear,sumstock,sumoverstock )
    val dfRes: DataFrame = spark.sql(sqlLeiJia).cache()

    //  四. repartition the result (goal: split the output into files); idea: df -> rdd -> repartition -> rdd -> df ( partition by area )
    //  1. all the areas, as an array
    val dfAreaRes: DataFrame = spark.sql("select distinct(area) area from joinTable")
    val arr: Array[String] = dfAreaRes.rdd.map(row => row.get(0).toString).collect()
    //  2. the custom partitioner: partition by area
    val areaPartitioner: StockPartitioner = new StockPartitioner(arr)
    //  3. convert df -> rdd ( area,smallLei,typeColorId,weekofyear,sumstock,sumoverstock )
    val rddRow: RDD[Row] = dfRes.rdd
    //      key by area
    val rddColumn: RDD[(String, (String, String, String, String, String, String))] = rddRow.map(row => {
      val area: String = row.get(0).toString
      val smallLei: String = row.get(1).toString
      val typeColorId: String = row.get(2).toString
      val weekofyear: String = row.get(3).toString
      val sumstock: String = row.get(4).toString
      val sumoverstock: String = row.get(5).toString
      (area, (area, smallLei, typeColorId, weekofyear, sumstock, sumoverstock))
    })
    //  4. repartition
    val rddPar: RDD[(String, (String, String, String, String, String, String))] = rddColumn.partitionBy(areaPartitioner)
    //  5. convert rdd -> df
    val dfResRes: DataFrame = rddPar.map(e => e._2)
      .toDF("area", "smallLei", "typeColorId", "weekofyear", "sumstock", "sumsumstock")

    //  五. write out
    dfResRes.write.option("header", "true").option("delimiter", ",").csv("s3a://lifecyclebigdata/test/data/lifecycle/res02")
    spark.stop()
  }

  //  build the SparkSession
  //  register table goods : lifeCycleGoods ( 18 columns )
  //  register table stock : lifeCycleStock ( 10 columns )
  def getSpark(): SparkSession = {
    //  milliseconds
    val timer1: Long = System.currentTimeMillis()

    //  1. Spark session
    val spark = SparkSession.builder()
      //  for the web UI
      .config("spark.eventLog.enabled", "false")
      //  driver memory
      .config("spark.driver.memory", "2g")
      //  number of shuffle partitions
      .config("spark.sql.shuffle.partitions", "100")
      .appName("SparkDemoFromS3")
      .getOrCreate()

    //  1. log level
    spark.sparkContext.setLogLevel("WARN")

    //  2. read the resource file
    val properties = new Properties()
    val stream: InputStream = Demo01_partition.getClass.getClassLoader.getResourceAsStream("s3.properties")
    properties.load(stream)

    //  3. configure the data source ( S3 )
    val sc: SparkContext = spark.sparkContext
    sc.hadoopConfiguration.set("fs.s3a.access.key", properties.getProperty("fs.s3a.access.key"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", properties.getProperty("fs.s3a.secret.key"))
    sc.hadoopConfiguration.set("fs.s3a.endpoint", properties.getProperty("fs.s3a.endpoint"))

    //  4. implicit conversions (not needed here)

    //  5. goods table, 18 columns ( style-color id, style id, big category, mid category, small category, gender, year-season, goods year, goods season, color, launch month, launch temperature, ordering score, operations score, wave, selling price, series, cut )
    //  5. note: the file has a header row
    val dfSourceGoods: DataFrame = spark.read.option("header", "true").option("delimiter", ",").csv("s3a://lifecyclebigdata/test/data/lifecycle/goods.csv")
    val dfGoods: DataFrame = dfSourceGoods.toDF("typeColorId", "typeId", "bigLei", "midLei", "smallLei", "gendar", "yearSeason", "goodsYear", "goodsSeason", "color", "sellMonth", "sellTpt", "dingScore", "yunYingScore", "boDuan", "sellPrice", "serialId", "banType")
    dfGoods.cache().createOrReplaceTempView("lifeCycleGoods")

    //  6. stock table, 10 columns ( area, store id, sell month, sell date, style id, color, change reason, stock change, owner, store type )
    //  6. note: the file has a header row
    val dfSourceStock: DataFrame = spark.read.option("header", "true").option("delimiter", ",").csv("s3a://lifecyclebigdata/test/data/lifecycle/stock.csv")
    val dfStock: DataFrame = dfSourceStock.toDF("area", "stockId", "sellMonth", "sellDate", "kuanId", "color", "reasonBian", "numBian", "owner", "stockType").withColumn("numBian", col("numBian").cast(IntegerType))
    dfStock.cache().createOrReplaceTempView("lifeCycleStock")

    spark
  }
}
//  case class JoinGoods: stock.area, smallLei, typeColorId, weekofyear, numBian
//  case class JoinGoods(var area:String,var smallLei:String,var typeColorId:String,var weekofyear:String,var numBian:String)
