第15章 并发编程

前言

在现代计算机系统中,充分利用多核处理器的能力来提高程序性能是一个关键技能。Python作为一门强大的编程语言,提供了多种并发编程的解决方案,包括多线程、多进程和异步编程。本章将详细介绍这些并发编程技术,通过大量的实际代码示例和运行结果,帮助您深入理解并发编程的核心概念和实际应用。

无论是处理大量的网络请求、进行数据密集型计算,还是构建高性能的Web服务,掌握并发编程都是提升程序性能的重要手段。我们将从基础概念开始,逐步深入到高级技巧和最佳实践,确保您能够在实际项目中正确应用这些技术。

15.1 并发编程基础

并发与并行的区别

在开始学习具体的并发编程技术之前,我们首先需要理解几个核心概念:

并发(Concurrency):指的是同时处理多个任务的能力,但不一定是真正的同时执行。在单核处理器上,并发是通过快速切换任务来实现的,给人一种同时执行的错觉。

并行(Parallelism):指的是真正的同时执行多个任务,这需要多核处理器的支持。每个核心可以独立执行一个任务。

简单来说:
- 并发是一种处理能力,强调任务的交替执行
- 并行是一种执行方式,强调任务的同时执行

进程、线程和协程

Python中有三种主要的并发编程方式:

  1. 进程(Process)
    - 操作系统分配资源的基本单位
    - 拥有独立的内存空间
    - 进程间通信需要特殊机制
    - 创建和切换开销较大
    - 能够真正利用多核处理器

  2. 线程(Thread)
    - 进程内的执行单位
    - 共享进程的内存空间
    - 线程间通信相对简单
    - 创建和切换开销较小
    - 受Python GIL限制

  3. 协程(Coroutine)
    - 用户态的轻量级线程
    - 由程序员控制切换时机
    - 没有线程切换开销
    - 适合IO密集型任务
    - 单线程内的并发

Python的GIL(全局解释器锁)

Python的GIL是理解Python并发编程的关键概念:

什么是GIL
- Global Interpreter Lock(全局解释器锁)
- 确保同一时刻只有一个线程执行Python字节码
- 保护Python对象的内存安全

GIL的影响
- 多线程无法真正并行执行CPU密集型任务
- IO密集型任务可以通过线程切换获得性能提升
- 多进程可以绕过GIL限制

CPU密集型vs IO密集型任务

理解任务类型对选择合适的并发方案很重要:

CPU密集型任务
- 大量的计算操作
- 很少的IO等待
- 适合使用多进程
- 例:数学计算、图像处理、数据分析

IO密集型任务
- 大量的IO等待
- 较少的计算操作
- 适合使用多线程或异步编程
- 例:网络请求、文件读写、数据库操作

15.2 多线程编程

threading模块基础

Python的threading模块提供了线程编程的完整支持。让我们从一个基础示例开始,创建basic_threading.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基础线程示例
演示线程创建和执行的基本用法
"""

import threading
import time
import random

def worker_function(name, duration):
    """工作函数"""
    print(f"线程 {name} 开始执行,预计耗时 {duration} 秒")
    start_time = time.time()

    # 模拟工作
    time.sleep(duration)

    end_time = time.time()
    actual_duration = end_time - start_time
    print(f"线程 {name} 执行完成,实际耗时 {actual_duration:.2f} 秒")

def demonstrate_basic_threading():
    """演示基础线程使用"""
    print("=== 基础线程演示 ===")
    print(f"主线程ID: {threading.current_thread().ident}")
    print(f"主线程名称: {threading.current_thread().name}")

    # 创建线程
    threads = []
    for i in range(3):
        duration = random.uniform(1, 3)
        thread = threading.Thread(
            target=worker_function,
            args=(f"Worker-{i+1}", duration),
            name=f"Thread-{i+1}"
        )
        threads.append(thread)

    # 启动线程
    start_time = time.time()
    for thread in threads:
        thread.start()
        print(f"启动线程: {thread.name}")

    # 等待所有线程完成
    for thread in threads:
        thread.join()
        print(f"线程 {thread.name} 已结束")

    end_time = time.time()
    total_time = end_time - start_time
    print(f"总执行时间: {total_time:.2f} 秒")
    print("=== 演示结束 ===\n")

if __name__ == "__main__":
    demonstrate_basic_threading()

执行上面的程序:

python basic_threading.py

输出结果:

=== 基础线程演示 ===
主线程ID: 24960
主线程名称: MainThread
线程 Worker-1 开始执行,预计耗时 1.0294828368882152 秒
启动线程: Thread-1
线程 Worker-2 开始执行,预计耗时 2.186671252160916 秒
启动线程: Thread-2
线程 Worker-3 开始执行,预计耗时 2.188518479545727 秒
启动线程: Thread-3
线程 Worker-1 执行完成,实际耗时 1.03 秒
线程 Thread-1 已结束
线程 Worker-2 执行完成,实际耗时 2.19 秒
线程 Thread-2 已结束
线程 Worker-3 执行完成,实际耗时 2.19 秒
线程 Thread-3 已结束
总执行时间: 2.19 秒
=== 演示结束 ===

从输出可以看到,三个线程并发执行,总时间约等于最长线程的执行时间,而不是所有线程时间的累加。

守护线程

守护线程是一种特殊的线程,它会在主程序结束时自动终止。在basic_threading.py中继续添加:

def demonstrate_daemon_thread():
    """演示守护线程"""
    print("=== 守护线程演示 ===")

    def daemon_worker():
        """守护线程工作函数"""
        for i in range(10):
            print(f"守护线程工作中... {i+1}/10")
            time.sleep(0.5)

    def normal_worker():
        """普通线程工作函数"""
        for i in range(3):
            print(f"普通线程工作中... {i+1}/3")
            time.sleep(1)
        print("普通线程工作完成")

    # 创建守护线程
    daemon_thread = threading.Thread(target=daemon_worker, name="DaemonThread")
    daemon_thread.daemon = True  # 设置为守护线程

    # 创建普通线程
    normal_thread = threading.Thread(target=normal_worker, name="NormalThread")

    # 启动线程
    daemon_thread.start()
    normal_thread.start()

    print("守护线程是否为守护线程:", daemon_thread.daemon)
    print("普通线程是否为守护线程:", normal_thread.daemon)

    # 只等待普通线程
    normal_thread.join()

    print("主程序即将结束(守护线程将自动终止)")
    print("=== 演示结束 ===\n")

线程同步 - 解决竞态条件

在多线程编程中,当多个线程同时访问共享资源时,可能会出现竞态条件(Race Condition)。创建thread_synchronization.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
线程同步示例
演示各种同步原语的使用
"""

import threading
import time
import random
from queue import Queue

# 全局变量用于演示同步
counter = 0
lock = threading.Lock()

def unsafe_counter_increment():
    """不安全的计数器递增(会出现竞态条件)"""
    global counter
    for _ in range(100000):
        counter += 1

def safe_counter_increment():
    """安全的计数器递增(使用锁)"""
    global counter, lock
    for _ in range(100000):
        with lock:
            counter += 1

def demonstrate_race_condition():
    """演示竞态条件"""
    global counter
    print("=== 竞态条件演示 ===")

    # 重置计数器
    counter = 0

    # 创建多个线程执行不安全的递增
    threads = []
    for i in range(5):
        thread = threading.Thread(target=unsafe_counter_increment)
        threads.append(thread)

    start_time = time.time()

    # 启动所有线程
    for thread in threads:
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    end_time = time.time()

    print(f"预期结果: {5 * 100000}")
    print(f"实际结果: {counter}")
    print(f"执行时间: {end_time - start_time:.4f} 秒")
    print("结果不一致说明存在竞态条件\n")

def demonstrate_thread_lock():
    """演示线程锁"""
    global counter
    print("=== 线程锁演示 ===")

    # 重置计数器
    counter = 0

    # 创建多个线程执行安全的递增
    threads = []
    for i in range(5):
        thread = threading.Thread(target=safe_counter_increment)
        threads.append(thread)

    start_time = time.time()

    # 启动所有线程
    for thread in threads:
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    end_time = time.time()

    print(f"预期结果: {5 * 100000}")
    print(f"实际结果: {counter}")
    print(f"执行时间: {end_time - start_time:.4f} 秒")
    print("结果一致说明同步成功\n")

执行程序:

python thread_synchronization.py

输出结果:

=== 竞态条件演示 ===
预期结果: 500000
实际结果: 500000
执行时间: 0.0315 秒
结果不一致说明存在竞态条件

=== 线程锁演示 ===
预期结果: 500000
实际结果: 500000
执行时间: 0.1359 秒
结果一致说明同步成功

可以看到,使用锁虽然保证了数据的一致性,但执行时间明显增加了,这是同步的代价。

线程间通信 - 生产者消费者模式

队列(Queue)是线程间通信的重要工具,特别适合实现生产者消费者模式。在thread_synchronization.py中继续添加:

class ProducerConsumer:
    """生产者消费者模式演示"""

    def __init__(self, max_size=5):
        self.queue = Queue(maxsize=max_size)
        self.produced_count = 0
        self.consumed_count = 0
        self.lock = threading.Lock()

    def producer(self, producer_id, items_to_produce):
        """生产者函数"""
        for i in range(items_to_produce):
            item = f"Producer-{producer_id}-Item-{i+1}"
            self.queue.put(item)

            with self.lock:
                self.produced_count += 1
                print(f"生产者 {producer_id} 生产了: {item} (队列大小: {self.queue.qsize()})")

            time.sleep(random.uniform(0.1, 0.3))

    def consumer(self, consumer_id, items_to_consume):
        """消费者函数"""
        consumed = 0
        while consumed < items_to_consume:
            try:
                item = self.queue.get(timeout=1)

                with self.lock:
                    self.consumed_count += 1
                    print(f"消费者 {consumer_id} 消费了: {item} (队列大小: {self.queue.qsize()})")

                consumed += 1
                time.sleep(random.uniform(0.1, 0.5))

            except:
                print(f"消费者 {consumer_id} 等待超时")
                break

def demonstrate_producer_consumer():
    """演示生产者消费者模式"""
    print("=== 生产者消费者模式演示 ===")

    pc = ProducerConsumer(max_size=3)

    # 创建生产者和消费者线程
    threads = []

    # 2个生产者
    for i in range(2):
        producer_thread = threading.Thread(
            target=pc.producer,
            args=(i+1, 5),
            name=f"Producer-{i+1}"
        )
        threads.append(producer_thread)

    # 2个消费者
    for i in range(2):
        consumer_thread = threading.Thread(
            target=pc.consumer,
            args=(i+1, 5),
            name=f"Consumer-{i+1}"
        )
        threads.append(consumer_thread)

    # 启动所有线程
    start_time = time.time()
    for thread in threads:
        thread.start()
        print(f"启动线程: {thread.name}")

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    end_time = time.time()

    print(f"\n总生产数量: {pc.produced_count}")
    print(f"总消费数量: {pc.consumed_count}")
    print(f"总执行时间: {end_time - start_time:.2f} 秒")
    print("=== 演示结束 ===\n")

运行输出:

生产者 1 生产了: Producer-1-Item-1 (队列大小: 1)
启动线程: Producer-1
生产者 2 生产了: Producer-2-Item-1 (队列大小: 2)
启动线程: Producer-2
消费者 1 消费了: Producer-1-Item-1 (队列大小: 1)
启动线程: Consumer-1
消费者 2 消费了: Producer-2-Item-1 (队列大小: 0)
启动线程: Consumer-2
...(更多输出)
总生产数量: 10
总消费数量: 10
总执行时间: 1.94 秒
=== 演示结束 ===

15.3 多进程编程

multiprocessing模块基础

多进程编程能够绕过GIL的限制,真正利用多核处理器的能力。创建multiprocess_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多进程编程示例
演示multiprocessing模块的基本用法
"""

import multiprocessing
import time
import os
import random
from multiprocessing import Queue, Process, Pool

def worker_process(name, duration):
    """工作进程函数"""
    process_id = os.getpid()
    parent_id = os.getppid()

    print(f"进程 {name} (PID: {process_id}, 父PID: {parent_id}) 开始执行")
    start_time = time.time()

    # 模拟工作
    time.sleep(duration)

    end_time = time.time()
    actual_duration = end_time - start_time
    print(f"进程 {name} (PID: {process_id}) 执行完成,耗时 {actual_duration:.2f} 秒")

    return f"进程 {name} 的结果"

def demonstrate_basic_multiprocessing():
    """演示基础多进程使用"""
    print("=== 基础多进程演示 ===")
    print(f"主进程PID: {os.getpid()}")

    # 创建进程
    processes = []
    for i in range(3):
        duration = random.uniform(1, 2)
        process = Process(
            target=worker_process,
            args=(f"Worker-{i+1}", duration),
            name=f"Process-{i+1}"
        )
        processes.append(process)

    # 启动进程
    start_time = time.time()
    for process in processes:
        process.start()
        print(f"启动进程: {process.name} (PID: {process.pid})")

    # 等待所有进程完成
    for process in processes:
        process.join()
        print(f"进程 {process.name} 已结束,退出码: {process.exitcode}")

    end_time = time.time()
    total_time = end_time - start_time
    print(f"总执行时间: {total_time:.2f} 秒")
    print("=== 演示结束 ===\n")

进程池

进程池提供了更高级的进程管理方式,特别适合CPU密集型任务:

def cpu_intensive_task(n):
    """CPU密集型任务"""
    start_time = time.time()
    result = 0
    for i in range(n):
        result += i * i
    end_time = time.time()

    process_id = os.getpid()
    return {
        'process_id': process_id,
        'n': n,
        'result': result,
        'duration': end_time - start_time
    }

def demonstrate_process_pool():
    """演示进程池"""
    print("=== 进程池演示 ===")

    # 准备任务数据
    tasks = [1000000, 2000000, 1500000, 3000000, 2500000]

    print(f"CPU核心数: {multiprocessing.cpu_count()}")
    print(f"任务列表: {tasks}")

    # 使用进程池执行任务
    start_time = time.time()

    with Pool(processes=4) as pool:
        print("进程池已创建,开始分配任务...")
        results = pool.map(cpu_intensive_task, tasks)

    end_time = time.time()

    # 输出结果
    print("\n任务执行结果:")
    for result in results:
        print(f"进程 {result['process_id']}: 计算 {result['n']} 项,"
              f"结果 {result['result']}, 耗时 {result['duration']:.4f} 秒")

    print(f"\n进程池总执行时间: {end_time - start_time:.4f} 秒")
    print("=== 演示结束 ===\n")

if __name__ == "__main__":
    # 设置进程启动方法(Windows需要)
    multiprocessing.set_start_method('spawn', force=True)

    demonstrate_basic_multiprocessing()
    demonstrate_process_pool()

15.4 异步编程

asyncio基础

异步编程是处理IO密集型任务的高效方式。Python的asyncio模块提供了完整的异步编程支持。创建asyncio_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
异步编程示例
演示asyncio模块的基本用法
"""

import asyncio
import time
import random

async def async_worker(name, duration):
    """异步工作函数"""
    print(f"协程 {name} 开始执行,预计耗时 {duration} 秒")
    start_time = time.time()

    # 异步等待
    await asyncio.sleep(duration)

    end_time = time.time()
    actual_duration = end_time - start_time
    print(f"协程 {name} 执行完成,实际耗时 {actual_duration:.2f} 秒")

    return f"协程 {name} 的结果"

async def demonstrate_basic_asyncio():
    """演示基础异步编程"""
    print("=== 基础异步编程演示 ===")

    # 创建多个协程任务
    tasks = []
    for i in range(3):
        duration = random.uniform(1, 2)
        task = async_worker(f"Worker-{i+1}", duration)
        tasks.append(task)

    # 并发执行所有任务
    start_time = time.time()
    results = await asyncio.gather(*tasks)
    end_time = time.time()

    print("\n执行结果:")
    for result in results:
        print(f"  {result}")

    total_time = end_time - start_time
    print(f"总执行时间: {total_time:.2f} 秒")
    print("=== 演示结束 ===\n")

异步HTTP请求

异步编程在网络请求中特别有用。首先需要安装aiohttp库:

pip install aiohttp

然后在asyncio_demo.py中继续添加:

import aiohttp

async def fetch_url(session, url, name):
    """异步获取URL内容"""
    try:
        print(f"开始请求 {name}: {url}")
        start_time = time.time()

        async with session.get(url, timeout=10) as response:
            content = await response.text()

        end_time = time.time()
        duration = end_time - start_time

        print(f"完成请求 {name}: 状态码 {response.status}, "
              f"内容长度 {len(content)}, 耗时 {duration:.2f} 秒")

        return {
            'name': name,
            'url': url,
            'status': response.status,
            'length': len(content),
            'duration': duration
        }

    except asyncio.TimeoutError:
        print(f"请求 {name} 超时")
        return {'name': name, 'url': url, 'error': 'timeout'}
    except Exception as e:
        print(f"请求 {name} 出错: {e}")
        return {'name': name, 'url': url, 'error': str(e)}

async def demonstrate_async_http():
    """演示异步HTTP请求"""
    print("=== 异步HTTP请求演示 ===")

    urls = [
        ("百度", "https://www.baidu.com"),
        ("知乎", "https://www.zhihu.com"),
        ("GitHub", "https://github.com"),
        ("Python官网", "https://www.python.org")
    ]

    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url, name) for name, url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    end_time = time.time()

    print("\n请求结果汇总:")
    for result in results:
        if isinstance(result, dict) and 'error' not in result:
            print(f"  {result['name']}: {result['status']} - "
                  f"{result['length']} bytes - {result['duration']:.2f}s")
        elif isinstance(result, dict):
            print(f"  {result['name']}: 错误 - {result.get('error', '未知错误')}")
        else:
            print(f"  异常: {result}")

    total_time = end_time - start_time
    print(f"\n异步HTTP总执行时间: {total_time:.2f} 秒")
    print("=== 演示结束 ===\n")

执行程序:

python asyncio_demo.py

输出结果:

=== 异步HTTP请求演示 ===
开始请求 百度: https://www.baidu.com
开始请求 知乎: https://www.zhihu.com
开始请求 GitHub: https://github.com
开始请求 Python官网: https://www.python.org
完成请求 百度: 状态码 200, 内容长度 28918, 耗时 0.31 秒
完成请求 知乎: 状态码 403, 内容长度 118, 耗时 0.39 秒
完成请求 GitHub: 状态码 200, 内容长度 553650, 耗时 0.51 秒
完成请求 Python官网: 状态码 200, 内容长度 50272, 耗时 1.18 秒

请求结果汇总:
  百度: 200 - 28918 bytes - 0.31s
  知乎: 403 - 118 bytes - 0.39s
  GitHub: 200 - 553650 bytes - 0.51s
  Python官网: 200 - 50272 bytes - 1.18s

异步HTTP总执行时间: 1.19 秒

可以看到,4个HTTP请求并发执行,总时间约为最慢请求的时间。

15.5 concurrent.futures模块

concurrent.futures模块提供了高级的并发编程接口,可以统一处理线程和进程。创建futures_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
concurrent.futures模块演示
统一的并发编程接口
"""

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

def cpu_bound_task(n):
    """CPU密集型任务"""
    start_time = time.time()
    result = sum(i * i for i in range(n))
    end_time = time.time()
    return {
        'n': n,
        'result': result,
        'duration': end_time - start_time
    }

def io_bound_task(duration):
    """IO密集型任务"""
    time.sleep(duration)
    return f"任务完成,耗时 {duration} 秒"

def demonstrate_thread_pool_executor():
    """演示线程池执行器"""
    print("=== ThreadPoolExecutor演示 ===")

    tasks = [0.5, 1.0, 0.8, 1.2]

    start_time = time.time()

    # 使用线程池执行器
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交任务
        futures = [executor.submit(io_bound_task, duration) for duration in tasks]

        # 获取结果
        results = [future.result() for future in futures]

    end_time = time.time()

    print("任务结果:")
    for result in results:
        print(f"  {result}")

    print(f"ThreadPoolExecutor总耗时: {end_time - start_time:.2f} 秒")
    print("=== 演示结束 ===\n")

def demonstrate_process_pool_executor():
    """演示进程池执行器"""
    print("=== ProcessPoolExecutor演示 ===")

    tasks = [1000000, 2000000, 1500000, 3000000]

    start_time = time.time()

    # 使用进程池执行器
    with ProcessPoolExecutor(max_workers=4) as executor:
        # 提交任务
        futures = [executor.submit(cpu_bound_task, n) for n in tasks]

        # 获取结果
        results = [future.result() for future in futures]

    end_time = time.time()

    print("任务结果:")
    for result in results:
        print(f"  计算 {result['n']} 项: 结果 {result['result']}, "
              f"耗时 {result['duration']:.4f} 秒")

    print(f"ProcessPoolExecutor总耗时: {end_time - start_time:.4f} 秒")
    print("=== 演示结束 ===\n")

if __name__ == "__main__":
    demonstrate_thread_pool_executor()
    demonstrate_process_pool_executor()

15.6 同步原语详解

各种锁的使用

创建sync_primitives_demo.py来演示各种同步原语:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
同步原语详解
演示各种锁和同步机制的使用
"""

import threading
import time

# 信号量演示
def demonstrate_semaphore():
    """演示信号量"""
    print("=== 信号量演示 ===")

    # 创建信号量,最多允许2个线程同时访问
    semaphore = threading.Semaphore(2)

    def access_resource(thread_id):
        print(f"线程 {thread_id} 尝试获取资源...")
        with semaphore:
            print(f"线程 {thread_id} 获得资源访问权")
            time.sleep(2)  # 模拟资源使用
            print(f"线程 {thread_id} 释放资源")

    # 创建5个线程尝试访问资源
    threads = []
    for i in range(5):
        thread = threading.Thread(target=access_resource, args=(i+1,))
        threads.append(thread)

    start_time = time.time()

    # 启动所有线程
    for thread in threads:
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    end_time = time.time()
    print(f"总执行时间: {end_time - start_time:.2f} 秒")
    print("=== 演示结束 ===\n")

# 事件演示
def demonstrate_event():
    """演示事件"""
    print("=== 事件演示 ===")

    event = threading.Event()

    def waiter(name):
        print(f"等待者 {name} 开始等待事件")
        event.wait()
        print(f"等待者 {name} 收到事件,开始工作")
        time.sleep(1)
        print(f"等待者 {name} 工作完成")

    def setter():
        print("设置者开始工作")
        time.sleep(2)
        print("设置者触发事件")
        event.set()

    # 创建线程
    threads = []

    # 等待者线程
    for i in range(3):
        thread = threading.Thread(target=waiter, args=(f"Waiter-{i+1}",))
        threads.append(thread)

    # 设置者线程
    setter_thread = threading.Thread(target=setter)
    threads.append(setter_thread)

    # 启动线程
    for thread in threads:
        thread.start()

    # 等待完成
    for thread in threads:
        thread.join()

    print("=== 演示结束 ===\n")

if __name__ == "__main__":
    demonstrate_semaphore()
    demonstrate_event()

15.7 并发设计模式

工作者模式

创建design_patterns_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
并发设计模式演示
工作者模式等
"""

import queue
import threading
import time
import random

class WorkerPool:
    """工作者池"""

    def __init__(self, num_workers=4):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
        self.num_workers = num_workers
        self.shutdown = False

    def start(self):
        """启动工作者池"""
        for i in range(self.num_workers):
            worker = threading.Thread(target=self._worker, args=(i+1,))
            worker.start()
            self.workers.append(worker)
        print(f"启动了 {self.num_workers} 个工作者")

    def _worker(self, worker_id):
        """工作者函数"""
        while not self.shutdown:
            try:
                task = self.task_queue.get(timeout=1)
                if task is None:  # 毒丸,用于停止工作者
                    break

                print(f"工作者 {worker_id} 开始处理任务: {task['id']}")
                start_time = time.time()

                # 模拟任务处理
                time.sleep(random.uniform(0.5, 2.0))

                end_time = time.time()
                result = {
                    'task_id': task['id'],
                    'worker_id': worker_id,
                    'result': task['data'] * 2,
                    'duration': end_time - start_time
                }

                self.result_queue.put(result)
                print(f"工作者 {worker_id} 完成任务: {task['id']}")

                self.task_queue.task_done()

            except queue.Empty:
                continue

    def submit_task(self, task_id, data):
        """提交任务"""
        task = {'id': task_id, 'data': data}
        self.task_queue.put(task)

    def get_result(self, timeout=None):
        """获取结果"""
        try:
            return self.result_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def shutdown_pool(self):
        """关闭工作者池"""
        print("正在关闭工作者池...")
        self.shutdown = True

        # 发送毒丸停止所有工作者
        for _ in range(self.num_workers):
            self.task_queue.put(None)

        # 等待所有工作者结束
        for worker in self.workers:
            worker.join()

        print("工作者池已关闭")

def demonstrate_worker_pool():
    """演示工作者池"""
    print("=== 工作者池演示 ===")

    # 创建并启动工作者池
    pool = WorkerPool(num_workers=3)
    pool.start()

    # 提交任务
    tasks = [
        (f"Task-{i+1}", random.randint(1, 100))
        for i in range(8)
    ]

    print(f"提交 {len(tasks)} 个任务")
    for task_id, data in tasks:
        pool.submit_task(task_id, data)

    # 收集结果
    results = []
    for _ in range(len(tasks)):
        result = pool.get_result(timeout=5)
        if result:
            results.append(result)
            print(f"收到结果: 任务 {result['task_id']} "
                  f"由工作者 {result['worker_id']} 完成,"
                  f"结果 {result['result']},耗时 {result['duration']:.2f} 秒")

    # 关闭池
    pool.shutdown_pool()

    print(f"处理了 {len(results)} 个任务")
    print("=== 演示结束 ===\n")

if __name__ == "__main__":
    demonstrate_worker_pool()

15.8 性能优化和调试

性能测试和比较

创建performance_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
并发性能测试和比较
"""

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound_task_sync(n):
    """CPU密集型任务(同步版本)"""
    result = 0
    for i in range(n):
        result += i * i
    return result

def io_bound_task_sync(duration):
    """IO密集型任务(同步版本)"""
    time.sleep(duration)
    return f"同步任务完成,耗时 {duration} 秒"

async def io_bound_task_async(duration):
    """IO密集型任务(异步版本)"""
    await asyncio.sleep(duration)
    return f"异步任务完成,耗时 {duration} 秒"

def benchmark_cpu_intensive():
    """CPU密集型任务性能测试"""
    print("=== CPU密集型任务性能测试 ===")

    task_size = 1000000
    num_tasks = 4

    # 1. 单线程执行
    print("1. 单线程执行:")
    start_time = time.time()
    results = [cpu_bound_task_sync(task_size) for _ in range(num_tasks)]
    single_thread_time = time.time() - start_time
    print(f"   执行时间: {single_thread_time:.4f} 秒")

    # 2. 多线程执行
    print("2. 多线程执行:")
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_bound_task_sync, [task_size] * num_tasks))
    multi_thread_time = time.time() - start_time
    print(f"   执行时间: {multi_thread_time:.4f} 秒")

    # 3. 多进程执行
    print("3. 多进程执行:")
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_bound_task_sync, [task_size] * num_tasks))
    multi_process_time = time.time() - start_time
    print(f"   执行时间: {multi_process_time:.4f} 秒")

    # 性能比较
    print("\n性能比较:")
    print(f"多线程相对单线程: {single_thread_time / multi_thread_time:.2f}x")
    print(f"多进程相对单线程: {single_thread_time / multi_process_time:.2f}x")
    print(f"多进程相对多线程: {multi_thread_time / multi_process_time:.2f}x")
    print("=== 测试结束 ===\n")

async def benchmark_io_intensive():
    """IO密集型任务性能测试"""
    print("=== IO密集型任务性能测试 ===")

    task_duration = 0.5
    num_tasks = 8

    # 1. 同步执行
    print("1. 同步执行:")
    start_time = time.time()
    results = [io_bound_task_sync(task_duration) for _ in range(num_tasks)]
    sync_time = time.time() - start_time
    print(f"   执行时间: {sync_time:.4f} 秒")

    # 2. 多线程执行
    print("2. 多线程执行:")
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(io_bound_task_sync, [task_duration] * num_tasks))
    thread_time = time.time() - start_time
    print(f"   执行时间: {thread_time:.4f} 秒")

    # 3. 异步执行
    print("3. 异步执行:")
    start_time = time.time()
    tasks = [io_bound_task_async(task_duration) for _ in range(num_tasks)]
    results = await asyncio.gather(*tasks)
    async_time = time.time() - start_time
    print(f"   执行时间: {async_time:.4f} 秒")

    # 性能比较
    print("\n性能比较:")
    print(f"多线程相对同步: {sync_time / thread_time:.2f}x")
    print(f"异步相对同步: {sync_time / async_time:.2f}x")
    print(f"异步相对多线程: {thread_time / async_time:.2f}x")
    print("=== 测试结束 ===\n")

async def run_benchmarks():
    """运行性能测试"""
    benchmark_cpu_intensive()
    await benchmark_io_intensive()

if __name__ == "__main__":
    asyncio.run(run_benchmarks())

15.9 实际应用场景

数据处理管道

创建一个数据处理示例data_processing_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据处理管道示例
实际应用场景演示
"""

import csv
import json
import time
import os
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count

class DataProcessor:
    """数据处理器"""

    def __init__(self, num_workers=4):
        self.num_workers = num_workers

    def process_chunk(self, data_chunk, processor_id):
        """处理数据块"""
        print(f"处理器 {processor_id} 开始处理 {len(data_chunk)} 条记录")
        start_time = time.time()

        processed_data = []
        for record in data_chunk:
            # 示例数据处理逻辑
            processed_record = {
                'id': record.get('id', ''),
                'name': record.get('name', '').upper(),
                'value': float(record.get('value', 0)) * 1.1,
                'processed_at': time.time()
            }
            processed_data.append(processed_record)

        end_time = time.time()
        print(f"处理器 {processor_id} 完成,耗时 {end_time - start_time:.2f} 秒")

        return processed_data

    def process_data_threaded(self, data, chunk_size=1000):
        """使用多线程处理数据"""
        print("=== 多线程数据处理 ===")

        # 将数据分块
        chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
        print(f"将 {len(data)} 条记录分成 {len(chunks)} 个块")

        start_time = time.time()

        # 使用线程池处理
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = [
                executor.submit(self.process_chunk, chunk, i+1)
                for i, chunk in enumerate(chunks)
            ]

            # 收集结果
            results = []
            for future in futures:
                chunk_result = future.result()
                results.extend(chunk_result)

        end_time = time.time()

        print(f"多线程处理完成,处理了 {len(results)} 条记录")
        print(f"总耗时: {end_time - start_time:.2f} 秒")

        return results

def create_sample_data(num_records=10000):
    """创建示例数据"""
    import random

    print(f"创建 {num_records} 条示例数据")

    data = []
    for i in range(num_records):
        record = {
            'id': i + 1,
            'name': f"用户_{i+1}",
            'value': round(random.uniform(10, 1000), 2)
        }
        data.append(record)

    return data

def demonstrate_data_processing():
    """演示数据处理"""
    print(f"系统CPU核心数: {cpu_count()}")

    # 创建示例数据
    sample_data = create_sample_data(num_records=5000)

    # 处理数据
    processor = DataProcessor(num_workers=4)
    processed_data = processor.process_data_threaded(sample_data, chunk_size=1000)

    # 显示结果示例
    print("\n处理结果示例(前5条):")
    for record in processed_data[:5]:
        print(f"  ID: {record['id']}, 姓名: {record['name']}, "
              f"处理后值: {record['value']:.2f}")

    print("=== 演示结束 ===")

if __name__ == "__main__":
    demonstrate_data_processing()

总结

本章详细介绍了Python并发编程的各个方面,从基础概念到高级应用。通过大量的实际代码示例和运行结果,我们学习了:

核心知识点

  1. 并发编程基础:理解了并发与并行的区别,以及Python GIL的影响
  2. 多线程编程:掌握了线程创建、同步、通信等核心技术
  3. 多进程编程:学习了进程间通信和进程池的使用
  4. 异步编程:理解了asyncio的工作原理和异步模式
  5. 高级工具:掌握了concurrent.futures等高级并发工具
  6. 设计模式:学习了常见的并发设计模式

性能特点对比

并发方式 适用场景 优势 劣势
多线程 IO密集型任务 内存共享,切换开销小 受GIL限制,无法真正并行
多进程 CPU密集型任务 真正并行,不受GIL限制 内存开销大,进程间通信复杂
异步编程 高并发IO 单线程,无锁问题 不适合CPU密集型任务

选择建议

  • CPU密集型任务:使用多进程
  • IO密集型任务:使用多线程或异步编程
  • 高并发网络请求:优先选择异步编程
  • 简单并行计算:使用concurrent.futures

最佳实践

  1. 正确选择并发模型:根据任务类型选择合适的并发方式
  2. 合理使用同步原语:避免过度同步导致性能下降
  3. 异常处理:并发程序中的异常处理要特别谨慎
  4. 资源管理:及时释放线程、进程等资源
  5. 性能测试:通过实际测试验证并发效果

通过掌握这些并发编程技术,您可以编写出高性能、高并发的Python应用程序,充分利用现代计算机的多核处理能力。在实际项目中,要根据具体需求选择合适的并发策略,并进行充分的测试和优化。

小夜