导航菜单

  • 1.vector
  • 2.milvus
  • 3.pymilvus
  • 4.rag
  • 5.rag_measure
  • ragflow
  • heapq
  • HNSW
  • cosine_similarity
  • math
  • typing
  • etcd
  • minio
  • collections
  • jieba
  • random
  • beautifulsoup4
  • chromadb
  • sentence_transformers
  • numpy
  • lxml
  • openpyxl
  • PyMuPDF
  • python-docx
  • requests
  • python-pptx
  • text_splitter
  • all-MiniLM-L6-v2
  • openai
  • llm
  • BPETokenizer
  • Flask
  • RAGAS
  • BagofWords
  • langchain
  • Pydantic
  • abc
  • faiss
  • MMR
  • scikit-learn
  • Runnable
  • PromptEngineering
  • dataclasses
  • LaTeX
  • rank_bm25
  • TF-IDF
  • asyncio
  • sqlalchemy
  • fastapi
  • Starlette
  • uvicorn
  • argparse
  • Generic
  • ssl
  • urllib
  • python-dotenv
  • RRF
  • CrossEncoder
  • Lost-in-the-middle
  • Jinja2
  • logger
  • io
  • venv
  • concurrent
  • parameter
  • SSE
  • 1. asyncio 是什么?它解决什么问题?
  • 2. 前置知识
    • 2.1 并发 vs 并行(不要混淆)
    • 2.2 I/O 密集 vs CPU 密集(决定你该不该用 asyncio)
    • 2.3 什么叫“阻塞”?
  • 3. 认识 async/await 与事件循环
  • 4. 协程(Coroutine):它到底是什么?
    • 4.1 协程的返回值
  • 5. 任务(Task):让多个协程“并发”起来
    • 5.1 串行 vs 并发
    • 5.2 asyncio.gather:一次等待多个任务(最常用)
  • 6. 常用模式:生产者-消费者(Queue)
  • 7. 让程序更“稳”:超时、取消、错误处理
    • 7.1 超时:asyncio.wait_for
    • 7.2 取消:task.cancel 与 CancelledError
    • 7.3 错误处理:gather(return_exceptions=True)
  • 8. 最佳实践
    • 8.1 不要阻塞事件循环(最重要)
    • 8.2 什么时候用 create_task,什么时候直接 await?
  • 9. 总结

1. asyncio 是什么?它解决什么问题? #

asyncio 是 Python 标准库中用于编写并发 I/O 程序的一套工具(常见 I/O:网络请求、磁盘读写、等待数据库响应等)。
它不是“多线程并行执行”,而是通过 事件循环(Event Loop) 在一个线程里调度多个任务:当某个任务在等待 I/O 时,CPU 不空转,而是去执行别的任务,从而提升整体吞吐量。

你可以把它理解成:

  • 协程:可暂停/可恢复的函数(像“会让出控制权的函数”)
  • 事件循环:一个调度器,负责决定“现在让哪个协程继续跑”
  • await:显式告诉调度器“我这里要等一会儿,你先去跑别的协程”

2. 前置知识 #

2.1 并发 vs 并行(不要混淆) #

  • 并行(Parallel):多个任务同时在多个 CPU 核心上执行(同一时刻真正在跑多个)
  • 并发(Concurrent):多个任务在同一段时间内交错推进(同一时刻可能只跑一个,但切换很快)

asyncio 的强项是 并发 I/O:让“等待 I/O 的时间”被别的任务利用起来。

2.2 I/O 密集 vs CPU 密集(决定你该不该用 asyncio) #

  • I/O 密集:大部分时间都在等外部(网络/磁盘/数据库),适合 asyncio
  • CPU 密集:大部分时间在算数/推理/图像处理,asyncio 帮助不大(通常用多进程或把计算下放到线程池/进程池)

2.3 什么叫“阻塞”? #

如果你在协程里调用了一个会长时间卡住的同步函数(比如 time.sleep(5)、某些同步网络请求),事件循环就被卡死了,其他协程也跑不了,这就是“阻塞事件循环”。

3. 认识 async/await 与事件循环 #

先用一个最短脚本建立直觉:await asyncio.sleep() 表示“我在等”,此时事件循环可以去跑别的任务。

# 导入 asyncio:Python 标准库中的异步框架
import asyncio


# 定义一个协程函数:使用 async def
async def hello_world():
    # 打印第一句话
    print("Hello")
    # 等待 1 秒(非阻塞地等待,会把控制权交还给事件循环)
    await asyncio.sleep(1)
    # 打印第二句话
    print("World")


# 定义程序入口协程
async def main():
    # 调用并等待协程执行完成
    await hello_world()


# 运行事件循环并执行 main 协程(Python 3.7+ 推荐写法)
if __name__ == "__main__":
    asyncio.run(main())
// 定义一个 sleep 函数:返回一个在 ms 毫秒后完成的 Promise
function sleep(ms) {
  // 返回 Promise,用 setTimeout 在未来某个时刻 resolve
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 定义一个 async 函数:相当于 Python 的 async def 协程
async function helloWorld() {
  // 打印第一句话
  console.log("Hello");
  // 等待 1 秒(非阻塞:把控制权交还给事件循环)
  await sleep(1000);
  // 打印第二句话
  console.log("World");
}

// 定义主函数:相当于 Python 的 main 协程
async function main() {
  // 调用并等待 helloWorld 执行完成
  await helloWorld();
}

// Node.js 程序入口
// 调用 main,并在出现未捕获异常时打印错误
main().catch((err) => console.error(err));

4. 协程(Coroutine):它到底是什么? #

协程可以理解为“可暂停的函数”。
当协程执行到 await 时,会暂停自己并把控制权交还给事件循环;等到被等待的事情完成后,再从暂停点继续往下执行。

4.1 协程的返回值 #

协程函数可以 return 一个值,调用方用 await 拿到返回值。

# 导入 asyncio
import asyncio


# 定义一个协程:模拟 I/O 后返回结果
async def fetch_data():
    # 模拟等待 0.5 秒
    await asyncio.sleep(0.5)
    # 返回结果字符串
    return "data"


# 主协程
async def main():
    # 等待协程并拿到返回值
    result = await fetch_data()
    # 打印返回值
    print("result =", result)


# 程序入口
if __name__ == "__main__":
    asyncio.run(main())
// 定义 sleep:模拟 I/O 等待
function sleep(ms) {
  // 返回 Promise,等待 ms 毫秒后完成
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 定义 async 函数:模拟 I/O 后返回结果
async function fetchData() {
  // 等待 0.5 秒
  await sleep(500);
  // 返回结果字符串
  return "data";
}

// 主函数
async function main() {
  // 等待 fetchData 并拿到返回值
  const result = await fetchData();
  // 打印返回值
  console.log("result =", result);
}

// 程序入口
main().catch((err) => console.error(err));

5. 任务(Task):让多个协程“并发”起来 #

仅仅 await 两个协程,默认是串行:第一个完成后才执行第二个。
要并发执行,你需要把协程包装成 Task 并交给事件循环调度。

5.1 串行 vs 并发 #

# 导入 asyncio
import asyncio


# 定义一个协程:等待 delay 秒后打印
async def say(name, delay):
    # 等待 delay 秒(非阻塞)
    await asyncio.sleep(delay)
    # 打印输出
    print(f"{name} done (delay={delay})")


# 主协程:演示串行与并发的区别
async def main():
    # 打印分隔线
    print("=== 串行执行(总时间≈ 1 + 2)===")
    # 串行:先等 1 秒的任务完成
    await say("Alice", 1)
    # 串行:再等 2 秒的任务完成
    await say("Bob", 2)

    # 打印分隔线
    print("=== 并发执行(总时间≈ max(1, 2))===")
    # 创建任务:把协程交给事件循环调度
    task1 = asyncio.create_task(say("Alice", 1))
    # 创建任务:把协程交给事件循环调度
    task2 = asyncio.create_task(say("Bob", 2))
    # 等待两个任务都完成
    await task1
    # 等待第二个任务完成(如果已经完成会立刻返回)
    await task2


# 程序入口
if __name__ == "__main__":
    asyncio.run(main())
// 定义 sleep:模拟非阻塞等待
function sleep(ms) {
  // 返回 Promise,让事件循环在等待期间去做别的事
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 定义 async 函数:等待 delay 秒后打印
async function say(name, delaySeconds) {
  // 等待 delaySeconds 秒
  await sleep(delaySeconds * 1000);
  // 打印输出
  console.log(`${name} done (delay=${delaySeconds})`);
}

// 主函数:演示串行与并发的区别
async function main() {
  // 打印分隔线
  console.log("=== 串行执行(总时间≈ 1 + 2)===");
  // 串行:先等 1 秒的任务完成
  await say("Alice", 1);
  // 串行:再等 2 秒的任务完成
  await say("Bob", 2);

  // 打印分隔线
  console.log("=== 并发执行(总时间≈ max(1, 2))===");
  // 并发:同时启动两个任务(Promise 立刻开始执行)
  const p1 = say("Alice", 1);
  // 并发:同时启动第二个任务
  const p2 = say("Bob", 2);
  // 等待两个任务都完成
  await p1;
  // 等待第二个任务完成(如果已经完成会立刻返回)
  await p2;
}

// 程序入口
main().catch((err) => console.error(err));

5.2 asyncio.gather:一次等待多个任务(最常用) #

asyncio.gather() 会并发执行多个协程/任务,并把返回值收集成列表(按传入顺序对应)。

# 导入 asyncio
import asyncio


# 定义协程:返回一个字符串
async def work(name, delay):
    # 等待 delay 秒
    await asyncio.sleep(delay)
    # 返回结果
    return f"{name} finished"


# 主协程
async def main():
    # 并发执行三个协程,并收集返回值
    results = await asyncio.gather(
        work("A", 1),
        work("B", 0.2),
        work("C", 0.5),
    )
    # 打印结果列表
    print("results =", results)


# 程序入口
if __name__ == "__main__":
    asyncio.run(main())
// 定义 sleep:模拟 I/O
function sleep(ms) {
  // 返回 Promise,在 ms 毫秒后完成
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 定义 async 函数:返回一个字符串
async function work(name, delaySeconds) {
  // 等待 delaySeconds 秒
  await sleep(delaySeconds * 1000);
  // 返回结果
  return `${name} finished`;
}

// 主函数
async function main() {
  // 并发执行三个任务,并收集返回值(按传入顺序)
  const results = await Promise.all([
    work("A", 1),
    work("B", 0.2),
    work("C", 0.5),
  ]);
  // 打印结果列表
  console.log("results =", results);
}

// 程序入口
main().catch((err) => console.error(err));

6. 常用模式:生产者-消费者(Queue) #

当你需要把“产生任务”和“处理任务”解耦时,asyncio.Queue 是最经典的方式。
生产者把数据放进队列,消费者从队列取数据处理;队列还能做限流(maxsize)。

# 导入 asyncio
import asyncio


# 定义生产者协程:往队列里放入若干个任务
async def producer(queue, count):
    # 循环生成 count 个任务
    for i in range(count):
        # 构造任务内容
        item = f"task-{i}"
        # 把任务放入队列(如果队列满,会在这里等待)
        await queue.put(item)
        # 打印日志
        print("produce ->", item)
        # 模拟生产耗时(非阻塞)
        await asyncio.sleep(0.1)
    # 放入结束信号 None,告诉消费者“没有新任务了”
    await queue.put(None)


# 定义消费者协程:从队列取任务并处理
async def consumer(queue):
    # 无限循环处理任务
    while True:
        # 从队列中取出一个任务(如果队列空,会在这里等待)
        item = await queue.get()
        # 如果遇到结束信号,就退出
        if item is None:
            # 标记这个 None 也处理完毕(否则 join 会卡住)
            queue.task_done()
            # 退出循环
            break
        # 模拟处理耗时(非阻塞)
        await asyncio.sleep(0.2)
        # 打印处理结果
        print("consume <-", item)
        # 告诉队列:这个任务处理完成
        queue.task_done()


# 主协程
async def main():
    # 创建一个队列,最大长度 3(用于演示背压/限流)
    queue = asyncio.Queue(maxsize=3)
    # 创建生产者任务
    producer_task = asyncio.create_task(producer(queue, count=8))
    # 创建消费者任务
    consumer_task = asyncio.create_task(consumer(queue))
    # 等待生产者结束
    await producer_task
    # 等待队列中所有任务都被标记为完成
    await queue.join()
    # 等待消费者结束
    await consumer_task
    # 打印结束信息
    print("all done")


# 程序入口
if __name__ == "__main__":
    asyncio.run(main())
// 定义 sleep:模拟非阻塞等待
function sleep(ms) {
  // 返回 Promise
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 定义一个简单的异步队列(类似 asyncio.Queue)
class AsyncQueue {
  // 构造函数:可以设置最大容量(用于演示背压)
  constructor(maxsize = Infinity) {
    // 保存最大容量
    this.maxsize = maxsize;
    // 实际存储队列元素
    this.items = [];
    // 等待取元素的消费者(resolve 函数队列)
    this.waitingGets = [];
    // 等待放入元素的生产者(resolve 函数队列)
    this.waitingPuts = [];
    // 未完成任务计数(用于 join)
    this.unfinishedTasks = 0;
    // join 等待者
    this.waitingJoins = [];
  }

  // 放入元素:满了就等待
  async put(item) {
    // 如果有消费者在等 get,直接把 item 交给它
    if (this.waitingGets.length > 0) {
      // 取出一个等待的消费者
      const resolveGet = this.waitingGets.shift();
      // 交付 item
      resolveGet(item);
      // 增加未完成任务
      this.unfinishedTasks += 1;
      // 返回
      return;
    }

    // 如果队列已满,则等待空间
    while (this.items.length >= this.maxsize) {
      // 等待直到有人取走元素
      await new Promise((resolve) => this.waitingPuts.push(resolve));
    }

    // 放入队列
    this.items.push(item);
    // 增加未完成任务
    this.unfinishedTasks += 1;
  }

  // 取出元素:空了就等待
  async get() {
    // 如果队列里有元素,直接取
    if (this.items.length > 0) {
      // 取出队头
      const item = this.items.shift();
      // 如果有生产者在等 put(因为满了),通知一个生产者继续
      if (this.waitingPuts.length > 0) {
        // 唤醒一个等待 put 的生产者
        const resolvePut = this.waitingPuts.shift();
        // 通知可继续 put
        resolvePut();
      }
      // 返回 item
      return item;
    }

    // 队列为空:等待生产者 put
    return await new Promise((resolve) => this.waitingGets.push(resolve));
  }

  // 标记一个任务完成(类似 queue.task_done)
  taskDone() {
    // 未完成任务减 1
    this.unfinishedTasks -= 1;
    // 如果所有任务都完成,则唤醒 join 等待者
    if (this.unfinishedTasks === 0) {
      // 依次唤醒所有 join 等待者
      while (this.waitingJoins.length > 0) {
        // 取出一个等待者
        const resolveJoin = this.waitingJoins.shift();
        // 唤醒
        resolveJoin();
      }
    }
  }

  // 等待队列中所有任务完成(类似 queue.join)
  async join() {
    // 如果已经没有未完成任务,直接返回
    if (this.unfinishedTasks === 0) {
      // 返回
      return;
    }
    // 否则等待
    await new Promise((resolve) => this.waitingJoins.push(resolve));
  }
}

// 生产者:往队列里放入若干个任务
async function producer(queue, count) {
  // 循环生成 count 个任务
  for (let i = 0; i < count; i += 1) {
    // 构造任务内容
    const item = `task-${i}`;
    // 放入队列(满了会等待)
    await queue.put(item);
    // 打印日志
    console.log("produce ->", item);
    // 模拟生产耗时
    await sleep(100);
  }
  // 放入结束信号 null(对应 Python 的 None)
  await queue.put(null);
}

// 消费者:从队列取任务并处理
async function consumer(queue) {
  // 无限循环
  while (true) {
    // 取一个任务(空了会等待)
    const item = await queue.get();
    // 如果遇到结束信号就退出
    if (item === null) {
      // 标记这个结束信号也处理完成
      queue.taskDone();
      // 退出循环
      break;
    }
    // 模拟处理耗时
    await sleep(200);
    // 打印处理结果
    console.log("consume <-", item);
    // 标记完成
    queue.taskDone();
  }
}

// 主函数
async function main() {
  // 创建队列,最大长度 3
  const queue = new AsyncQueue(3);
  // 启动生产者
  const p = producer(queue, 8);
  // 启动消费者
  const c = consumer(queue);
  // 等待生产者结束
  await p;
  // 等待队列中所有任务都完成
  await queue.join();
  // 等待消费者结束
  await c;
  // 打印结束信息
  console.log("all done");
}

// 程序入口
main().catch((err) => console.error(err));

7. 让程序更“稳”:超时、取消、错误处理 #

新手最容易写出“跑着跑着卡住”或“异常直接炸掉”的异步代码,所以这部分非常关键。

7.1 超时:asyncio.wait_for #

当你不希望某个任务无限等待时,用 asyncio.wait_for() 包一层超时控制。

# 导入 asyncio
import asyncio


# 定义一个很慢的任务
async def slow_job():
    # 模拟很慢的 I/O
    await asyncio.sleep(5)
    # 返回结果
    return "ok"


# 主协程
async def main():
    try:
        # 等待 slow_job 最多 1 秒,超时会抛出 TimeoutError
        result = await asyncio.wait_for(slow_job(), timeout=1)
        # 打印结果
        print("result =", result)
    except asyncio.TimeoutError:
        # 捕获超时异常
        print("timeout!")


# 程序入口
if __name__ == "__main__":
    asyncio.run(main())
// 定义 sleep:模拟 I/O
function sleep(ms) {
  // 返回 Promise
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 定义一个很慢的任务
async function slowJob() {
  // 模拟很慢的 I/O
  await sleep(5000);
  // 返回结果
  return "ok";
}

// 给 Promise 加超时控制(类似 asyncio.wait_for)
function withTimeout(promise, timeoutMs) {
  // 创建一个会超时 reject 的 Promise
  const timeoutPromise = new Promise((_, reject) => {
    // 设置定时器
    const id = setTimeout(() => {
      // 超时后抛出错误
      reject(new Error("TimeoutError"));
    }, timeoutMs);
    // 防止 Node 进程因为这个 timer 不退出(可选)
    id.unref?.();
  });
  // 谁先完成就用谁(相当于 race)
  return Promise.race([promise, timeoutPromise]);
}

// 主函数
async function main() {
  try {
    // 等待 slowJob 最多 1 秒
    const result = await withTimeout(slowJob(), 1000);
    // 打印结果
    console.log("result =", result);
  } catch (err) {
    // 超时或其他错误
    console.log("timeout!");
  }
}

// 程序入口
main().catch((err) => console.error(err));

7.2 取消:task.cancel 与 CancelledError #

你可以主动取消一个 Task。被取消的协程通常会在下一次 await 时收到 CancelledError。

# 导入 asyncio
import asyncio


# 定义一个会循环运行的任务
async def ticker():
    try:
        # 无限循环
        while True:
            # 打印心跳
            print("tick")
            # 等待 0.3 秒
            await asyncio.sleep(0.3)
    except asyncio.CancelledError:
        # 收到取消信号时做清理工作
        print("ticker cancelled, cleanup...")
        # 重新抛出异常,让取消状态正确传播
        raise


# 主协程
async def main():
    # 创建任务
    task = asyncio.create_task(ticker())
    # 让任务先跑一会儿
    await asyncio.sleep(1)
    # 发出取消信号
    task.cancel()
    try:
        # 等待任务结束(会抛出 CancelledError)
        await task
    except asyncio.CancelledError:
        # 确认取消成功
        print("main: cancelled confirmed")


# 程序入口
if __name__ == "__main__":
    asyncio.run(main())
// 定义 sleep:支持取消(用 AbortController)
function sleep(ms, signal) {
  // 返回一个可取消的 Promise
  return new Promise((resolve, reject) => {
    // 如果已经取消,直接拒绝
    if (signal?.aborted) {
      // 抛出取消错误
      reject(new Error("CancelledError"));
      // 返回
      return;
    }
    // 设置定时器
    const id = setTimeout(() => {
      // 移除监听
      signal?.removeEventListener("abort", onAbort);
      // 正常完成
      resolve();
    }, ms);
    // 取消处理函数
    function onAbort() {
      // 清理定时器
      clearTimeout(id);
      // 抛出取消错误
      reject(new Error("CancelledError"));
    }
    // 监听取消事件
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}

// 定义一个会循环运行的任务
async function ticker(signal) {
  try {
    // 无限循环
    while (true) {
      // 打印心跳
      console.log("tick");
      // 等待 0.3 秒(可取消)
      await sleep(300, signal);
    }
  } catch (err) {
    // 如果是取消导致的错误
    if (err && err.message === "CancelledError") {
      // 做清理工作
      console.log("ticker cancelled, cleanup...");
      // 继续抛出,让调用方知道是取消
      throw err;
    }
    // 其他错误直接抛出
    throw err;
  }
}

// 主函数
async function main() {
  // 创建取消控制器
  const controller = new AbortController();
  // 启动 ticker(得到一个 Promise)
  const task = ticker(controller.signal);
  // 让任务先跑一会儿
  await sleep(1000);
  // 发出取消信号
  controller.abort();
  try {
    // 等待任务结束(会抛出 CancelledError)
    await task;
  } catch (err) {
    // 确认取消成功
    if (err && err.message === "CancelledError") {
      console.log("main: cancelled confirmed");
      return;
    }
    // 其他错误打印
    console.error(err);
  }
}

// 程序入口
main().catch((err) => console.error(err));

7.3 错误处理:gather(return_exceptions=True) #

如果你并发执行很多任务,通常希望“某个任务失败了不要影响其他任务”,可以用 return_exceptions=True 收集异常。

# 导入 asyncio
import asyncio


# 定义一个可能失败的任务
async def maybe_fail(i):
    # 等待一点时间
    await asyncio.sleep(0.2)
    # 让某些任务故意失败
    if i % 2 == 0:
        # 抛出异常
        raise ValueError(f"bad i={i}")
    # 返回成功结果
    return f"ok i={i}"


# 主协程
async def main():
    # 并发执行多个任务,并把异常当作结果返回
    results = await asyncio.gather(
        *(maybe_fail(i) for i in range(6)),
        return_exceptions=True,
    )
    # 遍历结果
    for r in results:
        # 如果是异常对象
        if isinstance(r, Exception):
            # 打印异常
            print("error ->", repr(r))
        else:
            # 打印成功结果
            print("ok    ->", r)


# 程序入口
if __name__ == "__main__":
    asyncio.run(main())
// 定义 sleep:模拟 I/O
function sleep(ms) {
  // 返回 Promise
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 定义一个可能失败的任务
async function maybeFail(i) {
  // 等待一点时间
  await sleep(200);
  // 让某些任务故意失败
  if (i % 2 === 0) {
    // 抛出异常
    throw new Error(`bad i=${i}`);
  }
  // 返回成功结果
  return `ok i=${i}`;
}

// 主函数
async function main() {
  // 并发执行多个任务,并把成功/失败都收集起来(类似 return_exceptions=True)
  const results = await Promise.allSettled(
    Array.from({ length: 6 }, (_, i) => maybeFail(i))
  );
  // 遍历结果
  for (const r of results) {
    // 如果成功
    if (r.status === "fulfilled") {
      // 打印成功结果
      console.log("ok    ->", r.value);
    } else {
      // 打印异常(reason)
      console.log("error ->", String(r.reason));
    }
  }
}

// 程序入口
main().catch((err) => console.error(err));

8. 最佳实践 #

8.1 不要阻塞事件循环(最重要) #

如果你在协程里用了 time.sleep(),整个事件循环就卡住了,所有异步并发都失效。

# 导入 asyncio
import asyncio
# 导入 time(用于演示错误用法)
import time


# 错误示例:阻塞事件循环
async def bad():
    # 这行会阻塞整个事件循环(不要在协程里这样写)
    time.sleep(1)
    # 打印结束
    print("bad done")


# 正确示例:非阻塞等待
async def good():
    # 非阻塞等待 1 秒
    await asyncio.sleep(1)
    # 打印结束
    print("good done")


# 主协程
async def main():
    # 先跑错误示例
    await bad()
    # 再跑正确示例
    await good()


# 程序入口
if __name__ == "__main__":
    asyncio.run(main())
// 定义 sleep:非阻塞等待
function sleep(ms) {
  // 返回 Promise
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 定义一个同步阻塞函数:用 while 循环卡住事件循环(演示用)
function blockForMs(ms) {
  // 记录开始时间
  const start = Date.now();
  // 一直忙等直到时间到(这会阻塞 Node 事件循环)
  while (Date.now() - start < ms) {
    // 什么都不做,纯忙等
  }
}

// 错误示例:阻塞事件循环
async function bad() {
  // 这行会阻塞整个事件循环(不要在 async 函数里这样写)
  blockForMs(1000);
  // 打印结束
  console.log("bad done");
}

// 正确示例:非阻塞等待
async function good() {
  // 非阻塞等待 1 秒
  await sleep(1000);
  // 打印结束
  console.log("good done");
}

// 主函数
async function main() {
  // 先跑错误示例
  await bad();
  // 再跑正确示例
  await good();
}

// 程序入口
main().catch((err) => console.error(err));

8.2 什么时候用 create_task,什么时候直接 await? #

  • 直接 await:你要“现在就等它做完”,流程更直观
  • create_task():你希望它“后台并发跑”,你稍后再等待它/取消它/收集它结果

9. 总结 #

  • asyncio 适合 I/O 密集型并发:等待网络/磁盘/数据库时,让 CPU 去执行别的协程
  • 核心三件套:协程(async def)+ await(让出控制权)+ 事件循环(调度执行)
  • 并发的关键工具:create_task / gather / Queue
  • 稳定性关键:超时(wait_for)、取消(cancel)、并发错误收集(gather(return_exceptions=True))
← 上一节 argparse 下一节 BagofWords →

访问验证

请输入访问令牌

Token不正确,请重新输入