异步编程
What — 是什么
异步编程(Asynchronous Programming)是 Node.js 的核心编程范式,通过非阻塞的方式处理 I/O 操作,使单线程能够高效处理大量并发请求。
核心概念:
- 回调函数(Callback):最原始的异步模式,通过函数参数传递后续逻辑
- Promise:表示异步操作最终完成(或失败)的对象,具有 pending → fulfilled/rejected 三种状态
- async/await:基于 Promise 的语法糖,让异步代码看起来像同步代码
- Generator:可通过
yield暂停执行、通过next()恢复执行的函数,配合执行器可实现异步 - AbortController:Web API 标准,用于中止一个或多个异步操作(如 fetch 请求)
- 异步迭代器(Async Iterator):实现了
[Symbol.asyncIterator]接口的对象,可通过for await...of逐项消费异步数据
异步演进路线:
回调函数 → Promise → Generator + co → async/await → 异步迭代器
(基础) (规范化) (过渡方案) (主流方案) (流式处理)
关键特性:
- 回调是异步的基石,但容易陷入回调地狱
- Promise 通过链式调用解决了回调嵌套问题,统一了异步错误处理
- async/await 是目前最主流的异步写法,兼顾可读性与错误处理
- Generator 提供了更灵活的执行控制能力(暂停/恢复/委托)
- AbortController 提供了标准化的异步操作取消机制
- 异步迭代器适用于流式数据的逐条消费
Why — 为什么
适用场景:
- I/O 密集型:文件读写、网络请求、数据库查询——Node.js 的主战场
- 并行请求:同时发起多个独立请求,用
Promise.all等待全部完成 - 流处理:逐行读取大文件、逐块处理网络数据,使用异步迭代器
三种异步方案对比:
| 维度 | 回调函数(Callback) | Promise | async/await |
|---|---|---|---|
| 可读性 | 差(嵌套层级深) | 中(链式调用 but 仍需 .then) | 好(接近同步代码风格) |
| 错误处理 | 手动传递 error 参数 | .catch() 集中捕获 | try/catch 同步风格捕获 |
| 调试体验 | 差(匿名函数栈不清) | 中(Promise 链有栈追踪) | 好(栈追踪清晰完整) |
| 控制流 | 困难(并行/竞速需额外库) | 强(Promise.all/race/allSettled/any) | 强(结合 Promise 静态方法) |
优缺点:
-
✅ 优点:
- 异步 I/O 让单线程高效处理高并发
- async/await 语法简洁,代码可读性高
- Promise 提供丰富的组合方法(all、allSettled、race、any)
- 统一的错误处理机制(try/catch + .catch)
- 异步迭代器天然适配流式场景
-
❌ 缺点:
- 回调地狱导致代码难以维护(已可通过 Promise 化解决)
- async/await 容易写出串行代码,遗漏并行优化
- 未处理的 Promise rejection 不会抛出同步异常,容易被忽略
- Generator 异步方案需要执行器(如 co 库),已非主流
- AbortController 在 Node.js 中的支持范围有限(部分 API 未实现 signal)
How — 怎么用
快速上手
// 三种异步写法对比:获取用户信息
// 1. 回调方式
function getUserCallback(id, callback) {
db.query('SELECT * FROM users WHERE id = ?', [id], (err, result) => {
if (err) return callback(err);
callback(null, result);
});
}
// 2. Promise 方式
function getUserPromise(id) {
return db.query('SELECT * FROM users WHERE id = ?', [id]);
}
// 3. async/await 方式
async function getUserAsync(id) {
try {
const result = await db.query('SELECT * FROM users WHERE id = ?', [id]);
return result;
} catch (err) {
console.error('查询失败:', err.message);
throw err;
}
}
代码示例
1. 回调转 Promise(promisify):
const { promisify } = require('node:util');
const fs = require('node:fs');
// ---- 手动 Promise 化 ----
function readFilePromise(path, encoding) {
return new Promise((resolve, reject) => {
fs.readFile(path, encoding, (err, data) => {
if (err) reject(err);
else resolve(data);
});
});
}
// ---- 使用 Node.js 内置 promisify ----
const readFile = promisify(fs.readFile);
async function demo() {
// 手动版本
const content1 = await readFilePromise('./package.json', 'utf8');
// 内置版本
const content2 = await readFile('./package.json', 'utf8');
// Node.js 18+ 推荐直接使用 fs.promises
const { promises: fsp } = require('node:fs');
const content3 = await fsp.readFile('./package.json', 'utf8');
console.log(content1 === content2); // true
}
// ---- 批量 promisify 回调风格的模块 ----
const callbackModule = {
getData(key, cb) {
setTimeout(() => cb(null, { key, value: 42 }), 100);
},
setData(key, val, cb) {
setTimeout(() => cb(null, 'OK'), 100);
}
};
const asyncModule = {};
for (const [name, fn] of Object.entries(callbackModule)) {
asyncModule[name] = promisify(fn);
}
// 使用
async function batchDemo() {
await asyncModule.setData('x', 100);
const data = await asyncModule.getData('x');
console.log(data); // { key: 'x', value: 42 }
}
2. 并行控制(Promise.all / allSettled / race / any):
// ---- Promise.all:全部成功才成功,一个失败即失败 ----
async function fetchAllUsers(ids) {
try {
const users = await Promise.all(
ids.map(id => fetchUser(id)) // 并行发起所有请求
);
return users; // 全部成功时返回结果数组
} catch (err) {
// 任意一个失败即进入 catch,其他请求仍在进行(无法取消)
console.error('至少一个请求失败:', err.message);
throw err;
}
}
// ---- Promise.allSettled:等待全部完成,不论成功失败 ----
async function fetchAllUsersSettled(ids) {
const results = await Promise.allSettled(
ids.map(id => fetchUser(id))
);
const succeeded = results
.filter(r => r.status === 'fulfilled')
.map(r => r.value);
const failed = results
.filter(r => r.status === 'rejected')
.map(r => r.reason);
console.log(`成功 ${succeeded.length} 个,失败 ${failed.length} 个`);
return { succeeded, failed };
}
// ---- Promise.race:取最先完成的(无论成功失败) ----
async function fetchWithTimeout(url, ms = 5000) {
const fetchPromise = fetch(url);
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error(`请求超时 ${ms}ms`)), ms)
);
return Promise.race([fetchPromise, timeoutPromise]);
}
// ---- Promise.any:取最先成功的,全部失败才失败 ----
async function fetchFastest(mirrors) {
try {
const result = await Promise.any(
mirrors.map(url => fetch(url).then(r => r.json()))
);
return result; // 最快成功的那个
} catch (aggregateError) {
// AggregateError:所有 Promise 都失败
console.error('所有镜像均失败:', aggregateError.errors);
throw aggregateError;
}
}
// ---- 并发限制版 Promise.all ----
async function promiseLimit(tasks, concurrency = 3) {
const results = [];
const executing = new Set();
for (const [index, task] of tasks.entries()) {
const p = Promise.resolve().then(() => task());
results[index] = p;
executing.add(p);
p.finally(() => executing.delete(p));
if (executing.size >= concurrency) {
await Promise.race(executing); // 等待任一完成
}
}
return Promise.all(results);
}
// 使用:同时最多 3 个并发
async function limitedFetch() {
const ids = Array.from({ length: 20 }, (_, i) => i + 1);
const users = await promiseLimit(
ids.map(id => () => fetchUser(id)),
3
);
console.log(`获取了 ${users.length} 个用户`);
}
3. AbortController 取消请求:
const { AbortController } = require('node:abort-controller');
// ---- 基本使用:取消 fetch 请求 ----
async function fetchWithAbort(url, timeoutMs = 3000) {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), timeoutMs);
try {
const response = await fetch(url, {
signal: controller.signal
});
return await response.json();
} catch (err) {
if (err.name === 'AbortError') {
console.log('请求被取消(超时或手动中止)');
} else {
throw err;
}
} finally {
clearTimeout(timeout);
}
}
// ---- 手动取消 ----
async function manualAbort() {
const controller = new AbortController();
// 2 秒后手动取消
setTimeout(() => {
controller.abort(new Error('用户主动取消'));
}, 2000);
try {
const data = await fetch('https://api.example.com/slow', {
signal: controller.signal
});
return data;
} catch (err) {
if (err.name === 'AbortError') {
console.log('手动取消原因:', err.cause?.message);
}
}
}
// ---- 一个 AbortController 取消多个操作 ----
async function multiAbort() {
const controller = new AbortController();
const { signal } = controller;
// 同时发起多个请求,共享同一个 signal
const promise1 = fetch('https://api.example.com/a', { signal });
const promise2 = fetch('https://api.example.com/b', { signal });
const promise3 = new Promise((_, reject) => {
signal.addEventListener('abort', () => reject(signal.reason));
});
// 取消所有
setTimeout(() => controller.abort(new Error('批量取消')), 1000);
const results = await Promise.allSettled([promise1, promise2, promise3]);
results.forEach((r, i) => {
console.log(`请求 ${i + 1}: ${r.status}`);
});
}
// ---- Node.js 原生 API 支持 signal ----
const fs = require('node:fs/promises');
async function readFileWithAbort() {
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
try {
// Node.js 15.9+ 的 fs.readFile 支持 signal
const content = await fs.readFile('./large-file.txt', {
encoding: 'utf8',
signal: controller.signal
});
return content;
} catch (err) {
if (err.name === 'AbortError') {
console.log('文件读取被取消');
}
throw err;
}
}
4. 异步迭代器处理流:
const fs = require('node:fs/promises');
const { createReadStream } = require('node:fs');
const readline = require('node:readline');
// ---- 逐行读取大文件(异步迭代器) ----
async function processLargeFile(filePath) {
const fileStream = createReadStream(filePath, { encoding: 'utf8' });
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
let lineCount = 0;
let totalBytes = 0;
// for await...of 逐行消费
for await (const line of rl) {
lineCount++;
totalBytes += Buffer.byteLength(line, 'utf8');
// 可以在这里对每行做处理
if (lineCount % 10000 === 0) {
console.log(`已处理 ${lineCount} 行...`);
}
}
console.log(`完成:共 ${lineCount} 行,${totalBytes} 字节`);
return { lineCount, totalBytes };
}
// ---- 自定义异步迭代器 ----
class PaginatedData {
constructor(apiClient, endpoint, pageSize = 50) {
this.client = apiClient;
this.endpoint = endpoint;
this.pageSize = pageSize;
}
// 实现 [Symbol.asyncIterator] 接口
async *[Symbol.asyncIterator]() {
let page = 1;
let hasMore = true;
while (hasMore) {
const response = await this.client.get(this.endpoint, {
params: { page, pageSize: this.pageSize }
});
yield* response.data.items; // 逐条产出
hasMore = response.data.hasNextPage;
page++;
}
}
}
// 使用自定义异步迭代器
async function fetchAllPages() {
const paginator = new PaginatedData(httpClient, '/api/users');
const allUsers = [];
for await (const user of paginator) {
allUsers.push(user);
if (allUsers.length % 100 === 0) {
console.log(`已拉取 ${allUsers.length} 条...`);
}
}
console.log(`全部拉取完成: ${allUsers.length} 条`);
}
// ---- 异步生成器 + 组合 ----
async function* filterAsync(iterable, predicate) {
for await (const item of iterable) {
if (await predicate(item)) {
yield item;
}
}
}
async function* mapAsync(iterable, mapper) {
for await (const item of iterable) {
yield await mapper(item);
}
}
// 链式处理流数据
async function processLogStream() {
const logStream = createReadStream('./app.log', { encoding: 'utf8' });
const rl = readline.createInterface({ input: logStream, crlfDelay: Infinity });
// 过滤错误日志 → 提取关键信息
const errorLines = filterAsync(rl, line => line.includes('[ERROR]'));
const parsedErrors = mapAsync(errorLines, line => {
const [timestamp, level, ...msg] = line.split(' ');
return { timestamp, level, message: msg.join(' ') };
});
for await (const error of parsedErrors) {
await notifyOpsTeam(error); // 逐条通知
}
}
常见问题与踩坑
| 问题 | 原因 | 解决方案 |
|---|---|---|
| async 函数中忘记 await | asyncFn() 不加 await 返回 Promise 而非值,异常被静默吞掉 | 始终 await 异步函数;配置 ESLint require-await 规则 |
| Promise.all 一个失败全部失败 | Promise.all 是”全部或无”语义 | 改用 Promise.allSettled 获取所有结果,或先做错误隔离 |
| for 循环中的 async 串行化 | for...of 内 await 会导致逐个等待 | 用 Promise.all(arr.map(...)) 实现并行,需要并发限制时使用 promiseLimit 模式 |
| 未处理的 Promise rejection | 没有 .catch 或 try/catch,Node.js 15+ 会终止进程 | 监听 process.on('unhandledRejection');始终处理 Promise 链末尾 |
| async 函数返回值被多包一层 | return await 在外层函数已是 async 时多余包裹 | 外层 async 函数中直接 return promise,不需要 return await(异常场景除外) |
| 回调风格 API 中的 this 丢失 | promisify 后的函数脱离原对象上下文 | 使用 promisify 的绑定版本:promisify(obj.method).bind(obj) 或使用 util.promisify 的自定义符号 |
最佳实践
- 优先使用 async/await:比 .then 链更易读、更易调试,配合 try/catch 统一错误处理
- 并行优于串行:无依赖关系的异步操作应使用
Promise.all并行执行,避免不必要的 await 等待 - 始终处理错误:每个 async 函数都应有 try/catch 或返回的 Promise 链末端有 .catch,同时监听
unhandledRejection兜底 - 使用 AbortController 管理生命周期:对可取消的异步操作传入 signal,避免资源泄漏(如组件卸载后仍在请求)
- 大文件/流式数据使用异步迭代器:用
for await...of逐项消费,避免一次性加载到内存 - 并发控制不可忽视:大量并行请求需限制并发数(如 3~5),避免打爆下游服务或触发限流
- 回调 API 迁移:新代码直接使用
fs.promises等 Promise 化 API,旧代码用util.promisify批量转换
面试题
Q1: 什么是回调地狱?有哪些解决方案?
回调地狱(Callback Hell)是指多层异步回调嵌套导致代码向右缩进、难以阅读和维护的现象。典型出现在连续依赖的异步操作中(如先读配置、再查数据库、再写缓存,层层嵌套)。解决方案:(1) 拆分函数,给回调函数命名,减少嵌套层级;(2) 使用 Promise 链式调用 .then 替代嵌套;(3) 使用 async/await 让异步代码看起来像同步代码,彻底消除嵌套;(4) 使用 Generator + 执行器(如 co 库),这是 async/await 的前身方案。
Q2: Promise 的状态机制是什么?状态转换有哪些规则?
Promise 有三种状态:pending(进行中)、fulfilled(已成功)、rejected(已失败)。状态转换规则:只能从 pending 转为 fulfilled 或 rejected,且一旦转换就不可逆(称为 settled)。调用 resolve(value) 转为 fulfilled,调用 reject(reason) 转为 rejected。如果 resolve 传入的是另一个 Promise,则当前 Promise 的状态取决于传入的 Promise。注意:即使已经 settled,再次调用 resolve/reject 也不会报错,只是被静默忽略。
Q3: async/await 的错误处理有哪些方式?各有什么优缺点?
主要三种方式:(1) try/catch 包裹 await——最直观,但大量 try/catch 会使代码冗长;(2) .catch() 方法——
const [err, data] = await promise.catch(e => [e])或包装函数await-to模式,避免 try 嵌套但需额外工具函数;(3) 全局兜底process.on('unhandledRejection')——只应作为最后一道防线,不能替代局部错误处理。推荐:业务关键路径用 try/catch,辅助操作用 .catch() 吞掉非关键错误,全局监听做兜底报警。
Q4: Promise.all、Promise.allSettled、Promise.race、Promise.any 的区别?
方法 成功条件 失败条件 返回值 典型场景 all 全部成功 任一失败 值数组 并行请求需全部成功 allSettled 永远成功 永远不失败 {status, value/reason} 数组 需收集所有结果 race 任一成功 任一失败 第一个结果 超时控制 any 任一成功 全部失败 第一个成功值 多源竞速取最快 Promise.all适合”全部或无”语义;allSettled不关心失败率;race取最快无论成败;any取最快成功,全部失败时抛出 AggregateError。
Q5: 微任务(Microtask)的执行时机是什么?和宏任务有什么关系?
微任务在当前宏任务执行完毕后、下一个宏任务开始前全部执行。具体时机:每调用栈清空后,事件循环会检查微任务队列,将队列中所有微任务依次执行完毕,然后再进入下一个宏任务阶段。在 Node.js 中,每个事件循环阶段(timers、poll、check 等)结束后都会清空微任务队列。process.nextTick 队列优先于 Promise 微任务队列。这意味着微任务的执行是”批量”的——不会只执行一个就切走,而是清空整个队列。
Q6: AbortController 如何使用?在 Node.js 中有哪些 API 支持?
AbortController 提供标准化的取消机制:创建 controller 实例,将 controller.signal 传入支持 signal 的异步 API,需要取消时调用 controller.abort(reason)。Node.js 中支持的 API 包括:fs.readFile/writeFile 等 fs.promises 方法、http/https 请求、EventTarget.addEventListener、readline 接口。一个 signal 可传给多个操作,实现批量取消。取消后抛出 AbortError(err.name === ‘AbortError’)。注意:并非所有异步 API 都支持 signal,不支持的需自行监听 abort 事件实现取消逻辑。
Q7: 异步迭代器的应用场景有哪些?如何自定义异步迭代器?
异步迭代器适用于:逐行读取大文件(readline)、分页拉取 API 数据、逐条消费消息队列、流式处理日志等。自定义方式:(1) 实现
[Symbol.asyncIterator]方法,返回一个异步迭代器对象(有 next() 方法返回{ value, done }的 Promise);(2) 使用异步生成器async function*,用yield产出值,运行时自动包装为异步迭代器。推荐用异步生成器,代码更简洁。消费端用for await...of语法逐项遍历,也可以手动调用[Symbol.asyncIterator]().next()逐条消费。
Q8: 如何实现并发控制?为什么不能直接 Promise.all 发起大量请求?
直接
Promise.all一次性发起全部请求,可能导致:打爆下游服务的连接数/限流、本地内存飙升(所有响应同时驻留)、操作系统文件描述符耗尽。并发控制的典型实现:(1) 信号量模式——维护一个计数器,超过并发上限时排队等待;(2) 池模式——维护固定大小的执行池,一个完成再塞入下一个;(3) 第三方库 p-limit/p-queue。核心思路:用Promise.race等待池中任一完成,腾出空位后再塞入新任务。要注意错误隔离——单个任务失败不应中断整个并发池。
相关链接:
- 事件循环
- Stream与Buffer
- 错误处理与调试
- Node.js async_hooks 文档:https://nodejs.org/api/async_hooks.html