1.引入POM文件

如果想调用Flume,需要引入flume相关的jar包依赖,jar包依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>cn.com.toto.stormlogPro</artifactId><groupId>stormlogPro</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>cn.com.toto.flume</artifactId><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.6.0</version><!-- 设置打包的时候,剔除依赖--><scope>provided</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>cn.com.toto.stromlogpro.log4j.LogInfoBuilder</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin></plugins></build>
</project>

2.自定义的拦截器的代码

package cn.com.toto.stromlogpro.flume;import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;/*** 自定义一个点击流收集的拦截器* * 1、实现一个Interceptor.Builder接口。* 2、Interceptor.Builder中有个configuref方法,通过configure获取配置文件中的相应key。* 3、Interceptor.Builder中有个builder方法,通过builder创建一个自定义的AppInterceptor* 4、AppInterceptor中有两个方法,一个是批处理,一个单条处理,将批处理的逻辑转换为单条处理* 5、需要在单条数据中添加 appid,由于appid是变量。需要在AppInterceptor的构造器中传入一些参数。* 6、为自定义的AppInterceptor创建有参构造器,将需要的参数传入进来。** @author tuzq* @create 2017-06-25 12:48*/
public class AppInterceptor implements Interceptor{//4.定义成员变量appId,用来接收从配置文件中读取的信息private String appId;public AppInterceptor(String appId) {this.appId = appId;}/*** 单条数据进行处理,通过这个方式为日志添加上系统id* @param event* @return*/@Overridepublic Event intercept(Event event) {String message = null;try {message = new String(event.getBody(), "utf-8");} catch (UnsupportedEncodingException e) {message = new String(event.getBody());}//处理逻辑if (StringUtils.isNotBlank(message)) {message = "aid:"+appId+"||msg:" +message;event.setBody(message.getBytes());//正常逻辑应该执行到这里return event;}return event;}/*** 批量数据进行处理* @param list* @return*/@Overridepublic List<Event> intercept(List<Event> list) {List<Event> resultList = new ArrayList<Event>();for (Event event : list) {Event r = intercept(event);if (r != null) {resultList.add(r);}}return resultList;}@Overridepublic void initialize() {}@Overridepublic void close() {}public static  class AppInterceptorBuilder implements Interceptor.Builder{//1、获取配置文件的appIdprivate String appId;@Overridepublic Interceptor build() {//3、构造拦截器return new AppInterceptor(appId);}@Overridepublic void configure(Context context) {//2、当出现default之后,就是点击流告警系统this.appId =  context.getString("appId","default");System.out.println("appId:"+appId);}}
}

LogInfoBuilder的代码如下:

package cn.com.toto.stromlogpro.log4j;import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.logging.Logger;/*** 通过这个工程模拟创建日志内容** @author tuzq* @create 2017-06-25 13:51*/
public class LogInfoBuilder {private final static Logger logger = Logger.getLogger("msg");public static void main(String[] args) {Random random = new Random();List<String> list = logInfoList();while(true) {logger.info(list.get(random.nextInt(list.size())));}}private static List<String> logInfoList() {List list = new ArrayList<String>();list.add("aid:1||msg:error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");list.add("java.sql.SQLException: You have an error in your SQL syntax;");list.add("error Unable to connect to any of the specified MySQL hosts.");list.add("error:Servlet.service() for servlet action threw exception java.lang.NullPointerException");list.add("error:Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 2");list.add("error:NoSuchMethodError: com/starit/.");list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");list.add("error:Java.lang.IllegalStateException");list.add("error:Java.lang.IllegalMonitorStateException");list.add("error:Java.lang.NegativeArraySizeException");list.add("error:java.sql.SQLException: You have an error in your SQL syntax;");list.add("error:Java.lang.TypeNotPresentException ");list.add("error:Java.lang.UnsupprotedOperationException ");list.add("error Java.lang.IndexOutOfBoundsException");list.add("error Java.lang.ClassNotFoundException");list.add("error java.lang.ExceptionInInitializerError ");list.add("error:java.lang.IncompatibleClassChangeError ");list.add("error:java.lang.LinkageError ");list.add("error:java.lang.OutOfMemoryError ");list.add("error java.lang.StackOverflowError");list.add("error: java.lang.UnsupportedClassVersionError");list.add("error java.lang.ClassCastException");list.add("error: java.lang.CloneNotSupportedException");list.add("error: java.lang.EnumConstantNotPresentException ");list.add("error java.lang.IllegalMonitorStateException ");list.add("error java.lang.IllegalStateException ");list.add("error java.lang.IndexOutOfBoundsException ");list.add("error java.lang.NumberFormatException ");list.add("error java.lang.RuntimeException ");list.add("error java.lang.TypeNotPresentException ");list.add("error MetaSpout.java:9: variable i might not have been initialized");list.add("error MyEvaluator.java:1: class Test1 is public, should be declared in a file named Test1.java ");list.add("error Main.java:5: cannot find symbol ");list.add("error NoClassDefFoundError: asa wrong name: ASA ");list.add("error Test1.java:54: 'void' type not allowed here");list.add("error Test5.java:8: missing return statement");list.add("error:Next.java:66: cannot find symbol ");list.add("error symbol  : method createTempFile(java.lang.String,java.lang.String,java.lang.String) ");list.add("error invalid method declaration; return type required");list.add("error array required, but java.lang.String found");list.add("error Exception in thread main java.lang.NumberFormatException: null 20. .");list.add("error non-static method cannot be referenced from a static context");list.add("error Main.java:5: non-static method fun1() cannot be referenced from a static context");list.add("error continue outside of  loop");list.add("error MyAbstract.java:6: missing method body, or declare abstract");list.add("error Main.java:6: Myabstract is abstract; cannot be instantiated");list.add("error MyInterface.java:2: interface methods cannot have body ");list.add("error Myabstract is abstract; cannot be instantiated");list.add("error asa.java:3: modifier static not allowed here");list.add("error possible loss of precision  found: long required:byte  var=varlong");list.add("error  java.lang.NegativeArraySizeException ");list.add("error java.lang.ArithmeticException:  by zero");list.add("error java.lang.ArithmeticException");list.add("error java.lang.ArrayIndexOutOfBoundsException");list.add("error java.lang.ClassNotFoundException");list.add("error java.lang.IllegalArgumentException");list.add("error fatal error C1010: unexpected end of file while looking for precompiled header directive");list.add("error fatal error C1083: Cannot open include file: R…….h: No such file or directory");list.add("error C2011:C……clas type redefinition");list.add("error C2018: unknown character 0xa3");list.add("error C2057: expected constant expression");list.add("error C2065: IDD_MYDIALOG : undeclared identifier IDD_MYDIALOG");list.add("error C2082: redefinition of formal parameter bReset");list.add("error C2143: syntax error: missing : before  ");list.add("error C2146: syntax error : missing ';' before identifier dc");list.add("error C2196: case value '69' already used");list.add("error C2509: 'OnTimer' : member function not declared in 'CHelloView'");list.add("error C2555: 'B::f1': overriding virtual function differs from 'A::f1' only by return type or calling convention");list.add("error C2511: 'reset': overloaded member function 'void (int)' not found in 'B'");list.add("error C2660: 'SetTimer' : function does not take 2 parameters");list.add("error warning C4035: 'f……': no return value");list.add("error warning C4553: '= =' : operator has no effect; did you intend '='");list.add("error C4716: 'CMyApp::InitInstance' : must return a value");list.add("error LINK : fatal error LNK1168: cannot open Debug/P1.exe for writing");list.add("error LNK2001: unresolved external symbol public: virtual _ _thiscall C (void)");list.add("error java.lang.IllegalArgumentException: Path index.jsp does not start with");list.add("error org.apache.struts.action.ActionServlet.process(ActionServlet.java:148");list.add("error org.apache.jasper.JasperException: Exception in JSP");list.add("error The server encountered an internal error () that prevented it from fulfilling this request");list.add("error org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467");list.add("error javax.servlet.http.HttpServlet.service(HttpServlet.java:803)");list.add("error javax.servlet.jsp.JspException: Cannot find message resources under key org.apache.struts.action.MESSAGE");list.add("error Stacktrace:  org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467)");list.add("error javax.servlet.ServletException: Cannot find bean org.apache.struts.taglib.html.BEAN in any scope");list.add("error no data found");list.add("error exception in thread main org.hibernate.MappingException: Unknown entity:.");list.add("error using namespace std;");list.add("error C2065: 'cout' : undeclared identifier");list.add("error main already defined in aaa.obj");list.add("error syntax error : missing ';' before '}'");list.add("error cout : undeclared identifier");list.add("error weblogic.servlet.internal.WebAppServletContext$ServletInvocationAction.run(WebAp ");list.add("error Caused by: java.lang.reflect.InvocationTargetException");list.add("error Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");list.add("error at com.starit.gejie.Util.Trans.BL_getSysNamesByType(Trans.java:220)");return list;}
}

MyDailyRollingFileAppender的代码如下:

package cn.com.toto.stromlogpro.log4j;/*** Created by toto on 2017/6/25.*/import org.apache.log4j.DailyRollingFileAppender;
import org.apache.log4j.Priority;/*** @author tuzq* @create 2017-06-25 13:58*/
public class MyDailyRollingFileAppender extends DailyRollingFileAppender {@Overridepublic boolean isAsSevereAsThreshold(Priority priority) {return getThreshold().equals(priority);}
}

MyRollingFileAppender的代码如下:

package cn.com.toto.stromlogpro.log4j;/*** Created by toto on 2017/6/25.*/import org.apache.log4j.Priority;
import org.apache.log4j.RollingFileAppender;/*** @author tuzq* @create 2017-06-25 14:01*/
public class MyRollingFileAppender extends RollingFileAppender {@Overridepublic boolean isAsSevereAsThreshold(Priority priority) {return getThreshold().equals(priority);}
}

3.在Flume中的conf配置文件,并将收集的日志下沉到kafka中

a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/info.log
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.com.toto.stromlogpro.flume.AppInterceptor$AppInterceptorBuilder
#通过这个参数向自定义的Flume拦截器中传递参数(即系统编号)
a1.sources.r1.interceptors.i1.appId = 1a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = log_monitor
a1.sinks.k1.brokerList = hadoop1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

自定义Flume拦截器,并将收集的日志存储到Kafka中(案例)相关推荐

  1. Flume拦截器实战案例

    日志的采集和汇总 案例场景 A.B两台日志服务机器实时生产日志主要类型为access.log.nginx.log.web.log 现在要求: 把A.B 机器中的access.log.nginx.log ...

  2. springboot整合shiro和session的详细过程和自定义登录拦截器

    文章目录 1.shiro依赖 2.shiro配置 shiro过滤器配置: 关联自定义的其他管理器 自定义会话工厂: 3.登陆时记录用户信息 4.shiro一些工具类的学习 5.自定义登录拦截器 shi ...

  3. Mybatis自定义SQL拦截器

    本博客介绍的是继承Mybatis提供的Interface接口,自定义拦截器,然后将项目中的sql拦截一下,打印到控制台. 先自定义一个拦截器 package com.muses.taoshop.com ...

  4. mysql拦截器实现crud_Mybatis自定义SQL拦截器

    本博客介绍的是继承Mybatis提供的Interface接口,自定义拦截器,然后将项目中的sql拦截一下,打印到控制台. 先自定义一个拦截器 package com.muses.taoshop.com ...

  5. 自定义注解+拦截器优化项目代码

    自定义注解+拦截器的优势 类似用户权限或者接口限流的需求,但并不是所有操作或者接口需要.可以使用过滤器或者拦截器,但这样就必须在配置文件里加上所有方法或者使用通配符. 所以可以采用一种比较简单灵活的方 ...

  6. springboot 自定义注解拦截器

    springboot 自定义注解拦截器 最近在工作中,发现自定义注解拦截使用起来特别方便,现在来写出来给大家看看 环境springboot 首先写一个自定义注解 package com.study.c ...

  7. Springboot 自定义mybatis 拦截器,实现我们要的扩展

    前言 相信大家对拦截器并不陌生,对mybatis也不陌生. 有用过pagehelper的,那么对mybatis拦截器也不陌生了,按照使用的规则触发sql拦截,帮我们自动添加分页参数 . 那么今天,我们 ...

  8. 【WebService】CXF拦截器的设置以及自定义CXF拦截器

    欢迎关注我新搭建的博客:http://www.itcodai.com/ WebService系列文章: [WebService]带你走进webservice的世界 [WebService]自定义Web ...

  9. flume拦截器及自定义拦截器

    拦截器做什么呢? 时间拦截器 以时间拦截器为例.会在Event的header中添加一个属性进去,属性的key叫做timestamp, value是当前的毫秒值. 问题是写到header然后呢?有啥用呢 ...

最新文章

  1. hook修改信息_React系列二十一 Hook(二)高级使用
  2. matlab神经网络的简单程序设计,BP神经网络设计的matlab简单实现
  3. 网站的PV、UV、的统计 (awk 统计命令)
  4. mybatis 知识1
  5. ab压力测试_Apache ab压力测试的知识点
  6. WinForm 修改App.config不起作用(但是调试没有异常)
  7. 谈判失败:Oracle杀死Java EE
  8. Jenkins进阶-Gitlab使用Webhook实现Push代码自动部署(3)
  9. 如何杀死一个已经detached的screen会话?
  10. Android ViewGroup
  11. 组合数性质--二项式系数之和等于2^n的证明
  12. egret 图文并排
  13. 微机大作业—— 24秒倒计时器
  14. 【原创】关于2013、2014款Macbook Air安装Windows的那点事
  15. a标签href的几种写法
  16. 日本人的姓及一些姓氏的读法(转)
  17. 对校招生培养工作的建议_学校招生工作的几点建议
  18. ERROR Failed to send requests for topics
  19. Java核心技术卷一笔记
  20. python网络编程案例—五子棋游戏

热门文章

  1. python字符串替换功能string.replace()可以用正则表达式,更优雅
  2. python简单的监控脚本-利用socket、psutil阻止远程主机运行特定程序
  3. Echarts散点图
  4. opencv学习笔记1:图片读入,显示与保存(有代码)
  5. JavaScript实现combinationSum组合和算法(附完整源码)
  6. JavaScript实现articulation-points(关键点)(割点)算法(附完整源码)
  7. OpenCASCADE绘制测试线束:形状修复命令之一般命令
  8. wxIntegerValidator< T > 类模板用法
  9. wxWidgets:wxBusyInfo类用法
  10. boost::yap::make_expression_function相关的测试程序