目录

  • 前言
  • 以【消防】工作来形象类比
  • 实时任务大体流程介绍
    • Ingest 阶段
    • Persist 阶段
    • Merge 阶段
    • Hand off 阶段
  • 任务的提交到启动
    • 任务的提交
      • 相关源码分析
    • 任务队列和overlord转发到middleManager
      • TaskQueue 源码
      • RemoteTaskRunner & 任务状态(pending, waiting, running, completed)
      • 任务分配给worker策略选择
    • Peon进程的启动
      • CliMiddleManager: WorkerTaskMonitor & ForkingTaskRunner
      • ForkingTaskRunner 启动peon进程
  • 任务的消费,持久化
    • task.json & peon configuration
    • CliPeon & RealtimePlumber
      • CliPeon
      • RealtimeIndexTask --> RealtimePlumber
      • RealtimePlumber#startJob
      • tranquility & EventReceiverFirehoseFactory#connect
    • EventReceiverFirehoseFactory & RealtimePlumber 数据处理流程
      • 实时数据处理的大概流程图
      • EventReceiverFirehose buffer的使用?
      • 什么时候需要持久化?
    • 一个实际的index_realtime task log文件
  • 总结

前言

tranquility 配合 index_realtime (备份,窗口等)完成实时数据流的持久化,查询等处理。
本文主要介绍其中的流程和细节,并补充上在生产环境遇到的一些问题和解决方案,用以加深对实时任务处理的理解,也是巩固和学习;如有任何错误或不足之处请联系改正,欢迎交流。

以【消防】工作来形象类比

因为druid.io源码不少类都用了【消防】相关的术语,所以类比下是有必要的,阅读源代码也会相对容易一点。其中【消防员】这个角色尤为重要,负责了主要的接【消防水带】,开【消防栓】灭火工作。

【实时数据】 —> 水流 ----> 数据来源
【处理任务主体对象】—>【消防员】----> 核心处理
【持久化】—> [要灭的火] —> 数据落地

实时任务大体流程介绍

一般的,也是很多文章提到的,实时任务整个过程分为四个步骤;每个阶段完成不同的工作,也可以理解为index_realtime任务的生命周期(就如同学习Spring时Bean的生命周期一样,实时任务从创建到销毁也是有一套流程和各种细节)。

Ingest 阶段

实时流数据,采用LSM-Tree(Log-Structured Merge-Tree )将数据持有在内存中(JVM堆中)。

补充:这里消费有Buffer,Druid处理不过来Buffer容易满,并且配合tranquility有【反压】,生产上基于druid指标【ingest/events/processed】和【ingest/events/buffered】的告警或者借助分析上游Lag工作是经常有的事情。

Persist 阶段

当阈值maxRowsInMemory(例如 5万行)或 intermediatePersistPeriod(例如10分钟), 内存中的数据会被转换为列式存储物到磁盘上,为了保证实时窗口内已物化的Smoosh文件依然可以被查询,Druid使用内存文件映射方式(mmap)将Smoosh文件加载到直接内存(堆外内存)中,优化读取性能。

Merge 阶段

对于Persist阶段,会出现很多Smoosh碎片,小的碎片文件会严重影响后期的数据查询工作,所以在实时索引任务周期的末尾(略少于SegmentGranularity + WindowPeriod时长),产生back-groundtask,一方面是最后时间窗口内还有的数据,另一方面搜索本地磁盘所有已物化的Smoosh文件,并将其拼成Segment,也就是index.zip, 开始merge磁盘上的所有文件,生成Segment,准备Hand off。

Hand off 阶段

本阶段主要由Coordinator负责,Coordinator会将已完成的Segment信息注册到元信息库、上传 Deep Storage,并通知集群内Historical去加载该Segment,同时每隔一定时间间隔(默认1分钟)检查Handoff状态,如果成功,会在Zookeeper中申明已不服务该Segment,并执行下一个时间窗口内的索引任务;如果失败,Coordinator会进行反复尝试。

补充:生产上 Hand off 也是经常发生的事情,当然也必须做好检测告警工作,这块可以加上一些改进措施

一般产生原因有:

  1. historical磁盘用满了(当然这个也必须要有监控告警),segment分配不了了
  2. hisorical 任务产生过多的segments ,导致一时间分配不了实时的,因为实时总是时间窗口之后进行hand off
  3. coordinator分配特定的算法,当集群规模越来越大,segments超过几百万的时候,经常容易协调不过来,线程卡住问题(实际运维前期也经常通过重启coordinator来解决的)
  4. 其它一些莫名原因,需要具体排查

Hand off 导致实时任务一直无法移交完成,占用这middleManager上的worker资源,这个是个很大的问题

任务的提交到启动

上面虽然介绍了任务的整个大致流程,但是实际的设计和处理确是有很多细节的,流程比较多,借助了zookeeper ,每一块都是一个知识点,也都在生产过程中遇到不少问题,本文会结合源码来阐述实际的运行原理

任务的提交

任务的创建,都知道是调用overlord的api去创建, 类似如下;任务具体的配置在官网也有详细的介绍

curl -X POST \http://localhost:8090/druid/indexer/v1/task \-H 'Content-Type: application/json' \-d '{"type" : "index_realtime","id" : "index_realtime_mv2_2020-11-13T08:00:00.000Z_0_2","resource" : {"availabilityGroup" : "mv2-2020-11-13T08:00:00.000Z-0000","requiredCapacity" : 1},"spec" : {"dataSchema" : {"dataSource" : "mv2","parser" : {"type" : "map","parseSpec" : {"format" : "json","timestampSpec" : {"column" : "time","format" : "millis","missingValue" : null},"dimensionsSpec" : {"dimensions" : [ "url", "user" ],"spatialDimensions" : [ ]}}},"metricsSpec" : [ {"type" : "count","name" : "views"}, {"type" : "doubleSum","name" : "latencyMs","fieldName" : "latencyMs","expression" : null} ],"granularitySpec" : {"type" : "uniform","segmentGranularity" : "HOUR","queryGranularity" : {"type" : "none"},"rollup" : true,"intervals" : null},"transformSpec" : {"filter" : null,"transforms" : [ ]}},"ioConfig" : {"type" : "realtime","firehose" : {"type" : "clipped","delegate" : {"type" : "timed","delegate" : {"type" : "receiver","serviceName" : "firehose:druid:overlord:mv2-008-0000-0000","bufferSize" : 100000},"shutoffTime" : "2020-11-13T09:15:00.000Z"},"interval" : "2020-11-13T08:00:00.000Z/2020-11-13T09:00:00.000Z"},"firehoseV2" : null},"tuningConfig" : {"type" : "realtime","maxRowsInMemory" : 1000,"intermediatePersistPeriod" : "PT3M","windowPeriod" : "PT10M","basePersistDirectory" : "/var/folders/mt/9sz41nps4_5_7rshbz3nkrsh0000gn/T/1605255000037-0","versioningPolicy" : {"type" : "intervalStart"},"rejectionPolicy" : {"type" : "none"},"maxPendingPersists" : 0,"shardSpec" : {"type" : "linear","partitionNum" : 0},"indexSpec" : {"bitmap" : {"type" : "concise"},"dimensionCompression" : "lz4","metricCompression" : "lz4","longEncoding" : "longs"},"buildV9Directly" : true,"persistThreadPriority" : 0,"mergeThreadPriority" : 0,"reportParseExceptions" : false,"handoffConditionTimeout" : 0,"alertTimeout" : 0,"segmentWriteOutMediumFactory" : null}},"context" : { },"groupId" : "index_realtime_mv2","dataSource" : "mv2"
}'

相关源码分析

创建任务的api对应到源码中:io.druid.indexing.overlord.http.OverlordResource#taskPost, 如下稍微加上了些个人注释

 @POST@Path("/task")@Consumes(MediaType.APPLICATION_JSON)@Produces(MediaType.APPLICATION_JSON)public Response taskPost(final Task task,@Context final HttpServletRequest req){final String dataSource = task.getDataSource();final ResourceAction resourceAction = new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE),Action.WRITE);Access authResult = AuthorizationUtils.authorizeResourceAction(req,resourceAction,authorizerMapper);if (!authResult.isAllowed()) {throw new ForbiddenException(authResult.getMessage());}/*** overlord leader 接收任务** TaskMaster 监听 overlord leader变化,维持 TaskQueue, TaskRunner** MetadataTaskStorage 操作元数据 druid_tasks 表,提供对任务元数据操作**/return asLeaderWith( // 只有master节点接受任务提交,其他slave节点返回503 codetaskMaster.getTaskQueue(), // taskMaster监听leader节点变更,当成为leader节点后,创建TaskQueue存放任务、taskRunner运行任务new Function<TaskQueue, Response>(){@Overridepublic Response apply(TaskQueue taskQueue){try {// 添加任务到元数据库中和任务队列中taskQueue.add(task);return Response.ok(ImmutableMap.of("task", task.getId())).build();}catch (EntryExistsException e) {return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error",StringUtils.format("Task[%s] already exists!", task.getId()))).build();}}});}

要点:

  1. overlord leader才负责任务接收,slave无用的
  2. 必须确保overlord的正常运行,否则任务提交可能失败;这里也要注意overlord的gc问题,Stop The World过长也会导致当时提交任务的失败

任务队列和overlord转发到middleManager

TaskQueue 源码

个人现阶段阅读学习源码经验:到处打log就好,可以先无脑打log,慢慢深入后,再分析,如下TaskQueueLifecycleStart方法开始

源码注释交代了用途:作为任务产生和运行的一个中间层,即任务产生后交给TaskQueue, 然后TaskQueue再交给taskRunner去运行(这里本地源码设置的taskRunner是RemoteTaskRunner类)

/*** Interface between task producers and the task runner.* <p/>* This object accepts tasks from producers using {@link #add} and manages delivery of these tasks to a* {@link TaskRunner}. Tasks will run in a mostly-FIFO order, with deviations when the natural next task is not ready* in time (based on its {@link Task#isReady} method).* <p/>* For persistence, we save all new tasks and task status changes using a {@link TaskStorage} object.*/
public class TaskQueue
{private final List<Task> tasks = Lists.newArrayList();private final Map<String, ListenableFuture<TaskStatus>> taskFutures = Maps.newHashMap();private final TaskQueueConfig config;private final TaskStorage taskStorage;private final TaskRunner taskRunner;/*** LocalTaskActionClientFactory* RemoteTaskActionClientFactory*/private final TaskActionClientFactory taskActionClientFactory;private final TaskLockbox taskLockbox;private final ServiceEmitter emitter;private final ReentrantLock giant = new ReentrantLock(true);private final Condition managementMayBeNecessary = giant.newCondition();private final ExecutorService managerExec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(false).setNameFormat("TaskQueue-Manager").build());private final ScheduledExecutorService storageSyncExec = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(false).setNameFormat("TaskQueue-StorageSync").build());private volatile boolean active = false;private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);@Injectpublic TaskQueue(TaskQueueConfig config,TaskStorage taskStorage,TaskRunner taskRunner,TaskActionClientFactory taskActionClientFactory,TaskLockbox taskLockbox,ServiceEmitter emitter){this.config = Preconditions.checkNotNull(config, "config");this.taskStorage = Preconditions.checkNotNull(taskStorage, "taskStorage");this.taskRunner = Preconditions.checkNotNull(taskRunner, "taskRunner");this.taskActionClientFactory = Preconditions.checkNotNull(taskActionClientFactory, "taskActionClientFactory");this.taskLockbox = Preconditions.checkNotNull(taskLockbox, "taskLockbox");this.emitter = Preconditions.checkNotNull(emitter, "emitter");}

主要方法逻辑在:io.druid.indexing.overlord.TaskQueue#manage

  1. io.druid.indexing.overlord.HeapMemoryTaskStorage会添加任务到自己内部结构中,并打印log Inserting task index_realtime_mv2_2020-11-14T01:00:00.000Z_0_1 with status: TaskStatus{id=index_realtime_mv2_2020-11-14T01:00:00.000Z_0_1, status=RUNNING, duration=-1}
  2. TaskQueue 会将任务存储到io.druid.indexing.overlord.TaskQueue#tasks,然后遍历交给taskRunner, 并返回ListenableFuture<TaskStatus>可以对任务状态进行监听

RemoteTaskRunner & 任务状态(pending, waiting, running, completed)

TaskQueue的代码执行到:io.druid.indexing.overlord.RemoteTaskRunner#run

 /*** A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task.** @param task task to run*/@Overridepublic ListenableFuture<TaskStatus> run(final Task task){final RemoteTaskRunnerWorkItem completeTask, runningTask, pendingTask;if ((pendingTask = pendingTasks.get(task.getId())) != null) {log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());return pendingTask.getResult();} else if ((runningTask = runningTasks.get(task.getId())) != null) {ZkWorker zkWorker = findWorkerRunningTask(task.getId());if (zkWorker == null) {log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());} else {log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());if (announcement.getTaskStatus().isComplete()) {taskComplete(runningTask, zkWorker, announcement.getTaskStatus());}}return runningTask.getResult();} else if ((completeTask = completeTasks.get(task.getId())) != null) {return completeTask.getResult();} else {return addPendingTask(task).getResult();}}

我们都熟悉overlord任务管理界面任务是有不同状态的,如下图,而上面的代码就是入口

如果正常提交新任务,那么是从io.druid.indexing.overlord.RemoteTaskRunner#addPendingTask开始这个任务状态的各种变化的

/*** Adds a task to the pending queue*/private RemoteTaskRunnerWorkItem  addPendingTask(final Task task){log.info("Added pending task %s", task.getId());final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(task.getId(), null, null);pendingTaskPayloads.put(task.getId(), task);pendingTasks.put(task.getId(), taskRunnerWorkItem);runPendingTasks();return taskRunnerWorkItem;}

接下来的io.druid.indexing.overlord.RemoteTaskRunner#runPendingTasks是重点,任务必须交给一个worker去执行,如何分配选择这是个问题?

/*** 此方法使用多线程执行程序来提取所有pending的任务并尝试运行它们。* 被成功分配worker的任务将从pendingTasks移动到runningTasks。 此方法是线程安全的。* 有新worker或新任务被分配时,都应运行此方法。** This method uses a multi-threaded executor to extract all pending tasks and attempt to run them. Any tasks that* are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.* This method should be run each time there is new worker capacity or if new tasks are assigned.*/private void runPendingTasks(){log.info("start runPendingTasks");runPendingTasksExec.submit(new Callable<Void>(){@Overridepublic Void call() throws Exception{try {// make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them// into running statusList<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());sortByInsertionTime(copy);for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {String taskId = taskRunnerWorkItem.getTaskId();log.info("runPendingTasks task:%s", taskId);if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {try {//this can still be null due to race from explicit task shutdown request//or if another thread steals and completes this task right after this thread makes copy//of pending tasks. See https://github.com/druid-io/druid/issues/2842 .Task task = pendingTaskPayloads.get(taskId);if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {pendingTaskPayloads.remove(taskId);}}catch (Exception e) {log.makeAlert(e, "Exception while trying to assign task").addData("taskId", taskRunnerWorkItem.getTaskId()).emit();RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);if (workItem != null) {taskComplete(workItem, null, TaskStatus.failure(taskId));}}finally {tryAssignTasks.remove(taskId);}}}}catch (Exception e) {log.makeAlert(e, "Exception in running pending tasks").emit();}return null;}});}

任务分配给worker策略选择

接着上一步骤,代码将走到io.druid.indexing.overlord.RemoteTaskRunner#tryAssignTask

我们知道官网文档上是给出了worker select strategy的,所以源码也是按照策略选择的,上面截图中可以看到策略是io.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy

简要步骤如下

// 根据具体策略找到一个worker
immutableZkWorker = strategy.findWorkerForTask()
// 根据找到的worker从zk获取worker的具体信息
assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());
// 发布任务:
// 1. zk要创建任务worker/running任务节点
// 2. pendingTasks要移除该任务,并设置为running状态加入到`RemoteTaskRunner`的runningTasks队列中
announceTask(task, assignedWorker, taskRunnerWorkItem);

Peon进程的启动

io.druid.indexing.overlord.RemoteTaskRunner#start的zookeeper注册监听能知道worker节点任务的变化,在日志中也能看到任务状态和location的改变,如下图

CliMiddleManager: WorkerTaskMonitor & ForkingTaskRunner

middleManager进程启动后,注册了WorkerTaskMonitor对worker任务的监听,有了运行任务会存储到private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();队列中,然后WorkerTaskMonitor的主循环方法不断到获取队列中任务交给ForkingTaskRunner去运行即可

private void mainLoop(){try {while (!Thread.currentThread().isInterrupted()) {final Notice notice = notices.take();try {notice.handle();}catch (InterruptedException e) {// Will be caught and logged in the outer try blockthrow e;}catch (Exception e) {log.makeAlert(e, "Failed to handle notice").addData("noticeClass", notice.getClass().getSimpleName()).addData("noticeTaskId", notice.getTaskId()).emit();}}}catch (InterruptedException e) {log.info("WorkerTaskMonitor interrupted, exiting.");}finally {doneStopping.countDown();}}

重点关注:io.druid.indexing.worker.WorkerTaskMonitor.RunNotice

 private class RunNotice implements Notice{private final Task task;public RunNotice(Task task){this.task = task;}@Overridepublic String getTaskId(){return task.getId();}@Overridepublic void handle() throws Exception{if (running.containsKey(task.getId())) {log.warn("Got run notice for task [%s] that I am already running...",task.getId());workerCuratorCoordinator.removeTaskRunZnode(task.getId());return;}log.info("Submitting runnable for task[%s]", task.getId());workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(task,TaskStatus.running(task.getId()),TaskLocation.unknown()));log.info("Affirmative. Running task [%s]", task.getId());workerCuratorCoordinator.removeTaskRunZnode(task.getId());final ListenableFuture<TaskStatus> future = taskRunner.run(task);addRunningTask(task, future);}}

所以真正的主角就是io.druid.indexing.overlord.ForkingTaskRunner#run

ForkingTaskRunner 启动peon进程

上面分析了一大堆,目前才开始peon;上面找到了任务要运行的middleManager,显然首先要在middleManager找一个端口,才能启动peon进程


可以看下PortFinder这个类,本文这里就不具体分析了;说下线上遇到的问题:找到了一个可用端口,但是起peon进程还是有个短暂过程的,这期间要是端口突然被占用了,就会导致peon进程启动失败,如下图·Address already in use的报错

peon进程通过Java Process启动起来,可以看下官网文档说明

https://druid.apache.org/docs/0.20.0/design/peons.html

任务的消费,持久化

peon进程后,任务才真正的开始工作:消费,查询,持久化等等;实际工作的任务参数配置是件重要的事情,需要足够清楚,在分析和排查问题时就会游刃有余

官网文档对任务相关介绍:http://druid.io/docs/0.12.3/ingestion/tasks.html

task.json & peon configuration

eg: tranquility实时任务运行后的一个task.json如下

{"type":"index_realtime","id":"index_realtime_mv2_2020-11-13T08:00:00.000Z_0_2","resource":{"aabilityGroup":"mv2-2020-11-13T08:00:00.000Z-0000","requiredCapacity":1},"spec":{"dataSchema":{"dataSource":"mv2","parser":{"type":"map","parseSpec":{"format":"json","timestampSpec":{"column":"time","format":"millis","missingValue":null},"dimensionsSpec":{"dimensions":["url","user"],"spatialDimensions":[]}}},"metricsSpec":[{"type":"count","name":"views"},{"type":"doubleSum","name":"latencyMs","fieldName":"latencyMs","expression":null}],"granularitySpec":{"type":"uniform","segmentGranularity":"HOUR","queryGranularity":{"type":"none"},"rollup":true,"intervals":null},"transformSpec":{"filter":null,"transforms":[]}},"ioConfig":{"type":"realtime","firehose":{"type":"clipped","delegate":{"type":"timed","delegate":{"type":"receiver","serviceName":"firehose:druid:overlord:mv2-008-0000-0000","bufferSize":100000},"shutoffTime":"2020-11-13T09:15:00.000Z"},"interval":"2020-11-13T08:00:00.000Z/2020-11-13T09:00:00.000Z"},"firehoseV2":null},"tuningConfig":{"type":"realtime","maxRowsInMemory":1000,"intermediatePersistPeriod":"PT3M","windowPeriod":"PT10M","basePersistDirectory":"/var/folders/mt/9sz41nps4_5_7rshbz3nkrsh0000gn/T/1605255000037-0","versioningPolicy":{"type":"intervalStart"},"rejectionPolicy":{"type":"none"},"maxPendingPersists":0,"shardSpec":{"type":"linear","partitionNum":0},"indexSpec":{"bitmap":{"type":"concise"},"dimensionCompression":"lz4","metricCompression":"lz4","longEncoding":"longs"},"buildV9Directly":true,"persistThreadPriority":0,"mergeThreadPriority":0,"reportParseExceptions":false,"handoffConditionTimeout":0,"alertTimeout":0,"segmentWriteOutMediumFactory":null}},"context":{},"groupId":"index_realtime_mv2","dataSource":"mv2"
}

相关配置说明可参见官方文档,可以看最新的文档:https://druid.apache.org/docs/latest/configuration/index.html

tuningConfig中的几个重要参数

配置项 描述 默认值 中文说明
maxRowsInMemory The maximum number of records to store in memory before persisting to disk. Note that this is the number of rows post-rollup, and so it may not be equal to the number of input records. Ingested records will be persisted to disk when either maxRowsInMemory or maxBytesInMemory are reached (whichever happens first). 1000000 数据在peon内存中需要持久化到本地文件,可以聚合后配置超过多少数据量就开始持久化
intermediatePersistPeriod 10min 持久化也可以消费多少分钟的数据后就开始持久化,跟maxRowsInMemory配置项功能类似
windowPeriod 10min tranquility的时间窗口概念,比如实时数据延迟较大,这里就可以稍微配置大一点
basePersistDirectory 持久化文件和任务json,log相关信息是在本地文件目录的,这里配置持久化根目录


平时看任务log,可以在io.druid.indexing.overlord.ForkingTaskRunner看到如下的log,任务日志是写到了本地文件中的

Logging task index_realtime_mv2_2020-11-14T03:00:00.000Z_0_1 output to: /var/folders/mt/9sz41nps4_5_7rshbz3nkrsh0000gn/T/persistent/task/index_realtime_mv2_2020-11-14T03:00:00.000Z_0_1/log

ioConfig中有个bufferSize是需要注意的

配置项 描述 默认值 中文说明
bufferSize druid处理消息内部也是缓冲处理的,即数据加入到buffer,druid从buffer取,然后建立索引,这个bufferSize 指名了buffer的大小;生产上如果这个buffer一直满这的,那么一般说明druid处理不过来了,这里也需要进行指标的监控告警

CliPeon & RealtimePlumber

本文最开始类比了消防作业,说明了【消防员】这个角色的重要行,在druid.io源码则是RealtimePlumber这个重要类,了解数据的接收处理,必须明白RealtimePlumber

CliPeon

前面ForkingTaskRunner启动了Peon进程,但是Peon进程在做什么没有说明,这个必须要了解下,整个源码如下,跟其它进程没什么两样:生命周期,回调,模块绑定等。查看任务log,结合源码是比不可少的

/***/
@Command(name = "peon",description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "+ "This should rarely, if ever, be used directly. See http://druid.io/docs/latest/design/peons.html for a description"
)
public class CliPeon extends GuiceRunnable
{@Arguments(description = "task.json status.json", required = true)public List<String> taskAndStatusFile;@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")public String nodeType = "indexer-executor";private static final Logger log = new Logger(CliPeon.class);@Injectprivate Properties properties;public CliPeon(){super(log);}@Overrideprotected List<? extends Module> getModules(){return ImmutableList.<Module>of(new DruidProcessingModule(),new QueryableModule(),new QueryRunnerFactoryModule(),new Module(){@Overridepublic void configure(Binder binder){binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/peon");binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);PolyBind.createChoice(binder,"druid.indexer.task.chathandler.type",Key.get(ChatHandlerProvider.class),Key.get(ServiceAnnouncingChatHandlerProvider.class));final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(binder, Key.get(ChatHandlerProvider.class));handlerProviderBinder.addBinding("announce").to(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class).in(LazySingleton.class);binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class);configureTaskActionClient(binder);binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);// Build it to make it bind even if nothing binds to it.Binders.dataSegmentKillerBinder(binder);binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);Binders.dataSegmentMoverBinder(binder);binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);Binders.dataSegmentArchiverBinder(binder);binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);LifecycleModule.register(binder, ExecutorLifecycle.class);binder.bind(ExecutorLifecycleConfig.class).toInstance(new ExecutorLifecycleConfig().setTaskFile(new File(taskAndStatusFile.get(0))).setStatusFile(new File(taskAndStatusFile.get(1))));binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);binder.install(new CacheModule());JsonConfigProvider.bind(binder,"druid.segment.handoff",CoordinatorBasedSegmentHandoffNotifierConfig.class);binder.bind(SegmentHandoffNotifierFactory.class).to(CoordinatorBasedSegmentHandoffNotifierFactory.class).in(LazySingleton.class);// Override the default SegmentLoaderConfig because we don't actually care about the// configuration based locations.  This will override them anyway.  This is also stopping// configuration of other parameters, but I don't think that's actually a problem.// Note, if that is actually not a problem, then that probably means we have the wrong abstraction.binder.bind(SegmentLoaderConfig.class).toInstance(new SegmentLoaderConfig().withLocations(Arrays.<StorageLocationConfig>asList()));binder.bind(CoordinatorClient.class).in(LazySingleton.class);binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);Jerseys.addResource(binder, SegmentListerResource.class);binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.fromString(nodeType)));LifecycleModule.register(binder, Server.class);}private void configureTaskActionClient(Binder binder){PolyBind.createChoice(binder,"druid.peon.mode",Key.get(TaskActionClientFactory.class),Key.get(RemoteTaskActionClientFactory.class));final MapBinder<String, TaskActionClientFactory> taskActionBinder = PolyBind.optionBinder(binder, Key.get(TaskActionClientFactory.class));taskActionBinder.addBinding("local").to(LocalTaskActionClientFactory.class).in(LazySingleton.class);// all of these bindings are so that we can run the peon in local modeJsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);binder.bind(TaskActionToolbox.class).in(LazySingleton.class);binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(LazySingleton.class);taskActionBinder.addBinding("remote").to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);}@Provides@LazySingletonpublic Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config){try {return mapper.readValue(config.getTaskFile(), Task.class);}catch (IOException e) {throw Throwables.propagate(e);}}@Provides@LazySingleton@Named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)public String getDataSourceFromTask(final Task task){return task.getDataSource();}@Provides@LazySingleton@Named(DataSourceTaskIdHolder.TASK_ID_BINDING)public String getTaskIDFromTask(final Task task){return task.getId();}@Providespublic SegmentListerResource getSegmentListerResource(@Json ObjectMapper jsonMapper,@Smile ObjectMapper smileMapper,@Nullable BatchDataSegmentAnnouncer announcer){return new SegmentListerResource(jsonMapper,smileMapper,announcer,null);}},new QueryablePeonModule(),new IndexingServiceFirehoseModule(),new ChatHandlerServerModule(properties),new LookupModule());}@Overridepublic void run(){try {Injector injector = makeInjector();try {final Lifecycle lifecycle = initLifecycle(injector);final Thread hook = new Thread(new Runnable(){@Overridepublic void run(){log.info("Running shutdown hook");lifecycle.stop();}});Runtime.getRuntime().addShutdownHook(hook);injector.getInstance(ExecutorLifecycle.class).join();// Sanity check to help debug unexpected non-daemon threadsfinal Set<Thread> threadSet = Thread.getAllStackTraces().keySet();for (Thread thread : threadSet) {if (!thread.isDaemon() && thread != Thread.currentThread()) {log.info("Thread [%s] is non daemon.", thread);}}// Explicitly call lifecycle stop, dont rely on shutdown hook.lifecycle.stop();try {Runtime.getRuntime().removeShutdownHook(hook);}catch (IllegalStateException e) {log.warn("Cannot remove shutdown hook, already shutting down");}}catch (Throwable t) {log.error(t, "Error when starting up.  Failing.");System.exit(1);}log.info("Finished peon task");}catch (Exception e) {throw Throwables.propagate(e);}}
}

peon将任务交给io.druid.indexing.overlord.ThreadPoolTaskRunner#run执行, 具体则是ThreadPoolTaskRunnerCallable的call方法

 private class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus>{private final Task task;private final TaskLocation location;private final TaskToolbox toolbox;public ThreadPoolTaskRunnerCallable(Task task, TaskLocation location, TaskToolbox toolbox){this.task = task;this.location = location;this.toolbox = toolbox;}@Overridepublic TaskStatus call(){final long startTime = System.currentTimeMillis();TaskStatus status;try {log.info("Running task: %s", task.getId());TaskRunnerUtils.notifyLocationChanged(listeners,task.getId(),location);TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));status = task.run(toolbox);}catch (InterruptedException e) {// Don't reset the interrupt flag of the thread, as we do want to continue to the end of this callable.if (stopping) {// Tasks may interrupt their own run threads to stop themselves gracefully; don't be too scary about this.log.debug(e, "Interrupted while running task[%s] during graceful shutdown.", task);} else {// Not stopping, this is definitely unexpected.log.warn(e, "Interrupted while running task[%s]", task);}status = TaskStatus.failure(task.getId());}catch (Exception e) {log.error(e, "Exception while running task[%s]", task);status = TaskStatus.failure(task.getId());}catch (Throwable t) {log.error(t, "Uncaught Throwable while running task[%s]", task);throw t;}status = status.withDuration(System.currentTimeMillis() - startTime);TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);return status;}}

我们这里是io.druid.indexing.common.task.RealtimeIndexTask#run

RealtimeIndexTask --> RealtimePlumber

上面从peon进程走到了RealtimeIndexTask的run方法真正开始运行任务,但事实上这里我们才刚开始,下面先简单但介绍下任务这里流程(这里很多流程就和类比的消防流程差不多了)

io.druid.indexing.common.task.RealtimeIndexTask#run

// segment发布用
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(toolbox);
final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
FireDepartment fireDepartment = new FireDepartment(...
PlumberSchool plumberSchool = new RealtimePlumberSchool(...
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);
try {toolbox.getDataSegmentServerAnnouncer().announce();toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);plumber.startJob();// Set up metrics emissiontoolbox.getMonitorScheduler().addMonitor(metricsMonitor);// Firehose temporary directory is automatically removed when this RealtimeIndexTask completes.FileUtils.forceMkdir(firehoseTempDir);// Delay firehose connection to avoid claiming input resources while the plumber is starting up.final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory();final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);// Skip connecting firehose if we've been stopped before we got started.synchronized (this) {if (!gracefullyStopped) {firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);committerSupplier = Committers.supplierFromFirehose(firehose);}}// Time to read data!while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {Plumbers.addNextRow(committerSupplier,firehose,plumber,tuningConfig.isReportParseExceptions(),metrics);}}catch (Throwable e) {normalExit = false;log.makeAlert(e, "Exception aborted realtime processing[%s]", dataSchema.getDataSource()).emit();throw e;}

这里的plumber就是RealtimePlumber,所以io.druid.segment.realtime.plumber.RealtimePlumber#startJob这个是开始RealtimePlumber(消防员/水管工)的真正工作

同时要注意到firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);, 实时数据流是通过firehoseFactory来接收的(firehose英文意思就是:消防水带的意思,水在水管中流过来,那么数据在firehoseFactory进来,这么理解会清晰一点)

RealtimePlumber#startJob

io.druid.segment.realtime.plumber.RealtimePlumber#startJob 代码如下,很简单的一个流程,这里可以看到: persistmergehand off 这几个工作都有

@Overridepublic Object startJob(){computeBaseDir(schema).mkdirs();initializeExecutors();handoffNotifier.start();Object retVal = bootstrapSinksFromDisk();startPersistThread();// Push pending sinks bootstrapped from previous runmergeAndPush();resetNextFlush();return retVal;}

tranquility & EventReceiverFirehoseFactory#connect

tranquility不是本文重点,这里简单介绍一下:

tranquility将数据以POST请求的方式提交给EventReceiverFirehose(会丢弃所有时间窗口外的数据),当到达任务到达(SegmentGranularity+WindowPeriod)时,TimedShutoffFirehose会自动关闭Firehose,此时Segment会进行合并、注册元信息、存储到Deep Storage中并等待Handoff,当某个Historical Node声明自己已加载该Segment后,Indexing Service Task会正常退出。

tranquility post提交数据的api接口是EventReceiverFirehoseFactory的/push-events, 如下图

EventReceiverFirehoseFactory & RealtimePlumber 数据处理流程

上面的内容都是铺垫,EventReceiverFirehoseFactory & RealtimePlumber 这两个类是数据处理的重点,要研究数据的流程,这两个类就必须好好分析。

实时数据处理的大概流程图

EventReceiverFirehose buffer的使用?

io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory.EventReceiverFirehose#addRows

这里的buffer就是个数组阻塞队列:BlockingQueue<InputRow> buffer = new ArrayBlockingQueue<>(bufferSize);

public void addRows(Iterable<InputRow> rows) throws InterruptedException
{for (final InputRow row : rows) {boolean added = false;while (!closed && !added) {added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);if (!added) {long currTime = System.currentTimeMillis();long lastTime = lastBufferAddFailMsgTime.get();if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());}}}if (!added) {throw new IllegalStateException("Cannot add events to closed firehose!");}}
}

什么时候需要持久化?

io.druid.segment.realtime.plumber.RealtimePlumber#add

@Override
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{long messageTimestamp = row.getTimestampFromEpoch();final Sink sink = getSink(messageTimestamp);metrics.reportMessageMaxTimestamp(messageTimestamp);if (sink == null) {return -1;}final int numRows = sink.add(row, false);if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {persist(committerSupplier.get());}return numRows;
}

持久化的条件就是if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush)这个判断

  • nextFlush 就是一个时间判断(这里就是前文说的intermediatePersistPeriod配置)
private void resetNextFlush()
{nextFlush = DateTimes.nowUtc().plus(config.getIntermediatePersistPeriod()).getMillis();
}
  • !sink.canAppendRow()

判断的是当前currHydrant

public boolean canAppendRow()
{synchronized (hydrantLock) {return writable && currHydrant != null && currHydrant.getIndex().canAppendRow();}
}

具体是:io.druid.segment.incremental.OnheapIncrementalIndex#canAppendRow,可以看到就是当前Hydrant聚合后行数是否超过了一个阈值(也即前文说的maxRowsInMemory配置)

@Override
public boolean canAppendRow(){final boolean canAdd = size() < maxRowCount;if (!canAdd) {outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount);}return canAdd;}

所以最后就是这两个条件满足其一,就会进行持久化操作,没那么复杂

一个实际的index_realtime task log文件




每次持久化都会出现如下的log信息


通常的,当移交超时的时候会看到疯狂的log:Still waiting for Handoff for Segments

总结

本文结合源码和实际例子基本介绍了index_realtime任务从提交到执行的完整流程,但是实际上还是有很多细节没有讲到,后续也需要继续补充。也是作为个人源码阅读和实际工作对这部分内容的理解和总结

----2020年11月14日 星期六 18时03分18秒 CST

Druid.io index_realtime实时任务源码分析相关推荐

  1. iostat IO统计原理linux内核源码分析----基于单通道SATA盘

    iostat IO统计原理linux内核源码分析----基于单通道SATA盘 先上一个IO发送submit_bio流程图,本文基本就是围绕该流程讲解. 内核版本 3.10.96 详细的源码注释:htt ...

  2. druid.io index_realtime任务的hand off:仍然是源码+log说清楚

    目录 前言 源码 + log 说明流程 总结 前言 之前的博文:Druid.io index_realtime实时任务源码分析 介绍了整个index_realtime任务运行的流程,但是对于某些细节还 ...

  3. Rxjava源码分析之IO.Reactivex.Observable

    Rxjava 源码系列目录 Rxjava源码分析之IO.Reactivex.Observer Rxjava源码分析之IO.Reactivex.CompositeDisposable Rxjava源码分 ...

  4. Rxjava源码分析之IO.Reactivex.CompositeDisposable

    Rxjava 源码系列目录 Rxjava源码分析之IO.Reactivex.Observer Rxjava源码分析之IO.Reactivex.CompositeDisposable Rxjava源码分 ...

  5. coredns源码分析

    CoreDNS是使用go语言编写的快速灵活的DNS服务,采用链式插件模式,每个插件实现独立的功能,底层协议可以是tcp/udp,也可以是TLS,gRPC等.默认监听所有ip地址,可使用bind插件指定 ...

  6. ceph bluestore源码分析:admin_socket实时获取内存池数据

    环境: 版本:ceph 12.2.1 部署完cephfs 使用ceph-fuse挂载,并写入数据 关键参数: debug_mempool = true 将该参数置为true即可查看详细的blustor ...

  7. 【Linux 内核】实时调度类 ② ( 实时调度实体 sched_rt_entity 源码分析 | run_list、timeout、watchdog_stamp、time_slice 字段 )

    文章目录 一.sched_rt_entity 源码分析 1.run_list 字段 2.timeout 字段 3.watchdog_stamp 字段 4.time_slice 字段 5.back 字段 ...

  8. Apache Storm 实时流处理系统通信机制源码分析

    我们今天就来仔细研究一下Apache Storm 2.0.0-SNAPSHOT的通信机制.下面我将从大致思想以及源码分析,然后我们细致分析实时流处理系统中源码通信机制研究. 1. 简介 Worker间 ...

  9. Thrift异步IO服务器源码分析

    http://yanyiwu.com/work/2014/12/06/thrift-tnonblockingserver-analysis.html 最近在使用 libevent 开发项目,想起之前写 ...

最新文章

  1. java 密钥工厂 desede_20145212 实验五《Java网络编程》
  2. 明确REST接口开发的核心任务
  3. 字节跳动内部学习资料泄露!kafka入门教程
  4. 关不关机 扫地机器人_【小米智能家居】米家扫拖机器人,模拟人工来回擦拖地!...
  5. HttpServletRequest类用途
  6. 实力打脸: 量子隐形传输与 “瞬间移动” 毫无关系
  7. python语言数据挖掘python语言数据_Python语言数据挖掘01-环境搭建
  8. Windows下IIS+PHP 5.2的安装与配置
  9. mysql报904_数据库错误码[-904]
  10. pyqt与mysql例子_PyQt 连接MySql数据库,C++代码转Python3代码
  11. 电脑开机一会就蓝屏怎么回事_电脑蓝屏怎么回事
  12. C#根据身份证获出生日期和性别---含C#代码
  13. 【Matlab学习笔记】【函数学习】eps
  14. UNIX/Linux系统结构
  15. 汉诺塔C语言实现(纯代码)
  16. egg服务重启及child_process的使用
  17. 为什么要玩FLTK(Fast Light Tool Kit)
  18. 属性动画与图片三级缓存
  19. 【单片机基础】(四)单片机的引脚功能
  20. 微信支付委托代扣的服务商模式和直连模式

热门文章

  1. 微信公众号开发系列-12、微信前端开发利器:WeUI
  2. mc33063,mc34063发热问题
  3. Gatekeeper代码导读
  4. Android中的PendingIntent 原理
  5. [软件工程] 面向对象分析
  6. Python网络爬虫实践简答题
  7. 12星座之追女必杀技~
  8. 稳压二极管串联电阻计算
  9. 银行卡收单业务____单边账___现实生活中单边账的处理
  10. 魔众EDM邮件营销系统 v1.0.0 专业的EDM邮件营销系统