UDTF处理json
添加依赖
        <dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>2.1.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.24</version></dependency>

 
仿照这个GenericUDTFExlode函数源码来写自定义UTDF,
因为自定义函数继承的类为GenericUDTF
 
 
自定义的UTDF代码

自定义叉分函数

表生成函数

  1 package com.oldboy.umeng.hive.udtf;
  2
  3 import com.alibaba.fastjson.JSONObject;
  4 import com.oldboy.umeng.common.domain.AppLogAggEntity;
  5 import com.oldboy.umeng.common.domain.AppStartupLog;
  6 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  7 import org.apache.hadoop.hive.ql.metadata.HiveException;
  8 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
  9 import org.apache.hadoop.hive.serde2.objectinspector.*;
 10 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 11
 12 import java.beans.BeanInfo;
 13 import java.beans.Introspector;
 14 import java.beans.PropertyDescriptor;
 15 import java.lang.reflect.Method;
 16 import java.lang.reflect.ParameterizedType;
 17 import java.util.ArrayList;
 18 import java.util.List;
 19
 20 /**
 21  * 自定义叉分函数
 22  * 表生成函数
 23  * fork(servertimestr , clienttimems , clientip ,json)
 24  */
 25 public abstract class BaseForkUDTF<T> extends GenericUDTF {
 26
 27     private Class<T> clazz;
 28
 29     private ObjectInspectorConverters.Converter[] converters ;
 30     //字段名称列表
 31     List<String> fieldNames = null ;
 32     //检查器列表
 33     List<ObjectInspector> ois = null ;
 34
 35     //通过构造函数抽取子类的泛型化超类部分
 36     public BaseForkUDTF(){
 37         ParameterizedType type = (ParameterizedType) this.getClass().getGenericSuperclass();
 38         clazz = (Class) type.getActualTypeArguments()[0];
 39     }
 40
 41     /**
 42      * 校验参数合规性
 43      */
 44     public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
 45
 46         //字段名称集合
 47         fieldNames = new ArrayList<String>() ;
 48
 49         //对象检查器集合
 50         List<ObjectInspector> ois = new ArrayList<ObjectInspector>() ;
 51
 52         if (args.length != 4) {
 53             throw new UDFArgumentException("fork()需要4个参数!!!");
 54         }
 55         //判断参数的类型
 56         //1.string
 57         if(args[0].getCategory()!= ObjectInspector.Category.PRIMITIVE
 58                    || ((PrimitiveObjectInspector)args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){
 59             throw new UDFArgumentException("参数{1}不是string类型!!!");
 60         }
 61         //2.bigint
 62         if (args[1].getCategory() != ObjectInspector.Category.PRIMITIVE || ((PrimitiveObjectInspector) args[1]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG) {
 63             throw new UDFArgumentException("参数{2}不是bigint类型!!!");
 64         }
 65         //3.string
 66         if (args[2].getCategory() != ObjectInspector.Category.PRIMITIVE || ((PrimitiveObjectInspector) args[2]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
 67             throw new UDFArgumentException("参数{3}不是string类型!!!");
 68         }
 69         //4.string
 70         if (args[3].getCategory() != ObjectInspector.Category.PRIMITIVE || ((PrimitiveObjectInspector) args[3]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
 71             throw new UDFArgumentException("参数{4}不是string类型!!!");
 72         }
 73
 74         //类型转换器
 75         converters = new ObjectInspectorConverters.Converter[args.length];
 76         //保持每个参数对应的转换器
 77         converters[0] = ObjectInspectorConverters.getConverter(args[0] , PrimitiveObjectInspectorFactory.javaStringObjectInspector) ;
 78         converters[1] = ObjectInspectorConverters.getConverter(args[1] , PrimitiveObjectInspectorFactory.javaLongObjectInspector) ;
 79         converters[2] = ObjectInspectorConverters.getConverter(args[2] , PrimitiveObjectInspectorFactory.javaStringObjectInspector) ;
 80         converters[3] = ObjectInspectorConverters.getConverter(args[3] , PrimitiveObjectInspectorFactory.javaStringObjectInspector) ;
 81
 82         try {
 83             popOIS(fieldNames , ois) ;
 84         } catch (Exception e) {
 85             e.printStackTrace();
 86         }
 87         //返回结构体对象检查器
 88         return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames , ois) ;
 89     }
 90
 91     /**
 92      * 组装对象检查器
 93      * 将字段名称和对象检查器集合同步组装完成
 94      * 每个字段都对应各自的对象检查器(ObjectInspector)
 95      */
 96     private void popOIS(List<String> fieldNames, List<ObjectInspector> ois) throws Exception {
 97         //获取clazz类的bean信息
 98         BeanInfo bi = Introspector.getBeanInfo(clazz) ;
 99
100         //得到所有属性
101         PropertyDescriptor[] pps = bi.getPropertyDescriptors();
102
103         for(PropertyDescriptor pp :pps){
104             String name = pp.getName() ;
105             Class type = pp.getPropertyType() ;
106             Method get = pp.getReadMethod() ;
107             Method set = pp.getWriteMethod() ;
108             //
109             if(get != null && set != null){
110                 if(type == Long.class || type == long.class){
111                     fieldNames.add(name) ;
112                     ois.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector) ;
113                 }
114                 else if(type == int.class || type ==Integer.class){
115                     fieldNames.add(name);
116                     ois.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
117                 }
118                 else if(type == String.class){
119                     fieldNames.add(name);
120                     ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
121                 }
122             }
123         }
124     }
125
126     public void process(Object[] args) throws HiveException {
127         //检查一下参数的个数有效性
128         if (args.length != 4) {
129             throw new UDFArgumentException("fork()需要4个参数!!!");
130         }
131
132         String servertimestr = (String) converters[0].convert(args[0]);
133         Long clienttimems = (Long) converters[1].convert(args[1]);
134         String clientip = (String) converters[2].convert(args[2]);
135         String json = (String) converters[3].convert(args[3]);
136
137         //替换\"为"
138         json = json.replace("\\\"" , "\"") ;
139
140         //解析json,返回日志聚合体对象
141         AppLogAggEntity agg = JSONObject.parseObject(json , AppLogAggEntity.class) ;
142         //TODO 时间对齐
143         List<T> logs = getLogs(agg) ;
144
145         //外层for循环决定行数
146         for(Object log : logs){
147             Object[] arr = new Object[fieldNames.size()] ;
148             int i = 0 ;
149             //内层for循环决定列数,顺序和filedname顺序相同
150             for(String fname : fieldNames){
151                 try {
152                     PropertyDescriptor pp = new PropertyDescriptor(fname , clazz) ;
153                     Method get = pp.getReadMethod() ;
154                     if(get != null){
155                         Object retValue = get.invoke(log) ;
156                         arr[i] = retValue ;
157                     }
158                     i ++ ;
159                 } catch (Exception e) {
160                     e.printStackTrace();
161                 }
162             }
163             //转发对象,就是输出一行
164             forward(arr);
165         }
166     }
167
168     //抽象方法,子类必须重写
169     public abstract List<T> getLogs(AppLogAggEntity agg) ;
170
171     public void close() throws HiveException {
172
173     }
174 }

BaseForkUDTF

错误日志Errorlogs生成函数  继承了  BaseForkUDTF

/*** 叉分错误日志*/
public class ForkErrorlogsUDTF extends BaseForkUDTF<AppErrorLog>{public List<AppErrorLog> getLogs(AppLogAggEntity agg) {return agg.getErrorLogs();}
}

事件日志EventLogs生成函数  继承了  BaseForkUDTF

/*** 叉分事件日志*/
public class ForkEventlogsUDTF extends BaseForkUDTF<AppEventLog>{public List<AppEventLog> getLogs(AppLogAggEntity agg) {return agg.getEventLogs();}
}

页面日志PageLogs生成函数  继承了  BaseForkUDTF

/*** 叉分页面日志*/
public class ForkPagelogsUDTF extends BaseForkUDTF<AppPageLog>{public List<AppPageLog> getLogs(AppLogAggEntity agg) {return agg.getPageLogs();}
}

开启日志StartupLogs生成函数  继承了  BaseForkUDTF

/*** 叉分启动日志*/
public class ForkStartuplogsUDTF extends BaseForkUDTF<AppStartupLog>{public List<AppStartupLog> getLogs(AppLogAggEntity agg) {return agg.getStartupLogs();}
}

页面使用日志UsageLogs生成函数  继承了  BaseForkUDTF

/*** 叉分页面使用日志*/
public class ForkUsagelogsUDTF extends BaseForkUDTF<AppUsageLog>{public List<AppUsageLog> getLogs(AppLogAggEntity agg) {return agg.getUsageLogs();}
}

转载于:https://www.cnblogs.com/star521/p/9880474.html

08 友盟项目--拆分日志为五个表---UDTF自定义函数相关推荐

  1. 02.友盟项目--原始日志数据生成

    友盟架构图1 友盟架构图2 日志生成类 package com.star.umeng.phone.domain;/*** App日志的公共属性*/ public class AppBaseLog {p ...

  2. 【Lolttery】项目开发日志 (五)目标dubbox,前进

    距离上一次开始使用dubbo已经有一段时间了,这段时间在项目的开发和实践中,对于dubbo有了更深的了解.今天终于决定抛弃原有的netty框架,正式转战dubbox dubbox是在阿里停止对dubb ...

  3. 30 友盟项目---体会篇

    BigData   任重而道远 spark.apache.org/ https://flink-china.org https://stackoverflow.com/ https://mvnrepo ...

  4. ios友盟错误_iOS 2017友盟错误统计及分析

    我还以为你不会搜我呢~ 看完这文章后的你 前言 在app开发中,我坚信,总会有小伙伴会用到友盟统计的,我也是(废话).但是在友盟的错误日志分析这块,小白用的话可能会有点小麻烦.最近有个小伙伴问我这点, ...

  5. 友盟+的数据智能生意经:如何从DI向AI进阶?

    采访 & 编辑 | Natalie 嘉宾 | 李丹枫 编辑 | Debra AI 前线导读:到今年为止,大数据的概念已经火了不止 7 年,人工智能和区块链成为了新的风口,而大数据产业则慢慢沉淀 ...

  6. android umeng,GitHub - umeng/umeng_community_android: 友盟微社区Android SDK开源项目

    友盟微社区 Android SDK 该项目是友盟微社区的开源部分代码,根目录下的工程为集成友盟微社区SDK的demo,umeng_comm_android_ui为友盟微社区开源UI代码,里面包含了友盟 ...

  7. 友盟+《小程序用户增长白皮书》:从五个角度入手分析小程序数据

    简介: 近日,国内领先的全域数据智能服务商--友盟+,发布了<友盟+U-APM 移动应用性能体验报告>.据悉,友盟+于去年将原移动分析U-App错误分析模块正式升级为U-APM应用性能监控 ...

  8. H5项目友盟埋点上报实践

    使用友盟平台做H5统计分析的话可以使用小程序统计下的H5统计分析 使用步骤: 1.注册/登录平台 2.创建H5应用 3.集成SDK 4.查看统计数据 这里主要需要开发支持的步骤是集成SDK这一步骤,下 ...

  9. Android开发-在Android项目里集成友盟统计SDK进行对移动APP统计的实现

    前 言 友盟(Umeng),2010年4月在北京成立,是中国最专业.最有数据凝聚力的移动开发者服务平台.友盟以移动应用统计分析为产品起点,发展成为提供从基础设置搭建-开发-运营服务的整合服务平台,致力 ...

最新文章

  1. C#2.0及C#3.0语言规范
  2. 在redhat9上安装firefox
  3. 【Java IO知识】读取中文乱码问题解决
  4. mysql多个分类取n条_MySQL获取所有分类和每个分类的前N条记录
  5. ByteBuffer 转 InputStream
  6. Android如何解析Intent
  7. PSpice和Simulink联合仿真笔记(一)SLPS介绍
  8. 清华大学数学与计算机学院院长,王跃宣(清华大学教授)_百度百科
  9. 美国3月通胀见顶了吗?
  10. 浪曦_Struts2应用开发系列_第2讲.Struts2的类型转换--出现的问题笔记
  11. SetWindowsHookEx 全局钩子
  12. usb摄像头android录像软件,USB摄像头app
  13. 高中物理应用计算机教学心得,高中物理教学心得
  14. BUUCTF-[网鼎杯 2020 青龙组]虚幻2
  15. 关闭笔记本电脑自带键盘
  16. 习惯于CRUD,不求上进呢?-为什么有很多程序员沉醉于舒适区
  17. cf85d treap
  18. 262. Trips and Users - 行程和用户 <Hard>
  19. 从 Paxos 到 Raft,分布式一致性算法解析
  20. Linux--shell--4--毫秒级时间戳

热门文章

  1. 【CCF】201503-1图像旋转
  2. ASP.NET @Page指令属性
  3. 基于LINQ to SQL的WEB开发三层架构(2)
  4. markdown数学公式手册
  5. PyQt5入门——QListWidget实现图片缩略图列表
  6. 使用Spring Security进行自动登录验证
  7. 机器学习之RandomForest(随机深林)原理与实战
  8. 卡尔曼滤波、扩展卡尔曼滤波、无迹卡尔曼滤波以及粒子滤波原理
  9. Golang基本变量
  10. 机器视觉【基础】什么是机器视觉?