过滤函数

所有的过滤函数都要继承FilterFunc类,并且实现抽象方法exec(),该方法的返回类型为Boolean。
示例代码如下:

package com.udf.filter;import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;import java.io.IOException;public class IsGoodQuantity extends FilterFunc {@Overridepublic Boolean exec(Tuple tuple) throws IOException {if (tuple == null || tuple.size() == 0) {return false;}try {Object object = tuple.get(0);if (object == null) {return false;}int i = (Integer)object;return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;} catch (ExecException e) {throw new IOException(e);}}
}

编写好代码之后,首先将它打成一个jar包。然后通过REGISTER操作指定文件的路径。

grunt> records = LOAD '/home/jackeyzhe/hadoop-book/input/ncdc/micro-tab/sample.txt'
>> AS (year:chararray, temperature:int, quality:int);
DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
(1994,24,2)
REGISTER pigFilterUdf.jargrunt> filtered_records = FILTER records BY temperature != 9999 AND
>> com.udf.filter.IsGoodQuantity(quality);
DUMP filtered_records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

结果如上所示,quality为2的数据被过滤掉了。

计算函数

自定义的计算函数要继承EvalFunc类,需要注意的是,写计算函数需要参数化返回类型。该类型为String。
示例代码如下:

public class Trim extends EvalFunc<String> {@Overridepublic String exec(Tuple tuple) throws IOException {if (tuple == null || tuple.size() == 0) {return null;}try {Object object = tuple.get(0);if (object == null) {return null;}return ((String) object).trim();} catch (ExecException e) {throw new IOException(e);}}@Overridepublic List<FuncSpec> getArgToFuncMapping() throws FrontendException {List<FuncSpec> funcList = new ArrayList<FuncSpec>();funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY))));return funcList;}
}

加载函数

加载函数需要继承LoadFunc,并实现相应的抽象方法。
具体代码示例如下:
CutLoadFunc.java

public class CutLoadFunc extends LoadFunc {private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);private final List<Range> ranges;private final TupleFactory tupleFactory = TupleFactory.getInstance();private RecordReader reader;public CutLoadFunc(String cutPattern) {ranges = Range.parse(cutPattern);}@Overridepublic void setLocation(String location, Job job) throws IOException {FileInputFormat.setInputPaths(job, location);}@Overridepublic InputFormat getInputFormat() throws IOException {return new TextInputFormat();}@Overridepublic void prepareToRead(RecordReader recordReader, PigSplit pigSplit) throws IOException {this.reader = recordReader;}@Overridepublic Tuple getNext() throws IOException {try {if (!reader.nextKeyValue()) {return null;}Text value = (Text) reader.getCurrentValue();String line = value.toString();Tuple tuple = tupleFactory.newTuple(ranges.size());for (int i=0;i < ranges.size();i++) {Range range = ranges.get(i);if (range.getEnd() > line.length()) {LOG.warn(String.format(" Range end (%s) is longer than line length (%s)",range.getEnd(), line.length()));continue;}tuple.set(i, new DataByteArray(range.getSubstring(line)));}return tuple;} catch (InterruptedException e) {throw new ExecException(e);}}
}

Range.java

public class Range {private final int start;private final int end;public Range(int start, int end) {this.start = start;this.end = end;}public int getStart() {return start;}public int getEnd() {return end;}public String getSubstring(String line) {return line.substring(start - 1, end);}@Overridepublic int hashCode() {return start * 37 + end;}@Overridepublic boolean equals(Object obj) {if (!(obj instanceof Range)) {return false;}Range other = (Range) obj;return this.start == other.start && this.end == other.end;}public static List<Range> parse(String rangeSpec)throws IllegalArgumentException {if (rangeSpec.length() == 0) {return Collections.emptyList();}List<Range> ranges = new ArrayList<Range>();String[] specs = rangeSpec.split(",");for (String spec : specs) {String[] split = spec.split("-");try {ranges.add(new Range(Integer.parseInt(split[0]), Integer.parseInt(split[1])));} catch (NumberFormatException e) {throw new IllegalArgumentException(e.getMessage());}}return ranges;}}

加载结果:

grunt> records = LOAD '/home/jackeyzhe/hadoop-book/input/ncdc/micro/sample.txt'
>> USING com.udf.load.CutLoadFunc('16-19,88-92,93-93')
>> AS (year:int, temperature:int, quality:int);
grunt> DUMP records;

(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

java程序员的大数据之路(15):Pig Latin用户自定义函数相关推荐

  1. java程序员的大数据之路(13):Pig入门

    Pig简介 Pig为大型数据集的处理提供了更高层次的抽象. Pig包括两部分: 用于描述数据流的语言,称为Pig Latin. 用于运行Pig Latin程序的执行环境.当前有两个环境:单JVM中的本 ...

  2. java程序员的大数据之路(3):用maven构建Hadoop项目

    背景 由于Hadoop项目多数是比较大的项目,因此我们选择使用构建工具来构建Hadoop项目,这里我们使用的是maven.当然也可以使用Gradle等比较流行的构建工具 构建过程 这里总结一下我使用I ...

  3. java程序员的大数据之路(14):Pig Latin

    结构 一个Pig Latin程序由一组语句构成,一个语句可以理解为一个操作,或一个命令.语句必须以分号结束. Pig Latin有两种注释方法,双减号表示单行注释.多行注释可以使用/* 和 */表示. ...

  4. java程序员的大数据之路(1):Hadoop安装

    Hadoop伪分布式安装 从今天开始我会在这里记录在大数据学习方面的方法和遇到的一些问题. 首先从最著名的开源平台Hadoop开始学习.参考安装教程,这个教程比较全面,按照步骤一步步安装即可. 安装时 ...

  5. java程序员的大数据之路(12):Hadoop的守护进程

    关键属性 Hadoop守护进程的关键属性大多标记为final,使作业的配置无法覆盖. 典型的core-site.xml配置文件 <?xml version="1.0"> ...

  6. java程序员的大数据之路(9):MapReduce的类型

    概述 Hadoop的MapReduce中,map和reduce函数遵循如下常规格式: map:(K1,V1) -> list(K2,V2) reduce:(K2,list(V2)) -> ...

  7. java程序员的大数据之路(2):创建第一个Hadoop程序

    环境 Ubuntu 16.04 + Hadoop 2.7.4 + Intellij idea 2017.2 + jdk 1.8 创建过程 新建工程 新建一个工程 输入工程名 可以随便给工程起一个名字, ...

  8. java程序员的大数据之路(8):MapReduce的工作机制

    概述 Hadoop运行作业时的整个过程如果所示. 包含如下4个独立的实体. 客户端:提交MapReduce作业. jobtracker:协调作业的运行.它的主类是JobTracker. tasktra ...

  9. java程序员的大数据之路(7):基于文件的数据结构

    SequenceFile 介绍 由于日志文件中每一条日志记录是一行文本.如果想记录二进制类型,纯文本是不合适的.这种情况下,Hadoop的SequenceFile类非常合适.SequenceFile可 ...

最新文章

  1. Doctor NiGONiGO’s multi-core CPU(最小费用最大流模板)
  2. 中断和异常,陷阱的区别和联系
  3. android 7 蓝牙版本,[Android]Android什么版本开始支持蓝牙4.2?答案:Android 7.0
  4. ITK:创建三角形四边形网格
  5. 2005年上海交通大学计算机研究生机试真题
  6. 寄存器计算软件/寄存器小精灵
  7. 计算机专业论文关于天气预报的,关于天气预报论文范文写作 天气预报相关论文写作资料...
  8. mysql在恢复数据时出现“table full”报错
  9. 【Kafka】kafka 脚本kafka-configs.sh用法解析
  10. python中oserror捕获_Python assert异常处理(一看即懂)
  11. 老李分享:HTTP session原理及应用 1
  12. RedHat命令笔记
  13. 系统动力学建模工具_多体动力学:ANSYS Motion 2020R2
  14. 高效的六面体变换算法实现(一) —— 等圆柱映射 与 六面体映射
  15. 李开复致中国家长的信:培养快乐感性的孩子
  16. HDU 5745 La Vie en rose(bitset优化dp)
  17. 应用宝上线应用后一直处于审核状态问题解决
  18. 干货深挖!从写简历,到面试、谈薪酬的那些技巧和防坑指南
  19. 小米5s Plus安装类原生系统
  20. 使用这个vue snippets,提高百倍开发效率

热门文章

  1. 千万级分拣平台API安全治理实战
  2. 开发Foxmail与Pocket PC的同步软件系列之一--FreeMail
  3. java计算机毕业设计-食材采购平台-源程序+mysql+系统+lw文档+远程调试
  4. 再谈微积分公理化问题
  5. 计算机网络考前复习整理
  6. PDF转txt之后特殊字符的处理
  7. Python学习第七课-元组字典集合
  8. python怎么取模,Python运算符之取模%
  9. Mac OS平台开源的压缩解压缩软件——keka
  10. Capstone 音视频转换HDMI转VGA方案|typec转HDMI拓展坞方案