NodeJS多线程通信和共享内存 - worker_threads
代码 node 版本:v14.8.0
# 基础多线程通信
master thread 中,Worker
返回的对象,代表着工作线程实例。
worker thread 中,require('worker_threads').parentPort
就是 master thread 的 MessagePort。
利用上面说到的两个对象,可以实现主线程和工作线程之间的“双工通信”。
const { Worker, isMainThread, parentPort } = require("worker_threads");
// 使用parentPort和worker进行双工通信
if (isMainThread) {
const worker = new Worker(__filename);
// 1. 针对工作线程(worker)开启消息监听
worker.on("message", message => console.log(message));
// 2. 像工作线程(worker)发送消息:"ping"
worker.postMessage("ping");
} else {
// 3. 针对主线程(parentPort)开启消息监听
parentPort.on("message", message =>
// 4. 接收到主线程的消息后,将其放入 pong 字段,并且发送给主线程(parentPort)
parentPort.postMessage({ pong: message })
);
}
上面输出:
{
pong: "ping";
}
// handling
题外话
可以看到,node 的设计参考了浏览器的 Web Worker
# 通信管道通信
通过通信管道(MessageChannel),可以跨任何线程(比如多个工作线程之间)进行数据通信。上面通过require('worker_threads').parentPort
的方式,缺点是只能在主线程和工作线程之间进行通信。
管道通信的思路:
- 通过
MessageChannel
创建通信管道 - 通过
postMessage
传递数据- 第一个参数是传递的数据,包括通信管道的
Messageport
- 第二个参数是 transformList,放入其中的对象,将在管道发送端中无法使用
- 第一个参数是传递的数据,包括通信管道的
来看一段通过通信管道进行通信的代码:
const assert = require("assert");
const {
Worker,
MessageChannel,
MessagePort,
isMainThread,
parentPort
} = require("worker_threads");
if (isMainThread) {
const worker = new Worker(__filename);
// 通信管道,可以进行更灵活的数据传递
// 不局限于父子线程,直接跨任何线程
const subChannel = new MessageChannel();
const uint8Array = new Uint8Array([1, 2, 3, 4]);
worker.postMessage(
{ hereIsYourPort: subChannel.port1, uint8Arr: uint8Array },
[subChannel.port1, uint8Array.buffer]
);
// transList中的对象在发送端是不可用的
// 如果想共享内存,那么需要sharedArray,并且不能放入transList(保证可用)
subChannel.port2.on("message", value => {
subChannel.port1.postMessage("helo");
console.log("[master] 接收到:", value);
console.log("[master]", uint8Array);
});
} else {
parentPort.once("message", value => {
assert(value.hereIsYourPort instanceof MessagePort);
value.uint8Arr[1] = 10;
console.log(value.uint8Arr);
value.hereIsYourPort.postMessage("工作线程的消息");
value.hereIsYourPort.close();
});
}
代码的输出是:
Uint8Array(4) [ 1, 10, 3, 4 ]
[master] 接收到: 工作线程的消息
[master] Uint8Array(0) []
当 master thread 接受到 port2 的消息后,尝试打印了 uint8Array
对象,发现对象为空。也就是上面说到的,放入第二个参数的对象,将在管道发送端不可用。
思考:如果第二个参数中,不包含`uint8Array.buffer`呢?
这种情况下,管道发送端依然可用uint8Array.buffer
;但是会进行一次“深拷贝”,将其拷贝到管道接收端(工作线程)。
# 通过共享内存进行通信
是的,nodejs 多线程也可以通过“共享内存”进行通信。master 和 worker(s)都操作同一段物理存储上的数据,实现数据共享,并且避免了拷贝带来的开销。
共享内存通信的思路:
- 通过
SharedArrayBuffer
创建二进制数据 SharedArrayBuffer
数据不能放在postMessage
的第二个参数中
来看一段通过共享内存进行通信的代码:
const assert = require("assert");
const {
Worker,
MessageChannel,
MessagePort,
isMainThread,
parentPort
} = require("worker_threads");
if (isMainThread) {
const worker = new Worker(__filename);
const subChannel = new MessageChannel();
// 共享内存
const sharedArray = new SharedArrayBuffer(4);
const uint8Arr = new Uint8Array(sharedArray);
console.log("[master] 原来的uint8Arr", uint8Arr);
worker.postMessage({ hereIsYourPort: subChannel.port1, uint8Arr }, [
subChannel.port1
]);
subChannel.port2.on("message", value => {
subChannel.port1.postMessage("helo");
console.log("[master] 接收到:" + value);
console.log("[master] 经过工作线程修改的uint8Arr", uint8Arr);
});
} else {
parentPort.once("message", value => {
assert(value.hereIsYourPort instanceof MessagePort);
value.uint8Arr[1] = 10;
console.log(value.uint8Arr);
value.hereIsYourPort.postMessage("工作线程消息");
value.hereIsYourPort.close();
});
}
上面代码的输出是:
[master] 原来的uint8Arr Uint8Array(4) [ 0, 0, 0, 0 ]
Uint8Array(4) [ 0, 10, 0, 0 ]
[master] 接收到:工作线程消息
[master] 经过工作线程修改的uint8Arr Uint8Array(4) [ 0, 10, 0, 0 ]
可以看到,worker thread 修改了 uint8Arr,由于采用共享内存,因此在 master thread 中,也可以读取到修改后的 uint8Arr。