Flume的开发之 自定义 source 自定义 sink 自定义拦截器
一:开发相关
加入 jar 包依赖:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId>
<version>1.5.2</version>
</dependency
二:自定义 source
比如有一个需求,需要读取指定文件,要求读取4行才发送给channel
package package package com.gds.flume.source;
import import import import java.io.FileNotFoundException;
import import import import java.io.IOException;
import import import import java.io.RandomAccessFile;
import import import import java.util.concurrent.atomic.AtomicLong;
import import import import org.apache.flume.Context;
import import import import org.apache.flume.Event;
import import import import org.apache.flume.EventDeliveryException;
import import import import org.apache.flume.PollableSource;
import import import import org.apache.flume.conf.Configurable;
import import import import org.apache.flume.event.EventBuilder;
import import import import org.apache.flume.source.AbstractSource;
import import import import org.slf4j.Logger;
import import import import org.slf4j.LoggerFactory;
/**
* 从某个日子文件中每次读取4行,发送给channel
*
* @author @author @author @author dczhao
*
*/
public public public public class class class class MySource extends extends extends extends AbstractSource implements implements implements implements Configurable,
PollableSource {
private private private private static static static static final final final final Logger logger = LoggerFactory
.getLogger(MySource.class class class class);
/**
* 要读的日志文件地址
*/
private private private private String logPath;
private private private private RandomAccessFile file;
private private private private AtomicLong autoLine;
private private private private StringBuilder oldLine = new new new new StringBuilder();;
@Override
public public public public synchronized synchronized synchronized synchronized void void void void start() {
logger.info("MySource starting~~~~~");
autoLine = new new new new AtomicLong(0);
try try try try {
file = new new new new RandomAccessFile(logPath, "r");
} catch catch catch catch (FileNotFoundException e) {
logger.error("file oper error~~~~~", e);
}
super super super super.start();
}
@Override
public public public public synchronized synchronized synchronized synchronized void void void void stop() {
try try try try {
file.close();
} catch catch catch catch (IOException e) {
logger.error("file close error~~~~~", e);
}
autoLine = new new new new AtomicLong(0);
super super super super.stop();
}
public public public public void void void void configure(Context context) {
this this this this.logPath = context.getString("logPath");
}
public public public public Status process() throws throws throws throws EventDeliveryException {
try try try try {
String line = file.readLine();
ifififif(line==null null null null){
return return return return Status.BACKOFF;
}
ifififif(autoLine.intValue()%4==0){
logger.info("oldLine:" + oldLine.toString());
byte byte byte byte[] body = oldLine.toString().getBytes();
Event event = EventBuilder.withBody(body);
getChannelProcessor().processEvent(event);
oldLine = new new new new StringBuilder();
autoLine=new new new new AtomicLong(0);
}
oldLine.append(line);
} catch catch catch catch (IOException e) {
logger.error("getFilePointer error~~~~~", e);
return return return return Status.BACKOFF;
}
return return return return Status.READY;
}
}
其实要学会怎么自定义 source,可以参考 flume 提供的相关 source类,比如 AvroSource、NetcatSource、SpoolDirectorySource 等。
三:自定义 sink
从 channel 中读取数据,推送到 MySQL 数据库保存数据为例
ackage package package package com.gds.flume.sink;
import import import import com.google.common.base.Preconditions;
import import import import com.google.common.base.Throwables;
import import import import com.google.common.collect.Lists;
import import import import org.apache.flume.*;
import import import import org.apache.flume.conf.Configurable;
import import import import org.apache.flume.sink.AbstractSink;
import import import import org.slf4j.Logger;
import import import import org.slf4j.LoggerFactory;
import import import import java.sql.Connection;
import import import import java.sql.DriverManager;
import import import import java.sql.PreparedStatement;
import import import import java.sql.SQLException;
import import import import java.util.List;
public public public public class class class class MysqlSink extends extends extends extends AbstractSink implements implements implements implements Configurable {
private private private private Logger LOG = LoggerFactory.getLogger(MysqlSink.class class class class);
private private private private String hostname;
private private private private String port;
private private private private String databaseName;
private private private private String tableName;
private private private private String user;
private private private private String password;
private private private private PreparedStatement preparedStatement;
private private private private Connection conn;
private private private private int int int int batchSize;
public public public public MysqlSink() {
LOG.info("MysqlSink start...");
}
public public public public void void void void configure(Context context) {
hostname = context.getString("hostname");
Preconditions.checkNotNull(hostname, "hostname must be set!!");
port = context.getString("port");
Preconditions.checkNotNull(port, "port must be set!!");
databaseName = context.getString("databaseName");
Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
tableName = context.getString("tableName");
Preconditions.checkNotNull(tableName, "tableName must be set!!");
user = context.getString("user");
Preconditions.checkNotNull(user, "user must be set!!");
password = context.getString("password");
Preconditions.checkNotNull(password, "password must be set!!");
batchSize = context.getInteger("batchSize", 100);
Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive
number!!");
}
@Override
public public public public void void void void start() {
super super super super.start();
try try try try {
//调用Class.forName()方法加载驱动程序
Class.forName("com.mysql.jdbc.Driver");
} catch catch catch catch (ClassNotFoundException e) {
e.printStackTrace();
}
String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
//调用DriverManager对象的getConnection()方法,获得一个Connection对象
try try try try {
conn = DriverManager.getConnection(url, user, password);
conn.setAutoCommit(false false false false);
//创建一个Statement对象
preparedStatement = conn.prepareStatement("insert into " + tableName +
" (content) values (?)");
} catch catch catch catch (SQLException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public public public public void void void void stop() {
super super super super.stop();
ifififif (preparedStatement != null null null null) {
try try try try {
preparedStatement.close();
} catch catch catch catch (SQLException e) {
e.printStackTrace();
}
}
ifififif (conn != null null null null) {
try try try try {
conn.close();
} catch catch catch catch (SQLException e) {
e.printStackTrace();
}
}
}
public public public public Status process() throws throws throws throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event;
String content;
List<String> actions = Lists.newArrayList();
transaction.begin();
try try try try {
for for for for (int int int int i = 0; i < batchSize; i++) {
event = channel.take();
ifififif (event != null null null null) {
content = new new new new String(event.getBody());
actions.add(content);
} else else else else {
result = Status.BACKOFF;
break break break break;
}
}
ifififif (actions.size() > 0) {
preparedStatement.clearBatch();
for for for for (String temp : actions) {
preparedStatement.setString(1, temp);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
conn.commit();
}
transaction.commit();
} catch catch catch catch (Throwable e) {
try try try try {
transaction.rollback();
} catch catch catch catch (Exception e2) {
LOG.error("Exception in rollback. Rollback might not have been" +
"successful.", e2);
}
LOG.error("Failed to commit transaction." +
"Transaction rolled back.", e);
Throwables.propagate(e);
} finally finally finally finally {
transaction.close();
}
return return return return result;
}
}
其实要学会怎么自定义 sink,可以参考 flume 提供的相关 sink 类,
比如 LoggerSink、AvroSink 等。
四:自定义拦截器
五:其他说明
1、使用 Spooling Directory Source 的时候,一定要避免同时读写 一 个 文 件 的 情 况 。 可 以 通 过 source1.ignorePattern =^(.)*\.tmp$这个配置,让 spoolingsource 不读取该格式的文件。
.
六:参考文档
http://blog.csdn.net/xiao_jun_0820/article/category/2399621 这篇博客比较全
Flume的开发之 自定义 source 自定义 sink 自定义拦截器相关推荐
- 三十九、Flume自定义Source、Sink
上篇文章咱们基于Flume举了几个例子,包括它的扇入扇出等等.这篇文章我们主要来看一下怎样通过自定义Source和Sink来实现Flume的数据采集.关注专栏<破茧成蝶--大数据篇>,查看 ...
- Jodd 5.0 使用自定义WebApp及设置默认拦截器
首先使用IDEA创建Java工程,目录如下图所示: 使用 Jetty作为Web服务器,配置 jetty.xml: <?xml version="1.0" encoding=& ...
- Flume NG 学习笔记(八)Interceptors(拦截器)测试
版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据 一.Event Serializers file ...
- 在SpringBoot项目中,自定义注解+拦截器优雅的实现敏感数据的加解密!
在实际生产项目中,经常需要对如身份证信息.手机号.真实姓名等的敏感数据进行加密数据库存储,但在业务代码中对敏感信息进行手动加解密则十分不优雅,甚至会存在错加密.漏加密.业务人员需要知道实际的加密规则等 ...
- Struts2框架自定义拦截器
Struts2中把某些公共性的功能放置到拦截器中,一般一个拦截器只负责一个功能. 拦截器的与过滤器有很多的相似之处,其中过滤器依赖于servlet容器,拦截器基于反射,不依赖servlet容器, 过滤 ...
- Hadoop生态Flume(三)拦截器(Interceptor)介绍与使用(1)
转载自 Flume中的拦截器(Interceptor)介绍与使用(一) Flume中的拦截器(interceptor) 用户Source读取events发送到Sink的时候,在events heade ...
- 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink
1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...
- flume自定义拦截器开发步骤
步骤如下: 1.新建一个java项目,不需要依赖spring等一系列依赖.只需要加上你用的 工具类的依赖.flume的依赖不用加,因为服务器里面有. 2.实现Interceptor接口,重写里面的in ...
- 第1节 flume:15、flume案例二,通过自定义拦截器实现数据的脱敏
1.7.flume案例二 案例需求: 在数据采集之后,通过flume的拦截器,实现不需要的数据过滤掉,并将指定的第一个字段进行加密,加密之后再往hdfs上面保存 原始数据与处理之后的数据对比 图一 ...
最新文章
- drop wp table
- python subplot_气象编程 | 一个简单的风数据处理和分析案例(Python版)
- rtmp 时间戳问题
- Java enum枚举类型
- 【渝粤教育】国家开放大学2018年秋季 2083T信息技术与教育技术(2) 参考试题
- 面试官:面对业务量增长10倍、100倍怎么处理? 当场哭出声。。
- Windows运行命令收集
- ubuntu网站收集
- 【译】渐进式 Web App 的离线存储
- IIS设置HTTP To HTTPS
- 并发编程学习之Lock同步锁
- android ntp服务器配置
- 如何修改显示Office图标而不是wps图标
- 如何定向网件路由防火墙与URL
- Spatial4j简介
- 计算机科学与技术学院老师颁奖词,各种颁奖词收集与各类奖学金、各种称号、各种职位中英文对照(个人简历用得上)合集.doc...
- 【PostgreSQL】函数之百分位数中位数:percentile_cont()
- ORACLE自学教程
- java冒泡排序经典代码(Java冒泡排序)
- Onvif OSD相关操作
热门文章
- 《Scala机器学习》一一1.1 Scala入门
- 勒索病毒WannaCry(永恒之蓝)
- MySQL查看修改存储引擎总结
- “我有必要写技术博客吗?” 写技术博客一年,谈谈其得失优劣
- 【数据结构笔记19】File Transfer的C语言实现,集合的简化表示,按秩归并,路径压缩
- 项目代码matlab
- JellyViewPager
- linux通用自启动管理,linux下通过xinetd服务管理 rsync 实现开机自启动
- sql server 约束 查找
- 升级dedecms5.5后,出现提示保存目录数据时失败,请检查你的输入资料是否存在问题...