下载安卓APP箭头
箭头给我发消息

客服QQ:3315713922

Node.js C++ 层的任务管理

作者:匿名     来源: 编程语言点击数:778发布时间: 2022-12-06 14:41:30

标签: Node.jsC++任务管理

大神带你学编程,欢迎选课

  我们都知道 Node.js 是基于事件循环来运行的,本质上是一个生产者 / 消费者模型,所以就少不了任务的管理机制,不过本文不是介绍事件循环中的任务管理,而是 C++ 层的任务管理。本文主要介绍 SetImmediate、SetImmediateThreadsafe、RequestInterrupt、AddCleanupHook 这四个 API 产生的任务。时间关系,随便写写,权当笔记。

  好久没更新了,今天写个笔记。

  我们都知道 Node.js 是基于事件循环来运行的,本质上是一个生产者 / 消费者模型,所以就少不了任务的管理机制,不过本文不是介绍事件循环中的任务管理,而是 C++ 层的任务管理。本文主要介绍 SetImmediate、SetImmediateThreadsafe、RequestInterrupt、AddCleanupHook 这四个 API 产生的任务。时间关系,随便写写,权当笔记。

  任务管理机制的初始化

  首先来看一下 Node.js 启动的过程中,和任务管理相关的逻辑。

  复制

  1.  uv_check_start(immediate_check_handle(), CheckImmediate)

  2.  uv_async_init(

  3.  event_loop(),

  4.  &task_queues_async_,

  5.  [](uv_async_t* async) {

  6.  Environment* env = ContainerOf(&Environment::task_queues_async_, async);

  7.  env->RunAndClearNativeImmediates();

  8.  })

  CheckImmediate 是在 check 阶段执行的函数,task_queues_async_ 则用于线程间通信,即当子线程往主线程提交任务时,通过 task_queues_async_ 通知主线程,然后主线程执行 uv_async_init 注册的回调。上面的代码就是消费者的逻辑。后面再详细分析里面的处理流程。

  提交任务

  接下来逐个看一下生产者的逻辑。

  复制

  1.  template

  2.  void Environment::SetImmediate(Fn&& cb, CallbackFlags::Flags flags) {

  3.  auto callback = native_immediates_.CreateCallback(std::move(cb), flags);

  4.  native_immediates_.Push(std::move(callback));

  5.  // ...

  6.  }

  SetImmediate 用于同线程的代码提交任务。

  复制

  1.  template

  2.  void Environment::SetImmediateThreadsafe(Fn&& cb, CallbackFlags::Flags flags) {

  3.  auto callback = native_immediates_threadsafe_.CreateCallback(

  4.  std::move(cb), flags);

  5.  {

  6.  Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);

  7.  native_immediates_threadsafe_.Push(std::move(callback));

  8.  if (task_queues_async_initialized_)

  9.  uv_async_send(&task_queues_async_);

  10.  }

  11.  }

  SetImmediateThreadsafe 用于子线程给主线程提交任务,所以需要加锁。

  复制

  1.  template

  2.  void Environment::RequestInterrupt(Fn&& cb) {

  3.  auto callback = native_immediates_interrupts_.CreateCallback(

  4.  std::move(cb), CallbackFlags::kRefed);

  5.  {

  6.  Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);

  7.  native_immediates_interrupts_.Push(std::move(callback));

  8.  if (task_queues_async_initialized_)

  9.  uv_async_send(&task_queues_async_);

  10.  }

  11.  RequestInterruptFromV8();

  12.  }

  RequestInterrupt 用于子线程给主线程提交代码,他和 SetImmediateThreadsafe 有一个很重要的区别是调用了 RequestInterruptFromV8。

  复制

  1.  void Environment::RequestInterruptFromV8() {

  2.  isolate()->RequestInterrupt([](Isolate* isolate, void* data) {

  3.  std::unique_ptr<environment*> env_ptr { static_cast<environment**>(data) };

  4.  Environment* env = *env_ptr;

  5.  env->RunAndClearInterrupts();

  6.  }, interrupt_data);

  7.  }

  RequestInterrupt 可以使得提交的代码在 JS 代码死循环时依然会被执行。接着看 AddCleanupHook。

  复制

  1.  void Environment::AddCleanupHook(CleanupQueue::Callback fn, void* arg) {

  2.  cleanup_queue_.Add(fn, arg);

  3.  }

  AddCleanupHook 用于注册线程退出前的回调。生产者的逻辑都比较简单,就是往任务队列里插入一个任务,如果是涉及到线程间的任务,则通知主线程。

  消费者

  接下来看一下消费者的逻辑,根据前面的分析可以知道,消费者有几个:CheckImmediate,task_queues_async_ 的处理函数、RequestInterrupt 注册的函数、退出前回调处理函数。先看 CheckImmediate。

  复制

  1.  void Environment::CheckImmediate(uv_check_t* handle) {

  2.  Environment* env = Environment::from_immediate_check_handle(handle);

  3.  env->RunAndClearNativeImmediates();

  4.  }

      5.

  6.  void Environment::RunAndClearNativeImmediates(bool only_refed) {

  7.  RunAndClearInterrupts();

      8.

  9.  auto drain_list = [&](NativeImmediateQueue* queue) {

  10.  while (auto head = queue->Shift()) {

  11.  head->Call(this);

  12.  }

  13.  return false;

  14.  };

  15.  while (drain_list(&native_immediates_)) {}

  16.  NativeImmediateQueue threadsafe_immediates;

  17.  if (native_immediates_threadsafe_.size() > 0) {

  18.  Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);

  19.  threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_));

  20.  }

  21.  while (drain_list(&threadsafe_immediates)) {}

  22.  }

      23.

  24.  void Environment::RunAndClearInterrupts() {

  25.  while (native_immediates_interrupts_.size() > 0) {

  26.  NativeImmediateQueue queue;

  27.  {

  28.  Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);

  29.  queue.ConcatMove(std::move(native_immediates_interrupts_));

  30.  }

  31.  while (auto head = queue.Shift())

  32.  head->Call(this);

  33.  }

  34.  }

  CheckImmediate 函数中处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。但是如果主线程阻塞在 Poll IO 阶段时,只有子线程提交任务时会唤醒主线程,具体是通过 task_queues_async_ 结构体,看一下处理函数。

  复制

  1.  env->RunAndClearNativeImmediates();

  可以看到这时候也是处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。最后来看一下处理退出前回调的函数,具体时机是 FreeEnvironment 函数中的 env->RunCleanup()。

  复制

  1.  void Environment::RunCleanup() {

  2.  RunAndClearNativeImmediates(true);

  3.  while (!cleanup_queue_.empty() || principal_realm_->HasCleanupHooks() ||

  4.  native_immediates_.size() > 0 ||

  5.  native_immediates_threadsafe_.size() > 0 ||

  6.  native_immediates_interrupts_.size() > 0) {

  7.  // 见 CleanupQueue::Drain

  8.  cleanup_queue_.Drain();

  9.  RunAndClearNativeImmediates(true);

  10.  }

  11.  }

      12.

  13.  // cleanup_queue_.Drain();

  14.  void CleanupQueue::Drain() {

  15.  std::vector callbacks(cleanup_hooks_.begin(),

  16.  cleanup_hooks_.end());

  17.  std::sort(callbacks.begin(),

  18.  callbacks.end(),

  19.  [](const CleanupHookCallback& a, const CleanupHookCallback& b) {

  20.  return a.insertion_order_counter_ > b.insertion_order_counter_;

  21.  });

      22.

  23.  for (const CleanupHookCallback& cb : callbacks) {

  24.  cb.fn_(cb.arg_);

  25.  cleanup_hooks_.erase(cb);

  26.  }

  27.  }

  RunCleanup 中同时处理了 SetImmediate、SetImmediateThreadsafe、 RequestInterrupt 产生的任务和注册的退出前回调。

  来源: 编程杂技

    >>>>>>点击进入编程语言专题

赞(9)
踩(0)
分享到:
华为认证网络工程师 HCIE直播课视频教程