SparkSQL中UDAF案例分析

1、统计单词的个数

package com.bynear.spark_sql;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;


public class Spark_UDAF extends UserDefinedAggregateFunction {/**
     * inputSchema指的是输入的数据类型
     *
     * @return
     */
    @Override
    public StructType inputSchema() {ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("str", DataTypes.StringType, true));
        return DataTypes.createStructType(fields);
    }/**
     * bufferSchema指的是  中间进行聚合时  所处理的数据类型
     *
     * @return
     */
    @Override
    public StructType bufferSchema() {ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("count", DataTypes.IntegerType, true));
        return DataTypes.createStructType(fields);
    }/**
     * dataType指的是函数返回值的类型
     *
     * @return
     */
    @Override
    public DataType dataType() {return DataTypes.IntegerType;
    }/**
     * 一致性检验,如果为true,那么输入不变的情况下计算的结果也是不变的。
     *
     * @return
     */
    @Override
    public boolean deterministic() {return true;
    }/**
     * 设置聚合中间buffer的初始值,但需要保证这个语义:两个初始buffer调用下面实现的merge方法后也应该为初始buffer
     * 即如果你初始值是1,然后你merge是执行一个相加的动作,两个初始buffer合并之后等于2     * 不会等于初始buffer了。这样的初始值就是有问题的,所以初始值也叫"zero value"
     * 为每个分组的数据执行初始化操作
     *
     * @param buffer
     */
    @Override
    public void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);
    }/**
     * 用输入数据input更新buffer,类似于combineByKey
     * 指的是,每个分组,有新的值进来的时候,如何进行分组对应的聚合值的计算
     *
     * @param buffer
     * @param input
     */
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {buffer.update(0, Integer.valueOf(buffer.getAs(0).toString()) + 1);
    }/**
     * 合并两个buffer,buffer2合并到buffer1.在合并两个分区聚合结果的时候会被用到,类似于reduceByKey
     * 这里要注意该方法没有返回值,在实现的时候是把buffer2合并到buffer1中去,你需要实现这个合并细节
     * 由于spark是分布式的,所以每一分组的数据,可能会在不同的节点上进行局部聚合,就是update
     * 但是 最后一个分组,在各个节点上的聚合值,要进行merge 也就是合并
     *
     * @param buffer1
     * @param buffer2
     */
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0, Integer.valueOf(buffer1.getAs(0).toString()) + Integer.valueOf(buffer2.getAs(0).toString()));
    }/**
     * 只的是 一个分组的聚合值,如何通过中间的缓存聚合值,最后返回一个最终的聚合值
     *
     * @param buffer
     * @return
     */
    @Override
    public Object evaluate(Row buffer) {return buffer.getInt(0);
    }
}
package com.bynear.spark_sql;

import com.clearspring.analytics.util.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

public class UDAF {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("UDAF").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        List<String> nameList = Arrays.asList("xiaoming", "xiaoming", "刘德华","古天乐","feifei", "feifei", "feifei", "katong");
        //转换为javaRDD
        JavaRDD<String> nameRDD = sc.parallelize(nameList, 3);
        //转换为JavaRDD<Row>
        JavaRDD<Row> nameRowRDD = nameRDD.map(new Function<String, Row>() {public Row call(String name) throws Exception {return RowFactory.create(name);
            }});
        List<StructField> fields = Lists.newArrayList();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame namesDF = sqlContext.createDataFrame(nameRowRDD, structType);
        namesDF.registerTempTable("names");
        sqlContext.udf().register("countString", new Spark_UDAF());
        sqlContext.sql("select name,countString(name) as count  from names group by name").show();
        List<Row> rows = sqlContext.sql("select name,countString(name) as count  from names group by name").javaRDD().collect();
        for (Row row : rows) {System.out.println(row);
        }}
}

运行结果:

+--------+-----+
|    name|count|
+--------+-----+
|  feifei|    3|
|xiaoming|    2|
|     刘德华|    1|
|  katong|    1|
|     古天乐|    1|
+--------+-----+

2、统计某品牌价格的平均值

package com.bynear.spark_sql;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;

public class MyUDAF extends UserDefinedAggregateFunction {private StructType inputSchema;
    private StructType bufferSchema;

    public MyUDAF() {ArrayList<StructField> inputFields = new ArrayList<StructField>();
        inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.DoubleType, true));
        inputSchema = DataTypes.createStructType(inputFields);

        ArrayList<StructField> bufferFields = new ArrayList<StructField>();
        bufferFields.add(DataTypes.createStructField("sum", DataTypes.DoubleType, true));
        bufferFields.add(DataTypes.createStructField("count", DataTypes.DoubleType, true));
        bufferSchema = DataTypes.createStructType(bufferFields);
    }@Override
    public StructType inputSchema() {return inputSchema;
    }@Override
    public StructType bufferSchema() {return bufferSchema;
    }@Override
    public DataType dataType() {return DataTypes.DoubleType;
    }@Override
    public boolean deterministic() {return true;
    }@Override
    public void initialize(MutableAggregationBuffer buffer) {
//        缓存区两个分组  分组编号为0 求和sum   初始化值为0
//                     分组编号为1 求count   初始化值为0
        buffer.update(0, 0.0);
        buffer.update(1, 0.0);
    }@Override
    public void update(MutableAggregationBuffer buffer, Row input) {//如果input的索引值为0的值不为0
        if (!input.isNullAt(0)) {
//            两个分组分别进行更新数据!分组编号0  求和sum  缓存区的值 +  输入放入值
            double updatesum = buffer.getDouble(0) + input.getDouble(0);
//                                 分组编号1  求count  缓存区的个数 + 1
            double updatecount = buffer.getDouble(1) + 1;
            buffer.update(0, updatesum);
            buffer.update(1, updatecount);
        }}@Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {double metgesum = buffer1.getDouble(0) + buffer2.getDouble(0);
        double mergecount = buffer1.getDouble(1) + buffer2.getDouble(1);
        buffer1.update(0, metgesum);
        buffer1.update(1, mergecount);
    }@Override
    public Object evaluate(Row buffer) {return buffer.getDouble(0) / buffer.getDouble(1);
    }
}
package com.bynear.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.math.BigDecimal;
import java.util.ArrayList;

public class MyUDAF_SQL {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("myUDAF").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);
        JavaRDD<String> lines = jsc.textFile("C://Users//Administrator//Desktop//fastJSon//sales.txt");
        JavaRDD<Row> map = lines.map(new Function<String, Row>() {@Override
            public Row call(String line) throws Exception {String[] Linesplit = line.split(",");
                return RowFactory.create(String.valueOf(Linesplit[0]), Double.valueOf(Linesplit[1]));
            }});
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("salary", DataTypes.DoubleType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame df = sqlContext.createDataFrame(map, structType);
        sqlContext.udf().register("myAverage", new MyUDAF());
        df.registerTempTable("zjs_table");

        df.show();

        sqlContext.udf().register("twoDecimal", new UDF1<Double, Double>() {@Override
            public Double call(Double in) throws Exception {BigDecimal b = new BigDecimal(in);
                double res = b.setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue();
                return res;
            }}, DataTypes.DoubleType);

        DataFrame resultDF = sqlContext.sql("select name,twoDecimal(myAverage(salary)) as 平均值 from zjs_table group by name ");
        resultDF.show();

    }
}

文本:

三星,1542
三星,1548
三星,8456
三星,8866
中兴,1856
中兴,1752
苹果,1500
苹果,2500
苹果,3500
苹果,4500
苹果,5500

运行结果:

+----+-------+
|name| salary|
+----+-------+
|  三星|12345.0|
|  三星| 4521.0|
|  三星| 7895.0|
|  华为| 5421.0|
|  华为| 4521.0|
|  华为| 5648.0|
|  苹果|12548.0|
|  苹果| 7856.0|
|  苹果|45217.0|
|  苹果|89654.0|
+----+-------+

+----+--------+
|name|     平均值|
+----+--------+
|  三星| 8253.67|
|  华为| 5196.67|
|  苹果|38818.75|
+----+--------+

注意点:文本的编码格式,以及Java代码中DataTypes.DoubleType。。。。

SparkSQL中UDAF案例分析相关推荐

  1. 【关于NI CAN USB-8473在实际应用中的案例分析】

    [NI CAN USB-8473在实际应用中的案例分析] NI CAN USB-8473是国际上比较先进的一种控制器区域网络(Controller Area Network,CAN)总线适配器,可实现 ...

  2. [大话IT]圈套玄机—《圈子圈套》中的案例分析

    [@2006-02-22 00:20:14] Filter-pattern:$_(楼主) ~supernal_pig~ 地址:http://www12.tianya.cn/new/Publicforu ...

  3. 软考 案例分析__预测

    在软考中, 案例分析是整个考试的试金石 现在有一种这样趋势, 上次的论文或者去年的论文, 题目类型,在这次拿到案例分析中来, 因为论文的题目一旦上次考过, 这次又考的概率极小, 有是有这种可能(例如2 ...

  4. html中放大镜案列,Canvas实现放大镜效果完整案例分析(附代码)

    本文主要记录 canvas 在图像.文字处理.离屏技术和放大镜特效的实现过程中使用到的api.先看下效果吧: 一张模糊的图片: 鼠标点击任意位置,产生放大效果: 哇塞~ 一个帅哥,哈哈哈哈~ 1.放大 ...

  5. 信息系统项目管理师-案例分析专题(二)案例中常见问题找茬笔记

    场景 信息系统项目管理师-案例分析专题(一)案例简介.答题方法.要点笔记: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/1181 ...

  6. 产品设计美学案例分析_美学在产品设计中的重要性

    产品设计美学案例分析 重点 (Top highlight) In one of my previous jobs, I had really interesting debates with the ...

  7. php事件编程,PHP相应button中onclick事件的案例分析

    PHP相应button中onclick事件的案例分析 发布时间:2020-11-10 11:28:31 来源:亿速云 阅读:71 作者:小新 小编给大家分享一下PHP相应button中onclick事 ...

  8. 白盒测试中的六种覆盖方法及案例分析

    语句覆盖是指选择足够的测试用例,使得运行这些测试用例时,被测程序的每一个语句至少执行一次,其覆盖标准无法发现判定中逻辑运算的错误: 判定覆盖<又叫分支覆盖率>是指选择足够的测试用例,使得运 ...

  9. 计算机有效教学案例分析,中职计算机应用基础有效教学案例分析.doc

    中职计算机应用基础有效教学案例分析.doc (7页) 本资源提供全文预览,点击全文预览即可全文预览,如果喜欢文档就下载吧,查找使用更方便哦! 9.90 积分 俩瘫直词琼下矫焉原温沽己动府罚斗寒详沛凡 ...

最新文章

  1. java.lang.NoSuchMethodError: org.junit.runner.Description.getClassName()Ljava/lang/String;
  2. java jdbc脚本_关于java:使用MySQL和JDBC运行.sql脚本
  3. 链表之打印两个有序链表的公共部分
  4. 飞行摇杆设置_HORI皇牌空战7最新飞行摇杆抢先开箱 设计出色布局合理
  5. Altium Designer20原理图库放置引脚报错解决方案
  6. 大数据元数据管理系统功能有哪些
  7. Apache Hudi 是Uber 大数据存储系统
  8. Delphi - 注入的方式来禁止任务管理器
  9. MyEclipse 8.6.1下载|MyEclipse 8下载|MyEclipse 8.6.1官网下载
  10. 全民一起玩python实战篇百度云_【全民一起玩python】下载 - 面包树
  11. Java静态代理详解
  12. pyqt5 python qlineedit信号_PyQt5实现QLineEdit添加clicked信号的方法
  13. Duplicate entry '127' for key 'PRIMARY'
  14. python直方图教程_Matplotlib绘制直方图
  15. WQ7033开发指南(音频篇)之3.0 如何配置音频模式切换
  16. 选股方法之各种指标的选股方法分析
  17. idea git rebase ---- 合并多个提交到某个分支(实用)
  18. 用C语言实现简单的猜数字小游戏
  19. 腾讯云数据库 CynosDB应用场景与产品优势有哪些?
  20. js实现520倒计时

热门文章

  1. SMOTE/SMOTEEN 处理不平衡数据集
  2. HDU 6428 Problem C. Calculate(积性函数)
  3. Gym102832K. Ragdoll(CCPC长春)
  4. CF1479D Odd Mineral Resource
  5. CF1548A Web of Lies
  6. Tree UVALive - 8212
  7. [2021.4.7多校省选模拟33]A,B,C
  8. [FWT] 时隔一年再回首FWT(快速沃尔什变换),我终于不再是个门外汉
  9. P3200-[HNOI2009]有趣的数列【卡特兰数】
  10. P7044-[MCOI-03]括号【组合数学】