导航菜单

  • 1.vector
  • 2.milvus
  • 3.pymilvus
  • 4.rag
  • 5.rag_measure
  • ragflow
  • heapq
  • HNSW
  • cosine_similarity
  • math
  • typing
  • etcd
  • minio
  • collections
  • jieba
  • random
  • beautifulsoup4
  • chromadb
  • sentence_transformers
  • numpy
  • lxml
  • openpyxl
  • PyMuPDF
  • python-docx
  • requests
  • python-pptx
  • text_splitter
  • all-MiniLM-L6-v2
  • openai
  • llm
  • BPETokenizer
  • Flask
  • RAGAS
  • BagofWords
  • langchain
  • Pydantic
  • abc
  • faiss
  • MMR
  • scikit-learn
  • Runnable
  • PromptEngineering
  • dataclasses
  • LaTeX
  • rank_bm25
  • TF-IDF
  • asyncio
  • sqlalchemy
  • fastapi
  • Starlette
  • uvicorn
  • argparse
  • Generic
  • ssl
  • urllib
  • python-dotenv
  • RRF
  • CrossEncoder
  • Lost-in-the-middle
  • Jinja2
  • logger
  • io
  • venv
  • concurrent
  • parameter
  • SSE
  • 1. concurrent.futures
  • 2. 前置知识:什么是并发编程
    • 2.1 什么是并发
    • 2.2 为什么需要并发
    • 2.3 I/O 密集型 vs CPU 密集型
    • 2.4 Python 的 GIL(全局解释器锁)
  • 3. concurrent.futures 简介
  • 4. 核心概念
    • 4.1 Executor(执行器)
    • 4.2 Future(未来对象)
  • 5. ThreadPoolExecutor 基础用法
    • 5.1 示例:最简单的用法
    • 5.2 示例:使用 submit 方法
  • 6. ProcessPoolExecutor 基础用法
    • 6.1 示例:计算质数
  • 7. 核心方法详解
    • 7.1 submit() 方法
    • 7.2 map() 方法
  • 8. Future 对象的使用
    • 8.1 示例:查询任务状态
  • 9. 按完成顺序处理结果
    • 9.1 示例:按完成顺序获取结果
  • 10. 错误处理
    • 10.1 示例:处理任务中的异常
  • 11. 超时设置
    • 11.1 示例:设置任务超时
  • 12. 如何选择执行器
    • 12.1 判断任务类型的方法
  • 13. 设置工作线程或进程数
    • 13.1 示例:根据任务类型设置工作数
  • 14. 注意事项
    • 14.1 GIL 的限制
    • 14.2 进程间数据传递
    • 14.3 异常处理
    • 14.4 资源清理
  • 15. 完整示例:批量下载文件
  • 16. 总结

1. concurrent.futures #

concurrent.futures 是 Python 3.2+ 引入的标准库模块,提供了一种高级的异步执行接口,极大地简化了并发编程。

2. 前置知识:什么是并发编程 #

在开始学习 concurrent.futures 之前,我们需要先理解几个基本概念。

2.1 什么是并发 #

并发(Concurrency)是指在同一时间段内,程序可以处理多个任务。想象一下,如果你要下载 10 个文件,传统的方式是下载完第一个,再下载第二个,以此类推。而并发的方式是同时启动 10 个下载任务,哪个先完成就先处理哪个。

2.2 为什么需要并发 #

假设你要处理 100 个网络请求,每个请求需要 1 秒钟。如果按顺序处理,需要 100 秒。如果使用并发,同时处理 10 个请求,理论上只需要 10 秒左右,效率提升了 10 倍。

2.3 I/O 密集型 vs CPU 密集型 #

这是选择不同并发方式的关键:

  • I/O 密集型任务:大部分时间在等待(网络请求、文件读写、数据库查询)。这类任务适合使用线程。
  • CPU 密集型任务:大部分时间在计算(数学运算、图像处理、数据分析)。这类任务适合使用进程。

2.4 Python 的 GIL(全局解释器锁) #

GIL 是 Python 的一个特性,它限制了同一时刻只能有一个线程执行 Python 代码。这意味着:

  • 线程:适合 I/O 密集型任务(因为等待 I/O 时会释放 GIL)
  • 进程:适合 CPU 密集型任务(每个进程有独立的 GIL,可以真正并行)

3. concurrent.futures 简介 #

concurrent.futures 模块提供了两个核心执行器:

  • ThreadPoolExecutor:线程池执行器,适合 I/O 密集型任务
  • ProcessPoolExecutor:进程池执行器,适合 CPU 密集型任务

它们都提供了相同的接口,使用起来非常简单,你只需要关注业务逻辑,而不需要手动管理线程或进程的创建和销毁。

4. 核心概念 #

4.1 Executor(执行器) #

执行器是一个抽象概念,负责管理和执行你提交的任务。你可以把它想象成一个"任务管理器",你只需要把任务交给它,它会自动分配资源来执行。

4.2 Future(未来对象) #

当你提交一个任务后,执行器会立即返回一个 Future 对象。这个对象代表任务的"未来结果",你可以通过它来查询任务状态、获取结果或处理异常。

5. ThreadPoolExecutor 基础用法 #

ThreadPoolExecutor 使用线程池来执行任务,适合处理 I/O 密集型操作,比如网络请求、文件读写等。

5.1 示例:最简单的用法 #

下面是一个完整的示例,展示如何使用线程池同时下载多个网页:

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time

# 定义一个模拟下载网页的函数
def download_url(url):
    """
    模拟下载网页的操作
    在实际应用中,这里可以使用 requests.get(url)
    """
    # 模拟网络延迟
    time.sleep(1)
    return f"{url}: 下载完成,大小 1024 字节"

# 要下载的 URL 列表
urls = [
    'http://www.example.com',
    'http://www.python.org',
    'http://www.github.com'
]

# 使用线程池执行器
# max_workers 指定最大工作线程数,这里设置为 3
with ThreadPoolExecutor(max_workers=3) as executor:
    # 使用 map 方法批量提交任务
    # map 会自动为每个 URL 创建一个任务
    results = executor.map(download_url, urls)

    # 遍历结果并打印
    for result in results:
        print(result)

# 输出:
# http://www.example.com: 下载完成,大小 1024 字节
# http://www.python.org: 下载完成,大小 1024 字节
# http://www.github.com: 下载完成,大小 1024 字节

5.2 示例:使用 submit 方法 #

submit 方法可以提交单个任务,并返回一个 Future 对象,让你有更多的控制权:

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time

# 定义一个计算函数
def calculate_square(number):
    """
    计算数字的平方
    """
    time.sleep(0.5)  # 模拟计算时间
    return number ** 2

# 要计算的数字列表
numbers = [1, 2, 3, 4, 5]

# 使用线程池执行器
with ThreadPoolExecutor(max_workers=3) as executor:
    # 使用 submit 方法提交任务
    # submit 返回 Future 对象列表
    futures = [executor.submit(calculate_square, num) for num in numbers]

    # 遍历 Future 对象,获取结果
    for future in futures:
        # result() 方法会阻塞等待任务完成并返回结果
        result = future.result()
        print(f"结果: {result}")

# 输出:
# 结果: 1
# 结果: 4
# 结果: 9
# 结果: 16
# 结果: 25

6. ProcessPoolExecutor 基础用法 #

ProcessPoolExecutor 使用进程池来执行任务,适合处理 CPU 密集型操作,比如数学计算、图像处理等。

6.1 示例:计算质数 #

下面是一个完整的示例,展示如何使用进程池来判断多个大数是否为质数:

# 导入必要的模块
from concurrent.futures import ProcessPoolExecutor
import math

# 定义一个判断质数的函数
def is_prime(n):
    """
    判断一个数是否为质数
    这是一个 CPU 密集型任务
    """
    if n < 2:
        return False
    # 从 2 到 sqrt(n) 检查是否有因子
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

# 要检查的数字列表
numbers = [112272535095293, 112582705942171, 
           115280095190773, 115797848077099]

# 使用进程池执行器
# 如果不指定 max_workers,默认使用 CPU 核心数
with ProcessPoolExecutor() as executor:
    # 使用 map 方法批量提交任务
    results = list(executor.map(is_prime, numbers))

# 打印结果
for n, result in zip(numbers, results):
    print(f"{n} 是质数: {result}")

# 输出示例:
# 112272535095293 是质数: True
# 112582705942171 是质数: True
# 115280095190773 是质数: True
# 115797848077099 是质数: True

注意:使用 ProcessPoolExecutor 时,被执行的函数和参数必须可以被序列化(pickle),因为数据需要在进程间传递。

7. 核心方法详解 #

7.1 submit() 方法 #

submit() 方法用于提交单个任务,返回一个 Future 对象。

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time

# 定义一个简单的计算函数
def power(x, n):
    """
    计算 x 的 n 次方
    """
    time.sleep(0.1)  # 模拟计算时间
    return x ** n

# 使用线程池执行器
with ThreadPoolExecutor() as executor:
    # 提交任务,返回 Future 对象
    future = executor.submit(power, 2, 3)

    # 检查任务是否正在运行
    print(f"任务是否正在运行: {future.running()}")

    # 检查任务是否已完成
    print(f"任务是否已完成: {future.done()}")

    # 获取结果(会阻塞等待任务完成)
    result = future.result()
    print(f"计算结果: {result}")

    # 获取异常(如果有的话,没有异常返回 None)
    exception = future.exception()
    print(f"异常信息: {exception}")

# 输出:
# 任务是否正在运行: True
# 任务是否已完成: False
# 计算结果: 8
# 异常信息: None

7.2 map() 方法 #

map() 方法用于批量提交任务,类似于 Python 内置的 map() 函数,但会并发执行。

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor

# 定义一个处理函数
def process_item(item):
    """
    处理单个数据项
    """
    return item * 2

# 要处理的数据列表
items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# 使用线程池执行器
with ThreadPoolExecutor(max_workers=4) as executor:
    # 使用 map 批量提交任务
    # timeout 参数设置超时时间(秒),这里设置为 5 秒
    results = executor.map(process_item, items, timeout=5)

    # 遍历结果并打印
    # 注意:结果会保持原始顺序
    for result in results:
        print(result)

# 输出:
# 2
# 4
# 6
# 8
# 10
# 12
# 14
# 16
# 18
# 20

重要提示:map() 方法返回的结果会保持原始输入的顺序,即使任务完成的顺序不同。

8. Future 对象的使用 #

Future 对象提供了丰富的方法来查询任务状态和处理结果。

8.1 示例:查询任务状态 #

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time

# 定义一个耗时任务
def long_task(seconds):
    """
    模拟一个耗时任务
    """
    time.sleep(seconds)
    return f"任务完成,耗时 {seconds} 秒"

# 使用线程池执行器
with ThreadPoolExecutor() as executor:
    # 提交一个需要 2 秒的任务
    future = executor.submit(long_task, 2)

    # 在任务执行过程中,可以查询状态
    print(f"任务是否正在运行: {future.running()}")
    print(f"任务是否已完成: {future.done()}")

    # 等待任务完成并获取结果
    result = future.result()
    print(result)

    # 任务完成后再次查询状态
    print(f"任务是否正在运行: {future.running()}")
    print(f"任务是否已完成: {future.done()}")

# 输出:
# 任务是否正在运行: True
# 任务是否已完成: False
# 任务完成,耗时 2 秒
# 任务是否正在运行: False
# 任务是否已完成: True

9. 按完成顺序处理结果 #

有时候,我们不需要按照提交顺序获取结果,而是希望哪个任务先完成就先处理哪个。这时可以使用 as_completed() 函数。

9.1 示例:按完成顺序获取结果 #

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

# 定义一个随机耗时的任务
def task(n):
    """
    模拟一个随机耗时的任务
    """
    # 随机等待 0.1 到 1.0 秒
    sleep_time = random.uniform(0.1, 1.0)
    time.sleep(sleep_time)
    return n, n * n

# 使用线程池执行器
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交 10 个任务
    futures = [executor.submit(task, i) for i in range(10)]

    # 使用 as_completed 按完成顺序获取结果
    # as_completed 返回一个迭代器,每次返回一个已完成的 Future
    for future in as_completed(futures):
        # 获取任务结果
        n, result = future.result()
        print(f"任务 {n} 完成,结果: {result}")

# 输出示例(顺序可能不同):
# 任务 2 完成,结果: 4
# 任务 0 完成,结果: 0
# 任务 5 完成,结果: 25
# ...

10. 错误处理 #

在实际应用中,任务可能会出错。我们需要正确处理这些异常。

10.1 示例:处理任务中的异常 #

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor

# 定义一个可能出错的任务
def safe_divide(x, y):
    """
    安全除法,可能会抛出异常
    """
    return x / y

# 要计算的除法列表
# 注意:包含一个除以 0 的情况
calculations = [
    (10, 2),
    (20, 4),
    (30, 0),  # 这会引发 ZeroDivisionError
    (40, 5)
]

# 使用线程池执行器
with ThreadPoolExecutor() as executor:
    # 提交所有任务
    futures = [executor.submit(safe_divide, x, y) for x, y in calculations]

    # 遍历 Future 对象,处理结果和异常
    for i, future in enumerate(futures):
        try:
            # 尝试获取结果,如果任务出错会抛出异常
            result = future.result()
            print(f"计算 {calculations[i]}: {result}")
        except ZeroDivisionError as e:
            # 捕获除以零的错误
            print(f"计算 {calculations[i]}: 错误 - 不能除以零")
        except Exception as e:
            # 捕获其他所有异常
            print(f"计算 {calculations[i]}: 未知错误 - {e}")

# 输出:
# 计算 (10, 2): 5.0
# 计算 (20, 4): 5.0
# 计算 (30, 0): 错误 - 不能除以零
# 计算 (40, 5): 8.0

11. 超时设置 #

有时候,我们希望任务在一定时间内完成,如果超时就取消任务。可以使用 timeout 参数。

11.1 示例:设置任务超时 #

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time

# 定义一个可能很慢的任务
def slow_task(seconds):
    """
    模拟一个耗时任务
    """
    time.sleep(seconds)
    return f"任务完成,耗时 {seconds} 秒"

# 使用线程池执行器
with ThreadPoolExecutor() as executor:
    # 提交一个需要 5 秒的任务
    future = executor.submit(slow_task, 5)

    try:
        # 设置超时时间为 2 秒
        # 如果任务在 2 秒内未完成,会抛出 TimeoutError
        result = future.result(timeout=2)
        print(result)
    except TimeoutError:
        # 捕获超时异常
        print("任务超时!")
        # 尝试取消任务(如果任务还没开始执行)
        cancelled = future.cancel()
        if cancelled:
            print("任务已取消")
        else:
            print("任务无法取消(可能正在执行)")

# 输出:
# 任务超时!
# 任务无法取消(可能正在执行)

12. 如何选择执行器 #

选择正确的执行器对性能至关重要。下面是一个简单的选择指南:

场景 推荐执行器 原因
网络请求/API调用 ThreadPoolExecutor I/O密集型,线程切换开销小
文件读写 ThreadPoolExecutor I/O等待时间长,线程适合
图像处理/计算 ProcessPoolExecutor CPU密集型,需要绕过GIL
数据分析/机器学习 ProcessPoolExecutor CPU密集型,充分利用多核
数据库查询 ThreadPoolExecutor I/O等待为主

12.1 判断任务类型的方法 #

如果你不确定任务类型,可以问自己:

  • 任务大部分时间在等待(网络、文件、数据库)? → 使用 ThreadPoolExecutor
  • 任务大部分时间在计算(数学运算、数据处理)? → 使用 ProcessPoolExecutor

13. 设置工作线程或进程数 #

合理设置工作线程或进程数可以优化性能。

13.1 示例:根据任务类型设置工作数 #

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os

# 获取 CPU 核心数
cpu_count = os.cpu_count() or 4
print(f"CPU 核心数: {cpu_count}")

# I/O 密集型任务:可以设置比 CPU 核心数更多的线程
# 因为线程在等待 I/O 时不会占用 CPU
io_workers = min(32, cpu_count * 4)
print(f"I/O 密集型推荐线程数: {io_workers}")

# CPU 密集型任务:通常使用进程池,工作进程数等于 CPU 核心数
# 每个进程可以充分利用一个 CPU 核心
cpu_workers = cpu_count
print(f"CPU 密集型推荐进程数: {cpu_workers}")

# 示例:处理 I/O 密集型任务
def io_task(url):
    """模拟 I/O 操作"""
    import time
    time.sleep(0.1)  # 模拟网络延迟
    return f"处理完成: {url}"

urls = [f"url_{i}" for i in range(20)]

# 使用线程池处理 I/O 密集型任务
with ThreadPoolExecutor(max_workers=io_workers) as executor:
    results = list(executor.map(io_task, urls))
    print(f"处理了 {len(results)} 个任务")

# 示例:处理 CPU 密集型任务
def cpu_task(n):
    """模拟 CPU 计算"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

numbers = [1000000] * 4

# 使用进程池处理 CPU 密集型任务
with ProcessPoolExecutor(max_workers=cpu_workers) as executor:
    results = list(executor.map(cpu_task, numbers))
    print(f"计算了 {len(results)} 个任务")

14. 注意事项 #

在使用 concurrent.futures 时,需要注意以下几点:

14.1 GIL 的限制 #

Python 的全局解释器锁(GIL)限制了线程的并行能力。对于纯 Python 的 CPU 密集型任务,使用线程不会提升性能,应该使用进程池。

14.2 进程间数据传递 #

使用 ProcessPoolExecutor 时,函数和参数必须可以被序列化(pickle)。这意味着:

  • 不能传递不可序列化的对象(如文件句柄、数据库连接等)
  • 每个进程有独立的内存空间,修改参数不会影响原数据

14.3 异常处理 #

任务中的异常不会立即抛出,只有在调用 future.result() 时才会抛出。所以一定要记得处理异常。

14.4 资源清理 #

使用 with 语句可以确保执行器正确关闭。如果不使用 with,记得调用 executor.shutdown()。

15. 完整示例:批量下载文件 #

下面是一个完整的实际应用示例,展示如何使用线程池批量下载文件:

# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# 定义一个模拟下载文件的函数
def download_file(url, file_id):
    """
    模拟下载文件的操作
    在实际应用中,这里可以使用 requests.get() 或 urllib
    """
    # 模拟网络延迟(随机 0.5 到 2 秒)
    import random
    download_time = random.uniform(0.5, 2.0)
    time.sleep(download_time)

    # 模拟文件大小
    file_size = random.randint(100, 1000)

    return {
        'file_id': file_id,
        'url': url,
        'size': file_size,
        'time': download_time
    }

# 要下载的文件列表
files = [
    ('http://example.com/file1.zip', 1),
    ('http://example.com/file2.zip', 2),
    ('http://example.com/file3.zip', 3),
    ('http://example.com/file4.zip', 4),
    ('http://example.com/file5.zip', 5),
]

# 记录开始时间
start_time = time.time()

# 使用线程池执行器
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交所有下载任务
    futures = {
        executor.submit(download_file, url, file_id): (url, file_id)
        for url, file_id in files
    }

    # 按完成顺序处理结果
    completed = 0
    total_size = 0

    for future in as_completed(futures):
        try:
            # 获取下载结果
            result = future.result()
            completed += 1
            total_size += result['size']

            print(f"文件 {result['file_id']} 下载完成: "
                  f"大小 {result['size']} KB, "
                  f"耗时 {result['time']:.2f} 秒")
        except Exception as e:
            # 处理下载失败的情况
            url, file_id = futures[future]
            print(f"文件 {file_id} 下载失败: {e}")

# 计算总耗时
total_time = time.time() - start_time
print(f"\n总共下载 {completed} 个文件,总大小 {total_size} KB")
print(f"总耗时: {total_time:.2f} 秒")

16. 总结 #

concurrent.futures 模块提供了简单而强大的并发编程接口:

  1. ThreadPoolExecutor:适合 I/O 密集型任务(网络、文件、数据库)
  2. ProcessPoolExecutor:适合 CPU 密集型任务(计算、处理)
  3. 简单易用:使用 submit() 或 map() 即可提交任务
  4. 灵活控制:通过 Future 对象可以查询状态、处理异常、设置超时

对于大多数并发需求,concurrent.futures 都是理想的选择。它让并发编程变得简单直观,你只需要关注业务逻辑,而不需要处理复杂的线程或进程管理。

← 上一节 collections 下一节 cosine_similarity →

访问验证

请输入访问令牌

Token不正确,请重新输入