Spark 1.5.x版本引入的内置函数在Spark 1.5.x版本,增加了一系列内置函数到DataFrame API中,并且实现了code-generation的优化。与普通的函数不同,DataFrame的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在DataFrame的操作之中,比如select,filter,groupBy等。函数的输入值,也可以是Column。种类函数聚合函数approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct

集合函数array_contains, explode, size, sort_array

日期/时间函数日期时间转换 unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp 从日期时间中提取字段 year, month, dayofmonth, hour, minute, second

日期/时间函数日期/时间计算 datediff, date_add, date_sub, add_months, last_day, next_day, months_between 获取当前时间等 current_date, current_timestamp, trunc, date_format

数学函数abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex

混合函数array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when

字符串函数ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper

窗口函数cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

案例实战:根据每天的用户访问日志和用户购买日志,统计每日的uv和销售额

Scala版本countDistinct代码package cn.spark.study.sql

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.Row

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.types.StringType

import org.apache.spark.sql.types.IntegerType

import org.apache.spark.sql.functions._

object DailyUV {

def main(args: Array[String]){

val conf = new SparkConf()

.setMaster("local")

.setAppName("DailyUV")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

// 这里着重说明一下!!!

// 要使用Spark SQL的内置函数,就必须在这里导入SQLContext下的隐式转换

import sqlContext.implicits._

// 构造用户访问日志数据,并创建DataFrame

// 模拟用户访问日志,日志用逗号隔开,第一列是日期,第二列是用户id

val userAccessLog = Array(

"2015-10-01,1122",

"2015-10-01,1122",

"2015-10-01,1123",

"2015-10-01,1124",

"2015-10-01,1124",

"2015-10-02,1122",

"2015-10-02,1121",

"2015-10-02,1123",

"2015-10-02,1123");

val userAccessLogRDD = sc.parallelize(userAccessLog, 5)

// 将模拟出来的用户访问日志RDD,转换为DataFrame

// 首先,将普通的RDD,转换为元素为Row的RDD

val userAccessLogRowRDD = userAccessLogRDD

.map{ log => Row(log.split(",")(0), log.split(",")(1).toInt)}

// 构造DataFrame的元数据

val structType = StructType(Array(

StructField("date", StringType, true),

StructField("userid", IntegerType, true)))

// 使用SQLContext创建DataFrame

val userAccessLogRowDF = sqlContext.createDataFrame(userAccessLogRowRDD, structType)

// 这里讲解一下uv的基本含义和业务

// 每天都有很多用户来访问,但是每个用户可能每天都会访问很多次

// 所以,uv,指的是,对用户进行去重以后的访问总数

// 这里,正式开始使用Spark 1.5.x版本提供的最新特性,内置函数,countDistinct

// 讲解一下聚合函数的用法

// 首先,对DataFrame调用groupBy()方法,对某一列进行分组

// 然后,调用agg()方法 ,第一个参数,必须,必须,传入之前在groupBy()方法中出现的字段

// 第二个参数,传入countDistinct、sum、first等,Spark提供的内置函数

// 内置函数中,传入的参数,也是用单引号作为前缀的,其他的字段

userAccessLogRowDF.groupBy("date")

.agg('date, countDistinct('userid))

.map { row => Row(row(1), row(2)) }

.collect()

.foreach(println)

}

}

Scala版本sum代码package cn.spark.study.sql

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.Row

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.types.StringType

import org.apache.spark.sql.types.DoubleType

import org.apache.spark.sql.functions._

object DailySale {

def main(args: Array[String]){

val conf = new SparkConf()

.setMaster("local")

.setAppName("DailyUV")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

// 模拟数据

val userSaleLog = Array("2015-10-01,55.05,1122",

"2015-10-01,23.15,1133",

"2015-10-01,15.20,",

"2015-10-02,56.05,1144",

"2015-10-02,78.87,1155",

"2015-10-02,113.02,1123")

val userSaleLogRDD = sc.parallelize(userSaleLog, 5)

// 进行有效销售日志的过滤

val filteredUserSaleLogRDD = userSaleLogRDD

.filter {log =>if (log.split(",").length == 3) true else false }

val userSaleLogRowRDD = filteredUserSaleLogRDD

.map { log => Row(log.split(",")(0), log.split(",")(1).toDouble) }

val structType = StructType(Array(

StructField("date", StringType, true),

StructField("sale_amount", DoubleType, true)))

val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)

userSaleLogDF.groupBy("date")

.agg('date, sum('sale_amount))

.map{ row => Row(row(1),row(2)) }

.collect()

.foreach(println)

}

}

开窗函数案例:统计每个种类的销售额排名前3的产品

java版本代码package cn.spark.study.sql;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.hive.HiveContext;

public class RowNumberWindowFunction {

@SuppressWarnings("deprecation")

public static void main(String[] arg){

SparkConf conf = new SparkConf()

.setAppName("JDBCDataSource");

JavaSparkContext sc = new JavaSparkContext(conf);

HiveContext hiveContext = new HiveContext(sc.sc());

// 创建销售额表,sales表

hiveContext.sql("DROP TABLE IF EXISTS sales");

hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("

+ "product STRING,"

+ "category STRING,"

+ "revenue BIGINT)");

hiveContext.sql("LOAD DATA "

+ "LOCAL INPATH '/usr/local/spark-study/resources/sales.txt' "

+ "INTO TABLE sales");

// 开始编写我们的统计逻辑,使用row_number()开窗函数

// 先说明一下,row_number()开窗函数的作用

// 其实,就是给每个分组的数据,按照其排序顺序,打上一个分组内的行号

// 比如说,有一个分组date=20151001,里面有3条数据,1122,1121,1124,

// 那么对这个分组的每一行使用row_number()开窗函数以后,三行,依次会获得一个组内的行号

// 行号从1开始递增,比如1122 1,1121 2,1124 3

DataFrame top3SalesDF = hiveContext.sql(""

+ "SELECT product, category, revenue "

+ "FROM ("

+ "SELECT "

+ "Product,"

+ "category,"

+ "revenue,"

// row_number()开窗函数的语法说明

// 首先可以,在SELECT查询时,使用row_number()函数

// 其次,row_number()函数后面先跟上OVER关键字

// 然后括号中,是PARTITION BY,也就是说根据哪个字段进行分组

// 其次是可以用ORDER BY进行组内排序

// 然后row_number()就可以给每个组内的行,一个组内行号

+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "

+ "FROM sales "

+ ") tmp_sales "

+ "WHERE rank<=3");

// 将每组排名前3的数据,保存到一个表中

hiveContext.sql("DROP TABLE IF EXISTS top3_sales");

top3SalesDF.saveAsTable("top3_sales");

sc.close();

}

}

spark sql uv_内置函数_SparkSQL学习 - 编程那点事相关推荐

  1. python内置函数open_Python学习教程:Python内置函数大总结(下篇)

    这里接着上次的 Python学习教程,给大家总结了Python 剩下的33个内置函数. 31 hash() 返回对象的哈希值 In [112]: hash(xiaoming)Out[112]: 613 ...

  2. python 内置函数 builtins_python学习笔记(七)——内置函数

    builtins.py模块,是python的内建模块,在运行时会自动导入该模块.在该模块中定义了很多我们常用的内置函数,比如print,input 等. 在 builtins.py 模块中给出如下注释 ...

  3. Python的内置函数的学习笔记

    1 致谢 感谢陈志兴老师耐心的讲解与帮助! 2 前言 Python里面有一些内置函数,有时候看代码的时候会看不懂,这里记录一下- 3 Python中的内置函数 3.1 getattr() 通过名字找到 ...

  4. 递归函数与内置函数和函数式编程

    递归函数: 定义:在函数内部,可以调用其他函数.如果一个函数在内部调用自身, 这个函数就是递归函数. 实例1(阶乘): 在这里插入代码片def factorial(n):result=nfor i i ...

  5. Sql Server内置函数实现MD5加密

    实例 MD5加密"123456": HashBytes('MD5','123456') 结果:0xE10ADC3949BA59ABBE56E057F20F883E (提示:看完最后 ...

  6. 总结Sql Server内置函数实现MD5加密

    --MD5加密 --HashBytes ('加密方式', '待加密的值') --加密方式= MD2 | MD4 | MD5 | SHA | SHA1 --返回值类型:varbinary(maximum ...

  7. SparkSQL内置函数

    使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作的结果是返回一个Column对象,而DataFrame天生就是"A d ...

  8. Hive学习之路(四):Hive内置函数介绍与实现WordCount

    内容简介 一.Hive内置函数介绍 二.Hive常用内置函数介绍 1.数值计算函数 2.字符串操作函数 3.日期函数 4.聚合函数 5.表生成函数 三.使用Hive函数完成WordCount 1.创建 ...

  9. SQL常用的内置函数

    SQL函数基本概念 函数通常分为内置函数和自定义函数,函数的作用是将经常使用的代码封装起来,需要的时候直接调用,能提高代码效率和可维护性 SQL中的函数一般是在数据上执行的,可以很方便的转换和处理数据 ...

最新文章

  1. apache2配置和使用
  2. vue-cli + lib-flexible + px2rem实现px自动转化为rem
  3. JavaScript设计模式之发布-订阅模式(观察者模式)-Part1
  4. android 串口通信_使用UART与ZYBO进行通信常用外设设计方案
  5. 我最喜欢的Java高级开发人员书籍
  6. signal(SIGPIPE, SIG_IGN);
  7. 用jenkins创建节点
  8. 区域转换为二值图像_零基础一文读懂AI深度学习图像识别
  9. java 栈和队列实现迷宫代码_使用两个队列实现一个栈
  10. Python接通图灵机器人
  11. asp网上书店的代码_使用Helm将ASP.NET Core应用程序部署到Kubernetes容器集群
  12. 门店销售系统开发实例
  13. 用注册机注册Keil
  14. 计算机教室验收结果报告模板,课题结题验收总结报告powerpoint演示文稿.ppt
  15. WPS客户端更新日志留着备用
  16. openssl下载与安装
  17. FPGA控制TDC-GPX2时间间隔测量(三)
  18. 【游戏逆向】CS1.6无后坐力基址寻找
  19. JAVA 实现《飞机大战-II》游戏
  20. “熊猫烧香”病毒简介及特征

热门文章

  1. 提高加密程序加密强度的技巧
  2. ASP.NET AJAX1.0尝鲜试用:Web Service调用
  3. 4-1-getOutputStream()或getWriter()发送响应消息体及分析为什么不能同时使用
  4. 本地虚拟机上的docker安装mysql_linux下利用Docker安装mysql的步骤
  5. baq在聊天中啥意思_职场中的“老实人”如何实现逆袭,得到领导的重用?
  6. 在优图网,临摹借鉴设计大咖作品|品图标设计:主要趋势
  7. 圣诞海报设计没有思路,素材技巧都来了!
  8. 老板分分钟要急需应急海报怎么做?PSD分层万能模板帮你解决燃眉之急!
  9. 霓虹促销电商设计,购买欲冲击视觉
  10. oppo手机显示andrOid什么意思,OPPO R17 Pro手机状态栏图标分别代表什么意思?