Spark DataFrames DataSet
Json文件内容:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
-- 加载json文件转换成DataFrames
scala> val df = sqlContext.jsonFile("/spark/json")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
// 显示
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
-- 打印schema
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
-- 按列选择
scala> df.select("age").show
+----+
| age|
+----+
|null|
| 30|
| 19|
+----+
scala> df.select("age","name").show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
-- 对字段值进行操作 age这一列+2
scala> df.select(df("name"),df("age")+2).show
+-------+---------+
| name|(age + 2)|
+-------+---------+
|Michael| null|
| Andy| 32|
| Justin| 21|
+-------+---------+
-- filter过滤器
scala> df.filter(df("age")>20).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
-- 分组函数 ---必须要加聚合函数
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// Show the content of the DataFrame
df.show()
// age name
// null Michael
// 30 Andy
// 19 Justin
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30 Andy
// Count people by age
df.groupBy("age").count().show()
// age count
// null 1
// 19 1
// 30 1
-- 读取hive表中的数据转化成DataFrame
scala> val df = sqlContext.sql("select * from t_hdrc_type_month");
scala> df.count
res6: Long = 121144
-- ***********创建RDD----DF和DS互相转化 -----Creating Datasets*****************/
-- DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]
-- dataFrame转换成DataSet
scala> sqlContext.read.json("hdfs://suixingpay199:9000/user/app/spark/people.json")
res6: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> res6.as[Person]
res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
-- 数据源来源于mysql
val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://suixingpay190:3306/azkaban?user=root&password=123", "dbtable" -> "projects"))
scala> jdbcDF.show
+---+--------------+------+-------------+-------------+-------+----------------+--------------+--------+--------------------+
| id| name|active|modified_time| create_time|version|last_modified_by| description|enc_type| settings_blob|
+---+--------------+------+-------------+-------------+-------+----------------+--------------+--------+--------------------+
| 1| test| true|1452602214512|1452599858502| 4| azkaban| test| 2|[31, -117, 8, 0, ...|
| 2| touch| true|1452655867334|1452601523613| 7| azkaban| touch| 2|[31, -117, 8, 0, ...|
| 3|DataWherehouse| false|1452688047362|1452662944078| 34| azkaban|datawherehouse| 2|[31, -117, 8, 0, ...|
| 4|DataWhereHouse| true|1452689575193|1452688067801| 3| azkaban|DataWhereHouse| 2|[31, -117, 8, 0, ...|
| 5| HIveAnalysis| true|1453440935221|1453440922278| 1| baige| HIveAnalysis| 2|[31, -117, 8, 0, ...|
+---+--------------+------+-------------+-------------+-------+----------------+--------------+--------+--------------------+
-- 查询指定列
jdbcDF.select("name","id").show
+--------------+---+
| name| id|
+--------------+---+
| test| 1|
| touch| 2|
|DataWherehouse| 3|
|DataWhereHouse| 4|
| HIveAnalysis| 5|
+--------------+---+
Spark DataFrames DataSet相关推荐
- Spark的Dataset操作
列的选择select 来个例子边看边说: scala> val df = spark.createDataset(Seq( ("aaa", 1, 2), ("bbb ...
- 【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)
本文主要是翻译Spark官网Spark SQL programming guide .只能保证大概意思,尽量保证细节.英文水平有限,如果有错误的地方请指正,轻喷.目录导航在右上角 Spark SQL. ...
- Apache Spark 3.0 SQL DataFrame和DataSet指南
目录 简介 SQL 数据集和数据框 入门 起点:SparkSession Scala语言 Java语言 Python语言 R语言 创建DataFrame Scala语言 Java语言 Python语言 ...
- Spark _25.plus _使用idea读取Hive中的数据加载成DataFrame/DataSet(四)
对Spark _25 _读取Hive中的数据加载成DataFrame/DataSet(四) https://georgedage.blog.csdn.net/article/details/10309 ...
- Spark _25 _读取Hive中的数据加载成DataFrame/DataSet(四)
由于Hive不在本地,操作略显麻烦.不过细心一点,分析错误,也还好,如果你搭建的hadoop是HA,需要多注意: 这里指出一个错误,如果你报了同类错误,可以参考:https://georgedage. ...
- Spark _24 _读取JDBC中的数据创建DataFrame/DataSet(MySql为例)(三)
两种方式创建DataSet 现在数据库中创建表不能给插入少量数据. javaapi: package SparkSql;import org.apache.spark.SparkConf; impor ...
- 快速理解Spark Dataset
1. 前言 RDD.DataFrame.Dataset是Spark三个最重要的概念,RDD和DataFrame两个概念出现的比较早,Dataset相对出现的较晚(1.6版本开始出现),有些开发人员对此 ...
- java spark dataset_Spark 2.0介绍:Dataset介绍和使用
<Spark 2.0技术预览:更容易.更快速.更智能>文章中简单地介绍了 Dataset介绍 Dataset是从Spark 1.6开始引入的一个新的抽象,当时还是处于alpha版本:然而在 ...
- 独家 | 使用Spark进行大规模图形挖掘(附链接)
翻译:陈丹 校对:王雨桐 本文约4700字,建议阅读15分钟 本文为大家介绍了多种图挖掘工具,并运用Spark为大家展示了一个标签传播算法LPA构建图的实例. 本教程分为两部分: 第1部分:无监督学习 ...
- Spark LogisticRegression 逻辑回归之建模
导入包 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.Dataset import org.apache.s ...
最新文章
- OpenStack Keystone架构一:Keystone基础
- 优化Linux系统中的服务
- python数据分析pandas_Python数据分析之 pandas汇总和计算描述统计
- ORM框架-工具-产品开发之四 开发代码生成器 Template Studio Development (一)
- flask 写数据mysql_flask 创建数据提交到mysql中的方式
- linux服务器拓扑图,Linux服务器作为网关的DNS分离解析服务(CentOS 7版本)
- python图像识别代码_python图像识别--验证码
- 字节跳动联邦学习平台通过中国信通院首批评估 数据安全能力得到专业验证
- Oracle跨数据库操作
- HDU Today--hdu2112
- Win10系统中英文切换
- 【计算机组成原理】定点乘法运算之补码一位乘法(Booth算法)(对初学者的步骤详解)
- Activity的概述
- 量化基金投资之套利策略简介
- 解决达梦数据库如何查找表字段名。
- ChatGPT 是传统搜索引擎的终结?——Web3 创新 | Is ChatGPT The End Of Traditional Search Engines—Web3 Innovation
- PROFINET的GSD文件描述
- 企业信息管理系统(4)_用户查询
- 【高等数学】抛物线图像的画法
- AXI总线之AXI-LITE总线分析与实现