1. DataFrame

在Spark中可以通过RDD转换为DataFrame,也可以通过DataFrame转化为RDD,DataFrame可以理解为数据的一个格式,实质show()就是一张表。

读取数据构造DataFrame主要有以下几种方式:

  • 从Json文件中读取
  • 通过SQLContext构造类对象构造DataFrame
  • 动态创建Schema构造当前的DataFrame结构
  • 从parquet文件中读取
  • 从MySQL中读取数据
  • 从Hive中读取数据

2. 从json文件读取构造DataFrame

 public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("dfTest");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);DataFrame df1 = sqlContext.read().format("json").json("./scala/student");df1.show();// 通过注册临时表的方法,可以使用我们比较熟悉的方式查询当前数据df1.registerTempTable("student");
//        DataFrame df2 = sqlContext.sql("select name,age from student where age > 23");// 通过最原始的方式查询当前表的一些数据,利用DataFrame可以直接查询,其实质就是一张表// 如果注册成为了一张临时表,此时通过SQLContext可以利用我们的查询语句进行直接的查询DataFrame df2 = df1.select(df1.col("name"), df1.col("age")).where(df1.col("age").lt(24));df2.show();sc.stop();}

直接读取然后格式化为json格式的信息即可。

2. 通过文件读取,动态构建类的RDD

public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("dfTestTwo");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> rdd1 = sc.textFile("./scala/person");// 转换为Person的RDDJavaRDD<Person> personRdd = rdd1.map(new Function<String, Person>() {@Overridepublic Person call(String s) throws Exception {String[] split = s.split(",");return new Person(Integer.valueOf(split[0]), split[1], split[2]);}});// 构建DataFrame整体的框架DataFrame df1 = sqlContext.createDataFrame(personRdd, Person.class);df1.show();df1.registerTempTable("person");DataFrame sql = sqlContext.sql("select name,age,address from person where age in (23,24)");sql.show();// 现在通过DataFrame构建person对象// 首先要获取当前dataFrame的一个JavaRDD对象JavaRDD<Row> rowJavaRDD = sql.javaRDD();// 然后将当前的JavaRDD<Row>对象转化为JavaRDD<Person>对象即可JavaRDD<Person> personJavaRDD = rowJavaRDD.map(new Function<Row, Person>() {@Overridepublic Person call(Row row) throws Exception {Person person = new Person();person.setAge(Integer.valueOf(row.getAs("age") + ""));person.setName(row.getAs("name") + "");person.setAddress(row.getAs("address") + "");return person;}});personJavaRDD.foreach(new VoidFunction<Person>() {@Overridepublic void call(Person person) throws Exception {System.out.println(person);}});sc.stop();}

首先需要构造一个当前的类,然后读取类的信息,然后构造当前类的一个DataFrame

3. 动态创建Schema构造DataFrame

 public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("dfTestThree");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> rdd1 = sc.textFile("./scala/person");// 动态构建SchemaJavaRDD<Row> rowJavaRDD = rdd1.map(new Function<String, Row>() {@Overridepublic Row call(String s) throws Exception {String[] split = s.split(",");return RowFactory.create(split[1], Integer.parseInt(split[0]), split[2]);}});// 动态创建schema,字段需要匹配操作List<StructField> fields = Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true),DataTypes.createStructField("age", DataTypes.IntegerType, true),DataTypes.createStructField("address", DataTypes.StringType, true));// 创建schema,结构StructType schma = DataTypes.createStructType(fields);// 动态创建DataFrameDataFrame dataFrame = sqlContext.createDataFrame(rowJavaRDD, schma);dataFrame.show();sc.stop();}

构造属性集合,然后构建结构类型Schema,最后通过SQLContext构造DataFrame

4. 从parquet文件中读取构造

parquet实质也是Spark主动生成的一种压缩数据格式的文件。

 public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("parquent");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> javaRDD = sc.textFile("./scala/student");DataFrame df = sqlContext.read().json(javaRDD);df.write().mode(SaveMode.Ignore).format("parquet").save("./scala/parquent");DataFrame load = sqlContext.read().format("parquet").load("./scala/parquent");load.show();sc.stop();}

5. 从MySQL中读取数据构造DataFrame

通过远程连接MySQL获取数据,但是获取数据之前,需要先将各个字段给构造好。

 public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("dfSql");conf.set("spark.sql.shuffle.partitions","200");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);/*** 人为构建DataFrame然后将数据写入其中即可写的其实就是当前的一个DataFrame* 即构建DataFrame,带入的数据即是我们构建时写入的数据** 构建DataFrame的话,首先需要读入数据,然后通过StructField构建当前的一个表格属性的List* 然后通过DataTypes构建当前的数据类型StructType,最后通过sqlContext构建当前的一个DataFrame*/JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("1 Caoduanxi 24", "2 Zhangsan 21", "3 Lisi 22"));JavaRDD<Row> map = javaRDD.map(new Function<String, Row>() {@Overridepublic Row call(String s) throws Exception {String[] s1 = s.split(" ");return RowFactory.create(Integer.parseInt(s1[0] + ""), s1[1], s1[2]);}});List<StructField> structFields = new ArrayList<>();structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));structFields.add(DataTypes.createStructField("age", DataTypes.StringType, true));StructType structType = DataTypes.createStructType(structFields);DataFrame df = sqlContext.createDataFrame(map, structType);Properties properties = new Properties();properties.setProperty("user", "***");properties.setProperty("password", "****");df.write().mode(SaveMode.Append).jdbc("jdbc:mysql://106.15.1xx.xx:3306/study", "person", properties);/*填写参数获取此时数据库的信息*/
//        Map<String,String> map = new HashMap<String,String>();
//        // 参数也可以一个一个的传入
//        map.put("url","jdbc:mysql://106.15.125.75:3306/study");
//        map.put("driver","com.mysql.jdbc.Driver");
//        map.put("user","root");
//        map.put("password","123456");
//        map.put("dbtable","logs");
//
//        DataFrame df = sqlContext.read().format("jdbc").options(map).load();
//        df.show();// 单独录入也可以,但是需要format格式为jdbc格式,然后需要load一下数据才可以被加载位当前的DataFrameDataFrameReader dfReader = sqlContext.read().format("jdbc");dfReader.option("url", "jdbc:mysql://106.15.125.75:3306/study");dfReader.option("driver", "com.mysql.jdbc.Driver");dfReader.option("user", "root");dfReader.option("password", "123456");dfReader.option("dbtable", "logs");//        DataFrame df2 = dfReader.load();
//        df2.show();
//
//        df2.registerTempTable("logs");
//        DataFrame sql = sqlContext.sql("select * from logs");
//        sql.show();
//
//        // 将结果写回mysql中
//        Properties properties = new Properties();
//        properties.setProperty("user", "root");
//        properties.setProperty("password", "123456");
//        // 如果自己认为构造一张表出来的话,还是可以直接放入到其中实现的
//        df2.write().mode(SaveMode.Append).jdbc("jdbc:mysql://106.15.125.75:3306/study", "logs", properties);System.out.println("****finished****");sc.stop();}

6. 小结

数据的处理主要是通过RDD来实现的,主要是一些算子在其中主导数据的处理,但是输入的数据要想转换为RDD可处理,需要先转换为DataFrame,然后再转化为RDD,此时可以通过Transformations和Actions算子来处理即可。

Keep thinking, keep coding! 2020-9-24 南京

Spark创建DataFrame相关推荐

  1. Spark创建DataFrame的三种方法

    跟关系数据库的表(Table)一样,DataFrame是Spark中对带模式(schema)行列数据的抽象.DateFrame广泛应用于使用SQL处理大数据的各种场景.创建DataFrame有很多种方 ...

  2. Spark _24 _读取JDBC中的数据创建DataFrame/DataSet(MySql为例)(三)

    两种方式创建DataSet 现在数据库中创建表不能给插入少量数据. javaapi: package SparkSql;import org.apache.spark.SparkConf; impor ...

  3. Spark _22 _创建DataFrame的几种方式(一)

    创建DataFrame的几种方式 读取json格式的文件创建DataFrame 注意: json文件中的json数据不能嵌套json格式数据. DataFrame是一个一个Row类型的RDD,df.r ...

  4. Spark创建空的DataFrame

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到网站:https://www.captainai.net/dongkelun 前言 本文主要给出Spark创 ...

  5. dataframe 排序_疯狂Spark之DataFrame创建方式详解一(九)

    创建DataFrame的几种方式 1.读取json格式的文件创建DataFrame 注意: 1. json文件中的json数据不能嵌套json格式数据. 2. DataFrame是一个一个Row类型的 ...

  6. spark1.4加载mysql数据 创建Dataframe及join操作连接方法问题

    首先我们使用新的API方法连接mysql加载数据 创建DF import org.apache.spark.sql.DataFrame import org.apache.spark.{SparkCo ...

  7. python spark dataframe_pyspark dataframe 常用操作

    spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能.当然主要对类SQL的支持. 在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选.合并,重新入库. 首先加载数据集 ...

  8. 向Spark的DataFrame增加一列数据

    前言 先说个题外话,如何给hive表增加一个列,并且该把该列的所有字段设为'China'? 如果仅仅是增加一列倒是很简单: alter table test add columns(flag stri ...

  9. SparkSQL 创建 DataFrame 的方式

    1.读取 json 格式的文件创建 DataFrame 注意: 可以两种方式读取 json 格式的文件. df.show()默认显示前 20 行数据. DataFrame 原生 API 可以操作 Da ...

最新文章

  1. 激光雷达数据到云cloud
  2. r语言 bsda包_使用R语言creditmodel包进行Vintage分析或留存率分析
  3. R语言导入、读取网络CSV数据(Read a CSV from a URL)实战:R原生read_csv、readr包、data.table
  4. Python(四)字符串
  5. 前端学Markdown
  6. 亿级别记录的mongodb分页查询java代码实现
  7. Spring学习7之自动装配Bean03
  8. asp.net的dropDownlist只显示第一个字
  9. 第一年的要求 工程系的研究生
  10. 【编程】二叉搜索树的定义
  11. Gis 热点技术分析
  12. L1-038. 新世界
  13. 互联网晚报 | 8月21日 星期六 | 中国电信正式在A股上市;呷哺呷哺将关闭200家亏损门店;个人信息保护法表决通过...
  14. python sanic_Sanic框架安装与简单入门示例
  15. spring interceptor 拦截方法,判断用户是否存在
  16. linux中下载python_linux下python安装
  17. 微信群网址活码在线生成系统源码 二维码活码生成
  18. linux使用USB转串口驱动设置
  19. 基因型填充中的phasing究竟是什么
  20. China Mobile”、“China Unicom”,请修改为中国移动、中国联通

热门文章

  1. 软件工程师的核心竞争力是什么-笔记
  2. Ubuntu下的kdbg安装和VSCode安装与简单使用
  3. MODE-CSR相关
  4. 微信公众号开发智能硬件MP后台注册篇
  5. 但愿能带给你们一丝丝的温暖
  6. attiny13a程序实例_关于ATtiny13A的程序
  7. ps色阶怎么用:一招搞定曝光调整 | 萧蕊冰
  8. C++图像处理 -- 图像色阶调整
  9. 图灵杯 蔡老板的会议
  10. 2020前端面试专题整理