分享两篇文章,结合看更清楚一点。

背景

假设有一个学生各门课的成绩的表单,应用hive取出每科成绩前100名的学生成绩。

这个就是典型在分组取Top N的需求。

解决思路

对于取出每科成绩前100名的学生成绩,针对学生成绩表,根据学科,成绩做order by排序,然后对排序后的成绩,执行自定义函数row_number(),必须带一个或者多个列参数,如ROW_NUMBER(col1, ....),它的作用是按指定的列进行分组生成行序列。在ROW_NUMBER(a,b) 时,若两条记录的a,b列相同,则行序列+1,否则重新计数。

只要返回row_number()返回值小于100的的成绩记录,就可以返回每个单科成绩前一百的学生。

解决过程

成绩表结构

create table score_table (subject        string,student       string,score           int
)
partitioned by (date string)

如果要查询2012年每科成绩前100的学生成绩,sql如下

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from(select subject,score,student from score where dt='2012'  order by subject,socre desc) order_score
where row_number(subject) <= 100;

com.blue.hive.udf.RowNumber是自定义函数,函数的作用是按指定的列进行分组生成行序列。这里根据每个科目的所有成绩,生成序列,序列值从1开始自增。

假设成绩表的记录如下:

物理  80 张三
数学  100 李一
物理  90  张二
数学  90  李二
物理  100 张一
数学  80  李三.....

经过order by全局排序后,记录如下

物理  100 张一
物理  90  张二
物理  80 张三.....
数学  100 李一
数学  90  李二
数学  80  李三....

接着执行row_number函数,返回值如下

科目  成绩 学生   row_number
物理  100 张一      1
物理  90  张二      2
物理  80  张三      3
.....
数学  100 李一      1
数学  90  李二      2
数学  80  李三      3
....

因为hive是基于MAPREADUCE的,必须保证row_number执行是在reducer中执行。上述的语句保证了成绩表的记录,按照科目和成绩做了全局排序,然后在reducer端执行row_number函数,如果在map端执行了row_number,那么结果将是错误的。

要查看row_number函数在map端还是reducer端执行,可以查看hive的执行计划:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select subject,score,student from(select subject,score,student from score where dt='2012'  order by subject,socre desc) order_score
where row_number(subject) <= 100;

explain不会执行mapreduce计算,只会显示执行计划。

只要row_number函数在reducer端执行,除了使用order by全局排序配合,也可以使用distribute by + sort by。distribute by可以让相同科目的成绩记录发送到同一个reducer,而sort by可以在reducer端对记录做排序。

而使用order by全局排序,只有一个reducer,未能充分利用资源,相比之下,distribute by + sort by在这里更有性能优势,可以在多个reducer做排序,再做row_number的计算。

sql如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from(select subject,score,student from score where dt='2012'  distribute by subject sort by subject asc, socre desc) order_score
where row_number(subject) <= 100;

如果成绩有学院字段college,要找出学院里,单科成绩前一百的学生,解决方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,subject,score,student from(select college,subject,score,student from score where dt='2012'  order by college asc,subject asc,socre desc) order_score
where row_number(college,subject) <= 100;

如果成绩有学院字段college,要找出学院里,总成绩前一百的学生,解决方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,totalscore,student from(select college,student,sum(score) as totalscore from score where dt='2012'  group by college,student  order by college asc,totalscore desc) order_score
where row_number(college) <= 100;

row_number的源码

函数row_number(),必须带一个或者多个列参数,如ROW_NUMBER(col1, ....),它的作用是按指定的列进行分组生成行序列。在ROW_NUMBER(a,b) 时,若两条记录的a,b列相同,则行序列+1,否则重新计数。

package com.blue.hive.udf;import org.apache.hadoop.hive.ql.exec.UDF;public class RowNumber extends UDF {private static int MAX_VALUE = 50;private static String comparedColumn[] = new String[MAX_VALUE];private static int rowNum = 1;public int evaluate(Object... args) {String columnValue[] = new String[args.length];for (int i = 0; i < args.length; i++) 『columnValue[i] = args[i].toString();}if (rowNum == 1) {for (int i = 0; i < columnValue.length; i++)comparedColumn[i] = columnValue[i];}for (int i = 0; i < columnValue.length; i++) {if (!comparedColumn[i].equals(columnValue[i])) {for (int j = 0; j < columnValue.length; j++) {comparedColumn[j] = columnValue[j];}rowNum = 1;return rowNum++;}}return rowNum++;}
}

编译后,打包成一个jar包,如/usr/local/hive/udf/blueudf.jar

然后在hive shell下使用,如下:

add jar /usr/local/hive/udf/blueudf.jar;
create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from(select subject,score,student from score where dt='2012'  order by subject,socre desc) order_score
where row_number(subject) <= 100;

hive 0.12之前可用,0.12之后不可用,只能用窗口函数替代。

参考 http://chiyx.iteye.com/blog/1559460

-----------------------------------------分割线-----------------------------------------------------

问题:

有如下数据文件 city.txt (id, city, value)

cat city.txt 
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
需要按 city 分组聚合,然后从每组数据中取出前两条value最大的记录。

1、这是实际业务中经常会遇到的 group TopK 问题,下面来看看 pig 如何解决:

?
1
2
3
4
5
a = load '/data/city.txt'  using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {c1=order a by value desc; c2=limit c1 2; generate group,c2.value;};
d = stream c through `sed 's/[(){}]//g'`;
dump d;

结果:

?
1
2
3
(bj,600,300)
(sh,900,400)
(wh,500,200)

这几行代码其实也实现了mysql中的 group_concat 函数的功能:

?
1
2
3
4
5
a = load '/data/city.txt'  using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {c1=order a by value desc;  generate group,c1.value;};
d = stream c through `sed 's/[(){}]//g'`;
dump d;

结果:

?
1
2
3
(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)

2、下面我们再来看看hive如何处理group topk的问题:

本质上HSQL和sql有很多相同的地方,但HSQL目前功能还有很多缺失,至少不如原生态的SQL功能强大,

比起PIG也有些差距,如果SQL中这类分组topk的问题如何解决呢?

?
1
2
3
select * from city a where
2>(select count(1) from city where cname=a.cname and value>a.value)
distribute by a.cname sort by a.cname,a.value desc;

http://my.oschina.net/leejun2005/blog/78904

但是这种写法在HQL中直接报语法错误了,下面我们只能用hive udf的思路来解决了:

排序city和value,然后对city计数,最后where过滤掉city列计数器大于k的行即可。

好了,上代码:

(1)定义UDF:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
      
public final class Rank extends UDF{
    private int  counter;
    private String last_key;
    public int evaluate(final String key){
      if ( !key.equalsIgnoreCase(this.last_key) ) {
         this.counter = 0;
         this.last_key = key;
      }
      return this.counter++;
    }
}

(2)注册jar、建表、导数据,查询:

?
1
2
3
4
5
6
7
8
9
add jar Rank.jar;
create temporary function rank as 'com.example.hive.udf.Rank';
create table city(id int,cname string,value int) row format delimited fields terminated by ' ';
LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;
select cname, value from (
    select cname,rank(cname) csum,value from (
        select id, cname, value from city distribute by cname sort by cname,value desc
    )a
)b where csum < 2;

(3)结果:

?
1
2
3
4
5
6
bj  600
bj  300
sh  900
sh  400
wh  500
wh  200

可以看到,hive相比pig来说,处理起来稍微复杂了点,但随着hive的日渐完善,以后比pig更简洁也说不定。

REF:hive中分组取前N个值的实现

http://baiyunl.iteye.com/blog/1466343

3、最后我们来看一下原生态的MR:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.io.IOException;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class GroupTopK {
    // 这个 MR 将会取得每组年龄中 id 最大的前 3 个
    // 测试数据由脚本生成:http://my.oschina.net/leejun2005/blog/76631
    public static class GroupTopKMapper extends
            Mapper<LongWritable, Text, IntWritable, LongWritable> {
        IntWritable outKey = new IntWritable();
        LongWritable outValue = new LongWritable();
        String[] valArr = null;
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            valArr = value.toString().split("\t");
            outKey.set(Integer.parseInt(valArr[2]));// age int
            outValue.set(Long.parseLong(valArr[0]));// id long
            context.write(outKey, outValue);
        }
    }
    public static class GroupTopKReducer extends
            Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
        LongWritable outValue = new LongWritable();
        public void reduce(IntWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            TreeSet<Long> idTreeSet = new TreeSet<Long>();
            for (LongWritable val : values) {
                idTreeSet.add(val.get());
                if (idTreeSet.size() > 3) {
                    idTreeSet.remove(idTreeSet.first());
                }
            }
            for (Long id : idTreeSet) {
                outValue.set(id);
                context.write(key, outValue);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        System.out.println(otherArgs.length);
        System.out.println(otherArgs[0]);
        System.out.println(otherArgs[1]);
        if (otherArgs.length != 3) {
            System.err.println("Usage: GroupTopK <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "GroupTopK");
        job.setJarByClass(GroupTopK.class);
        job.setMapperClass(GroupTopKMapper.class);
        job.setReducerClass(GroupTopKReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1

结果:

hadoop fs -cat /tmp/1/part-r-00000
0       12869695
0       12869971
0       12869976
1       12869813
1       12869870
1       12869951

......

数据验证:

awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695

可以看到结果没有问题。

注:测试数据由以下脚本生成:

http://my.oschina.net/leejun2005/blog/76631

PS:

如果说hive类似sql的话,那pig就类似plsql存储过程了:程序编写更自由,逻辑能处理的更强大了。

pig中还能直接通过反射调用java的静态类中的方法,这块内容请参考之前的相关pig博文。

附几个HIVE UDAF链接,有兴趣的同学自己看下:

Hive UDAF和UDTF实现group by后获取top值 http://blog.csdn.net/liuzhoulong/article/details/7789183
hive中自定义函数(UDAF)实现多行字符串拼接为一行 http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
编写Hive UDAF http://www.fuzhijie.me/?p=118
Hive UDAF开发 http://richiehu.blog.51cto.com/2093113/386113

转载于:https://www.cnblogs.com/LeeZz/p/4725868.html

Hive中分组取前N个值相关推荐

  1. hive中分组取前N个值的实现

    背景 假设有一个学生各门课的成绩的表单,应用hive取出每科成绩前100名的学生成绩. 这个就是典型在分组取Top N的需求. 解决思路 对于取出每科成绩前100名的学生成绩,针对学生成绩表,根据学科 ...

  2. hive udf 分组取top1_Hive中分组取前N个值的实现-row_number()

    背景 假设有一个学生各门课的成绩的表单,应用hive取出每科成绩前100名的学生成绩. 这个就是典型在分组取Top N的需求. 解决思路 对于取出每科成绩前100名的学生成绩,针对学生成绩表,根据学科 ...

  3. map分组后取前10个_hive中分组取前N个值的实现

    背景 假设有一个学生各门课的成绩的表单,应用hive取出每科成绩前100名的学生成绩. 这个就是典型在分组取Top N的需求. 解决思路 对于取出每科成绩前100名的学生成绩,针对学生成绩表,根据学科 ...

  4. hive udf 分组取top1_Hive的经典面试题

    很久没有发文章了,今天发表一下Hive的总结,如果那里有不足的欢迎指正,顺便再提一个问题(数仓建模中:细化程度越高,粒度级就越小,相反,细化程度越低,粒度级就越大,这个说法能打个比方比喻出来吗?) 必 ...

  5. oracle和sql server中,取前10条数据语法的区别

    在sql server中,取数据中前10条语句,我们可以用top 10 这样语句,但是oracle就没有这个函数,接下来介绍它们之间的区别 1.sql server 取前10语句和随机10条的语法 - ...

  6. SQL中如何取前百分之N的记录?

    点击关注公众号,SQL干货及时获取 后台回复:1024,获取海量学习资源 最近帮业务部门梳理业务报表,其中有个需求是就算某指标等待时间最长的前百分之十,其实就是对等待时长进行倒序排序后,取结果集的前百 ...

  7. python学习教程16-数据分组,取前三的值

    一个学校有三个班级,根据成绩取每个班级的前三名: 一个公司有三个销售部门,根据销售业绩取三个部门的销冠... 类型这样的需要有很多种. 一个excel表中,三个sheet(三个班级),把考试成绩的前三 ...

  8. python读取单元格前几个字的值_EXCEL表格中怎么取前一单元格中的前几个字符

    展开全部 截取单元格中前几个字符可以使用LEFT函数.LEFT从文本字符串62616964757a686964616fe4b893e5b19e31333366303138的第一个字符开始返回指定个数的 ...

  9. hive udf 分组取top1_Hive分组取Top K数据

    1.ROW_NUMBER,RANK(),DENSE_RANK() 语法格式:row_number() OVER (partition by COL1 order by COL2 desc ) rank ...

最新文章

  1. Windows 2003 服务器播放FLV的问题解决
  2. PHP如何使用GeoIP数据库
  3. WINCE6.0 + S3C2443的启动过程---nboot篇
  4. Go游戏服务器开发的一些思考(十):goroutine和coroutine
  5. jwt token注销_辩证的眼光搞懂 JWT 这个知识点
  6. SonarQube6.2源码解析(四)
  7. 【spring源码分析】spring中类型转换器详解
  8. DHCP保留地址与超级作用域centos7
  9. 计算机英语教学教案模板,英语教学设计模板
  10. icp许可证怎么申请
  11. (4)pokeman_用图片对模型进行测试
  12. java 文件目录操作_Java目录文件的操作 -解道Jdon
  13. 路由器端口映射,远程桌面连接--端口映射+花生壳=让人访问你个人服务器或WEB站点...
  14. 如何删除数组中的一个元素
  15. 进程注入之DLL注入
  16. C语言 经典例题 无重复三位数
  17. 百战终破黄金甲,不破楼兰终不还!
  18. jq linux下载文件,Linux中的Json格式化神器jq下载与安装
  19. Tableau实战 石油产量与收入分析(一二三)汇总制作仪表盘
  20. 10000+ 代码库、3000+ 研发人员大型保险集团的研发效能提升实践

热门文章

  1. 下一版本的Android OS ——Jelly Bean
  2. vim文本编辑器的使用
  3. (转)Membership、MembershipUser和Roles类 详解
  4. vim 删除当前词_VIM中常用的查找、替换、删除模式总结
  5. Ansible(六)对目标主机进行磁盘分区,创建逻辑卷、格式化并挂载
  6. Python——腾讯在线编程题(2018)
  7. 9.82万枚ETH在近一周被质押至以太坊2.0合约
  8. ChaiNext:过去24小时比特币向底部试探
  9. 万向区块链首席经济学家:央行数字货币与狭义银行是不同层次的概念
  10. SAP License:2021年度最新FICO面试题目