第10章 Python高級特性¶
Python作爲一門現代化的編程語言,提供了許多高級特性來幫助開發者編寫更加簡潔、高效和優雅的代碼。本章將深入探討Python的核心高級特性,包括推導式、生成器、迭代器、裝飾器、閉包和上下文管理器等。這些特性不僅能提高代碼的可讀性和性能,還體現了Python”優雅勝於醜陋”的設計哲學。
本系列文章所使用到的示例源碼:Python從入門到精通示例代碼
10.1 列表推導式¶
列表推導式(List Comprehension)是Python中最受歡迎的特性之一,它提供了一種簡潔而強大的方式來創建列表。列表推導式不僅代碼更加簡潔,在很多情況下性能也優於傳統的循環方式。
10.1.1 列表推導式的概念¶
列表推導式是一種基於現有可迭代對象創建新列表的簡潔語法。它將循環和條件判斷融合在一個表達式中,體現了函數式編程的思想。相比傳統的for循環,列表推導式具有以下優勢:
- 簡潔性:一行代碼完成多行循環的功能
- 可讀性:表達式更接近自然語言的描述
- 性能:在C語言層面優化,執行效率更高
- 函數式風格:避免了顯式的循環變量管理
10.1.2 基本語法結構¶
列表推導式的基本語法結構爲:[expression for item in iterable]
# 基本列表推導式示例
numbers = [1, 2, 3, 4, 5]
squares = [x**2 for x in numbers]
print(f"原始列表: {numbers}")
print(f"平方列表: {squares}")
輸出結果:
原始列表: [1, 2, 3, 4, 5]
平方列表: [1, 4, 9, 16, 25]
在這個例子中:
- x**2 是表達式部分,定義瞭如何轉換每個元素
- for x in numbers 是迭代部分,定義了數據源和迭代變量
- 整個表達式返回一個新的列表
10.1.3 條件過濾¶
列表推導式可以包含條件判斷,語法爲:[expression for item in iterable if condition]
# 條件過濾示例
numbers = [1, 2, 3, 4, 5]
even_squares = [x**2 for x in numbers if x % 2 == 0]
print(f"偶數的平方: {even_squares}")
# 字符串處理
words = ['hello', 'world', 'python', 'programming']
capitalized = [word.capitalize() for word in words if len(word) > 5]
print(f"長度大於5的單詞首字母大寫: {capitalized}")
輸出結果:
偶數的平方: [4, 16]
長度大於5的單詞首字母大寫: ['Python', 'Programming']
10.1.4 嵌套列表推導式¶
對於二維列表或更復雜的數據結構,可以使用嵌套的列表推導式:
# 嵌套列表推導式
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flattened = [num for row in matrix for num in row]
print(f"二維矩陣: {matrix}")
print(f"展平後: {flattened}")
輸出結果:
二維矩陣: [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
展平後: [1, 2, 3, 4, 5, 6, 7, 8, 9]
10.1.5 性能對比¶
列表推導式通常比傳統循環具有更好的性能:
import time
# 使用列表推導式
start_time = time.time()
list_comp_result = [x**2 for x in range(10000)]
list_comp_time = time.time() - start_time
# 使用傳統循環
start_time = time.time()
loop_result = []
for x in range(10000):
loop_result.append(x**2)
loop_time = time.time() - start_time
print(f"列表推導式耗時: {list_comp_time:.6f}秒")
print(f"傳統循環耗時: {loop_time:.6f}秒")
輸出結果:
列表推導式耗時: 0.001000秒
傳統循環耗時: 0.000997秒
雖然在這個簡單例子中性能差異不大,但在更復雜的場景中,列表推導式通常表現更好。
10.2 字典推導式與集合推導式¶
除了列表推導式,Python還提供了字典推導式和集合推導式,它們遵循相似的語法規則,但用於創建不同類型的數據結構。這些推導式同樣具有簡潔、高效的特點。
10.2.1 字典推導式的語法¶
字典推導式的基本語法爲:{key: value for item in iterable}
# 基本字典推導式
numbers = [1, 2, 3, 4, 5]
square_dict = {x: x**2 for x in numbers}
print(f"數字及其平方的字典: {square_dict}")
# 條件過濾
even_square_dict = {x: x**2 for x in numbers if x % 2 == 0}
print(f"偶數及其平方的字典: {even_square_dict}")
輸出結果:
數字及其平方的字典: {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
偶數及其平方的字典: {2: 4, 4: 16}
10.2.2 字符串處理與鍵值互換¶
字典推導式在字符串處理和數據轉換中非常有用:
# 字符串處理
words = ['hello', 'world', 'python']
word_lengths = {word: len(word) for word in words}
print(f"單詞長度字典: {word_lengths}")
# 字典鍵值互換
original_dict = {'a': 1, 'b': 2, 'c': 3}
swapped_dict = {v: k for k, v in original_dict.items()}
print(f"原字典: {original_dict}")
print(f"鍵值互換後: {swapped_dict}")
輸出結果:
單詞長度字典: {'hello': 5, 'world': 5, 'python': 6}
原字典: {'a': 1, 'b': 2, 'c': 3}
鍵值互換後: {1: 'a', 2: 'b', 3: 'c'}
10.2.3 集合推導式的語法¶
集合推導式的語法爲:{expression for item in iterable},它會自動去除重複元素:
# 基本集合推導式
numbers = [1, 2, 2, 3, 3, 4, 5]
unique_squares = {x**2 for x in numbers}
print(f"原列表: {numbers}")
print(f"去重後的平方集合: {unique_squares}")
# 條件過濾
even_set = {x for x in numbers if x % 2 == 0}
print(f"偶數集合: {even_set}")
輸出結果:
原列表: [1, 2, 2, 3, 3, 4, 5]
去重後的平方集合: {1, 4, 9, 16, 25}
偶數集合: {2, 4}
10.2.4 集合運算與嵌套推導式¶
集合推導式可以與集合運算結合,處理更復雜的邏輯:
# 集合運算
set1 = {1, 2, 3, 4, 5}
set2 = {4, 5, 6, 7, 8}
intersection_squares = {x**2 for x in set1 if x in set2}
print(f"集合1: {set1}")
print(f"集合2: {set2}")
print(f"交集元素的平方: {intersection_squares}")
# 嵌套推導式
matrix = [[1, 2], [3, 4], [5, 6]]
flattened_set = {num for row in matrix for num in row if num % 2 == 0}
print(f"矩陣: {matrix}")
print(f"偶數元素集合: {flattened_set}")
輸出結果:
集合1: {1, 2, 3, 4, 5}
集合2: {4, 5, 6, 7, 8}
交集元素的平方: {16, 25}
矩陣: [[1, 2], [3, 4], [5, 6]]
偶數元素集合: {2, 4, 6}
10.2.5 實際應用場景¶
推導式在數據處理、統計分析等場景中非常實用:
# 數據統計
scores = [85, 92, 78, 96, 88, 76, 94, 89]
grade_distribution = {
'A': len([s for s in scores if s >= 90]),
'B': len([s for s in scores if 80 <= s < 90]),
'C': len([s for s in scores if 70 <= s < 80]),
'D': len([s for s in scores if s < 70])
}
print(f"成績分佈: {grade_distribution}")
# 配置處理
config_list = [('debug', 'true'), ('port', '8080'), ('host', 'localhost')]
config_dict = {k: v for k, v in config_list}
print(f"配置字典: {config_dict}")
輸出結果:
成績分佈: {'A': 3, 'B': 3, 'C': 1, 'D': 1}
配置字典: {'debug': 'true', 'port': '8080', 'host': 'localhost'}
10.2.6 推導式的限制與最佳實踐¶
雖然推導式功能強大,但也需要注意其使用邊界:
# 可讀性邊界示例
# 好的推導式:簡潔明瞭
even_squares = [x**2 for x in range(10) if x % 2 == 0]
# 不推薦的推導式:過於複雜
# complex_result = [func(x) for x in data if condition1(x) and condition2(x) for y in other_data if related(x, y)]
# 對於複雜邏輯,使用傳統循環更清晰
complex_result = []
for x in data:
if condition1(x) and condition2(x):
for y in other_data:
if related(x, y):
complex_result.append(func(x))
最佳實踐原則:
- 保持推導式的簡潔性,一行代碼應該易於理解
- 複雜的條件判斷或多層嵌套時考慮使用傳統循環
- 優先考慮代碼的可讀性而非簡潔性
- 在性能敏感的場景中,推導式通常是更好的選擇
10.3 生成器(Generator)¶
生成器是Python中一個非常重要的特性,它提供了一種優雅的方式來創建迭代器。生成器採用惰性求值的策略,只在需要時才計算值,這使得它在處理大量數據或無限序列時具有顯著的內存優勢。
10.3.1 生成器的概念¶
生成器具有以下核心特性:
- 惰性求值:只在需要時才計算下一個值
- 內存效率:不需要將所有值同時存儲在內存中
- 迭代器協議:自動實現了__iter__()和__next__()方法
- 狀態保持:能夠記住函數執行的狀態
10.3.2 生成器函數¶
生成器函數使用yield關鍵字來產生值,而不是return:
# 基本生成器函數
def simple_generator():
print("開始生成")
yield 1
print("生成第二個值")
yield 2
print("生成第三個值")
yield 3
print("生成結束")
# 使用生成器
gen = simple_generator()
print("創建生成器對象")
for value in gen:
print(f"獲得值: {value}")
輸出結果:
創建生成器對象
開始生成
獲得值: 1
生成第二個值
獲得值: 2
生成第三個值
獲得值: 3
生成結束
10.3.3 斐波那契數列生成器¶
生成器特別適合生成數學序列:
# 斐波那契數列生成器
def fibonacci_generator(n):
"""生成前n個斐波那契數"""
a, b = 0, 1
count = 0
while count < n:
yield a
a, b = b, a + b
count += 1
# 使用斐波那契生成器
print("前10個斐波那契數:")
for num in fibonacci_generator(10):
print(num, end=" ")
print()
輸出結果:
前10個斐波那契數:
0 1 1 2 3 5 8 13 21 34
10.3.4 生成器表達式¶
生成器表達式提供了一種更簡潔的創建生成器的方式:
# 生成器表達式
squares_gen = (x**2 for x in range(5))
print(f"生成器對象: {squares_gen}")
print("生成器產生的值:")
for square in squares_gen:
print(square, end=" ")
print()
輸出結果:
生成器對象: <generator object <genexpr> at 0x...>
生成器產生的值:
0 1 4 9 16
10.3.5 內存使用對比¶
生成器相比列表推導式具有顯著的內存優勢:
import sys
# 列表推導式
list_comp = [x**2 for x in range(1000)]
list_size = sys.getsizeof(list_comp)
# 生成器表達式
gen_expr = (x**2 for x in range(1000))
gen_size = sys.getsizeof(gen_expr)
print(f"列表推導式內存使用: {list_size} 字節")
print(f"生成器表達式內存使用: {gen_size} 字節")
print(f"內存節省: {list_size - gen_size} 字節")
print(f"節省比例: {(list_size - gen_size) / list_size * 100:.1f}%")
輸出結果:
列表推導式內存使用: 8856 字節
生成器表達式內存使用: 104 字節
內存節省: 8752 字節
節省比例: 98.8%
10.3.6 無限序列生成器¶
生成器可以創建理論上無限的序列:
# 無限計數器
def infinite_counter(start=0, step=1):
"""無限計數生成器"""
current = start
while True:
yield current
current += step
# 使用無限生成器(注意:需要手動停止)
counter = infinite_counter(1, 2)
print("前10個奇數:")
for i, num in enumerate(counter):
if i >= 10:
break
print(num, end=" ")
print()
輸出結果:
前10個奇數:
1 3 5 7 9 11 13 15 17 19
10.3.7 生成器的send方法¶
生成器支持雙向通信,可以通過send()方法向生成器發送值:
# 支持send的生成器
def echo_generator():
"""回聲生成器,可以接收外部發送的值"""
value = None
while True:
received = yield value
if received is not None:
value = f"Echo: {received}"
else:
value = "等待輸入..."
# 使用send方法
echo_gen = echo_generator()
print(next(echo_gen)) # 啓動生成器
print(echo_gen.send("Hello"))
print(echo_gen.send("World"))
print(next(echo_gen))
輸出結果:
None
Echo: Hello
Echo: World
等待輸入...
10.3.8 管道處理示例¶
生成器非常適合構建數據處理管道:
# 數據處理管道
def read_numbers():
"""模擬讀取數字數據"""
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for num in numbers:
print(f"讀取: {num}")
yield num
def filter_even(numbers):
"""過濾偶數"""
for num in numbers:
if num % 2 == 0:
print(f"過濾得到偶數: {num}")
yield num
def square_numbers(numbers):
"""計算平方"""
for num in numbers:
result = num ** 2
print(f"計算平方: {num}^2 = {result}")
yield result
# 構建處理管道
pipeline = square_numbers(filter_even(read_numbers()))
print("開始處理管道:")
results = list(pipeline)
print(f"最終結果: {results}")
輸出結果:
開始處理管道:
讀取: 1
讀取: 2
過濾得到偶數: 2
計算平方: 2^2 = 4
讀取: 3
讀取: 4
過濾得到偶數: 4
計算平方: 4^2 = 16
讀取: 5
讀取: 6
過濾得到偶數: 6
計算平方: 6^2 = 36
讀取: 7
讀取: 8
過濾得到偶數: 8
計算平方: 8^2 = 64
讀取: 9
讀取: 10
過濾得到偶數: 10
計算平方: 10^2 = 100
最終結果: [4, 16, 36, 64, 100]
10.3.9 生成器的應用場景¶
生成器在以下場景中特別有用:
- 大數據處理:逐行處理大文件而不需要將整個文件加載到內存
- 無限序列:生成數學序列、隨機數序列等
- 管道處理:構建數據處理流水線
- 內存優化:在內存受限的環境中處理大量數據
- 協程基礎:爲異步編程提供基礎支持
生成器是Python中實現高效、優雅代碼的重要工具,掌握其使用方法對於編寫高質量的Python程序至關重要。
10.4 迭代器(Iterator)¶
迭代器是Python中實現迭代協議的對象,它提供了一種統一的方式來遍歷容器中的元素。理解迭代器的工作原理對於掌握Python的高級特性至關重要。
10.4.1 迭代器協議¶
迭代器協議包含兩個核心方法:
- __iter__():返回迭代器對象本身
- __next__():返回下一個值,沒有更多值時拋出StopIteration異常
# 自定義迭代器類
class Counter:
"""簡單的計數器迭代器"""
def __init__(self, start, end):
self.current = start
self.end = end
def __iter__(self):
return self
def __next__(self):
if self.current < self.end:
num = self.current
self.current += 1
return num
else:
raise StopIteration
# 使用自定義迭代器
print("使用自定義計數器迭代器:")
counter = Counter(1, 5)
for num in counter:
print(f"計數: {num}")
輸出結果:
使用自定義計數器迭代器:
計數: 1
計數: 2
計數: 3
計數: 4
10.4.2 可迭代對象與迭代器的區別¶
可迭代對象和迭代器是兩個不同的概念:
# 演示可迭代對象與迭代器的區別
class NumberList:
"""可迭代對象:包含數據,但不直接實現迭代邏輯"""
def __init__(self, numbers):
self.numbers = numbers
def __iter__(self):
return NumberIterator(self.numbers)
class NumberIterator:
"""迭代器:實現具體的迭代邏輯"""
def __init__(self, numbers):
self.numbers = numbers
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index < len(self.numbers):
value = self.numbers[self.index]
self.index += 1
return value
else:
raise StopIteration
# 使用可迭代對象
number_list = NumberList([10, 20, 30, 40])
print("第一次迭代:")
for num in number_list:
print(f"數字: {num}")
print("第二次迭代:")
for num in number_list:
print(f"數字: {num}")
# 直接使用迭代器
print("直接使用迭代器:")
iterator = NumberIterator([1, 2, 3])
print(f"迭代器類型: {type(iterator)}")
print(f"是否爲迭代器: {hasattr(iterator, '__next__')}")
print(f"是否爲可迭代對象: {hasattr(iterator, '__iter__')}")
輸出結果:
第一次迭代:
數字: 10
數字: 20
數字: 30
數字: 40
第二次迭代:
數字: 10
數字: 20
數字: 30
數字: 40
直接使用迭代器:
迭代器類型: <class '__main__.NumberIterator'>
是否爲迭代器: True
是否爲可迭代對象: True
10.4.3 斐波那契迭代器¶
實現一個更復雜的迭代器示例:
# 斐波那契迭代器
class FibonacciIterator:
"""斐波那契數列迭代器"""
def __init__(self, max_count):
self.max_count = max_count
self.count = 0
self.current = 0
self.next_val = 1
def __iter__(self):
return self
def __next__(self):
if self.count < self.max_count:
result = self.current
self.current, self.next_val = self.next_val, self.current + self.next_val
self.count += 1
return result
else:
raise StopIteration
# 使用斐波那契迭代器
print("斐波那契數列迭代器:")
fib_iter = FibonacciIterator(8)
for fib_num in fib_iter:
print(fib_num, end=" ")
print()
輸出結果:
斐波那契數列迭代器:
0 1 1 2 3 5 8 13
10.4.4 內置迭代器工具¶
Python提供了許多內置的迭代器工具:
# enumerate() - 添加索引
fruits = ['apple', 'banana', 'orange']
print("enumerate示例:")
for index, fruit in enumerate(fruits, start=1):
print(f"{index}. {fruit}")
# zip() - 並行迭代
names = ['Alice', 'Bob', 'Charlie']
ages = [25, 30, 35]
print("\nzip示例:")
for name, age in zip(names, ages):
print(f"{name} is {age} years old")
# reversed() - 反向迭代
numbers = [1, 2, 3, 4, 5]
print("\nreversed示例:")
for num in reversed(numbers):
print(num, end=" ")
print()
輸出結果:
enumerate示例:
1. apple
2. banana
3. orange
zip示例:
Alice is 25 years old
Bob is 30 years old
Charlie is 35 years old
reversed示例:
5 4 3 2 1
10.4.5 迭代器的狀態管理¶
迭代器會保持其內部狀態,這意味着它們只能被消費一次:
# 迭代器狀態管理示例
class StatefulIterator:
"""展示迭代器狀態管理的示例"""
def __init__(self, data):
self.data = data
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index < len(self.data):
value = self.data[self.index]
self.index += 1
print(f"返回索引 {self.index-1} 的值: {value}")
return value
else:
print("迭代器已耗盡")
raise StopIteration
# 演示迭代器狀態
print("迭代器狀態管理:")
iterator = StatefulIterator(['a', 'b', 'c'])
print("第一次迭代:")
for item in iterator:
pass
print("第二次迭代:")
for item in iterator:
print(f"獲得: {item}")
else:
print("第二次迭代沒有產生任何值")
輸出結果:
迭代器狀態管理:
第一次迭代:
返回索引 0 的值: a
返回索引 1 的值: b
返回索引 2 的值: c
第二次迭代:
迭代器已耗盡
第二次迭代沒有產生任何值
10.4.6 迭代器鏈¶
可以將多個迭代器連接起來形成迭代器鏈:
# 迭代器鏈示例
class ChainIterator:
"""連接多個可迭代對象的迭代器"""
def __init__(self, *iterables):
self.iterables = iterables
self.current_iterable = 0
self.current_iterator = iter(self.iterables[0]) if self.iterables else iter([])
def __iter__(self):
return self
def __next__(self):
while self.current_iterable < len(self.iterables):
try:
return next(self.current_iterator)
except StopIteration:
self.current_iterable += 1
if self.current_iterable < len(self.iterables):
self.current_iterator = iter(self.iterables[self.current_iterable])
else:
raise StopIteration
raise StopIteration
# 使用迭代器鏈
print("迭代器鏈示例:")
chain = ChainIterator([1, 2, 3], ['a', 'b'], [10, 20])
for item in chain:
print(item, end=" ")
print()
輸出結果:
迭代器鏈示例:
1 2 3 a b 10 20
10.4.7 itertools模塊簡介¶
itertools模塊提供了許多有用的迭代器工具:
import itertools
# count() - 無限計數
print("itertools.count示例:")
counter = itertools.count(10, 2) # 從10開始,步長爲2
for i, num in enumerate(counter):
if i >= 5:
break
print(num, end=" ")
print()
# cycle() - 循環迭代
print("\nitertools.cycle示例:")
cycler = itertools.cycle(['A', 'B', 'C'])
for i, letter in enumerate(cycler):
if i >= 8:
break
print(letter, end=" ")
print()
# chain() - 連接迭代器
print("\nitertools.chain示例:")
chained = itertools.chain([1, 2, 3], ['x', 'y'], [10, 20])
for item in chained:
print(item, end=" ")
print()
輸出結果:
itertools.count示例:
10 12 14 16 18
itertools.cycle示例:
A B C A B C A B
itertools.chain示例:
1 2 3 x y 10 20
10.4.8 迭代器的應用場景¶
迭代器在以下場景中特別有用:
- 內存效率:處理大量數據時避免一次性加載到內存
- 惰性求值:只在需要時才計算值
- 無限序列:表示理論上無限的數據序列
- 狀態保持:在迭代過程中維護複雜的狀態信息
- 協議統一:爲不同類型的容器提供統一的遍歷接口
迭代器是Python中一個基礎而重要的概念,它爲許多高級特性(如生成器、推導式等)提供了理論基礎。
10.5 裝飾器(Decorator)¶
裝飾器是Python中一個強大而優雅的特性,它允許我們在不修改原函數代碼的情況下,爲函數添加額外的功能。裝飾器體現了”開放-封閉原則”,即對擴展開放,對修改封閉。
10.5.1 裝飾器的概念¶
裝飾器基於以下Python特性:
- 函數是一等公民:函數可以作爲參數傳遞和返回值
- 高階函數:接受函數作爲參數或返回函數的函數
- 閉包:內部函數可以訪問外部函數的變量
- 語法糖:@decorator語法使代碼更簡潔
10.5.2 簡單裝飾器¶
最基本的裝飾器實現:
# 簡單裝飾器示例
def my_decorator(func):
"""簡單的裝飾器函數"""
def wrapper():
print("裝飾器:函數執行前")
result = func()
print("裝飾器:函數執行後")
return result
return wrapper
# 使用裝飾器
@my_decorator
def say_hello():
"""被裝飾的函數"""
print("Hello, World!")
return "greeting"
# 調用被裝飾的函數
print("調用裝飾後的函數:")
result = say_hello()
print(f"函數返回值: {result}")
輸出結果:
調用裝飾後的函數:
裝飾器:函數執行前
Hello, World!
裝飾器:函數執行後
函數返回值: greeting
10.5.3 計時裝飾器¶
實用的性能監控裝飾器:
import time
import functools
def timing_decorator(func):
"""計時裝飾器,測量函數執行時間"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
execution_time = end_time - start_time
print(f"函數 {func.__name__} 執行時間: {execution_time:.4f}秒")
return result
return wrapper
@timing_decorator
def slow_function():
"""模擬耗時操作"""
print("執行耗時操作...")
time.sleep(0.1) # 模擬0.1秒的操作
return "操作完成"
# 測試計時裝飾器
print("測試計時裝飾器:")
result = slow_function()
print(f"結果: {result}")
輸出結果:
測試計時裝飾器:
執行耗時操作...
函數 slow_function 執行時間: 0.1001秒
結果: 操作完成
10.5.4 帶參數的裝飾器¶
裝飾器工廠允許我們創建可配置的裝飾器:
def repeat(times):
"""重複執行裝飾器工廠"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
results = []
for i in range(times):
print(f"第 {i+1} 次執行:")
result = func(*args, **kwargs)
results.append(result)
return results
return wrapper
return decorator
@repeat(3)
def greet(name):
"""問候函數"""
message = f"Hello, {name}!"
print(message)
return message
# 測試帶參數的裝飾器
print("測試重複執行裝飾器:")
results = greet("Alice")
print(f"所有結果: {results}")
輸出結果:
測試重複執行裝飾器:
第 1 次執行:
Hello, Alice!
第 2 次執行:
Hello, Alice!
第 3 次執行:
Hello, Alice!
所有結果: ['Hello, Alice!', 'Hello, Alice!', 'Hello, Alice!']
10.5.5 日誌裝飾器¶
實用的日誌記錄裝飾器:
import functools
from datetime import datetime
def log_calls(level="INFO"):
"""日誌裝飾器工廠"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
args_str = ", ".join(map(str, args))
kwargs_str = ", ".join(f"{k}={v}" for k, v in kwargs.items())
params = ", ".join(filter(None, [args_str, kwargs_str]))
print(f"[{level}] {timestamp} - 調用函數 {func.__name__}({params})")
try:
result = func(*args, **kwargs)
print(f"[{level}] {timestamp} - 函數 {func.__name__} 執行成功,返回: {result}")
return result
except Exception as e:
print(f"[ERROR] {timestamp} - 函數 {func.__name__} 執行失敗: {e}")
raise
return wrapper
return decorator
@log_calls("DEBUG")
def calculate(a, b, operation="add"):
"""計算函數"""
if operation == "add":
return a + b
elif operation == "multiply":
return a * b
else:
raise ValueError(f"不支持的操作: {operation}")
# 測試日誌裝飾器
print("測試日誌裝飾器:")
result1 = calculate(5, 3)
result2 = calculate(4, 7, operation="multiply")
print(f"計算結果: {result1}, {result2}")
輸出結果:
測試日誌裝飾器:
[DEBUG] 2024-01-15 10:30:45 - 調用函數 calculate(5, 3, operation=add)
[DEBUG] 2024-01-15 10:30:45 - 函數 calculate 執行成功,返回: 8
[DEBUG] 2024-01-15 10:30:45 - 調用函數 calculate(4, 7, operation=multiply)
[DEBUG] 2024-01-15 10:30:45 - 函數 calculate 執行成功,返回: 28
計算結果: 8, 28
10.5.6 類裝飾器¶
使用類實現裝飾器:
class CountCalls:
"""計數裝飾器類"""
def __init__(self, func):
self.func = func
self.count = 0
functools.update_wrapper(self, func)
def __call__(self, *args, **kwargs):
self.count += 1
print(f"函數 {self.func.__name__} 被調用第 {self.count} 次")
return self.func(*args, **kwargs)
def get_count(self):
"""獲取調用次數"""
return self.count
@CountCalls
def fibonacci(n):
"""計算斐波那契數"""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 測試類裝飾器
print("測試類裝飾器:")
result = fibonacci(5)
print(f"fibonacci(5) = {result}")
print(f"總調用次數: {fibonacci.get_count()}")
輸出結果:
測試類裝飾器:
函數 fibonacci 被調用第 1 次
函數 fibonacci 被調用第 2 次
函數 fibonacci 被調用第 3 次
函數 fibonacci 被調用第 4 次
函數 fibonacci 被調用第 5 次
函數 fibonacci 被調用第 6 次
函數 fibonacci 被調用第 7 次
函數 fibonacci 被調用第 8 次
函數 fibonacci 被調用第 9 次
函數 fibonacci 被調用第 10 次
函數 fibonacci 被調用第 11 次
函數 fibonacci 被調用第 12 次
函數 fibonacci 被調用第 13 次
函數 fibonacci 被調用第 14 次
函數 fibonacci 被調用第 15 次
fibonacci(5) = 5
總調用次數: 15
10.5.7 緩存裝飾器¶
實現簡單的緩存機制:
def simple_cache(func):
"""簡單的緩存裝飾器"""
cache = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 創建緩存鍵
key = str(args) + str(sorted(kwargs.items()))
if key in cache:
print(f"緩存命中: {func.__name__}{args}")
return cache[key]
print(f"計算結果: {func.__name__}{args}")
result = func(*args, **kwargs)
cache[key] = result
return result
# 添加清除緩存的方法
wrapper.clear_cache = lambda: cache.clear()
wrapper.cache_info = lambda: f"緩存大小: {len(cache)}"
return wrapper
@simple_cache
def expensive_calculation(n):
"""模擬耗時計算"""
time.sleep(0.1) # 模擬計算時間
return n * n * n
# 測試緩存裝飾器
print("測試緩存裝飾器:")
print(f"第一次調用: {expensive_calculation(5)}")
print(f"第二次調用: {expensive_calculation(5)}")
print(f"第三次調用: {expensive_calculation(3)}")
print(f"第四次調用: {expensive_calculation(5)}")
print(expensive_calculation.cache_info())
輸出結果:
測試緩存裝飾器:
計算結果: expensive_calculation(5,)
第一次調用: 125
緩存命中: expensive_calculation(5,)
第二次調用: 125
計算結果: expensive_calculation(3,)
第三次調用: 27
緩存命中: expensive_calculation(5,)
第四次調用: 125
緩存大小: 2
10.5.8 內置裝飾器¶
Python提供了一些常用的內置裝飾器:
class Person:
"""演示內置裝飾器的類"""
def __init__(self, name, age):
self._name = name
self._age = age
@property
def name(self):
"""name屬性的getter"""
print("獲取name屬性")
return self._name
@name.setter
def name(self, value):
"""name屬性的setter"""
print(f"設置name屬性爲: {value}")
if not isinstance(value, str):
raise TypeError("姓名必須是字符串")
self._name = value
@staticmethod
def is_adult(age):
"""靜態方法:判斷是否成年"""
return age >= 18
@classmethod
def from_string(cls, person_str):
"""類方法:從字符串創建Person對象"""
name, age = person_str.split('-')
return cls(name, int(age))
def __str__(self):
return f"Person(name={self.name}, age={self._age})"
# 測試內置裝飾器
print("測試內置裝飾器:")
# 測試@property
person = Person("Alice", 25)
print(f"獲取姓名: {person.name}")
person.name = "Bob"
print(f"修改後: {person}")
# 測試@staticmethod
print(f"18歲是否成年: {Person.is_adult(18)}")
print(f"16歲是否成年: {Person.is_adult(16)}")
# 測試@classmethod
person2 = Person.from_string("Charlie-30")
print(f"從字符串創建: {person2}")
輸出結果:
測試內置裝飾器:
獲取name屬性
獲取姓名: Alice
設置name屬性爲: Bob
獲取name屬性
修改後: Person(name=Bob, age=25)
18歲是否成年: True
16歲是否成年: False
獲取name屬性
從字符串創建: Person(name=Charlie, age=30)
10.5.9 裝飾器的應用場景¶
裝飾器在以下場景中特別有用:
- 日誌記錄:自動記錄函數調用信息
- 性能監控:測量函數執行時間和資源使用
- 權限檢查:驗證用戶權限後再執行函數
- 緩存機制:緩存函數結果以提高性能
- 重試機制:在函數失敗時自動重試
- 參數驗證:檢查函數參數的有效性
- 事務管理:自動處理數據庫事務
- API限流:控制函數調用頻率
裝飾器是Python中實現橫切關注點(Cross-cutting Concerns)的優雅方式,它讓我們能夠以聲明式的方式爲函數添加功能,保持代碼的清潔和可維護性。
10.6 閉包(Closure)¶
閉包是函數式編程中的一個重要概念,它允許內部函數訪問外部函數的變量,即使外部函數已經執行完畢。閉包在Python中有着廣泛的應用,特別是在裝飾器、回調函數和函數工廠等場景中。
10.6.1 閉包的概念¶
閉包具有以下核心特徵:
- 嵌套函數:函數內部定義另一個函數
- 自由變量:內部函數引用外部函數的變量
- 詞法作用域:變量的作用域由代碼結構決定
- 變量持久化:外部函數執行完畢後,變量仍然存在
10.6.2 基本閉包示例¶
最簡單的閉包實現:
# 基本閉包示例
def outer_function(x):
"""外部函數"""
print(f"外部函數接收參數: {x}")
def inner_function(y):
"""內部函數,形成閉包"""
print(f"內部函數接收參數: {y}")
print(f"訪問外部變量: {x}")
return x + y
print("外部函數返回內部函數")
return inner_function
# 創建閉包
print("創建閉包:")
closure = outer_function(10)
print(f"閉包對象: {closure}")
print(f"閉包類型: {type(closure)}")
# 調用閉包
print("\n調用閉包:")
result = closure(5)
print(f"結果: {result}")
輸出結果:
創建閉包:
外部函數接收參數: 10
外部函數返回內部函數
閉包對象: <function outer_function.<locals>.inner_function at 0x...>
閉包類型: <class 'function'>
調用閉包:
內部函數接收參數: 5
訪問外部變量: 10
結果: 15
10.6.3 閉包變量的持久化¶
閉包能夠保持外部函數變量的狀態:
# 閉包變量持久化示例
def create_counter(initial_value=0):
"""創建計數器閉包"""
count = initial_value
def counter():
nonlocal count
count += 1
return count
return counter
# 創建多個獨立的計數器
print("創建獨立的計數器:")
counter1 = create_counter(0)
counter2 = create_counter(100)
print("counter1的調用:")
for i in range(3):
print(f"counter1(): {counter1()}")
print("\ncounter2的調用:")
for i in range(3):
print(f"counter2(): {counter2()}")
print("\n再次調用counter1:")
print(f"counter1(): {counter1()}")
輸出結果:
創建獨立的計數器:
counter1的調用:
counter1(): 1
counter1(): 2
counter1(): 3
counter2的調用:
counter2(): 101
counter2(): 102
counter2(): 103
再次調用counter1:
counter1(): 4
10.6.4 閉包實現裝飾器¶
閉包是實現裝飾器的基礎:
# 使用閉包實現裝飾器
def create_multiplier_decorator(factor):
"""創建乘法裝飾器的工廠函數"""
print(f"創建乘法因子爲 {factor} 的裝飾器")
def decorator(func):
print(f"裝飾函數 {func.__name__}")
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
multiplied_result = result * factor
print(f"原始結果: {result}, 乘以 {factor} 後: {multiplied_result}")
return multiplied_result
return wrapper
return decorator
# 使用閉包裝飾器
@create_multiplier_decorator(3)
def calculate_area(length, width):
"""計算面積"""
area = length * width
print(f"計算面積: {length} × {width} = {area}")
return area
# 測試閉包裝飾器
print("\n測試閉包裝飾器:")
result = calculate_area(4, 5)
print(f"最終結果: {result}")
輸出結果:
創建乘法因子爲 3 的裝飾器
裝飾函數 calculate_area
測試閉包裝飾器:
計算面積: 4 × 5 = 20
原始結果: 20, 乘以 3 後: 60
最終結果: 60
10.6.5 函數工廠¶
閉包可以用來創建具有特定行爲的函數:
# 函數工廠示例
def create_validator(min_value, max_value):
"""創建數值驗證器"""
def validator(value):
if not isinstance(value, (int, float)):
return False, f"值必須是數字,得到: {type(value).__name__}"
if value < min_value:
return False, f"值 {value} 小於最小值 {min_value}"
if value > max_value:
return False, f"值 {value} 大於最大值 {max_value}"
return True, f"值 {value} 有效"
# 添加驗證器的描述信息
validator.description = f"驗證範圍: [{min_value}, {max_value}]"
return validator
# 創建不同的驗證器
age_validator = create_validator(0, 120)
percentage_validator = create_validator(0, 100)
temperature_validator = create_validator(-273.15, 1000)
# 測試驗證器
print("測試函數工廠創建的驗證器:")
validators = [
(age_validator, "年齡驗證器", [25, -5, 150]),
(percentage_validator, "百分比驗證器", [85, 105, 50.5]),
(temperature_validator, "溫度驗證器", [25.5, -300, "abc"])
]
for validator, name, test_values in validators:
print(f"\n{name} ({validator.description}):")
for value in test_values:
is_valid, message = validator(value)
status = "✓" if is_valid else "✗"
print(f" {status} {message}")
輸出結果:
測試函數工廠創建的驗證器:
年齡驗證器 (驗證範圍: [0, 120]):
✓ 值 25 有效
✗ 值 -5 小於最小值 0
✗ 值 150 大於最大值 120
百分比驗證器 (驗證範圍: [0, 100]):
✓ 值 85 有效
✗ 值 105 大於最大值 100
✓ 值 50.5 有效
溫度驗證器 (驗證範圍: [-273.15, 1000]):
✓ 值 25.5 有效
✗ 值 -300 小於最小值 -273.15
✗ 值必須是數字,得到: str
10.6.6 配置化函數¶
閉包可以創建具有預設配置的函數:
# 配置化函數示例
def create_formatter(prefix="", suffix="", separator=" "):
"""創建格式化函數"""
def formatter(*args):
# 將所有參數轉換爲字符串並連接
content = separator.join(str(arg) for arg in args)
return f"{prefix}{content}{suffix}"
# 添加配置信息
formatter.config = {
'prefix': prefix,
'suffix': suffix,
'separator': separator
}
return formatter
# 創建不同配置的格式化函數
html_formatter = create_formatter("<p>", "</p>")
csv_formatter = create_formatter("", "", ",")
log_formatter = create_formatter("[LOG] ", " [END]")
bracket_formatter = create_formatter("(", ")", ", ")
# 測試配置化函數
print("測試配置化函數:")
formatters = [
(html_formatter, "HTML格式化器", ["Hello", "World"]),
(csv_formatter, "CSV格式化器", ["Name", "Age", "City"]),
(log_formatter, "日誌格式化器", ["Error", "File not found"]),
(bracket_formatter, "括號格式化器", [1, 2, 3, 4])
]
for formatter, name, test_data in formatters:
result = formatter(*test_data)
config = formatter.config
print(f"\n{name}:")
print(f" 配置: {config}")
print(f" 輸入: {test_data}")
print(f" 輸出: {result}")
輸出結果:
測試配置化函數:
HTML格式化器:
配置: {'prefix': '<p>', 'suffix': '</p>', 'separator': ' '}
輸入: ['Hello', 'World']
輸出: <p>Hello World</p>
CSV格式化器:
配置: {'prefix': '', 'suffix': '', 'separator': ','}
輸入: ['Name', 'Age', 'City']
輸出: Name,Age,City
日誌格式化器:
配置: {'prefix': '[LOG] ', 'suffix': ' [END]', 'separator': ' '}
輸入: ['Error', 'File not found']
輸出: [LOG] Error File not found [END]
括號格式化器:
配置: {'prefix': '(', 'suffix': ')', 'separator': ', '}
輸入: [1, 2, 3, 4]
輸出: (1, 2, 3, 4)
10.6.7 事件處理閉包¶
閉包在事件處理中的應用:
# 事件處理閉包示例
def create_event_handler(event_type, handler_name):
"""創建事件處理器"""
call_count = 0
def handle_event(data):
nonlocal call_count
call_count += 1
timestamp = time.strftime("%H:%M:%S")
print(f"[{timestamp}] {handler_name} 處理 {event_type} 事件 (第{call_count}次)")
print(f" 事件數據: {data}")
return f"處理完成: {event_type} - {data}"
# 添加處理器信息
handle_event.info = {
'event_type': event_type,
'handler_name': handler_name,
'call_count': lambda: call_count
}
return handle_event
# 創建事件處理器
click_handler = create_event_handler("click", "按鈕點擊處理器")
keypress_handler = create_event_handler("keypress", "鍵盤輸入處理器")
mouse_handler = create_event_handler("mouseover", "鼠標懸停處理器")
# 模擬事件處理
print("模擬事件處理:")
events = [
(click_handler, "button_1"),
(keypress_handler, "Enter"),
(click_handler, "button_2"),
(mouse_handler, "menu_item"),
(click_handler, "button_1"),
(keypress_handler, "Escape")
]
for handler, data in events:
result = handler(data)
print(f" 返回: {result}")
print()
# 顯示處理器統計信息
print("處理器統計信息:")
handlers = [click_handler, keypress_handler, mouse_handler]
for handler in handlers:
info = handler.info
print(f"{info['handler_name']}: {info['call_count']()} 次調用")
輸出結果:
模擬事件處理:
[10:30:45] 按鈕點擊處理器 處理 click 事件 (第1次)
事件數據: button_1
返回: 處理完成: click - button_1
[10:30:45] 鍵盤輸入處理器 處理 keypress 事件 (第1次)
事件數據: Enter
返回: 處理完成: keypress - Enter
[10:30:45] 按鈕點擊處理器 處理 click 事件 (第2次)
事件數據: button_2
返回: 處理完成: click - button_2
[10:30:45] 鼠標懸停處理器 處理 mouseover 事件 (第1次)
事件數據: menu_item
返回: 處理完成: mouseover - menu_item
[10:30:45] 按鈕點擊處理器 處理 click 事件 (第3次)
事件數據: button_1
返回: 處理完成: click - button_1
[10:30:45] 鍵盤輸入處理器 處理 keypress 事件 (第2次)
事件數據: Escape
返回: 處理完成: keypress - Escape
處理器統計信息:
按鈕點擊處理器: 3 次調用
鍵盤輸入處理器: 2 次調用
鼠標懸停處理器: 1 次調用
10.6.8 閉包作用域鏈¶
理解閉包的作用域鏈:
# 閉包作用域鏈示例
global_var = "全局變量"
def level1_function(level1_var):
"""第一層函數"""
print(f"Level 1: {level1_var}")
def level2_function(level2_var):
"""第二層函數"""
print(f"Level 2: {level2_var}")
def level3_function(level3_var):
"""第三層函數,可以訪問所有外層變量"""
print(f"Level 3: {level3_var}")
print(f"訪問Level 2變量: {level2_var}")
print(f"訪問Level 1變量: {level1_var}")
print(f"訪問全局變量: {global_var}")
# 顯示作用域鏈
return {
'level3': level3_var,
'level2': level2_var,
'level1': level1_var,
'global': global_var
}
return level3_function
return level2_function
# 創建嵌套閉包
print("創建嵌套閉包:")
closure_level2 = level1_function("Level1數據")
closure_level3 = closure_level2("Level2數據")
print("\n調用最內層閉包:")
scope_chain = closure_level3("Level3數據")
print(f"\n作用域鏈數據: {scope_chain}")
輸出結果:
創建嵌套閉包:
Level 1: Level1數據
Level 2: Level2數據
調用最內層閉包:
Level 3: Level3數據
訪問Level 2變量: Level2數據
訪問Level 1變量: Level1數據
訪問全局變量: 全局變量
作用域鏈數據: {'level3': 'Level3數據', 'level2': 'Level2數據', 'level1': 'Level1數據', 'global': '全局變量'}
10.6.9 nonlocal關鍵字¶
使用nonlocal修改外部函數的變量:
# nonlocal關鍵字示例
def create_bank_account(initial_balance):
"""創建銀行賬戶閉包"""
balance = initial_balance
transaction_history = []
def deposit(amount):
"""存款"""
nonlocal balance
if amount > 0:
balance += amount
transaction_history.append(f"存款: +{amount}")
print(f"存款 {amount},當前餘額: {balance}")
else:
print("存款金額必須大於0")
def withdraw(amount):
"""取款"""
nonlocal balance
if amount > 0:
if balance >= amount:
balance -= amount
transaction_history.append(f"取款: -{amount}")
print(f"取款 {amount},當前餘額: {balance}")
else:
print(f"餘額不足,當前餘額: {balance}")
else:
print("取款金額必須大於0")
def get_balance():
"""查詢餘額"""
return balance
def get_history():
"""查詢交易歷史"""
return transaction_history.copy()
# 返回賬戶操作函數
return {
'deposit': deposit,
'withdraw': withdraw,
'balance': get_balance,
'history': get_history
}
# 創建銀行賬戶
print("創建銀行賬戶:")
account = create_bank_account(1000)
print(f"初始餘額: {account['balance']()}")
# 進行一系列操作
print("\n進行銀行操作:")
account['deposit'](500)
account['withdraw'](200)
account['withdraw'](1500) # 餘額不足
account['deposit'](100)
account['withdraw'](300)
print(f"\n最終餘額: {account['balance']()}")
print(f"交易歷史: {account['history']()}")
輸出結果:
創建銀行賬戶:
初始餘額: 1000
進行銀行操作:
存款 500,當前餘額: 1500
取款 200,當前餘額: 1300
餘額不足,當前餘額: 1300
存款 100,當前餘額: 1400
取款 300,當前餘額: 1100
最終餘額: 1100
交易歷史: ['存款: +500', '取款: -200', '存款: +100', '取款: -300']
10.6.10 閉包陷阱¶
需要注意的閉包陷阱:
# 閉包陷阱示例
def create_functions_wrong():
"""錯誤的閉包創建方式"""
functions = []
for i in range(3):
# 錯誤:所有函數都會引用同一個變量i
def func():
return f"函數返回: {i}"
functions.append(func)
return functions
def create_functions_correct():
"""正確的閉包創建方式"""
functions = []
for i in range(3):
# 正確:使用默認參數捕獲當前的i值
def func(x=i):
return f"函數返回: {x}"
functions.append(func)
return functions
def create_functions_lambda():
"""使用lambda的正確方式"""
return [lambda x=i: f"Lambda返回: {x}" for i in range(3)]
# 測試閉包陷阱
print("測試閉包陷阱:")
print("\n錯誤的閉包創建:")
wrong_functions = create_functions_wrong()
for idx, func in enumerate(wrong_functions):
print(f"函數{idx}: {func()}")
print("\n正確的閉包創建:")
correct_functions = create_functions_correct()
for idx, func in enumerate(correct_functions):
print(f"函數{idx}: {func()}")
print("\n使用Lambda的正確方式:")
lambda_functions = create_functions_lambda()
for idx, func in enumerate(lambda_functions):
print(f"Lambda{idx}: {func()}")
輸出結果:
測試閉包陷阱:
錯誤的閉包創建:
函數0: 函數返回: 2
函數1: 函數返回: 2
函數2: 函數返回: 2
正確的閉包創建:
函數0: 函數返回: 0
函數1: 函數返回: 1
函數2: 函數返回: 2
使用Lambda的正確方式:
Lambda0: Lambda返回: 0
Lambda1: Lambda返回: 1
Lambda2: Lambda返回: 2
10.6.11 閉包的應用場景¶
閉包在以下場景中特別有用:
- 裝飾器實現:保持裝飾器的狀態和配置
- 函數工廠:創建具有特定行爲的函數
- 回調函數:保持回調函數的上下文信息
- 配置化函數:創建預配置的函數
- 狀態保持:在函數調用間保持狀態
- 數據封裝:創建私有變量和方法
- 事件處理:保持事件處理器的狀態
- 緩存機制:實現函數級別的緩存
閉包是Python中一個強大的特性,它爲函數式編程提供了重要支持,使得代碼更加靈活和模塊化。理解閉包的工作原理對於掌握Python的高級特性至關重要。
10.7 上下文管理器(Context Manager)¶
上下文管理器是Python中用於資源管理的重要機制,它確保資源的正確獲取和釋放。通過with語句,上下文管理器提供了一種優雅的方式來處理需要清理的資源,如文件、網絡連接、數據庫連接等。
10.7.1 上下文管理器協議¶
上下文管理器需要實現兩個特殊方法:
- __enter__():進入上下文時調用,返回資源對象
- __exit__(exc_type, exc_val, exc_tb):退出上下文時調用,處理清理工作
10.7.2 自定義上下文管理器類¶
使用類實現上下文管理器:
# 自定義計時器上下文管理器
import time
class Timer:
"""計時器上下文管理器"""
def __init__(self, name="操作"):
self.name = name
self.start_time = None
self.end_time = None
def __enter__(self):
"""進入上下文"""
print(f"開始執行 {self.name}...")
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文"""
self.end_time = time.time()
duration = self.end_time - self.start_time
if exc_type is None:
print(f"{self.name} 執行完成,耗時: {duration:.4f}秒")
else:
print(f"{self.name} 執行出錯,耗時: {duration:.4f}秒")
print(f"異常類型: {exc_type.__name__}")
print(f"異常信息: {exc_val}")
# 返回False表示不抑制異常
return False
@property
def elapsed_time(self):
"""獲取執行時間"""
if self.start_time and self.end_time:
return self.end_time - self.start_time
return None
# 使用計時器上下文管理器
print("測試計時器上下文管理器:")
# 正常執行
with Timer("數據處理") as timer:
# 模擬一些工作
time.sleep(0.1)
result = sum(range(1000000))
print(f"計算結果: {result}")
print(f"通過屬性獲取執行時間: {timer.elapsed_time:.4f}秒")
# 異常情況
print("\n測試異常情況:")
try:
with Timer("錯誤操作"):
time.sleep(0.05)
raise ValueError("模擬錯誤")
except ValueError as e:
print(f"捕獲異常: {e}")
輸出結果:
測試計時器上下文管理器:
開始執行 數據處理...
計算結果: 499999500000
數據處理 執行完成,耗時: 0.1234秒
通過屬性獲取執行時間: 0.1234秒
測試異常情況:
開始執行 錯誤操作...
錯誤操作 執行出錯,耗時: 0.0567秒
異常類型: ValueError
異常信息: 模擬錯誤
捕獲異常: 模擬錯誤
10.7.3 文件管理上下文管理器¶
創建安全的文件操作上下文管理器:
# 文件管理上下文管理器
import os
import tempfile
class FileManager:
"""文件管理上下文管理器"""
def __init__(self, filename, mode='r', encoding='utf-8', backup=False):
self.filename = filename
self.mode = mode
self.encoding = encoding
self.backup = backup
self.file = None
self.backup_file = None
def __enter__(self):
"""進入上下文,打開文件"""
print(f"打開文件: {self.filename} (模式: {self.mode})")
# 如果是寫模式且需要備份
if 'w' in self.mode and self.backup and os.path.exists(self.filename):
self.backup_file = f"{self.filename}.backup"
print(f"創建備份文件: {self.backup_file}")
with open(self.filename, 'r', encoding=self.encoding) as src:
with open(self.backup_file, 'w', encoding=self.encoding) as dst:
dst.write(src.read())
try:
self.file = open(self.filename, self.mode, encoding=self.encoding)
return self.file
except Exception as e:
print(f"打開文件失敗: {e}")
raise
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文,關閉文件"""
if self.file:
print(f"關閉文件: {self.filename}")
self.file.close()
if exc_type is not None:
print(f"文件操作出現異常: {exc_type.__name__}: {exc_val}")
# 如果有備份文件,詢問是否恢復
if self.backup_file and os.path.exists(self.backup_file):
print("檢測到備份文件,可以考慮恢復")
return False # 不抑制異常
# 測試文件管理器
print("測試文件管理上下文管理器:")
# 創建測試文件
test_file = "test_context.txt"
with open(test_file, 'w', encoding='utf-8') as f:
f.write("原始內容\n這是測試文件\n")
# 使用文件管理器讀取
print("\n讀取文件:")
with FileManager(test_file, 'r') as f:
content = f.read()
print(f"文件內容:\n{content}")
# 使用文件管理器寫入(帶備份)
print("\n寫入文件(帶備份):")
with FileManager(test_file, 'w', backup=True) as f:
f.write("新的內容\n這是修改後的文件\n")
# 驗證修改
print("\n驗證修改:")
with FileManager(test_file, 'r') as f:
new_content = f.read()
print(f"修改後的內容:\n{new_content}")
# 檢查備份文件
backup_file = f"{test_file}.backup"
if os.path.exists(backup_file):
print(f"\n備份文件內容:")
with open(backup_file, 'r', encoding='utf-8') as f:
backup_content = f.read()
print(backup_content)
# 清理測試文件
os.remove(test_file)
if os.path.exists(backup_file):
os.remove(backup_file)
輸出結果:
測試文件管理上下文管理器:
讀取文件:
打開文件: test_context.txt (模式: r)
文件內容:
原始內容
這是測試文件
關閉文件: test_context.txt
寫入文件(帶備份):
打開文件: test_context.txt (模式: w)
創建備份文件: test_context.txt.backup
關閉文件: test_context.txt
驗證修改:
打開文件: test_context.txt (模式: r)
修改後的內容:
新的內容
這是修改後的文件
關閉文件: test_context.txt
備份文件內容:
原始內容
這是測試文件
10.7.4 使用@contextmanager裝飾器¶
使用contextlib.contextmanager裝飾器簡化上下文管理器的創建:
# 使用@contextmanager裝飾器
from contextlib import contextmanager
import sqlite3
import tempfile
import os
@contextmanager
def database_transaction(db_path):
"""數據庫事務上下文管理器"""
print(f"連接數據庫: {db_path}")
conn = sqlite3.connect(db_path)
try:
print("開始事務")
conn.execute("BEGIN")
yield conn
print("提交事務")
conn.commit()
except Exception as e:
print(f"事務回滾: {e}")
conn.rollback()
raise
finally:
print("關閉數據庫連接")
conn.close()
@contextmanager
def temporary_directory(prefix="temp_"):
"""臨時目錄上下文管理器"""
temp_dir = tempfile.mkdtemp(prefix=prefix)
print(f"創建臨時目錄: {temp_dir}")
try:
yield temp_dir
finally:
# 清理臨時目錄
import shutil
print(f"清理臨時目錄: {temp_dir}")
shutil.rmtree(temp_dir, ignore_errors=True)
@contextmanager
def change_directory(new_dir):
"""臨時改變工作目錄"""
old_dir = os.getcwd()
print(f"當前目錄: {old_dir}")
print(f"切換到目錄: {new_dir}")
try:
os.chdir(new_dir)
yield new_dir
finally:
print(f"恢復到原目錄: {old_dir}")
os.chdir(old_dir)
# 測試@contextmanager裝飾器
print("測試@contextmanager裝飾器:")
# 測試數據庫事務
print("\n1. 數據庫事務測試:")
db_file = "test.db"
try:
with database_transaction(db_file) as conn:
# 創建表
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE
)
""")
# 插入數據
conn.execute("INSERT INTO users (name, email) VALUES (?, ?)",
("張三", "zhangsan@example.com"))
conn.execute("INSERT INTO users (name, email) VALUES (?, ?)",
("李四", "lisi@example.com"))
# 查詢數據
cursor = conn.execute("SELECT * FROM users")
users = cursor.fetchall()
print(f"插入的用戶: {users}")
except Exception as e:
print(f"數據庫操作失敗: {e}")
finally:
if os.path.exists(db_file):
os.remove(db_file)
# 測試臨時目錄
print("\n2. 臨時目錄測試:")
with temporary_directory("my_temp_") as temp_dir:
print(f"在臨時目錄中工作: {temp_dir}")
# 在臨時目錄中創建文件
test_file = os.path.join(temp_dir, "test.txt")
with open(test_file, 'w') as f:
f.write("這是臨時文件")
print(f"創建文件: {test_file}")
print(f"文件存在: {os.path.exists(test_file)}")
print(f"臨時目錄已清理,文件存在: {os.path.exists(test_file) if 'test_file' in locals() else 'N/A'}")
# 測試目錄切換
print("\n3. 目錄切換測試:")
with change_directory(os.path.expanduser("~")):
current = os.getcwd()
print(f"當前工作目錄: {current}")
# 列出當前目錄的一些文件
files = os.listdir('.')[:5] # 只顯示前5個
print(f"目錄中的文件(前5個): {files}")
print(f"最終工作目錄: {os.getcwd()}")
輸出結果:
測試@contextmanager裝飾器:
1. 數據庫事務測試:
連接數據庫: test.db
開始事務
插入的用戶: [(1, '張三', 'zhangsan@example.com'), (2, '李四', 'lisi@example.com')]
提交事務
關閉數據庫連接
2. 臨時目錄測試:
創建臨時目錄: C:\Users\...\temp_abc123
在臨時目錄中工作: C:\Users\...\temp_abc123
創建文件: C:\Users\...\temp_abc123\test.txt
文件存在: True
清理臨時目錄: C:\Users\...\temp_abc123
臨時目錄已清理,文件存在: False
3. 目錄切換測試:
當前目錄: C:\Users\yeyupiaoling/Python從入門到精通
切換到目錄: C:\Users\yeyupiaoling
當前工作目錄: C:\Users\yeyupiaoling
目錄中的文件(前5個): ['Desktop', 'Documents', 'Downloads', 'Music', 'Pictures']
恢復到原目錄: C:\Users\yeyupiaoling/Python從入門到精通
最終工作目錄: C:\Users\yeyupiaoling/Python從入門到精通
10.7.5 contextlib模塊的其他工具¶
contextlib模塊提供了多個有用的上下文管理器:
# contextlib模塊的其他工具
from contextlib import suppress, ExitStack, closing
import warnings
# 1. suppress - 抑制指定異常
print("1. suppress - 抑制異常:")
# 不使用suppress的情況
print("不使用suppress:")
try:
int("not_a_number")
except ValueError:
print("捕獲到ValueError")
# 使用suppress
print("\n使用suppress:")
with suppress(ValueError):
result = int("not_a_number")
print("這行不會執行")
print("繼續執行後續代碼")
# suppress多個異常類型
print("\nsuppress多個異常:")
with suppress(ValueError, TypeError, KeyError):
data = {"key": "value"}
result = data["nonexistent_key"] + 10
print("異常被抑制,繼續執行")
# 2. ExitStack - 管理多個上下文管理器
print("\n2. ExitStack - 管理多個上下文管理器:")
@contextmanager
def managed_resource(name):
"""模擬資源管理"""
print(f"獲取資源: {name}")
try:
yield name
finally:
print(f"釋放資源: {name}")
# 使用ExitStack管理多個資源
with ExitStack() as stack:
# 動態添加上下文管理器
resource1 = stack.enter_context(managed_resource("數據庫連接"))
resource2 = stack.enter_context(managed_resource("文件句柄"))
resource3 = stack.enter_context(managed_resource("網絡連接"))
print(f"使用資源: {resource1}, {resource2}, {resource3}")
# 可以根據條件添加更多資源
if True: # 某個條件
resource4 = stack.enter_context(managed_resource("緩存連接"))
print(f"額外資源: {resource4}")
print("所有資源已自動釋放")
# 3. 嵌套上下文管理器
print("\n3. 嵌套上下文管理器:")
@contextmanager
def logging_context(operation):
"""日誌記錄上下文"""
print(f"[開始] {operation}")
start_time = time.time()
try:
yield
except Exception as e:
print(f"[錯誤] {operation}: {e}")
raise
finally:
end_time = time.time()
print(f"[結束] {operation} (耗時: {end_time - start_time:.3f}秒)")
# 嵌套使用多個上下文管理器
with logging_context("數據處理流程"):
with logging_context("數據加載"):
time.sleep(0.1)
print("加載數據完成")
with logging_context("數據轉換"):
time.sleep(0.05)
print("轉換數據完成")
with logging_context("數據保存"):
time.sleep(0.08)
print("保存數據完成")
# 4. 條件上下文管理器
print("\n4. 條件上下文管理器:")
@contextmanager
def conditional_context(condition, context_manager):
"""條件上下文管理器"""
if condition:
with context_manager as value:
yield value
else:
yield None
# 使用條件上下文管理器
for use_timer in [True, False]:
print(f"\n使用計時器: {use_timer}")
with conditional_context(use_timer, Timer("條件操作")) as timer:
time.sleep(0.05)
print("執行一些操作")
if timer:
print(f"操作耗時: {timer.elapsed_time}秒")
else:
print("未使用計時器")
輸出結果:
1. suppress - 抑制異常:
不使用suppress:
捕獲到ValueError
使用suppress:
繼續執行後續代碼
suppress多個異常:
異常被抑制,繼續執行
2. ExitStack - 管理多個上下文管理器:
獲取資源: 數據庫連接
獲取資源: 文件句柄
獲取資源: 網絡連接
使用資源: 數據庫連接, 文件句柄, 網絡連接
獲取資源: 緩存連接
額外資源: 緩存連接
釋放資源: 緩存連接
釋放資源: 網絡連接
釋放資源: 文件句柄
釋放資源: 數據庫連接
所有資源已自動釋放
3. 嵌套上下文管理器:
[開始] 數據處理流程
[開始] 數據加載
加載數據完成
[結束] 數據加載 (耗時: 0.101秒)
[開始] 數據轉換
轉換數據完成
[結束] 數據轉換 (耗時: 0.051秒)
[開始] 數據保存
保存數據完成
[結束] 數據保存 (耗時: 0.081秒)
[結束] 數據處理流程 (耗時: 0.235秒)
4. 條件上下文管理器:
使用計時器: True
開始執行 條件操作...
執行一些操作
條件操作 執行完成,耗時: 0.0512秒
操作耗時: 0.0512秒
使用計時器: False
執行一些操作
未使用計時器
10.7.6 上下文管理器的應用場景¶
上下文管理器在以下場景中特別有用:
- 資源管理:文件、數據庫連接、網絡連接等
- 狀態管理:臨時改變系統狀態,如工作目錄、環境變量
- 異常處理:確保在異常情況下也能正確清理資源
- 性能監控:測量代碼塊的執行時間
- 事務管理:數據庫事務的提交和回滾
- 鎖管理:線程同步中的鎖獲取和釋放
- 臨時配置:臨時修改配置參數
- 日誌記錄:爲代碼塊添加結構化日誌
上下文管理器是Python中實現”獲取資源,使用資源,釋放資源”模式的標準方法,它確保了資源的正確管理,提高了代碼的健壯性和可維護性。
10.8 章節總結¶
本章深入探討了Python的高級特性,這些特性是Python語言強大和優雅的重要體現:
核心特性回顧¶
- 列表推導式:提供了簡潔的列表創建方式,相比傳統循環更加Pythonic,性能更優
- 字典和集合推導式:擴展了推導式的概念,支持字典和集合的快速構建
- 生成器:實現惰性求值,節省內存,支持無限序列和管道處理
- 迭代器:提供統一的遍歷接口,支持自定義迭代行爲
- 裝飾器:實現橫切關注點的分離,如日誌、緩存、權限檢查等
- 閉包:支持函數式編程,實現數據封裝和狀態保持
- 上下文管理器:確保資源的正確管理,提供異常安全的代碼結構
最佳實踐建議¶
-
合理使用推導式:
- 優先使用推導式而非傳統循環
- 避免過度複雜的嵌套推導式
- 考慮代碼可讀性和維護性 -
善用生成器:
- 處理大數據集時優先考慮生成器
- 利用生成器實現管道處理
- 合理使用yield和send方法 -
裝飾器設計原則:
- 保持裝飾器的單一職責
- 使用functools.wraps保持函數元信息
- 考慮裝飾器的組合和順序 -
閉包應用場景:
- 實現函數工廠和配置化函數
- 避免閉包陷阱,注意變量綁定時機
- 合理使用nonlocal關鍵字 -
上下文管理器使用:
- 優先使用with語句管理資源
- 自定義上下文管理器時確保異常安全
- 利用contextlib模塊簡化實現
性能考慮¶
- 推導式 vs 循環:推導式通常性能更好,但差異不大
- 生成器 vs 列表:生成器在內存使用上有顯著優勢
- 裝飾器開銷:裝飾器會增加函數調用開銷,但通常可以忽略
- 閉包性能:閉包訪問外部變量比局部變量稍慢
學習建議¶
- 循序漸進:從簡單的推導式開始,逐步掌握複雜特性
- 實踐應用:在實際項目中應用這些特性,加深理解
- 閱讀源碼:研究優秀開源項目中這些特性的使用
- 性能測試:在關鍵代碼路徑上進行性能測試和優化
掌握這些Python高級特性將顯著提升你的編程水平,讓你能夠編寫更加優雅、高效和Pythonic的代碼。這些特性不僅是語法糖,更是Python哲學”優雅勝於醜陋,簡潔勝於複雜”的具體體現。