15.1 并发编程基础

并发与并行的区别

在开始学习具体的并发编程技术之前,我们首先需要理解几个核心概念:

并发(Concurrency):指的是同时处理多个任务的能力,但不一定是真正的同时执行。在单核处理器上,并发是通过快速切换任务来实现的,给人一种同时执行的错觉。

并行(Parallelism):指的是真正的同时执行多个任务,这需要多核处理器的支持。每个核心可以独立执行一个任务。

简单来说:
- 并发是一种处理能力,强调任务的交替执行
- 并行是一种执行方式,强调任务的同时执行

进程、线程和协程

Python中有三种主要的并发编程方式:

  1. 进程(Process)
    - 操作系统分配资源的基本单位
    - 拥有独立的内存空间
    - 进程间通信需要特殊机制
    - 创建和切换开销较大
    - 能够真正利用多核处理器

  2. 线程(Thread)
    - 进程内的执行单位
    - 共享进程的内存空间
    - 线程间通信相对简单
    - 创建和切换开销较小
    - 受Python GIL限制

  3. 协程(Coroutine)
    - 用户态的轻量级线程
    - 由程序员控制切换时机
    - 没有线程切换开销
    - 适合IO密集型任务
    - 单线程内的并发

Python的GIL(全局解释器锁)

Python的GIL是理解Python并发编程的关键概念:

什么是GIL
- Global Interpreter Lock(全局解释器锁)
- 确保同一时刻只有一个线程执行Python字节码
- 保护Python对象的内存安全

GIL的影响
- 多线程无法真正并行执行CPU密集型任务
- IO密集型任务可以通过线程切换获得性能提升
- 多进程可以绕过GIL限制

CPU密集型vs IO密集型任务

理解任务类型对选择合适的并发方案很重要:

CPU密集型任务
- 大量的计算操作
- 很少的IO等待
- 适合使用多进程
- 例:数学计算、图像处理、数据分析

IO密集型任务
- 大量的IO等待
- 较少的计算操作
- 适合使用多线程或异步编程
- 例:网络请求、文件读写、数据库操作

15.2 多线程编程

threading模块基础

Python的threading模块提供了线程编程的完整支持。让我们从一个基础示例开始,创建basic_threading.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基础线程示例
演示线程创建和执行的基本用法
"""

import threading
import time
import random

def worker_function(name, duration):
    """工作函数"""
    print(f"线程 {name} 开始执行,预计耗时 {duration} 秒")
    start_time = time.time()

    # 模拟工作
    time.sleep(duration)

    end_time = time.time()
    actual_duration = end_time - start_time
    print(f"线程 {name} 执行完成,实际耗时 {actual_duration:.2f} 秒")

def demonstrate_basic_threading():
    """演示基础线程使用"""
    print("=== 基础线程演示 ===")
    print(f"主线程ID: {threading.current_thread().ident}")
    print(f"主线程名称: {threading.current_thread().name}")

    # 创建线程
    threads = []
    for i in range(3):
        duration = random.uniform(1, 3)
        thread = threading.Thread(
            target=worker_function,
            args=(f"Worker-{i+1}", duration),
            name=f"Thread-{i+1}"
        )
        threads.append(thread)

    # 启动线程
    start_time = time.time()
    for thread in threads:
        thread.start()
        print(f"启动线程: {thread.name}")

    # 等待所有线程完成
    for thread in threads:
        thread.join()
        print(f"线程 {thread.name} 已结束")

    end_time = time.time()
    total_time = end_time - start_time
    print(f"总执行时间: {total_time:.2f} 秒")
    print("=== 演示结束 ===\n")

if __name__ == "__main__":
    demonstrate_basic_threading()

执行上面的程序:

python basic_threading.py

输出结果:

=== 基础线程演示 ===
主线程ID: 24960
主线程名称: MainThread
线程 Worker-1 开始执行,预计耗时 1.0294828368882152 秒
启动线程: Thread-1
线程 Worker-2 开始执行,预计耗时 2.186671252160916 秒
启动线程: Thread-2
线程 Worker-3 开始执行,预计耗时 2.188518479545727 秒
启动线程: Thread-3
线程 Worker-1 执行完成,实际耗时 1.03 秒
线程 Thread-1 已结束
线程 Worker-2 执行完成,实际耗时 2.19 秒
线程 Thread-2 已结束
线程 Worker-3 执行完成,实际耗时 2.19 秒
线程 Thread-3 已结束
总执行时间: 2.19 秒
=== 演示结束 ===

从输出可以看到,三个线程并发执行,总时间约等于最长线程的执行时间,而不是所有线程时间的累加。

守护线程

守护线程是一种特殊的线程,它会在主程序结束时自动终止。在basic_threading.py中继续添加:

def demonstrate_daemon_thread():
    """演示守护线程"""
    print("=== 守护线程演示 ===")

    def daemon_worker():
        """守护线程工作函数"""
        for i in range(10):
            print(f"守护线程工作中... {i+1}/10")
            time.sleep(0.5)

    def normal_worker():
        """普通线程工作函数"""
        for i in range(3):
            print(f"普通线程工作中... {i+1}/3")
            time.sleep(1)
        print("普通线程工作完成")

    # 创建守护线程
    daemon_thread = threading.Thread(target=daemon_worker, name="DaemonThread")
    daemon_thread.daemon = True  # 设置为守护线程

    # 创建普通线程
    normal_thread = threading.Thread(target=normal_worker, name="NormalThread")

    # 启动线程
    daemon_thread.start()
    normal_thread.start()

    print("守护线程是否为守护线程:", daemon_thread.daemon)
    print("普通线程是否为守护线程:", normal_thread.daemon)

    # 只等待普通线程
    normal_thread.join()

    print("主程序即将结束(守护线程将自动终止)")
    print("=== 演示结束 ===\n")

线程同步 - 解决竞态条件

在多线程编程中,当多个线程同时访问共享资源时,可能会出现竞态条件(Race Condition)。创建thread_synchronization.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
线程同步示例
演示各种同步原语的使用
"""

import threading
import time
import random

# 全局变量用于演示同步
counter = 0
lock = threading.Lock()

def unsafe_counter_increment():
    """不安全的计数器递增(会出现竞态条件)"""
    global counter
    for _ in range(100000):
        counter += 1

def safe_counter_increment():
    """安全的计数器递增(使用锁)"""
    global counter, lock
    for _ in range(100000):
        with lock:
            counter += 1

def demonstrate_race_condition():
    """演示竞态条件"""
    global counter
    print("=== 竞态条件演示 ===")

    # 重置计数器
    counter = 0

    # 创建多个线程执行不安全的递增
    threads = []
    for i in range(5):
        thread = threading.Thread(target=unsafe_counter_increment)
        threads.append(thread)

    start_time = time.time()

    # 启动所有线程
    for thread in threads:
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    end_time = time.time()

    print(f"预期结果: {5 * 100000}")
    print(f"实际结果: {counter}")
    print(f"执行时间: {end_time - start_time:.4f} 秒")
    print("结果不一致说明存在竞态条件\n")

def demonstrate_thread_lock():
    """演示线程锁"""
    global counter
    print("=== 线程锁演示 ===")

    # 重置计数器
    counter = 0

    # 创建多个线程执行安全的递增
    threads = []
    for i in range(5):
        thread = threading.Thread(target=safe_counter_increment)
        threads.append(thread)

    start_time = time.time()

    # 启动所有线程
    for thread in threads:
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    end_time = time.time()

    print(f"预期结果: {5 * 100000}")
    print(f"实际结果: {counter}")
    print(f"执行时间: {end_time - start_time:.4f} 秒")
    print("结果一致说明同步成功\n")

线程间通信 - 生产者消费者模式

队列(Queue)是线程间通信的重要工具,特别适合实现生产者消费者模式:

class ProducerConsumer:
    """生产者消费者模式演示"""

    def __init__(self, max_size=5):
        self.queue = queue.Queue(maxsize=max_size)
        self.produced_count = 0
        self.consumed_count = 0
        self.lock = threading.Lock()

    def producer(self, producer_id, items_to_produce):
        """生产者函数"""
        for i in range(items_to_produce):
            item = f"Producer-{producer_id}-Item-{i+1}"
            self.queue.put(item)

            with self.lock:
                self.produced_count += 1
                print(f"生产者 {producer_id} 生产了: {item} (队列大小: {self.queue.qsize()})")

            time.sleep(random.uniform(0.1, 0.3))

    def consumer(self, consumer_id, items_to_consume):
        """消费者函数"""
        consumed = 0
        while consumed < items_to_consume:
            try:
                item = self.queue.get(timeout=1)

                with self.lock:
                    self.consumed_count += 1
                    print(f"消费者 {consumer_id} 消费了: {item} (队列大小: {self.queue.qsize()})")

                consumed += 1
                time.sleep(random.uniform(0.1, 0.5))

            except:
                print(f"消费者 {consumer_id} 等待超时")
                break

def demonstrate_producer_consumer():
    """演示生产者消费者模式"""
    print("=== 生产者消费者模式演示 ===")

    pc = ProducerConsumer(max_size=3)

    # 创建生产者和消费者线程
    threads = []

    # 2个生产者
    for i in range(2):
        producer_thread = threading.Thread(
            target=pc.producer,
            args=(i+1, 5),
            name=f"Producer-{i+1}"
        )
        threads.append(producer_thread)

    # 2个消费者
    for i in range(2):
        consumer_thread = threading.Thread(
            target=pc.consumer,
            args=(i+1, 5),
            name=f"Consumer-{i+1}"
        )
        threads.append(consumer_thread)

    # 启动所有线程
    start_time = time.time()
    for thread in threads:
        thread.start()
        print(f"启动线程: {thread.name}")

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    end_time = time.time()

    print(f"\n总生产数量: {pc.produced_count}")
    print(f"总消费数量: {pc.consumed_count}")
    print(f"总执行时间: {end_time - start_time:.2f} 秒")
    print("=== 演示结束 ===\n")

15.3 多进程编程

multiprocessing模块基础

多进程编程能够绕过GIL的限制,真正利用多核处理器的能力。创建multiprocess_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多进程编程示例
演示multiprocessing模块的基本用法
"""

import multiprocessing
import time
import os
import random
from multiprocessing import Queue, Process, Pool

def worker_process(name, duration):
    """工作进程函数"""
    process_id = os.getpid()
    parent_id = os.getppid()

    print(f"进程 {name} (PID: {process_id}, 父PID: {parent_id}) 开始执行")
    start_time = time.time()

    # 模拟工作
    time.sleep(duration)

    end_time = time.time()
    actual_duration = end_time - start_time
    print(f"进程 {name} (PID: {process_id}) 执行完成,耗时 {actual_duration:.2f} 秒")

    return f"进程 {name} 的结果"

def demonstrate_basic_multiprocessing():
    """演示基础多进程使用"""
    print("=== 基础多进程演示 ===")
    print(f"主进程PID: {os.getpid()}")

    # 创建进程
    processes = []
    for i in range(3):
        duration = random.uniform(1, 2)
        process = Process(
            target=worker_process,
            args=(f"Worker-{i+1}", duration),
            name=f"Process-{i+1}"
        )
        processes.append(process)

    # 启动进程
    start_time = time.time()
    for process in processes:
        process.start()
        print(f"启动进程: {process.name} (PID: {process.pid})")

    # 等待所有进程完成
    for process in processes:
        process.join()
        print(f"进程 {process.name} 已结束,退出码: {process.exitcode}")

    end_time = time.time()
    total_time = end_time - start_time
    print(f"总执行时间: {total_time:.2f} 秒")
    print("=== 演示结束 ===\n")

def cpu_intensive_task(n):
    """CPU密集型任务"""
    start_time = time.time()
    result = 0
    for i in range(n):
        result += i * i
    end_time = time.time()

    process_id = os.getpid()
    return {
        'process_id': process_id,
        'n': n,
        'result': result,
        'duration': end_time - start_time
    }

def demonstrate_process_pool():
    """演示进程池"""
    print("=== 进程池演示 ===")

    # 准备任务数据
    tasks = [1000000, 2000000, 1500000, 3000000, 2500000]

    print(f"CPU核心数: {multiprocessing.cpu_count()}")
    print(f"任务列表: {tasks}")

    # 使用进程池执行任务
    start_time = time.time()

    with Pool(processes=4) as pool:
        print("进程池已创建,开始分配任务...")
        results = pool.map(cpu_intensive_task, tasks)

    end_time = time.time()

    # 输出结果
    print("\n任务执行结果:")
    for result in results:
        print(f"进程 {result['process_id']}: 计算 {result['n']} 项,"
              f"结果 {result['result']}, 耗时 {result['duration']:.4f} 秒")

    print(f"\n进程池总执行时间: {end_time - start_time:.4f} 秒")
    print("=== 演示结束 ===\n")

if __name__ == "__main__":
    # 设置进程启动方法(Windows需要)
    multiprocessing.set_start_method('spawn', force=True)

    demonstrate_basic_multiprocessing()
    demonstrate_process_pool()

15.4 异步编程

asyncio基础

异步编程是处理IO密集型任务的高效方式。Python的asyncio模块提供了完整的异步编程支持。创建asyncio_demo.py
```python

!/usr/bin/env python3

-- coding: utf-8 --

”“”
异步编程示例
演示asyncio模块的基本用法
“”“

import asyncio
import time
import random

async def async_worker(name, duration):
“”“异步工作函数”“”
print(f”协程 {name} 开始执行,预计耗时 {duration} 秒”)
start_time = time.time()

# 异步等待
await asyncio.sleep(duration)

end_time =
Xiaoye