Pig是一种数据流编程语言,由一系列操作和变换构成,每一个操作或者变换都对输入进行处理,然后产生输出结果,整体操作表示一个数据流。Pig的执行环境将数据流翻译为可执行的内部表示,在Pig内部,这些变换操作被转换为一系列的MapReduce作业。

Pig自身有许多个方法,有时候需要我们自己定制特定的处理方法即UDF。

UDF具体的步骤如下:

第一步,继承计算类或者过滤类或者加载类或者存储类,重写里面的需要实现的方法,将写好的类进行打包生成jar文件。诸如命名为example.jar

第二步,进入Pig的grunt中,利用register将打包的文件注册进入Pig中。进入Pig的grunt中,当前本地路径就是用户输入Pig时候所在的路径。打包文件一定要加上它所在的路径。如register example.jar。

第三步,直接使用该自定义的UDF,在使用的过程中需要加上该类的权限定包名,如果这里example.jar的包结构为com.whut.FilterFunct。则引用的时候就是com.whut.FilterFunct(参数)。注意类的名称就是使用时候的方法名,必须要区分大小写。

第四步,为自己的UDF定义别名,这样使用的时候就不许要加包名了,如

define Goog com.whut.FilterFunct()。这样使用的时候就直接利用Goog了。

自定义过滤UDF:

过滤UDF需要继承FilterFunc。实现其exec方法。该方法返回的是boolean型。在对温度统计的时候,就可以利用过滤UDF来过滤是否正确的气温。

package whut;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
//删除记录中不符合要求的记录
//pig的自定义函数,过滤函数
public class IsGoodQuality extends FilterFunc{@Overridepublic Boolean exec(Tuple tuple) throws IOException {// TODO Auto-generated method stubif(tuple ==null ||tuple.size()==0)return false;try{Object obj=tuple.get(0);if(obj==null)return false;//这里强制转换为一个×××int i=(Integer)obj;return i==0 ||i==1 || i==2 || i==3;}catch(ExecException e){throw new IOException(e);}}
}

这里的参数是一个元组,可以包含多个输入参数,在方法中直接利用get(索引位置)来直接获取。

自定义加载函数UDF

在Pig中经常会使用到加载外部文件,一般使用Load进行加载,如Load 'input/tempdata' as (a:chararray,b:int) 。这里默认使用了内部加载存储函数,PigStorage。

即Load 'input/tempdata' using PigStorage()  as (a:chararray,b:int)。这里PigStorage默认的每一行的字段分割符是制表符,当然也可以传递一个自己的字段分割符号。有时候每一行是一串字符串,想从中取出某一个字段,则就需要自己定义一个加载函数。以下面这个文件为例子。

aaaaa1990aaaaaa0039a
bbbbb1991bbbbbb0045a
ccccc1992cccccc0011c
ddddd1993dddddd0043d
eeeee1994eeeeee0047e
aaaaa1990aaaaaa0037a
bbbbb1991bbbbbb0027a
ccccc1992cccccc0032c
ddddd1993dddddd0090d
eeeee1994eeeeee0091e
aaaaa1980aaaaaa0041a
bbbbb1981bbbbbb0050a
ccccc1992cccccc0020c
ddddd1993dddddd0033d
eeeee1984eeeeee0061e
aaaaa1980aaaaaa0054a
bbbbb1991bbbbbb0075a
ccccc1982cccccc0011c
ddddd1993dddddd0003d
eeeee1974eeeeee0041e
aaaaa1990aaaaaa0039a
bbbbb1961bbbbbb0041a
ccccc1972cccccc0070c
ddddd1993dddddd0042d
eeeee1974eeeeee0043e
aaaaa1990aaaaaa0034a
bbbbb1971bbbbbb0025a
ccccc1992cccccc0056c
ddddd1993dddddd0037d
eeeee1984eeeeee0038e
aaaaa1990aaaaaa0049a
bbbbb1991bbbbbb0011a
ccccc1962cccccc0012c
ddddd1993dddddd0023d
eeeee1984eeeeee0031e
aaaaa1980aaaaaa0094a
bbbbb1971bbbbbb0045a
ccccc1992cccccc0041c
ddddd1993dddddd0003d
eeeee1984eeeeee0081e
aaaaa1960aaaaaa0099a
bbbbb1971bbbbbb0050a
ccccc1952cccccc0055c
ddddd1963dddddd0043d
eeeee1994eeeeee0041e
aaaaa1990aaaaaa0031a
bbbbb1991bbbbbb0020a
ccccc1952cccccc0030c
ddddd1983dddddd0013d
eeeee1974eeeeee0061e
aaaaa1980aaaaaa0071a
bbbbb1961bbbbbb0060a
ccccc1992cccccc0080c
ddddd1953dddddd0033d
eeeee1964eeeeee0051e
aaaaa1960aaaaaa0024a
bbbbb1951bbbbbb0035a
ccccc1952cccccc0048c
ddddd1953dddddd0053d
eeeee1954eeeeee0048e

为了从中取出年份和温度,则就需要自己定义加载函数,这里每一列序号以0开始。自定义加载函数需要继承LoadFunc。具体的代码如下。

package whut;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
class Range
{//列的索引以0开始//字段分割的列的位置private int start;private int end;//根据输入来解析//字符串格式必须是(2~3,5~6)public static List<Range> parse(String cutStr)throws Exception{List<Range> rangeList=new ArrayList<Range>();//首先要判断是否格式正确boolean state=cutStr.matches("\\d+~\\d+(,\\d+~\\d+)*");if(!state){throw new Exception("InputForat Error:\n" +"Usage:number~number,number~number;Such 2~7,10~19");}//先截取几个字段的列起止位置如2~8  String[] splits=cutStr.split(",");//遍历长度设置Rangefor(int i=0;i<splits.length;i++){Range range=new Range();String sub=splits[i];String[] subSplits=sub.split("~");int subStart=Integer.parseInt(subSplits[0]);int subEnd=Integer.parseInt(subSplits[1]);if(subStart>subEnd)throw new Exception("InputForat Error:\n" +"Detail:first number must less than second number");range.setStart(subStart);range.setEnd(subEnd);rangeList.add(range);}return rangeList;}public int getStart() {return start;}public void setStart(int start) {this.start = start;}public int getEnd() {return end;}public void setEnd(int end) {this.end = end;}public String getSubString(String inStr){String res=inStr.substring(start, end);return res;}
}
//定义加载函数,从每一行字符串提出年份,温度
public class LineLoadFunc extends LoadFunc{private static final Log LOG=LogFactory.getLog(LineLoadFunc.class);//负责产生元组的各个字段private final TupleFactory tupleFactory=TupleFactory.getInstance();//负责读取输入记录private RecordReader reader;//存每个字段的集合private List<Range> ranges;//传递参数设置列的位置分割public LineLoadFunc(String cutPattern)throws Exception{ranges=Range.parse(cutPattern);}//设置文件的加载位置@Overridepublic void setLocation(String location, Job job) throws IOException {FileInputFormat.setInputPaths(job, location);}//设置加载文件的输入文件格式//为每一个分片建立一个RecordReader@Overridepublic InputFormat getInputFormat() throws IOException {return new TextInputFormat();}@Overridepublic void prepareToRead(RecordReader reader, PigSplit split)throws IOException {this.reader=reader;}@Overridepublic Tuple getNext() throws IOException {// TODO Auto-generated method stubtry{if(!reader.nextKeyValue())return null;//TextInputFormat//key:LongWritable,value:TextText value=(Text)reader.getCurrentValue();String line=value.toString();//设置每一个元组有几个字段Tuple tuple=tupleFactory.newTuple(ranges.size());for(int i=0;i<ranges.size();i++){Range range=ranges.get(i);if(range.getEnd()>line.length()){throw new ExecException("InputFormat:Error\n" +"field length more than total length");}//必须使用DataByteArray来构造字段的类型tuple.set(i, new DataByteArray(range.getSubString(line)));}return tuple;}catch(InterruptedException e){throw new ExecException();}}
}

具体使用的方法就是按照刚才所说的步骤进行的。

转载于:https://blog.51cto.com/computerdragon/1288228

Pig自定义过滤UDF和加载UDF相关推荐

  1. flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路

    导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...

  2. flinksql获取系统当前时间搓_FlinkSQL 动态加载 UDF 实现思路

    导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...

  3. 金属氢化物Fluent传热仿真论文复现(UDF加载能量源项)

    关于金属氢化物的传质传热,论文有相应的公式,主要是通过六个方程:能量守恒.动量守恒.动力学方程.热力学方程和理想气态方程.相应的公式解释在这里不作过多的解释,以后有时间,我会专门写一篇博文详细展开说明 ...

  4. udf开发入门(python udf、hive udf)

    开发前的声明         udf开发是在数据分析的时候如果内置的函数解析不了的情况下去做的开发,比方说你只想拆分一个字段,拼接一个字段之类的,就不要去搞udf了,这种基本的需求自带函数完全支持,具 ...

  5. PyTorch 笔记(20)— torchvision 的 datasets、transforms 数据预览和加载、模型搭建(torch.nn.Conv2d/MaxPool2d/Dropout)

    计算机视觉是深度学习中最重要的一类应用,为了方便研究者使用,PyTorch 团队专门开发了一个视觉工具包torchvision,这个包独立于 PyTorch,需通过 pip instal torchv ...

  6. flutter 如何自定义一个loadmore / 加载更多

    写在前面 这类的库在pub上有很多 我为什么要自定义呢 首先是项目需要,并且这种库普适性高,抽取出来今后复用也方便点 另外记录一下编码思路,方便后续查看 pub地址 pub国内镜像 github 使用 ...

  7. udf提权 udf.php,UDF提权

    1.什么是udf UDF是mysql的一个拓展接口,UDF(User defined function)可翻译为用户自定义函数,这个是用来拓展Mysql的技术手段.用户可以通过UDF添加自定义函数,在 ...

  8. TensorFlow 教程 --进阶指南--3.2变量:创建、初始化、保存和加载

    变量:创建.初始化.保存和加载 当训练模型时,用变量来存储和更新参数.变量包含张量 (Tensor)存放于内存的缓存区.建模时它们需要被明确地初始化,模型训练后它们必须被存储到磁盘.这些变量的值可在之 ...

  9. PyTorch学习笔记2:nn.Module、优化器、模型的保存和加载、TensorBoard

    文章目录 一.nn.Module 1.1 nn.Module的调用 1.2 线性回归的实现 二.损失函数 三.优化器 3.1.1 SGD优化器 3.1.2 Adagrad优化器 3.2 分层学习率 3 ...

最新文章

  1. bzoj 4710 [Jsoi2011]分特产 组合数学+容斥原理
  2. C#——《C#语言程序设计》实验报告——泛型与集合——“画树”程序
  3. -----------最小生成树----------------
  4. java setr()_Java RPr.setRFonts方法代码示例
  5. python内嵌函数和闭包与java 匿名内部类_Lambda表达式与匿名内部类的联系和区别...
  6. 拳王虚拟项目公社:怎么找低价电影票,低价电影票怎样赚钱,低价电影票实操赚钱方法?
  7. 【吐血经验】在 windows 上安装 spark 遇到的一些坑 | 避坑指南
  8. java lbs_在 Java 中利用 redis 实现 LBS 服务
  9. BootStrap modal() 如何根据返回的HTML宽度自动调整宽度?
  10. 百度移动搜索主要有如下几类结果构成
  11. SGD,Momentum,优化算法原理及实现
  12. Texstudio安装后闪退|重装系统Windows10|texstudio2022
  13. [MATLAB]最邻近插值法进行图像放大
  14. 用Python做一个游戏辅助脚本,完整编程思路分享
  15. FLURRY 文档摘要及备注
  16. SwiftUI系列教程第1章第4节:Text的Padding属性
  17. 便利贴--14{GIF录制工具}
  18. 从从协方差的误差椭圆到PCA
  19. 一个用interproscan做基因注释的简易教程
  20. 小学信息学竞赛计算机基础知识,信息学竞赛怎么快速入门

热门文章

  1. 第一代计算机到第四代计算机基本知识,[Ch01_计算机系统基本知识.ppt
  2. Kali Linux打开多个终端窗口
  3. Visual Studio 2019更新到16.2.1
  4. java 可控异常_java异常处理,重新认识java异常,java7异常处理的新特性!
  5. django jsonresponse_利用 Django 动态展示 Pyecharts 图表数据的几种方法
  6. bootstrap怎么用_不用自己写css,不用bootstrap,写样式有tailwindcss就足够了
  7. 霍夫变换检测圆c 语言,c – 使用Hough变换检测圆
  8. php mysql增删改查实例_php连接数据库实现用户数据的增删改查实例
  9. [unreal4入门系列之三] 初探UE4安装文件目录结构
  10. 17个改变世界的数学公式,马斯克点赞