Flink1.12源码解读——Checkpoint详细执行过程
Apache Flink作为国内最火的大数据计算引擎之一,自身支持高吞吐,低延迟,exactly-once语义,有状态流等特性,阅读源码有助加深对框架的理解和认知。
因为在前面讲过的ExecutionGraph执行图构建https://blog.csdn.net/ws0owws0ow/article/details/113991593?spm=1001.2014.3001.5501中我们解析过JM的生成到JobMaster选主后会涉及到Checkpoint的调度,故我特别把Flink 的Checkpoint 单独展开一章介绍便于后续在解读物理执行图计划时候加深理解....。
备注:整个Checkpoint过程涉及到的内存模型,基于Netty通信,Credit消费模式以及TM生成的ResultSubpartition/InputGate等内容会放在后续章节解析。
Checkpoint作为Flink的核心容错机制,在Flink故障或人为重启后能快速恢复挂掉时的中间状态。Checkpoint流程顺序大概如下:JM启动Checkpoint->远程ExecutionRPC发送Checkpoint通知->StreamTask的subtaskCheckpointCoordinator触发checkpoint->生成并广播CheckpointBarrier->执行快照->向jobMaster发送完成报告
最开始在JM选主成功后会启动Scheduler,这里Scheduler除了用来分发部署Task外还用来生成Checkpoint的周期线程并通过JM上ExecutionGraph的CheckpointCoordinator并触发Checkpoint
直接进入启动Scheduler调度入口:
protected void startSchedulingInternal() {log.info("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());//通知ExecutionGraph的CheckpointCoordinator改变状态为running并准备执行checkpoint//生成的ScheduledTriggerRunnable主要包含checkpoint周期调用(CheckpointCoordinator.startCheckpointScheduler)的逻辑//JM的Checkpoint会判断Execution的assignedResource是否为空,否则不会向TM提交Checkpoint//当提交申请TM部署slot成功后,Execution的assignedResource才会被赋值,此时JM的Checkpoint周期线程才会被往后继续执行调用TM的task执行checkpointprepareExecutionGraphForNgScheduling();schedulingStrategy.startScheduling();}//转化JobStatus为Running public void transitionToRunning() {if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);}}@Override//当job状态改为running的时候才真正开始触发Checkpointpublic void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {if (newJobStatus == JobStatus.RUNNING) {// start the checkpoint schedulercoordinator.startCheckpointScheduler();} else {// anything else should stop the trigger for nowcoordinator.stopCheckpointScheduler();}}private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {//底层调用的是ExecutionGraph创建的SingleThreadScheduledExecutor.scheduleAtFixedRatereturn timer.scheduleAtFixedRate(new ScheduledTrigger(),//执行线程initDelay//初始化延迟时间, baseInterval//线程调用间隔, TimeUnit.MILLISECONDS//计时单位毫秒);}
进入ScheduledTrigger线程逻辑
- ExecutionGraph的CheckpointCoordinator开始触发Checkpoint CheckpointCoordinator初始化原子自增CheckpointID和state存储位置
- 每个OperatorCoordinatorCheckpointContext(创建JobVertex时划分出的SourceOperator)开始依次调用checkpoint
- 这里的checkpoint其实就是JM把同一个当前CheckpointID写入到state存储位置,也就仅仅是JM端的CheckpointID初始化和存储
- 当CheckpointID写入完成并未发现任何异常后开始遍历所有executions依次调用Checkpoint
- 由于JM的Checkpoint周期线程启动较早,只有当TM和Slot申请完毕后才会向Execution提交CheckpointBarrier
private final class ScheduledTrigger implements Runnable {@Overridepublic void run() {try {triggerCheckpoint(true);}catch (Exception e) {LOG.error("Exception while triggering checkpoint for job {}.", job, e);}}}private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {try {synchronized (lock) {preCheckGlobalState(request.isPeriodic);}final Execution[] executions = getTriggerExecutions();final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();// we will actually trigger this checkpoint!Preconditions.checkState(!isTriggering);isTriggering = true;final long timestamp = System.currentTimeMillis();final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =//初始化原子自增CheckpointID和Checkpoint存储State位置initializeCheckpoint(request.props, request.externalSavepointLocation).thenApplyAsync((checkpointIdAndStorageLocation) -> createPendingCheckpoint(timestamp,request.props,ackTasks,request.isPeriodic,checkpointIdAndStorageLocation.checkpointId,checkpointIdAndStorageLocation.checkpointStorageLocation,request.getOnCompletionFuture()),timer);final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture.thenComposeAsync((pendingCheckpoint) ->//coordinatorsToCheckpoint存放的是在JobVertex创建过程中存储的所有source的jobVertexOperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(coordinatorsToCheckpoint, pendingCheckpoint, timer),timer);....// no exception, no discarding, everything is OKfinal long checkpointId = checkpoint.getCheckpointId();//未发现任何异常后开始调用TM上每个Execution的CheckpointsnapshotTaskState(timestamp,checkpointId,checkpoint.getCheckpointStorageLocation(),request.props,executions,request.advanceToEndOfTime);....}public static CompletableFuture<AllCoordinatorSnapshots> triggerAllCoordinatorCheckpoints(final Collection<OperatorCoordinatorCheckpointContext> coordinators,final long checkpointId) throws Exception {//每个coordinator对应一个SourcejobVertexfinal Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots = new ArrayList<>(coordinators.size());for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {//JM根据SourceOperator个数依次持久化checkpointId。final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);individualSnapshots.add(checkpointFuture);}return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);}public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(final OperatorCoordinatorCheckpointContext coordinatorContext,final long checkpointId) throws Exception {final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();//持久化checkpointId到state中coordinatorContext.checkpointCoordinator(checkpointId, checkpointFuture);//checkpointId写入完成后封装成CoordinatorSnapshot返回return checkpointFuture.thenApply((state) -> new CoordinatorSnapshot(coordinatorContext, new ByteStreamStateHandle(coordinatorContext.operatorId().toString(), state)));}
当JM端初始化完CheckpointID后,开始依次调用TM上每个Execution的Checkpoint,这里的executions是从source端开始的有序集合
private void snapshotTaskState(long timestamp,long checkpointID,CheckpointStorageLocation checkpointStorageLocation,CheckpointProperties props,Execution[] executions,boolean advanceToEndOfTime) {final CheckpointOptions checkpointOptions = new CheckpointOptions(props.getCheckpointType(),checkpointStorageLocation.getLocationReference(),isExactlyOnceMode,props.getCheckpointType() == CheckpointType.CHECKPOINT && unalignedCheckpointsEnabled);// send the messages to the tasks that trigger their checkpoint// 通过rpcGateway依次调用每个taskExecutor上每个task的checkpointfor (Execution execution: executions) {if (props.isSynchronous()) {execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);} else {execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);}}}private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {final CheckpointType checkpointType = checkpointOptions.getCheckpointType();if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");}final LogicalSlot slot = assignedResource;//当JM申请部署TM和Slot完成后 slot才会被赋值//所以虽然一开始就启动了Checkpoint周期线程但并不会提前触发向TM提交Checkpoint任务if (slot != null) {//调用RpcTaskManagerGatewayfinal TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);} else {LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");}}
进入TM的SubtaskCheckpointCoordinatorImpl的checkpoint核心逻辑,主要分为四步:
- 从CheckpointMetaData获取CheckpointId并检查是否需要终止checkpoint,如果是发送CancelCheckpointMarker事件对象
- 调用是否需要预处理
- 生成CheckpointBarrier并广播到下游,Barrier主要是以CheckpointID和timestamp组成,广播主要逻辑是封装Barrier成基于Heap的HybridMemorySegment的Buffer后添加到ResultSubpartition的PrioritizedDeque中并通知下游InpuGate消费
- 写入UnalignedCheckpoint模式下的数据,默认使用仅一次语义的alignedCheckpoint模式,故这里略过
- 执行state和checkpointId快照,完成以上步骤后向JM发送报告
public void checkpointState(CheckpointMetaData metadata,//主要封装了JM生成的CheckpointID和timestampCheckpointOptions options,CheckpointMetricsBuilder metrics,OperatorChain<?, ?> operatorChain,Supplier<Boolean> isCanceled) throws Exception {....// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.//JM端生成的CheckpointIDlastCheckpointId = metadata.getCheckpointId();//首先检查ck是否需要终止if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());return;}// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.// The pre-barrier work should be nothing or minimal in the common case.//第一步,预处理,一般我们调用的streamOperator无任何逻辑operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());// Step (2): Send the checkpoint barrier downstream// flink1.11版本新引入的Unaligned特性,参考社区FLIP-76,在需要提高checkpoint吞吐量并且不要求数据精准一次性情况下可考虑使用// 封装优先级buffer后add到ResultSubpartition的PrioritizedDeque队列中,更新buffer和backlog数// 当notifyDataAvailable=true时 通知下游消费// 下游CheckpointedInputGate拿到buffer后匹配到是checkpoint事件做出相应动作operatorChain.broadcastEvent(//创建Barrier,主要封装的CheckpointID和timestampnew CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),options.isUnalignedCheckpoint());// Step (3): Prepare to spill the in-flight buffers for input and output// aligned模式直接跳过if (options.isUnalignedCheckpoint()) {// output data already written while broadcasting eventchannelStateWriter.finishOutput(metadata.getCheckpointId());}// Step (4): Take the state snapshot. This should be largely asynchronous, to not impact progress of the// streaming topologyMap<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());try {// takeSnapshotSync 执行checkpoint核心逻辑的入口if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {// finishAndReportAsync 完成snapshot后,向jobMaster发送报告finishAndReportAsync(snapshotFutures, metadata, metrics, options);} else {cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));}} catch (Exception ex) {cleanup(snapshotFutures, metadata, metrics, ex);throw ex;}}
步骤3:向下游广播checkpointEvent
默认Stream模式下ResultPartitionType是PIPELINED或者PIPELINED_BOUNDED,故这里调用 BufferWritingResultPartition的broadcastEvent
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {for (RecordWriterOutput<?> streamOutput : streamOutputs) {streamOutput.broadcastEvent(event, isPriorityEvent);}}public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {checkInProduceState();finishBroadcastBufferBuilder();finishUnicastBufferBuilders();//封装成带优先级的buffer并add到(Pipelined/BoundedBlocking)ResultSubpartition的PrioritizedDeque队列中try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, isPriorityEvent)) {for (ResultSubpartition subpartition : subpartitions) {// Retain the buffer so that it can be recycled by each channel of targetPartitionsubpartition.add(eventBufferConsumer.copy(), 0);}}}
生成基于heap的最小内存数据结构segment并封装成BufferConsumer
public static BufferConsumer toBufferConsumer(AbstractEvent event, boolean hasPriority) throws IOException {final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);//调用基于Heap的HybridMemorySegment构造函数MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array());return new BufferConsumer(data, FreeingBufferRecycler.INSTANCE, getDataType(event, hasPriority));}public static MemorySegment wrap(byte[] buffer) {return new HybridMemorySegment(buffer, null);}
添加到ResultSubpartition的PrioritizedDeque中并通知下游消费
private boolean add(BufferConsumer bufferConsumer, int partialRecordLength, boolean finish) {checkNotNull(bufferConsumer);final boolean notifyDataAvailable;int prioritySequenceNumber = -1;synchronized (buffers) {if (isFinished || isReleased) {bufferConsumer.close();return false;}// Add the bufferConsumer and update the stats//增加buffer到PrioritizedDeque里,优先级高的buffer放入队列头if (addBuffer(bufferConsumer, partialRecordLength)) {prioritySequenceNumber = sequenceNumber;}updateStatistics(bufferConsumer);//总buffer数+1increaseBuffersInBacklog(bufferConsumer);//总backlog数+1notifyDataAvailable = finish || shouldNotifyDataAvailable();isFinished |= finish;}if (prioritySequenceNumber != -1) {notifyPriorityEvent(prioritySequenceNumber);}//如果可用(比如数据完整,非阻塞模式等) 就通知下游inputGate来消费if (notifyDataAvailable) {notifyDataAvailable();}return true;}private boolean addBuffer(BufferConsumer bufferConsumer, int partialRecordLength) {assert Thread.holdsLock(buffers);if (bufferConsumer.getDataType().hasPriority()) {return processPriorityBuffer(bufferConsumer, partialRecordLength);}//生产BufferConsumerWithPartialRecordLength(可能这个buffer只包含了部分record(长度过长溢出到下一个record里了)buffers.add(new BufferConsumerWithPartialRecordLength(bufferConsumer, partialRecordLength));return false;}//包括checkpoint的事件也会放进来,老版本好像是ArrayDeque,并不支持事件优先级private final PrioritizedDeque<BufferConsumerWithPartialRecordLength> buffers = new PrioritizedDeque<>();
继续调用CreditBasedSequenceNumberingViewReader的notifyReaderNonEmpty
这里调用的是Netty的ChannelPipeline的fireUserEventTriggered向下游监听的availabilityListener发送事件消息
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {// The notification might come from the same thread. For the initial writes this// might happen before the reader has set its reference to the view, because// creating the queue and the initial notification happen in the same method call.// This can be resolved by separating the creation of the view and allowing// notifications.// TODO This could potentially have a bad performance impact as in the// worst case (network consumes faster than the producer) each buffer// will trigger a separate event loop task being scheduled.ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));}//netty4.io.netty.channel包下的ChannelPipeline
ChannelPipeline fireUserEventTriggered(Object var1);
广播完CheckpointBarrier后我们再主要看看执行checkpoint的快照逻辑,这里也会涉及到Flink的State状态管理和持久化方式
private boolean takeSnapshotSync(Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,CheckpointMetaData checkpointMetaData,CheckpointMetricsBuilder checkpointMetrics,CheckpointOptions checkpointOptions,OperatorChain<?, ?> operatorChain,Supplier<Boolean> isCanceled) throws Exception {....long checkpointId = checkpointMetaData.getCheckpointId();long started = System.nanoTime();ChannelStateWriteResult channelStateWriteResult = checkpointOptions.isUnalignedCheckpoint() ?channelStateWriter.getAndRemoveWriteResult(checkpointId) :ChannelStateWriteResult.EMPTY;//解索checkpoint的存储位置(Memory/FS/RocksDB)CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());try {for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {if (!operatorWrapper.isClosed()) {operatorSnapshotsInProgress.put(operatorWrapper.getStreamOperator().getOperatorID(),//执行checkpoint入口buildOperatorSnapshotFutures(checkpointMetaData,checkpointOptions,operatorChain,operatorWrapper.getStreamOperator(),isCanceled,channelStateWriteResult,storage));}}} finally {checkpointStorage.clearCacheFor(checkpointId);}...}
持久化State和checkpointId
void snapshotState(CheckpointedStreamOperator streamOperator,Optional<InternalTimeServiceManager<?>> timeServiceManager,String operatorName,long checkpointId,long timestamp,CheckpointOptions checkpointOptions,CheckpointStreamFactory factory,OperatorSnapshotFutures snapshotInProgress,StateSnapshotContextSynchronousImpl snapshotContext) throws CheckpointException {try {if (timeServiceManager.isPresent()) {checkState(keyedStateBackend != null, "keyedStateBackend should be available with timeServiceManager");timeServiceManager.get().snapshotState(snapshotContext, operatorName);}//执行需要持久化state的操作//比如map操作,它生成的是StreamMap属于AbstractUdfStreamOperator子类,里面封装了snapshotState逻辑,如果没实现ck接口就跳过此步骤streamOperator.snapshotState(snapshotContext);snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());//DefaultOperatorStateBackend,持久化到内存中if (null != operatorStateBackend) {snapshotInProgress.setOperatorStateManagedFuture(//持久化当前checkpointId等属性operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}//有HeapKeyedStateBackend,RocksDBKeyedStateBackend等基于Key的持久化方式if (null != keyedStateBackend) {snapshotInProgress.setKeyedStateManagedFuture(//持久化当前checkpointId等属性keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}} catch (Exception snapshotException) {try {snapshotInProgress.cancel();} catch (Exception e) {snapshotException.addSuppressed(e);}String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +operatorName + ".";try {snapshotContext.closeExceptionally();} catch (IOException e) {snapshotException.addSuppressed(e);}throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);}}
如果用户实现了Checkpoint接口则会持久化到指定的stateBackend中反之略过...
这里以AbstractUdfStreamOperator为例(map,filter等Operator都继承了该abstract类)
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);//判断userFunction是否属于CheckpointedFunction或者ListCheckpointed的实例//如果是则调用用户实现的snapshotState执行相关逻辑//比如FlinkKafkaConsumerBase则自己实现了CheckpointedFunction的接口StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);}//userFunction : 用户定义的functionpublic static void snapshotFunctionState(StateSnapshotContext context,OperatorStateBackend backend,Function userFunction ) throws Exception {Preconditions.checkNotNull(context);Preconditions.checkNotNull(backend);while (true) {// 用户是否有自定义checkpoint逻辑if (trySnapshotFunctionState(context, backend, userFunction)) {break;}// inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function// 比如window后的process算子 会生成WrappingFunction的子类InternalIterableProcessWindowFunction,赋值给userFunction后继续while循环if (userFunction instanceof WrappingFunction) {userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();} else {break;}}}
通常我们可能会在算子中实现CheckpointedFunction或ListCheckpointed来自定义checkpoint时的state的存储逻辑和初始化逻辑
Checkpoint接口中的initializeState会在启动task时调用beforeInvoke->operatorChain.initializeStateAndOpenOperators时初始化state(后续章节分析)
private static boolean trySnapshotFunctionState(StateSnapshotContext context,OperatorStateBackend backend,Function userFunction) throws Exception {//判断用户是否实现CheckpointedFunction接口if (userFunction instanceof CheckpointedFunction) {//执行snapshotState逻辑((CheckpointedFunction) userFunction).snapshotState(context);return true;}//判断用户是否实现ListCheckpointed接口if (userFunction instanceof ListCheckpointed) {......}
当state持久化完毕后就会把checkpoint也持久到指定的stateBackend中。这样整个快照生成完毕,最后Flink会调用finishAndReportAsync向Master发送完成报告,而下游的Operator继续重复以上步骤直到Master收到所有节点的完成报告,这时Master会生成CompletedCheckpoint持久化到指定stateBackend中(如果整个Checkpoint中间有超时或者节点挂掉等造成Master无法收集完整的各节点报告则会宣告失败并删除这次所有产生的状态数据),至此 整个Checkpoint结束。
==========================================================================================================================================================================================================================================================================================================================
Flink1.12源码解读——Checkpoint详细执行过程相关推荐
- Zeus源码解读之定时任务执行与手动执行任务的过程分析
Zeus源码解读之定时任务执行与手动执行任务的过程分析 zeus集群依赖任务执行模式 宙斯中任务出去任务独立调度之外,支持任务直接的复杂依赖调度,如下图一所示: 图1 A为根任务,B,C依赖A任务 ...
- PTMs:QLoRA技巧的简介、使用方法、论文解读、源码解读之详细攻略
PTMs:QLoRA技巧的简介.使用方法.论文解读.源码解读之详细攻略 目录 QLoRA技巧的简介 1.量化.分页优化器 QLoRA技巧的使用方法 1.安装 2.入
- MyBatis 源码分析 - SQL 的执行过程
本文速览 本篇文章较为详细的介绍了 MyBatis 执行 SQL 的过程.该过程本身比较复杂,牵涉到的技术点比较多.包括但不限于 Mapper 接口代理类的生成.接口方法的解析.SQL 语句的解析.运 ...
- Fabric中PBFT源码解读——Checkpoint机制
文章目录 1. 写在前面 1.1 前置阅读 1.2 对TestCheckpoint函数的测试 2. 对TestCheckpoint函数运行流程的解读 2.1 Checkpoint和Water mark ...
- MyBatis(12) 源码解析之SQL执行流程
一.前言 资料 mybatis文档:https://mybatis.org/mybatis-3/index.html mybatis源码:https://github.com/mybatis/myba ...
- 走近源码:Redis命令执行过程(客户端)
前面我们了解过了当Redis执行一个命令时,服务端做了哪些事情,不了解的同学可以看一下这篇文章走近源码:Redis如何执行命令.今天就一起来看看Redis的命令执行过程中客户端都做了什么事情. 启动客 ...
- Mybatis 框架源码解读(详细流程图+时序图)
看源码都要带着问题去看,比如 UserMapper.java只有接口而没有实现类,那么是如何执行的呢? mybatis中一级缓存是如何进行缓存与维护的? 底层是如何执行query查询的 查询后的结果是 ...
- mysql 执行概况_转mysql源码分析之SQL执行过程简介
本人打算从SQL语句的执行开始学习和分析MYSQL源码,首先了解MYSQL是如何执行一条SQL语句的,详细了解它的执行过程之后,再深入学习执行一条SQL语句的运行原理. 1)从执行一条SQL语句的堆栈 ...
- openstack nova 源码解析 — Nova API 执行过程从(novaclient到Action)
目录 目录 Nova API Nova API 的执行过程 novaclient 将 Commands 转换为标准的HTTP请求 PasteDeploy 将 HTTP 请求路由到具体的 WSGI Ap ...
- PostgreSQL 源码解读(154)- 后台进程#6(walsender#2)
本节继续介绍PostgreSQL的后台进程walsender,重点介绍的是调用栈中的exec_replication_command和StartReplication函数. 调用栈如下: (gdb) ...
最新文章
- python用selenium爬取网页数据_Python项目实战:使用selenium爬取拉勾网数据
- 最新鲜最详细的Android SDK下载安装及配置教程
- Linux nginx搭建文件服务器
- PATH环境变量的相关操作
- 【Github教程】史上最全github用法:github入门到精通
- 使用cadence建封装
- python plot方法的使用_Python bokeh.plotting.figure.step()用法及代码示例
- 求最大公约数欧几里得算法
- tp5 MySQL发红包功能_ThinkPHP5微信现金红包的开发
- 沟通中的情绪管理(演讲稿)
- 寻找丢失的LZY(dfs)
- C++综合练习——身份证
- 原生WebGL场景中绘制多个圆锥圆柱
- APP系列,学院专题讲座图像记录软件推荐
- 数组排序(5) 快速排序之三指针分区法
- 深入浅出学算法——n个1(整除求余的优化)
- 【全志T113-S3_100ask】6-编写IIC驱动GY-302(twi)
- dhu 6 获取AOE网的关键路径
- 磊科nw705p虚拟服务器设置,教你如何设置磊科nw705p无线路由器的详细步骤【图文】...
- android 模拟器实现发短信
热门文章
- 神工鬼斧惟肖惟妙,M1 mac系统深度学习框架Pytorch的二次元动漫动画风格迁移滤镜AnimeGANv2+Ffmpeg(图片+视频)快速实践
- 马士兵的经典名言!!!
- 盲盒抽奖微信小程序源码
- html如何制作水滴效果图,html+css实现充电水滴融合特效代码
- 用JAVA爬虫爬网站的图片
- 数理统计复习笔记二——充分统计量
- Oracle Lob介绍
- oracle lob类型和mysql text_OracleLob类型存储浅析
- 简析also, too, as well,either用法
- 河北省级环保督察回头看全覆盖 大气污染问题成举报重点