Flume 1.7 源码分析(五)从Channel获取数据写入Sink
Flume 1.7 源码分析(一)源码编译
Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(三)程序入口
Flume 1.7 源码分析(四)从Source写数据到Channel
Flume 1.7 源码分析(五)从Channel获取数据写入Sink
6 从Channel获取数据写入Sink
6.1 Sink部分
Sink部分主要分为以下3个步骤:
1. 由SinkRunner不断调用SinkProcessor的process方法。
2. 根据配置的SinkProcessor的不同,会使用不同的策略来选择sink。SinkProcessor有3种,默认是DefaultSinkProcessor。
3. 调用选择的sink的process方法。
6.1.1 Sink的Process方法
以LoggerSink为例进行说明。这个方法来自Sink接口,主要用于取出数据进行处理,如果失败则回滚(takeList中内容退回quene):
public Status process() throws EventDeliveryException {Status result = Status.READY;Channel channel = getChannel();Transaction transaction = channel.getTransaction();Event event = null;try {transaction.begin();event = channel.take();//从channel中获取一条数据if (event != null) {if (logger.isInfoEnabled()) {logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
//输出event到日志}} else {result = Status.BACKOFF;}transaction.commit();//执行提交操作} catch (Exception ex) {transaction.rollback();//执行回滚操作throw new EventDeliveryException("Failed to log event: " + event, ex);} finally {transaction.close();}return result;
}
6.2 Channel部分
6.2.1 doTake方法
这个方法中主要是从queue中取出事件,放到takeList中。
protected Event doTake() throws InterruptedException {channelCounter.incrementEventTakeAttemptCount();//获取take列表容量的许可,如果没有则报异常。if (takeList.remainingCapacity() == 0) {throw new ChannelException("");}
//尝试获取queue数量的许可,如果没有则代表没有数据可以取,直接返回。if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {return null;}Event event;synchronized (queueLock) {event = queue.poll();//从queue中取出一条数据}Preconditions.checkNotNull(event, "");takeList.put(event);//放到takeList中int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);takeByteCounter += eventByteSize;//设置计数器return event;
}
6.2.2 doCommit方法
前面说到put和take操作的提交都是通过这个方法来提交的。
这个步骤要做的事情有:
1. putList放入queue,完成后就代表eventList->putList->queue这个步骤完成。
2. 假如doTake过程没报错(能进到这个方法说明没报错),说明sink那边已经获取到了全部的event,这时可直接清空takeList,代表queuetakeList & sink这个步骤完成。
综上,两个事情合并在一起的话,要做的就是,把putList放入queue再清空takeList。
protected void doCommit() throws InterruptedException {int remainingChange = takeList.size() - putList.size();if (remainingChange < 0) {if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {throw new ChannelException("");}if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {bytesRemaining.release(putByteCounter);throw new ChannelFullException("");}}int puts = putList.size();int takes = takeList.size();synchronized (queueLock) {if (puts > 0) {while (!putList.isEmpty()) {if (!queue.offer(putList.removeFirst())) {throw new RuntimeException("");}}}putList.clear();takeList.clear();}//后面是重新设置相关计数器
}
这个方法一开始去比较takeList和putList的容量差,是为了简化申请许可的过程。正常的流程是清空takeList,释放takeList.size个许可,再申请putList.size个许可,它是两个步骤合并起来的。
6.2.3 doRollback方法
与doCommit方法类似,这里的回滚,也分为2种情况:
- 由take操作引起的
该transaction的流程如下:queue->takeList & sink,所以回滚操作要做的事情就是:把takeList放回queue。
- 由put操作引起的
该transaction的流程如下:eventList->putList->queue,由于doPut和doCommit执行出现异常就直接跳出了,还没执行清空语句,也就是eventList还没有清空,所以可以直接清空putList,这样下次循环还会重新读取该eventList中的数据。
综上,两种操作要合为一个方法的话,就把takeList放回queue,然后清理putList就可以了。代码如下:
protected void doRollback() {int takes = takeList.size();synchronized (queueLock) {Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),"");while (!takeList.isEmpty()) {queue.addFirst(takeList.removeLast());}putList.clear();}//后面是重新设置相关计数器
}
附注:从目前的代码看,在take操作的时候,应该已经获取到了部分数据,如果这个时候异常了,把takeList返回queue的话,会导致重复数据。
Flume 1.7 源码分析(五)从Channel获取数据写入Sink相关推荐
- Flume 1.7 源码分析(四)从Source写数据到Channel
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- Flume 1.7 源码分析(三)程序入口
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- Flume 1.7 源码分析(一)源码编译
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 1 说明 Flume是Cloudera提供的一个高可用的,高可靠的, ...
- Linux内核源码分析—从用户空间复制数据到内核空间
Linux内核源码分析-从用户空间复制数据到内核空间 本文主要参考<深入理解Linux内核>,结合2.6.11.1版的内核代码,分析从用户空间复制数据到内核空间函数. 1.不描述内核同步. ...
- MPTCP 源码分析(五) 接收端窗口值
简述: 在TCP协议中影响数据发送的三个因素分别为:发送端窗口值.接收端窗口值和拥塞窗口值. 本文主要分析MPTCP中各个子路径对接收端窗口值rcv_wnd的处理. 接收端窗口值的初始化 根据< ...
- 【转】ABP源码分析五:ABP初始化全过程
ABP在初始化阶段做了哪些操作,前面的四篇文章大致描述了一下. 为个更清楚的描述其脉络,做了张流程图以辅助说明.其中每一步都涉及很多细节,难以在一张图中全部表现出来.每一步的细节(会涉及到较多接口,类 ...
- Mybatis-Spring源码分析(五) MapperMethod和MappedStatement解析
前言 基本上这就是Mybatis-Spring源码的最后一篇了,如果想起来什么再单开博客.比起来Spring源码,Mybatis的确实简单一些,本篇就说一下Mybatis中两个十分重要的类Mapper ...
- Vue.js 源码分析(五) 基础篇 方法 methods属性详解
methods中定义了Vue实例的方法,官网是这样介绍的: 例如:: <!DOCTYPE html> <html lang="en"> <head&g ...
最新文章
- java mysql 查询结果_JAVA中显示MYSQL查询结果
- NI Measurement Studio 打包问题的解决(原创)
- C#设置IP地址,启用禁用适配器
- dns文件传输服务器,MOOC云计算 - DNS三部曲之DNS区域传输限制
- 提取source引擎.mdl模型,并转unity引擎.fbx
- cesium——鼠标拾取坐标并转换为经纬高
- openmv实现二维码识别与串口发送
- CCF认证 2018-09 卖菜
- 启动tomcat时候报错(Error deploying web application directory)
- Godaddy无缝切换SSL,无需续费可省12美元
- 初中英语语法(005)-时态
- 关于自己学习安卓的体会
- 雨量传感器测试(大众凌渡高尔夫7雨量传感器)
- Oracle账号频繁被锁定
- FFmpeg源码分析:avcodec_register_all()注册编解码器
- 多通道图像的分解和单通道图像的合成
- 0元0基础搭建个人网站简易实操
- 32位和64位的区别
- 为什么我的物联网创业失败?看看这五个原因
- 计算机应用基础试题学测,计算机应用基础测试题含答案