Druid.io index_realtime Task Source Code Analysis
Contents
- Preface
- A firefighting analogy
- Overview of the realtime task flow
- Ingest phase
- Persist phase
- Merge phase
- Hand-off phase
- From task submission to startup
- Submitting a task
- Related source code analysis
- The task queue and forwarding from overlord to middleManager
- TaskQueue source
- RemoteTaskRunner & task states (pending, waiting, running, completed)
- Worker selection strategy for task assignment
- Starting the peon process
- CliMiddleManager: WorkerTaskMonitor & ForkingTaskRunner
- ForkingTaskRunner launches the peon process
- Consuming and persisting data
- task.json & peon configuration
- CliPeon & RealtimePlumber
- CliPeon
- RealtimeIndexTask --> RealtimePlumber
- RealtimePlumber#startJob
- tranquility & EventReceiverFirehoseFactory#connect
- EventReceiverFirehoseFactory & RealtimePlumber data processing flow
- Rough flow of realtime data processing
- How is the EventReceiverFirehose buffer used?
- When does persistence happen?
- A real index_realtime task log file
- Summary
Preface
tranquility works together with index_realtime tasks (replication, window period, etc.) to persist and query realtime data streams. This article walks through that flow and its details, supplemented with problems and solutions encountered in production, to deepen the understanding of realtime task processing and to consolidate my own learning. If you spot any errors or omissions, please reach out; discussion is welcome.
A firefighting analogy
Quite a few classes in the druid.io source use firefighting terminology, so it is worth spelling the analogy out; it makes the source noticeably easier to read. The firefighter role is the key one: it connects the fire hose and opens the hydrant to put out the fire.
realtime data -> the water flow -> the data source
the main processing object -> the firefighter -> the core processing
persistence -> the fire to be put out -> data landing on disk
Overview of the realtime task flow
As many articles describe, a realtime task goes through four phases. Each phase does different work, and together they form the lifecycle of an index_realtime task (much like the lifecycle of a Spring bean, a realtime task follows a fixed flow from creation to destruction, with plenty of detail along the way).
Ingest phase
Incoming stream data is held in memory (on the JVM heap) in an LSM-Tree (Log-Structured Merge-Tree) fashion.
Note: consumption here goes through a buffer. When Druid cannot keep up, the buffer fills, and together with tranquility this produces backpressure. In production it is routine work to alert on the Druid metrics ingest/events/processed and ingest/events/buffered, or to analyze upstream lag.
Persist phase
When either threshold is reached, maxRowsInMemory (e.g. 50,000 rows) or intermediatePersistPeriod (e.g. 10 minutes), the in-memory data is converted to columnar format and materialized to disk. To keep the already-materialized smoosh files queryable within the realtime window, Druid memory-maps (mmap) the smoosh files into direct (off-heap) memory, optimizing read performance; a minimal mmap illustration follows.
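As a minimal illustration of the memory-mapping technique (plain JDK APIs, my own sketch rather than Druid's actual segment-loading code; the file path is hypothetical):

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MmapExample
{
  public static void main(String[] args) throws IOException
  {
    // Hypothetical path to a persisted intermediate smoosh file.
    Path smoosh = Paths.get("/tmp/persist/00000.smoosh");
    try (FileChannel channel = FileChannel.open(smoosh, StandardOpenOption.READ)) {
      // The mapping lives outside the JVM heap; the OS pages data in on demand,
      // which is why queries can read persisted data without reloading it on-heap.
      MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
      System.out.println("first byte: " + buf.get(0));
    }
  }
}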
Merge phase
The persist phase leaves behind many small smoosh fragments, and small files badly hurt later query work. So near the end of the realtime indexing task's cycle (slightly less than SegmentGranularity + WindowPeriod), a background task is produced: on one hand it persists whatever data remains from the final time window, and on the other it scans all smoosh files already materialized on local disk and merges them into a single segment, the index.zip, ready for hand-off.
Hand-off phase
In this phase the completed segment's metadata is registered in the metadata store and the segment is uploaded to deep storage; the Coordinator then notices the new segment and instructs a Historical in the cluster to load it. The task checks the hand-off status at a fixed interval (1 minute by default); on success it announces in ZooKeeper that it no longer serves the segment and the indexing task for the next time window carries on; on failure the check is retried repeatedly.
Note: hand-off problems are also a regular occurrence in production, so detection and alerting are a must, and there is room for improvements here.
Common causes include:
- Historical disks are full (this, too, needs monitoring and alerting), so the segment cannot be assigned anywhere
- Batch (historical) tasks produced too many segments at once, so the realtime segments cannot be assigned in time; realtime segments always hand off only after the window period ends
- The coordinator's particular assignment algorithm: as the cluster grows and the segment count reaches the millions, the coordinator often cannot keep up and its threads get stuck (in early operations we frequently recovered by restarting the coordinator)
- Other obscure causes that need case-by-case investigation
A hand-off that never completes keeps the realtime task from finishing and pins down a worker slot on the middleManager, which is a serious problem; the coordinator's load-status API shown below is one way to watch this from the outside.
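For example (host and port are placeholders), the Coordinator exposes a load-status endpoint that reports how much of each datasource's published data has actually been loaded by Historicals, which is handy when chasing stuck hand-offs:

curl http://coordinator-host:8081/druid/coordinator/v1/loadstatus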
From task submission to startup
The overall flow above hides a great deal of design detail: there are many moving parts, ZooKeeper is involved throughout, every piece is a topic of its own, and every piece has caused production trouble at some point. The rest of this article explains how it actually works, following the source code.
Submitting a task
Task creation, as is well known, goes through the overlord API, roughly as follows; the task's configuration options are documented in detail on the official site.
curl -X POST http://localhost:8090/druid/indexer/v1/task \
  -H 'Content-Type: application/json' \
  -d '{
  "type": "index_realtime",
  "id": "index_realtime_mv2_2020-11-13T08:00:00.000Z_0_2",
  "resource": { "availabilityGroup": "mv2-2020-11-13T08:00:00.000Z-0000", "requiredCapacity": 1 },
  "spec": {
    "dataSchema": {
      "dataSource": "mv2",
      "parser": {
        "type": "map",
        "parseSpec": {
          "format": "json",
          "timestampSpec": { "column": "time", "format": "millis", "missingValue": null },
          "dimensionsSpec": { "dimensions": ["url", "user"], "spatialDimensions": [] }
        }
      },
      "metricsSpec": [
        { "type": "count", "name": "views" },
        { "type": "doubleSum", "name": "latencyMs", "fieldName": "latencyMs", "expression": null }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": { "type": "none" },
        "rollup": true,
        "intervals": null
      },
      "transformSpec": { "filter": null, "transforms": [] }
    },
    "ioConfig": {
      "type": "realtime",
      "firehose": {
        "type": "clipped",
        "delegate": {
          "type": "timed",
          "delegate": {
            "type": "receiver",
            "serviceName": "firehose:druid:overlord:mv2-008-0000-0000",
            "bufferSize": 100000
          },
          "shutoffTime": "2020-11-13T09:15:00.000Z"
        },
        "interval": "2020-11-13T08:00:00.000Z/2020-11-13T09:00:00.000Z"
      },
      "firehoseV2": null
    },
    "tuningConfig": {
      "type": "realtime",
      "maxRowsInMemory": 1000,
      "intermediatePersistPeriod": "PT3M",
      "windowPeriod": "PT10M",
      "basePersistDirectory": "/var/folders/mt/9sz41nps4_5_7rshbz3nkrsh0000gn/T/1605255000037-0",
      "versioningPolicy": { "type": "intervalStart" },
      "rejectionPolicy": { "type": "none" },
      "maxPendingPersists": 0,
      "shardSpec": { "type": "linear", "partitionNum": 0 },
      "indexSpec": {
        "bitmap": { "type": "concise" },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs"
      },
      "buildV9Directly": true,
      "persistThreadPriority": 0,
      "mergeThreadPriority": 0,
      "reportParseExceptions": false,
      "handoffConditionTimeout": 0,
      "alertTimeout": 0,
      "segmentWriteOutMediumFactory": null
    }
  },
  "context": {},
  "groupId": "index_realtime_mv2",
  "dataSource": "mv2"
}'
Related source code analysis
The task-creation API corresponds to io.druid.indexing.overlord.http.OverlordResource#taskPost in the source, shown below with a few personal comments added.
@POST
@Path("/task")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response taskPost(final Task task, @Context final HttpServletRequest req)
{
  final String dataSource = task.getDataSource();
  final ResourceAction resourceAction = new ResourceAction(
      new Resource(dataSource, ResourceType.DATASOURCE),
      Action.WRITE
  );
  Access authResult = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
  if (!authResult.isAllowed()) {
    throw new ForbiddenException(authResult.getMessage());
  }
  /**
   * The overlord leader receives the task.
   * TaskMaster watches for overlord leader changes and maintains the TaskQueue and TaskRunner.
   * MetadataTaskStorage operates on the druid_tasks metadata table, providing task-metadata operations.
   */
  return asLeaderWith( // only the leader accepts submissions; the other overlord nodes answer with a 503
      taskMaster.getTaskQueue(), // TaskMaster watches leadership changes; once elected leader it creates the TaskQueue to hold tasks and the TaskRunner to run them
      new Function<TaskQueue, Response>()
      {
        @Override
        public Response apply(TaskQueue taskQueue)
        {
          try {
            // add the task to the metadata store and to the task queue
            taskQueue.add(task);
            return Response.ok(ImmutableMap.of("task", task.getId())).build();
          }
          catch (EntryExistsException e) {
            return Response.status(Response.Status.BAD_REQUEST)
                           .entity(ImmutableMap.of(
                               "error",
                               StringUtils.format("Task[%s] already exists!", task.getId())
                           ))
                           .build();
          }
        }
      }
  );
}
Key points:
- Only the overlord leader accepts tasks; the followers do nothing here
- The overlord must be healthy, or task submission may fail; also watch out for overlord GC, since an overly long stop-the-world pause can fail the submissions made at that moment. A quick way to confirm who the leader is appears below.
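For instance (the host is a placeholder), the overlord exposes a leader endpoint, which is useful to check before blaming a failed submission:

curl http://overlord-host:8090/druid/indexer/v1/leader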
The task queue and forwarding from overlord to middleManager
TaskQueue source
My current habit when studying source code: just add log statements everywhere first, mindlessly if need be, and analyze as understanding deepens. For TaskQueue, start from the @LifecycleStart method.
The class javadoc states its purpose: it is the middle layer between task production and task running. A newly produced task is given to the TaskQueue, which hands it to a taskRunner to run (in the local setup used here the taskRunner is the RemoteTaskRunner class).
/**
 * Interface between task producers and the task runner.
 * <p/>
 * This object accepts tasks from producers using {@link #add} and manages delivery of these tasks to a
 * {@link TaskRunner}. Tasks will run in a mostly-FIFO order, with deviations when the natural next task is not ready
 * in time (based on its {@link Task#isReady} method).
 * <p/>
 * For persistence, we save all new tasks and task status changes using a {@link TaskStorage} object.
 */
public class TaskQueue
{
  private final List<Task> tasks = Lists.newArrayList();
  private final Map<String, ListenableFuture<TaskStatus>> taskFutures = Maps.newHashMap();

  private final TaskQueueConfig config;
  private final TaskStorage taskStorage;
  private final TaskRunner taskRunner;
  /**
   * LocalTaskActionClientFactory
   * RemoteTaskActionClientFactory
   */
  private final TaskActionClientFactory taskActionClientFactory;
  private final TaskLockbox taskLockbox;
  private final ServiceEmitter emitter;

  private final ReentrantLock giant = new ReentrantLock(true);
  private final Condition managementMayBeNecessary = giant.newCondition();
  private final ExecutorService managerExec = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(false).setNameFormat("TaskQueue-Manager").build()
  );
  private final ScheduledExecutorService storageSyncExec = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setDaemon(false).setNameFormat("TaskQueue-StorageSync").build()
  );

  private volatile boolean active = false;
  private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);

  @Inject
  public TaskQueue(
      TaskQueueConfig config,
      TaskStorage taskStorage,
      TaskRunner taskRunner,
      TaskActionClientFactory taskActionClientFactory,
      TaskLockbox taskLockbox,
      ServiceEmitter emitter
  )
  {
    this.config = Preconditions.checkNotNull(config, "config");
    this.taskStorage = Preconditions.checkNotNull(taskStorage, "taskStorage");
    this.taskRunner = Preconditions.checkNotNull(taskRunner, "taskRunner");
    this.taskActionClientFactory = Preconditions.checkNotNull(taskActionClientFactory, "taskActionClientFactory");
    this.taskLockbox = Preconditions.checkNotNull(taskLockbox, "taskLockbox");
    this.emitter = Preconditions.checkNotNull(emitter, "emitter");
  }
The main logic lives in io.druid.indexing.overlord.TaskQueue#manage:
- io.druid.indexing.overlord.HeapMemoryTaskStorage adds the task to its internal structures and logs: Inserting task index_realtime_mv2_2020-11-14T01:00:00.000Z_0_1 with status: TaskStatus{id=index_realtime_mv2_2020-11-14T01:00:00.000Z_0_1, status=RUNNING, duration=-1}
- TaskQueue stores the task in io.druid.indexing.overlord.TaskQueue#tasks, then iterates over the tasks and hands them to the taskRunner, getting back a ListenableFuture<TaskStatus> on which the task status can be observed; a sketch follows.
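As a minimal sketch (my own illustration, not Druid code) of listening on such a future with Guava, assuming a recent Guava version and the 0.12-era package path for TaskStatus:

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.indexing.common.TaskStatus;

public class TaskStatusWatcher
{
  public static void watch(final String taskId, ListenableFuture<TaskStatus> statusFuture)
  {
    Futures.addCallback(
        statusFuture,
        new FutureCallback<TaskStatus>()
        {
          @Override
          public void onSuccess(TaskStatus status)
          {
            // Fires once the task reaches a terminal state (SUCCESS/FAILED).
            System.out.println("task " + taskId + " finished with " + status);
          }

          @Override
          public void onFailure(Throwable t)
          {
            System.out.println("task " + taskId + " future failed: " + t);
          }
        },
        MoreExecutors.directExecutor()
    );
  }
}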
RemoteTaskRunner & task states (pending, waiting, running, completed)
From TaskQueue, execution reaches io.druid.indexing.overlord.RemoteTaskRunner#run
/**
 * A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task.
 *
 * @param task task to run
 */
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
  final RemoteTaskRunnerWorkItem completeTask, runningTask, pendingTask;
  if ((pendingTask = pendingTasks.get(task.getId())) != null) {
    log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
    return pendingTask.getResult();
  } else if ((runningTask = runningTasks.get(task.getId())) != null) {
    ZkWorker zkWorker = findWorkerRunningTask(task.getId());
    if (zkWorker == null) {
      log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
    } else {
      log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
      TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
      if (announcement.getTaskStatus().isComplete()) {
        taskComplete(runningTask, zkWorker, announcement.getTaskStatus());
      }
    }
    return runningTask.getResult();
  } else if ((completeTask = completeTasks.get(task.getId())) != null) {
    return completeTask.getResult();
  } else {
    return addPendingTask(task).getResult();
  }
}
We are all familiar with the task states shown in the overlord's task console, and the code above is the entry point to those state transitions. For a normally submitted new task, the state changes start from io.druid.indexing.overlord.RemoteTaskRunner#addPendingTask
/**
 * Adds a task to the pending queue
 */
private RemoteTaskRunnerWorkItem addPendingTask(final Task task)
{
  log.info("Added pending task %s", task.getId());
  final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(task.getId(), null, null);
  pendingTaskPayloads.put(task.getId(), task);
  pendingTasks.put(task.getId(), taskRunnerWorkItem);
  runPendingTasks();
  return taskRunnerWorkItem;
}
The next step, io.druid.indexing.overlord.RemoteTaskRunner#runPendingTasks, is the crux: the task must be handed to some worker, and how to pick one is the interesting question.
/**
 * This method uses a multi-threaded executor to extract all pending tasks and attempt to run them. Any tasks that
 * are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
 * This method should be run each time there is new worker capacity or if new tasks are assigned.
 */
private void runPendingTasks()
{
  log.info("start runPendingTasks");
  runPendingTasksExec.submit(
      new Callable<Void>()
      {
        @Override
        public Void call() throws Exception
        {
          try {
            // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
            // into running status
            List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
            sortByInsertionTime(copy);

            for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
              String taskId = taskRunnerWorkItem.getTaskId();
              log.info("runPendingTasks task:%s", taskId);
              if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
                try {
                  //this can still be null due to race from explicit task shutdown request
                  //or if another thread steals and completes this task right after this thread makes copy
                  //of pending tasks. See https://github.com/druid-io/druid/issues/2842 .
                  Task task = pendingTaskPayloads.get(taskId);
                  if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
                    pendingTaskPayloads.remove(taskId);
                  }
                }
                catch (Exception e) {
                  log.makeAlert(e, "Exception while trying to assign task")
                     .addData("taskId", taskRunnerWorkItem.getTaskId())
                     .emit();
                  RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
                  if (workItem != null) {
                    taskComplete(workItem, null, TaskStatus.failure(taskId));
                  }
                }
                finally {
                  tryAssignTasks.remove(taskId);
                }
              }
            }
          }
          catch (Exception e) {
            log.makeAlert(e, "Exception in running pending tasks").emit();
          }

          return null;
        }
      }
  );
}
Worker selection strategy for task assignment
Continuing from the previous step, execution reaches io.druid.indexing.overlord.RemoteTaskRunner#tryAssignTask. The official docs describe the available worker select strategies, and the source follows the configured strategy; in the setup used here it is io.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy. The brief steps are below (a simplified sketch of the strategy itself follows the excerpt):
// find a worker according to the configured strategy
immutableZkWorker = strategy.findWorkerForTask()
// look up the chosen worker's details from the ZK-backed worker map
assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());
// announce the task:
// 1. create the task's running znode under the worker's ZK path
// 2. remove the task from pendingTasks and add it, marked running, to RemoteTaskRunner's runningTasks
announceTask(task, assignedWorker, taskRunnerWorkItem);
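As a minimal sketch of the equal-distribution idea (not the actual Druid class; the WorkerState fields and capacity accounting here are simplified assumptions), the strategy boils down to picking the eligible worker with the most free capacity:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class EqualDistributionSketch
{
  // Simplified stand-in for Druid's worker state: total slots vs. slots in use.
  static class WorkerState
  {
    final String host;
    final int capacity;
    final int currCapacityUsed;

    WorkerState(String host, int capacity, int currCapacityUsed)
    {
      this.host = host;
      this.capacity = capacity;
      this.currCapacityUsed = currCapacityUsed;
    }

    int available()
    {
      return capacity - currCapacityUsed;
    }
  }

  // Pick the worker with the most available slots that can still fit the task.
  static Optional<WorkerState> findWorkerForTask(List<WorkerState> workers, int requiredCapacity)
  {
    return workers.stream()
                  .filter(w -> w.available() >= requiredCapacity)
                  .max(Comparator.comparingInt(WorkerState::available));
  }

  public static void main(String[] args)
  {
    List<WorkerState> workers = Arrays.asList(
        new WorkerState("mm-01", 10, 9),
        new WorkerState("mm-02", 10, 4),
        new WorkerState("mm-03", 10, 7)
    );
    // mm-02 wins: 6 free slots versus 1 and 3.
    findWorkerForTask(workers, 1).ifPresent(w -> System.out.println("assign to " + w.host));
  }
}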
Starting the peon process
The ZooKeeper watchers registered in io.druid.indexing.overlord.RemoteTaskRunner#start pick up task changes on the worker nodes; the task's status and location changes also show up in the overlord logs.
CliMiddleManager: WorkerTaskMonitor & ForkingTaskRunner
After the middleManager process starts, it registers a WorkerTaskMonitor to watch this worker's tasks. A newly assigned task is stored into the queue private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>(); and WorkerTaskMonitor's main loop keeps taking tasks off this queue and handing them to the ForkingTaskRunner to run
private void mainLoop()
{
  try {
    while (!Thread.currentThread().isInterrupted()) {
      final Notice notice = notices.take();

      try {
        notice.handle();
      }
      catch (InterruptedException e) {
        // Will be caught and logged in the outer try block
        throw e;
      }
      catch (Exception e) {
        log.makeAlert(e, "Failed to handle notice")
           .addData("noticeClass", notice.getClass().getSimpleName())
           .addData("noticeTaskId", notice.getTaskId())
           .emit();
      }
    }
  }
  catch (InterruptedException e) {
    log.info("WorkerTaskMonitor interrupted, exiting.");
  }
  finally {
    doneStopping.countDown();
  }
}
The part to focus on: io.druid.indexing.worker.WorkerTaskMonitor.RunNotice
private class RunNotice implements Notice
{
  private final Task task;

  public RunNotice(Task task)
  {
    this.task = task;
  }

  @Override
  public String getTaskId()
  {
    return task.getId();
  }

  @Override
  public void handle() throws Exception
  {
    if (running.containsKey(task.getId())) {
      log.warn("Got run notice for task [%s] that I am already running...", task.getId());
      workerCuratorCoordinator.removeTaskRunZnode(task.getId());
      return;
    }

    log.info("Submitting runnable for task[%s]", task.getId());

    workerCuratorCoordinator.updateTaskStatusAnnouncement(
        TaskAnnouncement.create(task, TaskStatus.running(task.getId()), TaskLocation.unknown())
    );

    log.info("Affirmative. Running task [%s]", task.getId());
    workerCuratorCoordinator.removeTaskRunZnode(task.getId());
    final ListenableFuture<TaskStatus> future = taskRunner.run(task);
    addRunningTask(task, future);
  }
}
So the real protagonist is io.druid.indexing.overlord.ForkingTaskRunner#run
ForkingTaskRunner launches the peon process
After all the analysis above, the peon finally enters the picture. We have found the middleManager the task will run on; obviously, a free port must first be found on that middleManager before the peon process can be started.
See the PortFinder class for the details, which this article does not analyze further. One production issue worth mentioning: even after a free port has been found, actually forking the peon still takes a short moment, and if the port gets grabbed in the meantime the peon fails to start with an Address already in use error. A sketch of the probing idea and its race window follows.
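A minimal sketch of the probing idea (plain JDK sockets, my own illustration rather than the actual PortFinder code): binding is the only reliable test, and the race window is exactly the time between the close() here and the forked JVM binding the port.

import java.io.IOException;
import java.net.ServerSocket;

public class PortProbe
{
  // Returns the first port at or above startPort that accepted a bind.
  // NOTE: by the time the caller launches a child process on this port,
  // another process may have taken it, hence the occasional
  // "Address already in use" at peon startup.
  static int findFreePort(int startPort)
  {
    for (int port = startPort; port <= 65535; port++) {
      try (ServerSocket socket = new ServerSocket(port)) {
        return socket.getLocalPort();
      }
      catch (IOException e) {
        // port taken, try the next one
      }
    }
    throw new IllegalStateException("No free port found");
  }

  public static void main(String[] args)
  {
    System.out.println("candidate peon port: " + findFreePort(8100));
  }
}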
The peon is started as a forked Java process; see the official docs for a description: https://druid.apache.org/docs/0.20.0/design/peons.html
Consuming and persisting data
Only after the peon process is up does the task begin its real work: consuming, serving queries, persisting, and so on. Knowing the actual task parameter configuration well is important; it pays off when analyzing and troubleshooting problems.
Official task documentation: http://druid.io/docs/0.12.3/ingestion/tasks.html
task.json & peon configuration
E.g., the task.json written out for a running tranquility realtime task is, in this case, exactly the JSON payload submitted in the curl example above, so it is not repeated here.
{"type":"index_realtime","id":"index_realtime_mv2_2020-11-13T08:00:00.000Z_0_2","resource":{"aabilityGroup":"mv2-2020-11-13T08:00:00.000Z-0000","requiredCapacity":1},"spec":{"dataSchema":{"dataSource":"mv2","parser":{"type":"map","parseSpec":{"format":"json","timestampSpec":{"column":"time","format":"millis","missingValue":null},"dimensionsSpec":{"dimensions":["url","user"],"spatialDimensions":[]}}},"metricsSpec":[{"type":"count","name":"views"},{"type":"doubleSum","name":"latencyMs","fieldName":"latencyMs","expression":null}],"granularitySpec":{"type":"uniform","segmentGranularity":"HOUR","queryGranularity":{"type":"none"},"rollup":true,"intervals":null},"transformSpec":{"filter":null,"transforms":[]}},"ioConfig":{"type":"realtime","firehose":{"type":"clipped","delegate":{"type":"timed","delegate":{"type":"receiver","serviceName":"firehose:druid:overlord:mv2-008-0000-0000","bufferSize":100000},"shutoffTime":"2020-11-13T09:15:00.000Z"},"interval":"2020-11-13T08:00:00.000Z/2020-11-13T09:00:00.000Z"},"firehoseV2":null},"tuningConfig":{"type":"realtime","maxRowsInMemory":1000,"intermediatePersistPeriod":"PT3M","windowPeriod":"PT10M","basePersistDirectory":"/var/folders/mt/9sz41nps4_5_7rshbz3nkrsh0000gn/T/1605255000037-0","versioningPolicy":{"type":"intervalStart"},"rejectionPolicy":{"type":"none"},"maxPendingPersists":0,"shardSpec":{"type":"linear","partitionNum":0},"indexSpec":{"bitmap":{"type":"concise"},"dimensionCompression":"lz4","metricCompression":"lz4","longEncoding":"longs"},"buildV9Directly":true,"persistThreadPriority":0,"mergeThreadPriority":0,"reportParseExceptions":false,"handoffConditionTimeout":0,"alertTimeout":0,"segmentWriteOutMediumFactory":null}},"context":{},"groupId":"index_realtime_mv2","dataSource":"mv2"
}
The related configuration options are covered by the official docs; see the latest version: https://druid.apache.org/docs/latest/configuration/index.html
Several important parameters in tuningConfig:

Option | Default | Description
---|---|---
maxRowsInMemory | 1000000 | The maximum number of records to store in memory before persisting to disk. Note that this is the number of rows post-rollup, and so it may not be equal to the number of input records. Ingested records will be persisted to disk when either maxRowsInMemory or maxBytesInMemory are reached (whichever happens first).
intermediatePersistPeriod | 10min | Time-based persist trigger: after consuming for this long, the in-memory data is persisted; complementary to maxRowsInMemory.
windowPeriod | 10min | tranquility's window-period concept; if the realtime data tends to arrive late, configure this a bit larger.
basePersistDirectory | - | Root of the local directory holding the persisted files plus the task's json and log files.
When reading task logs you will see a line like the following from io.druid.indexing.overlord.ForkingTaskRunner; the task log is written to a local file (a way to fetch it remotely is shown after the log line):
Logging task index_realtime_mv2_2020-11-14T03:00:00.000Z_0_1 output to: /var/folders/mt/9sz41nps4_5_7rshbz3nkrsh0000gn/T/persistent/task/index_realtime_mv2_2020-11-14T03:00:00.000Z_0_1/log
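When the middleManager is a remote machine, fetching the file by hand is tedious; the overlord can proxy task logs over HTTP (the host is a placeholder; the task id is the one from the log line above):

curl http://overlord-host:8090/druid/indexer/v1/task/index_realtime_mv2_2020-11-14T03:00:00.000Z_0_1/log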
One option in ioConfig that deserves attention is bufferSize:

Option | Default | Description
---|---|---
bufferSize | 100000 (in the example task above) | Druid buffers incoming messages internally: events are put into this buffer, and Druid takes them off the buffer to build the index; bufferSize sets the buffer's capacity. In production, if this buffer stays full it generally means Druid cannot keep up, so this metric also needs monitoring and alerting; see the sketch below.
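A minimal sketch (my own illustration, not Druid code; the names are made up) of the kind of fill-ratio check worth alerting on:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BufferWatch
{
  public static void main(String[] args)
  {
    int bufferSize = 100000; // mirrors the ioConfig firehose bufferSize
    BlockingQueue<String> buffer = new ArrayBlockingQueue<>(bufferSize);

    // A consumer that cannot keep up leaves the queue near capacity;
    // alerting when the fill ratio stays high catches that early.
    double fillRatio = buffer.size() / (double) bufferSize;
    if (fillRatio > 0.9) {
      System.out.println("ALERT: firehose buffer " + (fillRatio * 100) + "% full");
    }
  }
}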
CliPeon & RealtimePlumber
本文最开始类比了消防作业,说明了【消防员】这个角色的重要行,在druid.io源码则是RealtimePlumber
这个重要类,了解数据的接收处理,必须明白RealtimePlumber
CliPeon
Earlier, ForkingTaskRunner started the peon process, but what the peon actually does was left unexplained, and it is worth understanding. The full source is below; it looks much like any other Druid process: lifecycle, callbacks, module bindings and so on. Reading task logs together with this source is indispensable
@Command(
    name = "peon",
    description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "
                  + "This should rarely, if ever, be used directly. "
                  + "See http://druid.io/docs/latest/design/peons.html for a description"
)
public class CliPeon extends GuiceRunnable
{
  @Arguments(description = "task.json status.json", required = true)
  public List<String> taskAndStatusFile;

  @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
  public String nodeType = "indexer-executor";

  private static final Logger log = new Logger(CliPeon.class);

  @Inject
  private Properties properties;

  public CliPeon()
  {
    super(log);
  }

  @Override
  protected List<? extends Module> getModules()
  {
    return ImmutableList.<Module>of(
        new DruidProcessingModule(),
        new QueryableModule(),
        new QueryRunnerFactoryModule(),
        new Module()
        {
          @Override
          public void configure(Binder binder)
          {
            binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/peon");
            binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
            binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);

            PolyBind.createChoice(
                binder,
                "druid.indexer.task.chathandler.type",
                Key.get(ChatHandlerProvider.class),
                Key.get(ServiceAnnouncingChatHandlerProvider.class)
            );
            final MapBinder<String, ChatHandlerProvider> handlerProviderBinder =
                PolyBind.optionBinder(binder, Key.get(ChatHandlerProvider.class));
            handlerProviderBinder.addBinding("announce")
                                 .to(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
            handlerProviderBinder.addBinding("noop")
                                 .to(NoopChatHandlerProvider.class).in(LazySingleton.class);
            binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
            binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);

            binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);

            JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
            JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class);

            configureTaskActionClient(binder);

            binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);

            // Build it to make it bind even if nothing binds to it.
            Binders.dataSegmentKillerBinder(binder);
            binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
            Binders.dataSegmentMoverBinder(binder);
            binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
            Binders.dataSegmentArchiverBinder(binder);
            binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);

            binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
            LifecycleModule.register(binder, ExecutorLifecycle.class);
            binder.bind(ExecutorLifecycleConfig.class).toInstance(
                new ExecutorLifecycleConfig()
                    .setTaskFile(new File(taskAndStatusFile.get(0)))
                    .setStatusFile(new File(taskAndStatusFile.get(1)))
            );

            binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);
            binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);
            binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);

            JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
            binder.install(new CacheModule());

            JsonConfigProvider.bind(
                binder,
                "druid.segment.handoff",
                CoordinatorBasedSegmentHandoffNotifierConfig.class
            );
            binder.bind(SegmentHandoffNotifierFactory.class)
                  .to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
                  .in(LazySingleton.class);

            // Override the default SegmentLoaderConfig because we don't actually care about the
            // configuration based locations. This will override them anyway. This is also stopping
            // configuration of other parameters, but I don't think that's actually a problem.
            // Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
            binder.bind(SegmentLoaderConfig.class)
                  .toInstance(new SegmentLoaderConfig().withLocations(Arrays.<StorageLocationConfig>asList()));
            binder.bind(CoordinatorClient.class).in(LazySingleton.class);

            binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
            Jerseys.addResource(binder, SegmentListerResource.class);
            binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.fromString(nodeType)));
            LifecycleModule.register(binder, Server.class);
          }

          private void configureTaskActionClient(Binder binder)
          {
            PolyBind.createChoice(
                binder,
                "druid.peon.mode",
                Key.get(TaskActionClientFactory.class),
                Key.get(RemoteTaskActionClientFactory.class)
            );
            final MapBinder<String, TaskActionClientFactory> taskActionBinder =
                PolyBind.optionBinder(binder, Key.get(TaskActionClientFactory.class));
            taskActionBinder.addBinding("local")
                            .to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
            // all of these bindings are so that we can run the peon in local mode
            JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
            binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
            binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
            binder.bind(IndexerMetadataStorageCoordinator.class)
                  .to(IndexerSQLMetadataStorageCoordinator.class)
                  .in(LazySingleton.class);
            taskActionBinder.addBinding("remote")
                            .to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);
          }

          @Provides
          @LazySingleton
          public Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config)
          {
            try {
              return mapper.readValue(config.getTaskFile(), Task.class);
            }
            catch (IOException e) {
              throw Throwables.propagate(e);
            }
          }

          @Provides
          @LazySingleton
          @Named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)
          public String getDataSourceFromTask(final Task task)
          {
            return task.getDataSource();
          }

          @Provides
          @LazySingleton
          @Named(DataSourceTaskIdHolder.TASK_ID_BINDING)
          public String getTaskIDFromTask(final Task task)
          {
            return task.getId();
          }

          @Provides
          public SegmentListerResource getSegmentListerResource(
              @Json ObjectMapper jsonMapper,
              @Smile ObjectMapper smileMapper,
              @Nullable BatchDataSegmentAnnouncer announcer
          )
          {
            return new SegmentListerResource(jsonMapper, smileMapper, announcer, null);
          }
        },
        new QueryablePeonModule(),
        new IndexingServiceFirehoseModule(),
        new ChatHandlerServerModule(properties),
        new LookupModule()
    );
  }

  @Override
  public void run()
  {
    try {
      Injector injector = makeInjector();
      try {
        final Lifecycle lifecycle = initLifecycle(injector);
        final Thread hook = new Thread(
            new Runnable()
            {
              @Override
              public void run()
              {
                log.info("Running shutdown hook");
                lifecycle.stop();
              }
            }
        );
        Runtime.getRuntime().addShutdownHook(hook);
        injector.getInstance(ExecutorLifecycle.class).join();

        // Sanity check to help debug unexpected non-daemon threads
        final Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread thread : threadSet) {
          if (!thread.isDaemon() && thread != Thread.currentThread()) {
            log.info("Thread [%s] is non daemon.", thread);
          }
        }

        // Explicitly call lifecycle stop, dont rely on shutdown hook.
        lifecycle.stop();
        try {
          Runtime.getRuntime().removeShutdownHook(hook);
        }
        catch (IllegalStateException e) {
          log.warn("Cannot remove shutdown hook, already shutting down");
        }
      }
      catch (Throwable t) {
        log.error(t, "Error when starting up. Failing.");
        System.exit(1);
      }
      log.info("Finished peon task");
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }
}
The peon hands the task to io.druid.indexing.overlord.ThreadPoolTaskRunner#run to execute; concretely, the work happens in ThreadPoolTaskRunnerCallable's call method
private class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus>
{
  private final Task task;
  private final TaskLocation location;
  private final TaskToolbox toolbox;

  public ThreadPoolTaskRunnerCallable(Task task, TaskLocation location, TaskToolbox toolbox)
  {
    this.task = task;
    this.location = location;
    this.toolbox = toolbox;
  }

  @Override
  public TaskStatus call()
  {
    final long startTime = System.currentTimeMillis();

    TaskStatus status;

    try {
      log.info("Running task: %s", task.getId());
      TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location);
      TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));
      status = task.run(toolbox);
    }
    catch (InterruptedException e) {
      // Don't reset the interrupt flag of the thread, as we do want to continue to the end of this callable.
      if (stopping) {
        // Tasks may interrupt their own run threads to stop themselves gracefully; don't be too scary about this.
        log.debug(e, "Interrupted while running task[%s] during graceful shutdown.", task);
      } else {
        // Not stopping, this is definitely unexpected.
        log.warn(e, "Interrupted while running task[%s]", task);
      }
      status = TaskStatus.failure(task.getId());
    }
    catch (Exception e) {
      log.error(e, "Exception while running task[%s]", task);
      status = TaskStatus.failure(task.getId());
    }
    catch (Throwable t) {
      log.error(t, "Uncaught Throwable while running task[%s]", task);
      throw t;
    }

    status = status.withDuration(System.currentTimeMillis() - startTime);
    TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
    return status;
  }
}
In our case this is io.druid.indexing.common.task.RealtimeIndexTask#run
RealtimeIndexTask --> RealtimePlumber
上面从peon进程走到了RealtimeIndexTask
的run方法真正开始运行任务,但事实上这里我们才刚开始,下面先简单但介绍下任务这里流程(这里很多流程就和类比的消防
流程差不多了)
io.druid.indexing.common.task.RealtimeIndexTask#run
// used to publish segments
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(toolbox);
final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
FireDepartment fireDepartment = new FireDepartment(...
PlumberSchool plumberSchool = new RealtimePlumberSchool(...
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);

try {
  toolbox.getDataSegmentServerAnnouncer().announce();
  toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);

  plumber.startJob();

  // Set up metrics emission
  toolbox.getMonitorScheduler().addMonitor(metricsMonitor);

  // Firehose temporary directory is automatically removed when this RealtimeIndexTask completes.
  FileUtils.forceMkdir(firehoseTempDir);

  // Delay firehose connection to avoid claiming input resources while the plumber is starting up.
  final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory();
  final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);

  // Skip connecting firehose if we've been stopped before we got started.
  synchronized (this) {
    if (!gracefullyStopped) {
      firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);
      committerSupplier = Committers.supplierFromFirehose(firehose);
    }
  }

  // Time to read data!
  while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
    Plumbers.addNextRow(
        committerSupplier,
        firehose,
        plumber,
        tuningConfig.isReportParseExceptions(),
        metrics
    );
  }
}
catch (Throwable e) {
  normalExit = false;
  log.makeAlert(e, "Exception aborted realtime processing[%s]", dataSchema.getDataSource()).emit();
  throw e;
}
The plumber here is the RealtimePlumber, so io.druid.segment.realtime.plumber.RealtimePlumber#startJob is where the RealtimePlumber (our firefighter/plumber) really starts its work.
Also note firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir): the realtime data stream is received through the firehoseFactory (firehose literally means a fire hose: as water flows in through the hose, data flows in through the firehoseFactory; thinking of it this way keeps things clear)
RealtimePlumber#startJob
io.druid.segment.realtime.plumber.RealtimePlumber#startJob is shown below; it is a very simple sequence in which the persist, merge, and hand off pieces are all visible
@Override
public Object startJob()
{
  computeBaseDir(schema).mkdirs();
  initializeExecutors();
  handoffNotifier.start();
  Object retVal = bootstrapSinksFromDisk();
  startPersistThread();
  // Push pending sinks bootstrapped from previous run
  mergeAndPush();
  resetNextFlush();
  return retVal;
}
tranquility & EventReceiverFirehoseFactory#connect
tranquility不是本文重点,这里简单介绍一下:
tranquility将数据以POST请求的方式提交给EventReceiverFirehose(会丢弃所有时间窗口外的数据),当到达任务到达(SegmentGranularity+WindowPeriod)时,TimedShutoffFirehose会自动关闭Firehose,此时Segment会进行合并、注册元信息、存储到Deep Storage中并等待Handoff,当某个Historical Node声明自己已加载该Segment后,Indexing Service Task会正常退出。
tranquility post提交数据的api接口是EventReceiverFirehoseFactory的/push-events
, 如下图
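For reference (hedged: the exact chat-handler path and host are my assumptions, based on the serviceName in the task spec above), such a POST looks roughly like:

curl -X POST \
  'http://peon-host:8100/druid/worker/v1/chat/firehose:druid:overlord:mv2-008-0000-0000/push-events' \
  -H 'Content-Type: application/json' \
  -d '[{"time": 1605254400000, "url": "/index.html", "user": "alice", "latencyMs": 12}]'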
EventReceiverFirehoseFactory & RealtimePlumber 数据处理流程
上面的内容都是铺垫,EventReceiverFirehoseFactory
& RealtimePlumber
这两个类是数据处理的重点,要研究数据的流程,这两个类就必须好好分析。
Rough flow of realtime data processing
(The original flow diagram is omitted; in outline: tranquility POSTs events -> EventReceiverFirehose buffer -> RealtimePlumber#add -> in-memory Sink/currHydrant -> periodic persist to local disk.)
How is the EventReceiverFirehose buffer used?
See io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory.EventReceiverFirehose#addRows. The buffer here is simply an array-backed blocking queue: BlockingQueue<InputRow> buffer = new ArrayBlockingQueue<>(bufferSize);
public void addRows(Iterable<InputRow> rows) throws InterruptedException
{
  for (final InputRow row : rows) {
    boolean added = false;
    while (!closed && !added) {
      added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
      if (!added) {
        long currTime = System.currentTimeMillis();
        long lastTime = lastBufferAddFailMsgTime.get();
        if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
          log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
        }
      }
    }

    if (!added) {
      throw new IllegalStateException("Cannot add events to closed firehose!");
    }
  }
}
When does persistence happen?
io.druid.segment.realtime.plumber.RealtimePlumber#add
@Override
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
  long messageTimestamp = row.getTimestampFromEpoch();
  final Sink sink = getSink(messageTimestamp);
  metrics.reportMessageMaxTimestamp(messageTimestamp);
  if (sink == null) {
    return -1;
  }

  final int numRows = sink.add(row, false);

  if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
    persist(committerSupplier.get());
  }

  return numRows;
}
The persist condition is exactly the check if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush):
- nextFlush is a simple time check (this is the intermediatePersistPeriod configuration mentioned earlier)

private void resetNextFlush()
{
  nextFlush = DateTimes.nowUtc().plus(config.getIntermediatePersistPeriod()).getMillis();
}
- !sink.canAppendRow() checks the current currHydrant

public boolean canAppendRow()
{
  synchronized (hydrantLock) {
    return writable && currHydrant != null && currHydrant.getIndex().canAppendRow();
  }
}
Concretely this is io.druid.segment.incremental.OnheapIncrementalIndex#canAppendRow; you can see it simply checks whether the current hydrant's post-rollup row count has hit a threshold (the maxRowsInMemory configuration mentioned earlier)

@Override
public boolean canAppendRow()
{
  final boolean canAdd = size() < maxRowCount;
  if (!canAdd) {
    outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount);
  }
  return canAdd;
}
So in the end, whenever either of these two conditions is met, a persist is performed; it is not complicated. A condensed restatement follows.
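Condensed into a standalone predicate (my own sketch, not Druid code; the arguments stand in for the sink/hydrant state and the plumber's flush clock above):

public class PersistTrigger
{
  // rowsInHydrant vs maxRowsInMemory is the size condition;
  // now vs nextFlush is the intermediatePersistPeriod condition.
  static boolean shouldPersist(int rowsInHydrant, int maxRowsInMemory, long now, long nextFlush)
  {
    boolean canAppendRow = rowsInHydrant < maxRowsInMemory;
    return !canAppendRow || now > nextFlush;
  }

  public static void main(String[] args)
  {
    // Row threshold hit: persist even though the flush deadline is in the future.
    System.out.println(shouldPersist(1000, 1000, 0L, 180_000L)); // true
    // Neither condition met: keep accumulating in memory.
    System.out.println(shouldPersist(10, 1000, 0L, 180_000L));   // false
  }
}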
A real index_realtime task log file
Every persist leaves corresponding lines in the task log (the log screenshot from the original post is omitted here).
Typically, when hand-off times out, you will see the log line Still waiting for Handoff for Segments repeated relentlessly.
Summary
Using the source code and a real example, this article covered the basic end-to-end flow of an index_realtime task from submission to execution. Plenty of details remain untouched and will need follow-up posts; this also serves as a personal record of reading this part of the source and of working with it in practice.
---- Saturday, November 14, 2020, 18:03:18 CST