Nodejs 提供了 cluster 来支持服务集群的扩展,提高多核 CPU 的利用效率,实现负载均衡,最大程度利用机器性能。本文从以下几个方面介绍 cluster 的 API 和用法:

  • cluster 启动 HTTP 服务器
  • 如何进行广播?
  • 如何实现状态共享?
  • 如何处理进程退出?
  • 更多进程控制方法:心跳保活、自动重启、负载检测

# cluster 启动 HTTP 服务器

为了方便测试,全局安装 autocannon:

npm install -g autocannon
1

不借助 cluster,编写一个简单的 http 服务器:

const http = require("http");
http.createServer((req, res) => {
    // 模拟cpu计算
    for (let i = 0; i < 100000; ++i) {}
    res.statusCode = 200;
    res.end("hello world!");
}).listen(4000);
1
2
3
4
5
6
7

借助 autocannon 开启 1000 个连接,每个连接的请求次数为 10 次,压测结果如下:

➜  _posts git:(master) ✗ autocannon -c 1000 -p 10 http://127.0.0.1:4000
Running 10s test @ http://127.0.0.1:4000
1000 connections with 10 pipelining factor
┌─────────┬──────┬──────┬────────┬────────┬──────────┬───────────┬────────────┐
│ Stat    │ 2.5% │ 50%  │ 97.5%  │ 99%    │ Avg      │ Stdev     │ Max        │
├─────────┼──────┼──────┼────────┼────────┼──────────┼───────────┼────────────┤
│ Latency │ 0 ms │ 0 ms │ 636 ms │ 650 ms │ 62.48 ms │ 197.51 ms │ 2928.64 ms │
└─────────┴──────┴──────┴────────┴────────┴──────────┴───────────┴────────────┘
┌───────────┬─────────┬─────────┬─────────┬─────────┬──────────┬────────┬─────────┐
│ Stat      │ 1%      │ 2.5%    │ 50%     │ 97.5%   │ Avg      │ Stdev  │ Min     │
├───────────┼─────────┼─────────┼─────────┼─────────┼──────────┼────────┼─────────┤
│ Req/Sec   │ 1309513095159111630315558.91901.4813092   │
├───────────┼─────────┼─────────┼─────────┼─────────┼──────────┼────────┼─────────┤
│ Bytes/Sec │ 1.47 MB │ 1.47 MB │ 1.78 MB │ 1.83 MB │ 1.74 MB  │ 101 kB │ 1.47 MB │
└───────────┴─────────┴─────────┴─────────┴─────────┴──────────┴────────┴─────────┘
Req/Bytes counts sampled once per second.
171k requests in 11.17s, 19.2 MB read
50 errors (0 timeouts)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

然后用 cluster 模块来启动一个利用多核的 http 服务器,代码如下:

const cluster = require("cluster");
const http = require("http");
const os = require("os");
if (cluster.isMaster) {
    const cpuNum = os.cpus().length;
    for (let i = 0; i < cpuNum; ++i) {
        cluster.fork();
    }
} else {
    runServer();
}
function runServer() {
    http.createServer((req, res) => {
        for (let i = 0; i < 100000; ++i) {}
        res.statusCode = 200;
        res.end("hello world!");
    }).listen(4000);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

同样利用 autocannon 进行测试,结果如下:

➜  _posts git:(master) ✗ autocannon -c 1000 -p 10 http://127.0.0.1:4000
Running 10s test @ http://127.0.0.1:4000
1000 connections with 10 pipelining factor
┌─────────┬──────┬──────┬────────┬────────┬─────────┬──────────┬──────────┐
│ Stat    │ 2.5% │ 50%  │ 97.5%  │ 99%    │ Avg     │ Stdev    │ Max      │
├─────────┼──────┼──────┼────────┼────────┼─────────┼──────────┼──────────┤
│ Latency │ 0 ms │ 0 ms │ 113 ms │ 125 ms │ 11.5 ms │ 37.37 ms │ 807.5 ms │
└─────────┴──────┴──────┴────────┴────────┴─────────┴──────────┴──────────┘
┌───────────┬────────┬────────┬─────────┬─────────┬─────────┬──────────┬────────┐
│ Stat      │ 1%     │ 2.5%   │ 50%     │ 97.5%   │ Avg     │ Stdev    │ Min    │
├───────────┼────────┼────────┼─────────┼─────────┼─────────┼──────────┼────────┤
│ Req/Sec   │ 43711437119702310867190811.216898.3443710  │
├───────────┼────────┼────────┼─────────┼─────────┼─────────┼──────────┼────────┤
│ Bytes/Sec │ 4.9 MB │ 4.9 MB │ 10.9 MB │ 12.2 MB │ 10.2 MB │ 1.89 MB  │ 4.9 MB │
└───────────┴────────┴────────┴─────────┴─────────┴─────────┴──────────┴────────┘
Req/Bytes counts sampled once per second.
908k requests in 10.7s, 102 MB read
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

可以看到,错误请求从 50 降低到 0,最长请求延迟从 2.9s 降低到了 0.8s,平均请求量从 1.5w 提升到了 9w,平均下载量从 1.74MB 提升到了 10.2MB。而本机的os.cpus().length返回的结果是 12,提升非常稳定,和 cpu 核数基本成正比。

从上面的实践也看到,从 cluster 开启的子进程总数量最好和 cpu 数量一样。

# 如何进行广播?

广播需要父子进程之间进行通信,多用于消息下发、数据共享。cluster 是基于 child_process 模块的,所以通信的做法和 child_process 区别不大。

在主进程中, cluster.workders 是个哈希表,可以遍历得到所有工作进程。如下所示,给所有的工作进程广播消息:

if (cluster.isMaster) {
    for (let i = 0; i < os.cpus().length; ++i) {
        cluster.fork();
    }
    // 给工作进程广播消息
    for (const id in cluster.workers) {
        cluster.workers[id].send({
            data: "msg"
        });
    }
} else if (cluster.isWorker) {
    // 工作进程接受到消息
    process.on("message", msg => {
        console.log("msg is", msg);
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 如何实现状态共享?

在上一个例子中,看到了借助 cluster.workers 和事件机制,来进行消息广播。但由于集群的每个节点是“分散”,所以对于有状态的服务应该想办法解决“状态共享”这个问题。

例如有需要我们进行总访问量统计的需求,并且将当前的访问量返回给客户端。由于每个进程都承载了一部分访问,工作进程接收到请求的时候,需要向主进程上报;工作进程接收到上报,更新访问总量,并且广播给各个工作进程。这就是一个完整的消息上报 => 状态更新 => 消息广播的过程。

按照上面的思路,首先封装工作进程的 http 逻辑,如下所示:

function runServer() {
    let visitTotal = 0;
    // 接收主进程的广播
    process.on("message", msg => {
        if (msg.tag === "broadcast") visitTotal = msg.visitTotal;
    });
    http.createServer((req, res) => {
        // 消息上报给主进程
        process.send({
            tag: "report"
        });
        res.statusCode = 200;
        res.end(`visit total times is ${visitTotal + 1}`);
    }).listen(4000);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

是的,就是通过传递消息上的一个字段,来标识是工作进程上报的消息还是主进程广播的消息。给主进程用的 broadcast() 函数如下:

function broadcast(workers, data) {
    for (const id in workers) {
        // 给工作进程广播消息
        workers[id].send({
            tag: "broadcast",
            ...data
        });
    }
}
1
2
3
4
5
6
7
8
9

最后,主进程中需要为工作进程添加message事件的监听器,这样才能收到工作进程的消息,并且更新保存在主进程中的状态(visitTotal),完成广播。代码如下:

if (cluster.isMaster) {
    let visitTotal = 0; // 访客总人数
    const cpuNum = os.cpus().length;
    for (let i = 0; i < cpuNum; ++i) {
        cluster.fork();
    }
    for (const id in cluster.workers) {
        cluster.workers[id].on("message", msg => {
            if (msg.tag === "report") {
                ++visitTotal;
                broadcast(cluster.workers, { visitTotal });
            }
        });
    }
} else if (cluster.isWorker) {
    runServer();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

其实,更常用的做法是专门准备一个服务器来进行统计,将服务单独部署。这里是为了深入理解和学习 cluster 模块。

# 如何处理进程退出?

cluster 模块中有 2 个 exit 事件:一个是 Worker 上的,仅用于工作进程中;另一个是主进程上,任何一个工作进程关闭都会触发。

在工作进程正常退出的时候,code 为 0,并且 Worker 上的 exitedAfterDisconnect 属性为 true。那么检测 code 和 exitedAfterDisconnect 属性,就能判断进程是否是异常退出。并且重新 fork 一个新的工作进程,来保持服务稳定运行。代码如下:

cluster.on("exit", (worker, code, signal) => {
    if (code || !worker.exitedAfterDisconnect) {
        console.log(`${worker.id} 崩溃,重启新的子进程`);
        cluster.fork();
    }
});
1
2
3
4
5
6

注意,exitedAfterDisconnect 属性在正常退出、调用 worker.kill() 或调用 worker.disconnect()时,均被设置为 true。因为调用 kill 和 disconnect 均为代码逻辑主动执行,属于程序的一部分。

# 更多进程控制方法:心跳保活、自动重启、负载检测

除了前面所讲的方法,进程控制的常见方法还有:心跳保活、自动重启、负载检测。

心跳保活:工作进程定时向主进程发送心跳包,主进程如果检测到长时间没有收到心跳包,要关闭对应的工作进程,并重启新的进程。

自动重启:给每个工作进程设置一个“生命周期”,例如 60mins。到时间后,通知主进程进行重启。

负载检测:工作进程和主进程可以定期检测 cpu 占用率、内存占用率、平均负载等指标,过高的话,则关闭重启对应工作进程。关于检测方法可以看这篇文章《NodeJS 模块研究 - os》

这些方法在 vemojs 中都有应用,具体可以看这篇文章:《VemoJS 源码拆解》

# 参考链接

来自: NodeJS集群模块研究 - cluster | 心谭博客
作者:心谭
Star仓库:github