Hadoop生态圈-Flume的组件之自定义拦截器(interceptor)

                                              作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

  本篇博客只是举例了一个自定义拦截器的方法,测试字节传输速度。

1>.自定义interceptor方法

  1 /*
  2 @author :yinzhengjie
  3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/
  4 EMAIL:y1053419035@qq.com
  5 */
  6 package cn.org.yinzhengjie.interceptor;
  7
  8 import org.apache.flume.Context;
  9 import org.apache.flume.Event;
 10 import org.apache.flume.interceptor.Interceptor;
 11
 12 import java.util.List;
 13
 14 /**
 15  * 设置限速拦截器
 16  * <p>
 17  * 当 字节/时间,即同一时刻,如果进入的字节过多
 18  * 则休眠一会
 19  */
 20 public class MyInterceptor implements Interceptor {
 21
 22     private int speed;
 23
 24
 25     //构造
 26     private MyInterceptor(int speed) {
 27         this.speed = speed;
 28     }
 29
 30
 31     //do nothing
 32     public void initialize() {
 33
 34     }
 35
 36     /**
 37      * 1、拿出上一个event的时间,和当前时间进行相减,得出上一个event的时间间隔
 38      * 2、得到上一个event的body字节数
 39      * 3、相除得到上一个event的速度,并在此event中先进行停留,再返回event
 40      *
 41      * @param event
 42      * @return
 43      */
 44
 45     long lastTime = -1;
 46     long lastBodySize = 0;
 47
 48     public Event intercept(Event event) {
 49
 50
 51         byte[] body = event.getBody();
 52         int len = body.length;
 53
 54
 55         long current = System.nanoTime();
 56
 57         //第一个event
 58         if (lastTime == -1) {
 59             lastTime = current;
 60             lastBodySize = len;
 61         }
 62
 63         //非第一个event
 64         else {
 65             //计算上一个event停留的时间
 66             long interval = current - lastTime;
 67             System.out.println("=========================" + current + "/" + lastTime + "/" + interval + "=========================");
 68             //上一个event的速度
 69             int now_speed = (int) ((double) lastBodySize / interval * 1000);
 70             if (now_speed > speed) {
 71                 System.out.println("=========================" + now_speed + "=========================");
 72                 //计算需要停留多少秒 线程休眠,时间 = shouldTime - interval
 73                 try {
 74                     Thread.sleep((lastBodySize / speed) * 1000 - interval);
 75                 } catch (InterruptedException e) {
 76                     e.printStackTrace();
 77                 }
 78             }
 79             lastBodySize = len;
 80             lastTime = System.currentTimeMillis();
 81
 82         }
 83         return event;
 84
 85     }
 86
 87     //迭代List<Event>,将所有Event交给intercept(Event)进行处理
 88     public List<Event> intercept(List<Event> events) {
 89         for (Event event : events) {
 90             intercept(event);
 91         }
 92         return events;
 93     }
 94
 95     //do nothing
 96     public void close() {
 97
 98     }
 99
100     public static class Builder implements Interceptor.Builder {
101
102         private int speed;
103
104         public void configure(Context context) {
105             speed = context.getInteger(Constants.SPEED, Constants.DEFAULT_SPEED);
106
107         }
108
109         public Interceptor build() {
110             return new MyInterceptor(speed);
111         }
112     }
113
114     public static class Constants {
115         public static String SPEED = "speed";
116         public static int DEFAULT_SPEED = 1;
117
118     }
119 }

2>.打包并将其发送到 /soft/flume/lib下

[yinzhengjie@s101 ~]$ cd /soft/flume/lib/
[yinzhengjie@s101 lib]$
[yinzhengjie@s101 lib]$ ll | grep MyFlume
-rw-r--r--  1 yinzhengjie yinzhengjie    5231 Jun 20 18:53 MyFlume-1.0-SNAPSHOT.jar
[yinzhengjie@s101 lib]$
[yinzhengjie@s101 lib]$ rm -rf MyFlume-1.0-SNAPSHOT.jar
[yinzhengjie@s101 lib]$
[yinzhengjie@s101 lib]$ rz[yinzhengjie@s101 lib]$
[yinzhengjie@s101 lib]$ ll | grep MyFlume
-rw-r--r--  1 yinzhengjie yinzhengjie    8667 Jun 20 21:02 MyFlume-1.0-SNAPSHOT.jar
[yinzhengjie@s101 lib]$
[yinzhengjie@s101 lib]$ 

3>.编写agent的配置文件

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_myInterceptor.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 定义源: seq
a1.sources.r1.type = seq
# 定义一次RPC产生的批次数量
a1.sources.r1.batchSize = 1024# 指定添加拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.org.yinzhengjie.interceptor.MyInterceptor$Builder
a1.sources.r1.interceptors.i1.speed = 1# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[yinzhengjie@s101 ~]$ 

4>.启动flume并测试

[yinzhengjie@s101 ~]$ flume-ng agent -f /soft/flume/conf/yinzhengjie_myInterceptor.conf -n a1

  下图是运行agent部分的输出内容

转载于:https://www.cnblogs.com/yinzhengjie/p/9208268.html

Hadoop生态圈-Flume的组件之自定义拦截器(interceptor)相关推荐

  1. Hadoop生态圈-Flume的组件之sink处理器

    Hadoop生态圈-Flume的组件之sink处理器 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一. 二. 转载于:https://www.cnblogs.com/yinzhe ...

  2. 【Spring MVC】自定义拦截器 Interceptor

    一.自定义拦截器 1.拦截器,跟过滤器比较像的技术. 1.1 拦截器和过滤器的区别: 拦截器只能拦Controller,Filter可以拦任何请求 因为Spring的入口是dispatcherServ ...

  3. 自定义拦截器Interceptor

    自定义拦截器 import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse;i ...

  4. 第1节 flume:15、flume案例二,通过自定义拦截器实现数据的脱敏

    1.7.flume案例二 案例需求: 在数据采集之后,通过flume的拦截器,实现不需要的数据过滤掉,并将指定的第一个字段进行加密,加密之后再往hdfs上面保存 原始数据与处理之后的数据对比 图一  ...

  5. struts2自定义拦截器一——模拟登陆权限验证

    1.http://localhost:8083/struts2/user.jsp 表示用户已登陆,存放session对象 2.http://localhost:8083/struts2/quit.js ...

  6. 使用mybatis plus自定义拦截器,实现数据权限

    需求 为了增强程序的安全性,需要在用户访问数据库的时候进行权限判断后选择性进行判断是否需要增强sql,来达到限制低级别权限用户访问数据的目的. 根据业务需要,这里将角色按照数据范围做权限限定.比如,角 ...

  7. Flume的开发之 自定义 source 自定义 sink 自定义拦截器

    一:开发相关 加入 jar 包依赖: <dependency> <groupId>org.apache.flume</groupId> <artifactId ...

  8. flume拦截器及自定义拦截器

    拦截器做什么呢? 时间拦截器 以时间拦截器为例.会在Event的header中添加一个属性进去,属性的key叫做timestamp, value是当前的毫秒值. 问题是写到header然后呢?有啥用呢 ...

  9. flume自定义拦截器开发步骤

    步骤如下: 1.新建一个java项目,不需要依赖spring等一系列依赖.只需要加上你用的 工具类的依赖.flume的依赖不用加,因为服务器里面有. 2.实现Interceptor接口,重写里面的in ...

最新文章

  1. Django 无法添加新字段,django.db.utils.OperationalError: (1050, Table app already exists)
  2. pmp学习资料_南昌如何选择PMP报考条件-海外人才交流协会
  3. iris流程图_如何搭建一个简单的机器学习流水线?
  4. NOVO Nordisk IT SOP List
  5. SAP Spartacus 服务器端渲染处理内存泄漏的准则
  6. 【Android笔记】如何创建列表视图3
  7. C++ opengl 使视野转头移动(站桩看世界)
  8. 02-通过修改映射在本地模拟Eureka集群
  9. 树桩数组求逆序数+离散化(数太大)
  10. 孙鑫-MFC笔记七--文件与注册表
  11. Kotlin — 适用于原生
  12. 系统升级时,数据库脚本执行注意事项,血的教训
  13. 摘抄:不注明来源,就是违反契约
  14. 上海中环C位出道,自动驾驶天团横「扫」北上广
  15. vue添加定位功能_vue 实现Web端的定位功能 获取经纬度
  16. 今天母亲节,作为程序员,我是这样表达母爱的……
  17. 任天堂游戏 html5,任天堂Switch游戏销量排行Top40,赶快收藏跟着买就对啦!
  18. The operator ‘SUBTRACT‘ is not supported between objects of type ‘null‘ and ‘java.lang.Integer‘
  19. dva处理_关于dva框架的二三事
  20. Linux 安装Kerberos认证KDC服务

热门文章

  1. ansible批量修改linux服务器密码的playbook
  2. struts2提交list
  3. 使用正则表达式抽取新闻/BBS网页发表时间
  4. MVC3+EF4.1学习系列(一)-------创建EF4.1 code first的第一个实例(强转)
  5. 【Android游戏开发之七】(游戏开发中需要的样式)再次剖析游戏开发中对SurfaceView中添加组件方案!...
  6. 使用SQL Server维护计划实现数据库定时自动备份
  7. [原创] 如何追踪每一笔记录的来龙去脉:一个完整的Audit Logging解决方案—Part I...
  8. Data Artisans发布支持ACID事务的流式处理框架Streaming Ledger
  9. 生活总是在推着你一步一步往前走
  10. 数组、字符串对象、Math对象