进程与子进程

What — 是什么

Node.js 的 child_process 模块提供了创建子进程的能力,允许在主进程之外执行外部命令、运行脚本或进行进程间通信(IPC),是突破单线程限制、利用多核 CPU 的基础机制。

核心概念:

  • child_process.fork():创建 Node.js 子进程,自动建立 IPC 通道,专用于 Node.js 进程间通信
  • child_process.exec():执行 shell 命令,缓冲全部输出,适合简单命令(输出量小)
  • child_process.execFile():直接执行可执行文件(不经过 shell),比 exec 更安全高效
  • child_process.spawn():启动进程并流式获取输出,适合长时间运行/大量输出的进程
  • IPC 通信fork() 创建的子进程自动建立 message 事件通道,父子进程通过 send() 传递序列化数据
  • 进程间共享:父子进程内存独立,通过 IPC 消息传递或 SharedArrayBuffer 共享内存

关键特性:

  • spawn 返回 ChildProcess 对象,stdout/stderr 是 Stream,可流式消费
  • exec 缓冲输出,有 maxBuffer 限制(默认 1MB),超限触发 error
  • forkspawn 的特化版本,自动设置 ipc 通道和 NODE_CHANNEL_FD
  • IPC 传递的数据会被序列化/反序列化(JSON),不支持函数/Symbol/循环引用
  • 子进程可通过 process.disconnect() 主动关闭 IPC 通道
  • child.stdin/stdout/stderr 可 pipe 到其他流

运行机制:

  • 内存模型:每个子进程有独立的 V8 堆和 libuv 事件循环,父子进程内存完全隔离
  • 执行模型:spawn/fork 立即返回 ChildProcess 对象,子进程异步执行;exec/execFile 收集输出后在回调中返回
  • 并发模型:子进程由操作系统调度,与主进程并行执行;IPC 消息通过管道传递,主进程事件循环中异步处理

类型系统:

  • ChildProcess 对象:pid(进程 ID)、exitCode(退出码)、signalCode(终止信号)、stdin/stdout/stderr(标准流)
  • IPC 消息类型:可序列化的 JSON 值(字符串/数字/对象/数组),特殊处理 net.Socket/net.Server 可通过 IPC 传递文件描述符

Why — 为什么

适用场景:

  • CPU 密集计算:将计算任务 offload 到子进程,避免阻塞主进程事件循环
  • 执行系统命令:调用 git/npm/ffmpeg 等外部工具
  • 并行任务处理:多个子进程同时处理独立任务
  • 进程隔离:不信任的代码在沙箱子进程中执行
  • 微服务进程管理:主进程管理多个服务子进程

对比子进程方法:

维度spawnexecexecFilefork
输出方式流式缓冲缓冲流式+IPC
Shell可选
适用数据量大(无限)小(<1MB)小(<1MB)IPC 消息
Node.js 专用
IPC 通信

优缺点:

  • ✅ 优点:
    • 突破单线程限制,利用多核 CPU
    • 进程隔离,一个崩溃不影响另一个
    • IPC 通信简单直观
    • 可执行任意系统命令
  • ❌ 缺点:
    • 进程创建开销大(每个子进程 ~30MB 内存)
    • IPC 序列化有性能开销
    • 内存不共享,大数据需通过文件或共享内存传递
    • 子进程管理复杂,需处理退出/重启/错误

How — 怎么用

快速上手

const { spawn, exec, fork } = require('child_process');

// spawn:流式执行命令
const ls = spawn('ls', ['-la', '/usr']);
ls.stdout.on('data', (data) => console.log(data.toString()));
ls.stderr.on('data', (data) => console.error(data.toString()));
ls.on('close', (code) => console.log(`退出码: ${code}`));

// exec:缓冲执行命令
exec('git status', (error, stdout, stderr) => {
    if (error) { console.error(error); return; }
    console.log(stdout);
});

// fork:Node.js 子进程 + IPC
const child = fork('./worker.js');
child.send({ type: 'compute', data: [1, 2, 3] });
child.on('message', (msg) => console.log('结果:', msg));

代码示例

spawn 流式处理 + 错误处理:

const { spawn } = require('child_process');

// 执行长时间运行的命令,流式获取输出
function runCommand(command, args, options = {}) {
    return new Promise((resolve, reject) => {
        const proc = spawn(command, args, {
            cwd: options.cwd,
            env: { ...process.env, ...options.env },
            shell: true,
            stdio: ['pipe', 'pipe', 'pipe']
        });

        let stdout = '';
        let stderr = '';

        proc.stdout.on('data', (data) => {
            stdout += data;
            if (options.onData) options.onData(data.toString());
        });

        proc.stderr.on('data', (data) => {
            stderr += data;
            if (options.onError) options.onError(data.toString());
        });

        proc.on('close', (code) => {
            if (code === 0) resolve({ stdout, stderr, code });
            else reject(new Error(`Command failed with code ${code}: ${stderr}`));
        });

        proc.on('error', (err) => reject(err));

        // 超时处理
        if (options.timeout) {
            setTimeout(() => {
                proc.kill('SIGTERM');
                reject(new Error(`Command timed out after ${options.timeout}ms`));
            }, options.timeout);
        }
    });
}

// 使用示例:执行 npm install 并实时输出
await runCommand('npm', ['install'], {
    cwd: '/project/path',
    onData: (line) => process.stdout.write(line),
    timeout: 120000
});

fork + IPC 进程池:

// worker.js — 子进程代码
process.on('message', (msg) => {
    if (msg.type === 'compute') {
        try {
            const result = heavyComputation(msg.data);
            process.send({ type: 'result', id: msg.id, data: result });
        } catch (err) {
            process.send({ type: 'error', id: msg.id, error: err.message });
        }
    }
});

function heavyComputation(data) {
    // CPU 密集计算
    let sum = 0;
    for (let i = 0; i < data.iterations; i++) {
        sum += Math.sqrt(i) * Math.sin(i);
    }
    return sum;
}

// pool.js — 进程池管理
const { fork } = require('child_process');
const path = require('path');

class ProcessPool {
    constructor(workerPath, size = 4) {
        this.workerPath = workerPath;
        this.size = size;
        this.workers = [];
        this.queue = [];
        this.taskId = 0;
        this.callbacks = new Map();

        for (let i = 0; i < size; i++) {
            this._createWorker();
        }
    }

    _createWorker() {
        const worker = fork(this.workerPath);
        worker.busy = false;

        worker.on('message', (msg) => {
            const cb = this.callbacks.get(msg.id);
            if (cb) {
                this.callbacks.delete(msg.id);
                if (msg.type === 'result') cb.resolve(msg.data);
                else cb.reject(new Error(msg.error));
            }
            worker.busy = false;
            this._processQueue();
        });

        worker.on('exit', (code) => {
            console.error(`Worker ${worker.pid} exited with code ${code}`);
            const idx = this.workers.indexOf(worker);
            if (idx !== -1) this.workers.splice(idx, 1);
            this._createWorker(); // 自动补充
        });

        this.workers.push(worker);
    }

    send(data) {
        return new Promise((resolve, reject) => {
            const id = this.taskId++;
            const task = { id, data, resolve, reject };

            const idleWorker = this.workers.find(w => !w.busy);
            if (idleWorker) {
                idleWorker.busy = true;
                this.callbacks.set(id, task);
                idleWorker.send({ type: 'compute', id, data });
            } else {
                this.queue.push(task);
            }
        });
    }

    _processQueue() {
        if (this.queue.length === 0) return;
        const idleWorker = this.workers.find(w => !w.busy);
        if (!idleWorker) return;

        const task = this.queue.shift();
        idleWorker.busy = true;
        this.callbacks.set(task.id, task);
        idleWorker.send({ type: 'compute', id: task.id, data: task.data });
    }

    close() {
        this.workers.forEach(w => w.kill());
        this.workers = [];
    }
}

// 使用示例
const pool = new ProcessPool(path.join(__dirname, 'worker.js'), 4);

async function batchCompute() {
    const tasks = Array.from({ length: 10 }, (_, i) => ({
        iterations: 1000000 * (i + 1)
    }));

    const results = await Promise.all(
        tasks.map(data => pool.send(data))
    );
    console.log('All results:', results);
    pool.close();
}

常见问题与踩坑

问题原因解决方案
exec 输出截断超过 maxBuffer(默认 1MB)增大 maxBuffer 或改用 spawn 流式处理
子进程僵尸未监听 exit 事件,进程句柄未释放监听 close/exit,调用 kill 清理
IPC 传递大数据慢消息需 JSON 序列化用 SharedArrayBuffer 或临时文件
spawn 找不到命令PATH 环境变量未继承spawn(cmd, args, { env: process.env })
fork 子进程未退出子进程有活跃的定时器/连接子进程完成后调用 process.disconnect()process.exit()
Shell 注入风险exec 拼接用户输入到命令execFile/spawn 传参数组,避免 shell 解析

最佳实践

  • 长时间/大量输出用 spawn,简短命令用 exec
  • Node.js 进程间通信用 fork + IPC
  • 始终监听子进程的 errorexit 事件
  • 避免用 exec 拼接用户输入,防 Shell 注入
  • 使用进程池管理多个子进程,避免频繁创建销毁
  • 大数据传递用 SharedArrayBuffer 或文件,而非 IPC 消息

面试题

Q1: spawn、exec、execFile、fork 四个方法的区别?

spawn:启动进程返回 Stream,流式获取输出,适合大量输出/长时间运行。exec:执行 shell 命令,缓冲全部输出在回调中返回,适合短命令(有 maxBuffer 限制)。execFile:直接执行文件不经过 shell,比 exec 安全高效(无 shell 注入风险),其余同 exec。forkspawn 的特化版,专用于 Node.js 子进程,自动建立 IPC 通道,可通过 send() 通信。选择依据:Node.js 进程间通信用 fork,执行系统命令用 spawn/exec,安全执行文件用 execFile。

Q2: 子进程如何与主进程通信?IPC 的原理是什么?

fork() 创建的子进程自动建立 IPC 通道(基于 pipe),父子进程通过 process.send(msg) 发送消息,process.on('message', cb) 接收消息。IPC 底层使用 libuv 的 pipe(Unix Domain Socket/Named Pipe),数据经 JSON 序列化后传输。特殊能力:可通过 IPC 传递 net.Socketnet.Server 对象(实际传递文件描述符),实现连接迁移(如优雅重启时将已建立的连接迁移到新进程)。非 fork 的子进程没有 IPC 通道。

Q3: 如何在 Node.js 中实现多进程并行计算?

三种方案:① child_process.fork() + IPC:创建多个子进程,通过消息分发任务,子进程计算后返回结果。适合任务粒度较大(秒级)的场景。② worker_threads:线程级并行,共享 SharedArrayBuffer,通信开销更低。适合细粒度计算。③ cluster 模块:主进程监听端口,工作进程共享端口处理请求。适合 HTTP 服务横向扩展。进程池模式:维护固定数量的子进程,任务队列 + 空闲分配,避免频繁创建销毁进程。

Q4: 如何防止 Shell 注入?

Shell 注入发生在用 exec() 拼接用户输入时,如 exec('ls ' + userInput),用户输入 ; rm -rf / 会执行恶意命令。防范:① 用 spawn/execFile 传参数组——spawn('ls', [userInput]),参数直接传递不经 shell 解析;② 必须用 exec 时,对用户输入严格校验和转义;③ 设置 shell: false(spawn 默认);④ 使用白名单限制可执行的命令;⑤ 用 execFile 直接执行文件而非 shell 命令。

Q5: exec 的 maxBuffer 限制是什么?如何处理大输出?

execexecFile 将 stdout/stderr 缓冲在内存中,maxBuffer 指定最大字节数(默认 1024*1024 即 1MB)。超过限制触发 error 事件,子进程被杀掉。处理大输出:① 改用 spawn,流式消费 stdout/stderr(推荐);② 增大 maxBufferexec(cmd, { maxBuffer: 10 * 1024 * 1024 })(治标不治本);③ 将输出重定向到文件,命令完成后读取文件。生产环境推荐 spawn 流式处理。

Q6: fork 创建的子进程如何优雅退出?

步骤:① 主进程调用 child.send({ type: 'shutdown' }) 通知子进程;② 子进程收到消息后清理资源(关闭数据库连接、完成进行中的任务);③ 子进程调用 process.exit(0)process.disconnect();④ 主进程设超时,如果子进程未退出则 child.kill('SIGTERM'),再超时 child.kill('SIGKILL')。注意:child.kill() 默认发送 SIGTERM,child.kill('SIGKILL') 强制杀掉无法被捕获。

Q7: 多进程间如何共享数据?

三种方式:① IPC 消息传递:fork()send() 传递 JSON 数据,简单但有序列化开销和大小限制;② SharedArrayBuffer:worker_threads 中使用,零拷贝共享内存,但只支持二进制数据(需配合 Atomics 做同步);③ 外部存储:Redis/MQ/文件系统,多进程通过外部介质共享数据。注意 child_process 不支持 SharedArrayBuffer(需用 worker_threads),fork 的子进程只能通过 IPC 或外部存储共享。

Q8: 如何监控子进程的健康状态?

监控方式:① 监听 exit 事件,检测异常退出(code !== 0);② 监听 error 事件,捕获启动失败;③ 主进程定期通过 IPC 发送心跳请求,子进程响应表示存活;④ 监控子进程的内存/CPU(process.memoryUsage() 通过 IPC 上报);⑤ 使用进程池自动重启崩溃的子进程。PM2 的集群模式已内置这些能力——自动重启、内存超限重启、心跳检测。


相关链接: