1. concurrent.futures #
concurrent.futures 是 Python 3.2+ 引入的标准库模块,提供了一种高级的异步执行接口,极大地简化了并发编程。
2. 前置知识:什么是并发编程 #
在开始学习 concurrent.futures 之前,我们需要先理解几个基本概念。
2.1 什么是并发 #
并发(Concurrency)是指在同一时间段内,程序可以处理多个任务。想象一下,如果你要下载 10 个文件,传统的方式是下载完第一个,再下载第二个,以此类推。而并发的方式是同时启动 10 个下载任务,哪个先完成就先处理哪个。
2.2 为什么需要并发 #
假设你要处理 100 个网络请求,每个请求需要 1 秒钟。如果按顺序处理,需要 100 秒。如果使用并发,同时处理 10 个请求,理论上只需要 10 秒左右,效率提升了 10 倍。
2.3 I/O 密集型 vs CPU 密集型 #
这是选择不同并发方式的关键:
- I/O 密集型任务:大部分时间在等待(网络请求、文件读写、数据库查询)。这类任务适合使用线程。
- CPU 密集型任务:大部分时间在计算(数学运算、图像处理、数据分析)。这类任务适合使用进程。
2.4 Python 的 GIL(全局解释器锁) #
GIL 是 Python 的一个特性,它限制了同一时刻只能有一个线程执行 Python 代码。这意味着:
- 线程:适合 I/O 密集型任务(因为等待 I/O 时会释放 GIL)
- 进程:适合 CPU 密集型任务(每个进程有独立的 GIL,可以真正并行)
3. concurrent.futures 简介 #
concurrent.futures 模块提供了两个核心执行器:
- ThreadPoolExecutor:线程池执行器,适合 I/O 密集型任务
- ProcessPoolExecutor:进程池执行器,适合 CPU 密集型任务
它们都提供了相同的接口,使用起来非常简单,你只需要关注业务逻辑,而不需要手动管理线程或进程的创建和销毁。
4. 核心概念 #
4.1 Executor(执行器) #
执行器是一个抽象概念,负责管理和执行你提交的任务。你可以把它想象成一个"任务管理器",你只需要把任务交给它,它会自动分配资源来执行。
4.2 Future(未来对象) #
当你提交一个任务后,执行器会立即返回一个 Future 对象。这个对象代表任务的"未来结果",你可以通过它来查询任务状态、获取结果或处理异常。
5. ThreadPoolExecutor 基础用法 #
ThreadPoolExecutor 使用线程池来执行任务,适合处理 I/O 密集型操作,比如网络请求、文件读写等。
5.1 示例:最简单的用法 #
下面是一个完整的示例,展示如何使用线程池同时下载多个网页:
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个模拟下载网页的函数
def download_url(url):
"""
模拟下载网页的操作
在实际应用中,这里可以使用 requests.get(url)
"""
# 模拟网络延迟
time.sleep(1)
return f"{url}: 下载完成,大小 1024 字节"
# 要下载的 URL 列表
urls = [
'http://www.example.com',
'http://www.python.org',
'http://www.github.com'
]
# 使用线程池执行器
# max_workers 指定最大工作线程数,这里设置为 3
with ThreadPoolExecutor(max_workers=3) as executor:
# 使用 map 方法批量提交任务
# map 会自动为每个 URL 创建一个任务
results = executor.map(download_url, urls)
# 遍历结果并打印
for result in results:
print(result)
# 输出:
# http://www.example.com: 下载完成,大小 1024 字节
# http://www.python.org: 下载完成,大小 1024 字节
# http://www.github.com: 下载完成,大小 1024 字节5.2 示例:使用 submit 方法 #
submit 方法可以提交单个任务,并返回一个 Future 对象,让你有更多的控制权:
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个计算函数
def calculate_square(number):
"""
计算数字的平方
"""
time.sleep(0.5) # 模拟计算时间
return number ** 2
# 要计算的数字列表
numbers = [1, 2, 3, 4, 5]
# 使用线程池执行器
with ThreadPoolExecutor(max_workers=3) as executor:
# 使用 submit 方法提交任务
# submit 返回 Future 对象列表
futures = [executor.submit(calculate_square, num) for num in numbers]
# 遍历 Future 对象,获取结果
for future in futures:
# result() 方法会阻塞等待任务完成并返回结果
result = future.result()
print(f"结果: {result}")
# 输出:
# 结果: 1
# 结果: 4
# 结果: 9
# 结果: 16
# 结果: 256. ProcessPoolExecutor 基础用法 #
ProcessPoolExecutor 使用进程池来执行任务,适合处理 CPU 密集型操作,比如数学计算、图像处理等。
6.1 示例:计算质数 #
下面是一个完整的示例,展示如何使用进程池来判断多个大数是否为质数:
# 导入必要的模块
from concurrent.futures import ProcessPoolExecutor
import math
# 定义一个判断质数的函数
def is_prime(n):
"""
判断一个数是否为质数
这是一个 CPU 密集型任务
"""
if n < 2:
return False
# 从 2 到 sqrt(n) 检查是否有因子
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
# 要检查的数字列表
numbers = [112272535095293, 112582705942171,
115280095190773, 115797848077099]
# 使用进程池执行器
# 如果不指定 max_workers,默认使用 CPU 核心数
with ProcessPoolExecutor() as executor:
# 使用 map 方法批量提交任务
results = list(executor.map(is_prime, numbers))
# 打印结果
for n, result in zip(numbers, results):
print(f"{n} 是质数: {result}")
# 输出示例:
# 112272535095293 是质数: True
# 112582705942171 是质数: True
# 115280095190773 是质数: True
# 115797848077099 是质数: True注意:使用 ProcessPoolExecutor 时,被执行的函数和参数必须可以被序列化(pickle),因为数据需要在进程间传递。
7. 核心方法详解 #
7.1 submit() 方法 #
submit() 方法用于提交单个任务,返回一个 Future 对象。
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个简单的计算函数
def power(x, n):
"""
计算 x 的 n 次方
"""
time.sleep(0.1) # 模拟计算时间
return x ** n
# 使用线程池执行器
with ThreadPoolExecutor() as executor:
# 提交任务,返回 Future 对象
future = executor.submit(power, 2, 3)
# 检查任务是否正在运行
print(f"任务是否正在运行: {future.running()}")
# 检查任务是否已完成
print(f"任务是否已完成: {future.done()}")
# 获取结果(会阻塞等待任务完成)
result = future.result()
print(f"计算结果: {result}")
# 获取异常(如果有的话,没有异常返回 None)
exception = future.exception()
print(f"异常信息: {exception}")
# 输出:
# 任务是否正在运行: True
# 任务是否已完成: False
# 计算结果: 8
# 异常信息: None7.2 map() 方法 #
map() 方法用于批量提交任务,类似于 Python 内置的 map() 函数,但会并发执行。
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
# 定义一个处理函数
def process_item(item):
"""
处理单个数据项
"""
return item * 2
# 要处理的数据列表
items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 使用线程池执行器
with ThreadPoolExecutor(max_workers=4) as executor:
# 使用 map 批量提交任务
# timeout 参数设置超时时间(秒),这里设置为 5 秒
results = executor.map(process_item, items, timeout=5)
# 遍历结果并打印
# 注意:结果会保持原始顺序
for result in results:
print(result)
# 输出:
# 2
# 4
# 6
# 8
# 10
# 12
# 14
# 16
# 18
# 20重要提示:map() 方法返回的结果会保持原始输入的顺序,即使任务完成的顺序不同。
8. Future 对象的使用 #
Future 对象提供了丰富的方法来查询任务状态和处理结果。
8.1 示例:查询任务状态 #
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个耗时任务
def long_task(seconds):
"""
模拟一个耗时任务
"""
time.sleep(seconds)
return f"任务完成,耗时 {seconds} 秒"
# 使用线程池执行器
with ThreadPoolExecutor() as executor:
# 提交一个需要 2 秒的任务
future = executor.submit(long_task, 2)
# 在任务执行过程中,可以查询状态
print(f"任务是否正在运行: {future.running()}")
print(f"任务是否已完成: {future.done()}")
# 等待任务完成并获取结果
result = future.result()
print(result)
# 任务完成后再次查询状态
print(f"任务是否正在运行: {future.running()}")
print(f"任务是否已完成: {future.done()}")
# 输出:
# 任务是否正在运行: True
# 任务是否已完成: False
# 任务完成,耗时 2 秒
# 任务是否正在运行: False
# 任务是否已完成: True9. 按完成顺序处理结果 #
有时候,我们不需要按照提交顺序获取结果,而是希望哪个任务先完成就先处理哪个。这时可以使用 as_completed() 函数。
9.1 示例:按完成顺序获取结果 #
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time
# 定义一个随机耗时的任务
def task(n):
"""
模拟一个随机耗时的任务
"""
# 随机等待 0.1 到 1.0 秒
sleep_time = random.uniform(0.1, 1.0)
time.sleep(sleep_time)
return n, n * n
# 使用线程池执行器
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交 10 个任务
futures = [executor.submit(task, i) for i in range(10)]
# 使用 as_completed 按完成顺序获取结果
# as_completed 返回一个迭代器,每次返回一个已完成的 Future
for future in as_completed(futures):
# 获取任务结果
n, result = future.result()
print(f"任务 {n} 完成,结果: {result}")
# 输出示例(顺序可能不同):
# 任务 2 完成,结果: 4
# 任务 0 完成,结果: 0
# 任务 5 完成,结果: 25
# ...10. 错误处理 #
在实际应用中,任务可能会出错。我们需要正确处理这些异常。
10.1 示例:处理任务中的异常 #
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
# 定义一个可能出错的任务
def safe_divide(x, y):
"""
安全除法,可能会抛出异常
"""
return x / y
# 要计算的除法列表
# 注意:包含一个除以 0 的情况
calculations = [
(10, 2),
(20, 4),
(30, 0), # 这会引发 ZeroDivisionError
(40, 5)
]
# 使用线程池执行器
with ThreadPoolExecutor() as executor:
# 提交所有任务
futures = [executor.submit(safe_divide, x, y) for x, y in calculations]
# 遍历 Future 对象,处理结果和异常
for i, future in enumerate(futures):
try:
# 尝试获取结果,如果任务出错会抛出异常
result = future.result()
print(f"计算 {calculations[i]}: {result}")
except ZeroDivisionError as e:
# 捕获除以零的错误
print(f"计算 {calculations[i]}: 错误 - 不能除以零")
except Exception as e:
# 捕获其他所有异常
print(f"计算 {calculations[i]}: 未知错误 - {e}")
# 输出:
# 计算 (10, 2): 5.0
# 计算 (20, 4): 5.0
# 计算 (30, 0): 错误 - 不能除以零
# 计算 (40, 5): 8.011. 超时设置 #
有时候,我们希望任务在一定时间内完成,如果超时就取消任务。可以使用 timeout 参数。
11.1 示例:设置任务超时 #
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个可能很慢的任务
def slow_task(seconds):
"""
模拟一个耗时任务
"""
time.sleep(seconds)
return f"任务完成,耗时 {seconds} 秒"
# 使用线程池执行器
with ThreadPoolExecutor() as executor:
# 提交一个需要 5 秒的任务
future = executor.submit(slow_task, 5)
try:
# 设置超时时间为 2 秒
# 如果任务在 2 秒内未完成,会抛出 TimeoutError
result = future.result(timeout=2)
print(result)
except TimeoutError:
# 捕获超时异常
print("任务超时!")
# 尝试取消任务(如果任务还没开始执行)
cancelled = future.cancel()
if cancelled:
print("任务已取消")
else:
print("任务无法取消(可能正在执行)")
# 输出:
# 任务超时!
# 任务无法取消(可能正在执行)12. 如何选择执行器 #
选择正确的执行器对性能至关重要。下面是一个简单的选择指南:
| 场景 | 推荐执行器 | 原因 |
|---|---|---|
| 网络请求/API调用 | ThreadPoolExecutor | I/O密集型,线程切换开销小 |
| 文件读写 | ThreadPoolExecutor | I/O等待时间长,线程适合 |
| 图像处理/计算 | ProcessPoolExecutor | CPU密集型,需要绕过GIL |
| 数据分析/机器学习 | ProcessPoolExecutor | CPU密集型,充分利用多核 |
| 数据库查询 | ThreadPoolExecutor | I/O等待为主 |
12.1 判断任务类型的方法 #
如果你不确定任务类型,可以问自己:
- 任务大部分时间在等待(网络、文件、数据库)? → 使用
ThreadPoolExecutor - 任务大部分时间在计算(数学运算、数据处理)? → 使用
ProcessPoolExecutor
13. 设置工作线程或进程数 #
合理设置工作线程或进程数可以优化性能。
13.1 示例:根据任务类型设置工作数 #
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
# 获取 CPU 核心数
cpu_count = os.cpu_count() or 4
print(f"CPU 核心数: {cpu_count}")
# I/O 密集型任务:可以设置比 CPU 核心数更多的线程
# 因为线程在等待 I/O 时不会占用 CPU
io_workers = min(32, cpu_count * 4)
print(f"I/O 密集型推荐线程数: {io_workers}")
# CPU 密集型任务:通常使用进程池,工作进程数等于 CPU 核心数
# 每个进程可以充分利用一个 CPU 核心
cpu_workers = cpu_count
print(f"CPU 密集型推荐进程数: {cpu_workers}")
# 示例:处理 I/O 密集型任务
def io_task(url):
"""模拟 I/O 操作"""
import time
time.sleep(0.1) # 模拟网络延迟
return f"处理完成: {url}"
urls = [f"url_{i}" for i in range(20)]
# 使用线程池处理 I/O 密集型任务
with ThreadPoolExecutor(max_workers=io_workers) as executor:
results = list(executor.map(io_task, urls))
print(f"处理了 {len(results)} 个任务")
# 示例:处理 CPU 密集型任务
def cpu_task(n):
"""模拟 CPU 计算"""
total = 0
for i in range(n):
total += i ** 2
return total
numbers = [1000000] * 4
# 使用进程池处理 CPU 密集型任务
with ProcessPoolExecutor(max_workers=cpu_workers) as executor:
results = list(executor.map(cpu_task, numbers))
print(f"计算了 {len(results)} 个任务")14. 注意事项 #
在使用 concurrent.futures 时,需要注意以下几点:
14.1 GIL 的限制 #
Python 的全局解释器锁(GIL)限制了线程的并行能力。对于纯 Python 的 CPU 密集型任务,使用线程不会提升性能,应该使用进程池。
14.2 进程间数据传递 #
使用 ProcessPoolExecutor 时,函数和参数必须可以被序列化(pickle)。这意味着:
- 不能传递不可序列化的对象(如文件句柄、数据库连接等)
- 每个进程有独立的内存空间,修改参数不会影响原数据
14.3 异常处理 #
任务中的异常不会立即抛出,只有在调用 future.result() 时才会抛出。所以一定要记得处理异常。
14.4 资源清理 #
使用 with 语句可以确保执行器正确关闭。如果不使用 with,记得调用 executor.shutdown()。
15. 完整示例:批量下载文件 #
下面是一个完整的实际应用示例,展示如何使用线程池批量下载文件:
# 导入必要的模块
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# 定义一个模拟下载文件的函数
def download_file(url, file_id):
"""
模拟下载文件的操作
在实际应用中,这里可以使用 requests.get() 或 urllib
"""
# 模拟网络延迟(随机 0.5 到 2 秒)
import random
download_time = random.uniform(0.5, 2.0)
time.sleep(download_time)
# 模拟文件大小
file_size = random.randint(100, 1000)
return {
'file_id': file_id,
'url': url,
'size': file_size,
'time': download_time
}
# 要下载的文件列表
files = [
('http://example.com/file1.zip', 1),
('http://example.com/file2.zip', 2),
('http://example.com/file3.zip', 3),
('http://example.com/file4.zip', 4),
('http://example.com/file5.zip', 5),
]
# 记录开始时间
start_time = time.time()
# 使用线程池执行器
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交所有下载任务
futures = {
executor.submit(download_file, url, file_id): (url, file_id)
for url, file_id in files
}
# 按完成顺序处理结果
completed = 0
total_size = 0
for future in as_completed(futures):
try:
# 获取下载结果
result = future.result()
completed += 1
total_size += result['size']
print(f"文件 {result['file_id']} 下载完成: "
f"大小 {result['size']} KB, "
f"耗时 {result['time']:.2f} 秒")
except Exception as e:
# 处理下载失败的情况
url, file_id = futures[future]
print(f"文件 {file_id} 下载失败: {e}")
# 计算总耗时
total_time = time.time() - start_time
print(f"\n总共下载 {completed} 个文件,总大小 {total_size} KB")
print(f"总耗时: {total_time:.2f} 秒")16. 总结 #
concurrent.futures 模块提供了简单而强大的并发编程接口:
- ThreadPoolExecutor:适合 I/O 密集型任务(网络、文件、数据库)
- ProcessPoolExecutor:适合 CPU 密集型任务(计算、处理)
- 简单易用:使用
submit()或map()即可提交任务 - 灵活控制:通过
Future对象可以查询状态、处理异常、设置超时
对于大多数并发需求,concurrent.futures 都是理想的选择。它让并发编程变得简单直观,你只需要关注业务逻辑,而不需要处理复杂的线程或进程管理。