探索 Flutter 异步消息的实现
本文作者:赵旭阳
一、简介
我们在进行 Android 开发的时候,会通过创建一个 Handler 并调用其 sendMessage 或 Post 方法来进行异步消息调用,其背后涉及到了三个面试经常被问的类:Handler,Looper,MessageQueue,内部原理我想做过 Android 开发的基本都了解。
Event queue : 包含了所有的外部事件:I/O,鼠标点击,绘制,定时器,Dart isolate 的消息等,其实这块又根据消息的优先级细分成了两个队列,后面会有介绍。
Microtask queue :事件处理代码有时需要在当前 event 之后,且在下一个 event 之前做一些任务。
两种队列的消息处理流程大致如图所示:
dart 提供了 dart:async 库来对这两个队列进行操作,主要是如下两个API:
Future 类,创建一个定时器事件到 Event queue 尾部。
scheduleMicrotask() 方法,添加一个事件到 Microtask queue 尾部
下面将会分析这两个 API 背后的实现,涉及的是 flutter engine 的源码,可以参考官方的 wiki 来下载:
二、初始化
1. 创建 MessageLoop
在启动 Flutter 的时候,引擎会额外创建三个线程:UI Thread,IO Thread, GPU Thread,并为每个线程都创建一个 MessageLoop,之后各个线程就进入的消息循环的状态,等待新的消息来处理,具体流程如下:
ThreadHost thread_host_;
AndroidShellHolder::AndroidShellHolder(flutter::Settings settings,fml::jni::JavaObjectWeakGlobalRef java_object,bool is_background_view): settings_(std::move(settings)), java_object_(java_object) {
......if (is_background_view) {thread_host_ = {thread_label, ThreadHost::Type::UI};} else {// 创建三个线程thread_host_ = {thread_label, ThreadHost::Type::UI | ThreadHost::Type::GPU |ThreadHost::Type::IO};}
......
}
进一步分析 ThreadHost 的创建,最后来到了创建 Thread,每个 Thread 都会创建一个 MessageLoop 并获取其 TaskRunner,TaskRunner 是用来外部向 MessageLoop 中 Post Task 的。顺带说一下,在 Android 上 MessageLoop 的实现还是使用的系统自身的 Looper 机制,这里是通过 NDK 的 ALooper 相关接口来实现的。具体代码如下:
Thread::Thread(const std::string& name) : joined_(false) {fml::AutoResetWaitableEvent latch;fml::RefPtr<fml::TaskRunner> runner;thread_ = std::make_unique<std::thread>([&latch, &runner, name]() -> void {SetCurrentThreadName(name);// 创建 MessageLoopfml::MessageLoop::EnsureInitializedForCurrentThread();auto& loop = MessageLoop::GetCurrent();// 获取 TaskRunnerrunner = loop.GetTaskRunner();latch.Signal();loop.Run();});latch.Wait();task_runner_ = runner;
}
MessageLoop 创建好后,我们就可以通过 TaskRunner 向其发送 Task 了,这里需要注意 MessageLoop 执行的 Task 仅是一个 无参的闭包 。类似这样:
auto jni_exit_task([key = thread_destruct_key_]() {FML_CHECK(pthread_setspecific(key, reinterpret_cast<void*>(1)) == 0);
});
thread_host_.ui_thread->GetTaskRunner()->PostTask(jni_exit_task);
2. 创建 Root Isolate
在 dart 语言中是没有线程的,而是使用类似于线程但相互之间不共享堆内存的 isolate,代表一个独立的 dart 程序执行环境。同样 Flutter 的 dart 代码是运行在一个叫 root isolate 的 isolate 中,下面简要列下 root isolate 的创建过程。
a.启动 dart vm
这个步骤的具体流程,大家可以顺着 /engine-v1.5.4/src/flutter/runtime/dart_vm.cc 中的 DartVM 构造方法去跟进分析。在 dart vm 启动过程中会创建 vm isolate 和 PortMap,这两个的具体作用下面有介绍。
b.创建 root isolate
root isolate 是在 UI 线程中创建的,具体流程见 /src/flutter/runtime/dart_isolate.cc 的 CreateRootIsolate 方法。由于 isolate 是对当前线程执行环境的一个抽象表示,所以其内部存储了很多信息,对于异步消息这块有四个关键的信息是需要注意的。
下面三个字段是定义在 dart vm 层的 Isolate 类中,具体见 /src/third_party/dart/runtime/vm/isolate.h。
main_port :可以看做该 isolate 的标识,是一个整数;
message_handler :顾名思义,用来管理每个 isolate 的 Event queue,其内部根据 message 的优先级将消息分为了两个队列:普通优先级的 queue 和 OOB 优先级的 oob_queue。了解 TCP 协议的应该了解 TCP 中有带外数据(即优先数据),isolate 的 OOB 优先级也是类似的意思,OOB 优先级的消息会被优先处理,目前看到有这么几种 OOB 消息:
// 这些主要和 isolate 的生命周期相关
kPauseMsg = 1,
kResumeMsg = 2,
kPingMsg = 3,
kKillMsg = 4,
kAddExitMsg = 5,
kDelExitMsg = 6,
kAddErrorMsg = 7,
kDelErrorMsg = 8,
kErrorFatalMsg = 9,
// 可以注意下 kLowMemoryMsg ,如果有大量 OOB 怀疑是内存不够了
kInterruptMsg = 10, // Break in the debugger.
kInternalKillMsg = 11, // Like kill, but does not run exit listeners, etc.
kLowMemoryMsg = 12, // Run compactor, etc.
kDrainServiceExtensionsMsg = 13, // Invoke pending service extensions
message_notify_callback :message_handler 收到消息后会调用该变量指向的函数去处理;
Flutter 引擎层会对 dart vm 的 isolate 实例做一层包装( DartIsolate 类),其内部定义了:
microtask_queue: 用来存储 microtask 消息,可见在 Flutter 引擎中,Microtask queue 并不是由 dart vm 层来管理的
前面已经说过 isolate 之间是不能直接互相访问,如图:
可以看出 isolate 之间是共享 vm isolate 的堆内存区域的,有点类似于操作系统的内核空间,vm isolate 的堆内存存储了 dart vm 内部的核心数据(内置类,内置对象)。除了 vm isolate,不同 isolate 之间的堆内存是不能直接访问的,为此 dart vm 提供了 isolate 之间的通信机制,负责通信路由的大管家就是 PortMap,其内部实现就是一张 Hash 表,Key 为 isolate 的 main_port,Value 为 isolate 的 message_handler。
三、创建 Future
使用 dart 开发一定会用到 Future,当我们通过 Future 构造方法创建一个实例的时候,就会创建一个定时器消息到 Event queue,下面我们将分析这个流程。整体架构图:
1. dart 层创建 Future
创建 Future 的时候,内部会通过 Timer 构造一个定时器,具体代码如下:
/src/out/host_release/dart-sdk/lib/async/future.dart
factory Future(FutureOr<T> computation()) {_Future<T> result = new _Future<T>();Timer.run(() {try {result._complete(computation());} catch (e, s) {_completeWithErrorCallback(result, e, s);}});return result;
}
跟进 Timer 的实现,具体代码如下:
/src/out/host_release/dart-sdk/lib/async/timer.dart
static void run(void callback()) {new Timer(Duration.zero, callback);
}factory Timer(Duration duration, void callback()) {if (Zone.current == Zone.root) {return Zone.current.createTimer(duration, callback);}
......
}
这里假定 duration == Duration.zero,Zone.current == Zone.root ,进而到 rootZone 的 createTimer 方法,里面又调用了 Timer 的 _createTimer 方法:
/src/out/host_release/dart-sdk/lib/async/zone.dart
class _RootZone extends _Zone {
......Timer createTimer(Duration duration, void f()) {return Timer._createTimer(duration, f);}
......
}
/src/out/host_release/dart-sdk/lib/async/timer.dart
external static Timer _createTimer(Duration duration, void callback());
可以看到 _createTimer 方法是个 external 的,按照 dart 语言的规范,external 方法的实现都在对应的 patch 文件中( timer_patch.dart),内部通过 _TimerFactory._factory 来创建 Timer,具体代码如下:
/src/third_party/dart/runtime/lib/timer_patch.dart
@patch
class Timer {
......@patchstatic Timer _createTimer(Duration duration, void callback()) {if (_TimerFactory._factory == null) {_TimerFactory._factory = VMLibraryHooks.timerFactory;}
......int milliseconds = duration.inMilliseconds;if (milliseconds < 0) milliseconds = 0;// 注意此处将外部的 callback 又包了一层return _TimerFactory._factory(milliseconds, (_) {callback();}, false);}
......
}
通过上面的代码,我们知道 _TimerFactory._factory = VMLibraryHooks.timerFactory, VMLibraryHooks.timerFactory 又是在 root isolate 初始化时通过调用 _setupHooks 方法设置的,具体代码如下:
/src/third_party/dart/runtime/lib/timer_impl.dart
@pragma("vm:entry-point", "call")
_setupHooks() {VMLibraryHooks.timerFactory = _Timer._factory;
}
// VMLibraryHooks.timerFactory 指向的该方法
// 我们假设创建的是非 repeating 消息,并且 milliSeconds 为 0
static Timer _factory(int milliSeconds, void callback(Timer timer), bool repeating) {
......return new _Timer(milliSeconds, callback);}
}factory _Timer(int milliSeconds, void callback(Timer timer)) {return _createTimer(callback, milliSeconds, false);
}
// 创建一个 Timer 实例并调用 _enqueue 将其加入到队列
static Timer _createTimer(void callback(Timer timer), int milliSeconds, bool repeating) {
......_Timer timer =new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);timer._enqueue();return timer;
}_Timer._internal(this._callback, this._wakeupTime, this._milliSeconds, this._repeating): _id = _nextId();// 这里 _milliSeconds == 0,会向 ZeroTimer 队列插入消息,然后调用 _notifyZeroHandler
void _enqueue() {if (_milliSeconds == 0) {if (_firstZeroTimer == null) {_lastZeroTimer = this;_firstZeroTimer = this;} else {_lastZeroTimer._indexOrNext = this;_lastZeroTimer = this;}// Every zero timer gets its own event._notifyZeroHandler();} else {......// 延迟消息这里先不分析}
}
折腾了一大圈,最后只是构造了一个 _Timer 实例并把其加入到 ZeroTimer 队列中,如果是延迟消息则会加入到 TimeoutTimerHeap 中,最后调用 _notifyZeroHandler 方法, 其主要做如下操作:
创建 RawReceivePort 并设置一个叫 _handleMessage 方法做为引擎层的回调方法
向引擎层 Event queue 发送一个普通优先级的 _ZERO_EVENT ,引擎层处理该消息的时候会最终回调到上面设置的 _handleMessage 方法。
具体代码如下:
/src/third_party/dart/runtime/lib/timer_impl.dart
static void _notifyZeroHandler() {if (_sendPort == null) {_createTimerHandler();}
// 底层会调到 PortMap 的 PostMessage 方法,进而唤醒消息处理,后面会分析这个流程_sendPort.send(_ZERO_EVENT);
}
// 创建和引擎层通信的 RawReceivePort,并设置引擎层的回调方法 _handleMessage
static void _createTimerHandler() {assert(_receivePort == null);assert(_sendPort == null);_receivePort = new RawReceivePort(_handleMessage);_sendPort = _receivePort.sendPort;_scheduledWakeupTime = null;
}
/src/third_party/dart/runtime/lib/isolate_patch.dart
@patch
class RawReceivePort {@patchfactory RawReceivePort([Function handler]) {_RawReceivePortImpl result = new _RawReceivePortImpl();result.handler = handler;return result;}
}// 最终将回调设置到 _RawReceivePortImpl 的 _handlerMap 中,引擎层会从这个 map 寻找消息的 handler
@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {void set handler(Function value) {_handlerMap[this._get_id()] = value;}
}
_handleMessage 回调方法会收集 Timer 并执行,具体代码实现如下:
/src/third_party/dart/runtime/lib/timer_impl.dart
static void _handleMessage(msg) {var pendingTimers;if (msg == _ZERO_EVENT) {// 找到所有的待处理 TimerspendingTimers = _queueFromZeroEvent();assert(pendingTimers.length > 0);} else {......// 延时消息这里不分析}
// 处理Timer,即调用设置的 callback_runTimers(pendingTimers);
......
}
2. 向 Event Queue 发送消息
前面说到 RawReceiverPort 会向引擎层 Event queue 发送一个 _ZERO_EVENT ,其内部是通过调用 PortMap 的 PostMessage 方法将消息发送到 Event queue,该方法首先会根据接收方的 port id 找到对应的 message_handler,然后将消息根据优先级保存到相应的 queue 中,最后唤醒 message_notify_callback 回调函数 ,具体代码如下:
/src/third_party/dart/runtime/vm/port.cc
bool PortMap::PostMessage(Message* message, bool before_events) {......intptr_t index = FindPort(message->dest_port());......MessageHandler* handler = map_[index].handler;......handler->PostMessage(message, before_events);return true;
}
/src/third_party/dart/runtime/vm/message_handler.cc
void MessageHandler::PostMessage(Message* message, bool before_events) {Message::Priority saved_priority;bool task_running = true;......// 根据消息优先级进入不同的队列if (message->IsOOB()) {oob_queue_->Enqueue(message, before_events);} else {queue_->Enqueue(message, before_events);}......
//唤醒并处理消息MessageNotify(saved_priority);
}
/src/third_party/dart/runtime/vm/isolate.cc
void IsolateMessageHandler::MessageNotify(Message::Priority priority) {if (priority >= Message::kOOBPriority) {I->ScheduleInterrupts(Thread::kMessageInterrupt);}
// 最后调用的 message_notify_callback 所指向的函数Dart_MessageNotifyCallback callback = I->message_notify_callback();if (callback) {(*callback)(Api::CastIsolate(I));}
}
3. Event Queue 消息处理
前面消息已经发送成功并调用了消息处理唤醒的操作,下面我们需要知道 message_notify_callback 所指向的函数的实现, root isolate 在初始化时会设置该变量,具体代码如下:
/src/flutter/runtime/dart_isolate.cc
bool DartIsolate::Initialize(Dart_Isolate dart_isolate, bool is_root_isolate) {......// 设置 message handler 的 task runner 为 UI Task RunnerSetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),is_root_isolate);......return true;
}
void DartIsolate::SetMessageHandlingTaskRunner(fml::RefPtr<fml::TaskRunner> runner,bool is_root_isolate) {......message_handler().Initialize([runner](std::function<void()> task) { runner->PostTask(task); });
}
进一步跟进分析发现通过 Dart_SetMessageNotifyCallback 将 root isolate 的 message_notify_callback 设置为 MessageNotifyCallback 方法,具体代码如下:
/src/third_party/tonic/dart_message_handler.cc
void DartMessageHandler::Initialize(TaskDispatcher dispatcher) {TONIC_CHECK(!task_dispatcher_ && dispatcher);task_dispatcher_ = dispatcher;Dart_SetMessageNotifyCallback(MessageNotifyCallback);
}
MessageNotifyCallback 会在 Event queue 收到消息后执行,但其执行过程中并没有拿到 Event queue 中的消息,而是往 UI Thread 的 MessageLoop Post 了一个 Task 闭包,这个 Task 闭包会通过调用 Dart_HandleMessage 来处理 Event queue 中的消息,具体代码流程如下:
/src/third_party/tonic/dart_message_handler.cc
void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {auto dart_state = DartState::From(dest_isolate);TONIC_CHECK(dart_state);dart_state->message_handler().OnMessage(dart_state);
}void DartMessageHandler::OnMessage(DartState* dart_state) {auto task_dispatcher_ = dart_state->message_handler().task_dispatcher_;// 往 ui 线程 MessageLoop Post 了一个 Taskauto weak_dart_state = dart_state->GetWeakPtr();task_dispatcher_([weak_dart_state]() {if (auto dart_state = weak_dart_state.lock()) {dart_state->message_handler().OnHandleMessage(dart_state.get());}});
}void DartMessageHandler::OnHandleMessage(DartState* dart_state) {......if (Dart_IsPausedOnStart()) {......} else if (Dart_IsPausedOnExit()) {......} else {// 调用 Dart_HandleMessage 方法处理消息result = Dart_HandleMessage();......}......
}
Dart_HandleMessage 的实现很简单,只是调用 message_handler 的 HandleNextMessage 方法,具体代码实现如下:
/src/third_party/dart/runtime/vm/dart_api_impl.cc
DART_EXPORT Dart_Handle Dart_HandleMessage() {
......if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {return Api::NewHandle(T, T->StealStickyError());}return Api::Success();
}
我们进一步跟进 HandleNextMessage 方法的实现,最终来到如下代码:
/src/third_party/dart/runtime/vm/message_handler.cc
// 依次遍历 message_handler 的消息队列,对每个消息进程处理
MessageHandler::MessageStatus MessageHandler::HandleMessages(MonitorLocker* ml,bool allow_normal_messages,bool allow_multiple_normal_messages) {
......Message* message = DequeueMessage(min_priority);while (message != NULL) {......MessageStatus status = HandleMessage(message);......message = DequeueMessage(min_priority);}return max_status;
}// 取消息的时候会优先处理 OOB Message
Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {Message* message = oob_queue_->Dequeue();if ((message == NULL) && (min_priority < Message::kOOBPriority)) {message = queue_->Dequeue();}return message;
}
每个消息的处理都是在 HandleMessage 方法中,该方法会根据不同的消息优先级做相应的处理,具体代码如下:
/src/third_party/dart/runtime/vm/isolate.cc
MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(Message* message) {
......Object& msg_handler = Object::Handle(zone);
// 非 OOB 消息,需要获取 dart 层的 handler 函数if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());......}
......MessageStatus status = kOK;if (message->IsOOB()) {// 处理 OOB 消息,详细实现可自己看代码,这里不分析OOB消息......} else if (message->dest_port() == Message::kIllegalPort) {......} else {......// 调用前面找到的 msg_handler 来处理普通消息const Object& result =Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));......}delete message;return status;
}
这里我们主要看普通消息的处理逻辑,首先会通过调用 DartLibraryCalls::LookupHandler 方法来从 dart 层寻找相应的 handler 函数,然后通过 DartLibraryCalls::HandleMessage 执行相应的处理函数,具体实现代码如下:
/src/third_party/dart/runtime/vm/dart_entry.cc
RawObject* DartLibraryCalls::LookupHandler(Dart_Port port_id) {Thread* thread = Thread::Current();Zone* zone = thread->zone();Function& function = Function::Handle(zone, thread->isolate()->object_store()->lookup_port_handler());const int kTypeArgsLen = 0;const int kNumArguments = 1;
// 如果没有消息处理方法,则进行查找,最终找到的是 RawReceivePortImpl 的 _lookupHandler 方法。if (function.IsNull()) {Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());ASSERT(!isolate_lib.IsNull());const String& class_name = String::Handle(zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));const String& function_name = String::Handle(zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,kTypeArgsLen, kNumArguments,Object::empty_array());ASSERT(!function.IsNull());thread->isolate()->object_store()->set_lookup_port_handler(function);}
// 执行消息处理函数const Array& args = Array::Handle(zone, Array::New(kNumArguments));args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));const Object& result =Object::Handle(zone, DartEntry::InvokeFunction(function, args));return result.raw();
}
最终执行的是 RawReceivePortImpl 的 _lookupHandler 方法,在前面在创建 Future 的时候我们已经设置 _handleMessage 到 _handlerMap 中,_lookupHandler 方法会从 _handlerMap 中找到设置的回调方法,最后执行回调方法。具体代码如下:
/src/third_party/dart/runtime/lib/isolate_patch.dart
@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {......// Called from the VM to retrieve the handler for a message.@pragma("vm:entry-point", "call")static _lookupHandler(int id) {var result = _handlerMap[id];return result;}
......
}
Future 的创建到这就分析完了,整个过程涉及到了 EventQueue 的消息收发。
至此,通过分析 Future 我们已经把 Event Queue 的消息处理流程了解了。
四、Microtask
1. 向 Microtask queue 发送消息
假设 Zone. current 为 rootZone, 直接看 scheduleMicrotask 方法的实现:
/src/third_party/dart/sdk/lib/async/schedule_microtask.dart
void scheduleMicrotask(void callback()) {_Zone currentZone = Zone.current;if (identical(_rootZone, currentZone)) {_rootScheduleMicrotask(null, null, _rootZone, callback);return;}
......
}
跟进 _rootScheduleMicrotask 方法的实现,最终来到 _scheduleAsyncCallback 方法,该方法做了两件事情:
将传入的 callback 加入 callback 队列
将 _startMicrotaskLoop 作为闭包参数调用 _AsyncRun._scheduleImmediate 方法,_startMicrotaskLoop 中会依次执行 callback 队列保存的回调。
具体代码如下:
/src/third_party/dart/sdk/lib/async/schedule_microtask.dart
void _scheduleAsyncCallback(_AsyncCallback callback) {_AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);if (_nextCallback == null) {_nextCallback = _lastCallback = newEntry;if (!_isInCallbackLoop) {_AsyncRun._scheduleImmediate(_startMicrotaskLoop);}} else {_lastCallback.next = newEntry;_lastCallback = newEntry;}
}
// 该方法被作为回调设置到引擎,会在处理所有的 Microtask 的时候执行
void _startMicrotaskLoop() {_isInCallbackLoop = true;try {_microtaskLoop();} finally {_lastPriorityCallback = null;_isInCallbackLoop = false;if (_nextCallback != null) {_AsyncRun._scheduleImmediate(_startMicrotaskLoop);}}
}class _AsyncRun {external static void _scheduleImmediate(void callback());
}
根据前面的经验,会在对应的 patch 文件中找到 _AsyncRun._scheduleImmediate 的实现,其内部调用了 _ScheduleImmediate._closure 指向的方法。
具体代码如下:
/src/third_party/dart/runtime/lib/schedule_microtask_patch.dart
@patch
class _AsyncRun {@patchstatic void _scheduleImmediate(void callback()) {if (_ScheduleImmediate._closure == null) {throw new UnsupportedError("Microtasks are not supported");}_ScheduleImmediate._closure(callback);}
}
// 通过该方法设置 _ScheduleImmediate._closure
@pragma("vm:entry-point", "call")
void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) {_ScheduleImmediate._closure = closure;
}
那么 _ScheduleImmediate._closure 指向的是什么呢?我们需要找到 _setScheduleImmediateClosure 的调用方。root isolate 初始化时会执行一系列的 vm hook 调用,我们从中找到了 _setScheduleImmediateClosure 的调用,具体代码如下:
/src/flutter/lib/ui/dart_runtime_hooks.cc
static void InitDartAsync(Dart_Handle builtin_library, bool is_ui_isolate) {Dart_Handle schedule_microtask;if (is_ui_isolate) {
// 这里的 builtin_library 是 Flutter 扩展的 ui libraryschedule_microtask =GetFunction(builtin_library, "_getScheduleMicrotaskClosure");} else {......}Dart_Handle async_library = Dart_LookupLibrary(ToDart("dart:async"));Dart_Handle set_schedule_microtask = ToDart("_setScheduleImmediateClosure");Dart_Handle result = Dart_Invoke(async_library, set_schedule_microtask, 1,&schedule_microtask);PropagateIfError(result);
}
进一步跟进,最终找到了 _ScheduleImmediate._closure 指向的方法,是一个 native 实现的函数,具体代码如下:
/src/flutter/lib/ui/natives.dart
Function _getScheduleMicrotaskClosure() => _scheduleMicrotask; void _scheduleMicrotask(void callback()) native 'ScheduleMicrotask';
跟进 _scheduleMicrotask 的 native 实现,发现其会把传入的 _startMicrotaskLoop 方法加入到底层的Microtask queue,具体代码如下:
/src/flutter/lib/ui/dart_runtime_hooks.cc
void ScheduleMicrotask(Dart_NativeArguments args) {Dart_Handle closure = Dart_GetNativeArgument(args, 0);UIDartState::Current()->ScheduleMicrotask(closure);
}
/src/flutter/lib/ui/ui_dart_state.cc
void UIDartState::ScheduleMicrotask(Dart_Handle closure) {if (tonic::LogIfError(closure) || !Dart_IsClosure(closure)) {return;}microtask_queue_.ScheduleMicrotask(closure);
}
2. Microtask queue 消息处理
前面已经将 _startMicrotaskLoop 方法加入到了 Microtask queue ,那么 Microtask queue 内的方法何时执行呢?我们通过跟进 Microtask queue 的 RunMicrotasks 方法的调用方,最终找到 Microtask queue 内方法的执行时机 FlushMicrotasksNow,具体代码如下:
/src/flutter/lib/ui/ui_dart_state.cc
void UIDartState::FlushMicrotasksNow() {microtask_queue_.RunMicrotasks();
}
再跟进 FlushMicrotasksNow 方法的调用方,发现有两处调用:
这里是在每一帧开始的时候去执行 Microtask
/src/flutter/lib/ui/window/window.cc
void Window::BeginFrame(fml::TimePoint frameTime) {
......UIDartState::Current()->FlushMicrotasksNow();
......
}
另外一处调用是通过 TaskObserve 的形式,具体代码如下:
/src/flutter/lib/ui/ui_dart_state.cc
void UIDartState::AddOrRemoveTaskObserver(bool add) {
......if (add) {
// 这个 add_callback_ 是啥呢?add_callback_(reinterpret_cast<intptr_t>(this),[this]() { this->FlushMicrotasksNow(); });} else {remove_callback_(reinterpret_cast<intptr_t>(this));}
}
跟进 add_callback_ 的赋值,这里是android的实现
/src/flutter/shell/platform/android/flutter_main.cc
void FlutterMain::Init(JNIEnv* env,jclass clazz,jobject context,jobjectArray jargs,jstring bundlePath,jstring appStoragePath,jstring engineCachesPath) {......settings.task_observer_add = [](intptr_t key, fml::closure callback) {fml::MessageLoop::GetCurrent().AddTaskObserver(key, std::move(callback));};
......
}
FlushMicrotasksNow() 是作为 MessageLoop 的 TaskObserver 来执行的, TaskObserver 会在处理完task之后把该 Task 创建的 MicroTask 全部执行,也就是说在下一个 Task 运行前执行。代码如下:
/src/flutter/fml/message_loop_impl.cc
void MessageLoopImpl::FlushTasks(FlushType type) {
......for (const auto& invocation : invocations) {invocation();for (const auto& observer : task_observers_) {observer.second();}}
}
五、结束
通过前面的分析,Flutter 的异步消息处理流程还是挺复杂的,主要是代码写的比较乱,跳转层次太多,可以通过对整个流程的掌控来寻找UI 线程的优化及监控点,进而降低 UI 线程的处理时间,希望本篇文章让大家对 Flutter 的异步消息的整体处理流程有更深的理解。
推荐阅读你们吹捧的鸿蒙,只是另一个FuchsiaAndroid仿微信QQ图片裁剪互联网 HR 黑话大全,太真实了!
编程·思维·职场
欢迎扫码关注
探索 Flutter 异步消息的实现相关推荐
- pthread异步_探索 Flutter 异步消息的实现
本文作者:赵旭阳 字节跳动资深工程师 一.简介 我们在进行 Android 开发的时候,会通过创建一个 Handler 并调用其 sendMessage 或 Post 方法来进行异步消息调用,其背后 ...
- 电商异步消息系统的实践
声明:本文为<程序员>7月期原创投稿文章,未经许可禁止任何形式的转载. 作者:王晓宇,小米网平台研发部软件研发工程师.2015年入职小米,主要负责电商后端仓储物流相关的业务系统开发.曾在西 ...
- EJB与JAVA BEAN_J2EE的异步消息机制
EJB与JAVA BEAN_J2EE的异步消息机制 EJB与JAVA BEAN的区别 Java Bean 是可复用的组件,对Java Bean并没有严格的规范,理论上讲,任何一个Java类都可以是一个 ...
- Android异步消息机制
2019独角兽企业重金招聘Python工程师标准>>> 目录介绍 1.Handler的常见的使用方式 2.如何在子线程中定义Handler 3.主线程如何自动调用Looper.pre ...
- kafka之Producer同步与异步消息发送及事务幂等性案例应用实战
本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...
- 异步消息的传递-回调机制
1 什么是回调 软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用.回调和异步调用.同步调用是一种阻塞式调用,调用方要等待对方执行完毕才返回,它是一种单向调用:回调是一种双向 ...
- 八.利用springAMQP实现异步消息队列的日志管理
经过前段时间的学习和铺垫,已经对spring amqp有了大概的了解.俗话说学以致用,今天就利用springAMQP来完成一个日志管理模块.大概的需求是这样的:系统中有很多地方需要记录操作日志,比如登 ...
- C#实现异步消息队列
C#实现异步消息队列 原文:C#实现异步消息队列 拿到新书<.net框架设计>,到手之后迅速读了好多,虽然这本书不像很多教程一样从头到尾系统的讲明一些知识,但是从项目实战角度告诉我们如何使 ...
- 微服务中的异步消息通讯
前言 在上一篇文章中,我们说到了异步消息通讯,下面这篇文章呢,大部分内容是翻译来自于这篇微软的文章,所以其内容还是具有一定的理论指导意义的. 当我们跨多个微服务进行内部通讯的时候,异步消息和事件驱动至 ...
最新文章
- Udacity机器人软件工程师课程笔记(三十五) - SLAM - 基于网格的FastSLAM
- python argv,Python argv函数简介
- cnpm install -g @vue/cli
- mybatis Resultmap 与 ResultType 区别
- 服务器高并发下出现大量的time wait的解决办法
- 【Flink】Flink NoSuchFieldError BIND_PORT
- 为什么打完篮球后手接触篮球杆会有触电感?
- Ubuntu 安装Jenkins报错
- plsa java代码_LDA主题聚类学习小结
- PHP文件系统-文件的读写操作
- clang命令编译c++程序时报错
- COM 组件实现事件、通知
- js 计算 往前(后)几天(月、年)
- 御剑飞行扫描后门加上burpsuite字典树爆破
- java自动化高频面试题
- 振铃效应与样点自适应补偿(Sample Adaptive Offset,SAO)技术
- 为什么培训班出来的程序员总遭人嫌弃?
- 开源房产中介管理系统
- android自定义素材拼图,众望所归 美图秀秀Android拼图隆重上线
- 经纬度查地址与地址查经纬度
热门文章
- 域名可以过户吗?域名过户需要多久?
- 信用卡透支忘还钱怎么办?
- android魅族升级,Android8.0快来了,魅族手机终于要升级 7.0了!
- 3-2加法器、4-2压缩器、5-2压缩器
- Android从相册中选取图片上传到阿里云OSS
- 如何成为一名懒惰的系统管理员
- 红米Note-4G双卡移动版线刷兼救砖_解账户锁_纯净刷机包_教程
- Generalizing to Unseen Domains via Adversarial Data Augmentation 正文
- wireshark--工具使用记录----TCP acked unseen segment
- 俄罗斯央行:犯罪分子很少使用加密货币来回笼资金