本文作者:赵旭阳

一、简介

我们在进行 Android 开发的时候,会通过创建一个 Handler 并调用其 sendMessage  或 Post 方法来进行异步消息调用,其背后涉及到了三个面试经常被问的类:Handler,Looper,MessageQueue,内部原理我想做过 Android 开发的基本都了解。

  1. Event queue : 包含了所有的外部事件:I/O,鼠标点击,绘制,定时器,Dart isolate 的消息等,其实这块又根据消息的优先级细分成了两个队列,后面会有介绍。

  2. Microtask queue :事件处理代码有时需要在当前 event 之后,且在下一个 event 之前做一些任务。

  • 两种队列的消息处理流程大致如图所示:


dart 提供了 dart:async 库来对这两个队列进行操作,主要是如下两个API:

  1. Future 类,创建一个定时器事件到 Event queue 尾部。

  2. scheduleMicrotask() 方法,添加一个事件到 Microtask queue 尾部

下面将会分析这两个 API 背后的实现,涉及的是 flutter engine 的源码,可以参考官方的 wiki 来下载:

二、初始化

1.  创建 MessageLoop

  • 在启动 Flutter 的时候,引擎会额外创建三个线程:UI Thread,IO Thread, GPU Thread,并为每个线程都创建一个 MessageLoop,之后各个线程就进入的消息循环的状态,等待新的消息来处理,具体流程如下:

ThreadHost thread_host_;
AndroidShellHolder::AndroidShellHolder(flutter::Settings settings,fml::jni::JavaObjectWeakGlobalRef java_object,bool is_background_view): settings_(std::move(settings)), java_object_(java_object) {
......if (is_background_view) {thread_host_ = {thread_label, ThreadHost::Type::UI};} else {// 创建三个线程thread_host_ = {thread_label, ThreadHost::Type::UI | ThreadHost::Type::GPU |ThreadHost::Type::IO};}
......
}
  • 进一步分析 ThreadHost 的创建,最后来到了创建 Thread,每个 Thread 都会创建一个 MessageLoop 并获取其 TaskRunner,TaskRunner 是用来外部向 MessageLoop 中 Post Task 的。顺带说一下,在 Android 上 MessageLoop 的实现还是使用的系统自身的 Looper 机制,这里是通过 NDK 的 ALooper 相关接口来实现的。具体代码如下:

Thread::Thread(const std::string& name) : joined_(false) {fml::AutoResetWaitableEvent latch;fml::RefPtr<fml::TaskRunner> runner;thread_ = std::make_unique<std::thread>([&latch, &runner, name]() -> void {SetCurrentThreadName(name);// 创建 MessageLoopfml::MessageLoop::EnsureInitializedForCurrentThread();auto& loop = MessageLoop::GetCurrent();// 获取 TaskRunnerrunner = loop.GetTaskRunner();latch.Signal();loop.Run();});latch.Wait();task_runner_ = runner;
}
  • MessageLoop 创建好后,我们就可以通过 TaskRunner 向其发送 Task 了,这里需要注意 MessageLoop 执行的 Task 仅是一个 无参的闭包 。类似这样:

auto jni_exit_task([key = thread_destruct_key_]() {FML_CHECK(pthread_setspecific(key, reinterpret_cast<void*>(1)) == 0);
});
thread_host_.ui_thread->GetTaskRunner()->PostTask(jni_exit_task);

2.  创建 Root Isolate

在 dart 语言中是没有线程的,而是使用类似于线程但相互之间不共享堆内存的 isolate,代表一个独立的 dart 程序执行环境。同样 Flutter 的 dart 代码是运行在一个叫 root isolate 的 isolate 中,下面简要列下 root isolate 的创建过程。

a.启动 dart vm

这个步骤的具体流程,大家可以顺着 /engine-v1.5.4/src/flutter/runtime/dart_vm.cc 中的 DartVM 构造方法去跟进分析。在 dart vm 启动过程中会创建 vm isolate 和 PortMap,这两个的具体作用下面有介绍。

b.创建 root isolate

root isolate 是在 UI 线程中创建的,具体流程见 /src/flutter/runtime/dart_isolate.cc 的 CreateRootIsolate 方法。由于 isolate 是对当前线程执行环境的一个抽象表示,所以其内部存储了很多信息,对于异步消息这块有四个关键的信息是需要注意的。

下面三个字段是定义在 dart vm 层的 Isolate 类中,具体见 /src/third_party/dart/runtime/vm/isolate.h。

  • main_port :可以看做该 isolate 的标识,是一个整数;

  • message_handler :顾名思义,用来管理每个 isolate 的 Event queue,其内部根据 message 的优先级将消息分为了两个队列:普通优先级的 queue 和 OOB 优先级的 oob_queue。了解 TCP 协议的应该了解 TCP 中有带外数据(即优先数据),isolate 的 OOB 优先级也是类似的意思,OOB 优先级的消息会被优先处理,目前看到有这么几种 OOB 消息:

// 这些主要和 isolate 的生命周期相关
kPauseMsg = 1,
kResumeMsg = 2,
kPingMsg = 3,
kKillMsg = 4,
kAddExitMsg = 5,
kDelExitMsg = 6,
kAddErrorMsg = 7,
kDelErrorMsg = 8,
kErrorFatalMsg = 9,
// 可以注意下 kLowMemoryMsg ,如果有大量 OOB 怀疑是内存不够了
kInterruptMsg = 10,     // Break in the debugger.
kInternalKillMsg = 11,  // Like kill, but does not run exit listeners, etc.
kLowMemoryMsg = 12,     // Run compactor, etc.
kDrainServiceExtensionsMsg = 13,  // Invoke pending service extensions
  • message_notify_callback :message_handler 收到消息后会调用该变量指向的函数去处理;

Flutter 引擎层会对 dart vm 的 isolate 实例做一层包装( DartIsolate 类),其内部定义了:

  • microtask_queue: 用来存储 microtask 消息,可见在 Flutter 引擎中,Microtask queue 并不是由 dart vm 层来管理的

    前面已经说过 isolate 之间是不能直接互相访问,如图:

可以看出 isolate 之间是共享 vm isolate 的堆内存区域的,有点类似于操作系统的内核空间,vm isolate 的堆内存存储了 dart vm 内部的核心数据(内置类,内置对象)。除了 vm isolate,不同 isolate 之间的堆内存是不能直接访问的,为此 dart vm 提供了 isolate 之间的通信机制,负责通信路由的大管家就是 PortMap,其内部实现就是一张 Hash 表,Key 为 isolate 的 main_port,Value 为 isolate 的 message_handler。

三、创建 Future

使用 dart 开发一定会用到 Future,当我们通过 Future 构造方法创建一个实例的时候,就会创建一个定时器消息到 Event queue,下面我们将分析这个流程。整体架构图:

1.  dart 层创建 Future

创建 Future 的时候,内部会通过 Timer 构造一个定时器,具体代码如下:

/src/out/host_release/dart-sdk/lib/async/future.dart

factory Future(FutureOr<T> computation()) {_Future<T> result = new _Future<T>();Timer.run(() {try {result._complete(computation());} catch (e, s) {_completeWithErrorCallback(result, e, s);}});return result;
}

跟进 Timer 的实现,具体代码如下:

/src/out/host_release/dart-sdk/lib/async/timer.dart

static void run(void callback()) {new Timer(Duration.zero, callback);
}factory Timer(Duration duration, void callback()) {if (Zone.current == Zone.root) {return Zone.current.createTimer(duration, callback);}
......
}

这里假定 duration == Duration.zero,Zone.current == Zone.root ,进而到 rootZone 的 createTimer 方法,里面又调用了 Timer 的 _createTimer 方法:

/src/out/host_release/dart-sdk/lib/async/zone.dart

class _RootZone extends _Zone {
......Timer createTimer(Duration duration, void f()) {return Timer._createTimer(duration, f);}
......
}

/src/out/host_release/dart-sdk/lib/async/timer.dart

external static Timer _createTimer(Duration duration, void callback());

可以看到 _createTimer 方法是个 external 的,按照 dart 语言的规范,external 方法的实现都在对应的 patch 文件中( timer_patch.dart),内部通过 _TimerFactory._factory 来创建 Timer,具体代码如下:

/src/third_party/dart/runtime/lib/timer_patch.dart

@patch
class Timer {
......@patchstatic Timer _createTimer(Duration duration, void callback()) {if (_TimerFactory._factory == null) {_TimerFactory._factory = VMLibraryHooks.timerFactory;}
......int milliseconds = duration.inMilliseconds;if (milliseconds < 0) milliseconds = 0;// 注意此处将外部的 callback 又包了一层return _TimerFactory._factory(milliseconds, (_) {callback();}, false);}
......
}

通过上面的代码,我们知道 _TimerFactory._factory = VMLibraryHooks.timerFactory, VMLibraryHooks.timerFactory 又是在 root isolate 初始化时通过调用 _setupHooks 方法设置的,具体代码如下:

/src/third_party/dart/runtime/lib/timer_impl.dart

@pragma("vm:entry-point", "call")
_setupHooks() {VMLibraryHooks.timerFactory = _Timer._factory;
}
// VMLibraryHooks.timerFactory 指向的该方法
// 我们假设创建的是非 repeating 消息,并且 milliSeconds 为 0
static Timer _factory(int milliSeconds, void callback(Timer timer), bool repeating) {
......return new _Timer(milliSeconds, callback);}
}factory _Timer(int milliSeconds, void callback(Timer timer)) {return _createTimer(callback, milliSeconds, false);
}
// 创建一个 Timer 实例并调用 _enqueue 将其加入到队列
static Timer _createTimer(void callback(Timer timer), int milliSeconds, bool repeating) {
......_Timer timer =new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);timer._enqueue();return timer;
}_Timer._internal(this._callback, this._wakeupTime, this._milliSeconds, this._repeating): _id = _nextId();// 这里 _milliSeconds == 0,会向 ZeroTimer 队列插入消息,然后调用 _notifyZeroHandler
void _enqueue() {if (_milliSeconds == 0) {if (_firstZeroTimer == null) {_lastZeroTimer = this;_firstZeroTimer = this;} else {_lastZeroTimer._indexOrNext = this;_lastZeroTimer = this;}// Every zero timer gets its own event._notifyZeroHandler();} else {......// 延迟消息这里先不分析}
}

折腾了一大圈,最后只是构造了一个 _Timer 实例并把其加入到 ZeroTimer 队列中,如果是延迟消息则会加入到 TimeoutTimerHeap 中,最后调用 _notifyZeroHandler 方法, 其主要做如下操作:

  • 创建 RawReceivePort 并设置一个叫 _handleMessage 方法做为引擎层的回调方法

  • 向引擎层 Event queue 发送一个普通优先级的  _ZERO_EVENT ,引擎层处理该消息的时候会最终回调到上面设置的 _handleMessage 方法。

具体代码如下:

/src/third_party/dart/runtime/lib/timer_impl.dart

static void _notifyZeroHandler() {if (_sendPort == null) {_createTimerHandler();}
// 底层会调到 PortMap 的 PostMessage 方法,进而唤醒消息处理,后面会分析这个流程_sendPort.send(_ZERO_EVENT);
}
// 创建和引擎层通信的 RawReceivePort,并设置引擎层的回调方法 _handleMessage
static void _createTimerHandler() {assert(_receivePort == null);assert(_sendPort == null);_receivePort = new RawReceivePort(_handleMessage);_sendPort = _receivePort.sendPort;_scheduledWakeupTime = null;
}

/src/third_party/dart/runtime/lib/isolate_patch.dart

@patch
class RawReceivePort {@patchfactory RawReceivePort([Function handler]) {_RawReceivePortImpl result = new _RawReceivePortImpl();result.handler = handler;return result;}
}// 最终将回调设置到 _RawReceivePortImpl 的 _handlerMap 中,引擎层会从这个 map 寻找消息的 handler
@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {void set handler(Function value) {_handlerMap[this._get_id()] = value;}
}

_handleMessage 回调方法会收集 Timer 并执行,具体代码实现如下:

/src/third_party/dart/runtime/lib/timer_impl.dart

static void _handleMessage(msg) {var pendingTimers;if (msg == _ZERO_EVENT) {// 找到所有的待处理 TimerspendingTimers = _queueFromZeroEvent();assert(pendingTimers.length > 0);} else {......// 延时消息这里不分析}
// 处理Timer,即调用设置的 callback_runTimers(pendingTimers);
......
}

2.  向 Event Queue 发送消息

前面说到 RawReceiverPort 会向引擎层 Event queue 发送一个 _ZERO_EVENT  ,其内部是通过调用 PortMap 的 PostMessage 方法将消息发送到 Event queue,该方法首先会根据接收方的 port id 找到对应的 message_handler,然后将消息根据优先级保存到相应的 queue 中,最后唤醒 message_notify_callback 回调函数 ,具体代码如下:

/src/third_party/dart/runtime/vm/port.cc

bool PortMap::PostMessage(Message* message, bool before_events) {......intptr_t index = FindPort(message->dest_port());......MessageHandler* handler = map_[index].handler;......handler->PostMessage(message, before_events);return true;
}

/src/third_party/dart/runtime/vm/message_handler.cc

void MessageHandler::PostMessage(Message* message, bool before_events) {Message::Priority saved_priority;bool task_running = true;......// 根据消息优先级进入不同的队列if (message->IsOOB()) {oob_queue_->Enqueue(message, before_events);} else {queue_->Enqueue(message, before_events);}......
//唤醒并处理消息MessageNotify(saved_priority);
}

/src/third_party/dart/runtime/vm/isolate.cc

void IsolateMessageHandler::MessageNotify(Message::Priority priority) {if (priority >= Message::kOOBPriority) {I->ScheduleInterrupts(Thread::kMessageInterrupt);}
// 最后调用的 message_notify_callback 所指向的函数Dart_MessageNotifyCallback callback = I->message_notify_callback();if (callback) {(*callback)(Api::CastIsolate(I));}
}

3.   Event Queue 消息处理

前面消息已经发送成功并调用了消息处理唤醒的操作,下面我们需要知道 message_notify_callback 所指向的函数的实现, root isolate 在初始化时会设置该变量,具体代码如下:

/src/flutter/runtime/dart_isolate.cc

bool DartIsolate::Initialize(Dart_Isolate dart_isolate, bool is_root_isolate) {......// 设置 message handler 的 task runner 为 UI Task RunnerSetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),is_root_isolate);......return true;
}
void DartIsolate::SetMessageHandlingTaskRunner(fml::RefPtr<fml::TaskRunner> runner,bool is_root_isolate) {......message_handler().Initialize([runner](std::function<void()> task) { runner->PostTask(task); });
}

进一步跟进分析发现通过 Dart_SetMessageNotifyCallback 将  root isolate 的 message_notify_callback 设置为 MessageNotifyCallback 方法,具体代码如下:

/src/third_party/tonic/dart_message_handler.cc

void DartMessageHandler::Initialize(TaskDispatcher dispatcher) {TONIC_CHECK(!task_dispatcher_ && dispatcher);task_dispatcher_ = dispatcher;Dart_SetMessageNotifyCallback(MessageNotifyCallback);
}

MessageNotifyCallback 会在 Event queue 收到消息后执行,但其执行过程中并没有拿到 Event queue 中的消息,而是往 UI Thread 的 MessageLoop Post 了一个 Task 闭包,这个 Task 闭包会通过调用 Dart_HandleMessage 来处理 Event queue 中的消息,具体代码流程如下:

/src/third_party/tonic/dart_message_handler.cc

void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {auto dart_state = DartState::From(dest_isolate);TONIC_CHECK(dart_state);dart_state->message_handler().OnMessage(dart_state);
}void DartMessageHandler::OnMessage(DartState* dart_state) {auto task_dispatcher_ = dart_state->message_handler().task_dispatcher_;// 往 ui 线程 MessageLoop Post 了一个 Taskauto weak_dart_state = dart_state->GetWeakPtr();task_dispatcher_([weak_dart_state]() {if (auto dart_state = weak_dart_state.lock()) {dart_state->message_handler().OnHandleMessage(dart_state.get());}});
}void DartMessageHandler::OnHandleMessage(DartState* dart_state) {......if (Dart_IsPausedOnStart()) {......} else if (Dart_IsPausedOnExit()) {......} else {// 调用 Dart_HandleMessage 方法处理消息result = Dart_HandleMessage();......}......
}

Dart_HandleMessage 的实现很简单,只是调用 message_handler 的 HandleNextMessage 方法,具体代码实现如下:

/src/third_party/dart/runtime/vm/dart_api_impl.cc

DART_EXPORT Dart_Handle Dart_HandleMessage() {
......if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {return Api::NewHandle(T, T->StealStickyError());}return Api::Success();
}

我们进一步跟进  HandleNextMessage 方法的实现,最终来到如下代码:

/src/third_party/dart/runtime/vm/message_handler.cc

// 依次遍历 message_handler 的消息队列,对每个消息进程处理
MessageHandler::MessageStatus MessageHandler::HandleMessages(MonitorLocker* ml,bool allow_normal_messages,bool allow_multiple_normal_messages) {
......Message* message = DequeueMessage(min_priority);while (message != NULL) {......MessageStatus status = HandleMessage(message);......message = DequeueMessage(min_priority);}return max_status;
}// 取消息的时候会优先处理 OOB Message
Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {Message* message = oob_queue_->Dequeue();if ((message == NULL) && (min_priority < Message::kOOBPriority)) {message = queue_->Dequeue();}return message;
}

每个消息的处理都是在 HandleMessage 方法中,该方法会根据不同的消息优先级做相应的处理,具体代码如下:

/src/third_party/dart/runtime/vm/isolate.cc

MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(Message* message) {
......Object& msg_handler = Object::Handle(zone);
// 非 OOB 消息,需要获取 dart 层的  handler 函数if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());......}
......MessageStatus status = kOK;if (message->IsOOB()) {// 处理 OOB 消息,详细实现可自己看代码,这里不分析OOB消息......} else if (message->dest_port() == Message::kIllegalPort) {......} else {......// 调用前面找到的 msg_handler 来处理普通消息const Object& result =Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));......}delete message;return status;
}

这里我们主要看普通消息的处理逻辑,首先会通过调用 DartLibraryCalls::LookupHandler 方法来从 dart 层寻找相应的 handler 函数,然后通过 DartLibraryCalls::HandleMessage 执行相应的处理函数,具体实现代码如下:

/src/third_party/dart/runtime/vm/dart_entry.cc

RawObject* DartLibraryCalls::LookupHandler(Dart_Port port_id) {Thread* thread = Thread::Current();Zone* zone = thread->zone();Function& function = Function::Handle(zone, thread->isolate()->object_store()->lookup_port_handler());const int kTypeArgsLen = 0;const int kNumArguments = 1;
// 如果没有消息处理方法,则进行查找,最终找到的是 RawReceivePortImpl 的 _lookupHandler 方法。if (function.IsNull()) {Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());ASSERT(!isolate_lib.IsNull());const String& class_name = String::Handle(zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));const String& function_name = String::Handle(zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,kTypeArgsLen, kNumArguments,Object::empty_array());ASSERT(!function.IsNull());thread->isolate()->object_store()->set_lookup_port_handler(function);}
// 执行消息处理函数const Array& args = Array::Handle(zone, Array::New(kNumArguments));args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));const Object& result =Object::Handle(zone, DartEntry::InvokeFunction(function, args));return result.raw();
}

最终执行的是 RawReceivePortImpl 的 _lookupHandler 方法,在前面在创建 Future 的时候我们已经设置 _handleMessage 到 _handlerMap 中,_lookupHandler 方法会从 _handlerMap 中找到设置的回调方法,最后执行回调方法。具体代码如下:

/src/third_party/dart/runtime/lib/isolate_patch.dart

@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {......// Called from the VM to retrieve the handler for a message.@pragma("vm:entry-point", "call")static _lookupHandler(int id) {var result = _handlerMap[id];return result;}
......
}

Future 的创建到这就分析完了,整个过程涉及到了 EventQueue 的消息收发。

至此,通过分析 Future 我们已经把 Event Queue 的消息处理流程了解了。

四、Microtask

1.  向  Microtask queue 发送消息

假设 Zone. current 为 rootZone, 直接看 scheduleMicrotask 方法的实现:

/src/third_party/dart/sdk/lib/async/schedule_microtask.dart

void scheduleMicrotask(void callback()) {_Zone currentZone = Zone.current;if (identical(_rootZone, currentZone)) {_rootScheduleMicrotask(null, null, _rootZone, callback);return;}
......
}

跟进 _rootScheduleMicrotask 方法的实现,最终来到 _scheduleAsyncCallback 方法,该方法做了两件事情:

  • 将传入的 callback 加入 callback 队列

  • 将 _startMicrotaskLoop 作为闭包参数调用  _AsyncRun._scheduleImmediate 方法,_startMicrotaskLoop 中会依次执行 callback 队列保存的回调。

具体代码如下:

/src/third_party/dart/sdk/lib/async/schedule_microtask.dart

void _scheduleAsyncCallback(_AsyncCallback callback) {_AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);if (_nextCallback == null) {_nextCallback = _lastCallback = newEntry;if (!_isInCallbackLoop) {_AsyncRun._scheduleImmediate(_startMicrotaskLoop);}} else {_lastCallback.next = newEntry;_lastCallback = newEntry;}
}
// 该方法被作为回调设置到引擎,会在处理所有的 Microtask 的时候执行
void _startMicrotaskLoop() {_isInCallbackLoop = true;try {_microtaskLoop();} finally {_lastPriorityCallback = null;_isInCallbackLoop = false;if (_nextCallback != null) {_AsyncRun._scheduleImmediate(_startMicrotaskLoop);}}
}class _AsyncRun {external static void _scheduleImmediate(void callback());
}

根据前面的经验,会在对应的 patch 文件中找到 _AsyncRun._scheduleImmediate 的实现,其内部调用了 _ScheduleImmediate._closure 指向的方法。

具体代码如下:

/src/third_party/dart/runtime/lib/schedule_microtask_patch.dart

@patch
class _AsyncRun {@patchstatic void _scheduleImmediate(void callback()) {if (_ScheduleImmediate._closure == null) {throw new UnsupportedError("Microtasks are not supported");}_ScheduleImmediate._closure(callback);}
}
// 通过该方法设置 _ScheduleImmediate._closure
@pragma("vm:entry-point", "call")
void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) {_ScheduleImmediate._closure = closure;
}

那么 _ScheduleImmediate._closure 指向的是什么呢?我们需要找到 _setScheduleImmediateClosure 的调用方。root isolate 初始化时会执行一系列的 vm hook 调用,我们从中找到了 _setScheduleImmediateClosure 的调用,具体代码如下:

/src/flutter/lib/ui/dart_runtime_hooks.cc

static void InitDartAsync(Dart_Handle builtin_library, bool is_ui_isolate) {Dart_Handle schedule_microtask;if (is_ui_isolate) {
// 这里的 builtin_library 是 Flutter 扩展的 ui libraryschedule_microtask =GetFunction(builtin_library, "_getScheduleMicrotaskClosure");} else {......}Dart_Handle async_library = Dart_LookupLibrary(ToDart("dart:async"));Dart_Handle set_schedule_microtask = ToDart("_setScheduleImmediateClosure");Dart_Handle result = Dart_Invoke(async_library, set_schedule_microtask, 1,&schedule_microtask);PropagateIfError(result);
}

进一步跟进,最终找到了 _ScheduleImmediate._closure 指向的方法,是一个 native 实现的函数,具体代码如下:

/src/flutter/lib/ui/natives.dart

Function _getScheduleMicrotaskClosure() => _scheduleMicrotask; void _scheduleMicrotask(void callback()) native 'ScheduleMicrotask';

跟进 _scheduleMicrotask 的 native 实现,发现其会把传入的 _startMicrotaskLoop 方法加入到底层的Microtask queue,具体代码如下:

/src/flutter/lib/ui/dart_runtime_hooks.cc

void ScheduleMicrotask(Dart_NativeArguments args) {Dart_Handle closure = Dart_GetNativeArgument(args, 0);UIDartState::Current()->ScheduleMicrotask(closure);
}

/src/flutter/lib/ui/ui_dart_state.cc

void UIDartState::ScheduleMicrotask(Dart_Handle closure) {if (tonic::LogIfError(closure) || !Dart_IsClosure(closure)) {return;}microtask_queue_.ScheduleMicrotask(closure);
}

2.  Microtask queue 消息处理

前面已经将 _startMicrotaskLoop 方法加入到了 Microtask queue ,那么 Microtask queue 内的方法何时执行呢?我们通过跟进 Microtask queue  的 RunMicrotasks 方法的调用方,最终找到 Microtask queue 内方法的执行时机 FlushMicrotasksNow,具体代码如下:

/src/flutter/lib/ui/ui_dart_state.cc

void UIDartState::FlushMicrotasksNow() {microtask_queue_.RunMicrotasks();
}

再跟进 FlushMicrotasksNow 方法的调用方,发现有两处调用:

  • 这里是在每一帧开始的时候去执行 Microtask

/src/flutter/lib/ui/window/window.cc

void Window::BeginFrame(fml::TimePoint frameTime) {
......UIDartState::Current()->FlushMicrotasksNow();
......
}
  • 另外一处调用是通过 TaskObserve 的形式,具体代码如下:

/src/flutter/lib/ui/ui_dart_state.cc

void UIDartState::AddOrRemoveTaskObserver(bool add) {
......if (add) {
// 这个 add_callback_ 是啥呢?add_callback_(reinterpret_cast<intptr_t>(this),[this]() { this->FlushMicrotasksNow(); });} else {remove_callback_(reinterpret_cast<intptr_t>(this));}
}

跟进 add_callback_ 的赋值,这里是android的实现

/src/flutter/shell/platform/android/flutter_main.cc

void FlutterMain::Init(JNIEnv* env,jclass clazz,jobject context,jobjectArray jargs,jstring bundlePath,jstring appStoragePath,jstring engineCachesPath) {......settings.task_observer_add = [](intptr_t key, fml::closure callback) {fml::MessageLoop::GetCurrent().AddTaskObserver(key, std::move(callback));};
......
}

FlushMicrotasksNow() 是作为 MessageLoop 的 TaskObserver 来执行的, TaskObserver 会在处理完task之后把该 Task 创建的 MicroTask 全部执行,也就是说在下一个 Task 运行前执行。代码如下:

/src/flutter/fml/message_loop_impl.cc

void MessageLoopImpl::FlushTasks(FlushType type) {
......for (const auto& invocation : invocations) {invocation();for (const auto& observer : task_observers_) {observer.second();}}
}

五、结束

通过前面的分析,Flutter 的异步消息处理流程还是挺复杂的,主要是代码写的比较乱,跳转层次太多,可以通过对整个流程的掌控来寻找UI 线程的优化及监控点,进而降低 UI 线程的处理时间,希望本篇文章让大家对 Flutter 的异步消息的整体处理流程有更深的理解。

推荐阅读你们吹捧的鸿蒙,只是另一个FuchsiaAndroid仿微信QQ图片裁剪互联网 HR 黑话大全,太真实了!

编程·思维·职场
欢迎扫码关注

探索 Flutter 异步消息的实现相关推荐

  1. pthread异步_探索 Flutter 异步消息的实现

    本文作者:赵旭阳 字节跳动资深工程师 一.简介 我们在进行 Android 开发的时候,会通过创建一个 Handler 并调用其 sendMessage  或 Post 方法来进行异步消息调用,其背后 ...

  2. 电商异步消息系统的实践

    声明:本文为<程序员>7月期原创投稿文章,未经许可禁止任何形式的转载. 作者:王晓宇,小米网平台研发部软件研发工程师.2015年入职小米,主要负责电商后端仓储物流相关的业务系统开发.曾在西 ...

  3. EJB与JAVA BEAN_J2EE的异步消息机制

    EJB与JAVA BEAN_J2EE的异步消息机制 EJB与JAVA BEAN的区别 Java Bean 是可复用的组件,对Java Bean并没有严格的规范,理论上讲,任何一个Java类都可以是一个 ...

  4. Android异步消息机制

    2019独角兽企业重金招聘Python工程师标准>>> 目录介绍 1.Handler的常见的使用方式 2.如何在子线程中定义Handler 3.主线程如何自动调用Looper.pre ...

  5. kafka之Producer同步与异步消息发送及事务幂等性案例应用实战

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...

  6. 异步消息的传递-回调机制

    1 什么是回调 软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用.回调和异步调用.同步调用是一种阻塞式调用,调用方要等待对方执行完毕才返回,它是一种单向调用:回调是一种双向 ...

  7. 八.利用springAMQP实现异步消息队列的日志管理

    经过前段时间的学习和铺垫,已经对spring amqp有了大概的了解.俗话说学以致用,今天就利用springAMQP来完成一个日志管理模块.大概的需求是这样的:系统中有很多地方需要记录操作日志,比如登 ...

  8. C#实现异步消息队列

    C#实现异步消息队列 原文:C#实现异步消息队列 拿到新书<.net框架设计>,到手之后迅速读了好多,虽然这本书不像很多教程一样从头到尾系统的讲明一些知识,但是从项目实战角度告诉我们如何使 ...

  9. 微服务中的异步消息通讯

    前言 在上一篇文章中,我们说到了异步消息通讯,下面这篇文章呢,大部分内容是翻译来自于这篇微软的文章,所以其内容还是具有一定的理论指导意义的. 当我们跨多个微服务进行内部通讯的时候,异步消息和事件驱动至 ...

最新文章

  1. Udacity机器人软件工程师课程笔记(三十五) - SLAM - 基于网格的FastSLAM
  2. python argv,Python argv函数简介
  3. cnpm install -g @vue/cli
  4. mybatis Resultmap 与 ResultType 区别
  5. 服务器高并发下出现大量的time wait的解决办法
  6. 【Flink】Flink NoSuchFieldError BIND_PORT
  7. 为什么打完篮球后手接触篮球杆会有触电感?
  8. Ubuntu 安装Jenkins报错
  9. plsa java代码_LDA主题聚类学习小结
  10. PHP文件系统-文件的读写操作
  11. clang命令编译c++程序时报错
  12. COM 组件实现事件、通知
  13. js 计算 往前(后)几天(月、年)
  14. 御剑飞行扫描后门加上burpsuite字典树爆破
  15. java自动化高频面试题
  16. 振铃效应与样点自适应补偿(Sample Adaptive Offset,SAO)技术
  17. 为什么培训班出来的程序员总遭人嫌弃?
  18. 开源房产中介管理系统
  19. android自定义素材拼图,众望所归 美图秀秀Android拼图隆重上线
  20. 经纬度查地址与地址查经纬度

热门文章

  1. 域名可以过户吗?域名过户需要多久?
  2. 信用卡透支忘还钱怎么办?
  3. android魅族升级,Android8.0快来了,魅族手机终于要升级 7.0了!
  4. 3-2加法器、4-2压缩器、5-2压缩器
  5. Android从相册中选取图片上传到阿里云OSS
  6. 如何成为一名懒惰的系统管理员
  7. 红米Note-4G双卡移动版线刷兼救砖_解账户锁_纯净刷机包_教程
  8. Generalizing to Unseen Domains via Adversarial Data Augmentation 正文
  9. wireshark--工具使用记录----TCP acked unseen segment
  10. 俄罗斯央行:犯罪分子很少使用加密货币来回笼资金