08 友盟项目--拆分日志为五个表---UDTF自定义函数
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>2.1.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.24</version></dependency>
自定义叉分函数
表生成函数
1 package com.oldboy.umeng.hive.udtf; 2 3 import com.alibaba.fastjson.JSONObject; 4 import com.oldboy.umeng.common.domain.AppLogAggEntity; 5 import com.oldboy.umeng.common.domain.AppStartupLog; 6 import org.apache.hadoop.hive.ql.exec.UDFArgumentException; 7 import org.apache.hadoop.hive.ql.metadata.HiveException; 8 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; 9 import org.apache.hadoop.hive.serde2.objectinspector.*; 10 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 11 12 import java.beans.BeanInfo; 13 import java.beans.Introspector; 14 import java.beans.PropertyDescriptor; 15 import java.lang.reflect.Method; 16 import java.lang.reflect.ParameterizedType; 17 import java.util.ArrayList; 18 import java.util.List; 19 20 /** 21 * 自定义叉分函数 22 * 表生成函数 23 * fork(servertimestr , clienttimems , clientip ,json) 24 */ 25 public abstract class BaseForkUDTF<T> extends GenericUDTF { 26 27 private Class<T> clazz; 28 29 private ObjectInspectorConverters.Converter[] converters ; 30 //字段名称列表 31 List<String> fieldNames = null ; 32 //检查器列表 33 List<ObjectInspector> ois = null ; 34 35 //通过构造函数抽取子类的泛型化超类部分 36 public BaseForkUDTF(){ 37 ParameterizedType type = (ParameterizedType) this.getClass().getGenericSuperclass(); 38 clazz = (Class) type.getActualTypeArguments()[0]; 39 } 40 41 /** 42 * 校验参数合规性 43 */ 44 public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { 45 46 //字段名称集合 47 fieldNames = new ArrayList<String>() ; 48 49 //对象检查器集合 50 List<ObjectInspector> ois = new ArrayList<ObjectInspector>() ; 51 52 if (args.length != 4) { 53 throw new UDFArgumentException("fork()需要4个参数!!!"); 54 } 55 //判断参数的类型 56 //1.string 57 if(args[0].getCategory()!= ObjectInspector.Category.PRIMITIVE 58 || ((PrimitiveObjectInspector)args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){ 59 throw new UDFArgumentException("参数{1}不是string类型!!!"); 60 } 61 //2.bigint 62 if (args[1].getCategory() != ObjectInspector.Category.PRIMITIVE || ((PrimitiveObjectInspector) args[1]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG) { 63 throw new UDFArgumentException("参数{2}不是bigint类型!!!"); 64 } 65 //3.string 66 if (args[2].getCategory() != ObjectInspector.Category.PRIMITIVE || ((PrimitiveObjectInspector) args[2]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { 67 throw new UDFArgumentException("参数{3}不是string类型!!!"); 68 } 69 //4.string 70 if (args[3].getCategory() != ObjectInspector.Category.PRIMITIVE || ((PrimitiveObjectInspector) args[3]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { 71 throw new UDFArgumentException("参数{4}不是string类型!!!"); 72 } 73 74 //类型转换器 75 converters = new ObjectInspectorConverters.Converter[args.length]; 76 //保持每个参数对应的转换器 77 converters[0] = ObjectInspectorConverters.getConverter(args[0] , PrimitiveObjectInspectorFactory.javaStringObjectInspector) ; 78 converters[1] = ObjectInspectorConverters.getConverter(args[1] , PrimitiveObjectInspectorFactory.javaLongObjectInspector) ; 79 converters[2] = ObjectInspectorConverters.getConverter(args[2] , PrimitiveObjectInspectorFactory.javaStringObjectInspector) ; 80 converters[3] = ObjectInspectorConverters.getConverter(args[3] , PrimitiveObjectInspectorFactory.javaStringObjectInspector) ; 81 82 try { 83 popOIS(fieldNames , ois) ; 84 } catch (Exception e) { 85 e.printStackTrace(); 86 } 87 //返回结构体对象检查器 88 return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames , ois) ; 89 } 90 91 /** 92 * 组装对象检查器 93 * 将字段名称和对象检查器集合同步组装完成 94 * 每个字段都对应各自的对象检查器(ObjectInspector) 95 */ 96 private void popOIS(List<String> fieldNames, List<ObjectInspector> ois) throws Exception { 97 //获取clazz类的bean信息 98 BeanInfo bi = Introspector.getBeanInfo(clazz) ; 99 100 //得到所有属性 101 PropertyDescriptor[] pps = bi.getPropertyDescriptors(); 102 103 for(PropertyDescriptor pp :pps){ 104 String name = pp.getName() ; 105 Class type = pp.getPropertyType() ; 106 Method get = pp.getReadMethod() ; 107 Method set = pp.getWriteMethod() ; 108 // 109 if(get != null && set != null){ 110 if(type == Long.class || type == long.class){ 111 fieldNames.add(name) ; 112 ois.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector) ; 113 } 114 else if(type == int.class || type ==Integer.class){ 115 fieldNames.add(name); 116 ois.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector); 117 } 118 else if(type == String.class){ 119 fieldNames.add(name); 120 ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); 121 } 122 } 123 } 124 } 125 126 public void process(Object[] args) throws HiveException { 127 //检查一下参数的个数有效性 128 if (args.length != 4) { 129 throw new UDFArgumentException("fork()需要4个参数!!!"); 130 } 131 132 String servertimestr = (String) converters[0].convert(args[0]); 133 Long clienttimems = (Long) converters[1].convert(args[1]); 134 String clientip = (String) converters[2].convert(args[2]); 135 String json = (String) converters[3].convert(args[3]); 136 137 //替换\"为" 138 json = json.replace("\\\"" , "\"") ; 139 140 //解析json,返回日志聚合体对象 141 AppLogAggEntity agg = JSONObject.parseObject(json , AppLogAggEntity.class) ; 142 //TODO 时间对齐 143 List<T> logs = getLogs(agg) ; 144 145 //外层for循环决定行数 146 for(Object log : logs){ 147 Object[] arr = new Object[fieldNames.size()] ; 148 int i = 0 ; 149 //内层for循环决定列数,顺序和filedname顺序相同 150 for(String fname : fieldNames){ 151 try { 152 PropertyDescriptor pp = new PropertyDescriptor(fname , clazz) ; 153 Method get = pp.getReadMethod() ; 154 if(get != null){ 155 Object retValue = get.invoke(log) ; 156 arr[i] = retValue ; 157 } 158 i ++ ; 159 } catch (Exception e) { 160 e.printStackTrace(); 161 } 162 } 163 //转发对象,就是输出一行 164 forward(arr); 165 } 166 } 167 168 //抽象方法,子类必须重写 169 public abstract List<T> getLogs(AppLogAggEntity agg) ; 170 171 public void close() throws HiveException { 172 173 } 174 }
BaseForkUDTF
错误日志Errorlogs生成函数 继承了 BaseForkUDTF
/*** 叉分错误日志*/ public class ForkErrorlogsUDTF extends BaseForkUDTF<AppErrorLog>{public List<AppErrorLog> getLogs(AppLogAggEntity agg) {return agg.getErrorLogs();} }
事件日志EventLogs生成函数 继承了 BaseForkUDTF
/*** 叉分事件日志*/ public class ForkEventlogsUDTF extends BaseForkUDTF<AppEventLog>{public List<AppEventLog> getLogs(AppLogAggEntity agg) {return agg.getEventLogs();} }
页面日志PageLogs生成函数 继承了 BaseForkUDTF
/*** 叉分页面日志*/ public class ForkPagelogsUDTF extends BaseForkUDTF<AppPageLog>{public List<AppPageLog> getLogs(AppLogAggEntity agg) {return agg.getPageLogs();} }
开启日志StartupLogs生成函数 继承了 BaseForkUDTF
/*** 叉分启动日志*/ public class ForkStartuplogsUDTF extends BaseForkUDTF<AppStartupLog>{public List<AppStartupLog> getLogs(AppLogAggEntity agg) {return agg.getStartupLogs();} }
页面使用日志UsageLogs生成函数 继承了 BaseForkUDTF
/*** 叉分页面使用日志*/ public class ForkUsagelogsUDTF extends BaseForkUDTF<AppUsageLog>{public List<AppUsageLog> getLogs(AppLogAggEntity agg) {return agg.getUsageLogs();} }
转载于:https://www.cnblogs.com/star521/p/9880474.html
08 友盟项目--拆分日志为五个表---UDTF自定义函数相关推荐
- 02.友盟项目--原始日志数据生成
友盟架构图1 友盟架构图2 日志生成类 package com.star.umeng.phone.domain;/*** App日志的公共属性*/ public class AppBaseLog {p ...
- 【Lolttery】项目开发日志 (五)目标dubbox,前进
距离上一次开始使用dubbo已经有一段时间了,这段时间在项目的开发和实践中,对于dubbo有了更深的了解.今天终于决定抛弃原有的netty框架,正式转战dubbox dubbox是在阿里停止对dubb ...
- 30 友盟项目---体会篇
BigData 任重而道远 spark.apache.org/ https://flink-china.org https://stackoverflow.com/ https://mvnrepo ...
- ios友盟错误_iOS 2017友盟错误统计及分析
我还以为你不会搜我呢~ 看完这文章后的你 前言 在app开发中,我坚信,总会有小伙伴会用到友盟统计的,我也是(废话).但是在友盟的错误日志分析这块,小白用的话可能会有点小麻烦.最近有个小伙伴问我这点, ...
- 友盟+的数据智能生意经:如何从DI向AI进阶?
采访 & 编辑 | Natalie 嘉宾 | 李丹枫 编辑 | Debra AI 前线导读:到今年为止,大数据的概念已经火了不止 7 年,人工智能和区块链成为了新的风口,而大数据产业则慢慢沉淀 ...
- android umeng,GitHub - umeng/umeng_community_android: 友盟微社区Android SDK开源项目
友盟微社区 Android SDK 该项目是友盟微社区的开源部分代码,根目录下的工程为集成友盟微社区SDK的demo,umeng_comm_android_ui为友盟微社区开源UI代码,里面包含了友盟 ...
- 友盟+《小程序用户增长白皮书》:从五个角度入手分析小程序数据
简介: 近日,国内领先的全域数据智能服务商--友盟+,发布了<友盟+U-APM 移动应用性能体验报告>.据悉,友盟+于去年将原移动分析U-App错误分析模块正式升级为U-APM应用性能监控 ...
- H5项目友盟埋点上报实践
使用友盟平台做H5统计分析的话可以使用小程序统计下的H5统计分析 使用步骤: 1.注册/登录平台 2.创建H5应用 3.集成SDK 4.查看统计数据 这里主要需要开发支持的步骤是集成SDK这一步骤,下 ...
- Android开发-在Android项目里集成友盟统计SDK进行对移动APP统计的实现
前 言 友盟(Umeng),2010年4月在北京成立,是中国最专业.最有数据凝聚力的移动开发者服务平台.友盟以移动应用统计分析为产品起点,发展成为提供从基础设置搭建-开发-运营服务的整合服务平台,致力 ...
最新文章
- C#2.0及C#3.0语言规范
- 在redhat9上安装firefox
- 【Java IO知识】读取中文乱码问题解决
- mysql多个分类取n条_MySQL获取所有分类和每个分类的前N条记录
- ByteBuffer 转 InputStream
- Android如何解析Intent
- PSpice和Simulink联合仿真笔记(一)SLPS介绍
- 清华大学数学与计算机学院院长,王跃宣(清华大学教授)_百度百科
- 美国3月通胀见顶了吗?
- 浪曦_Struts2应用开发系列_第2讲.Struts2的类型转换--出现的问题笔记
- SetWindowsHookEx 全局钩子
- usb摄像头android录像软件,USB摄像头app
- 高中物理应用计算机教学心得,高中物理教学心得
- BUUCTF-[网鼎杯 2020 青龙组]虚幻2
- 关闭笔记本电脑自带键盘
- 习惯于CRUD,不求上进呢?-为什么有很多程序员沉醉于舒适区
- cf85d treap
- 262. Trips and Users - 行程和用户 <Hard>
- 从 Paxos 到 Raft,分布式一致性算法解析
- Linux--shell--4--毫秒级时间戳