Spark创建DataFrame
1. DataFrame
在Spark中可以通过RDD转换为DataFrame,也可以通过DataFrame转化为RDD,DataFrame可以理解为数据的一个格式,实质show()
就是一张表。
读取数据构造DataFrame主要有以下几种方式:
- 从Json文件中读取
- 通过SQLContext构造类对象构造DataFrame
- 动态创建Schema构造当前的DataFrame结构
- 从parquet文件中读取
- 从MySQL中读取数据
- 从Hive中读取数据
2. 从json文件读取构造DataFrame
public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("dfTest");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);DataFrame df1 = sqlContext.read().format("json").json("./scala/student");df1.show();// 通过注册临时表的方法,可以使用我们比较熟悉的方式查询当前数据df1.registerTempTable("student");
// DataFrame df2 = sqlContext.sql("select name,age from student where age > 23");// 通过最原始的方式查询当前表的一些数据,利用DataFrame可以直接查询,其实质就是一张表// 如果注册成为了一张临时表,此时通过SQLContext可以利用我们的查询语句进行直接的查询DataFrame df2 = df1.select(df1.col("name"), df1.col("age")).where(df1.col("age").lt(24));df2.show();sc.stop();}
直接读取然后格式化为json格式的信息即可。
2. 通过文件读取,动态构建类的RDD
public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("dfTestTwo");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> rdd1 = sc.textFile("./scala/person");// 转换为Person的RDDJavaRDD<Person> personRdd = rdd1.map(new Function<String, Person>() {@Overridepublic Person call(String s) throws Exception {String[] split = s.split(",");return new Person(Integer.valueOf(split[0]), split[1], split[2]);}});// 构建DataFrame整体的框架DataFrame df1 = sqlContext.createDataFrame(personRdd, Person.class);df1.show();df1.registerTempTable("person");DataFrame sql = sqlContext.sql("select name,age,address from person where age in (23,24)");sql.show();// 现在通过DataFrame构建person对象// 首先要获取当前dataFrame的一个JavaRDD对象JavaRDD<Row> rowJavaRDD = sql.javaRDD();// 然后将当前的JavaRDD<Row>对象转化为JavaRDD<Person>对象即可JavaRDD<Person> personJavaRDD = rowJavaRDD.map(new Function<Row, Person>() {@Overridepublic Person call(Row row) throws Exception {Person person = new Person();person.setAge(Integer.valueOf(row.getAs("age") + ""));person.setName(row.getAs("name") + "");person.setAddress(row.getAs("address") + "");return person;}});personJavaRDD.foreach(new VoidFunction<Person>() {@Overridepublic void call(Person person) throws Exception {System.out.println(person);}});sc.stop();}
首先需要构造一个当前的类,然后读取类的信息,然后构造当前类的一个DataFrame
3. 动态创建Schema构造DataFrame
public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("dfTestThree");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> rdd1 = sc.textFile("./scala/person");// 动态构建SchemaJavaRDD<Row> rowJavaRDD = rdd1.map(new Function<String, Row>() {@Overridepublic Row call(String s) throws Exception {String[] split = s.split(",");return RowFactory.create(split[1], Integer.parseInt(split[0]), split[2]);}});// 动态创建schema,字段需要匹配操作List<StructField> fields = Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true),DataTypes.createStructField("age", DataTypes.IntegerType, true),DataTypes.createStructField("address", DataTypes.StringType, true));// 创建schema,结构StructType schma = DataTypes.createStructType(fields);// 动态创建DataFrameDataFrame dataFrame = sqlContext.createDataFrame(rowJavaRDD, schma);dataFrame.show();sc.stop();}
构造属性集合,然后构建结构类型Schema,最后通过SQLContext构造DataFrame
4. 从parquet文件中读取构造
parquet实质也是Spark主动生成的一种压缩数据格式的文件。
public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("parquent");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> javaRDD = sc.textFile("./scala/student");DataFrame df = sqlContext.read().json(javaRDD);df.write().mode(SaveMode.Ignore).format("parquet").save("./scala/parquent");DataFrame load = sqlContext.read().format("parquet").load("./scala/parquent");load.show();sc.stop();}
5. 从MySQL中读取数据构造DataFrame
通过远程连接MySQL获取数据,但是获取数据之前,需要先将各个字段给构造好。
public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("dfSql");conf.set("spark.sql.shuffle.partitions","200");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);/*** 人为构建DataFrame然后将数据写入其中即可写的其实就是当前的一个DataFrame* 即构建DataFrame,带入的数据即是我们构建时写入的数据** 构建DataFrame的话,首先需要读入数据,然后通过StructField构建当前的一个表格属性的List* 然后通过DataTypes构建当前的数据类型StructType,最后通过sqlContext构建当前的一个DataFrame*/JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("1 Caoduanxi 24", "2 Zhangsan 21", "3 Lisi 22"));JavaRDD<Row> map = javaRDD.map(new Function<String, Row>() {@Overridepublic Row call(String s) throws Exception {String[] s1 = s.split(" ");return RowFactory.create(Integer.parseInt(s1[0] + ""), s1[1], s1[2]);}});List<StructField> structFields = new ArrayList<>();structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));structFields.add(DataTypes.createStructField("age", DataTypes.StringType, true));StructType structType = DataTypes.createStructType(structFields);DataFrame df = sqlContext.createDataFrame(map, structType);Properties properties = new Properties();properties.setProperty("user", "***");properties.setProperty("password", "****");df.write().mode(SaveMode.Append).jdbc("jdbc:mysql://106.15.1xx.xx:3306/study", "person", properties);/*填写参数获取此时数据库的信息*/
// Map<String,String> map = new HashMap<String,String>();
// // 参数也可以一个一个的传入
// map.put("url","jdbc:mysql://106.15.125.75:3306/study");
// map.put("driver","com.mysql.jdbc.Driver");
// map.put("user","root");
// map.put("password","123456");
// map.put("dbtable","logs");
//
// DataFrame df = sqlContext.read().format("jdbc").options(map).load();
// df.show();// 单独录入也可以,但是需要format格式为jdbc格式,然后需要load一下数据才可以被加载位当前的DataFrameDataFrameReader dfReader = sqlContext.read().format("jdbc");dfReader.option("url", "jdbc:mysql://106.15.125.75:3306/study");dfReader.option("driver", "com.mysql.jdbc.Driver");dfReader.option("user", "root");dfReader.option("password", "123456");dfReader.option("dbtable", "logs");// DataFrame df2 = dfReader.load();
// df2.show();
//
// df2.registerTempTable("logs");
// DataFrame sql = sqlContext.sql("select * from logs");
// sql.show();
//
// // 将结果写回mysql中
// Properties properties = new Properties();
// properties.setProperty("user", "root");
// properties.setProperty("password", "123456");
// // 如果自己认为构造一张表出来的话,还是可以直接放入到其中实现的
// df2.write().mode(SaveMode.Append).jdbc("jdbc:mysql://106.15.125.75:3306/study", "logs", properties);System.out.println("****finished****");sc.stop();}
6. 小结
数据的处理主要是通过RDD来实现的,主要是一些算子在其中主导数据的处理,但是输入的数据要想转换为RDD可处理,需要先转换为DataFrame,然后再转化为RDD,此时可以通过Transformations和Actions算子来处理即可。
Keep thinking, keep coding! 2020-9-24 南京
Spark创建DataFrame相关推荐
- Spark创建DataFrame的三种方法
跟关系数据库的表(Table)一样,DataFrame是Spark中对带模式(schema)行列数据的抽象.DateFrame广泛应用于使用SQL处理大数据的各种场景.创建DataFrame有很多种方 ...
- Spark _24 _读取JDBC中的数据创建DataFrame/DataSet(MySql为例)(三)
两种方式创建DataSet 现在数据库中创建表不能给插入少量数据. javaapi: package SparkSql;import org.apache.spark.SparkConf; impor ...
- Spark _22 _创建DataFrame的几种方式(一)
创建DataFrame的几种方式 读取json格式的文件创建DataFrame 注意: json文件中的json数据不能嵌套json格式数据. DataFrame是一个一个Row类型的RDD,df.r ...
- Spark创建空的DataFrame
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到网站:https://www.captainai.net/dongkelun 前言 本文主要给出Spark创 ...
- dataframe 排序_疯狂Spark之DataFrame创建方式详解一(九)
创建DataFrame的几种方式 1.读取json格式的文件创建DataFrame 注意: 1. json文件中的json数据不能嵌套json格式数据. 2. DataFrame是一个一个Row类型的 ...
- spark1.4加载mysql数据 创建Dataframe及join操作连接方法问题
首先我们使用新的API方法连接mysql加载数据 创建DF import org.apache.spark.sql.DataFrame import org.apache.spark.{SparkCo ...
- python spark dataframe_pyspark dataframe 常用操作
spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能.当然主要对类SQL的支持. 在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选.合并,重新入库. 首先加载数据集 ...
- 向Spark的DataFrame增加一列数据
前言 先说个题外话,如何给hive表增加一个列,并且该把该列的所有字段设为'China'? 如果仅仅是增加一列倒是很简单: alter table test add columns(flag stri ...
- SparkSQL 创建 DataFrame 的方式
1.读取 json 格式的文件创建 DataFrame 注意: 可以两种方式读取 json 格式的文件. df.show()默认显示前 20 行数据. DataFrame 原生 API 可以操作 Da ...
最新文章
- 激光雷达数据到云cloud
- r语言 bsda包_使用R语言creditmodel包进行Vintage分析或留存率分析
- R语言导入、读取网络CSV数据(Read a CSV from a URL)实战:R原生read_csv、readr包、data.table
- Python(四)字符串
- 前端学Markdown
- 亿级别记录的mongodb分页查询java代码实现
- Spring学习7之自动装配Bean03
- asp.net的dropDownlist只显示第一个字
- 第一年的要求 工程系的研究生
- 【编程】二叉搜索树的定义
- Gis 热点技术分析
- L1-038. 新世界
- 互联网晚报 | 8月21日 星期六 | 中国电信正式在A股上市;呷哺呷哺将关闭200家亏损门店;个人信息保护法表决通过...
- python sanic_Sanic框架安装与简单入门示例
- spring interceptor 拦截方法,判断用户是否存在
- linux中下载python_linux下python安装
- 微信群网址活码在线生成系统源码 二维码活码生成
- linux使用USB转串口驱动设置
- 基因型填充中的phasing究竟是什么
- China Mobile”、“China Unicom”,请修改为中国移动、中国联通