15.1 并发编程基础¶
并发与并行的区别¶
在开始学习具体的并发编程技术之前,我们首先需要理解几个核心概念:
并发(Concurrency):指的是同时处理多个任务的能力,但不一定是真正的同时执行。在单核处理器上,并发是通过快速切换任务来实现的,给人一种同时执行的错觉。
并行(Parallelism):指的是真正的同时执行多个任务,这需要多核处理器的支持。每个核心可以独立执行一个任务。
简单来说:
- 并发是一种处理能力,强调任务的交替执行
- 并行是一种执行方式,强调任务的同时执行
进程、线程和协程¶
Python中有三种主要的并发编程方式:
-
进程(Process):
- 操作系统分配资源的基本单位
- 拥有独立的内存空间
- 进程间通信需要特殊机制
- 创建和切换开销较大
- 能够真正利用多核处理器 -
线程(Thread):
- 进程内的执行单位
- 共享进程的内存空间
- 线程间通信相对简单
- 创建和切换开销较小
- 受Python GIL限制 -
协程(Coroutine):
- 用户态的轻量级线程
- 由程序员控制切换时机
- 没有线程切换开销
- 适合IO密集型任务
- 单线程内的并发
Python的GIL(全局解释器锁)¶
Python的GIL是理解Python并发编程的关键概念:
什么是GIL:
- Global Interpreter Lock(全局解释器锁)
- 确保同一时刻只有一个线程执行Python字节码
- 保护Python对象的内存安全
GIL的影响:
- 多线程无法真正并行执行CPU密集型任务
- IO密集型任务可以通过线程切换获得性能提升
- 多进程可以绕过GIL限制
CPU密集型vs IO密集型任务¶
理解任务类型对选择合适的并发方案很重要:
CPU密集型任务:
- 大量的计算操作
- 很少的IO等待
- 适合使用多进程
- 例:数学计算、图像处理、数据分析
IO密集型任务:
- 大量的IO等待
- 较少的计算操作
- 适合使用多线程或异步编程
- 例:网络请求、文件读写、数据库操作
15.2 多线程编程¶
threading模块基础¶
Python的threading模块提供了线程编程的完整支持。让我们从一个基础示例开始,创建basic_threading.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基础线程示例
演示线程创建和执行的基本用法
"""
import threading
import time
import random
def worker_function(name, duration):
"""工作函数"""
print(f"线程 {name} 开始执行,预计耗时 {duration} 秒")
start_time = time.time()
# 模拟工作
time.sleep(duration)
end_time = time.time()
actual_duration = end_time - start_time
print(f"线程 {name} 执行完成,实际耗时 {actual_duration:.2f} 秒")
def demonstrate_basic_threading():
"""演示基础线程使用"""
print("=== 基础线程演示 ===")
print(f"主线程ID: {threading.current_thread().ident}")
print(f"主线程名称: {threading.current_thread().name}")
# 创建线程
threads = []
for i in range(3):
duration = random.uniform(1, 3)
thread = threading.Thread(
target=worker_function,
args=(f"Worker-{i+1}", duration),
name=f"Thread-{i+1}"
)
threads.append(thread)
# 启动线程
start_time = time.time()
for thread in threads:
thread.start()
print(f"启动线程: {thread.name}")
# 等待所有线程完成
for thread in threads:
thread.join()
print(f"线程 {thread.name} 已结束")
end_time = time.time()
total_time = end_time - start_time
print(f"总执行时间: {total_time:.2f} 秒")
print("=== 演示结束 ===\n")
if __name__ == "__main__":
demonstrate_basic_threading()
执行上面的程序:
python basic_threading.py
输出结果:
=== 基础线程演示 ===
主线程ID: 24960
主线程名称: MainThread
线程 Worker-1 开始执行,预计耗时 1.0294828368882152 秒
启动线程: Thread-1
线程 Worker-2 开始执行,预计耗时 2.186671252160916 秒
启动线程: Thread-2
线程 Worker-3 开始执行,预计耗时 2.188518479545727 秒
启动线程: Thread-3
线程 Worker-1 执行完成,实际耗时 1.03 秒
线程 Thread-1 已结束
线程 Worker-2 执行完成,实际耗时 2.19 秒
线程 Thread-2 已结束
线程 Worker-3 执行完成,实际耗时 2.19 秒
线程 Thread-3 已结束
总执行时间: 2.19 秒
=== 演示结束 ===
从输出可以看到,三个线程并发执行,总时间约等于最长线程的执行时间,而不是所有线程时间的累加。
守护线程¶
守护线程是一种特殊的线程,它会在主程序结束时自动终止。在basic_threading.py中继续添加:
def demonstrate_daemon_thread():
"""演示守护线程"""
print("=== 守护线程演示 ===")
def daemon_worker():
"""守护线程工作函数"""
for i in range(10):
print(f"守护线程工作中... {i+1}/10")
time.sleep(0.5)
def normal_worker():
"""普通线程工作函数"""
for i in range(3):
print(f"普通线程工作中... {i+1}/3")
time.sleep(1)
print("普通线程工作完成")
# 创建守护线程
daemon_thread = threading.Thread(target=daemon_worker, name="DaemonThread")
daemon_thread.daemon = True # 设置为守护线程
# 创建普通线程
normal_thread = threading.Thread(target=normal_worker, name="NormalThread")
# 启动线程
daemon_thread.start()
normal_thread.start()
print("守护线程是否为守护线程:", daemon_thread.daemon)
print("普通线程是否为守护线程:", normal_thread.daemon)
# 只等待普通线程
normal_thread.join()
print("主程序即将结束(守护线程将自动终止)")
print("=== 演示结束 ===\n")
线程同步 - 解决竞态条件¶
在多线程编程中,当多个线程同时访问共享资源时,可能会出现竞态条件(Race Condition)。创建thread_synchronization.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
线程同步示例
演示各种同步原语的使用
"""
import threading
import time
import random
# 全局变量用于演示同步
counter = 0
lock = threading.Lock()
def unsafe_counter_increment():
"""不安全的计数器递增(会出现竞态条件)"""
global counter
for _ in range(100000):
counter += 1
def safe_counter_increment():
"""安全的计数器递增(使用锁)"""
global counter, lock
for _ in range(100000):
with lock:
counter += 1
def demonstrate_race_condition():
"""演示竞态条件"""
global counter
print("=== 竞态条件演示 ===")
# 重置计数器
counter = 0
# 创建多个线程执行不安全的递增
threads = []
for i in range(5):
thread = threading.Thread(target=unsafe_counter_increment)
threads.append(thread)
start_time = time.time()
# 启动所有线程
for thread in threads:
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"预期结果: {5 * 100000}")
print(f"实际结果: {counter}")
print(f"执行时间: {end_time - start_time:.4f} 秒")
print("结果不一致说明存在竞态条件\n")
def demonstrate_thread_lock():
"""演示线程锁"""
global counter
print("=== 线程锁演示 ===")
# 重置计数器
counter = 0
# 创建多个线程执行安全的递增
threads = []
for i in range(5):
thread = threading.Thread(target=safe_counter_increment)
threads.append(thread)
start_time = time.time()
# 启动所有线程
for thread in threads:
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"预期结果: {5 * 100000}")
print(f"实际结果: {counter}")
print(f"执行时间: {end_time - start_time:.4f} 秒")
print("结果一致说明同步成功\n")
线程间通信 - 生产者消费者模式¶
队列(Queue)是线程间通信的重要工具,特别适合实现生产者消费者模式:
class ProducerConsumer:
"""生产者消费者模式演示"""
def __init__(self, max_size=5):
self.queue = queue.Queue(maxsize=max_size)
self.produced_count = 0
self.consumed_count = 0
self.lock = threading.Lock()
def producer(self, producer_id, items_to_produce):
"""生产者函数"""
for i in range(items_to_produce):
item = f"Producer-{producer_id}-Item-{i+1}"
self.queue.put(item)
with self.lock:
self.produced_count += 1
print(f"生产者 {producer_id} 生产了: {item} (队列大小: {self.queue.qsize()})")
time.sleep(random.uniform(0.1, 0.3))
def consumer(self, consumer_id, items_to_consume):
"""消费者函数"""
consumed = 0
while consumed < items_to_consume:
try:
item = self.queue.get(timeout=1)
with self.lock:
self.consumed_count += 1
print(f"消费者 {consumer_id} 消费了: {item} (队列大小: {self.queue.qsize()})")
consumed += 1
time.sleep(random.uniform(0.1, 0.5))
except:
print(f"消费者 {consumer_id} 等待超时")
break
def demonstrate_producer_consumer():
"""演示生产者消费者模式"""
print("=== 生产者消费者模式演示 ===")
pc = ProducerConsumer(max_size=3)
# 创建生产者和消费者线程
threads = []
# 2个生产者
for i in range(2):
producer_thread = threading.Thread(
target=pc.producer,
args=(i+1, 5),
name=f"Producer-{i+1}"
)
threads.append(producer_thread)
# 2个消费者
for i in range(2):
consumer_thread = threading.Thread(
target=pc.consumer,
args=(i+1, 5),
name=f"Consumer-{i+1}"
)
threads.append(consumer_thread)
# 启动所有线程
start_time = time.time()
for thread in threads:
thread.start()
print(f"启动线程: {thread.name}")
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"\n总生产数量: {pc.produced_count}")
print(f"总消费数量: {pc.consumed_count}")
print(f"总执行时间: {end_time - start_time:.2f} 秒")
print("=== 演示结束 ===\n")
15.3 多进程编程¶
multiprocessing模块基础¶
多进程编程能够绕过GIL的限制,真正利用多核处理器的能力。创建multiprocess_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多进程编程示例
演示multiprocessing模块的基本用法
"""
import multiprocessing
import time
import os
import random
from multiprocessing import Queue, Process, Pool
def worker_process(name, duration):
"""工作进程函数"""
process_id = os.getpid()
parent_id = os.getppid()
print(f"进程 {name} (PID: {process_id}, 父PID: {parent_id}) 开始执行")
start_time = time.time()
# 模拟工作
time.sleep(duration)
end_time = time.time()
actual_duration = end_time - start_time
print(f"进程 {name} (PID: {process_id}) 执行完成,耗时 {actual_duration:.2f} 秒")
return f"进程 {name} 的结果"
def demonstrate_basic_multiprocessing():
"""演示基础多进程使用"""
print("=== 基础多进程演示 ===")
print(f"主进程PID: {os.getpid()}")
# 创建进程
processes = []
for i in range(3):
duration = random.uniform(1, 2)
process = Process(
target=worker_process,
args=(f"Worker-{i+1}", duration),
name=f"Process-{i+1}"
)
processes.append(process)
# 启动进程
start_time = time.time()
for process in processes:
process.start()
print(f"启动进程: {process.name} (PID: {process.pid})")
# 等待所有进程完成
for process in processes:
process.join()
print(f"进程 {process.name} 已结束,退出码: {process.exitcode}")
end_time = time.time()
total_time = end_time - start_time
print(f"总执行时间: {total_time:.2f} 秒")
print("=== 演示结束 ===\n")
def cpu_intensive_task(n):
"""CPU密集型任务"""
start_time = time.time()
result = 0
for i in range(n):
result += i * i
end_time = time.time()
process_id = os.getpid()
return {
'process_id': process_id,
'n': n,
'result': result,
'duration': end_time - start_time
}
def demonstrate_process_pool():
"""演示进程池"""
print("=== 进程池演示 ===")
# 准备任务数据
tasks = [1000000, 2000000, 1500000, 3000000, 2500000]
print(f"CPU核心数: {multiprocessing.cpu_count()}")
print(f"任务列表: {tasks}")
# 使用进程池执行任务
start_time = time.time()
with Pool(processes=4) as pool:
print("进程池已创建,开始分配任务...")
results = pool.map(cpu_intensive_task, tasks)
end_time = time.time()
# 输出结果
print("\n任务执行结果:")
for result in results:
print(f"进程 {result['process_id']}: 计算 {result['n']} 项,"
f"结果 {result['result']}, 耗时 {result['duration']:.4f} 秒")
print(f"\n进程池总执行时间: {end_time - start_time:.4f} 秒")
print("=== 演示结束 ===\n")
if __name__ == "__main__":
# 设置进程启动方法(Windows需要)
multiprocessing.set_start_method('spawn', force=True)
demonstrate_basic_multiprocessing()
demonstrate_process_pool()
15.4 异步编程¶
asyncio基础¶
异步编程是处理IO密集型任务的高效方式。Python的asyncio模块提供了完整的异步编程支持。创建asyncio_demo.py:
```python
!/usr/bin/env python3¶
-- coding: utf-8 --¶
”“”
异步编程示例
演示asyncio模块的基本用法
“”“
import asyncio
import time
import random
async def async_worker(name, duration):
“”“异步工作函数”“”
print(f”协程 {name} 开始执行,预计耗时 {duration} 秒”)
start_time = time.time()
# 异步等待
await asyncio.sleep(duration)
end_time =