第15章 并发编程¶
前言¶
在现代计算机系统中,充分利用多核处理器的能力来提高程序性能是一个关键技能。Python作为一门强大的编程语言,提供了多种并发编程的解决方案,包括多线程、多进程和异步编程。本章将详细介绍这些并发编程技术,通过大量的实际代码示例和运行结果,帮助您深入理解并发编程的核心概念和实际应用。
无论是处理大量的网络请求、进行数据密集型计算,还是构建高性能的Web服务,掌握并发编程都是提升程序性能的重要手段。我们将从基础概念开始,逐步深入到高级技巧和最佳实践,确保您能够在实际项目中正确应用这些技术。
15.1 并发编程基础¶
并发与并行的区别¶
在开始学习具体的并发编程技术之前,我们首先需要理解几个核心概念:
并发(Concurrency):指的是同时处理多个任务的能力,但不一定是真正的同时执行。在单核处理器上,并发是通过快速切换任务来实现的,给人一种同时执行的错觉。
并行(Parallelism):指的是真正的同时执行多个任务,这需要多核处理器的支持。每个核心可以独立执行一个任务。
简单来说:
- 并发是一种处理能力,强调任务的交替执行
- 并行是一种执行方式,强调任务的同时执行
进程、线程和协程¶
Python中有三种主要的并发编程方式:
-
进程(Process):
- 操作系统分配资源的基本单位
- 拥有独立的内存空间
- 进程间通信需要特殊机制
- 创建和切换开销较大
- 能够真正利用多核处理器 -
线程(Thread):
- 进程内的执行单位
- 共享进程的内存空间
- 线程间通信相对简单
- 创建和切换开销较小
- 受Python GIL限制 -
协程(Coroutine):
- 用户态的轻量级线程
- 由程序员控制切换时机
- 没有线程切换开销
- 适合IO密集型任务
- 单线程内的并发
Python的GIL(全局解释器锁)¶
Python的GIL是理解Python并发编程的关键概念:
什么是GIL:
- Global Interpreter Lock(全局解释器锁)
- 确保同一时刻只有一个线程执行Python字节码
- 保护Python对象的内存安全
GIL的影响:
- 多线程无法真正并行执行CPU密集型任务
- IO密集型任务可以通过线程切换获得性能提升
- 多进程可以绕过GIL限制
CPU密集型vs IO密集型任务¶
理解任务类型对选择合适的并发方案很重要:
CPU密集型任务:
- 大量的计算操作
- 很少的IO等待
- 适合使用多进程
- 例:数学计算、图像处理、数据分析
IO密集型任务:
- 大量的IO等待
- 较少的计算操作
- 适合使用多线程或异步编程
- 例:网络请求、文件读写、数据库操作
15.2 多线程编程¶
threading模块基础¶
Python的threading模块提供了线程编程的完整支持。让我们从一个基础示例开始,创建basic_threading.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基础线程示例
演示线程创建和执行的基本用法
"""
import threading
import time
import random
def worker_function(name, duration):
"""工作函数"""
print(f"线程 {name} 开始执行,预计耗时 {duration} 秒")
start_time = time.time()
# 模拟工作
time.sleep(duration)
end_time = time.time()
actual_duration = end_time - start_time
print(f"线程 {name} 执行完成,实际耗时 {actual_duration:.2f} 秒")
def demonstrate_basic_threading():
"""演示基础线程使用"""
print("=== 基础线程演示 ===")
print(f"主线程ID: {threading.current_thread().ident}")
print(f"主线程名称: {threading.current_thread().name}")
# 创建线程
threads = []
for i in range(3):
duration = random.uniform(1, 3)
thread = threading.Thread(
target=worker_function,
args=(f"Worker-{i+1}", duration),
name=f"Thread-{i+1}"
)
threads.append(thread)
# 启动线程
start_time = time.time()
for thread in threads:
thread.start()
print(f"启动线程: {thread.name}")
# 等待所有线程完成
for thread in threads:
thread.join()
print(f"线程 {thread.name} 已结束")
end_time = time.time()
total_time = end_time - start_time
print(f"总执行时间: {total_time:.2f} 秒")
print("=== 演示结束 ===\n")
if __name__ == "__main__":
demonstrate_basic_threading()
执行上面的程序:
python basic_threading.py
输出结果:
=== 基础线程演示 ===
主线程ID: 24960
主线程名称: MainThread
线程 Worker-1 开始执行,预计耗时 1.0294828368882152 秒
启动线程: Thread-1
线程 Worker-2 开始执行,预计耗时 2.186671252160916 秒
启动线程: Thread-2
线程 Worker-3 开始执行,预计耗时 2.188518479545727 秒
启动线程: Thread-3
线程 Worker-1 执行完成,实际耗时 1.03 秒
线程 Thread-1 已结束
线程 Worker-2 执行完成,实际耗时 2.19 秒
线程 Thread-2 已结束
线程 Worker-3 执行完成,实际耗时 2.19 秒
线程 Thread-3 已结束
总执行时间: 2.19 秒
=== 演示结束 ===
从输出可以看到,三个线程并发执行,总时间约等于最长线程的执行时间,而不是所有线程时间的累加。
守护线程¶
守护线程是一种特殊的线程,它会在主程序结束时自动终止。在basic_threading.py中继续添加:
def demonstrate_daemon_thread():
"""演示守护线程"""
print("=== 守护线程演示 ===")
def daemon_worker():
"""守护线程工作函数"""
for i in range(10):
print(f"守护线程工作中... {i+1}/10")
time.sleep(0.5)
def normal_worker():
"""普通线程工作函数"""
for i in range(3):
print(f"普通线程工作中... {i+1}/3")
time.sleep(1)
print("普通线程工作完成")
# 创建守护线程
daemon_thread = threading.Thread(target=daemon_worker, name="DaemonThread")
daemon_thread.daemon = True # 设置为守护线程
# 创建普通线程
normal_thread = threading.Thread(target=normal_worker, name="NormalThread")
# 启动线程
daemon_thread.start()
normal_thread.start()
print("守护线程是否为守护线程:", daemon_thread.daemon)
print("普通线程是否为守护线程:", normal_thread.daemon)
# 只等待普通线程
normal_thread.join()
print("主程序即将结束(守护线程将自动终止)")
print("=== 演示结束 ===\n")
线程同步 - 解决竞态条件¶
在多线程编程中,当多个线程同时访问共享资源时,可能会出现竞态条件(Race Condition)。创建thread_synchronization.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
线程同步示例
演示各种同步原语的使用
"""
import threading
import time
import random
from queue import Queue
# 全局变量用于演示同步
counter = 0
lock = threading.Lock()
def unsafe_counter_increment():
"""不安全的计数器递增(会出现竞态条件)"""
global counter
for _ in range(100000):
counter += 1
def safe_counter_increment():
"""安全的计数器递增(使用锁)"""
global counter, lock
for _ in range(100000):
with lock:
counter += 1
def demonstrate_race_condition():
"""演示竞态条件"""
global counter
print("=== 竞态条件演示 ===")
# 重置计数器
counter = 0
# 创建多个线程执行不安全的递增
threads = []
for i in range(5):
thread = threading.Thread(target=unsafe_counter_increment)
threads.append(thread)
start_time = time.time()
# 启动所有线程
for thread in threads:
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"预期结果: {5 * 100000}")
print(f"实际结果: {counter}")
print(f"执行时间: {end_time - start_time:.4f} 秒")
print("结果不一致说明存在竞态条件\n")
def demonstrate_thread_lock():
"""演示线程锁"""
global counter
print("=== 线程锁演示 ===")
# 重置计数器
counter = 0
# 创建多个线程执行安全的递增
threads = []
for i in range(5):
thread = threading.Thread(target=safe_counter_increment)
threads.append(thread)
start_time = time.time()
# 启动所有线程
for thread in threads:
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"预期结果: {5 * 100000}")
print(f"实际结果: {counter}")
print(f"执行时间: {end_time - start_time:.4f} 秒")
print("结果一致说明同步成功\n")
执行程序:
python thread_synchronization.py
输出结果:
=== 竞态条件演示 ===
预期结果: 500000
实际结果: 500000
执行时间: 0.0315 秒
结果不一致说明存在竞态条件
=== 线程锁演示 ===
预期结果: 500000
实际结果: 500000
执行时间: 0.1359 秒
结果一致说明同步成功
可以看到,使用锁虽然保证了数据的一致性,但执行时间明显增加了,这是同步的代价。
线程间通信 - 生产者消费者模式¶
队列(Queue)是线程间通信的重要工具,特别适合实现生产者消费者模式。在thread_synchronization.py中继续添加:
class ProducerConsumer:
"""生产者消费者模式演示"""
def __init__(self, max_size=5):
self.queue = Queue(maxsize=max_size)
self.produced_count = 0
self.consumed_count = 0
self.lock = threading.Lock()
def producer(self, producer_id, items_to_produce):
"""生产者函数"""
for i in range(items_to_produce):
item = f"Producer-{producer_id}-Item-{i+1}"
self.queue.put(item)
with self.lock:
self.produced_count += 1
print(f"生产者 {producer_id} 生产了: {item} (队列大小: {self.queue.qsize()})")
time.sleep(random.uniform(0.1, 0.3))
def consumer(self, consumer_id, items_to_consume):
"""消费者函数"""
consumed = 0
while consumed < items_to_consume:
try:
item = self.queue.get(timeout=1)
with self.lock:
self.consumed_count += 1
print(f"消费者 {consumer_id} 消费了: {item} (队列大小: {self.queue.qsize()})")
consumed += 1
time.sleep(random.uniform(0.1, 0.5))
except:
print(f"消费者 {consumer_id} 等待超时")
break
def demonstrate_producer_consumer():
"""演示生产者消费者模式"""
print("=== 生产者消费者模式演示 ===")
pc = ProducerConsumer(max_size=3)
# 创建生产者和消费者线程
threads = []
# 2个生产者
for i in range(2):
producer_thread = threading.Thread(
target=pc.producer,
args=(i+1, 5),
name=f"Producer-{i+1}"
)
threads.append(producer_thread)
# 2个消费者
for i in range(2):
consumer_thread = threading.Thread(
target=pc.consumer,
args=(i+1, 5),
name=f"Consumer-{i+1}"
)
threads.append(consumer_thread)
# 启动所有线程
start_time = time.time()
for thread in threads:
thread.start()
print(f"启动线程: {thread.name}")
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"\n总生产数量: {pc.produced_count}")
print(f"总消费数量: {pc.consumed_count}")
print(f"总执行时间: {end_time - start_time:.2f} 秒")
print("=== 演示结束 ===\n")
运行输出:
生产者 1 生产了: Producer-1-Item-1 (队列大小: 1)
启动线程: Producer-1
生产者 2 生产了: Producer-2-Item-1 (队列大小: 2)
启动线程: Producer-2
消费者 1 消费了: Producer-1-Item-1 (队列大小: 1)
启动线程: Consumer-1
消费者 2 消费了: Producer-2-Item-1 (队列大小: 0)
启动线程: Consumer-2
...(更多输出)
总生产数量: 10
总消费数量: 10
总执行时间: 1.94 秒
=== 演示结束 ===
15.3 多进程编程¶
multiprocessing模块基础¶
多进程编程能够绕过GIL的限制,真正利用多核处理器的能力。创建multiprocess_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多进程编程示例
演示multiprocessing模块的基本用法
"""
import multiprocessing
import time
import os
import random
from multiprocessing import Queue, Process, Pool
def worker_process(name, duration):
"""工作进程函数"""
process_id = os.getpid()
parent_id = os.getppid()
print(f"进程 {name} (PID: {process_id}, 父PID: {parent_id}) 开始执行")
start_time = time.time()
# 模拟工作
time.sleep(duration)
end_time = time.time()
actual_duration = end_time - start_time
print(f"进程 {name} (PID: {process_id}) 执行完成,耗时 {actual_duration:.2f} 秒")
return f"进程 {name} 的结果"
def demonstrate_basic_multiprocessing():
"""演示基础多进程使用"""
print("=== 基础多进程演示 ===")
print(f"主进程PID: {os.getpid()}")
# 创建进程
processes = []
for i in range(3):
duration = random.uniform(1, 2)
process = Process(
target=worker_process,
args=(f"Worker-{i+1}", duration),
name=f"Process-{i+1}"
)
processes.append(process)
# 启动进程
start_time = time.time()
for process in processes:
process.start()
print(f"启动进程: {process.name} (PID: {process.pid})")
# 等待所有进程完成
for process in processes:
process.join()
print(f"进程 {process.name} 已结束,退出码: {process.exitcode}")
end_time = time.time()
total_time = end_time - start_time
print(f"总执行时间: {total_time:.2f} 秒")
print("=== 演示结束 ===\n")
进程池¶
进程池提供了更高级的进程管理方式,特别适合CPU密集型任务:
def cpu_intensive_task(n):
"""CPU密集型任务"""
start_time = time.time()
result = 0
for i in range(n):
result += i * i
end_time = time.time()
process_id = os.getpid()
return {
'process_id': process_id,
'n': n,
'result': result,
'duration': end_time - start_time
}
def demonstrate_process_pool():
"""演示进程池"""
print("=== 进程池演示 ===")
# 准备任务数据
tasks = [1000000, 2000000, 1500000, 3000000, 2500000]
print(f"CPU核心数: {multiprocessing.cpu_count()}")
print(f"任务列表: {tasks}")
# 使用进程池执行任务
start_time = time.time()
with Pool(processes=4) as pool:
print("进程池已创建,开始分配任务...")
results = pool.map(cpu_intensive_task, tasks)
end_time = time.time()
# 输出结果
print("\n任务执行结果:")
for result in results:
print(f"进程 {result['process_id']}: 计算 {result['n']} 项,"
f"结果 {result['result']}, 耗时 {result['duration']:.4f} 秒")
print(f"\n进程池总执行时间: {end_time - start_time:.4f} 秒")
print("=== 演示结束 ===\n")
if __name__ == "__main__":
# 设置进程启动方法(Windows需要)
multiprocessing.set_start_method('spawn', force=True)
demonstrate_basic_multiprocessing()
demonstrate_process_pool()
15.4 异步编程¶
asyncio基础¶
异步编程是处理IO密集型任务的高效方式。Python的asyncio模块提供了完整的异步编程支持。创建asyncio_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
异步编程示例
演示asyncio模块的基本用法
"""
import asyncio
import time
import random
async def async_worker(name, duration):
"""异步工作函数"""
print(f"协程 {name} 开始执行,预计耗时 {duration} 秒")
start_time = time.time()
# 异步等待
await asyncio.sleep(duration)
end_time = time.time()
actual_duration = end_time - start_time
print(f"协程 {name} 执行完成,实际耗时 {actual_duration:.2f} 秒")
return f"协程 {name} 的结果"
async def demonstrate_basic_asyncio():
"""演示基础异步编程"""
print("=== 基础异步编程演示 ===")
# 创建多个协程任务
tasks = []
for i in range(3):
duration = random.uniform(1, 2)
task = async_worker(f"Worker-{i+1}", duration)
tasks.append(task)
# 并发执行所有任务
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
print("\n执行结果:")
for result in results:
print(f" {result}")
total_time = end_time - start_time
print(f"总执行时间: {total_time:.2f} 秒")
print("=== 演示结束 ===\n")
异步HTTP请求¶
异步编程在网络请求中特别有用。首先需要安装aiohttp库:
pip install aiohttp
然后在asyncio_demo.py中继续添加:
import aiohttp
async def fetch_url(session, url, name):
"""异步获取URL内容"""
try:
print(f"开始请求 {name}: {url}")
start_time = time.time()
async with session.get(url, timeout=10) as response:
content = await response.text()
end_time = time.time()
duration = end_time - start_time
print(f"完成请求 {name}: 状态码 {response.status}, "
f"内容长度 {len(content)}, 耗时 {duration:.2f} 秒")
return {
'name': name,
'url': url,
'status': response.status,
'length': len(content),
'duration': duration
}
except asyncio.TimeoutError:
print(f"请求 {name} 超时")
return {'name': name, 'url': url, 'error': 'timeout'}
except Exception as e:
print(f"请求 {name} 出错: {e}")
return {'name': name, 'url': url, 'error': str(e)}
async def demonstrate_async_http():
"""演示异步HTTP请求"""
print("=== 异步HTTP请求演示 ===")
urls = [
("百度", "https://www.baidu.com"),
("知乎", "https://www.zhihu.com"),
("GitHub", "https://github.com"),
("Python官网", "https://www.python.org")
]
start_time = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url, name) for name, url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
print("\n请求结果汇总:")
for result in results:
if isinstance(result, dict) and 'error' not in result:
print(f" {result['name']}: {result['status']} - "
f"{result['length']} bytes - {result['duration']:.2f}s")
elif isinstance(result, dict):
print(f" {result['name']}: 错误 - {result.get('error', '未知错误')}")
else:
print(f" 异常: {result}")
total_time = end_time - start_time
print(f"\n异步HTTP总执行时间: {total_time:.2f} 秒")
print("=== 演示结束 ===\n")
执行程序:
python asyncio_demo.py
输出结果:
=== 异步HTTP请求演示 ===
开始请求 百度: https://www.baidu.com
开始请求 知乎: https://www.zhihu.com
开始请求 GitHub: https://github.com
开始请求 Python官网: https://www.python.org
完成请求 百度: 状态码 200, 内容长度 28918, 耗时 0.31 秒
完成请求 知乎: 状态码 403, 内容长度 118, 耗时 0.39 秒
完成请求 GitHub: 状态码 200, 内容长度 553650, 耗时 0.51 秒
完成请求 Python官网: 状态码 200, 内容长度 50272, 耗时 1.18 秒
请求结果汇总:
百度: 200 - 28918 bytes - 0.31s
知乎: 403 - 118 bytes - 0.39s
GitHub: 200 - 553650 bytes - 0.51s
Python官网: 200 - 50272 bytes - 1.18s
异步HTTP总执行时间: 1.19 秒
可以看到,4个HTTP请求并发执行,总时间约为最慢请求的时间。
15.5 concurrent.futures模块¶
concurrent.futures模块提供了高级的并发编程接口,可以统一处理线程和进程。创建futures_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
concurrent.futures模块演示
统一的并发编程接口
"""
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
def cpu_bound_task(n):
"""CPU密集型任务"""
start_time = time.time()
result = sum(i * i for i in range(n))
end_time = time.time()
return {
'n': n,
'result': result,
'duration': end_time - start_time
}
def io_bound_task(duration):
"""IO密集型任务"""
time.sleep(duration)
return f"任务完成,耗时 {duration} 秒"
def demonstrate_thread_pool_executor():
"""演示线程池执行器"""
print("=== ThreadPoolExecutor演示 ===")
tasks = [0.5, 1.0, 0.8, 1.2]
start_time = time.time()
# 使用线程池执行器
with ThreadPoolExecutor(max_workers=4) as executor:
# 提交任务
futures = [executor.submit(io_bound_task, duration) for duration in tasks]
# 获取结果
results = [future.result() for future in futures]
end_time = time.time()
print("任务结果:")
for result in results:
print(f" {result}")
print(f"ThreadPoolExecutor总耗时: {end_time - start_time:.2f} 秒")
print("=== 演示结束 ===\n")
def demonstrate_process_pool_executor():
"""演示进程池执行器"""
print("=== ProcessPoolExecutor演示 ===")
tasks = [1000000, 2000000, 1500000, 3000000]
start_time = time.time()
# 使用进程池执行器
with ProcessPoolExecutor(max_workers=4) as executor:
# 提交任务
futures = [executor.submit(cpu_bound_task, n) for n in tasks]
# 获取结果
results = [future.result() for future in futures]
end_time = time.time()
print("任务结果:")
for result in results:
print(f" 计算 {result['n']} 项: 结果 {result['result']}, "
f"耗时 {result['duration']:.4f} 秒")
print(f"ProcessPoolExecutor总耗时: {end_time - start_time:.4f} 秒")
print("=== 演示结束 ===\n")
if __name__ == "__main__":
demonstrate_thread_pool_executor()
demonstrate_process_pool_executor()
15.6 同步原语详解¶
各种锁的使用¶
创建sync_primitives_demo.py来演示各种同步原语:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
同步原语详解
演示各种锁和同步机制的使用
"""
import threading
import time
# 信号量演示
def demonstrate_semaphore():
"""演示信号量"""
print("=== 信号量演示 ===")
# 创建信号量,最多允许2个线程同时访问
semaphore = threading.Semaphore(2)
def access_resource(thread_id):
print(f"线程 {thread_id} 尝试获取资源...")
with semaphore:
print(f"线程 {thread_id} 获得资源访问权")
time.sleep(2) # 模拟资源使用
print(f"线程 {thread_id} 释放资源")
# 创建5个线程尝试访问资源
threads = []
for i in range(5):
thread = threading.Thread(target=access_resource, args=(i+1,))
threads.append(thread)
start_time = time.time()
# 启动所有线程
for thread in threads:
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"总执行时间: {end_time - start_time:.2f} 秒")
print("=== 演示结束 ===\n")
# 事件演示
def demonstrate_event():
"""演示事件"""
print("=== 事件演示 ===")
event = threading.Event()
def waiter(name):
print(f"等待者 {name} 开始等待事件")
event.wait()
print(f"等待者 {name} 收到事件,开始工作")
time.sleep(1)
print(f"等待者 {name} 工作完成")
def setter():
print("设置者开始工作")
time.sleep(2)
print("设置者触发事件")
event.set()
# 创建线程
threads = []
# 等待者线程
for i in range(3):
thread = threading.Thread(target=waiter, args=(f"Waiter-{i+1}",))
threads.append(thread)
# 设置者线程
setter_thread = threading.Thread(target=setter)
threads.append(setter_thread)
# 启动线程
for thread in threads:
thread.start()
# 等待完成
for thread in threads:
thread.join()
print("=== 演示结束 ===\n")
if __name__ == "__main__":
demonstrate_semaphore()
demonstrate_event()
15.7 并发设计模式¶
工作者模式¶
创建design_patterns_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
并发设计模式演示
工作者模式等
"""
import queue
import threading
import time
import random
class WorkerPool:
"""工作者池"""
def __init__(self, num_workers=4):
self.task_queue = queue.Queue()
self.result_queue = queue.Queue()
self.workers = []
self.num_workers = num_workers
self.shutdown = False
def start(self):
"""启动工作者池"""
for i in range(self.num_workers):
worker = threading.Thread(target=self._worker, args=(i+1,))
worker.start()
self.workers.append(worker)
print(f"启动了 {self.num_workers} 个工作者")
def _worker(self, worker_id):
"""工作者函数"""
while not self.shutdown:
try:
task = self.task_queue.get(timeout=1)
if task is None: # 毒丸,用于停止工作者
break
print(f"工作者 {worker_id} 开始处理任务: {task['id']}")
start_time = time.time()
# 模拟任务处理
time.sleep(random.uniform(0.5, 2.0))
end_time = time.time()
result = {
'task_id': task['id'],
'worker_id': worker_id,
'result': task['data'] * 2,
'duration': end_time - start_time
}
self.result_queue.put(result)
print(f"工作者 {worker_id} 完成任务: {task['id']}")
self.task_queue.task_done()
except queue.Empty:
continue
def submit_task(self, task_id, data):
"""提交任务"""
task = {'id': task_id, 'data': data}
self.task_queue.put(task)
def get_result(self, timeout=None):
"""获取结果"""
try:
return self.result_queue.get(timeout=timeout)
except queue.Empty:
return None
def shutdown_pool(self):
"""关闭工作者池"""
print("正在关闭工作者池...")
self.shutdown = True
# 发送毒丸停止所有工作者
for _ in range(self.num_workers):
self.task_queue.put(None)
# 等待所有工作者结束
for worker in self.workers:
worker.join()
print("工作者池已关闭")
def demonstrate_worker_pool():
"""演示工作者池"""
print("=== 工作者池演示 ===")
# 创建并启动工作者池
pool = WorkerPool(num_workers=3)
pool.start()
# 提交任务
tasks = [
(f"Task-{i+1}", random.randint(1, 100))
for i in range(8)
]
print(f"提交 {len(tasks)} 个任务")
for task_id, data in tasks:
pool.submit_task(task_id, data)
# 收集结果
results = []
for _ in range(len(tasks)):
result = pool.get_result(timeout=5)
if result:
results.append(result)
print(f"收到结果: 任务 {result['task_id']} "
f"由工作者 {result['worker_id']} 完成,"
f"结果 {result['result']},耗时 {result['duration']:.2f} 秒")
# 关闭池
pool.shutdown_pool()
print(f"处理了 {len(results)} 个任务")
print("=== 演示结束 ===\n")
if __name__ == "__main__":
demonstrate_worker_pool()
15.8 性能优化和调试¶
性能测试和比较¶
创建performance_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
并发性能测试和比较
"""
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def cpu_bound_task_sync(n):
"""CPU密集型任务(同步版本)"""
result = 0
for i in range(n):
result += i * i
return result
def io_bound_task_sync(duration):
"""IO密集型任务(同步版本)"""
time.sleep(duration)
return f"同步任务完成,耗时 {duration} 秒"
async def io_bound_task_async(duration):
"""IO密集型任务(异步版本)"""
await asyncio.sleep(duration)
return f"异步任务完成,耗时 {duration} 秒"
def benchmark_cpu_intensive():
"""CPU密集型任务性能测试"""
print("=== CPU密集型任务性能测试 ===")
task_size = 1000000
num_tasks = 4
# 1. 单线程执行
print("1. 单线程执行:")
start_time = time.time()
results = [cpu_bound_task_sync(task_size) for _ in range(num_tasks)]
single_thread_time = time.time() - start_time
print(f" 执行时间: {single_thread_time:.4f} 秒")
# 2. 多线程执行
print("2. 多线程执行:")
start_time = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound_task_sync, [task_size] * num_tasks))
multi_thread_time = time.time() - start_time
print(f" 执行时间: {multi_thread_time:.4f} 秒")
# 3. 多进程执行
print("3. 多进程执行:")
start_time = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound_task_sync, [task_size] * num_tasks))
multi_process_time = time.time() - start_time
print(f" 执行时间: {multi_process_time:.4f} 秒")
# 性能比较
print("\n性能比较:")
print(f"多线程相对单线程: {single_thread_time / multi_thread_time:.2f}x")
print(f"多进程相对单线程: {single_thread_time / multi_process_time:.2f}x")
print(f"多进程相对多线程: {multi_thread_time / multi_process_time:.2f}x")
print("=== 测试结束 ===\n")
async def benchmark_io_intensive():
"""IO密集型任务性能测试"""
print("=== IO密集型任务性能测试 ===")
task_duration = 0.5
num_tasks = 8
# 1. 同步执行
print("1. 同步执行:")
start_time = time.time()
results = [io_bound_task_sync(task_duration) for _ in range(num_tasks)]
sync_time = time.time() - start_time
print(f" 执行时间: {sync_time:.4f} 秒")
# 2. 多线程执行
print("2. 多线程执行:")
start_time = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
results = list(executor.map(io_bound_task_sync, [task_duration] * num_tasks))
thread_time = time.time() - start_time
print(f" 执行时间: {thread_time:.4f} 秒")
# 3. 异步执行
print("3. 异步执行:")
start_time = time.time()
tasks = [io_bound_task_async(task_duration) for _ in range(num_tasks)]
results = await asyncio.gather(*tasks)
async_time = time.time() - start_time
print(f" 执行时间: {async_time:.4f} 秒")
# 性能比较
print("\n性能比较:")
print(f"多线程相对同步: {sync_time / thread_time:.2f}x")
print(f"异步相对同步: {sync_time / async_time:.2f}x")
print(f"异步相对多线程: {thread_time / async_time:.2f}x")
print("=== 测试结束 ===\n")
async def run_benchmarks():
"""运行性能测试"""
benchmark_cpu_intensive()
await benchmark_io_intensive()
if __name__ == "__main__":
asyncio.run(run_benchmarks())
15.9 实际应用场景¶
数据处理管道¶
创建一个数据处理示例data_processing_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据处理管道示例
实际应用场景演示
"""
import csv
import json
import time
import os
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
class DataProcessor:
"""数据处理器"""
def __init__(self, num_workers=4):
self.num_workers = num_workers
def process_chunk(self, data_chunk, processor_id):
"""处理数据块"""
print(f"处理器 {processor_id} 开始处理 {len(data_chunk)} 条记录")
start_time = time.time()
processed_data = []
for record in data_chunk:
# 示例数据处理逻辑
processed_record = {
'id': record.get('id', ''),
'name': record.get('name', '').upper(),
'value': float(record.get('value', 0)) * 1.1,
'processed_at': time.time()
}
processed_data.append(processed_record)
end_time = time.time()
print(f"处理器 {processor_id} 完成,耗时 {end_time - start_time:.2f} 秒")
return processed_data
def process_data_threaded(self, data, chunk_size=1000):
"""使用多线程处理数据"""
print("=== 多线程数据处理 ===")
# 将数据分块
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
print(f"将 {len(data)} 条记录分成 {len(chunks)} 个块")
start_time = time.time()
# 使用线程池处理
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
futures = [
executor.submit(self.process_chunk, chunk, i+1)
for i, chunk in enumerate(chunks)
]
# 收集结果
results = []
for future in futures:
chunk_result = future.result()
results.extend(chunk_result)
end_time = time.time()
print(f"多线程处理完成,处理了 {len(results)} 条记录")
print(f"总耗时: {end_time - start_time:.2f} 秒")
return results
def create_sample_data(num_records=10000):
"""创建示例数据"""
import random
print(f"创建 {num_records} 条示例数据")
data = []
for i in range(num_records):
record = {
'id': i + 1,
'name': f"用户_{i+1}",
'value': round(random.uniform(10, 1000), 2)
}
data.append(record)
return data
def demonstrate_data_processing():
"""演示数据处理"""
print(f"系统CPU核心数: {cpu_count()}")
# 创建示例数据
sample_data = create_sample_data(num_records=5000)
# 处理数据
processor = DataProcessor(num_workers=4)
processed_data = processor.process_data_threaded(sample_data, chunk_size=1000)
# 显示结果示例
print("\n处理结果示例(前5条):")
for record in processed_data[:5]:
print(f" ID: {record['id']}, 姓名: {record['name']}, "
f"处理后值: {record['value']:.2f}")
print("=== 演示结束 ===")
if __name__ == "__main__":
demonstrate_data_processing()
总结¶
本章详细介绍了Python并发编程的各个方面,从基础概念到高级应用。通过大量的实际代码示例和运行结果,我们学习了:
核心知识点¶
- 并发编程基础:理解了并发与并行的区别,以及Python GIL的影响
- 多线程编程:掌握了线程创建、同步、通信等核心技术
- 多进程编程:学习了进程间通信和进程池的使用
- 异步编程:理解了asyncio的工作原理和异步模式
- 高级工具:掌握了concurrent.futures等高级并发工具
- 设计模式:学习了常见的并发设计模式
性能特点对比¶
| 并发方式 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| 多线程 | IO密集型任务 | 内存共享,切换开销小 | 受GIL限制,无法真正并行 |
| 多进程 | CPU密集型任务 | 真正并行,不受GIL限制 | 内存开销大,进程间通信复杂 |
| 异步编程 | 高并发IO | 单线程,无锁问题 | 不适合CPU密集型任务 |
选择建议¶
- CPU密集型任务:使用多进程
- IO密集型任务:使用多线程或异步编程
- 高并发网络请求:优先选择异步编程
- 简单并行计算:使用concurrent.futures
最佳实践¶
- 正确选择并发模型:根据任务类型选择合适的并发方式
- 合理使用同步原语:避免过度同步导致性能下降
- 异常处理:并发程序中的异常处理要特别谨慎
- 资源管理:及时释放线程、进程等资源
- 性能测试:通过实际测试验证并发效果
通过掌握这些并发编程技术,您可以编写出高性能、高并发的Python应用程序,充分利用现代计算机的多核处理能力。在实际项目中,要根据具体需求选择合适的并发策略,并进行充分的测试和优化。