异步编程

What — 是什么

异步编程(Asynchronous Programming)是 Node.js 的核心编程范式,通过非阻塞的方式处理 I/O 操作,使单线程能够高效处理大量并发请求。

核心概念:

  • 回调函数(Callback):最原始的异步模式,通过函数参数传递后续逻辑
  • Promise:表示异步操作最终完成(或失败)的对象,具有 pending → fulfilled/rejected 三种状态
  • async/await:基于 Promise 的语法糖,让异步代码看起来像同步代码
  • Generator:可通过 yield 暂停执行、通过 next() 恢复执行的函数,配合执行器可实现异步
  • AbortController:Web API 标准,用于中止一个或多个异步操作(如 fetch 请求)
  • 异步迭代器(Async Iterator):实现了 [Symbol.asyncIterator] 接口的对象,可通过 for await...of 逐项消费异步数据

异步演进路线:

回调函数 → Promise → Generator + co → async/await → 异步迭代器
(基础)     (规范化)   (过渡方案)       (主流方案)    (流式处理)

关键特性:

  • 回调是异步的基石,但容易陷入回调地狱
  • Promise 通过链式调用解决了回调嵌套问题,统一了异步错误处理
  • async/await 是目前最主流的异步写法,兼顾可读性与错误处理
  • Generator 提供了更灵活的执行控制能力(暂停/恢复/委托)
  • AbortController 提供了标准化的异步操作取消机制
  • 异步迭代器适用于流式数据的逐条消费

Why — 为什么

适用场景:

  • I/O 密集型:文件读写、网络请求、数据库查询——Node.js 的主战场
  • 并行请求:同时发起多个独立请求,用 Promise.all 等待全部完成
  • 流处理:逐行读取大文件、逐块处理网络数据,使用异步迭代器

三种异步方案对比:

维度回调函数(Callback)Promiseasync/await
可读性差(嵌套层级深)中(链式调用 but 仍需 .then)好(接近同步代码风格)
错误处理手动传递 error 参数.catch() 集中捕获try/catch 同步风格捕获
调试体验差(匿名函数栈不清)中(Promise 链有栈追踪)好(栈追踪清晰完整)
控制流困难(并行/竞速需额外库)强(Promise.all/race/allSettled/any)强(结合 Promise 静态方法)

优缺点:

  • ✅ 优点:

    • 异步 I/O 让单线程高效处理高并发
    • async/await 语法简洁,代码可读性高
    • Promise 提供丰富的组合方法(all、allSettled、race、any)
    • 统一的错误处理机制(try/catch + .catch)
    • 异步迭代器天然适配流式场景
  • ❌ 缺点:

    • 回调地狱导致代码难以维护(已可通过 Promise 化解决)
    • async/await 容易写出串行代码,遗漏并行优化
    • 未处理的 Promise rejection 不会抛出同步异常,容易被忽略
    • Generator 异步方案需要执行器(如 co 库),已非主流
    • AbortController 在 Node.js 中的支持范围有限(部分 API 未实现 signal)

How — 怎么用

快速上手

// 三种异步写法对比:获取用户信息

// 1. 回调方式
function getUserCallback(id, callback) {
    db.query('SELECT * FROM users WHERE id = ?', [id], (err, result) => {
        if (err) return callback(err);
        callback(null, result);
    });
}

// 2. Promise 方式
function getUserPromise(id) {
    return db.query('SELECT * FROM users WHERE id = ?', [id]);
}

// 3. async/await 方式
async function getUserAsync(id) {
    try {
        const result = await db.query('SELECT * FROM users WHERE id = ?', [id]);
        return result;
    } catch (err) {
        console.error('查询失败:', err.message);
        throw err;
    }
}

代码示例

1. 回调转 Promise(promisify):

const { promisify } = require('node:util');
const fs = require('node:fs');

// ---- 手动 Promise 化 ----
function readFilePromise(path, encoding) {
    return new Promise((resolve, reject) => {
        fs.readFile(path, encoding, (err, data) => {
            if (err) reject(err);
            else resolve(data);
        });
    });
}

// ---- 使用 Node.js 内置 promisify ----
const readFile = promisify(fs.readFile);

async function demo() {
    // 手动版本
    const content1 = await readFilePromise('./package.json', 'utf8');

    // 内置版本
    const content2 = await readFile('./package.json', 'utf8');

    // Node.js 18+ 推荐直接使用 fs.promises
    const { promises: fsp } = require('node:fs');
    const content3 = await fsp.readFile('./package.json', 'utf8');

    console.log(content1 === content2); // true
}

// ---- 批量 promisify 回调风格的模块 ----
const callbackModule = {
    getData(key, cb) {
        setTimeout(() => cb(null, { key, value: 42 }), 100);
    },
    setData(key, val, cb) {
        setTimeout(() => cb(null, 'OK'), 100);
    }
};

const asyncModule = {};
for (const [name, fn] of Object.entries(callbackModule)) {
    asyncModule[name] = promisify(fn);
}

// 使用
async function batchDemo() {
    await asyncModule.setData('x', 100);
    const data = await asyncModule.getData('x');
    console.log(data); // { key: 'x', value: 42 }
}

2. 并行控制(Promise.all / allSettled / race / any):

// ---- Promise.all:全部成功才成功,一个失败即失败 ----
async function fetchAllUsers(ids) {
    try {
        const users = await Promise.all(
            ids.map(id => fetchUser(id)) // 并行发起所有请求
        );
        return users; // 全部成功时返回结果数组
    } catch (err) {
        // 任意一个失败即进入 catch,其他请求仍在进行(无法取消)
        console.error('至少一个请求失败:', err.message);
        throw err;
    }
}

// ---- Promise.allSettled:等待全部完成,不论成功失败 ----
async function fetchAllUsersSettled(ids) {
    const results = await Promise.allSettled(
        ids.map(id => fetchUser(id))
    );

    const succeeded = results
        .filter(r => r.status === 'fulfilled')
        .map(r => r.value);

    const failed = results
        .filter(r => r.status === 'rejected')
        .map(r => r.reason);

    console.log(`成功 ${succeeded.length} 个,失败 ${failed.length} 个`);
    return { succeeded, failed };
}

// ---- Promise.race:取最先完成的(无论成功失败) ----
async function fetchWithTimeout(url, ms = 5000) {
    const fetchPromise = fetch(url);
    const timeoutPromise = new Promise((_, reject) =>
        setTimeout(() => reject(new Error(`请求超时 ${ms}ms`)), ms)
    );
    return Promise.race([fetchPromise, timeoutPromise]);
}

// ---- Promise.any:取最先成功的,全部失败才失败 ----
async function fetchFastest(mirrors) {
    try {
        const result = await Promise.any(
            mirrors.map(url => fetch(url).then(r => r.json()))
        );
        return result; // 最快成功的那个
    } catch (aggregateError) {
        // AggregateError:所有 Promise 都失败
        console.error('所有镜像均失败:', aggregateError.errors);
        throw aggregateError;
    }
}

// ---- 并发限制版 Promise.all ----
async function promiseLimit(tasks, concurrency = 3) {
    const results = [];
    const executing = new Set();

    for (const [index, task] of tasks.entries()) {
        const p = Promise.resolve().then(() => task());
        results[index] = p;
        executing.add(p);
        p.finally(() => executing.delete(p));

        if (executing.size >= concurrency) {
            await Promise.race(executing); // 等待任一完成
        }
    }

    return Promise.all(results);
}

// 使用:同时最多 3 个并发
async function limitedFetch() {
    const ids = Array.from({ length: 20 }, (_, i) => i + 1);
    const users = await promiseLimit(
        ids.map(id => () => fetchUser(id)),
        3
    );
    console.log(`获取了 ${users.length} 个用户`);
}

3. AbortController 取消请求:

const { AbortController } = require('node:abort-controller');

// ---- 基本使用:取消 fetch 请求 ----
async function fetchWithAbort(url, timeoutMs = 3000) {
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), timeoutMs);

    try {
        const response = await fetch(url, {
            signal: controller.signal
        });
        return await response.json();
    } catch (err) {
        if (err.name === 'AbortError') {
            console.log('请求被取消(超时或手动中止)');
        } else {
            throw err;
        }
    } finally {
        clearTimeout(timeout);
    }
}

// ---- 手动取消 ----
async function manualAbort() {
    const controller = new AbortController();

    // 2 秒后手动取消
    setTimeout(() => {
        controller.abort(new Error('用户主动取消'));
    }, 2000);

    try {
        const data = await fetch('https://api.example.com/slow', {
            signal: controller.signal
        });
        return data;
    } catch (err) {
        if (err.name === 'AbortError') {
            console.log('手动取消原因:', err.cause?.message);
        }
    }
}

// ---- 一个 AbortController 取消多个操作 ----
async function multiAbort() {
    const controller = new AbortController();
    const { signal } = controller;

    // 同时发起多个请求,共享同一个 signal
    const promise1 = fetch('https://api.example.com/a', { signal });
    const promise2 = fetch('https://api.example.com/b', { signal });
    const promise3 = new Promise((_, reject) => {
        signal.addEventListener('abort', () => reject(signal.reason));
    });

    // 取消所有
    setTimeout(() => controller.abort(new Error('批量取消')), 1000);

    const results = await Promise.allSettled([promise1, promise2, promise3]);
    results.forEach((r, i) => {
        console.log(`请求 ${i + 1}: ${r.status}`);
    });
}

// ---- Node.js 原生 API 支持 signal ----
const fs = require('node:fs/promises');

async function readFileWithAbort() {
    const controller = new AbortController();
    setTimeout(() => controller.abort(), 5000);

    try {
        // Node.js 15.9+ 的 fs.readFile 支持 signal
        const content = await fs.readFile('./large-file.txt', {
            encoding: 'utf8',
            signal: controller.signal
        });
        return content;
    } catch (err) {
        if (err.name === 'AbortError') {
            console.log('文件读取被取消');
        }
        throw err;
    }
}

4. 异步迭代器处理流:

const fs = require('node:fs/promises');
const { createReadStream } = require('node:fs');
const readline = require('node:readline');

// ---- 逐行读取大文件(异步迭代器) ----
async function processLargeFile(filePath) {
    const fileStream = createReadStream(filePath, { encoding: 'utf8' });
    const rl = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity
    });

    let lineCount = 0;
    let totalBytes = 0;

    // for await...of 逐行消费
    for await (const line of rl) {
        lineCount++;
        totalBytes += Buffer.byteLength(line, 'utf8');

        // 可以在这里对每行做处理
        if (lineCount % 10000 === 0) {
            console.log(`已处理 ${lineCount} 行...`);
        }
    }

    console.log(`完成:共 ${lineCount} 行,${totalBytes} 字节`);
    return { lineCount, totalBytes };
}

// ---- 自定义异步迭代器 ----
class PaginatedData {
    constructor(apiClient, endpoint, pageSize = 50) {
        this.client = apiClient;
        this.endpoint = endpoint;
        this.pageSize = pageSize;
    }

    // 实现 [Symbol.asyncIterator] 接口
    async *[Symbol.asyncIterator]() {
        let page = 1;
        let hasMore = true;

        while (hasMore) {
            const response = await this.client.get(this.endpoint, {
                params: { page, pageSize: this.pageSize }
            });

            yield* response.data.items; // 逐条产出

            hasMore = response.data.hasNextPage;
            page++;
        }
    }
}

// 使用自定义异步迭代器
async function fetchAllPages() {
    const paginator = new PaginatedData(httpClient, '/api/users');

    const allUsers = [];
    for await (const user of paginator) {
        allUsers.push(user);
        if (allUsers.length % 100 === 0) {
            console.log(`已拉取 ${allUsers.length} 条...`);
        }
    }
    console.log(`全部拉取完成: ${allUsers.length} 条`);
}

// ---- 异步生成器 + 组合 ----
async function* filterAsync(iterable, predicate) {
    for await (const item of iterable) {
        if (await predicate(item)) {
            yield item;
        }
    }
}

async function* mapAsync(iterable, mapper) {
    for await (const item of iterable) {
        yield await mapper(item);
    }
}

// 链式处理流数据
async function processLogStream() {
    const logStream = createReadStream('./app.log', { encoding: 'utf8' });
    const rl = readline.createInterface({ input: logStream, crlfDelay: Infinity });

    // 过滤错误日志 → 提取关键信息
    const errorLines = filterAsync(rl, line => line.includes('[ERROR]'));
    const parsedErrors = mapAsync(errorLines, line => {
        const [timestamp, level, ...msg] = line.split(' ');
        return { timestamp, level, message: msg.join(' ') };
    });

    for await (const error of parsedErrors) {
        await notifyOpsTeam(error); // 逐条通知
    }
}

常见问题与踩坑

问题原因解决方案
async 函数中忘记 awaitasyncFn() 不加 await 返回 Promise 而非值,异常被静默吞掉始终 await 异步函数;配置 ESLint require-await 规则
Promise.all 一个失败全部失败Promise.all 是”全部或无”语义改用 Promise.allSettled 获取所有结果,或先做错误隔离
for 循环中的 async 串行化for...of 内 await 会导致逐个等待Promise.all(arr.map(...)) 实现并行,需要并发限制时使用 promiseLimit 模式
未处理的 Promise rejection没有 .catch 或 try/catch,Node.js 15+ 会终止进程监听 process.on('unhandledRejection');始终处理 Promise 链末尾
async 函数返回值被多包一层return await 在外层函数已是 async 时多余包裹外层 async 函数中直接 return promise,不需要 return await(异常场景除外)
回调风格 API 中的 this 丢失promisify 后的函数脱离原对象上下文使用 promisify 的绑定版本:promisify(obj.method).bind(obj) 或使用 util.promisify 的自定义符号

最佳实践

  1. 优先使用 async/await:比 .then 链更易读、更易调试,配合 try/catch 统一错误处理
  2. 并行优于串行:无依赖关系的异步操作应使用 Promise.all 并行执行,避免不必要的 await 等待
  3. 始终处理错误:每个 async 函数都应有 try/catch 或返回的 Promise 链末端有 .catch,同时监听 unhandledRejection 兜底
  4. 使用 AbortController 管理生命周期:对可取消的异步操作传入 signal,避免资源泄漏(如组件卸载后仍在请求)
  5. 大文件/流式数据使用异步迭代器:用 for await...of 逐项消费,避免一次性加载到内存
  6. 并发控制不可忽视:大量并行请求需限制并发数(如 3~5),避免打爆下游服务或触发限流
  7. 回调 API 迁移:新代码直接使用 fs.promises 等 Promise 化 API,旧代码用 util.promisify 批量转换

面试题

Q1: 什么是回调地狱?有哪些解决方案?

回调地狱(Callback Hell)是指多层异步回调嵌套导致代码向右缩进、难以阅读和维护的现象。典型出现在连续依赖的异步操作中(如先读配置、再查数据库、再写缓存,层层嵌套)。解决方案:(1) 拆分函数,给回调函数命名,减少嵌套层级;(2) 使用 Promise 链式调用 .then 替代嵌套;(3) 使用 async/await 让异步代码看起来像同步代码,彻底消除嵌套;(4) 使用 Generator + 执行器(如 co 库),这是 async/await 的前身方案。

Q2: Promise 的状态机制是什么?状态转换有哪些规则?

Promise 有三种状态:pending(进行中)、fulfilled(已成功)、rejected(已失败)。状态转换规则:只能从 pending 转为 fulfilled 或 rejected,且一旦转换就不可逆(称为 settled)。调用 resolve(value) 转为 fulfilled,调用 reject(reason) 转为 rejected。如果 resolve 传入的是另一个 Promise,则当前 Promise 的状态取决于传入的 Promise。注意:即使已经 settled,再次调用 resolve/reject 也不会报错,只是被静默忽略。

Q3: async/await 的错误处理有哪些方式?各有什么优缺点?

主要三种方式:(1) try/catch 包裹 await——最直观,但大量 try/catch 会使代码冗长;(2) .catch() 方法——const [err, data] = await promise.catch(e => [e]) 或包装函数 await-to 模式,避免 try 嵌套但需额外工具函数;(3) 全局兜底 process.on('unhandledRejection')——只应作为最后一道防线,不能替代局部错误处理。推荐:业务关键路径用 try/catch,辅助操作用 .catch() 吞掉非关键错误,全局监听做兜底报警。

Q4: Promise.all、Promise.allSettled、Promise.race、Promise.any 的区别?

方法成功条件失败条件返回值典型场景
all全部成功任一失败值数组并行请求需全部成功
allSettled永远成功永远不失败{status, value/reason} 数组需收集所有结果
race任一成功任一失败第一个结果超时控制
any任一成功全部失败第一个成功值多源竞速取最快
Promise.all 适合”全部或无”语义;allSettled 不关心失败率;race 取最快无论成败;any 取最快成功,全部失败时抛出 AggregateError。

Q5: 微任务(Microtask)的执行时机是什么?和宏任务有什么关系?

微任务在当前宏任务执行完毕后、下一个宏任务开始前全部执行。具体时机:每调用栈清空后,事件循环会检查微任务队列,将队列中所有微任务依次执行完毕,然后再进入下一个宏任务阶段。在 Node.js 中,每个事件循环阶段(timers、poll、check 等)结束后都会清空微任务队列。process.nextTick 队列优先于 Promise 微任务队列。这意味着微任务的执行是”批量”的——不会只执行一个就切走,而是清空整个队列。

Q6: AbortController 如何使用?在 Node.js 中有哪些 API 支持?

AbortController 提供标准化的取消机制:创建 controller 实例,将 controller.signal 传入支持 signal 的异步 API,需要取消时调用 controller.abort(reason)。Node.js 中支持的 API 包括:fs.readFile/writeFile 等 fs.promises 方法、http/https 请求、EventTarget.addEventListener、readline 接口。一个 signal 可传给多个操作,实现批量取消。取消后抛出 AbortError(err.name === ‘AbortError’)。注意:并非所有异步 API 都支持 signal,不支持的需自行监听 abort 事件实现取消逻辑。

Q7: 异步迭代器的应用场景有哪些?如何自定义异步迭代器?

异步迭代器适用于:逐行读取大文件(readline)、分页拉取 API 数据、逐条消费消息队列、流式处理日志等。自定义方式:(1) 实现 [Symbol.asyncIterator] 方法,返回一个异步迭代器对象(有 next() 方法返回 { value, done } 的 Promise);(2) 使用异步生成器 async function*,用 yield 产出值,运行时自动包装为异步迭代器。推荐用异步生成器,代码更简洁。消费端用 for await...of 语法逐项遍历,也可以手动调用 [Symbol.asyncIterator]().next() 逐条消费。

Q8: 如何实现并发控制?为什么不能直接 Promise.all 发起大量请求?

直接 Promise.all 一次性发起全部请求,可能导致:打爆下游服务的连接数/限流、本地内存飙升(所有响应同时驻留)、操作系统文件描述符耗尽。并发控制的典型实现:(1) 信号量模式——维护一个计数器,超过并发上限时排队等待;(2) 池模式——维护固定大小的执行池,一个完成再塞入下一个;(3) 第三方库 p-limit/p-queue。核心思路:用 Promise.race 等待池中任一完成,腾出空位后再塞入新任务。要注意错误隔离——单个任务失败不应中断整个并发池。


相关链接: