flume源码学习4-SourceRunner与ExecSource实现

2024-05-11 03:24:57

在agent启动时,会启动Channel,SourceRunner,SinkRunner,比如在org.apache.flume.agent.embedded.EmbeddedAgent类的doStart方法中:

  private void doStart() {boolean error = true;try {channel.start(); //调用Channel.start启动ChannelsinkRunner.start(); //调用SinkRunner.start启动SinkRunnersourceRunner.start(); //调用SourceRunner.start启动SourceRunnersupervisor.supervise(channel,new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);supervisor.supervise(sinkRunner,new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);supervisor.supervise(sourceRunner,new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);error = false;
.....

SourceRunner目前有两大类PollableSourceRunner和EventDrivenSourceRunner,分别对应PollableSource和EventDrivenSource,PollableSource相关类需要外部驱动来确定source中是否有消息可以使用,而EventDrivenSource相关类不需要外部驱动,自己实现了事件驱动机制,目前常见的Source类都属于EventDrivenSource类型。在org.apache.flume.conf.source.SourceType中定义了常见的Source类:

OTHER(null),
SEQ("org.apache.flume.source.SequenceGeneratorSource"),
NETCAT("org.apache.flume.source.NetcatSource"),
EXEC("org.apache.flume.source.ExecSource"),
AVRO("org.apache.flume.source.AvroSource"),
SYSLOGTCP("org.apache.flume.source.SyslogTcpSource"),
MULTIPORT_SYSLOGTCP("org.apache.flume.source.MultiportSyslogTCPSource"),
SYSLOGUDP("org.apache.flume.source.SyslogUDPSource"),
SPOOLDIR("org.apache.flume.source.SpoolDirectorySource"),
HTTP("org.apache.flume.source.http.HTTPSource");

可以由org.apache.flume.source.DefaultSourceFactory获取其对应的实例.
SourceRunner则负责启动Source,比如在org.apache.flume.source.EventDrivenSourceRunner的start方法:

  public void start() {Source source = getSource(); //通过getSource获取Source对象ChannelProcessor cp = source.getChannelProcessor(); //获取ChannelProcessor 对象cp.initialize(); //调用ChannelProcessor.initialize方法source.start(); //调用Source.start方法lifecycleState = LifecycleState. START;}

这里以execsource为例,配置source的类型为EXEC时,使用org.apache.flume.source.ExecSource.
主要原理是通过启动操作系统中的进程把每一行文本信息转换为Event,exec source不提供事务的特性(不能保证数据被成功发送到channel中),execsource关心的是标准输出,标准错误输出会被忽略(虽然可以打印到log中,但不会作为Event发送),可以设置restart的特性让source在退出时自动重启
主要方法分析:
1.configure方法用来设置相关参数
参数项:

command  执行的命令
restart 在命令退出后是否自动重启,默认是false
restartThrottle 重启命令的等待时间,默认是10s
logStderr 是否记录错误日志(只会放到日志中,不会做为Event),默认是false
batchSize 一次向channel发送的event的数量,批量发送的功能,默认是20
charset 字符编码设置,默认是UTF-8

2.start方法,SourceRunner的start方法会调用这个start方法:

  public void start() {logger.info( "Exec source starting with command:{}" , command );executor = Executors. newSingleThreadExecutor(); // 初始化一个单线程的线程池,private ExecutorService executor;counterGroup = new CounterGroup(); //初始化性能计数器runner = new ExecRunnable( command, getChannelProcessor(), counterGroup,restart, restartThrottle, logStderr , bufferCount , charset ); //实例化一个ExecRunnable线程// FIXME : Use a callback-like executor / future to signal us upon failure.runnerFuture = executor.submit( runner);  //private Future<?> runnerFuture;super.start(); //设置状态为start,lifecycleState = LifecycleState.START;logger.debug( "Exec source started");}

3.核心是一个实现Runnable接口的线程类ExecRunnable,用于启动读取数据的进程(在一个do...while循环中)

public void run() {do {String exitCode = "unknown";BufferedReader reader = null;try {String[] commandArgs = command.split("\\s+"); //由命令生成字符串数组process = new ProcessBuilder(commandArgs).start(); //生成一个Process对象,启动命令进程reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset)); //读取标准输出至缓冲区对象// StderrLogger dies as soon as the input stream is invalidStderrReader stderrReader = new StderrReader(new BufferedReader(new InputStreamReader(process.getErrorStream(), charset)), logStderr); //StderrReader线程用于记录标准错误日志stderrReader.setName("StderrReader-[" + command + "]");stderrReader.setDaemon(true);stderrReader.start();/*while((line = input.readLine()) != null) {if(logStderr) {logger.info("StderrLogger[{}] = '{}'", ++i, line); //日志只会写到log中,不会转换为Event}}*/String line = null;List<Event> eventList = new ArrayList<Event>();while ((line = reader.readLine()) != null) { //循环读取缓冲区中的内容counterGroup.incrementAndGet("exec.lines.read");eventList.add(EventBuilder.withBody(line.getBytes(charset)));  //使用EventBuilder.withBody转换成Event并放入eventList中if(eventList.size() >= bufferCount) { //达到batchSize的设置时调用ChannelProcessor.processEventBatch处理EventchannelProcessor.processEventBatch(eventList);eventList.clear();}}if(!eventList.isEmpty()) {channelProcessor.processEventBatch(eventList);}}
....} while(restart); //如果设置了restart,会在process退出后重新运行do代码块,否则退出
}

转载于:https://blog.51cto.com/caiguangguang/1618373

flume源码学习4-SourceRunner与ExecSource实现相关推荐

  1. flume源码学习8-hdfs sink的具体写入流程

    上一篇说了HDFSEventSink的实现,这里根据hdfs sink的配置和调用分析来看下sink中整个hdfs数据写入的过程: 线上hdfs sink的几个重要设置 1 2 3 4 5 6 7 8 ...

  2. Shiro源码学习之二

    接上一篇 Shiro源码学习之一 3.subject.login 进入login public void login(AuthenticationToken token) throws Authent ...

  3. Shiro源码学习之一

    一.最基本的使用 1.Maven依赖 <dependency><groupId>org.apache.shiro</groupId><artifactId&g ...

  4. mutations vuex 调用_Vuex源码学习(六)action和mutation如何被调用的(前置准备篇)...

    前言 Vuex源码系列不知不觉已经到了第六篇.前置的五篇分别如下: 长篇连载:Vuex源码学习(一)功能梳理 长篇连载:Vuex源码学习(二)脉络梳理 作为一个Web前端,你知道Vuex的instal ...

  5. vue实例没有挂载到html上,vue 源码学习 - 实例挂载

    前言 在学习vue源码之前需要先了解源码目录设计(了解各个模块的功能)丶Flow语法. src ├── compiler # 把模板解析成 ast 语法树,ast 语法树优化,代码生成等功能. ├── ...

  6. 2021-03-19Tomcat源码学习--WebAppClassLoader类加载机制

    Tomcat源码学习--WebAppClassLoader类加载机制 在WebappClassLoaderBase中重写了ClassLoader的loadClass方法,在这个实现方法中我们可以一窥t ...

  7. jQuery源码学习之Callbacks

    jQuery源码学习之Callbacks jQuery的ajax.deferred通过回调实现异步,其实现核心是Callbacks. 使用方法 使用首先要先新建一个实例对象.创建时可以传入参数flag ...

  8. JDK源码学习笔记——Integer

    一.类定义 public final class Integer extends Number implements Comparable<Integer> 二.属性 private fi ...

  9. DotText源码学习——ASP.NET的工作机制

    --本文是<项目驱动学习--DotText源码学习>系列的第一篇文章,在这之后会持续发表相关的文章. 概论 在阅读DotText源码之前,让我们首先了解一下ASP.NET的工作机制,可以使 ...

最新文章

  1. Java项目:校园招聘平台系统(java+MySQL+Jdbc+Servlet+SpringMvc+Jsp)
  2. 40多个漂亮的网页表单设计实例
  3. python元类 orm_python-进阶-元类在ORM上的应用详解
  4. java中jtansforms,java – 使用AffineTransform旋转图像
  5. 51Nod1230 幸运数
  6. vue 导入element-ui css报错解决方法
  7. c++中的类型转换--reinterpret_cast
  8. mongodb 监控权限_MongoDB - 监控
  9. TyepScript入门教程 之 async await
  10. 批量文件替换_CAD图形文件中如何快速批量替换文字?【AutoCAD教程】
  11. 管理SourceForge项目的方法
  12. xml笔记(马士兵)
  13. B Start - 哔哩哔哩校招入职成长营
  14. error: Microsoft Visual C++ 14.0 or greater is required. Get it with “Microsoft C++ Build Tools“
  15. Windows10服务优化
  16. 记一次nginx配置服务器代理发送请求(外网请求内网ip)
  17. CodeForces - 407C C - Curious Array 高阶差分序列
  18. 把握出租车行驶的数据脉搏 :出租车轨迹数据给你答案!
  19. pyqt QLabel详细用法
  20. React16版本更新的新特性

热门文章

  1. 在巴塞罗那,华为挥别昨日 | MWC 2019
  2. 为什么数据挖掘很难成功?
  3. Python requests模块相关接口
  4. 解析不是utf-8的xml文件 附(tag 属性的获取 )
  5. Windows 2003 服务器播放FLV的问题解决
  6. BZOJ1068:[SCOI2007]压缩——题解
  7. UITableView全面解析
  8. Linux内核抢占实现机制分析【转】
  9. 【第二篇】Volley的使用之加载图片
  10. JBPM6教程-10分钟玩转JBPM工作台