第15章 併發編程¶
前言¶
在現代計算機系統中,充分利用多核處理器的能力來提高程序性能是一個關鍵技能。Python作爲一門強大的編程語言,提供了多種併發編程的解決方案,包括多線程、多進程和異步編程。本章將詳細介紹這些併發編程技術,通過大量的實際代碼示例和運行結果,幫助您深入理解併發編程的核心概念和實際應用。
無論是處理大量的網絡請求、進行數據密集型計算,還是構建高性能的Web服務,掌握併發編程都是提升程序性能的重要手段。我們將從基礎概念開始,逐步深入到高級技巧和最佳實踐,確保您能夠在實際項目中正確應用這些技術。
15.1 併發編程基礎¶
併發與並行的區別¶
在開始學習具體的併發編程技術之前,我們首先需要理解幾個核心概念:
併發(Concurrency):指的是同時處理多個任務的能力,但不一定是真正的同時執行。在單核處理器上,併發是通過快速切換任務來實現的,給人一種同時執行的錯覺。
並行(Parallelism):指的是真正的同時執行多個任務,這需要多核處理器的支持。每個核心可以獨立執行一個任務。
簡單來說:
- 併發是一種處理能力,強調任務的交替執行
- 並行是一種執行方式,強調任務的同時執行
進程、線程和協程¶
Python中有三種主要的併發編程方式:
-
進程(Process):
- 操作系統分配資源的基本單位
- 擁有獨立的內存空間
- 進程間通信需要特殊機制
- 創建和切換開銷較大
- 能夠真正利用多核處理器 -
線程(Thread):
- 進程內的執行單位
- 共享進程的內存空間
- 線程間通信相對簡單
- 創建和切換開銷較小
- 受Python GIL限制 -
協程(Coroutine):
- 用戶態的輕量級線程
- 由程序員控制切換時機
- 沒有線程切換開銷
- 適合IO密集型任務
- 單線程內的併發
Python的GIL(全局解釋器鎖)¶
Python的GIL是理解Python併發編程的關鍵概念:
什麼是GIL:
- Global Interpreter Lock(全局解釋器鎖)
- 確保同一時刻只有一個線程執行Python字節碼
- 保護Python對象的內存安全
GIL的影響:
- 多線程無法真正並行執行CPU密集型任務
- IO密集型任務可以通過線程切換獲得性能提升
- 多進程可以繞過GIL限制
CPU密集型vs IO密集型任務¶
理解任務類型對選擇合適的併發方案很重要:
CPU密集型任務:
- 大量的計算操作
- 很少的IO等待
- 適合使用多進程
- 例:數學計算、圖像處理、數據分析
IO密集型任務:
- 大量的IO等待
- 較少的計算操作
- 適合使用多線程或異步編程
- 例:網絡請求、文件讀寫、數據庫操作
15.2 多線程編程¶
threading模塊基礎¶
Python的threading模塊提供了線程編程的完整支持。讓我們從一個基礎示例開始,創建basic_threading.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基礎線程示例
演示線程創建和執行的基本用法
"""
import threading
import time
import random
def worker_function(name, duration):
"""工作函數"""
print(f"線程 {name} 開始執行,預計耗時 {duration} 秒")
start_time = time.time()
# 模擬工作
time.sleep(duration)
end_time = time.time()
actual_duration = end_time - start_time
print(f"線程 {name} 執行完成,實際耗時 {actual_duration:.2f} 秒")
def demonstrate_basic_threading():
"""演示基礎線程使用"""
print("=== 基礎線程演示 ===")
print(f"主線程ID: {threading.current_thread().ident}")
print(f"主線程名稱: {threading.current_thread().name}")
# 創建線程
threads = []
for i in range(3):
duration = random.uniform(1, 3)
thread = threading.Thread(
target=worker_function,
args=(f"Worker-{i+1}", duration),
name=f"Thread-{i+1}"
)
threads.append(thread)
# 啓動線程
start_time = time.time()
for thread in threads:
thread.start()
print(f"啓動線程: {thread.name}")
# 等待所有線程完成
for thread in threads:
thread.join()
print(f"線程 {thread.name} 已結束")
end_time = time.time()
total_time = end_time - start_time
print(f"總執行時間: {total_time:.2f} 秒")
print("=== 演示結束 ===\n")
if __name__ == "__main__":
demonstrate_basic_threading()
執行上面的程序:
python basic_threading.py
輸出結果:
=== 基礎線程演示 ===
主線程ID: 24960
主線程名稱: MainThread
線程 Worker-1 開始執行,預計耗時 1.0294828368882152 秒
啓動線程: Thread-1
線程 Worker-2 開始執行,預計耗時 2.186671252160916 秒
啓動線程: Thread-2
線程 Worker-3 開始執行,預計耗時 2.188518479545727 秒
啓動線程: Thread-3
線程 Worker-1 執行完成,實際耗時 1.03 秒
線程 Thread-1 已結束
線程 Worker-2 執行完成,實際耗時 2.19 秒
線程 Thread-2 已結束
線程 Worker-3 執行完成,實際耗時 2.19 秒
線程 Thread-3 已結束
總執行時間: 2.19 秒
=== 演示結束 ===
從輸出可以看到,三個線程併發執行,總時間約等於最長線程的執行時間,而不是所有線程時間的累加。
守護線程¶
守護線程是一種特殊的線程,它會在主程序結束時自動終止。在basic_threading.py中繼續添加:
def demonstrate_daemon_thread():
"""演示守護線程"""
print("=== 守護線程演示 ===")
def daemon_worker():
"""守護線程工作函數"""
for i in range(10):
print(f"守護線程工作中... {i+1}/10")
time.sleep(0.5)
def normal_worker():
"""普通線程工作函數"""
for i in range(3):
print(f"普通線程工作中... {i+1}/3")
time.sleep(1)
print("普通線程工作完成")
# 創建守護線程
daemon_thread = threading.Thread(target=daemon_worker, name="DaemonThread")
daemon_thread.daemon = True # 設置爲守護線程
# 創建普通線程
normal_thread = threading.Thread(target=normal_worker, name="NormalThread")
# 啓動線程
daemon_thread.start()
normal_thread.start()
print("守護線程是否爲守護線程:", daemon_thread.daemon)
print("普通線程是否爲守護線程:", normal_thread.daemon)
# 只等待普通線程
normal_thread.join()
print("主程序即將結束(守護線程將自動終止)")
print("=== 演示結束 ===\n")
線程同步 - 解決競態條件¶
在多線程編程中,當多個線程同時訪問共享資源時,可能會出現競態條件(Race Condition)。創建thread_synchronization.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
線程同步示例
演示各種同步原語的使用
"""
import threading
import time
import random
from queue import Queue
# 全局變量用於演示同步
counter = 0
lock = threading.Lock()
def unsafe_counter_increment():
"""不安全的計數器遞增(會出現競態條件)"""
global counter
for _ in range(100000):
counter += 1
def safe_counter_increment():
"""安全的計數器遞增(使用鎖)"""
global counter, lock
for _ in range(100000):
with lock:
counter += 1
def demonstrate_race_condition():
"""演示競態條件"""
global counter
print("=== 競態條件演示 ===")
# 重置計數器
counter = 0
# 創建多個線程執行不安全的遞增
threads = []
for i in range(5):
thread = threading.Thread(target=unsafe_counter_increment)
threads.append(thread)
start_time = time.time()
# 啓動所有線程
for thread in threads:
thread.start()
# 等待所有線程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"預期結果: {5 * 100000}")
print(f"實際結果: {counter}")
print(f"執行時間: {end_time - start_time:.4f} 秒")
print("結果不一致說明存在競態條件\n")
def demonstrate_thread_lock():
"""演示線程鎖"""
global counter
print("=== 線程鎖演示 ===")
# 重置計數器
counter = 0
# 創建多個線程執行安全的遞增
threads = []
for i in range(5):
thread = threading.Thread(target=safe_counter_increment)
threads.append(thread)
start_time = time.time()
# 啓動所有線程
for thread in threads:
thread.start()
# 等待所有線程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"預期結果: {5 * 100000}")
print(f"實際結果: {counter}")
print(f"執行時間: {end_time - start_time:.4f} 秒")
print("結果一致說明同步成功\n")
執行程序:
python thread_synchronization.py
輸出結果:
=== 競態條件演示 ===
預期結果: 500000
實際結果: 500000
執行時間: 0.0315 秒
結果不一致說明存在競態條件
=== 線程鎖演示 ===
預期結果: 500000
實際結果: 500000
執行時間: 0.1359 秒
結果一致說明同步成功
可以看到,使用鎖雖然保證了數據的一致性,但執行時間明顯增加了,這是同步的代價。
線程間通信 - 生產者消費者模式¶
隊列(Queue)是線程間通信的重要工具,特別適合實現生產者消費者模式。在thread_synchronization.py中繼續添加:
class ProducerConsumer:
"""生產者消費者模式演示"""
def __init__(self, max_size=5):
self.queue = Queue(maxsize=max_size)
self.produced_count = 0
self.consumed_count = 0
self.lock = threading.Lock()
def producer(self, producer_id, items_to_produce):
"""生產者函數"""
for i in range(items_to_produce):
item = f"Producer-{producer_id}-Item-{i+1}"
self.queue.put(item)
with self.lock:
self.produced_count += 1
print(f"生產者 {producer_id} 生產了: {item} (隊列大小: {self.queue.qsize()})")
time.sleep(random.uniform(0.1, 0.3))
def consumer(self, consumer_id, items_to_consume):
"""消費者函數"""
consumed = 0
while consumed < items_to_consume:
try:
item = self.queue.get(timeout=1)
with self.lock:
self.consumed_count += 1
print(f"消費者 {consumer_id} 消費了: {item} (隊列大小: {self.queue.qsize()})")
consumed += 1
time.sleep(random.uniform(0.1, 0.5))
except:
print(f"消費者 {consumer_id} 等待超時")
break
def demonstrate_producer_consumer():
"""演示生產者消費者模式"""
print("=== 生產者消費者模式演示 ===")
pc = ProducerConsumer(max_size=3)
# 創建生產者和消費者線程
threads = []
# 2個生產者
for i in range(2):
producer_thread = threading.Thread(
target=pc.producer,
args=(i+1, 5),
name=f"Producer-{i+1}"
)
threads.append(producer_thread)
# 2個消費者
for i in range(2):
consumer_thread = threading.Thread(
target=pc.consumer,
args=(i+1, 5),
name=f"Consumer-{i+1}"
)
threads.append(consumer_thread)
# 啓動所有線程
start_time = time.time()
for thread in threads:
thread.start()
print(f"啓動線程: {thread.name}")
# 等待所有線程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"\n總生產數量: {pc.produced_count}")
print(f"總消費數量: {pc.consumed_count}")
print(f"總執行時間: {end_time - start_time:.2f} 秒")
print("=== 演示結束 ===\n")
運行輸出:
生產者 1 生產了: Producer-1-Item-1 (隊列大小: 1)
啓動線程: Producer-1
生產者 2 生產了: Producer-2-Item-1 (隊列大小: 2)
啓動線程: Producer-2
消費者 1 消費了: Producer-1-Item-1 (隊列大小: 1)
啓動線程: Consumer-1
消費者 2 消費了: Producer-2-Item-1 (隊列大小: 0)
啓動線程: Consumer-2
...(更多輸出)
總生產數量: 10
總消費數量: 10
總執行時間: 1.94 秒
=== 演示結束 ===
15.3 多進程編程¶
multiprocessing模塊基礎¶
多進程編程能夠繞過GIL的限制,真正利用多核處理器的能力。創建multiprocess_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多進程編程示例
演示multiprocessing模塊的基本用法
"""
import multiprocessing
import time
import os
import random
from multiprocessing import Queue, Process, Pool
def worker_process(name, duration):
"""工作進程函數"""
process_id = os.getpid()
parent_id = os.getppid()
print(f"進程 {name} (PID: {process_id}, 父PID: {parent_id}) 開始執行")
start_time = time.time()
# 模擬工作
time.sleep(duration)
end_time = time.time()
actual_duration = end_time - start_time
print(f"進程 {name} (PID: {process_id}) 執行完成,耗時 {actual_duration:.2f} 秒")
return f"進程 {name} 的結果"
def demonstrate_basic_multiprocessing():
"""演示基礎多進程使用"""
print("=== 基礎多進程演示 ===")
print(f"主進程PID: {os.getpid()}")
# 創建進程
processes = []
for i in range(3):
duration = random.uniform(1, 2)
process = Process(
target=worker_process,
args=(f"Worker-{i+1}", duration),
name=f"Process-{i+1}"
)
processes.append(process)
# 啓動進程
start_time = time.time()
for process in processes:
process.start()
print(f"啓動進程: {process.name} (PID: {process.pid})")
# 等待所有進程完成
for process in processes:
process.join()
print(f"進程 {process.name} 已結束,退出碼: {process.exitcode}")
end_time = time.time()
total_time = end_time - start_time
print(f"總執行時間: {total_time:.2f} 秒")
print("=== 演示結束 ===\n")
進程池¶
進程池提供了更高級的進程管理方式,特別適合CPU密集型任務:
def cpu_intensive_task(n):
"""CPU密集型任務"""
start_time = time.time()
result = 0
for i in range(n):
result += i * i
end_time = time.time()
process_id = os.getpid()
return {
'process_id': process_id,
'n': n,
'result': result,
'duration': end_time - start_time
}
def demonstrate_process_pool():
"""演示進程池"""
print("=== 進程池演示 ===")
# 準備任務數據
tasks = [1000000, 2000000, 1500000, 3000000, 2500000]
print(f"CPU核心數: {multiprocessing.cpu_count()}")
print(f"任務列表: {tasks}")
# 使用進程池執行任務
start_time = time.time()
with Pool(processes=4) as pool:
print("進程池已創建,開始分配任務...")
results = pool.map(cpu_intensive_task, tasks)
end_time = time.time()
# 輸出結果
print("\n任務執行結果:")
for result in results:
print(f"進程 {result['process_id']}: 計算 {result['n']} 項,"
f"結果 {result['result']}, 耗時 {result['duration']:.4f} 秒")
print(f"\n進程池總執行時間: {end_time - start_time:.4f} 秒")
print("=== 演示結束 ===\n")
if __name__ == "__main__":
# 設置進程啓動方法(Windows需要)
multiprocessing.set_start_method('spawn', force=True)
demonstrate_basic_multiprocessing()
demonstrate_process_pool()
15.4 異步編程¶
asyncio基礎¶
異步編程是處理IO密集型任務的高效方式。Python的asyncio模塊提供了完整的異步編程支持。創建asyncio_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
異步編程示例
演示asyncio模塊的基本用法
"""
import asyncio
import time
import random
async def async_worker(name, duration):
"""異步工作函數"""
print(f"協程 {name} 開始執行,預計耗時 {duration} 秒")
start_time = time.time()
# 異步等待
await asyncio.sleep(duration)
end_time = time.time()
actual_duration = end_time - start_time
print(f"協程 {name} 執行完成,實際耗時 {actual_duration:.2f} 秒")
return f"協程 {name} 的結果"
async def demonstrate_basic_asyncio():
"""演示基礎異步編程"""
print("=== 基礎異步編程演示 ===")
# 創建多個協程任務
tasks = []
for i in range(3):
duration = random.uniform(1, 2)
task = async_worker(f"Worker-{i+1}", duration)
tasks.append(task)
# 併發執行所有任務
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
print("\n執行結果:")
for result in results:
print(f" {result}")
total_time = end_time - start_time
print(f"總執行時間: {total_time:.2f} 秒")
print("=== 演示結束 ===\n")
異步HTTP請求¶
異步編程在網絡請求中特別有用。首先需要安裝aiohttp庫:
pip install aiohttp
然後在asyncio_demo.py中繼續添加:
import aiohttp
async def fetch_url(session, url, name):
"""異步獲取URL內容"""
try:
print(f"開始請求 {name}: {url}")
start_time = time.time()
async with session.get(url, timeout=10) as response:
content = await response.text()
end_time = time.time()
duration = end_time - start_time
print(f"完成請求 {name}: 狀態碼 {response.status}, "
f"內容長度 {len(content)}, 耗時 {duration:.2f} 秒")
return {
'name': name,
'url': url,
'status': response.status,
'length': len(content),
'duration': duration
}
except asyncio.TimeoutError:
print(f"請求 {name} 超時")
return {'name': name, 'url': url, 'error': 'timeout'}
except Exception as e:
print(f"請求 {name} 出錯: {e}")
return {'name': name, 'url': url, 'error': str(e)}
async def demonstrate_async_http():
"""演示異步HTTP請求"""
print("=== 異步HTTP請求演示 ===")
urls = [
("百度", "https://www.baidu.com"),
("知乎", "https://www.zhihu.com"),
("GitHub", "https://github.com"),
("Python官網", "https://www.python.org")
]
start_time = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url, name) for name, url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
print("\n請求結果彙總:")
for result in results:
if isinstance(result, dict) and 'error' not in result:
print(f" {result['name']}: {result['status']} - "
f"{result['length']} bytes - {result['duration']:.2f}s")
elif isinstance(result, dict):
print(f" {result['name']}: 錯誤 - {result.get('error', '未知錯誤')}")
else:
print(f" 異常: {result}")
total_time = end_time - start_time
print(f"\n異步HTTP總執行時間: {total_time:.2f} 秒")
print("=== 演示結束 ===\n")
執行程序:
python asyncio_demo.py
輸出結果:
=== 異步HTTP請求演示 ===
開始請求 百度: https://www.baidu.com
開始請求 知乎: https://www.zhihu.com
開始請求 GitHub: https://github.com
開始請求 Python官網: https://www.python.org
完成請求 百度: 狀態碼 200, 內容長度 28918, 耗時 0.31 秒
完成請求 知乎: 狀態碼 403, 內容長度 118, 耗時 0.39 秒
完成請求 GitHub: 狀態碼 200, 內容長度 553650, 耗時 0.51 秒
完成請求 Python官網: 狀態碼 200, 內容長度 50272, 耗時 1.18 秒
請求結果彙總:
百度: 200 - 28918 bytes - 0.31s
知乎: 403 - 118 bytes - 0.39s
GitHub: 200 - 553650 bytes - 0.51s
Python官網: 200 - 50272 bytes - 1.18s
異步HTTP總執行時間: 1.19 秒
可以看到,4個HTTP請求併發執行,總時間約爲最慢請求的時間。
15.5 concurrent.futures模塊¶
concurrent.futures模塊提供了高級的併發編程接口,可以統一處理線程和進程。創建futures_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
concurrent.futures模塊演示
統一的併發編程接口
"""
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
def cpu_bound_task(n):
"""CPU密集型任務"""
start_time = time.time()
result = sum(i * i for i in range(n))
end_time = time.time()
return {
'n': n,
'result': result,
'duration': end_time - start_time
}
def io_bound_task(duration):
"""IO密集型任務"""
time.sleep(duration)
return f"任務完成,耗時 {duration} 秒"
def demonstrate_thread_pool_executor():
"""演示線程池執行器"""
print("=== ThreadPoolExecutor演示 ===")
tasks = [0.5, 1.0, 0.8, 1.2]
start_time = time.time()
# 使用線程池執行器
with ThreadPoolExecutor(max_workers=4) as executor:
# 提交任務
futures = [executor.submit(io_bound_task, duration) for duration in tasks]
# 獲取結果
results = [future.result() for future in futures]
end_time = time.time()
print("任務結果:")
for result in results:
print(f" {result}")
print(f"ThreadPoolExecutor總耗時: {end_time - start_time:.2f} 秒")
print("=== 演示結束 ===\n")
def demonstrate_process_pool_executor():
"""演示進程池執行器"""
print("=== ProcessPoolExecutor演示 ===")
tasks = [1000000, 2000000, 1500000, 3000000]
start_time = time.time()
# 使用進程池執行器
with ProcessPoolExecutor(max_workers=4) as executor:
# 提交任務
futures = [executor.submit(cpu_bound_task, n) for n in tasks]
# 獲取結果
results = [future.result() for future in futures]
end_time = time.time()
print("任務結果:")
for result in results:
print(f" 計算 {result['n']} 項: 結果 {result['result']}, "
f"耗時 {result['duration']:.4f} 秒")
print(f"ProcessPoolExecutor總耗時: {end_time - start_time:.4f} 秒")
print("=== 演示結束 ===\n")
if __name__ == "__main__":
demonstrate_thread_pool_executor()
demonstrate_process_pool_executor()
15.6 同步原語詳解¶
各種鎖的使用¶
創建sync_primitives_demo.py來演示各種同步原語:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
同步原語詳解
演示各種鎖和同步機制的使用
"""
import threading
import time
# 信號量演示
def demonstrate_semaphore():
"""演示信號量"""
print("=== 信號量演示 ===")
# 創建信號量,最多允許2個線程同時訪問
semaphore = threading.Semaphore(2)
def access_resource(thread_id):
print(f"線程 {thread_id} 嘗試獲取資源...")
with semaphore:
print(f"線程 {thread_id} 獲得資源訪問權")
time.sleep(2) # 模擬資源使用
print(f"線程 {thread_id} 釋放資源")
# 創建5個線程嘗試訪問資源
threads = []
for i in range(5):
thread = threading.Thread(target=access_resource, args=(i+1,))
threads.append(thread)
start_time = time.time()
# 啓動所有線程
for thread in threads:
thread.start()
# 等待所有線程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"總執行時間: {end_time - start_time:.2f} 秒")
print("=== 演示結束 ===\n")
# 事件演示
def demonstrate_event():
"""演示事件"""
print("=== 事件演示 ===")
event = threading.Event()
def waiter(name):
print(f"等待者 {name} 開始等待事件")
event.wait()
print(f"等待者 {name} 收到事件,開始工作")
time.sleep(1)
print(f"等待者 {name} 工作完成")
def setter():
print("設置者開始工作")
time.sleep(2)
print("設置者觸發事件")
event.set()
# 創建線程
threads = []
# 等待者線程
for i in range(3):
thread = threading.Thread(target=waiter, args=(f"Waiter-{i+1}",))
threads.append(thread)
# 設置者線程
setter_thread = threading.Thread(target=setter)
threads.append(setter_thread)
# 啓動線程
for thread in threads:
thread.start()
# 等待完成
for thread in threads:
thread.join()
print("=== 演示結束 ===\n")
if __name__ == "__main__":
demonstrate_semaphore()
demonstrate_event()
15.7 併發設計模式¶
工作者模式¶
創建design_patterns_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
併發設計模式演示
工作者模式等
"""
import queue
import threading
import time
import random
class WorkerPool:
"""工作者池"""
def __init__(self, num_workers=4):
self.task_queue = queue.Queue()
self.result_queue = queue.Queue()
self.workers = []
self.num_workers = num_workers
self.shutdown = False
def start(self):
"""啓動工作者池"""
for i in range(self.num_workers):
worker = threading.Thread(target=self._worker, args=(i+1,))
worker.start()
self.workers.append(worker)
print(f"啓動了 {self.num_workers} 個工作者")
def _worker(self, worker_id):
"""工作者函數"""
while not self.shutdown:
try:
task = self.task_queue.get(timeout=1)
if task is None: # 毒丸,用於停止工作者
break
print(f"工作者 {worker_id} 開始處理任務: {task['id']}")
start_time = time.time()
# 模擬任務處理
time.sleep(random.uniform(0.5, 2.0))
end_time = time.time()
result = {
'task_id': task['id'],
'worker_id': worker_id,
'result': task['data'] * 2,
'duration': end_time - start_time
}
self.result_queue.put(result)
print(f"工作者 {worker_id} 完成任務: {task['id']}")
self.task_queue.task_done()
except queue.Empty:
continue
def submit_task(self, task_id, data):
"""提交任務"""
task = {'id': task_id, 'data': data}
self.task_queue.put(task)
def get_result(self, timeout=None):
"""獲取結果"""
try:
return self.result_queue.get(timeout=timeout)
except queue.Empty:
return None
def shutdown_pool(self):
"""關閉工作者池"""
print("正在關閉工作者池...")
self.shutdown = True
# 發送毒丸停止所有工作者
for _ in range(self.num_workers):
self.task_queue.put(None)
# 等待所有工作者結束
for worker in self.workers:
worker.join()
print("工作者池已關閉")
def demonstrate_worker_pool():
"""演示工作者池"""
print("=== 工作者池演示 ===")
# 創建並啓動工作者池
pool = WorkerPool(num_workers=3)
pool.start()
# 提交任務
tasks = [
(f"Task-{i+1}", random.randint(1, 100))
for i in range(8)
]
print(f"提交 {len(tasks)} 個任務")
for task_id, data in tasks:
pool.submit_task(task_id, data)
# 收集結果
results = []
for _ in range(len(tasks)):
result = pool.get_result(timeout=5)
if result:
results.append(result)
print(f"收到結果: 任務 {result['task_id']} "
f"由工作者 {result['worker_id']} 完成,"
f"結果 {result['result']},耗時 {result['duration']:.2f} 秒")
# 關閉池
pool.shutdown_pool()
print(f"處理了 {len(results)} 個任務")
print("=== 演示結束 ===\n")
if __name__ == "__main__":
demonstrate_worker_pool()
15.8 性能優化和調試¶
性能測試和比較¶
創建performance_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
併發性能測試和比較
"""
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def cpu_bound_task_sync(n):
"""CPU密集型任務(同步版本)"""
result = 0
for i in range(n):
result += i * i
return result
def io_bound_task_sync(duration):
"""IO密集型任務(同步版本)"""
time.sleep(duration)
return f"同步任務完成,耗時 {duration} 秒"
async def io_bound_task_async(duration):
"""IO密集型任務(異步版本)"""
await asyncio.sleep(duration)
return f"異步任務完成,耗時 {duration} 秒"
def benchmark_cpu_intensive():
"""CPU密集型任務性能測試"""
print("=== CPU密集型任務性能測試 ===")
task_size = 1000000
num_tasks = 4
# 1. 單線程執行
print("1. 單線程執行:")
start_time = time.time()
results = [cpu_bound_task_sync(task_size) for _ in range(num_tasks)]
single_thread_time = time.time() - start_time
print(f" 執行時間: {single_thread_time:.4f} 秒")
# 2. 多線程執行
print("2. 多線程執行:")
start_time = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound_task_sync, [task_size] * num_tasks))
multi_thread_time = time.time() - start_time
print(f" 執行時間: {multi_thread_time:.4f} 秒")
# 3. 多進程執行
print("3. 多進程執行:")
start_time = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound_task_sync, [task_size] * num_tasks))
multi_process_time = time.time() - start_time
print(f" 執行時間: {multi_process_time:.4f} 秒")
# 性能比較
print("\n性能比較:")
print(f"多線程相對單線程: {single_thread_time / multi_thread_time:.2f}x")
print(f"多進程相對單線程: {single_thread_time / multi_process_time:.2f}x")
print(f"多進程相對多線程: {multi_thread_time / multi_process_time:.2f}x")
print("=== 測試結束 ===\n")
async def benchmark_io_intensive():
"""IO密集型任務性能測試"""
print("=== IO密集型任務性能測試 ===")
task_duration = 0.5
num_tasks = 8
# 1. 同步執行
print("1. 同步執行:")
start_time = time.time()
results = [io_bound_task_sync(task_duration) for _ in range(num_tasks)]
sync_time = time.time() - start_time
print(f" 執行時間: {sync_time:.4f} 秒")
# 2. 多線程執行
print("2. 多線程執行:")
start_time = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
results = list(executor.map(io_bound_task_sync, [task_duration] * num_tasks))
thread_time = time.time() - start_time
print(f" 執行時間: {thread_time:.4f} 秒")
# 3. 異步執行
print("3. 異步執行:")
start_time = time.time()
tasks = [io_bound_task_async(task_duration) for _ in range(num_tasks)]
results = await asyncio.gather(*tasks)
async_time = time.time() - start_time
print(f" 執行時間: {async_time:.4f} 秒")
# 性能比較
print("\n性能比較:")
print(f"多線程相對同步: {sync_time / thread_time:.2f}x")
print(f"異步相對同步: {sync_time / async_time:.2f}x")
print(f"異步相對多線程: {thread_time / async_time:.2f}x")
print("=== 測試結束 ===\n")
async def run_benchmarks():
"""運行性能測試"""
benchmark_cpu_intensive()
await benchmark_io_intensive()
if __name__ == "__main__":
asyncio.run(run_benchmarks())
15.9 實際應用場景¶
數據處理管道¶
創建一個數據處理示例data_processing_demo.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
數據處理管道示例
實際應用場景演示
"""
import csv
import json
import time
import os
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
class DataProcessor:
"""數據處理器"""
def __init__(self, num_workers=4):
self.num_workers = num_workers
def process_chunk(self, data_chunk, processor_id):
"""處理數據塊"""
print(f"處理器 {processor_id} 開始處理 {len(data_chunk)} 條記錄")
start_time = time.time()
processed_data = []
for record in data_chunk:
# 示例數據處理邏輯
processed_record = {
'id': record.get('id', ''),
'name': record.get('name', '').upper(),
'value': float(record.get('value', 0)) * 1.1,
'processed_at': time.time()
}
processed_data.append(processed_record)
end_time = time.time()
print(f"處理器 {processor_id} 完成,耗時 {end_time - start_time:.2f} 秒")
return processed_data
def process_data_threaded(self, data, chunk_size=1000):
"""使用多線程處理數據"""
print("=== 多線程數據處理 ===")
# 將數據分塊
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
print(f"將 {len(data)} 條記錄分成 {len(chunks)} 個塊")
start_time = time.time()
# 使用線程池處理
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
futures = [
executor.submit(self.process_chunk, chunk, i+1)
for i, chunk in enumerate(chunks)
]
# 收集結果
results = []
for future in futures:
chunk_result = future.result()
results.extend(chunk_result)
end_time = time.time()
print(f"多線程處理完成,處理了 {len(results)} 條記錄")
print(f"總耗時: {end_time - start_time:.2f} 秒")
return results
def create_sample_data(num_records=10000):
"""創建示例數據"""
import random
print(f"創建 {num_records} 條示例數據")
data = []
for i in range(num_records):
record = {
'id': i + 1,
'name': f"用戶_{i+1}",
'value': round(random.uniform(10, 1000), 2)
}
data.append(record)
return data
def demonstrate_data_processing():
"""演示數據處理"""
print(f"系統CPU核心數: {cpu_count()}")
# 創建示例數據
sample_data = create_sample_data(num_records=5000)
# 處理數據
processor = DataProcessor(num_workers=4)
processed_data = processor.process_data_threaded(sample_data, chunk_size=1000)
# 顯示結果示例
print("\n處理結果示例(前5條):")
for record in processed_data[:5]:
print(f" ID: {record['id']}, 姓名: {record['name']}, "
f"處理後值: {record['value']:.2f}")
print("=== 演示結束 ===")
if __name__ == "__main__":
demonstrate_data_processing()
總結¶
本章詳細介紹了Python併發編程的各個方面,從基礎概念到高級應用。通過大量的實際代碼示例和運行結果,我們學習了:
核心知識點¶
- 併發編程基礎:理解了併發與並行的區別,以及Python GIL的影響
- 多線程編程:掌握了線程創建、同步、通信等核心技術
- 多進程編程:學習了進程間通信和進程池的使用
- 異步編程:理解了asyncio的工作原理和異步模式
- 高級工具:掌握了concurrent.futures等高級併發工具
- 設計模式:學習了常見的併發設計模式
性能特點對比¶
| 併發方式 | 適用場景 | 優勢 | 劣勢 |
|---|---|---|---|
| 多線程 | IO密集型任務 | 內存共享,切換開銷小 | 受GIL限制,無法真正並行 |
| 多進程 | CPU密集型任務 | 真正並行,不受GIL限制 | 內存開銷大,進程間通信複雜 |
| 異步編程 | 高併發IO | 單線程,無鎖問題 | 不適合CPU密集型任務 |
選擇建議¶
- CPU密集型任務:使用多進程
- IO密集型任務:使用多線程或異步編程
- 高併發網絡請求:優先選擇異步編程
- 簡單並行計算:使用concurrent.futures
最佳實踐¶
- 正確選擇併發模型:根據任務類型選擇合適的併發方式
- 合理使用同步原語:避免過度同步導致性能下降
- 異常處理:併發程序中的異常處理要特別謹慎
- 資源管理:及時釋放線程、進程等資源
- 性能測試:通過實際測試驗證併發效果
通過掌握這些併發編程技術,您可以編寫出高性能、高併發的Python應用程序,充分利用現代計算機的多核處理能力。在實際項目中,要根據具體需求選擇合適的併發策略,並進行充分的測試和優化。