Spark大数据分析与实战:Spark SQL编程初级实践

一、安装Hadoop和Spark

具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:

Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894

提示:如果IDEA未构建Spark项目,可以转接到以下的博客:

IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536

二、启动Hadoop与Spark

查看3个节点的进程

master
slave1

slave2

三、Spark SQL基本操作

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{“id”:1,“name”:“Ella”,“age”:36}
{“id”:2,“name”:“Bob”,“age”:29}
{“id”:3,“name”:“Jack”,“age”:29}
{“id”:4,“name”:“Jim”,“age”:28}
{“id”:4,“name”:“Jim”,“age”:28}
{“id”:5,“name”:“Damon”}
{“id”:5,“name”:“Damon”}

(1) 查询所有数据;

源代码:

val df = spark.read.json("file:///opt/data/employee.json")df.show()

运行截图:

(2) 查询所有数据,并去除重复的数据;

源代码:

df.distinct().show()

运行截图:

(3) 查询所有数据,打印时去除id字段;

源代码:

df.drop("id").show()

运行截图:

(4) 筛选出age>30的记录;

源代码:

df.filter(df("age")>30).show()

运行截图:

(5) 将数据按age分组;

源代码:

df.groupBy("age").count().show()

运行截图:

(6) 将数据按name升序排列;

源代码:

df.sort(df("name")).show()

运行截图:

(7) 取出前3行数据;

源代码:

df.head(3)

运行截图:

(8) 查询所有记录的name列,并为其取别名为username;

源代码:

df.select(df("name").alias("username")).show()

运行截图:


(9) 查询年龄age的平均值;

源代码:

df.agg("age"->"avg").show()

运行截图:

(10) 查询年龄age的最小值。

源代码:

df.agg("age"->"min").show()

运行截图:

四、编程实现将RDD转换为DataFrame

源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

源代码:

package com.John.Sparkstudy.sqlTest.Test02import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession/**\* @author John\* @Date 2021/4/15 17:44*/object project2 {case class Employee(id:Long,name:String,age:Long)def main(args: Array[String]) {// 配置conf连接val conf = new SparkConf()conf.setMaster("local[3]").setAppName("RDDtoDF")val sc = new SparkContext(conf)// 创建 sparkSessionval spark = SparkSession.builder.getOrCreate// 导入txt文件并转换为dataframeimport spark.implicits._val employeeDF = sc.textFile("D:\\bigdata\\Spark分布式计算框架\\data\\employee.txt").map(_.split(",")).map(attributes => Employee(attributes(0).trim.toInt, attributes(1),attributes(2).trim.toInt)).toDF()// 创建临时视图,并用sparksql命令查询employeeDF.createOrReplaceTempView("employee")var employeeRDD = spark.sql("select id,name,age from employee")// 设置输出格式employeeRDD.map(t=>"id:"+t(0)+"name:"+t(1)+"age:"+t(2)).show()}
}

运行截图:

五、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表6-2所示的两行数据。

表6-2 employee表 原有数据

id name gender Age
1 Alice F 22
2 John M 25

源代码:

create database sparktest;use sparktest;create table employee(id int(4),name char(50), gender char(20), age int(10));insert into employee values(1,'Alice','F',22);insert into employee values(2,'John','M',25);

运行截图:

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

表6-3 employee表 新增数据

id name gender age
3 Mary F 26
4 Tom M 23

源代码:

package com.John.Sparkstudy.sqlTest.Test02import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession/**\* @author John\* @Date 2021/4/15 17:25*/object project3 {def main(args: Array[String]): Unit = {// 创建 sparkSessionval spark:SparkSession = SparkSession.builder().appName("mysql_spark").master("local[3]").getOrCreate()// 创建需要插入数据val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))val schema=StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true), StructField("age",IntegerType,true)))val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))val employeeDF=spark.createDataFrame(rowRDD,schema)// 创建Properties对象,设置连接mysql的用户名和密码,并插入数据val prop=new Properties()prop.put("user","root")prop.put("password","John123456")prop.put("driver","com.mysql.jdbc.Driver")employeeDF.write.mode("append").jdbc("jdbc:mysql://192.168.254.124:3306/sparktest","sparktest.employee",prop)// 读取mysql中的数据val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.254.124:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable", "employee").option("user", "root").option("password","John123456").load()// 计算数据中age的最大值和总和jdbcDF.agg("age" -> "max", "age" -> "sum").show()}
}

运行截图:

Spark大数据分析与实战:Spark SQL编程初级实践相关推荐

  1. Spark大数据分析与实战:基于Spark MLlib 实现音乐推荐

    Spark大数据分析与实战:基于Spark MLlib 实现音乐推荐 基于Spark MLlib 实现音乐推荐 一.实验背景: 熟悉 Audioscrobbler 数据集 基于该数据集选择合适的 ML ...

  2. 《Spark大数据分析:核心概念、技术及实践》大数据技术一览

    本节书摘来自华章出版社<Spark大数据分析:核心概念.技术及实践>一书中的第1章,第1节,作者穆罕默德·古勒(Mohammed Guller)更多章节内容可以访问云栖社区"华章 ...

  3. 《Spark大数据分析:核心概念、技术及实践》一1.5 NoSQL

    本节书摘来自华章出版社<Spark大数据分析:核心概念.技术及实践>一书中的第1章,第1.5节,作者[美] 穆罕默德·古勒(Mohammed Guller),更多章节内容可以访问云栖社区& ...

  4. 《Spark大数据分析:核心概念、技术及实践》一3.5 API

    本节书摘来自华章出版社<Spark大数据分析:核心概念.技术及实践>一书中的第3章,第3.5节,作者[美] 穆罕默德·古勒(Mohammed Guller),更多章节内容可以访问云栖社区& ...

  5. 《Spark大数据分析:核心概念、技术及实践》一3.6 惰性操作

    本节书摘来自华章出版社<Spark大数据分析:核心概念.技术及实践>一书中的第3章,第3.6节,作者[美] 穆罕默德·古勒(Mohammed Guller),更多章节内容可以访问云栖社区& ...

  6. 实验5 Spark SQL编程初级实践

    今天做实验[Spark SQL 编程初级实践],虽然网上有答案,但在自己的环境下并不能够顺利进行 在第二题中,要求编程实现将 RDD 转换为 DataFrame.根据所谓标准答案,在进行sbt 打包时 ...

  7. Spark大数据分析与实战:Spark Streaming编程初级实践

    Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...

  8. Spark SQL 编程初级实践

    1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json. { "id":1 , "name&qu ...

  9. 超详攻略!Databricks 数据洞察 - 企业级全托管 Spark 大数据分析平台及案例分析

    简介: 5分钟读懂 Databricks 数据洞察 ~ 更多详细信息可登录 Databricks 数据洞察 产品链接:https://www.aliyun.com/product/bigdata/sp ...

  10. 《Spark大数据分析实战》——1.4节弹性分布式数据集

    本节书摘来自华章社区<Spark大数据分析实战>一书中的第1章,第1.4节弹性分布式数据集,作者高彦杰 倪亚宇,更多章节内容可以访问云栖社区"华章社区"公众号查看 1. ...

最新文章

  1. mfc编辑框显示数据_Excel表格技巧—Excel表格中怎么给数据分等级
  2. CYQ.Data 轻量数据层之路 V2.0 震撼惊世 支持多数据库/内置Aop(二十五)
  3. elasticSearch6源码分析(4)indices模块
  4. python日历下拉框_python日历来计算月份倒退
  5. magento cms page、登錄頁面修改(增加)breadcrumbs
  6. mysql去掉小数点多余0_mysql数据库个性化需求:版本号排序
  7. 源代码:spark-shell解读
  8. 【Flink】Flink WindowOperator大概工作流程
  9. LINUX 循环fork()
  10. Linux Namespace机制简介
  11. 计算机在数据处理方面的论文,数据挖掘论文3000字范文参考(2)
  12. 频率超出范围黑屏Linux,显示器超出频率限制黑屏怎么解决?显示器超出频率限制黑屏解决方法...
  13. MT4跨平台跟单系统(API跟单、EA跟单、NJ4X跟单)的实现方式和技术原理
  14. Windows 95 被做成了一款 app,我们在 MacBook 上体验了它
  15. 【贪心算法】哈夫曼编码问题
  16. 100ask imx6ull开发板移植NXP官方UBOOT
  17. 程序猿怎样变身IT讲师
  18. Fewest Flops
  19. 你是哪类人?愚蠢的五大基本定律
  20. 【HISI系列】海思 IPC hi3516a、hi3519v101 的单包模式和多包模式

热门文章

  1. videojs-dynamic-watermark: video.js 视频添加文字水印
  2. 思维导图与知识树的区别
  3. ADAMS并联机器人动力学仿真【附源文件】
  4. ios版的chrome如何保存网页为PDF
  5. 如何官网下载 IEEE 论文 Latex 和 Word 模板
  6. 工业污染治理投资完成情况分析(2000—2019年)
  7. 基于stc89c58的万年历设计
  8. Arduino: AD模数转换详解和电路搭建以及示例代码
  9. 索尼( A7II)相机刷中文/汉化
  10. ffmpeg:将webm无损转为mp4