Python多进程Pool使用示例


引言

在Python中,全局解释器锁(GIL)限制了多线程的并行执行能力,使得CPU密集型任务难以通过多线程获得性能提升。而multiprocessing模块通过创建多个进程来绕过GIL限制,真正实现并行计算。其中Pool类提供了便捷的进程池接口,可以高效地管理多个工作进程。本文将详细介绍Pool的使用方法和实际应用示例。

进程池基础概念

进程池(Pool)预先创建一组工作进程,等待分配任务。相比为每个任务创建新进程,使用进程池有以下优势:

  1. 减少进程创建/销毁的开销
  2. 自动管理任务分配
  3. 控制并发进程数量
  4. 提供多种任务分配方式

基本使用示例

1. 创建进程池

from multiprocessing import Pool
import os

def worker(x):
    print(f"进程 {os.getpid()} 处理任务 {x}")
    return x * x

if __name__ == '__main__':
    # 创建包含4个工作进程的池
    with Pool(processes=4) as pool:
        # 同步执行单个任务
        result = pool.apply(worker, args=(10,))
        print(f"同步执行结果: {result}")

        # 异步执行单个任务
        async_result = pool.apply_async(worker, args=(20,))
        print(f"异步执行结果: {async_result.get()}")  # get()会阻塞直到结果就绪

        # 并行执行多个任务
        results = pool.map(worker, range(10))
        print(f"map结果: {results}")

        # 异步并行执行多个任务
        async_results = pool.map_async(worker, range(10))
        print(f"map_async结果: {async_results.get()}")

2. 常用方法对比

方法执行方式返回结果适用场景
apply同步执行单个任务直接返回结果需要顺序执行的任务
apply_async异步执行单个任务返回AsyncResult对象非阻塞执行单个任务
map并行执行多个任务返回结果列表批量处理可迭代数据
map_async异步并行执行多个任务返回AsyncResult对象非阻塞批量处理
imap惰性并行执行返回迭代器处理大量数据时节省内存
imap_unordered无序惰性并行执行返回迭代器不关心顺序的大量数据处理

实际应用示例

示例1:图像批量处理

from multiprocessing import Pool
from PIL import Image
import os

def process_image(filename):
    try:
        img = Image.open(filename)
        img = img.rotate(90).resize((256, 256))
        output_path = f"processed_{os.path.basename(filename)}"
        img.save(output_path)
        return True
    except Exception as e:
        print(f"处理 {filename} 失败: {e}")
        return False

if __name__ == '__main__':
    image_files = [f for f in os.listdir() if f.endswith(('.jpg', '.png'))]

    with Pool(processes=4) as pool:
        results = pool.map(process_image, image_files)

    print(f"成功处理 {sum(results)}/{len(image_files)} 张图片")

示例2:计算密集型任务

from multiprocessing import Pool
import time

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(n**0.5)+1):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':
    numbers = range(1000000, 1000100)  # 检查100个连续大数是否为质数

    start = time.time()
    with Pool() as pool:  # 不指定进程数则使用os.cpu_count()
        prime_results = pool.map(is_prime, numbers)
    elapsed = time.time() - start

    primes = [n for n, is_p in zip(numbers, prime_results) if is_p]
    print(f"找到 {len(primes)} 个质数: {primes[:5]}... (耗时: {elapsed:.2f}秒)")

示例3:并行下载文件

from multiprocessing import Pool
import requests
import os

def download_file(url):
    try:
        local_filename = url.split('/')[-1]
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(local_filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return (url, True)
    except Exception as e:
        return (url, False, str(e))

if __name__ == '__main__':
    urls = [
        'https://example.com/file1.zip',
        'https://example.com/file2.pdf',
        # 添加更多URL...
    ]

    with Pool(processes=4) as pool:
        results = pool.map(download_file, urls)

    for url, success, *error in results:
        status = "成功" if success else f"失败: {error[0]}"
        print(f"{url} 下载{status}")

高级用法与注意事项

1. 初始化工作进程

def init_worker():
    import signal
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def worker(x):
    # 每个工作进程只执行一次初始化
    if not hasattr(worker, 'initialized'):
        print(f"进程 {os.getpid()} 初始化")
        worker.initialized = True
    return x * x

if __name__ == '__main__':
    with Pool(processes=4, initializer=init_worker) as pool:
        print(pool.map(worker, range(10)))

2. 处理异常

def safe_worker(x):
    try:
        if x == 3:
            raise ValueError("故意抛出异常")
        return x * x
    except Exception as e:
        return (x, str(e))

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        results = pool.map(safe_worker, range(5))

    for result in results:
        if isinstance(result, tuple):  # 错误结果
            print(f"任务 {result[0]} 失败: {result[1]}")
        else:
            print(f"任务结果: {result}")

3. 进度跟踪

from tqdm import tqdm

def worker(x):
    time.sleep(0.1)  # 模拟耗时任务
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        results = list(tqdm(pool.imap(worker, range(100)), total=100))

性能优化建议

  1. 合理设置进程数:通常设置为CPU核心数,过多会导致上下文切换开销
  2. 减少进程间通信:传递大量数据会降低性能,考虑共享内存
  3. 避免小任务:任务执行时间应远大于进程间通信开销
  4. 使用imap处理大数据:避免一次性加载所有数据到内存
  5. 考虑lokypathos:更高级的进程池实现

常见问题解答

Q: 如何终止正在运行的进程池?

A: 使用pool.terminate()立即终止所有工作进程,或pool.close() + pool.join()优雅关闭。

Q: 为什么要在if __name__ == '__main__':中运行?

A: 避免Windows平台下的递归创建进程问题。

Q: 如何共享数据?

A: 使用multiprocessing.Value/ArrayManager创建共享变量。

结论

Python的multiprocessing.Pool提供了简单而强大的并行计算能力,特别适合CPU密集型任务。通过合理使用进程池,可以充分利用多核CPU资源,显著提高程序性能。掌握Pool的各种方法及其适用场景,能够帮助开发者编写更高效的并行程序。

,

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注