1. asyncio 是什么?它解决什么问题? #
asyncio 是 Python 标准库中用于编写并发 I/O 程序的一套工具(常见 I/O:网络请求、磁盘读写、等待数据库响应等)。
它不是“多线程并行执行”,而是通过 事件循环(Event Loop) 在一个线程里调度多个任务:当某个任务在等待 I/O 时,CPU 不空转,而是去执行别的任务,从而提升整体吞吐量。
你可以把它理解成:
- 协程:可暂停/可恢复的函数(像“会让出控制权的函数”)
- 事件循环:一个调度器,负责决定“现在让哪个协程继续跑”
- await:显式告诉调度器“我这里要等一会儿,你先去跑别的协程”
2. 前置知识 #
2.1 并发 vs 并行(不要混淆) #
- 并行(Parallel):多个任务同时在多个 CPU 核心上执行(同一时刻真正在跑多个)
- 并发(Concurrent):多个任务在同一段时间内交错推进(同一时刻可能只跑一个,但切换很快)
asyncio 的强项是 并发 I/O:让“等待 I/O 的时间”被别的任务利用起来。
2.2 I/O 密集 vs CPU 密集(决定你该不该用 asyncio) #
- I/O 密集:大部分时间都在等外部(网络/磁盘/数据库),适合 asyncio
- CPU 密集:大部分时间在算数/推理/图像处理,asyncio 帮助不大(通常用多进程或把计算下放到线程池/进程池)
2.3 什么叫“阻塞”? #
如果你在协程里调用了一个会长时间卡住的同步函数(比如 time.sleep(5)、某些同步网络请求),事件循环就被卡死了,其他协程也跑不了,这就是“阻塞事件循环”。
3. 认识 async/await 与事件循环 #
先用一个最短脚本建立直觉:await asyncio.sleep() 表示“我在等”,此时事件循环可以去跑别的任务。
# 导入 asyncio:Python 标准库中的异步框架
import asyncio
# 定义一个协程函数:使用 async def
async def hello_world():
# 打印第一句话
print("Hello")
# 等待 1 秒(非阻塞地等待,会把控制权交还给事件循环)
await asyncio.sleep(1)
# 打印第二句话
print("World")
# 定义程序入口协程
async def main():
# 调用并等待协程执行完成
await hello_world()
# 运行事件循环并执行 main 协程(Python 3.7+ 推荐写法)
if __name__ == "__main__":
asyncio.run(main())// 定义一个 sleep 函数:返回一个在 ms 毫秒后完成的 Promise
function sleep(ms) {
// 返回 Promise,用 setTimeout 在未来某个时刻 resolve
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 定义一个 async 函数:相当于 Python 的 async def 协程
async function helloWorld() {
// 打印第一句话
console.log("Hello");
// 等待 1 秒(非阻塞:把控制权交还给事件循环)
await sleep(1000);
// 打印第二句话
console.log("World");
}
// 定义主函数:相当于 Python 的 main 协程
async function main() {
// 调用并等待 helloWorld 执行完成
await helloWorld();
}
// Node.js 程序入口
// 调用 main,并在出现未捕获异常时打印错误
main().catch((err) => console.error(err));4. 协程(Coroutine):它到底是什么? #
协程可以理解为“可暂停的函数”。
当协程执行到 await 时,会暂停自己并把控制权交还给事件循环;等到被等待的事情完成后,再从暂停点继续往下执行。
4.1 协程的返回值 #
协程函数可以 return 一个值,调用方用 await 拿到返回值。
# 导入 asyncio
import asyncio
# 定义一个协程:模拟 I/O 后返回结果
async def fetch_data():
# 模拟等待 0.5 秒
await asyncio.sleep(0.5)
# 返回结果字符串
return "data"
# 主协程
async def main():
# 等待协程并拿到返回值
result = await fetch_data()
# 打印返回值
print("result =", result)
# 程序入口
if __name__ == "__main__":
asyncio.run(main())// 定义 sleep:模拟 I/O 等待
function sleep(ms) {
// 返回 Promise,等待 ms 毫秒后完成
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 定义 async 函数:模拟 I/O 后返回结果
async function fetchData() {
// 等待 0.5 秒
await sleep(500);
// 返回结果字符串
return "data";
}
// 主函数
async function main() {
// 等待 fetchData 并拿到返回值
const result = await fetchData();
// 打印返回值
console.log("result =", result);
}
// 程序入口
main().catch((err) => console.error(err));5. 任务(Task):让多个协程“并发”起来 #
仅仅 await 两个协程,默认是串行:第一个完成后才执行第二个。
要并发执行,你需要把协程包装成 Task 并交给事件循环调度。
5.1 串行 vs 并发 #
# 导入 asyncio
import asyncio
# 定义一个协程:等待 delay 秒后打印
async def say(name, delay):
# 等待 delay 秒(非阻塞)
await asyncio.sleep(delay)
# 打印输出
print(f"{name} done (delay={delay})")
# 主协程:演示串行与并发的区别
async def main():
# 打印分隔线
print("=== 串行执行(总时间≈ 1 + 2)===")
# 串行:先等 1 秒的任务完成
await say("Alice", 1)
# 串行:再等 2 秒的任务完成
await say("Bob", 2)
# 打印分隔线
print("=== 并发执行(总时间≈ max(1, 2))===")
# 创建任务:把协程交给事件循环调度
task1 = asyncio.create_task(say("Alice", 1))
# 创建任务:把协程交给事件循环调度
task2 = asyncio.create_task(say("Bob", 2))
# 等待两个任务都完成
await task1
# 等待第二个任务完成(如果已经完成会立刻返回)
await task2
# 程序入口
if __name__ == "__main__":
asyncio.run(main())// 定义 sleep:模拟非阻塞等待
function sleep(ms) {
// 返回 Promise,让事件循环在等待期间去做别的事
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 定义 async 函数:等待 delay 秒后打印
async function say(name, delaySeconds) {
// 等待 delaySeconds 秒
await sleep(delaySeconds * 1000);
// 打印输出
console.log(`${name} done (delay=${delaySeconds})`);
}
// 主函数:演示串行与并发的区别
async function main() {
// 打印分隔线
console.log("=== 串行执行(总时间≈ 1 + 2)===");
// 串行:先等 1 秒的任务完成
await say("Alice", 1);
// 串行:再等 2 秒的任务完成
await say("Bob", 2);
// 打印分隔线
console.log("=== 并发执行(总时间≈ max(1, 2))===");
// 并发:同时启动两个任务(Promise 立刻开始执行)
const p1 = say("Alice", 1);
// 并发:同时启动第二个任务
const p2 = say("Bob", 2);
// 等待两个任务都完成
await p1;
// 等待第二个任务完成(如果已经完成会立刻返回)
await p2;
}
// 程序入口
main().catch((err) => console.error(err));5.2 asyncio.gather:一次等待多个任务(最常用) #
asyncio.gather() 会并发执行多个协程/任务,并把返回值收集成列表(按传入顺序对应)。
# 导入 asyncio
import asyncio
# 定义协程:返回一个字符串
async def work(name, delay):
# 等待 delay 秒
await asyncio.sleep(delay)
# 返回结果
return f"{name} finished"
# 主协程
async def main():
# 并发执行三个协程,并收集返回值
results = await asyncio.gather(
work("A", 1),
work("B", 0.2),
work("C", 0.5),
)
# 打印结果列表
print("results =", results)
# 程序入口
if __name__ == "__main__":
asyncio.run(main())// 定义 sleep:模拟 I/O
function sleep(ms) {
// 返回 Promise,在 ms 毫秒后完成
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 定义 async 函数:返回一个字符串
async function work(name, delaySeconds) {
// 等待 delaySeconds 秒
await sleep(delaySeconds * 1000);
// 返回结果
return `${name} finished`;
}
// 主函数
async function main() {
// 并发执行三个任务,并收集返回值(按传入顺序)
const results = await Promise.all([
work("A", 1),
work("B", 0.2),
work("C", 0.5),
]);
// 打印结果列表
console.log("results =", results);
}
// 程序入口
main().catch((err) => console.error(err));6. 常用模式:生产者-消费者(Queue) #
当你需要把“产生任务”和“处理任务”解耦时,asyncio.Queue 是最经典的方式。
生产者把数据放进队列,消费者从队列取数据处理;队列还能做限流(maxsize)。
# 导入 asyncio
import asyncio
# 定义生产者协程:往队列里放入若干个任务
async def producer(queue, count):
# 循环生成 count 个任务
for i in range(count):
# 构造任务内容
item = f"task-{i}"
# 把任务放入队列(如果队列满,会在这里等待)
await queue.put(item)
# 打印日志
print("produce ->", item)
# 模拟生产耗时(非阻塞)
await asyncio.sleep(0.1)
# 放入结束信号 None,告诉消费者“没有新任务了”
await queue.put(None)
# 定义消费者协程:从队列取任务并处理
async def consumer(queue):
# 无限循环处理任务
while True:
# 从队列中取出一个任务(如果队列空,会在这里等待)
item = await queue.get()
# 如果遇到结束信号,就退出
if item is None:
# 标记这个 None 也处理完毕(否则 join 会卡住)
queue.task_done()
# 退出循环
break
# 模拟处理耗时(非阻塞)
await asyncio.sleep(0.2)
# 打印处理结果
print("consume <-", item)
# 告诉队列:这个任务处理完成
queue.task_done()
# 主协程
async def main():
# 创建一个队列,最大长度 3(用于演示背压/限流)
queue = asyncio.Queue(maxsize=3)
# 创建生产者任务
producer_task = asyncio.create_task(producer(queue, count=8))
# 创建消费者任务
consumer_task = asyncio.create_task(consumer(queue))
# 等待生产者结束
await producer_task
# 等待队列中所有任务都被标记为完成
await queue.join()
# 等待消费者结束
await consumer_task
# 打印结束信息
print("all done")
# 程序入口
if __name__ == "__main__":
asyncio.run(main())// 定义 sleep:模拟非阻塞等待
function sleep(ms) {
// 返回 Promise
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 定义一个简单的异步队列(类似 asyncio.Queue)
class AsyncQueue {
// 构造函数:可以设置最大容量(用于演示背压)
constructor(maxsize = Infinity) {
// 保存最大容量
this.maxsize = maxsize;
// 实际存储队列元素
this.items = [];
// 等待取元素的消费者(resolve 函数队列)
this.waitingGets = [];
// 等待放入元素的生产者(resolve 函数队列)
this.waitingPuts = [];
// 未完成任务计数(用于 join)
this.unfinishedTasks = 0;
// join 等待者
this.waitingJoins = [];
}
// 放入元素:满了就等待
async put(item) {
// 如果有消费者在等 get,直接把 item 交给它
if (this.waitingGets.length > 0) {
// 取出一个等待的消费者
const resolveGet = this.waitingGets.shift();
// 交付 item
resolveGet(item);
// 增加未完成任务
this.unfinishedTasks += 1;
// 返回
return;
}
// 如果队列已满,则等待空间
while (this.items.length >= this.maxsize) {
// 等待直到有人取走元素
await new Promise((resolve) => this.waitingPuts.push(resolve));
}
// 放入队列
this.items.push(item);
// 增加未完成任务
this.unfinishedTasks += 1;
}
// 取出元素:空了就等待
async get() {
// 如果队列里有元素,直接取
if (this.items.length > 0) {
// 取出队头
const item = this.items.shift();
// 如果有生产者在等 put(因为满了),通知一个生产者继续
if (this.waitingPuts.length > 0) {
// 唤醒一个等待 put 的生产者
const resolvePut = this.waitingPuts.shift();
// 通知可继续 put
resolvePut();
}
// 返回 item
return item;
}
// 队列为空:等待生产者 put
return await new Promise((resolve) => this.waitingGets.push(resolve));
}
// 标记一个任务完成(类似 queue.task_done)
taskDone() {
// 未完成任务减 1
this.unfinishedTasks -= 1;
// 如果所有任务都完成,则唤醒 join 等待者
if (this.unfinishedTasks === 0) {
// 依次唤醒所有 join 等待者
while (this.waitingJoins.length > 0) {
// 取出一个等待者
const resolveJoin = this.waitingJoins.shift();
// 唤醒
resolveJoin();
}
}
}
// 等待队列中所有任务完成(类似 queue.join)
async join() {
// 如果已经没有未完成任务,直接返回
if (this.unfinishedTasks === 0) {
// 返回
return;
}
// 否则等待
await new Promise((resolve) => this.waitingJoins.push(resolve));
}
}
// 生产者:往队列里放入若干个任务
async function producer(queue, count) {
// 循环生成 count 个任务
for (let i = 0; i < count; i += 1) {
// 构造任务内容
const item = `task-${i}`;
// 放入队列(满了会等待)
await queue.put(item);
// 打印日志
console.log("produce ->", item);
// 模拟生产耗时
await sleep(100);
}
// 放入结束信号 null(对应 Python 的 None)
await queue.put(null);
}
// 消费者:从队列取任务并处理
async function consumer(queue) {
// 无限循环
while (true) {
// 取一个任务(空了会等待)
const item = await queue.get();
// 如果遇到结束信号就退出
if (item === null) {
// 标记这个结束信号也处理完成
queue.taskDone();
// 退出循环
break;
}
// 模拟处理耗时
await sleep(200);
// 打印处理结果
console.log("consume <-", item);
// 标记完成
queue.taskDone();
}
}
// 主函数
async function main() {
// 创建队列,最大长度 3
const queue = new AsyncQueue(3);
// 启动生产者
const p = producer(queue, 8);
// 启动消费者
const c = consumer(queue);
// 等待生产者结束
await p;
// 等待队列中所有任务都完成
await queue.join();
// 等待消费者结束
await c;
// 打印结束信息
console.log("all done");
}
// 程序入口
main().catch((err) => console.error(err));7. 让程序更“稳”:超时、取消、错误处理 #
新手最容易写出“跑着跑着卡住”或“异常直接炸掉”的异步代码,所以这部分非常关键。
7.1 超时:asyncio.wait_for #
当你不希望某个任务无限等待时,用 asyncio.wait_for() 包一层超时控制。
# 导入 asyncio
import asyncio
# 定义一个很慢的任务
async def slow_job():
# 模拟很慢的 I/O
await asyncio.sleep(5)
# 返回结果
return "ok"
# 主协程
async def main():
try:
# 等待 slow_job 最多 1 秒,超时会抛出 TimeoutError
result = await asyncio.wait_for(slow_job(), timeout=1)
# 打印结果
print("result =", result)
except asyncio.TimeoutError:
# 捕获超时异常
print("timeout!")
# 程序入口
if __name__ == "__main__":
asyncio.run(main())// 定义 sleep:模拟 I/O
function sleep(ms) {
// 返回 Promise
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 定义一个很慢的任务
async function slowJob() {
// 模拟很慢的 I/O
await sleep(5000);
// 返回结果
return "ok";
}
// 给 Promise 加超时控制(类似 asyncio.wait_for)
function withTimeout(promise, timeoutMs) {
// 创建一个会超时 reject 的 Promise
const timeoutPromise = new Promise((_, reject) => {
// 设置定时器
const id = setTimeout(() => {
// 超时后抛出错误
reject(new Error("TimeoutError"));
}, timeoutMs);
// 防止 Node 进程因为这个 timer 不退出(可选)
id.unref?.();
});
// 谁先完成就用谁(相当于 race)
return Promise.race([promise, timeoutPromise]);
}
// 主函数
async function main() {
try {
// 等待 slowJob 最多 1 秒
const result = await withTimeout(slowJob(), 1000);
// 打印结果
console.log("result =", result);
} catch (err) {
// 超时或其他错误
console.log("timeout!");
}
}
// 程序入口
main().catch((err) => console.error(err));7.2 取消:task.cancel 与 CancelledError #
你可以主动取消一个 Task。被取消的协程通常会在下一次 await 时收到 CancelledError。
# 导入 asyncio
import asyncio
# 定义一个会循环运行的任务
async def ticker():
try:
# 无限循环
while True:
# 打印心跳
print("tick")
# 等待 0.3 秒
await asyncio.sleep(0.3)
except asyncio.CancelledError:
# 收到取消信号时做清理工作
print("ticker cancelled, cleanup...")
# 重新抛出异常,让取消状态正确传播
raise
# 主协程
async def main():
# 创建任务
task = asyncio.create_task(ticker())
# 让任务先跑一会儿
await asyncio.sleep(1)
# 发出取消信号
task.cancel()
try:
# 等待任务结束(会抛出 CancelledError)
await task
except asyncio.CancelledError:
# 确认取消成功
print("main: cancelled confirmed")
# 程序入口
if __name__ == "__main__":
asyncio.run(main())// 定义 sleep:支持取消(用 AbortController)
function sleep(ms, signal) {
// 返回一个可取消的 Promise
return new Promise((resolve, reject) => {
// 如果已经取消,直接拒绝
if (signal?.aborted) {
// 抛出取消错误
reject(new Error("CancelledError"));
// 返回
return;
}
// 设置定时器
const id = setTimeout(() => {
// 移除监听
signal?.removeEventListener("abort", onAbort);
// 正常完成
resolve();
}, ms);
// 取消处理函数
function onAbort() {
// 清理定时器
clearTimeout(id);
// 抛出取消错误
reject(new Error("CancelledError"));
}
// 监听取消事件
signal?.addEventListener("abort", onAbort, { once: true });
});
}
// 定义一个会循环运行的任务
async function ticker(signal) {
try {
// 无限循环
while (true) {
// 打印心跳
console.log("tick");
// 等待 0.3 秒(可取消)
await sleep(300, signal);
}
} catch (err) {
// 如果是取消导致的错误
if (err && err.message === "CancelledError") {
// 做清理工作
console.log("ticker cancelled, cleanup...");
// 继续抛出,让调用方知道是取消
throw err;
}
// 其他错误直接抛出
throw err;
}
}
// 主函数
async function main() {
// 创建取消控制器
const controller = new AbortController();
// 启动 ticker(得到一个 Promise)
const task = ticker(controller.signal);
// 让任务先跑一会儿
await sleep(1000);
// 发出取消信号
controller.abort();
try {
// 等待任务结束(会抛出 CancelledError)
await task;
} catch (err) {
// 确认取消成功
if (err && err.message === "CancelledError") {
console.log("main: cancelled confirmed");
return;
}
// 其他错误打印
console.error(err);
}
}
// 程序入口
main().catch((err) => console.error(err));7.3 错误处理:gather(return_exceptions=True) #
如果你并发执行很多任务,通常希望“某个任务失败了不要影响其他任务”,可以用 return_exceptions=True 收集异常。
# 导入 asyncio
import asyncio
# 定义一个可能失败的任务
async def maybe_fail(i):
# 等待一点时间
await asyncio.sleep(0.2)
# 让某些任务故意失败
if i % 2 == 0:
# 抛出异常
raise ValueError(f"bad i={i}")
# 返回成功结果
return f"ok i={i}"
# 主协程
async def main():
# 并发执行多个任务,并把异常当作结果返回
results = await asyncio.gather(
*(maybe_fail(i) for i in range(6)),
return_exceptions=True,
)
# 遍历结果
for r in results:
# 如果是异常对象
if isinstance(r, Exception):
# 打印异常
print("error ->", repr(r))
else:
# 打印成功结果
print("ok ->", r)
# 程序入口
if __name__ == "__main__":
asyncio.run(main())// 定义 sleep:模拟 I/O
function sleep(ms) {
// 返回 Promise
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 定义一个可能失败的任务
async function maybeFail(i) {
// 等待一点时间
await sleep(200);
// 让某些任务故意失败
if (i % 2 === 0) {
// 抛出异常
throw new Error(`bad i=${i}`);
}
// 返回成功结果
return `ok i=${i}`;
}
// 主函数
async function main() {
// 并发执行多个任务,并把成功/失败都收集起来(类似 return_exceptions=True)
const results = await Promise.allSettled(
Array.from({ length: 6 }, (_, i) => maybeFail(i))
);
// 遍历结果
for (const r of results) {
// 如果成功
if (r.status === "fulfilled") {
// 打印成功结果
console.log("ok ->", r.value);
} else {
// 打印异常(reason)
console.log("error ->", String(r.reason));
}
}
}
// 程序入口
main().catch((err) => console.error(err));8. 最佳实践 #
8.1 不要阻塞事件循环(最重要) #
如果你在协程里用了 time.sleep(),整个事件循环就卡住了,所有异步并发都失效。
# 导入 asyncio
import asyncio
# 导入 time(用于演示错误用法)
import time
# 错误示例:阻塞事件循环
async def bad():
# 这行会阻塞整个事件循环(不要在协程里这样写)
time.sleep(1)
# 打印结束
print("bad done")
# 正确示例:非阻塞等待
async def good():
# 非阻塞等待 1 秒
await asyncio.sleep(1)
# 打印结束
print("good done")
# 主协程
async def main():
# 先跑错误示例
await bad()
# 再跑正确示例
await good()
# 程序入口
if __name__ == "__main__":
asyncio.run(main())// 定义 sleep:非阻塞等待
function sleep(ms) {
// 返回 Promise
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 定义一个同步阻塞函数:用 while 循环卡住事件循环(演示用)
function blockForMs(ms) {
// 记录开始时间
const start = Date.now();
// 一直忙等直到时间到(这会阻塞 Node 事件循环)
while (Date.now() - start < ms) {
// 什么都不做,纯忙等
}
}
// 错误示例:阻塞事件循环
async function bad() {
// 这行会阻塞整个事件循环(不要在 async 函数里这样写)
blockForMs(1000);
// 打印结束
console.log("bad done");
}
// 正确示例:非阻塞等待
async function good() {
// 非阻塞等待 1 秒
await sleep(1000);
// 打印结束
console.log("good done");
}
// 主函数
async function main() {
// 先跑错误示例
await bad();
// 再跑正确示例
await good();
}
// 程序入口
main().catch((err) => console.error(err));8.2 什么时候用 create_task,什么时候直接 await? #
- 直接
await:你要“现在就等它做完”,流程更直观 create_task():你希望它“后台并发跑”,你稍后再等待它/取消它/收集它结果
9. 总结 #
- asyncio 适合 I/O 密集型并发:等待网络/磁盘/数据库时,让 CPU 去执行别的协程
- 核心三件套:协程(
async def)+await(让出控制权)+ 事件循环(调度执行) - 并发的关键工具:
create_task/gather/Queue - 稳定性关键:超时(
wait_for)、取消(cancel)、并发错误收集(gather(return_exceptions=True))