一:开发相关

加入 jar 包依赖:

<!-- Flume APIs used by the custom source and sink below.
     The MySQL sink additionally needs a MySQL JDBC driver (e.g. mysql-connector-java)
     on the agent's classpath at runtime. -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-configuration</artifactId>
    <version>1.5.2</version>
</dependency>

二:自定义 source

比如有这样一个需求:读取指定文件,每读满 4 行才作为一个 event 发送给 channel。

package package package com.gds.flume.source;
import import import import java.io.FileNotFoundException;
import import import import java.io.IOException;
import import import import java.io.RandomAccessFile;
import import import import java.util.concurrent.atomic.AtomicLong;
import import import import org.apache.flume.Context;
import import import import org.apache.flume.Event;
import import import import org.apache.flume.EventDeliveryException;
import import import import org.apache.flume.PollableSource;
import import import import org.apache.flume.conf.Configurable;
import import import import org.apache.flume.event.EventBuilder;
import import import import org.apache.flume.source.AbstractSource;
import import import import org.slf4j.Logger;
import import import import org.slf4j.LoggerFactory;
/**
* 从某个日子文件中每次读取4行,发送给channel
*
* @author @author @author @author dczhao
*
*/
public public public public class class class class MySource extends extends extends extends AbstractSource implements implements implements implements Configurable,
PollableSource {
private private private private static static static static final final final final Logger logger = LoggerFactory
.getLogger(MySource.class class class class);
/**
* 要读的日志文件地址
*/
private private private private String logPath;
private private private private RandomAccessFile file;
private private private private AtomicLong autoLine;
private private private private StringBuilder oldLine = new new new new StringBuilder();;
@Override
public public public public synchronized synchronized synchronized synchronized void void void void start() {
logger.info("MySource starting~~~~~");
autoLine = new new new new AtomicLong(0);
try try try try {
file = new new new new RandomAccessFile(logPath, "r");
} catch catch catch catch (FileNotFoundException e) {
logger.error("file oper error~~~~~", e);
}
super super super super.start();
}
@Override
public public public public synchronized synchronized synchronized synchronized void void void void stop() {
try try try try {
file.close();
} catch catch catch catch (IOException e) {
logger.error("file close error~~~~~", e);
}
autoLine = new new new new AtomicLong(0);
super super super super.stop();
}
public public public public void void void void configure(Context context) {
this this this this.logPath = context.getString("logPath");
}
public public public public Status process() throws throws throws throws EventDeliveryException {
try try try try {
String line = file.readLine();
ifififif(line==null null null null){
return return return return Status.BACKOFF;
}
ifififif(autoLine.intValue()%4==0){
logger.info("oldLine:" + oldLine.toString());
byte byte byte byte[] body = oldLine.toString().getBytes();
Event event = EventBuilder.withBody(body);
getChannelProcessor().processEvent(event);
oldLine = new new new new StringBuilder();
autoLine=new new new new AtomicLong(0);
}
oldLine.append(line);
} catch catch catch catch (IOException e) {
logger.error("getFilePointer error~~~~~", e);
return return return return Status.BACKOFF;
}
return return return return Status.READY;
}
}

其实要学会怎么自定义 source,可以参考 flume 提供的相关 source类,比如 AvroSource、NetcatSource、SpoolDirectorySource 等。

三:自定义 sink

以从 channel 中读取数据并写入 MySQL 数据库保存为例:

ackage package package package com.gds.flume.sink;
import import import import com.google.common.base.Preconditions;
import import import import com.google.common.base.Throwables;
import import import import com.google.common.collect.Lists;
import import import import org.apache.flume.*;
import import import import org.apache.flume.conf.Configurable;
import import import import org.apache.flume.sink.AbstractSink;
import import import import org.slf4j.Logger;
import import import import org.slf4j.LoggerFactory;
import import import import java.sql.Connection;
import import import import java.sql.DriverManager;
import import import import java.sql.PreparedStatement;
import import import import java.sql.SQLException;
import import import import java.util.List;
public public public public class class class class MysqlSink extends extends extends extends AbstractSink implements implements implements implements Configurable {
private private private private Logger LOG = LoggerFactory.getLogger(MysqlSink.class class class class);
private private private private String hostname;
private private private private String port;
private private private private String databaseName;
private private private private String tableName;
private private private private String user;
private private private private String password;
private private private private PreparedStatement preparedStatement;
private private private private Connection conn;
private private private private int int int int batchSize;
public public public public MysqlSink() {
LOG.info("MysqlSink start...");
}
public public public public void void void void configure(Context context) {
hostname = context.getString("hostname");
Preconditions.checkNotNull(hostname, "hostname must be set!!");
port = context.getString("port");
Preconditions.checkNotNull(port, "port must be set!!");
databaseName = context.getString("databaseName");
Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
tableName = context.getString("tableName");
Preconditions.checkNotNull(tableName, "tableName must be set!!");
user = context.getString("user");
Preconditions.checkNotNull(user, "user must be set!!");
password = context.getString("password");
Preconditions.checkNotNull(password, "password must be set!!");
batchSize = context.getInteger("batchSize", 100);
Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive
number!!");
}
@Override
public public public public void void void void start() {
super super super super.start();
try try try try {
//调用Class.forName()方法加载驱动程序
Class.forName("com.mysql.jdbc.Driver");
} catch catch catch catch (ClassNotFoundException e) {
e.printStackTrace();
}
String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
//调用DriverManager对象的getConnection()方法,获得一个Connection对象
try try try try {
conn = DriverManager.getConnection(url, user, password);
conn.setAutoCommit(false false false false);
//创建一个Statement对象
preparedStatement = conn.prepareStatement("insert into " + tableName +
" (content) values (?)");
} catch catch catch catch (SQLException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public public public public void void void void stop() {
super super super super.stop();
ifififif (preparedStatement != null null null null) {
try try try try {
preparedStatement.close();
} catch catch catch catch (SQLException e) {
e.printStackTrace();
}
}
ifififif (conn != null null null null) {
try try try try {
conn.close();
} catch catch catch catch (SQLException e) {
e.printStackTrace();
}
}
}
public public public public Status process() throws throws throws throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event;
String content;
List<String> actions = Lists.newArrayList();
transaction.begin();
try try try try {
for for for for (int int int int i = 0; i < batchSize; i++) {
event = channel.take();
ifififif (event != null null null null) {
content = new new new new String(event.getBody());
actions.add(content);
} else else else else {
result = Status.BACKOFF;
break break break break;
}
}
ifififif (actions.size() > 0) {
preparedStatement.clearBatch();
for for for for (String temp : actions) {
preparedStatement.setString(1, temp);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
conn.commit();
}
transaction.commit();
} catch catch catch catch (Throwable e) {
try try try try {
transaction.rollback();
} catch catch catch catch (Exception e2) {
LOG.error("Exception in rollback. Rollback might not have been" +
"successful.", e2);
}
LOG.error("Failed to commit transaction." +
"Transaction rolled back.", e);
Throwables.propagate(e);
} finally finally finally finally {
transaction.close();
}
return return return return result;
}
}

其实要学会怎么自定义 sink,可以参考 flume 提供的相关 sink 类,
比如 LoggerSink、AvroSink 等。

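四:自定义拦截器

自定义拦截器需要实现 org.apache.flume.interceptor.Interceptor 接口(initialize、intercept(Event)、intercept(List<Event>)、close),并提供一个实现 Interceptor.Builder 的内部类,在配置中通过 `interceptors.<名称>.type = 全类名$Builder` 引用。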

五:其他说明

1、使用 Spooling Directory Source 的时候,一定要避免同时读写同一个文件的情况。可以通过 `source1.ignorePattern = ^(.)*\.tmp$` 这个配置,让 Spooling Directory Source 不读取该格式的文件(例如先以 .tmp 后缀写入,写完后再重命名)。

六:参考文档

http://blog.csdn.net/xiao_jun_0820/article/category/2399621 这篇博客比较全

Flume的开发之 自定义 source 自定义 sink 自定义拦截器相关推荐

  1. 三十九、Flume自定义Source、Sink

    上篇文章咱们基于Flume举了几个例子,包括它的扇入扇出等等.这篇文章我们主要来看一下怎样通过自定义Source和Sink来实现Flume的数据采集.关注专栏<破茧成蝶--大数据篇>,查看 ...

  2. Jodd 5.0 使用自定义WebApp及设置默认拦截器

    首先使用IDEA创建Java工程,目录如下图所示: 使用 Jetty作为Web服务器,配置 jetty.xml: <?xml version="1.0" encoding=& ...

  3. Flume NG 学习笔记(八)Interceptors(拦截器)测试

    版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据 一.Event Serializers file ...

  4. 在SpringBoot项目中,自定义注解+拦截器优雅的实现敏感数据的加解密!

    在实际生产项目中,经常需要对如身份证信息.手机号.真实姓名等的敏感数据进行加密数据库存储,但在业务代码中对敏感信息进行手动加解密则十分不优雅,甚至会存在错加密.漏加密.业务人员需要知道实际的加密规则等 ...

  5. Struts2框架自定义拦截器

    Struts2中把某些公共性的功能放置到拦截器中,一般一个拦截器只负责一个功能. 拦截器的与过滤器有很多的相似之处,其中过滤器依赖于servlet容器,拦截器基于反射,不依赖servlet容器, 过滤 ...

  6. Hadoop生态Flume(三)拦截器(Interceptor)介绍与使用(1)

    转载自 Flume中的拦截器(Interceptor)介绍与使用(一) Flume中的拦截器(interceptor) 用户Source读取events发送到Sink的时候,在events heade ...

  7. 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

    1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...

  8. flume自定义拦截器开发步骤

    步骤如下: 1.新建一个java项目,不需要依赖spring等一系列依赖.只需要加上你用的 工具类的依赖.flume的依赖不用加,因为服务器里面有. 2.实现Interceptor接口,重写里面的in ...

  9. 第1节 flume:15、flume案例二,通过自定义拦截器实现数据的脱敏

    1.7.flume案例二 案例需求: 在数据采集之后,通过flume的拦截器,实现不需要的数据过滤掉,并将指定的第一个字段进行加密,加密之后再往hdfs上面保存 原始数据与处理之后的数据对比 图一  ...

最新文章

  1. drop wp table
  2. python subplot_气象编程 | 一个简单的风数据处理和分析案例(Python版)
  3. rtmp 时间戳问题
  4. Java enum枚举类型
  5. 【渝粤教育】国家开放大学2018年秋季 2083T信息技术与教育技术(2) 参考试题
  6. 面试官:面对业务量增长10倍、100倍怎么处理? 当场哭出声。。
  7. Windows运行命令收集
  8. ubuntu网站收集
  9. 【译】渐进式 Web App 的离线存储
  10. IIS设置HTTP To HTTPS
  11. 并发编程学习之Lock同步锁
  12. android ntp服务器配置
  13. 如何修改显示Office图标而不是wps图标
  14. 如何定向网件路由防火墙与URL
  15. Spatial4j简介
  16. 计算机科学与技术学院老师颁奖词,各种颁奖词收集与各类奖学金、各种称号、各种职位中英文对照(个人简历用得上)合集.doc...
  17. 【PostgreSQL】函数之百分位数中位数:percentile_cont()
  18. ORACLE自学教程
  19. java冒泡排序经典代码(Java冒泡排序)
  20. Onvif OSD相关操作

热门文章

  1. 《Scala机器学习》一一1.1 Scala入门
  2. 勒索病毒WannaCry(永恒之蓝)
  3. MySQL查看修改存储引擎总结
  4. “我有必要写技术博客吗?” 写技术博客一年,谈谈其得失优劣
  5. 【数据结构笔记19】File Transfer的C语言实现,集合的简化表示,按秩归并,路径压缩
  6. 项目代码matlab
  7. JellyViewPager
  8. linux通用自启动管理,linux下通过xinetd服务管理 rsync 实现开机自启动
  9. sql server 约束 查找
  10. 升级dedecms5.5后,出现提示保存目录数据时失败,请检查你的输入资料是否存在问题...