Spark _22_ Several Ways to Create a DataFrame (Part 1)
Several ways to create a DataFrame
Creating a DataFrame by reading a JSON file
Notes:
- The JSON records in the file must not contain nested JSON.
- A DataFrame is backed by an RDD whose element type is Row; get it with df.rdd() / df.javaRDD().
- A JSON file can be read in either of two ways.
- df.show() displays the first 20 rows by default.
- The DataFrame's native API can operate on the DataFrame directly (though it is less convenient than SQL).
- When the DataFrame is registered as a temporary table, its columns are displayed in ASCII order by default.
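The ASCII-ordering note can be illustrated without Spark at all: Spark displays the JSON fields sorted lexicographically by name, which is why `age` appears before `name` in every output table below. The following is a plain-Java sketch of that ordering rule, not Spark code; the class name `ColumnOrder` is made up for illustration.

```java
import java.util.Arrays;

// Plain-Java illustration (not Spark itself) of the ASCII column ordering:
// sorting the field names predicts the display order, so "age" precedes "name".
public class ColumnOrder {
    public static void main(String[] args) {
        String[] cols = {"name", "age"};   // field order as written in the JSON records
        Arrays.sort(cols);                 // lexicographic (ASCII) sort
        System.out.println(Arrays.toString(cols)); // [age, name]
    }
}
```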
The JSON file:
{"name":"george","age":"22"}
{"name":"lucy"}
{"name":"honey","age":"20"}
{"name":"KK","age":"20"}
Java API:
package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * @author George
 * @description
 * Several ways to create a DataFrame
 * 1. Create a DataFrame by reading a JSON file
 */
public class DFDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("DF");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        // create the SQLContext
        SQLContext sqlContext = new SQLContext(sc);
        /*
         * A DataFrame is backed by an RDD whose element type is Row.
         * Either of the following two calls reads a JSON file.
         */
        // Dataset<Row> df = sqlContext.read().format("json").load("./data/json");
        Dataset<Row> df = sqlContext.read().json("./data/json");
        df.show();
        /*
         * +----+------+
         * | age|  name|
         * +----+------+
         * |  22|george|
         * |null|  lucy|
         * |  20| honey|
         * |  20|    KK|
         * +----+------+
         */
        // convert the DataFrame into an RDD
        RDD<Row> rdd = df.rdd();
        /*
         * show() displays the first 20 rows by default; to display more, pass a count: show(n).
         * Note: when there are multiple columns, they are displayed in ASCII order.
         */
        // print the schema as a tree
        df.printSchema();
        /*
         * root
         *  |-- age: string (nullable = true)
         *  |-- name: string (nullable = true)
         */
        // operate on the DataFrame with its native API
        // select name from table
        df.select("name").show();
        /*
         * +------+
         * |  name|
         * +------+
         * |george|
         * |  lucy|
         * | honey|
         * |    KK|
         * +------+
         */
        // select name, age+10 from table
        df.select(df.col("name"), df.col("age").plus(10).alias("addAge")).show();
        /*
         * +------+------+
         * |  name|addAge|
         * +------+------+
         * |george|  32.0|
         * |  lucy|  null|
         * | honey|  30.0|
         * |    KK|  30.0|
         * +------+------+
         */
        // select name, age from table where age > 19
        df.select(df.col("name"), df.col("age")).where(df.col("age").gt(19)).show();
        /*
         * +------+---+
         * |  name|age|
         * +------+---+
         * |george| 22|
         * | honey| 20|
         * |    KK| 20|
         * +------+---+
         */
        // select age, count(*) from table group by age
        df.groupBy(df.col("age")).count().show();
        /*
         * +----+-----+
         * | age|count|
         * +----+-----+
         * |  22|    1|
         * |null|    1|
         * |  20|    2|
         * +----+-----+
         */
        /*
         * Register the DataFrame as a temporary table. The table is registered in memory:
         * it is a logical table and is not materialized to disk.
         */
        df.registerTempTable("t1");
        Dataset<Row> sql1 = sqlContext.sql("select age, count(*) from t1 group by age");
        sql1.show();
        /*
         * +----+--------+
         * | age|count(1)|
         * +----+--------+
         * |  22|       1|
         * |null|       1|
         * |  20|       2|
         * +----+--------+
         */
        Dataset<Row> sql2 = sqlContext.sql("select * from t1");
        sql2.show();
        /*
         * +----+------+
         * | age|  name|
         * +----+------+
         * |  22|george|
         * |null|  lucy|
         * |  20| honey|
         * |  20|    KK|
         * +----+------+
         */
        sc.stop();
    }
}
Scala API:
[Side note] Where did DataFrame go in Spark 2.3.0, and who is Dataset? https://georgedage.blog.csdn.net/article/details/103072515
On the deprecation of SQLContext and the arrival of SparkSession: https://georgedage.blog.csdn.net/article/details/102850878
package SparkSql

import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Several ways to create a DataFrame
 * 1. Create a DataFrame by reading a JSON file
 */
object DFScalaDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder.master("local").appName("www").getOrCreate()
    // val frame: DataFrame = sparkSession.read.format("json").load("./data/json")
    val frame = sparkSession.read.json("./data/json")
    frame.show()
    /*
     * +----+------+
     * | age|  name|
     * +----+------+
     * |  22|george|
     * |null|  lucy|
     * |  20| honey|
     * |  20|    KK|
     * +----+------+
     */
    frame.printSchema()
    /*
     * root
     *  |-- age: string (nullable = true)
     *  |-- name: string (nullable = true)
     */
    // select name from table
    frame.select(frame.col("name")).show()
    // select name, age from table where age > 19
    frame.select(frame.col("name"), frame.col("age")).where(frame.col("age").gt(19)).show()
    // select age, count(*) from table group by age
    frame.groupBy(frame.col("age")).count().show()
    /*
     * +------+
     * |  name|
     * +------+
     * |george|
     * |  lucy|
     * | honey|
     * |    KK|
     * +------+
     *
     * +------+---+
     * |  name|age|
     * +------+---+
     * |george| 22|
     * | honey| 20|
     * |    KK| 20|
     * +------+---+
     *
     * +----+-----+
     * | age|count|
     * +----+-----+
     * |  22|    1|
     * |null|    1|
     * |  20|    2|
     * +----+-----+
     */
    // register a temporary view
    // [Note] registerTempTable has been deprecated since 1.6.0:
    // @deprecated("Use createOrReplaceTempView(viewName) instead.")
    frame.createOrReplaceTempView("t1")
    sparkSession.sql("select * from t1").show()
    /*
     * +----+------+
     * | age|  name|
     * +----+------+
     * |  22|george|
     * |null|  lucy|
     * |  20| honey|
     * |  20|    KK|
     * +----+------+
     */
    sparkSession.stop()
  }
}
For reference, the same example written against the older SQLContext API:

val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonfile")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("sparksql/json")
// val df1 = sqlContext.read.format("json").load("sparksql/json")
df.show()
df.printSchema()
// select name from table
df.select(df.col("name")).show()
// select name, age from table where age > 19
df.select(df.col("name"), df.col("age")).where(df.col("age").gt(19)).show()
// select age, count(*) from table group by age
df.groupBy(df.col("age")).count().show()
// register a temporary table
df.registerTempTable("jtable")
val result = sqlContext.sql("select * from jtable")
result.show()
sc.stop()
Creating a DataFrame from an RDD of JSON strings
Java API:
package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.Arrays;

/**
 * @author George
 * @description
 * 2. Create a DataFrame from an RDD of JSON strings
 */
public class DFDemo2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("www");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);
        // parallelize distributes a local collection to form an RDD
        JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
                "{\"name\":\"zhangsan\",\"age\":\"18\"}",
                "{\"name\":\"lisi\",\"age\":\"19\"}",
                "{\"name\":\"wangwu\",\"age\":\"20\"}"));
        JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
                "{\"name\":\"zhangsan\",\"score\":\"100\"}",
                "{\"name\":\"lisi\",\"score\":\"200\"}",
                "{\"name\":\"wangwu\",\"score\":\"300\"}"));
        Dataset<Row> nameds = sqlContext.read().json(nameRDD);
        Dataset<Row> scoreds = sqlContext.read().json(scoreRDD);
        nameds.registerTempTable("name");
        scoreds.registerTempTable("score");
        Dataset<Row> res = sqlContext.sql(
                "select * from name, score where name.name = score.name");
        res.show();
        /*
         * +---+--------+--------+-----+
         * |age|    name|    name|score|
         * +---+--------+--------+-----+
         * | 20|  wangwu|  wangwu|  300|
         * | 18|zhangsan|zhangsan|  100|
         * | 19|    lisi|    lisi|  200|
         * +---+--------+--------+-----+
         */
        sc.stop();
    }
}
Scala API:
package SparkSql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DFScalaDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("jsonrdd")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val nameRDD = sc.makeRDD(Array(
      "{\"name\":\"zhangsan\",\"age\":18}",
      "{\"name\":\"lisi\",\"age\":19}",
      "{\"name\":\"wangwu\",\"age\":20}"))
    val scoreRDD = sc.makeRDD(Array(
      "{\"name\":\"zhangsan\",\"score\":100}",
      "{\"name\":\"lisi\",\"score\":200}",
      "{\"name\":\"wangwu\",\"score\":300}"))
    val nameDF = sqlContext.read.json(nameRDD)
    val scoreDF = sqlContext.read.json(scoreRDD)
    nameDF.registerTempTable("name")
    scoreDF.registerTempTable("score")
    val result = sqlContext.sql(
      "select name.name, name.age, score.score from name, score where name.name = score.name")
    result.show()
    /*
     * +--------+---+-----+
     * |    name|age|score|
     * +--------+---+-----+
     * |  wangwu| 20|  300|
     * |zhangsan| 18|  100|
     * |    lisi| 19|  200|
     * +--------+---+-----+
     */
    sc.stop()
  }
}
Creating a DataFrame from a non-JSON RDD
- Convert a non-JSON RDD into a DataFrame via reflection (not recommended)
- The custom class must be serializable
- The custom class's access level must be public
- After the RDD is converted to a DataFrame, the mapped fields are sorted in ASCII order
- When converting a DataFrame back to an RDD, a field can be read in two ways: by index, e.g. row.getInt(0) (not recommended), or by column name, e.g. row.getAs("columnName") (recommended)
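The warning against index-based access follows directly from the ASCII-sorting rule above: the Person class declares its fields as id, name, age, but the DataFrame orders them age, id, name, so column 0 is not the first declared field. The following is a Spark-free plain-Java sketch of that pitfall; the class name `FieldOrder` is made up for illustration.

```java
import java.util.Arrays;

// Spark-free sketch of why positional access is fragile: the reflected
// Person fields (id, name, age) end up in ASCII order in the DataFrame,
// so column 0 is "age", not "id", and row.getInt(0) would return the age.
public class FieldOrder {
    public static void main(String[] args) {
        String[] declared = {"id", "name", "age"}; // declaration order in the Person class
        String[] inFrame = declared.clone();
        Arrays.sort(inFrame);                      // column order in the DataFrame
        System.out.println(Arrays.toString(inFrame)); // [age, id, name]
    }
}
```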
The Person class:
package SparkSql;

import java.io.Serializable;

/**
 * @author George
 * @description
 */
public class Person implements Serializable {
    private int id;
    private String name;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "Person{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}';
    }
}
Java API:
package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.SQLContext;

/**
 * @author George
 * @description
 * 3. Create a DataFrame from a non-JSON RDD
 */
public class DFDemo3 {
    public static void main(String[] args) {
        /*
         * Notes:
         * 1. The custom class must be serializable.
         * 2. The custom class's access level must be public.
         * 3. When the RDD is converted to a DataFrame, the class's fields are sorted in ASCII order.
         */
        SparkConf conf = new SparkConf();
        conf.setAppName("df");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> textFile = sc.textFile("./data/person.txt");
        JavaRDD<Person> map = textFile.map(new Function<String, Person>() {
            @Override
            public Person call(String s) throws Exception {
                String[] fields = s.split(",");
                Person p = new Person();
                p.setId(Integer.parseInt(fields[0]));
                p.setName(fields[1]);
                p.setAge(Integer.parseInt(fields[2]));
                return p;
            }
        });
        map.foreach(new VoidFunction<Person>() {
            @Override
            public void call(Person person) throws Exception {
                System.out.println(person);
            }
        });
        /*
         * Person{id=1, name='zhangsan', age=18}
         * Person{id=2, name='lisi', age=19}
         * Person{id=3, name='wangwu', age=20}
         */
        sc.stop();
    }
}
Scala API:
package SparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DFScalaDemo3 {
  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ww")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    val sqlContext: SQLContext = new SQLContext(sc)
    val tf = sc.textFile("./data/person.txt")
    // import the implicits that convert an RDD to a DataFrame
    import sqlContext.implicits._
    val map: RDD[Person] = tf.map { x =>
      val fields = x.split(",")
      Person(Integer.parseInt(fields(0)), fields(1), Integer.valueOf(fields(2)))
    }
    val frame = map.toDF()
    frame.show()
    /*
     * +---+--------+---+
     * | id|    name|age|
     * +---+--------+---+
     * |  1|zhangsan| 18|
     * |  2|    lisi| 19|
     * |  3|  wangwu| 20|
     * +---+--------+---+
     */
    // convert the DataFrame back into an RDD of Person
    val rdd = frame.rdd
    val result = rdd.map(x => Person(x.getAs("id"), x.getAs("name"), x.getAs("age")))
    result.foreach(println)
    /*
     * Person(1,zhangsan,18)
     * Person(2,lisi,19)
     * Person(3,wangwu,20)
     */
    sc.stop()
  }
}
- Convert a non-JSON RDD into a DataFrame by building the schema dynamically
Java API:
package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

/**
 * @author George
 * @description
 * 2) Convert a non-JSON RDD into a DataFrame by building the schema dynamically
 */
public class DFDemo4 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("ww");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> tf = sc.textFile("./data/person.txt");
        // convert to an RDD of Row
        JavaRDD<Row> map = tf.map(new Function<String, Row>() {
            @Override
            public Row call(String v1) throws Exception {
                String[] fields = v1.split(",");
                return RowFactory.create(
                        Integer.parseInt(fields[0]),
                        fields[1],
                        Integer.valueOf(fields[2]));
            }
        });
        /*
         * Build the DataFrame's metadata dynamically. In practice the field list
         * can come from a string or from an external database.
         */
        List<StructField> asList = Arrays.asList(
                DataTypes.createStructField("id", DataTypes.IntegerType, true),
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(asList);
        Dataset<Row> dataFrame = sqlContext.createDataFrame(map, schema);
        dataFrame.show();
        /*
         * +---+--------+---+
         * | id|    name|age|
         * +---+--------+---+
         * |  1|zhangsan| 18|
         * |  2|    lisi| 19|
         * |  3|  wangwu| 20|
         * +---+--------+---+
         */
        sc.stop();
    }
}
Scala API:
package SparkSql

import org.apache.spark.sql.{RowFactory, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object DFScalaDemo4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sww").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    val sqlContext = new SQLContext(sc)
    val tf = sc.textFile("./data/person.txt")
    val map = tf.map { x =>
      val strings = x.split(",")
      RowFactory.create(Integer.valueOf(strings(0)), strings(1), Integer.valueOf(strings(2)))
    }
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    val frame = sqlContext.createDataFrame(map, schema)
    frame.show()
    frame.printSchema()
    /*
     * +---+--------+---+
     * | id|    name|age|
     * +---+--------+---+
     * |  1|zhangsan| 18|
     * |  2|    lisi| 19|
     * |  3|  wangwu| 20|
     * +---+--------+---+
     *
     * root
     *  |-- id: integer (nullable = true)
     *  |-- name: string (nullable = true)
     *  |-- age: integer (nullable = true)
     */
    sc.stop()
  }
}
There is more to come, so don't go away. Haha.