文章目录加载中

NodeJS多线程通信和共享内存 - worker_threads

代码 node 版本:v14.8.0

# 基础多线程通信

master thread 中,Worker 返回的对象,代表着工作线程实例。

worker thread 中,require('worker_threads').parentPort 就是 master thread 的 MessagePort。

利用上面说到的两个对象,可以实现主线程和工作线程之间的“双工通信”。

const { Worker, isMainThread, parentPort } = require("worker_threads");

// 使用parentPort和worker进行双工通信
if (isMainThread) {
  const worker = new Worker(__filename);
  // 1. 针对工作线程(worker)开启消息监听
  worker.on("message", message => console.log(message));
  // 2. 像工作线程(worker)发送消息:"ping"
  worker.postMessage("ping");
} else {
  // 3. 针对主线程(parentPort)开启消息监听
  parentPort.on("message", message =>
    // 4. 接收到主线程的消息后,将其放入 pong 字段,并且发送给主线程(parentPort)
    parentPort.postMessage({ pong: message })
  );
}

上面输出:

{
  pong: "ping";
}
// handling

题外话

可以看到,node 的设计参考了浏览器的 Web Worker

# 通信管道通信

通过通信管道(MessageChannel),可以跨任何线程(比如多个工作线程之间)进行数据通信。上面通过require('worker_threads').parentPort 的方式,缺点是只能在主线程和工作线程之间进行通信。

管道通信的思路:

  • 通过 MessageChannel 创建通信管道
  • 通过 postMessage 传递数据
    • 第一个参数是传递的数据,包括通信管道的Messageport
    • 第二个参数是 transformList,放入其中的对象,将在管道发送端中无法使用

来看一段通过通信管道进行通信的代码:

const assert = require("assert");
const {
  Worker,
  MessageChannel,
  MessagePort,
  isMainThread,
  parentPort
} = require("worker_threads");
if (isMainThread) {
  const worker = new Worker(__filename);
  // 通信管道,可以进行更灵活的数据传递
  // 不局限于父子线程,直接跨任何线程
  const subChannel = new MessageChannel();
  const uint8Array = new Uint8Array([1, 2, 3, 4]);

  worker.postMessage(
    { hereIsYourPort: subChannel.port1, uint8Arr: uint8Array },
    [subChannel.port1, uint8Array.buffer]
  );
  // transList中的对象在发送端是不可用的
  // 如果想共享内存,那么需要sharedArray,并且不能放入transList(保证可用)
  subChannel.port2.on("message", value => {
    subChannel.port1.postMessage("helo");
    console.log("[master] 接收到:", value);
    console.log("[master]", uint8Array);
  });
} else {
  parentPort.once("message", value => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.uint8Arr[1] = 10;
    console.log(value.uint8Arr);
    value.hereIsYourPort.postMessage("工作线程的消息");
    value.hereIsYourPort.close();
  });
}

代码的输出是:

Uint8Array(4) [ 1, 10, 3, 4 ]
[master] 接收到: 工作线程的消息
[master] Uint8Array(0) []

当 master thread 接受到 port2 的消息后,尝试打印了 uint8Array 对象,发现对象为空。也就是上面说到的,放入第二个参数的对象,将在管道发送端不可用。

思考:如果第二个参数中,不包含`uint8Array.buffer`呢?

这种情况下,管道发送端依然可用uint8Array.buffer;但是会进行一次“深拷贝”,将其拷贝到管道接收端(工作线程)。

# 通过共享内存进行通信

是的,nodejs 多线程也可以通过“共享内存”进行通信。master 和 worker(s)都操作同一段物理存储上的数据,实现数据共享,并且避免了拷贝带来的开销。

共享内存通信的思路:

  • 通过 SharedArrayBuffer 创建二进制数据
  • SharedArrayBuffer 数据不能放在postMessage的第二个参数中

来看一段通过共享内存进行通信的代码:

const assert = require("assert");
const {
  Worker,
  MessageChannel,
  MessagePort,
  isMainThread,
  parentPort
} = require("worker_threads");
if (isMainThread) {
  const worker = new Worker(__filename);
  const subChannel = new MessageChannel();
  // 共享内存
  const sharedArray = new SharedArrayBuffer(4);
  const uint8Arr = new Uint8Array(sharedArray);
  console.log("[master] 原来的uint8Arr", uint8Arr);

  worker.postMessage({ hereIsYourPort: subChannel.port1, uint8Arr }, [
    subChannel.port1
  ]);

  subChannel.port2.on("message", value => {
    subChannel.port1.postMessage("helo");
    console.log("[master] 接收到:" + value);
    console.log("[master] 经过工作线程修改的uint8Arr", uint8Arr);
  });
} else {
  parentPort.once("message", value => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.uint8Arr[1] = 10;
    console.log(value.uint8Arr);
    value.hereIsYourPort.postMessage("工作线程消息");
    value.hereIsYourPort.close();
  });
}

上面代码的输出是:

[master] 原来的uint8Arr Uint8Array(4) [ 0, 0, 0, 0 ]
Uint8Array(4) [ 0, 10, 0, 0 ]
[master] 接收到:工作线程消息
[master] 经过工作线程修改的uint8Arr Uint8Array(4) [ 0, 10, 0, 0 ]

可以看到,worker thread 修改了 uint8Arr,由于采用共享内存,因此在 master thread 中,也可以读取到修改后的 uint8Arr。

本文来自心谭博客:xin-tan.com,经常更新web和算法的文章笔记,前往github查看目录归纳:github.com/dongyuanxin/blog