• Ways to create a DataFrame

  • Create a DataFrame by reading a JSON file

Notes:

  • The JSON records in the file must not contain nested JSON objects.
  • A DataFrame is backed by an RDD whose element type is Row; get it with df.rdd() / df.javaRDD().
  • A JSON file can be read in either of two ways (see the short sketch after this list).
  • df.show() displays the first 20 rows by default.
  • The native DataFrame API can operate on the DataFrame directly (less convenient than SQL).
  • When the DataFrame is registered as a temporary table, the columns are displayed in ASCII order of their names by default.
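
A minimal sketch of these points (assuming the ./data/json file shown below and the SQLContext-based setup used in the full example that follows): both read styles, show(n), and the DataFrame-to-RDD conversion.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class ReadJsonSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setMaster("local").setAppName("readJsonSketch"));
        SQLContext sqlContext = new SQLContext(sc);

        // Two equivalent ways to read a JSON file
        Dataset<Row> df1 = sqlContext.read().format("json").load("./data/json");
        Dataset<Row> df2 = sqlContext.read().json("./data/json");

        df1.show();      // first 20 rows by default
        df1.show(100);   // pass a row count to show more

        // The underlying data is an RDD of Row
        JavaRDD<Row> rowRDD = df1.javaRDD();
        System.out.println(rowRDD.count());

        sc.stop();
    }
}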

JSON file:

{"name":"george","age":"22"}
{"name":"lucy"}
{"name":"honey","age":"20"}
{"name":"KK","age":"20"}

Java API:

package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * @author George
 * @description
 * Ways to create a DataFrame
 * 1. Create a DataFrame by reading a JSON file
 */
public class DFDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("DF");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        // Create the SQLContext
        SQLContext sqlContext = new SQLContext(sc);

        /**
         * A DataFrame is backed by an RDD whose element type is Row.
         * Either of the following two ways can read a JSON file.
         */
//        Dataset<Row> df = sqlContext.read().format("json").load("./data/json");
        Dataset<Row> df = sqlContext.read().json("./data/json");
        df.show();
        /**
         * +----+------+
         * | age|  name|
         * +----+------+
         * |  22|george|
         * |null|  lucy|
         * |  20| honey|
         * |  20|    KK|
         * +----+------+
         */

        // Convert the DataFrame into an RDD
        RDD<Row> rdd = df.rdd();

        /**
         * show() displays the first 20 rows by default; to display more, pass a row count: show(n).
         * Note: when there are multiple columns, they are displayed in ASCII order of the column names.
         */

        /**
         * Print the schema as a tree
         */
        df.printSchema();
        /**
         * root
         *  |-- age: string (nullable = true)
         *  |-- name: string (nullable = true)
         */

        /**
         * Operate on the DataFrame with its native API
         */
        // select name from table
        df.select("name").show();
        /**
         * +------+
         * |  name|
         * +------+
         * |george|
         * |  lucy|
         * | honey|
         * |    KK|
         * +------+
         */

        // select name, age+10 as addAge from table
        df.select(df.col("name"), df.col("age").plus(10).alias("addAge")).show();
        /**
         * +------+------+
         * |  name|addAge|
         * +------+------+
         * |george|  32.0|
         * |  lucy|  null|
         * | honey|  30.0|
         * |    KK|  30.0|
         * +------+------+
         */

        // select name, age from table where age > 19
        df.select(df.col("name"), df.col("age")).where(df.col("age").gt(19)).show();
        /**
         * +------+---+
         * |  name|age|
         * +------+---+
         * |george| 22|
         * | honey| 20|
         * |    KK| 20|
         * +------+---+
         */

        // select age, count(*) from table group by age
        df.groupBy(df.col("age")).count().show();
        /**
         * +----+-----+
         * | age|count|
         * +----+-----+
         * |  22|    1|
         * |null|    1|
         * |  20|    2|
         * +----+-----+
         */

        /**
         * Register the DataFrame as a temporary table. The table is registered in memory
         * as a logical table only; it is not materialized to disk.
         */
        df.registerTempTable("t1");
        Dataset<Row> sql1 = sqlContext.sql("select age, count(*) from t1 group by age");
        sql1.show();
        /**
         * +----+--------+
         * | age|count(1)|
         * +----+--------+
         * |  22|       1|
         * |null|       1|
         * |  20|       2|
         * +----+--------+
         */
        Dataset<Row> sql2 = sqlContext.sql("select * from t1");
        sql2.show();
        /**
         * +----+------+
         * | age|  name|
         * +----+------+
         * |  22|george|
         * |null|  lucy|
         * |  20| honey|
         * |  20|    KK|
         * +----+------+
         */
        sc.stop();
    }
}

Scala API:

[Further reading] Where did the DataFrame go in Spark 2.3.0, and what is Dataset? https://georgedage.blog.csdn.net/article/details/103072515

[Further reading] SQLContext is deprecated; SparkSession takes the stage: https://georgedage.blog.csdn.net/article/details/102850878

package SparkSql

import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Ways to create a DataFrame
 * 1. Create a DataFrame by reading a JSON file
 */
object DFScalaDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder.master("local").appName("www").getOrCreate()
//    val frame: DataFrame = sparkSession.read.format("json").load("./data/json")
    val frame = sparkSession.read.json("./data/json")
    frame.show()
    /**
     * +----+------+
     * | age|  name|
     * +----+------+
     * |  22|george|
     * |null|  lucy|
     * |  20| honey|
     * |  20|    KK|
     * +----+------+
     */
    frame.printSchema()
    /**
     * root
     * |-- age: string (nullable = true)
     * |-- name: string (nullable = true)
     */

    // select name from table
    frame.select(frame.col("name")).show()
    // select name, age from table where age > 19
    frame.select(frame.col("name"), frame.col("age")).where(frame.col("age").gt(19)).show()
    // select age, count(*) from table group by age
    frame.groupBy(frame.col("age")).count().show()
    /**
     * +------+
     * |  name|
     * +------+
     * |george|
     * |  lucy|
     * | honey|
     * |    KK|
     * +------+
     *
     * +------+---+
     * |  name|age|
     * +------+---+
     * |george| 22|
     * | honey| 20|
     * |    KK| 20|
     * +------+---+
     *
     * +----+-----+
     * | age|count|
     * +----+-----+
     * |  22|    1|
     * |null|    1|
     * |  20|    2|
     * +----+-----+
     */

    /**
     * Register a temporary table
     */
    // [Note] registerTempTable has been deprecated since 1.6.0: "Use createOrReplaceTempView(viewName) instead."
    frame.createOrReplaceTempView("t1")
    sparkSession.sql("select * from t1").show()
    /**
     * +----+------+
     * | age|  name|
     * +----+------+
     * |  22|george|
     * |null|  lucy|
     * |  20| honey|
     * |  20|    KK|
     * +----+------+
     */
    sparkSession.stop()
  }
}
For reference, the same example written against the older SQLContext API:

val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonfile")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("sparksql/json")
//val df1 = sqlContext.read.format("json").load("sparksql/json")
df.show()
df.printSchema()
//select name from table
df.select(df.col("name")).show()
//select name, age from table where age > 19
df.select(df.col("name"), df.col("age")).where(df.col("age").gt(19)).show()
//select age, count(*) from table group by age
df.groupBy(df.col("age")).count().show()
/**
 * Register a temporary table
 */
df.registerTempTable("jtable")
val result = sqlContext.sql("select * from jtable")
result.show()
sc.stop()
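
The Java examples in this post stay on SQLContext; as the linked posts above note, Spark 2.x prefers SparkSession. A minimal Java sketch of the same read with SparkSession (class and app names here are placeholders for illustration):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSessionReadJson {
    public static void main(String[] args) {
        // SparkSession wraps SparkContext and SQLContext in Spark 2.x
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("sparkSessionReadJson")
                .getOrCreate();

        Dataset<Row> df = spark.read().json("./data/json");
        df.show();

        // createOrReplaceTempView replaces the deprecated registerTempTable
        df.createOrReplaceTempView("t1");
        spark.sql("select * from t1").show();

        spark.stop();
    }
}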

  • Create a DataFrame from an RDD of JSON strings

Java API:

package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.Arrays;

/**
 * @author George
 * @description
 * 2. Create a DataFrame from an RDD of JSON strings
 */
public class DFDemo2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("www");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        // parallelize distributes a local collection to form an RDD
        JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
                "{\"name\":\"zhangsan\",\"age\":\"18\"}",
                "{\"name\":\"lisi\",\"age\":\"19\"}",
                "{\"name\":\"wangwu\",\"age\":\"20\"}"));
        JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
                "{\"name\":\"zhangsan\",\"score\":\"100\"}",
                "{\"name\":\"lisi\",\"score\":\"200\"}",
                "{\"name\":\"wangwu\",\"score\":\"300\"}"));

        Dataset<Row> nameds = sqlContext.read().json(nameRDD);
        Dataset<Row> scoreds = sqlContext.read().json(scoreRDD);
        nameds.registerTempTable("name");
        scoreds.registerTempTable("score");

        Dataset<Row> res = sqlContext.sql(
                "select * from name, score where name.name = score.name");
        res.show();
        /**
         * +---+--------+--------+-----+
         * |age|    name|    name|score|
         * +---+--------+--------+-----+
         * | 20|  wangwu|  wangwu|  300|
         * | 18|zhangsan|zhangsan|  100|
         * | 19|    lisi|    lisi|  200|
         * +---+--------+--------+-----+
         */
        sc.stop();
    }
}
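
The same join can also be expressed with the DataFrame API instead of registering temporary tables. A minimal sketch, assuming the nameds and scoreds Datasets from the code above:

// Equivalent join via the DataFrame API (no temp tables needed);
// joining "using" the name column keeps a single name column in the result.
Dataset<Row> joined = nameds.join(scoreds, "name");
joined.show();
// Roughly expected output (row order may differ):
// +--------+---+-----+
// |    name|age|score|
// +--------+---+-----+
// |  wangwu| 20|  300|
// |zhangsan| 18|  100|
// |    lisi| 19|  200|
// +--------+---+-----+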

Scala API:

package SparkSql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}

object DFScalaDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("jsonrdd")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val nameRDD = sc.makeRDD(Array(
      "{\"name\":\"zhangsan\",\"age\":18}",
      "{\"name\":\"lisi\",\"age\":19}",
      "{\"name\":\"wangwu\",\"age\":20}"))
    val scoreRDD = sc.makeRDD(Array(
      "{\"name\":\"zhangsan\",\"score\":100}",
      "{\"name\":\"lisi\",\"score\":200}",
      "{\"name\":\"wangwu\",\"score\":300}"))

    val nameDF = sqlContext.read.json(nameRDD)
    val scoreDF = sqlContext.read.json(scoreRDD)
    nameDF.registerTempTable("name")
    scoreDF.registerTempTable("score")

    val result = sqlContext.sql(
      "select name.name, name.age, score.score from name, score where name.name = score.name")
    result.show()
    /**
     * +--------+---+-----+
     * |    name|age|score|
     * +--------+---+-----+
     * |  wangwu| 20|  300|
     * |zhangsan| 18|  100|
     * |    lisi| 19|  200|
     * +--------+---+-----+
     */
    sc.stop()
  }
}

  • Create a DataFrame from a non-JSON RDD

  • Convert a non-JSON RDD into a DataFrame by reflection (not recommended)
  1. The custom class must be serializable.
  2. The custom class must have public access.
  3. After the RDD is converted to a DataFrame, the mapped fields are sorted in ASCII order of their names.
  4. When converting the DataFrame back to an RDD, a field can be read from a Row in two ways: by index, e.g. row.getInt(0) (not recommended), or by column name, e.g. row.getAs("columnName") (recommended).

Person class:

package SparkSql;

import java.io.Serializable;

/**
 * @author George
 * @description
 */
public class Person implements Serializable {
    private int id;
    private String name;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "Person{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

Java API:

package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.SQLContext;

/**
 * @author George
 * @description
 * 3. Create a DataFrame from a non-JSON RDD
 */
public class DFDemo3 {
    public static void main(String[] args) {
        /**
         * Notes:
         * 1. The custom class must be serializable.
         * 2. The custom class must have public access.
         * 3. When the RDD is converted to a DataFrame, the fields of the custom class are sorted in ASCII order of their names.
         */
        SparkConf conf = new SparkConf();
        conf.setAppName("df");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> textFile = sc.textFile("./data/person.txt");
        JavaRDD<Person> map = textFile.map(new Function<String, Person>() {
            @Override
            public Person call(String s) throws Exception {
                Person p = new Person();
                p.setId(Integer.parseInt(s.split(",")[0]));
                p.setName(s.split(",")[1]);
                p.setAge(Integer.parseInt(s.split(",")[2]));
                return p;
            }
        });
        map.foreach(new VoidFunction<Person>() {
            @Override
            public void call(Person person) throws Exception {
                System.out.println(person);
            }
        });
        /**
         * Person{id=1, name='zhangsan', age=18}
         * Person{id=2, name='lisi', age=19}
         * Person{id=3, name='wangwu', age=20}
         */
        sc.stop();
    }
}
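
The code above stops at printing the Person RDD and never builds the DataFrame. A sketch of the remaining reflection step, assuming the map RDD and sqlContext from the example (and additional imports for Dataset and Row), using SQLContext.createDataFrame with the bean class and Row.getAs to read fields back out:

// Requires: import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;
// Reflection-based conversion: Spark derives the schema from the Person bean
// (columns come out in ASCII order: age, id, name).
Dataset<Row> personDF = sqlContext.createDataFrame(map, Person.class);
personDF.show();
personDF.registerTempTable("person");
sqlContext.sql("select id, name, age from person where age > 18").show();

// Converting back: read fields by column name (recommended) rather than by index.
JavaRDD<Person> back = personDF.javaRDD().map(new Function<Row, Person>() {
    @Override
    public Person call(Row row) throws Exception {
        Person p = new Person();
        p.setId(row.<Integer>getAs("id"));
        p.setName(row.<String>getAs("name"));
        p.setAge(row.<Integer>getAs("age"));
        return p;
    }
});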

Scala API:

package SparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DFScalaDemo3 {
  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ww")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    val sqlContext: SQLContext = new SQLContext(sc)
    val tf = sc.textFile("./data/person.txt")

    /**
     * Implicitly convert the RDD into a DataFrame
     */
    import sqlContext.implicits._
    val map: RDD[Person] = tf.map { x => {
      val person = Person(Integer.parseInt(x.split(",")(0)), x.split(",")(1), Integer.valueOf(x.split(",")(2)))
      person
    }}
    val frame = map.toDF()
    frame.show()
    /**
     * +---+--------+---+
     * | id|    name|age|
     * +---+--------+---+
     * |  1|zhangsan| 18|
     * |  2|    lisi| 19|
     * |  3|  wangwu| 20|
     * +---+--------+---+
     */

    /**
     * Convert the DataFrame back into an RDD of Person
     */
    val rdd = frame.rdd
    val result = rdd.map(x => {
      Person(x.getAs("id"), x.getAs("name"), x.getAs("age"))
    })
    result.foreach(println)
    /**
     * Person(1,zhangsan,18)
     * Person(2,lisi,19)
     * Person(3,wangwu,20)
     */
    sc.stop()
  }
}

  • Dynamically create a schema to convert a non-JSON RDD into a DataFrame

Java API:

package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

/**
 * @author George
 * @description
 * 2) Dynamically create a schema to convert a non-JSON RDD into a DataFrame
 */
public class DFDemo4 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("ww");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> tf = sc.textFile("./data/person.txt");
        /**
         * Convert into an RDD of Row
         */
        JavaRDD<Row> map = tf.map(new Function<String, Row>() {
            @Override
            public Row call(String v1) throws Exception {
                return RowFactory.create(
                        Integer.parseInt(v1.split(",")[0]),
                        String.valueOf(v1.split(",")[1]),
                        Integer.valueOf(v1.split(",")[2]));
            }
        });

        /**
         * Dynamically build the DataFrame metadata. In practice the field definitions
         * can come from a string or from an external database.
         */
        List<StructField> asList = Arrays.asList(
                DataTypes.createStructField("id", DataTypes.IntegerType, true),
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(asList);
        Dataset<Row> dataFrame = sqlContext.createDataFrame(map, schema);
        dataFrame.show();
        /**
         * +---+--------+---+
         * | id|    name|age|
         * +---+--------+---+
         * |  1|zhangsan| 18|
         * |  2|    lisi| 19|
         * |  3|  wangwu| 20|
         * +---+--------+---+
         */
        sc.stop();
    }
}
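
The comment above notes that the field definitions can come from a string or an external database rather than being hard-coded. A minimal sketch of building the StructType from a hypothetical "name:type" description string (the helper name and the two supported types are assumptions for illustration, not part of the original example):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaFromString {
    // Build a StructType from a comma-separated "name:type" description,
    // e.g. "id:integer,name:string,age:integer". Only integer/string are handled here.
    public static StructType build(String schemaString) {
        List<StructField> fields = new ArrayList<>();
        for (String col : schemaString.split(",")) {
            String[] parts = col.split(":");
            DataType type = "integer".equals(parts[1]) ? DataTypes.IntegerType : DataTypes.StringType;
            fields.add(DataTypes.createStructField(parts[0], type, true));
        }
        return DataTypes.createStructType(fields);
    }
}
// Usage: StructType schema = SchemaFromString.build("id:integer,name:string,age:integer");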

Scala API:

package SparkSql

import org.apache.spark.sql.{RowFactory, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object DFScalaDemo4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sww").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    val sqlContext = new SQLContext(sc)

    val tf = sc.textFile("./data/person.txt")
    val map = tf.map(x => {
      val strings = x.split(",")
      RowFactory.create(Integer.valueOf(strings(0)), strings(1), Integer.valueOf(strings(2)))
    })

    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    val frame = sqlContext.createDataFrame(map, schema)
    frame.show()
    frame.printSchema()
    /**
     * +---+--------+---+
     * | id|    name|age|
     * +---+--------+---+
     * |  1|zhangsan| 18|
     * |  2|    lisi| 19|
     * |  3|  wangwu| 20|
     * +---+--------+---+
     *
     * root
     * |-- id: integer (nullable = true)
     * |-- name: string (nullable = true)
     * |-- age: integer (nullable = true)
     */
    sc.stop()
  }
}

More to come in the next post, stay tuned. Haha
