第15章 併發編程

前言

在現代計算機系統中,充分利用多核處理器的能力來提高程序性能是一個關鍵技能。Python作爲一門強大的編程語言,提供了多種併發編程的解決方案,包括多線程、多進程和異步編程。本章將詳細介紹這些併發編程技術,通過大量的實際代碼示例和運行結果,幫助您深入理解併發編程的核心概念和實際應用。

無論是處理大量的網絡請求、進行數據密集型計算,還是構建高性能的Web服務,掌握併發編程都是提升程序性能的重要手段。我們將從基礎概念開始,逐步深入到高級技巧和最佳實踐,確保您能夠在實際項目中正確應用這些技術。

15.1 併發編程基礎

併發與並行的區別

在開始學習具體的併發編程技術之前,我們首先需要理解幾個核心概念:

併發(Concurrency):指的是同時處理多個任務的能力,但不一定是真正的同時執行。在單核處理器上,併發是通過快速切換任務來實現的,給人一種同時執行的錯覺。

並行(Parallelism):指的是真正的同時執行多個任務,這需要多核處理器的支持。每個核心可以獨立執行一個任務。

簡單來說:
- 併發是一種處理能力,強調任務的交替執行
- 並行是一種執行方式,強調任務的同時執行

進程、線程和協程

Python中有三種主要的併發編程方式:

  1. 進程(Process)
    - 操作系統分配資源的基本單位
    - 擁有獨立的內存空間
    - 進程間通信需要特殊機制
    - 創建和切換開銷較大
    - 能夠真正利用多核處理器

  2. 線程(Thread)
    - 進程內的執行單位
    - 共享進程的內存空間
    - 線程間通信相對簡單
    - 創建和切換開銷較小
    - 受Python GIL限制

  3. 協程(Coroutine)
    - 用戶態的輕量級線程
    - 由程序員控制切換時機
    - 沒有線程切換開銷
    - 適合IO密集型任務
    - 單線程內的併發

Python的GIL(全局解釋器鎖)

Python的GIL是理解Python併發編程的關鍵概念:

什麼是GIL
- Global Interpreter Lock(全局解釋器鎖)
- 確保同一時刻只有一個線程執行Python字節碼
- 保護Python對象的內存安全

GIL的影響
- 多線程無法真正並行執行CPU密集型任務
- IO密集型任務可以通過線程切換獲得性能提升
- 多進程可以繞過GIL限制

CPU密集型vs IO密集型任務

理解任務類型對選擇合適的併發方案很重要:

CPU密集型任務
- 大量的計算操作
- 很少的IO等待
- 適合使用多進程
- 例:數學計算、圖像處理、數據分析

IO密集型任務
- 大量的IO等待
- 較少的計算操作
- 適合使用多線程或異步編程
- 例:網絡請求、文件讀寫、數據庫操作

15.2 多線程編程

threading模塊基礎

Python的threading模塊提供了線程編程的完整支持。讓我們從一個基礎示例開始,創建basic_threading.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基礎線程示例
演示線程創建和執行的基本用法
"""

import threading
import time
import random

def worker_function(name, duration):
    """工作函數"""
    print(f"線程 {name} 開始執行,預計耗時 {duration} 秒")
    start_time = time.time()

    # 模擬工作
    time.sleep(duration)

    end_time = time.time()
    actual_duration = end_time - start_time
    print(f"線程 {name} 執行完成,實際耗時 {actual_duration:.2f} 秒")

def demonstrate_basic_threading():
    """演示基礎線程使用"""
    print("=== 基礎線程演示 ===")
    print(f"主線程ID: {threading.current_thread().ident}")
    print(f"主線程名稱: {threading.current_thread().name}")

    # 創建線程
    threads = []
    for i in range(3):
        duration = random.uniform(1, 3)
        thread = threading.Thread(
            target=worker_function,
            args=(f"Worker-{i+1}", duration),
            name=f"Thread-{i+1}"
        )
        threads.append(thread)

    # 啓動線程
    start_time = time.time()
    for thread in threads:
        thread.start()
        print(f"啓動線程: {thread.name}")

    # 等待所有線程完成
    for thread in threads:
        thread.join()
        print(f"線程 {thread.name} 已結束")

    end_time = time.time()
    total_time = end_time - start_time
    print(f"總執行時間: {total_time:.2f} 秒")
    print("=== 演示結束 ===\n")

if __name__ == "__main__":
    demonstrate_basic_threading()

執行上面的程序:

python basic_threading.py

輸出結果:

=== 基礎線程演示 ===
主線程ID: 24960
主線程名稱: MainThread
線程 Worker-1 開始執行,預計耗時 1.0294828368882152 秒
啓動線程: Thread-1
線程 Worker-2 開始執行,預計耗時 2.186671252160916 秒
啓動線程: Thread-2
線程 Worker-3 開始執行,預計耗時 2.188518479545727 秒
啓動線程: Thread-3
線程 Worker-1 執行完成,實際耗時 1.03 秒
線程 Thread-1 已結束
線程 Worker-2 執行完成,實際耗時 2.19 秒
線程 Thread-2 已結束
線程 Worker-3 執行完成,實際耗時 2.19 秒
線程 Thread-3 已結束
總執行時間: 2.19 秒
=== 演示結束 ===

從輸出可以看到,三個線程併發執行,總時間約等於最長線程的執行時間,而不是所有線程時間的累加。

守護線程

守護線程是一種特殊的線程,它會在主程序結束時自動終止。在basic_threading.py中繼續添加:

def demonstrate_daemon_thread():
    """演示守護線程"""
    print("=== 守護線程演示 ===")

    def daemon_worker():
        """守護線程工作函數"""
        for i in range(10):
            print(f"守護線程工作中... {i+1}/10")
            time.sleep(0.5)

    def normal_worker():
        """普通線程工作函數"""
        for i in range(3):
            print(f"普通線程工作中... {i+1}/3")
            time.sleep(1)
        print("普通線程工作完成")

    # 創建守護線程
    daemon_thread = threading.Thread(target=daemon_worker, name="DaemonThread")
    daemon_thread.daemon = True  # 設置爲守護線程

    # 創建普通線程
    normal_thread = threading.Thread(target=normal_worker, name="NormalThread")

    # 啓動線程
    daemon_thread.start()
    normal_thread.start()

    print("守護線程是否爲守護線程:", daemon_thread.daemon)
    print("普通線程是否爲守護線程:", normal_thread.daemon)

    # 只等待普通線程
    normal_thread.join()

    print("主程序即將結束(守護線程將自動終止)")
    print("=== 演示結束 ===\n")

線程同步 - 解決競態條件

在多線程編程中,當多個線程同時訪問共享資源時,可能會出現競態條件(Race Condition)。創建thread_synchronization.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
線程同步示例
演示各種同步原語的使用
"""

import threading
import time
import random
from queue import Queue

# 全局變量用於演示同步
counter = 0
lock = threading.Lock()

def unsafe_counter_increment():
    """不安全的計數器遞增(會出現競態條件)"""
    global counter
    for _ in range(100000):
        counter += 1

def safe_counter_increment():
    """安全的計數器遞增(使用鎖)"""
    global counter, lock
    for _ in range(100000):
        with lock:
            counter += 1

def demonstrate_race_condition():
    """演示競態條件"""
    global counter
    print("=== 競態條件演示 ===")

    # 重置計數器
    counter = 0

    # 創建多個線程執行不安全的遞增
    threads = []
    for i in range(5):
        thread = threading.Thread(target=unsafe_counter_increment)
        threads.append(thread)

    start_time = time.time()

    # 啓動所有線程
    for thread in threads:
        thread.start()

    # 等待所有線程完成
    for thread in threads:
        thread.join()

    end_time = time.time()

    print(f"預期結果: {5 * 100000}")
    print(f"實際結果: {counter}")
    print(f"執行時間: {end_time - start_time:.4f} 秒")
    print("結果不一致說明存在競態條件\n")

def demonstrate_thread_lock():
    """演示線程鎖"""
    global counter
    print("=== 線程鎖演示 ===")

    # 重置計數器
    counter = 0

    # 創建多個線程執行安全的遞增
    threads = []
    for i in range(5):
        thread = threading.Thread(target=safe_counter_increment)
        threads.append(thread)

    start_time = time.time()

    # 啓動所有線程
    for thread in threads:
        thread.start()

    # 等待所有線程完成
    for thread in threads:
        thread.join()

    end_time = time.time()

    print(f"預期結果: {5 * 100000}")
    print(f"實際結果: {counter}")
    print(f"執行時間: {end_time - start_time:.4f} 秒")
    print("結果一致說明同步成功\n")

執行程序:

python thread_synchronization.py

輸出結果:

=== 競態條件演示 ===
預期結果: 500000
實際結果: 500000
執行時間: 0.0315 秒
結果不一致說明存在競態條件

=== 線程鎖演示 ===
預期結果: 500000
實際結果: 500000
執行時間: 0.1359 秒
結果一致說明同步成功

可以看到,使用鎖雖然保證了數據的一致性,但執行時間明顯增加了,這是同步的代價。

線程間通信 - 生產者消費者模式

隊列(Queue)是線程間通信的重要工具,特別適合實現生產者消費者模式。在thread_synchronization.py中繼續添加:

class ProducerConsumer:
    """生產者消費者模式演示"""

    def __init__(self, max_size=5):
        self.queue = Queue(maxsize=max_size)
        self.produced_count = 0
        self.consumed_count = 0
        self.lock = threading.Lock()

    def producer(self, producer_id, items_to_produce):
        """生產者函數"""
        for i in range(items_to_produce):
            item = f"Producer-{producer_id}-Item-{i+1}"
            self.queue.put(item)

            with self.lock:
                self.produced_count += 1
                print(f"生產者 {producer_id} 生產了: {item} (隊列大小: {self.queue.qsize()})")

            time.sleep(random.uniform(0.1, 0.3))

    def consumer(self, consumer_id, items_to_consume):
        """消費者函數"""
        consumed = 0
        while consumed < items_to_consume:
            try:
                item = self.queue.get(timeout=1)

                with self.lock:
                    self.consumed_count += 1
                    print(f"消費者 {consumer_id} 消費了: {item} (隊列大小: {self.queue.qsize()})")

                consumed += 1
                time.sleep(random.uniform(0.1, 0.5))

            except:
                print(f"消費者 {consumer_id} 等待超時")
                break

def demonstrate_producer_consumer():
    """演示生產者消費者模式"""
    print("=== 生產者消費者模式演示 ===")

    pc = ProducerConsumer(max_size=3)

    # 創建生產者和消費者線程
    threads = []

    # 2個生產者
    for i in range(2):
        producer_thread = threading.Thread(
            target=pc.producer,
            args=(i+1, 5),
            name=f"Producer-{i+1}"
        )
        threads.append(producer_thread)

    # 2個消費者
    for i in range(2):
        consumer_thread = threading.Thread(
            target=pc.consumer,
            args=(i+1, 5),
            name=f"Consumer-{i+1}"
        )
        threads.append(consumer_thread)

    # 啓動所有線程
    start_time = time.time()
    for thread in threads:
        thread.start()
        print(f"啓動線程: {thread.name}")

    # 等待所有線程完成
    for thread in threads:
        thread.join()

    end_time = time.time()

    print(f"\n總生產數量: {pc.produced_count}")
    print(f"總消費數量: {pc.consumed_count}")
    print(f"總執行時間: {end_time - start_time:.2f} 秒")
    print("=== 演示結束 ===\n")

運行輸出:

生產者 1 生產了: Producer-1-Item-1 (隊列大小: 1)
啓動線程: Producer-1
生產者 2 生產了: Producer-2-Item-1 (隊列大小: 2)
啓動線程: Producer-2
消費者 1 消費了: Producer-1-Item-1 (隊列大小: 1)
啓動線程: Consumer-1
消費者 2 消費了: Producer-2-Item-1 (隊列大小: 0)
啓動線程: Consumer-2
...(更多輸出)
總生產數量: 10
總消費數量: 10
總執行時間: 1.94 秒
=== 演示結束 ===

15.3 多進程編程

multiprocessing模塊基礎

多進程編程能夠繞過GIL的限制,真正利用多核處理器的能力。創建multiprocess_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多進程編程示例
演示multiprocessing模塊的基本用法
"""

import multiprocessing
import time
import os
import random
from multiprocessing import Queue, Process, Pool

def worker_process(name, duration):
    """工作進程函數"""
    process_id = os.getpid()
    parent_id = os.getppid()

    print(f"進程 {name} (PID: {process_id}, 父PID: {parent_id}) 開始執行")
    start_time = time.time()

    # 模擬工作
    time.sleep(duration)

    end_time = time.time()
    actual_duration = end_time - start_time
    print(f"進程 {name} (PID: {process_id}) 執行完成,耗時 {actual_duration:.2f} 秒")

    return f"進程 {name} 的結果"

def demonstrate_basic_multiprocessing():
    """演示基礎多進程使用"""
    print("=== 基礎多進程演示 ===")
    print(f"主進程PID: {os.getpid()}")

    # 創建進程
    processes = []
    for i in range(3):
        duration = random.uniform(1, 2)
        process = Process(
            target=worker_process,
            args=(f"Worker-{i+1}", duration),
            name=f"Process-{i+1}"
        )
        processes.append(process)

    # 啓動進程
    start_time = time.time()
    for process in processes:
        process.start()
        print(f"啓動進程: {process.name} (PID: {process.pid})")

    # 等待所有進程完成
    for process in processes:
        process.join()
        print(f"進程 {process.name} 已結束,退出碼: {process.exitcode}")

    end_time = time.time()
    total_time = end_time - start_time
    print(f"總執行時間: {total_time:.2f} 秒")
    print("=== 演示結束 ===\n")

進程池

進程池提供了更高級的進程管理方式,特別適合CPU密集型任務:

def cpu_intensive_task(n):
    """CPU密集型任務"""
    start_time = time.time()
    result = 0
    for i in range(n):
        result += i * i
    end_time = time.time()

    process_id = os.getpid()
    return {
        'process_id': process_id,
        'n': n,
        'result': result,
        'duration': end_time - start_time
    }

def demonstrate_process_pool():
    """演示進程池"""
    print("=== 進程池演示 ===")

    # 準備任務數據
    tasks = [1000000, 2000000, 1500000, 3000000, 2500000]

    print(f"CPU核心數: {multiprocessing.cpu_count()}")
    print(f"任務列表: {tasks}")

    # 使用進程池執行任務
    start_time = time.time()

    with Pool(processes=4) as pool:
        print("進程池已創建,開始分配任務...")
        results = pool.map(cpu_intensive_task, tasks)

    end_time = time.time()

    # 輸出結果
    print("\n任務執行結果:")
    for result in results:
        print(f"進程 {result['process_id']}: 計算 {result['n']} 項,"
              f"結果 {result['result']}, 耗時 {result['duration']:.4f} 秒")

    print(f"\n進程池總執行時間: {end_time - start_time:.4f} 秒")
    print("=== 演示結束 ===\n")

if __name__ == "__main__":
    # 設置進程啓動方法(Windows需要)
    multiprocessing.set_start_method('spawn', force=True)

    demonstrate_basic_multiprocessing()
    demonstrate_process_pool()

15.4 異步編程

asyncio基礎

異步編程是處理IO密集型任務的高效方式。Python的asyncio模塊提供了完整的異步編程支持。創建asyncio_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
異步編程示例
演示asyncio模塊的基本用法
"""

import asyncio
import time
import random

async def async_worker(name, duration):
    """異步工作函數"""
    print(f"協程 {name} 開始執行,預計耗時 {duration} 秒")
    start_time = time.time()

    # 異步等待
    await asyncio.sleep(duration)

    end_time = time.time()
    actual_duration = end_time - start_time
    print(f"協程 {name} 執行完成,實際耗時 {actual_duration:.2f} 秒")

    return f"協程 {name} 的結果"

async def demonstrate_basic_asyncio():
    """演示基礎異步編程"""
    print("=== 基礎異步編程演示 ===")

    # 創建多個協程任務
    tasks = []
    for i in range(3):
        duration = random.uniform(1, 2)
        task = async_worker(f"Worker-{i+1}", duration)
        tasks.append(task)

    # 併發執行所有任務
    start_time = time.time()
    results = await asyncio.gather(*tasks)
    end_time = time.time()

    print("\n執行結果:")
    for result in results:
        print(f"  {result}")

    total_time = end_time - start_time
    print(f"總執行時間: {total_time:.2f} 秒")
    print("=== 演示結束 ===\n")

異步HTTP請求

異步編程在網絡請求中特別有用。首先需要安裝aiohttp庫:

pip install aiohttp

然後在asyncio_demo.py中繼續添加:

import aiohttp

async def fetch_url(session, url, name):
    """異步獲取URL內容"""
    try:
        print(f"開始請求 {name}: {url}")
        start_time = time.time()

        async with session.get(url, timeout=10) as response:
            content = await response.text()

        end_time = time.time()
        duration = end_time - start_time

        print(f"完成請求 {name}: 狀態碼 {response.status}, "
              f"內容長度 {len(content)}, 耗時 {duration:.2f} 秒")

        return {
            'name': name,
            'url': url,
            'status': response.status,
            'length': len(content),
            'duration': duration
        }

    except asyncio.TimeoutError:
        print(f"請求 {name} 超時")
        return {'name': name, 'url': url, 'error': 'timeout'}
    except Exception as e:
        print(f"請求 {name} 出錯: {e}")
        return {'name': name, 'url': url, 'error': str(e)}

async def demonstrate_async_http():
    """演示異步HTTP請求"""
    print("=== 異步HTTP請求演示 ===")

    urls = [
        ("百度", "https://www.baidu.com"),
        ("知乎", "https://www.zhihu.com"),
        ("GitHub", "https://github.com"),
        ("Python官網", "https://www.python.org")
    ]

    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url, name) for name, url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    end_time = time.time()

    print("\n請求結果彙總:")
    for result in results:
        if isinstance(result, dict) and 'error' not in result:
            print(f"  {result['name']}: {result['status']} - "
                  f"{result['length']} bytes - {result['duration']:.2f}s")
        elif isinstance(result, dict):
            print(f"  {result['name']}: 錯誤 - {result.get('error', '未知錯誤')}")
        else:
            print(f"  異常: {result}")

    total_time = end_time - start_time
    print(f"\n異步HTTP總執行時間: {total_time:.2f} 秒")
    print("=== 演示結束 ===\n")

執行程序:

python asyncio_demo.py

輸出結果:

=== 異步HTTP請求演示 ===
開始請求 百度: https://www.baidu.com
開始請求 知乎: https://www.zhihu.com
開始請求 GitHub: https://github.com
開始請求 Python官網: https://www.python.org
完成請求 百度: 狀態碼 200, 內容長度 28918, 耗時 0.31 秒
完成請求 知乎: 狀態碼 403, 內容長度 118, 耗時 0.39 秒
完成請求 GitHub: 狀態碼 200, 內容長度 553650, 耗時 0.51 秒
完成請求 Python官網: 狀態碼 200, 內容長度 50272, 耗時 1.18 秒

請求結果彙總:
  百度: 200 - 28918 bytes - 0.31s
  知乎: 403 - 118 bytes - 0.39s
  GitHub: 200 - 553650 bytes - 0.51s
  Python官網: 200 - 50272 bytes - 1.18s

異步HTTP總執行時間: 1.19 秒

可以看到,4個HTTP請求併發執行,總時間約爲最慢請求的時間。

15.5 concurrent.futures模塊

concurrent.futures模塊提供了高級的併發編程接口,可以統一處理線程和進程。創建futures_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
concurrent.futures模塊演示
統一的併發編程接口
"""

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

def cpu_bound_task(n):
    """CPU密集型任務"""
    start_time = time.time()
    result = sum(i * i for i in range(n))
    end_time = time.time()
    return {
        'n': n,
        'result': result,
        'duration': end_time - start_time
    }

def io_bound_task(duration):
    """IO密集型任務"""
    time.sleep(duration)
    return f"任務完成,耗時 {duration} 秒"

def demonstrate_thread_pool_executor():
    """演示線程池執行器"""
    print("=== ThreadPoolExecutor演示 ===")

    tasks = [0.5, 1.0, 0.8, 1.2]

    start_time = time.time()

    # 使用線程池執行器
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交任務
        futures = [executor.submit(io_bound_task, duration) for duration in tasks]

        # 獲取結果
        results = [future.result() for future in futures]

    end_time = time.time()

    print("任務結果:")
    for result in results:
        print(f"  {result}")

    print(f"ThreadPoolExecutor總耗時: {end_time - start_time:.2f} 秒")
    print("=== 演示結束 ===\n")

def demonstrate_process_pool_executor():
    """演示進程池執行器"""
    print("=== ProcessPoolExecutor演示 ===")

    tasks = [1000000, 2000000, 1500000, 3000000]

    start_time = time.time()

    # 使用進程池執行器
    with ProcessPoolExecutor(max_workers=4) as executor:
        # 提交任務
        futures = [executor.submit(cpu_bound_task, n) for n in tasks]

        # 獲取結果
        results = [future.result() for future in futures]

    end_time = time.time()

    print("任務結果:")
    for result in results:
        print(f"  計算 {result['n']} 項: 結果 {result['result']}, "
              f"耗時 {result['duration']:.4f} 秒")

    print(f"ProcessPoolExecutor總耗時: {end_time - start_time:.4f} 秒")
    print("=== 演示結束 ===\n")

if __name__ == "__main__":
    demonstrate_thread_pool_executor()
    demonstrate_process_pool_executor()

15.6 同步原語詳解

各種鎖的使用

創建sync_primitives_demo.py來演示各種同步原語:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
同步原語詳解
演示各種鎖和同步機制的使用
"""

import threading
import time

# 信號量演示
def demonstrate_semaphore():
    """演示信號量"""
    print("=== 信號量演示 ===")

    # 創建信號量,最多允許2個線程同時訪問
    semaphore = threading.Semaphore(2)

    def access_resource(thread_id):
        print(f"線程 {thread_id} 嘗試獲取資源...")
        with semaphore:
            print(f"線程 {thread_id} 獲得資源訪問權")
            time.sleep(2)  # 模擬資源使用
            print(f"線程 {thread_id} 釋放資源")

    # 創建5個線程嘗試訪問資源
    threads = []
    for i in range(5):
        thread = threading.Thread(target=access_resource, args=(i+1,))
        threads.append(thread)

    start_time = time.time()

    # 啓動所有線程
    for thread in threads:
        thread.start()

    # 等待所有線程完成
    for thread in threads:
        thread.join()

    end_time = time.time()
    print(f"總執行時間: {end_time - start_time:.2f} 秒")
    print("=== 演示結束 ===\n")

# 事件演示
def demonstrate_event():
    """演示事件"""
    print("=== 事件演示 ===")

    event = threading.Event()

    def waiter(name):
        print(f"等待者 {name} 開始等待事件")
        event.wait()
        print(f"等待者 {name} 收到事件,開始工作")
        time.sleep(1)
        print(f"等待者 {name} 工作完成")

    def setter():
        print("設置者開始工作")
        time.sleep(2)
        print("設置者觸發事件")
        event.set()

    # 創建線程
    threads = []

    # 等待者線程
    for i in range(3):
        thread = threading.Thread(target=waiter, args=(f"Waiter-{i+1}",))
        threads.append(thread)

    # 設置者線程
    setter_thread = threading.Thread(target=setter)
    threads.append(setter_thread)

    # 啓動線程
    for thread in threads:
        thread.start()

    # 等待完成
    for thread in threads:
        thread.join()

    print("=== 演示結束 ===\n")

if __name__ == "__main__":
    demonstrate_semaphore()
    demonstrate_event()

15.7 併發設計模式

工作者模式

創建design_patterns_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
併發設計模式演示
工作者模式等
"""

import queue
import threading
import time
import random

class WorkerPool:
    """工作者池"""

    def __init__(self, num_workers=4):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
        self.num_workers = num_workers
        self.shutdown = False

    def start(self):
        """啓動工作者池"""
        for i in range(self.num_workers):
            worker = threading.Thread(target=self._worker, args=(i+1,))
            worker.start()
            self.workers.append(worker)
        print(f"啓動了 {self.num_workers} 個工作者")

    def _worker(self, worker_id):
        """工作者函數"""
        while not self.shutdown:
            try:
                task = self.task_queue.get(timeout=1)
                if task is None:  # 毒丸,用於停止工作者
                    break

                print(f"工作者 {worker_id} 開始處理任務: {task['id']}")
                start_time = time.time()

                # 模擬任務處理
                time.sleep(random.uniform(0.5, 2.0))

                end_time = time.time()
                result = {
                    'task_id': task['id'],
                    'worker_id': worker_id,
                    'result': task['data'] * 2,
                    'duration': end_time - start_time
                }

                self.result_queue.put(result)
                print(f"工作者 {worker_id} 完成任務: {task['id']}")

                self.task_queue.task_done()

            except queue.Empty:
                continue

    def submit_task(self, task_id, data):
        """提交任務"""
        task = {'id': task_id, 'data': data}
        self.task_queue.put(task)

    def get_result(self, timeout=None):
        """獲取結果"""
        try:
            return self.result_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def shutdown_pool(self):
        """關閉工作者池"""
        print("正在關閉工作者池...")
        self.shutdown = True

        # 發送毒丸停止所有工作者
        for _ in range(self.num_workers):
            self.task_queue.put(None)

        # 等待所有工作者結束
        for worker in self.workers:
            worker.join()

        print("工作者池已關閉")

def demonstrate_worker_pool():
    """演示工作者池"""
    print("=== 工作者池演示 ===")

    # 創建並啓動工作者池
    pool = WorkerPool(num_workers=3)
    pool.start()

    # 提交任務
    tasks = [
        (f"Task-{i+1}", random.randint(1, 100))
        for i in range(8)
    ]

    print(f"提交 {len(tasks)} 個任務")
    for task_id, data in tasks:
        pool.submit_task(task_id, data)

    # 收集結果
    results = []
    for _ in range(len(tasks)):
        result = pool.get_result(timeout=5)
        if result:
            results.append(result)
            print(f"收到結果: 任務 {result['task_id']} "
                  f"由工作者 {result['worker_id']} 完成,"
                  f"結果 {result['result']},耗時 {result['duration']:.2f} 秒")

    # 關閉池
    pool.shutdown_pool()

    print(f"處理了 {len(results)} 個任務")
    print("=== 演示結束 ===\n")

if __name__ == "__main__":
    demonstrate_worker_pool()

15.8 性能優化和調試

性能測試和比較

創建performance_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
併發性能測試和比較
"""

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound_task_sync(n):
    """CPU密集型任務(同步版本)"""
    result = 0
    for i in range(n):
        result += i * i
    return result

def io_bound_task_sync(duration):
    """IO密集型任務(同步版本)"""
    time.sleep(duration)
    return f"同步任務完成,耗時 {duration} 秒"

async def io_bound_task_async(duration):
    """IO密集型任務(異步版本)"""
    await asyncio.sleep(duration)
    return f"異步任務完成,耗時 {duration} 秒"

def benchmark_cpu_intensive():
    """CPU密集型任務性能測試"""
    print("=== CPU密集型任務性能測試 ===")

    task_size = 1000000
    num_tasks = 4

    # 1. 單線程執行
    print("1. 單線程執行:")
    start_time = time.time()
    results = [cpu_bound_task_sync(task_size) for _ in range(num_tasks)]
    single_thread_time = time.time() - start_time
    print(f"   執行時間: {single_thread_time:.4f} 秒")

    # 2. 多線程執行
    print("2. 多線程執行:")
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_bound_task_sync, [task_size] * num_tasks))
    multi_thread_time = time.time() - start_time
    print(f"   執行時間: {multi_thread_time:.4f} 秒")

    # 3. 多進程執行
    print("3. 多進程執行:")
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_bound_task_sync, [task_size] * num_tasks))
    multi_process_time = time.time() - start_time
    print(f"   執行時間: {multi_process_time:.4f} 秒")

    # 性能比較
    print("\n性能比較:")
    print(f"多線程相對單線程: {single_thread_time / multi_thread_time:.2f}x")
    print(f"多進程相對單線程: {single_thread_time / multi_process_time:.2f}x")
    print(f"多進程相對多線程: {multi_thread_time / multi_process_time:.2f}x")
    print("=== 測試結束 ===\n")

async def benchmark_io_intensive():
    """IO密集型任務性能測試"""
    print("=== IO密集型任務性能測試 ===")

    task_duration = 0.5
    num_tasks = 8

    # 1. 同步執行
    print("1. 同步執行:")
    start_time = time.time()
    results = [io_bound_task_sync(task_duration) for _ in range(num_tasks)]
    sync_time = time.time() - start_time
    print(f"   執行時間: {sync_time:.4f} 秒")

    # 2. 多線程執行
    print("2. 多線程執行:")
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(io_bound_task_sync, [task_duration] * num_tasks))
    thread_time = time.time() - start_time
    print(f"   執行時間: {thread_time:.4f} 秒")

    # 3. 異步執行
    print("3. 異步執行:")
    start_time = time.time()
    tasks = [io_bound_task_async(task_duration) for _ in range(num_tasks)]
    results = await asyncio.gather(*tasks)
    async_time = time.time() - start_time
    print(f"   執行時間: {async_time:.4f} 秒")

    # 性能比較
    print("\n性能比較:")
    print(f"多線程相對同步: {sync_time / thread_time:.2f}x")
    print(f"異步相對同步: {sync_time / async_time:.2f}x")
    print(f"異步相對多線程: {thread_time / async_time:.2f}x")
    print("=== 測試結束 ===\n")

async def run_benchmarks():
    """運行性能測試"""
    benchmark_cpu_intensive()
    await benchmark_io_intensive()

if __name__ == "__main__":
    asyncio.run(run_benchmarks())

15.9 實際應用場景

數據處理管道

創建一個數據處理示例data_processing_demo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
數據處理管道示例
實際應用場景演示
"""

import csv
import json
import time
import os
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count

class DataProcessor:
    """數據處理器"""

    def __init__(self, num_workers=4):
        self.num_workers = num_workers

    def process_chunk(self, data_chunk, processor_id):
        """處理數據塊"""
        print(f"處理器 {processor_id} 開始處理 {len(data_chunk)} 條記錄")
        start_time = time.time()

        processed_data = []
        for record in data_chunk:
            # 示例數據處理邏輯
            processed_record = {
                'id': record.get('id', ''),
                'name': record.get('name', '').upper(),
                'value': float(record.get('value', 0)) * 1.1,
                'processed_at': time.time()
            }
            processed_data.append(processed_record)

        end_time = time.time()
        print(f"處理器 {processor_id} 完成,耗時 {end_time - start_time:.2f} 秒")

        return processed_data

    def process_data_threaded(self, data, chunk_size=1000):
        """使用多線程處理數據"""
        print("=== 多線程數據處理 ===")

        # 將數據分塊
        chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
        print(f"將 {len(data)} 條記錄分成 {len(chunks)} 個塊")

        start_time = time.time()

        # 使用線程池處理
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = [
                executor.submit(self.process_chunk, chunk, i+1)
                for i, chunk in enumerate(chunks)
            ]

            # 收集結果
            results = []
            for future in futures:
                chunk_result = future.result()
                results.extend(chunk_result)

        end_time = time.time()

        print(f"多線程處理完成,處理了 {len(results)} 條記錄")
        print(f"總耗時: {end_time - start_time:.2f} 秒")

        return results

def create_sample_data(num_records=10000):
    """創建示例數據"""
    import random

    print(f"創建 {num_records} 條示例數據")

    data = []
    for i in range(num_records):
        record = {
            'id': i + 1,
            'name': f"用戶_{i+1}",
            'value': round(random.uniform(10, 1000), 2)
        }
        data.append(record)

    return data

def demonstrate_data_processing():
    """演示數據處理"""
    print(f"系統CPU核心數: {cpu_count()}")

    # 創建示例數據
    sample_data = create_sample_data(num_records=5000)

    # 處理數據
    processor = DataProcessor(num_workers=4)
    processed_data = processor.process_data_threaded(sample_data, chunk_size=1000)

    # 顯示結果示例
    print("\n處理結果示例(前5條):")
    for record in processed_data[:5]:
        print(f"  ID: {record['id']}, 姓名: {record['name']}, "
              f"處理後值: {record['value']:.2f}")

    print("=== 演示結束 ===")

if __name__ == "__main__":
    demonstrate_data_processing()

總結

本章詳細介紹了Python併發編程的各個方面,從基礎概念到高級應用。通過大量的實際代碼示例和運行結果,我們學習了:

核心知識點

  1. 併發編程基礎:理解了併發與並行的區別,以及Python GIL的影響
  2. 多線程編程:掌握了線程創建、同步、通信等核心技術
  3. 多進程編程:學習了進程間通信和進程池的使用
  4. 異步編程:理解了asyncio的工作原理和異步模式
  5. 高級工具:掌握了concurrent.futures等高級併發工具
  6. 設計模式:學習了常見的併發設計模式

性能特點對比

併發方式 適用場景 優勢 劣勢
多線程 IO密集型任務 內存共享,切換開銷小 受GIL限制,無法真正並行
多進程 CPU密集型任務 真正並行,不受GIL限制 內存開銷大,進程間通信複雜
異步編程 高併發IO 單線程,無鎖問題 不適合CPU密集型任務

選擇建議

  • CPU密集型任務:使用多進程
  • IO密集型任務:使用多線程或異步編程
  • 高併發網絡請求:優先選擇異步編程
  • 簡單並行計算:使用concurrent.futures

最佳實踐

  1. 正確選擇併發模型:根據任務類型選擇合適的併發方式
  2. 合理使用同步原語:避免過度同步導致性能下降
  3. 異常處理:併發程序中的異常處理要特別謹慎
  4. 資源管理:及時釋放線程、進程等資源
  5. 性能測試:通過實際測試驗證併發效果

通過掌握這些併發編程技術,您可以編寫出高性能、高併發的Python應用程序,充分利用現代計算機的多核處理能力。在實際項目中,要根據具體需求選擇合適的併發策略,並進行充分的測試和優化。

小夜