Flume 1.7 源码分析(一)源码编译
Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(三)程序入口
Flume 1.7 源码分析(四)从Source写数据到Channel
Flume 1.7 源码分析(五)从Channel获取数据写入Sink

6 从Channel获取数据写入Sink

6.1 Sink部分

Sink部分主要分为以下3个步骤:
1. 由SinkRunner不断调用SinkProcessor的process方法。
2. 根据配置的SinkProcessor的不同,会使用不同的策略来选择sink。SinkProcessor有3种,默认是DefaultSinkProcessor。
3. 调用选择的sink的process方法。

6.1.1 Sink的Process方法

以LoggerSink为例进行说明。这个方法来自Sink接口,主要用于取出数据进行处理,如果失败则回滚(takeList中内容退回quene):

public Status process() throws EventDeliveryException {Status result = Status.READY;Channel channel = getChannel();Transaction transaction = channel.getTransaction();Event event = null;try {transaction.begin();event = channel.take();//从channel中获取一条数据if (event != null) {if (logger.isInfoEnabled()) {logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
//输出event到日志}} else {result = Status.BACKOFF;}transaction.commit();//执行提交操作} catch (Exception ex) {transaction.rollback();//执行回滚操作throw new EventDeliveryException("Failed to log event: " + event, ex);} finally {transaction.close();}return result;
}

6.2 Channel部分

6.2.1 doTake方法

这个方法中主要是从queue中取出事件,放到takeList中。

protected Event doTake() throws InterruptedException {channelCounter.incrementEventTakeAttemptCount();//获取take列表容量的许可,如果没有则报异常。if (takeList.remainingCapacity() == 0) {throw new ChannelException("");}
//尝试获取queue数量的许可,如果没有则代表没有数据可以取,直接返回。if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {return null;}Event event;synchronized (queueLock) {event = queue.poll();//从queue中取出一条数据}Preconditions.checkNotNull(event, "");takeList.put(event);//放到takeList中int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);takeByteCounter += eventByteSize;//设置计数器return event;
}

6.2.2 doCommit方法

前面说到put和take操作的提交都是通过这个方法来提交的。

这个步骤要做的事情有:
1. putList放入queue,完成后就代表eventList->putList->queue这个步骤完成。
2. 假如doTake过程没报错(能进到这个方法说明没报错),说明sink那边已经获取到了全部的event,这时可直接清空takeList,代表queuetakeList & sink这个步骤完成。

综上,两个事情合并在一起的话,要做的就是,把putList放入queue再清空takeList。

protected void doCommit() throws InterruptedException {int remainingChange = takeList.size() - putList.size();if (remainingChange < 0) {if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {throw new ChannelException("");}if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {bytesRemaining.release(putByteCounter);throw new ChannelFullException("");}}int puts = putList.size();int takes = takeList.size();synchronized (queueLock) {if (puts > 0) {while (!putList.isEmpty()) {if (!queue.offer(putList.removeFirst())) {throw new RuntimeException("");}}}putList.clear();takeList.clear();}//后面是重新设置相关计数器
}

这个方法一开始去比较takeList和putList的容量差,是为了简化申请许可的过程。正常的流程是清空takeList,释放takeList.size个许可,再申请putList.size个许可,它是两个步骤合并起来的。

6.2.3 doRollback方法

与doCommit方法类似,这里的回滚,也分为2种情况:
- 由take操作引起的
该transaction的流程如下:queue->takeList & sink,所以回滚操作要做的事情就是:把takeList放回queue。
- 由put操作引起的
该transaction的流程如下:eventList->putList->queue,由于doPut和doCommit执行出现异常就直接跳出了,还没执行清空语句,也就是eventList还没有清空,所以可以直接清空putList,这样下次循环还会重新读取该eventList中的数据。

综上,两种操作要合为一个方法的话,就把takeList放回queue,然后清理putList就可以了。代码如下:

protected void doRollback() {int takes = takeList.size();synchronized (queueLock) {Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),"");while (!takeList.isEmpty()) {queue.addFirst(takeList.removeLast());}putList.clear();}//后面是重新设置相关计数器
}

附注:从目前的代码看,在take操作的时候,应该已经获取到了部分数据,如果这个时候异常了,把takeList返回queue的话,会导致重复数据。

Flume 1.7 源码分析(五)从Channel获取数据写入Sink相关推荐

  1. Flume 1.7 源码分析(四)从Source写数据到Channel

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...

  2. Flume 1.7 源码分析(三)程序入口

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...

  3. Flume 1.7 源码分析(二)整体架构

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...

  4. Flume 1.7 源码分析(一)源码编译

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 1 说明 Flume是Cloudera提供的一个高可用的,高可靠的, ...

  5. Linux内核源码分析—从用户空间复制数据到内核空间

    Linux内核源码分析-从用户空间复制数据到内核空间 本文主要参考<深入理解Linux内核>,结合2.6.11.1版的内核代码,分析从用户空间复制数据到内核空间函数. 1.不描述内核同步. ...

  6. MPTCP 源码分析(五) 接收端窗口值

    简述: 在TCP协议中影响数据发送的三个因素分别为:发送端窗口值.接收端窗口值和拥塞窗口值. 本文主要分析MPTCP中各个子路径对接收端窗口值rcv_wnd的处理. 接收端窗口值的初始化 根据< ...

  7. 【转】ABP源码分析五:ABP初始化全过程

    ABP在初始化阶段做了哪些操作,前面的四篇文章大致描述了一下. 为个更清楚的描述其脉络,做了张流程图以辅助说明.其中每一步都涉及很多细节,难以在一张图中全部表现出来.每一步的细节(会涉及到较多接口,类 ...

  8. Mybatis-Spring源码分析(五) MapperMethod和MappedStatement解析

    前言 基本上这就是Mybatis-Spring源码的最后一篇了,如果想起来什么再单开博客.比起来Spring源码,Mybatis的确实简单一些,本篇就说一下Mybatis中两个十分重要的类Mapper ...

  9. Vue.js 源码分析(五) 基础篇 方法 methods属性详解

    methods中定义了Vue实例的方法,官网是这样介绍的: 例如:: <!DOCTYPE html> <html lang="en"> <head&g ...

最新文章

  1. java mysql 查询结果_JAVA中显示MYSQL查询结果
  2. NI Measurement Studio 打包问题的解决(原创)
  3. C#设置IP地址,启用禁用适配器
  4. dns文件传输服务器,MOOC云计算 - DNS三部曲之DNS区域传输限制
  5. 提取source引擎.mdl模型,并转unity引擎.fbx
  6. cesium——鼠标拾取坐标并转换为经纬高
  7. openmv实现二维码识别与串口发送
  8. CCF认证 2018-09 卖菜
  9. 启动tomcat时候报错(Error deploying web application directory)
  10. Godaddy无缝切换SSL,无需续费可省12美元
  11. 初中英语语法(005)-时态
  12. 关于自己学习安卓的体会
  13. 雨量传感器测试(大众凌渡高尔夫7雨量传感器)
  14. Oracle账号频繁被锁定
  15. FFmpeg源码分析:avcodec_register_all()注册编解码器
  16. 多通道图像的分解和单通道图像的合成
  17. 0元0基础搭建个人网站简易实操
  18. 32位和64位的区别
  19. 为什么我的物联网创业失败?看看这五个原因
  20. 计算机应用基础试题学测,计算机应用基础测试题含答案

热门文章

  1. Android开发笔记——Android 9发送通知
  2. 《qss样式表笔记大全(二):可设置样式的窗口部件列表(上)(包含相关示例)》
  3. 直接将自身代码注入傀儡进程
  4. EXE和SYS通信IOCTL方式
  5. 逆向工程核心原理读书笔记-API钩取之隐藏进程(二)
  6. 外挂学习之路(1)--- bp send 回溯寻找关键call
  7. 【Boost】boost库asio详解1——io_service::run函数无任务时退出的问题
  8. interface接口实例
  9. 彻彻底底了解回调函数
  10. getsockname与getpeername用法与区别