1.7、flume案例二

案例需求:

在数据采集之后,通过flume的拦截器,实现不需要的数据过滤掉,并将指定的第一个字段进行加密,加密之后再往hdfs上面保存

原始数据与处理之后的数据对比

图一  原始文件内容

图二  HDFS上产生收集到的处理数据

实现步骤

第一步:创建maven java工程,导入jar包

<repositories>
    <repository>
        <id>cloudera</id>
 <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0-cdh5.14.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

第二步:自定义flume的拦截器

package cn.itcast.iterceptor;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static cn.itcast.iterceptor.CustomParameterInterceptor.Constants.*;

public class CustomParameterInterceptor implements Interceptor {
    /** The field_separator.指明每一行字段的分隔符 */
   
private final String fields_separator;

/** The indexs.通过分隔符分割后,指明需要那列的字段 下标*/
   
private final String indexs;

/** The indexs_separator. 多个下标的分隔符*/
   
private final String indexs_separator;

/**
     *
     * @param indexs
    
* @param indexs_separator
    
*/
   
public CustomParameterInterceptor( String fields_separator,
                                       String indexs, String indexs_separator,String encrypted_field_index) {
        String f = fields_separator.trim();
        String i = indexs_separator.trim();
        this.indexs = indexs;
        this.encrypted_field_index=encrypted_field_index.trim();
        if (!f.equals("")) {
            f = UnicodeToString(f);
        }
        this.fields_separator =f;
        if (!i.equals("")) {
            i = UnicodeToString(i);
        }
        this.indexs_separator = i;
    }

/*
     *
     * \t
制表符 ('\u0009') \n 新行(换行)符 (' ') \r 回车符 (' ') \f 换页符 ('\u000C') \a 报警
     * (bell)
符 ('\u0007') \e 转义符 ('\u001B') \cx  空格(\u0020)对应于 x 的控制符
     *
     * @param str
     * @return
     * @data:2015-6-30
     */

/** The encrypted_field_index. 需要加密的字段下标*/
   
private final String encrypted_field_index;
    public static String UnicodeToString(String str) {
        Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
        Matcher matcher = pattern.matcher(str);
        char ch;
        while (matcher.find()) {
            ch = (char) Integer.parseInt(matcher.group(2), 16);
            str = str.replace(matcher.group(1), ch + "");
        }
        return str;
    }

/*
     * @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event)
     *
单个event拦截逻辑
     */
   
public Event intercept(Event event) {
        if (event == null) {
            return null;
        }
        try {
            String line = new String(event.getBody(), Charsets.UTF_8);
            String[] fields_spilts = line.split(fields_separator);
            String[] indexs_split = indexs.split(indexs_separator);
            String newLine="";
            for (int i = 0; i < indexs_split.length; i++) {
                int parseInt = Integer.parseInt(indexs_split[i]);
                //对加密字段进行加密
               
if(!"".equals(encrypted_field_index)&&encrypted_field_index.equals(indexs_split[i])){
                    newLine+=StringUtils.GetMD5Code(fields_spilts[parseInt]);
                }else{
                    newLine+=fields_spilts[parseInt];
                }

if(i!=indexs_split.length-1){
                    newLine+=fields_separator;
                }
            }
            event.setBody(newLine.getBytes(Charsets.UTF_8));
            return event;
        } catch (Exception e) {
            return event;
        }
    }

/*
     * @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List)
     *
批量event拦截逻辑
     */
   
public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<Event>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

/*
     * @see org.apache.flume.interceptor.Interceptor#initialize()
     */
   
public void initialize() {
        // TODO Auto-generated method stub

}

/*
     * @see org.apache.flume.interceptor.Interceptor#close()
     */
   
public void close() {
        // TODO Auto-generated method stub

}

/**
     *
相当于自定义Interceptor的工厂类
     *
在flume采集配置文件中通过制定该Builder来创建Interceptor对象
     *
可以在Builder中获取、解析flume采集配置文件中的拦截器Interceptor的自定义参数:
     *
字段分隔符,字段下标,下标分隔符、加密字段下标 ...
     * @author
    
*
     */
   
public static class Builder implements Interceptor.Builder {

/** The fields_separator.指明每一行字段的分隔符 */
       
private  String fields_separator;

/** The indexs.通过分隔符分割后,指明需要那列的字段 下标*/
       
private  String indexs;

/** The indexs_separator. 多个下标下标的分隔符*/
       
private  String indexs_separator;

/** The encrypted_field. 需要加密的字段下标*/
       
private  String encrypted_field_index;
        /*
         * @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
         */
       
public void configure(Context context) {
            fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR);
            indexs = context.getString(INDEXS, DEFAULT_INDEXS);
            indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR);
            encrypted_field_index= context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX);
        }
        /*
         * @see org.apache.flume.interceptor.Interceptor.Builder#build()
         */
       
public Interceptor build() {
            return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator,encrypted_field_index);
        }
    }
    /**
     *
常量
     *
     */
   
public static class Constants {
        /** The Constant FIELD_SEPARATOR. */
        
public static final String FIELD_SEPARATOR = "fields_separator";

/** The Constant DEFAULT_FIELD_SEPARATOR. */
       
public static final String DEFAULT_FIELD_SEPARATOR =" ";

/** The Constant INDEXS. */
       
public static final String INDEXS = "indexs";

/** The Constant DEFAULT_INDEXS. */
       
public static final String DEFAULT_INDEXS = "0";

/** The Constant INDEXS_SEPARATOR. */
       
public static final String INDEXS_SEPARATOR = "indexs_separator";

/** The Constant DEFAULT_INDEXS_SEPARATOR. */
       
public static final String DEFAULT_INDEXS_SEPARATOR = ",";

/** The Constant ENCRYPTED_FIELD_INDEX. */
       
public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";

/** The Constant DEFAUL_TENCRYPTED_FIELD_INDEX. */
       
public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";

/** The Constant PROCESSTIME. */
       
public static final String PROCESSTIME = "processTime";
        /** The Constant PROCESSTIME. */
       
public static final String DEFAULT_PROCESSTIME = "a";

}
    /**
     *
工具类:字符串md5加密
     */
   
public static class StringUtils {
        // 全局数组
       
private final static String[] strDigits = { "0", "1", "2", "3", "4", "5",
                "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
        // 返回形式为数字跟字符串
       
private static String byteToArrayString(byte bByte) {
            int iRet = bByte;
            // System.out.println("iRet="+iRet);
           
if (iRet < 0) {
                iRet += 256;
            }
            int iD1 = iRet / 16;
            int iD2 = iRet % 16;
            return strDigits[iD1] + strDigits[iD2];
        }

// 返回形式只为数字
       
private static String byteToNum(byte bByte) {
            int iRet = bByte;
            System.out.println("iRet1=" + iRet);
            if (iRet < 0) {
                iRet += 256;
            }
            return String.valueOf(iRet);
        }

// 转换字节数组为16进制字串
       
private static String byteToString(byte[] bByte) {
            StringBuffer sBuffer = new StringBuffer();
            for (int i = 0; i < bByte.length; i++) {
                sBuffer.append(byteToArrayString(bByte[i]));
            }
            return sBuffer.toString();
        }

public static String GetMD5Code(String strObj) {
            String resultString = null;
            try {
                resultString = new String(strObj);
                MessageDigest md = MessageDigest.getInstance("MD5");
                // md.digest() 该函数返回值为存放哈希值结果的byte数组
               
resultString = byteToString(md.digest(strObj.getBytes()));
            } catch (NoSuchAlgorithmException ex) {
                ex.printStackTrace();
            }
            return resultString;
        }
    }

}

第三步:打包上传服务器

将我们的拦截器打成jar包放到flume的lib目录下

第四步:开发flume的配置文件

第三台机器开发flume的配置文件

cd  /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

vim spool-interceptor-hdfs.conf

a1.channels = c1

a1.sources = r1

a1.sinks = s1

#channel

a1.channels.c1.type = memory

a1.channels.c1.capacity=100000

a1.channels.c1.transactionCapacity=50000

#source

a1.sources.r1.channels = c1

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /export/servers/intercept

a1.sources.r1.batchSize= 50

a1.sources.r1.inputCharset = UTF-8

a1.sources.r1.interceptors =i1 i2

a1.sources.r1.interceptors.i1.type =cn.itcast.iterceptor.CustomParameterInterceptor$Builder

a1.sources.r1.interceptors.i1.fields_separator=\\u0009

a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6

a1.sources.r1.interceptors.i1.indexs_separator =\\u002c

a1.sources.r1.interceptors.i1.encrypted_field_index =0

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#sink

a1.sinks.s1.channel = c1

a1.sinks.s1.type = hdfs

a1.sinks.s1.hdfs.path =hdfs://192.168.52.100:8020/flume/intercept/%Y%m%d

a1.sinks.s1.hdfs.filePrefix = event

a1.sinks.s1.hdfs.fileSuffix = .log

a1.sinks.s1.hdfs.rollSize = 10485760

a1.sinks.s1.hdfs.rollInterval =20

a1.sinks.s1.hdfs.rollCount = 0

a1.sinks.s1.hdfs.batchSize = 1500

a1.sinks.s1.hdfs.round = true

a1.sinks.s1.hdfs.roundUnit = minute

a1.sinks.s1.hdfs.threadsPoolSize = 25

a1.sinks.s1.hdfs.useLocalTimeStamp = true

a1.sinks.s1.hdfs.minBlockReplicas = 1

a1.sinks.s1.hdfs.fileType =DataStream

a1.sinks.s1.hdfs.writeFormat = Text

a1.sinks.s1.hdfs.callTimeout = 60000

a1.sinks.s1.hdfs.idleTimeout =60

第五步:上传测试数据

上传我们的测试数据到/export/servers/intercept 这个目录下面去,如果目录不存在则创建

mkdir  -p /export/servers/intercept

测试数据如下

13601249301 100    200   300   400   500   600   700

13601249302 100    200   300   400   500   600   700

13601249303 100    200   300   400   500   600   700

13601249304 100    200   300   400   500   600   700

13601249305 100    200   300   400   500   600   700

13601249306 100    200   300   400   500   600   700

13601249307 100    200   300   400   500   600   700

13601249308 100    200   300   400   500   600   700

13601249309 100    200   300   400   500   600   700

13601249310 100    200   300   400   500   600   700

13601249311 100    200   300   400   500   600   700

13601249312 100    200   300   400   500   600   700

第六步:启动flume

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin

bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console

小结:一般不在flume上进行数据处理。数据的处理都在MR上进行,flume主要就是数据的收集。

转载于:https://www.cnblogs.com/mediocreWorld/p/11081958.html

第1节 flume:15、flume案例二,通过自定义拦截器实现数据的脱敏相关推荐

  1. 【Flume】(四)IDEA编写自定义拦截器

    IDEA编写自定义拦截器 IDEA中导入以下依赖: <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core ...

  2. Hadoop生态圈-Flume的组件之自定义拦截器(interceptor)

    Hadoop生态圈-Flume的组件之自定义拦截器(interceptor) 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 本篇博客只是举例了一个自定义拦截器的方法,测试字节传输速 ...

  3. Flume的开发之 自定义 source 自定义 sink 自定义拦截器

    一:开发相关 加入 jar 包依赖: <dependency> <groupId>org.apache.flume</groupId> <artifactId ...

  4. Flume四:多路复用(ChannelSelector之Multiplexing)+自定义拦截器

    案例: 自定义拦截器 pom.xml <dependency><groupId>org.apache.flume</groupId><artifactId&g ...

  5. flume拦截器及自定义拦截器

    拦截器做什么呢? 时间拦截器 以时间拦截器为例.会在Event的header中添加一个属性进去,属性的key叫做timestamp, value是当前的毫秒值. 问题是写到header然后呢?有啥用呢 ...

  6. flume自定义拦截器开发步骤

    步骤如下: 1.新建一个java项目,不需要依赖spring等一系列依赖.只需要加上你用的 工具类的依赖.flume的依赖不用加,因为服务器里面有. 2.实现Interceptor接口,重写里面的in ...

  7. springMVC02-SSM整合(Result统一响应数据格式、异常页面修改、SSM整合vue-elementUI小案例、SpringMVC的拦截器Interceptor)

    文章目录 今日内容 一.SSM整合[重点] 1 SSM整合配置 问题导入 1.1 SSM整合流程 1.2 SSM整合配置 1.2.1 创建工程,添加依赖和插件 1.2.2 Spring整合Mybati ...

  8. Flume自定义拦截器

    需求 定义两个拦截器,一个用于过滤不合法数据,一个用于区分日志类型. ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志. 日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便 ...

  9. unity烘培单个物体_Unity可编程渲染管线(SRP)教程:二、自定义着色器

    本文翻译自Catlike Coding,原作者:Jasper Flick. 本文经原作者授权,转载请说明出处. 原文链接在下: https://catlikecoding.com/unity/tuto ...

最新文章

  1. 初等数论四大基本定理
  2. linux下自定义dubbo的shell脚本
  3. linux操作系统 抢占式,Linux操作系统内核抢占补丁的基本原理(2)
  4. 赛门铁克:靠近数据源的重复数据删除
  5. Day26:configparser、subprocess模块
  6. spring boot 集成 Oracle Access Manager(OAM)单点登录
  7. 孙宇晨:BM跑路或是觉得做项目是一种负担
  8. ADO.NET Entity Framework 学习(1)
  9. 关于System.identityHashCode(obj) 与 obj.hashcode()
  10. java常见类型的转化以及风险
  11. Java虚拟机内存管理
  12. 串口和并口和网口区别
  13. HTML5轮播图全代码
  14. 人工智能标记语言AIML聊天机器人:产生、种类、应用、实例、AIML概述、知识库、公司、业界(20k字经典收藏版)
  15. oracle关系数据库概述
  16. 如何提高软件测试能力的19条建议,希望对你有用
  17. 星起航:短视频营销之场景化
  18. 表格维护生成器-部分字段不能修改或不能看见
  19. 微信图片服务器逻辑,微信小程序[第八篇] -- 实现完整的相册列表逻辑(小程序端服务器端)...
  20. 在 github 建立blg

热门文章

  1. system verilog中的参数传递——ref,input,output
  2. php mysql 简单,你想不到的最简单php操作MySQL
  3. html盒子优先级设置,CSS 基础(盒模型、选择器、权重、优先级)
  4. 微型计算机d3000,13级仪表微机重点教程.doc
  5. 夹娃娃_夹娃娃的实用性方法 抓娃娃可以用什么技巧
  6. 《Linux Shell脚本攻略》读书笔记第三章 以文件之名
  7. android 加载json停顿,java – 在Android上解析~1 MB JSON非常慢
  8. php ngx_http_auth_basic_module,nginx认证模块ngx_http_auth_basic_module
  9. 网管师职业认证网上辅导班开课前的调查
  10. linux内核分支,新闻|Linux 内核分支 2.4 版结束生命周期