第10章 Python高级特性¶
Python作为一门现代化的编程语言,提供了许多高级特性来帮助开发者编写更加简洁、高效和优雅的代码。本章将深入探讨Python的核心高级特性,包括推导式、生成器、迭代器、装饰器、闭包和上下文管理器等。这些特性不仅能提高代码的可读性和性能,还体现了Python”优雅胜于丑陋”的设计哲学。
本系列文章所使用到的示例源码:Python从入门到精通示例代码
10.1 列表推导式¶
列表推导式(List Comprehension)是Python中最受欢迎的特性之一,它提供了一种简洁而强大的方式来创建列表。列表推导式不仅代码更加简洁,在很多情况下性能也优于传统的循环方式。
10.1.1 列表推导式的概念¶
列表推导式是一种基于现有可迭代对象创建新列表的简洁语法。它将循环和条件判断融合在一个表达式中,体现了函数式编程的思想。相比传统的for循环,列表推导式具有以下优势:
- 简洁性:一行代码完成多行循环的功能
- 可读性:表达式更接近自然语言的描述
- 性能:在C语言层面优化,执行效率更高
- 函数式风格:避免了显式的循环变量管理
10.1.2 基本语法结构¶
列表推导式的基本语法结构为:[expression for item in iterable]
# 基本列表推导式示例
numbers = [1, 2, 3, 4, 5]
squares = [x**2 for x in numbers]
print(f"原始列表: {numbers}")
print(f"平方列表: {squares}")
输出结果:
原始列表: [1, 2, 3, 4, 5]
平方列表: [1, 4, 9, 16, 25]
在这个例子中:
- x**2 是表达式部分,定义了如何转换每个元素
- for x in numbers 是迭代部分,定义了数据源和迭代变量
- 整个表达式返回一个新的列表
10.1.3 条件过滤¶
列表推导式可以包含条件判断,语法为:[expression for item in iterable if condition]
# 条件过滤示例
numbers = [1, 2, 3, 4, 5]
even_squares = [x**2 for x in numbers if x % 2 == 0]
print(f"偶数的平方: {even_squares}")
# 字符串处理
words = ['hello', 'world', 'python', 'programming']
capitalized = [word.capitalize() for word in words if len(word) > 5]
print(f"长度大于5的单词首字母大写: {capitalized}")
输出结果:
偶数的平方: [4, 16]
长度大于5的单词首字母大写: ['Python', 'Programming']
10.1.4 嵌套列表推导式¶
对于二维列表或更复杂的数据结构,可以使用嵌套的列表推导式:
# 嵌套列表推导式
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flattened = [num for row in matrix for num in row]
print(f"二维矩阵: {matrix}")
print(f"展平后: {flattened}")
输出结果:
二维矩阵: [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
展平后: [1, 2, 3, 4, 5, 6, 7, 8, 9]
10.1.5 性能对比¶
列表推导式通常比传统循环具有更好的性能:
import time
# 使用列表推导式
start_time = time.time()
list_comp_result = [x**2 for x in range(10000)]
list_comp_time = time.time() - start_time
# 使用传统循环
start_time = time.time()
loop_result = []
for x in range(10000):
loop_result.append(x**2)
loop_time = time.time() - start_time
print(f"列表推导式耗时: {list_comp_time:.6f}秒")
print(f"传统循环耗时: {loop_time:.6f}秒")
输出结果:
列表推导式耗时: 0.001000秒
传统循环耗时: 0.000997秒
虽然在这个简单例子中性能差异不大,但在更复杂的场景中,列表推导式通常表现更好。
10.2 字典推导式与集合推导式¶
除了列表推导式,Python还提供了字典推导式和集合推导式,它们遵循相似的语法规则,但用于创建不同类型的数据结构。这些推导式同样具有简洁、高效的特点。
10.2.1 字典推导式的语法¶
字典推导式的基本语法为:{key: value for item in iterable}
# 基本字典推导式
numbers = [1, 2, 3, 4, 5]
square_dict = {x: x**2 for x in numbers}
print(f"数字及其平方的字典: {square_dict}")
# 条件过滤
even_square_dict = {x: x**2 for x in numbers if x % 2 == 0}
print(f"偶数及其平方的字典: {even_square_dict}")
输出结果:
数字及其平方的字典: {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
偶数及其平方的字典: {2: 4, 4: 16}
10.2.2 字符串处理与键值互换¶
字典推导式在字符串处理和数据转换中非常有用:
# 字符串处理
words = ['hello', 'world', 'python']
word_lengths = {word: len(word) for word in words}
print(f"单词长度字典: {word_lengths}")
# 字典键值互换
original_dict = {'a': 1, 'b': 2, 'c': 3}
swapped_dict = {v: k for k, v in original_dict.items()}
print(f"原字典: {original_dict}")
print(f"键值互换后: {swapped_dict}")
输出结果:
单词长度字典: {'hello': 5, 'world': 5, 'python': 6}
原字典: {'a': 1, 'b': 2, 'c': 3}
键值互换后: {1: 'a', 2: 'b', 3: 'c'}
10.2.3 集合推导式的语法¶
集合推导式的语法为:{expression for item in iterable},它会自动去除重复元素:
# 基本集合推导式
numbers = [1, 2, 2, 3, 3, 4, 5]
unique_squares = {x**2 for x in numbers}
print(f"原列表: {numbers}")
print(f"去重后的平方集合: {unique_squares}")
# 条件过滤
even_set = {x for x in numbers if x % 2 == 0}
print(f"偶数集合: {even_set}")
输出结果:
原列表: [1, 2, 2, 3, 3, 4, 5]
去重后的平方集合: {1, 4, 9, 16, 25}
偶数集合: {2, 4}
10.2.4 集合运算与嵌套推导式¶
集合推导式可以与集合运算结合,处理更复杂的逻辑:
# 集合运算
set1 = {1, 2, 3, 4, 5}
set2 = {4, 5, 6, 7, 8}
intersection_squares = {x**2 for x in set1 if x in set2}
print(f"集合1: {set1}")
print(f"集合2: {set2}")
print(f"交集元素的平方: {intersection_squares}")
# 嵌套推导式
matrix = [[1, 2], [3, 4], [5, 6]]
flattened_set = {num for row in matrix for num in row if num % 2 == 0}
print(f"矩阵: {matrix}")
print(f"偶数元素集合: {flattened_set}")
输出结果:
集合1: {1, 2, 3, 4, 5}
集合2: {4, 5, 6, 7, 8}
交集元素的平方: {16, 25}
矩阵: [[1, 2], [3, 4], [5, 6]]
偶数元素集合: {2, 4, 6}
10.2.5 实际应用场景¶
推导式在数据处理、统计分析等场景中非常实用:
# 数据统计
scores = [85, 92, 78, 96, 88, 76, 94, 89]
grade_distribution = {
'A': len([s for s in scores if s >= 90]),
'B': len([s for s in scores if 80 <= s < 90]),
'C': len([s for s in scores if 70 <= s < 80]),
'D': len([s for s in scores if s < 70])
}
print(f"成绩分布: {grade_distribution}")
# 配置处理
config_list = [('debug', 'true'), ('port', '8080'), ('host', 'localhost')]
config_dict = {k: v for k, v in config_list}
print(f"配置字典: {config_dict}")
输出结果:
成绩分布: {'A': 3, 'B': 3, 'C': 1, 'D': 1}
配置字典: {'debug': 'true', 'port': '8080', 'host': 'localhost'}
10.2.6 推导式的限制与最佳实践¶
虽然推导式功能强大,但也需要注意其使用边界:
# 可读性边界示例
# 好的推导式:简洁明了
even_squares = [x**2 for x in range(10) if x % 2 == 0]
# 不推荐的推导式:过于复杂
# complex_result = [func(x) for x in data if condition1(x) and condition2(x) for y in other_data if related(x, y)]
# 对于复杂逻辑,使用传统循环更清晰
complex_result = []
for x in data:
if condition1(x) and condition2(x):
for y in other_data:
if related(x, y):
complex_result.append(func(x))
最佳实践原则:
- 保持推导式的简洁性,一行代码应该易于理解
- 复杂的条件判断或多层嵌套时考虑使用传统循环
- 优先考虑代码的可读性而非简洁性
- 在性能敏感的场景中,推导式通常是更好的选择
10.3 生成器(Generator)¶
生成器是Python中一个非常重要的特性,它提供了一种优雅的方式来创建迭代器。生成器采用惰性求值的策略,只在需要时才计算值,这使得它在处理大量数据或无限序列时具有显著的内存优势。
10.3.1 生成器的概念¶
生成器具有以下核心特性:
- 惰性求值:只在需要时才计算下一个值
- 内存效率:不需要将所有值同时存储在内存中
- 迭代器协议:自动实现了__iter__()和__next__()方法
- 状态保持:能够记住函数执行的状态
10.3.2 生成器函数¶
生成器函数使用yield关键字来产生值,而不是return:
# 基本生成器函数
def simple_generator():
print("开始生成")
yield 1
print("生成第二个值")
yield 2
print("生成第三个值")
yield 3
print("生成结束")
# 使用生成器
gen = simple_generator()
print("创建生成器对象")
for value in gen:
print(f"获得值: {value}")
输出结果:
创建生成器对象
开始生成
获得值: 1
生成第二个值
获得值: 2
生成第三个值
获得值: 3
生成结束
10.3.3 斐波那契数列生成器¶
生成器特别适合生成数学序列:
# 斐波那契数列生成器
def fibonacci_generator(n):
"""生成前n个斐波那契数"""
a, b = 0, 1
count = 0
while count < n:
yield a
a, b = b, a + b
count += 1
# 使用斐波那契生成器
print("前10个斐波那契数:")
for num in fibonacci_generator(10):
print(num, end=" ")
print()
输出结果:
前10个斐波那契数:
0 1 1 2 3 5 8 13 21 34
10.3.4 生成器表达式¶
生成器表达式提供了一种更简洁的创建生成器的方式:
# 生成器表达式
squares_gen = (x**2 for x in range(5))
print(f"生成器对象: {squares_gen}")
print("生成器产生的值:")
for square in squares_gen:
print(square, end=" ")
print()
输出结果:
生成器对象: <generator object <genexpr> at 0x...>
生成器产生的值:
0 1 4 9 16
10.3.5 内存使用对比¶
生成器相比列表推导式具有显著的内存优势:
import sys
# 列表推导式
list_comp = [x**2 for x in range(1000)]
list_size = sys.getsizeof(list_comp)
# 生成器表达式
gen_expr = (x**2 for x in range(1000))
gen_size = sys.getsizeof(gen_expr)
print(f"列表推导式内存使用: {list_size} 字节")
print(f"生成器表达式内存使用: {gen_size} 字节")
print(f"内存节省: {list_size - gen_size} 字节")
print(f"节省比例: {(list_size - gen_size) / list_size * 100:.1f}%")
输出结果:
列表推导式内存使用: 8856 字节
生成器表达式内存使用: 104 字节
内存节省: 8752 字节
节省比例: 98.8%
10.3.6 无限序列生成器¶
生成器可以创建理论上无限的序列:
# 无限计数器
def infinite_counter(start=0, step=1):
"""无限计数生成器"""
current = start
while True:
yield current
current += step
# 使用无限生成器(注意:需要手动停止)
counter = infinite_counter(1, 2)
print("前10个奇数:")
for i, num in enumerate(counter):
if i >= 10:
break
print(num, end=" ")
print()
输出结果:
前10个奇数:
1 3 5 7 9 11 13 15 17 19
10.3.7 生成器的send方法¶
生成器支持双向通信,可以通过send()方法向生成器发送值:
# 支持send的生成器
def echo_generator():
"""回声生成器,可以接收外部发送的值"""
value = None
while True:
received = yield value
if received is not None:
value = f"Echo: {received}"
else:
value = "等待输入..."
# 使用send方法
echo_gen = echo_generator()
print(next(echo_gen)) # 启动生成器
print(echo_gen.send("Hello"))
print(echo_gen.send("World"))
print(next(echo_gen))
输出结果:
None
Echo: Hello
Echo: World
等待输入...
10.3.8 管道处理示例¶
生成器非常适合构建数据处理管道:
# 数据处理管道
def read_numbers():
"""模拟读取数字数据"""
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for num in numbers:
print(f"读取: {num}")
yield num
def filter_even(numbers):
"""过滤偶数"""
for num in numbers:
if num % 2 == 0:
print(f"过滤得到偶数: {num}")
yield num
def square_numbers(numbers):
"""计算平方"""
for num in numbers:
result = num ** 2
print(f"计算平方: {num}^2 = {result}")
yield result
# 构建处理管道
pipeline = square_numbers(filter_even(read_numbers()))
print("开始处理管道:")
results = list(pipeline)
print(f"最终结果: {results}")
输出结果:
开始处理管道:
读取: 1
读取: 2
过滤得到偶数: 2
计算平方: 2^2 = 4
读取: 3
读取: 4
过滤得到偶数: 4
计算平方: 4^2 = 16
读取: 5
读取: 6
过滤得到偶数: 6
计算平方: 6^2 = 36
读取: 7
读取: 8
过滤得到偶数: 8
计算平方: 8^2 = 64
读取: 9
读取: 10
过滤得到偶数: 10
计算平方: 10^2 = 100
最终结果: [4, 16, 36, 64, 100]
10.3.9 生成器的应用场景¶
生成器在以下场景中特别有用:
- 大数据处理:逐行处理大文件而不需要将整个文件加载到内存
- 无限序列:生成数学序列、随机数序列等
- 管道处理:构建数据处理流水线
- 内存优化:在内存受限的环境中处理大量数据
- 协程基础:为异步编程提供基础支持
生成器是Python中实现高效、优雅代码的重要工具,掌握其使用方法对于编写高质量的Python程序至关重要。
10.4 迭代器(Iterator)¶
迭代器是Python中实现迭代协议的对象,它提供了一种统一的方式来遍历容器中的元素。理解迭代器的工作原理对于掌握Python的高级特性至关重要。
10.4.1 迭代器协议¶
迭代器协议包含两个核心方法:
- __iter__():返回迭代器对象本身
- __next__():返回下一个值,没有更多值时抛出StopIteration异常
# 自定义迭代器类
class Counter:
"""简单的计数器迭代器"""
def __init__(self, start, end):
self.current = start
self.end = end
def __iter__(self):
return self
def __next__(self):
if self.current < self.end:
num = self.current
self.current += 1
return num
else:
raise StopIteration
# 使用自定义迭代器
print("使用自定义计数器迭代器:")
counter = Counter(1, 5)
for num in counter:
print(f"计数: {num}")
输出结果:
使用自定义计数器迭代器:
计数: 1
计数: 2
计数: 3
计数: 4
10.4.2 可迭代对象与迭代器的区别¶
可迭代对象和迭代器是两个不同的概念:
# 演示可迭代对象与迭代器的区别
class NumberList:
"""可迭代对象:包含数据,但不直接实现迭代逻辑"""
def __init__(self, numbers):
self.numbers = numbers
def __iter__(self):
return NumberIterator(self.numbers)
class NumberIterator:
"""迭代器:实现具体的迭代逻辑"""
def __init__(self, numbers):
self.numbers = numbers
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index < len(self.numbers):
value = self.numbers[self.index]
self.index += 1
return value
else:
raise StopIteration
# 使用可迭代对象
number_list = NumberList([10, 20, 30, 40])
print("第一次迭代:")
for num in number_list:
print(f"数字: {num}")
print("第二次迭代:")
for num in number_list:
print(f"数字: {num}")
# 直接使用迭代器
print("直接使用迭代器:")
iterator = NumberIterator([1, 2, 3])
print(f"迭代器类型: {type(iterator)}")
print(f"是否为迭代器: {hasattr(iterator, '__next__')}")
print(f"是否为可迭代对象: {hasattr(iterator, '__iter__')}")
输出结果:
第一次迭代:
数字: 10
数字: 20
数字: 30
数字: 40
第二次迭代:
数字: 10
数字: 20
数字: 30
数字: 40
直接使用迭代器:
迭代器类型: <class '__main__.NumberIterator'>
是否为迭代器: True
是否为可迭代对象: True
10.4.3 斐波那契迭代器¶
实现一个更复杂的迭代器示例:
# 斐波那契迭代器
class FibonacciIterator:
"""斐波那契数列迭代器"""
def __init__(self, max_count):
self.max_count = max_count
self.count = 0
self.current = 0
self.next_val = 1
def __iter__(self):
return self
def __next__(self):
if self.count < self.max_count:
result = self.current
self.current, self.next_val = self.next_val, self.current + self.next_val
self.count += 1
return result
else:
raise StopIteration
# 使用斐波那契迭代器
print("斐波那契数列迭代器:")
fib_iter = FibonacciIterator(8)
for fib_num in fib_iter:
print(fib_num, end=" ")
print()
输出结果:
斐波那契数列迭代器:
0 1 1 2 3 5 8 13
10.4.4 内置迭代器工具¶
Python提供了许多内置的迭代器工具:
# enumerate() - 添加索引
fruits = ['apple', 'banana', 'orange']
print("enumerate示例:")
for index, fruit in enumerate(fruits, start=1):
print(f"{index}. {fruit}")
# zip() - 并行迭代
names = ['Alice', 'Bob', 'Charlie']
ages = [25, 30, 35]
print("\nzip示例:")
for name, age in zip(names, ages):
print(f"{name} is {age} years old")
# reversed() - 反向迭代
numbers = [1, 2, 3, 4, 5]
print("\nreversed示例:")
for num in reversed(numbers):
print(num, end=" ")
print()
输出结果:
enumerate示例:
1. apple
2. banana
3. orange
zip示例:
Alice is 25 years old
Bob is 30 years old
Charlie is 35 years old
reversed示例:
5 4 3 2 1
10.4.5 迭代器的状态管理¶
迭代器会保持其内部状态,这意味着它们只能被消费一次:
# 迭代器状态管理示例
class StatefulIterator:
"""展示迭代器状态管理的示例"""
def __init__(self, data):
self.data = data
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index < len(self.data):
value = self.data[self.index]
self.index += 1
print(f"返回索引 {self.index-1} 的值: {value}")
return value
else:
print("迭代器已耗尽")
raise StopIteration
# 演示迭代器状态
print("迭代器状态管理:")
iterator = StatefulIterator(['a', 'b', 'c'])
print("第一次迭代:")
for item in iterator:
pass
print("第二次迭代:")
for item in iterator:
print(f"获得: {item}")
else:
print("第二次迭代没有产生任何值")
输出结果:
迭代器状态管理:
第一次迭代:
返回索引 0 的值: a
返回索引 1 的值: b
返回索引 2 的值: c
第二次迭代:
迭代器已耗尽
第二次迭代没有产生任何值
10.4.6 迭代器链¶
可以将多个迭代器连接起来形成迭代器链:
# 迭代器链示例
class ChainIterator:
"""连接多个可迭代对象的迭代器"""
def __init__(self, *iterables):
self.iterables = iterables
self.current_iterable = 0
self.current_iterator = iter(self.iterables[0]) if self.iterables else iter([])
def __iter__(self):
return self
def __next__(self):
while self.current_iterable < len(self.iterables):
try:
return next(self.current_iterator)
except StopIteration:
self.current_iterable += 1
if self.current_iterable < len(self.iterables):
self.current_iterator = iter(self.iterables[self.current_iterable])
else:
raise StopIteration
raise StopIteration
# 使用迭代器链
print("迭代器链示例:")
chain = ChainIterator([1, 2, 3], ['a', 'b'], [10, 20])
for item in chain:
print(item, end=" ")
print()
输出结果:
迭代器链示例:
1 2 3 a b 10 20
10.4.7 itertools模块简介¶
itertools模块提供了许多有用的迭代器工具:
import itertools
# count() - 无限计数
print("itertools.count示例:")
counter = itertools.count(10, 2) # 从10开始,步长为2
for i, num in enumerate(counter):
if i >= 5:
break
print(num, end=" ")
print()
# cycle() - 循环迭代
print("\nitertools.cycle示例:")
cycler = itertools.cycle(['A', 'B', 'C'])
for i, letter in enumerate(cycler):
if i >= 8:
break
print(letter, end=" ")
print()
# chain() - 连接迭代器
print("\nitertools.chain示例:")
chained = itertools.chain([1, 2, 3], ['x', 'y'], [10, 20])
for item in chained:
print(item, end=" ")
print()
输出结果:
itertools.count示例:
10 12 14 16 18
itertools.cycle示例:
A B C A B C A B
itertools.chain示例:
1 2 3 x y 10 20
10.4.8 迭代器的应用场景¶
迭代器在以下场景中特别有用:
- 内存效率:处理大量数据时避免一次性加载到内存
- 惰性求值:只在需要时才计算值
- 无限序列:表示理论上无限的数据序列
- 状态保持:在迭代过程中维护复杂的状态信息
- 协议统一:为不同类型的容器提供统一的遍历接口
迭代器是Python中一个基础而重要的概念,它为许多高级特性(如生成器、推导式等)提供了理论基础。
10.5 装饰器(Decorator)¶
装饰器是Python中一个强大而优雅的特性,它允许我们在不修改原函数代码的情况下,为函数添加额外的功能。装饰器体现了”开放-封闭原则”,即对扩展开放,对修改封闭。
10.5.1 装饰器的概念¶
装饰器基于以下Python特性:
- 函数是一等公民:函数可以作为参数传递和返回值
- 高阶函数:接受函数作为参数或返回函数的函数
- 闭包:内部函数可以访问外部函数的变量
- 语法糖:@decorator语法使代码更简洁
10.5.2 简单装饰器¶
最基本的装饰器实现:
# 简单装饰器示例
def my_decorator(func):
"""简单的装饰器函数"""
def wrapper():
print("装饰器:函数执行前")
result = func()
print("装饰器:函数执行后")
return result
return wrapper
# 使用装饰器
@my_decorator
def say_hello():
"""被装饰的函数"""
print("Hello, World!")
return "greeting"
# 调用被装饰的函数
print("调用装饰后的函数:")
result = say_hello()
print(f"函数返回值: {result}")
输出结果:
调用装饰后的函数:
装饰器:函数执行前
Hello, World!
装饰器:函数执行后
函数返回值: greeting
10.5.3 计时装饰器¶
实用的性能监控装饰器:
import time
import functools
def timing_decorator(func):
"""计时装饰器,测量函数执行时间"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
execution_time = end_time - start_time
print(f"函数 {func.__name__} 执行时间: {execution_time:.4f}秒")
return result
return wrapper
@timing_decorator
def slow_function():
"""模拟耗时操作"""
print("执行耗时操作...")
time.sleep(0.1) # 模拟0.1秒的操作
return "操作完成"
# 测试计时装饰器
print("测试计时装饰器:")
result = slow_function()
print(f"结果: {result}")
输出结果:
测试计时装饰器:
执行耗时操作...
函数 slow_function 执行时间: 0.1001秒
结果: 操作完成
10.5.4 带参数的装饰器¶
装饰器工厂允许我们创建可配置的装饰器:
def repeat(times):
"""重复执行装饰器工厂"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
results = []
for i in range(times):
print(f"第 {i+1} 次执行:")
result = func(*args, **kwargs)
results.append(result)
return results
return wrapper
return decorator
@repeat(3)
def greet(name):
"""问候函数"""
message = f"Hello, {name}!"
print(message)
return message
# 测试带参数的装饰器
print("测试重复执行装饰器:")
results = greet("Alice")
print(f"所有结果: {results}")
输出结果:
测试重复执行装饰器:
第 1 次执行:
Hello, Alice!
第 2 次执行:
Hello, Alice!
第 3 次执行:
Hello, Alice!
所有结果: ['Hello, Alice!', 'Hello, Alice!', 'Hello, Alice!']
10.5.5 日志装饰器¶
实用的日志记录装饰器:
import functools
from datetime import datetime
def log_calls(level="INFO"):
"""日志装饰器工厂"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
args_str = ", ".join(map(str, args))
kwargs_str = ", ".join(f"{k}={v}" for k, v in kwargs.items())
params = ", ".join(filter(None, [args_str, kwargs_str]))
print(f"[{level}] {timestamp} - 调用函数 {func.__name__}({params})")
try:
result = func(*args, **kwargs)
print(f"[{level}] {timestamp} - 函数 {func.__name__} 执行成功,返回: {result}")
return result
except Exception as e:
print(f"[ERROR] {timestamp} - 函数 {func.__name__} 执行失败: {e}")
raise
return wrapper
return decorator
@log_calls("DEBUG")
def calculate(a, b, operation="add"):
"""计算函数"""
if operation == "add":
return a + b
elif operation == "multiply":
return a * b
else:
raise ValueError(f"不支持的操作: {operation}")
# 测试日志装饰器
print("测试日志装饰器:")
result1 = calculate(5, 3)
result2 = calculate(4, 7, operation="multiply")
print(f"计算结果: {result1}, {result2}")
输出结果:
测试日志装饰器:
[DEBUG] 2024-01-15 10:30:45 - 调用函数 calculate(5, 3, operation=add)
[DEBUG] 2024-01-15 10:30:45 - 函数 calculate 执行成功,返回: 8
[DEBUG] 2024-01-15 10:30:45 - 调用函数 calculate(4, 7, operation=multiply)
[DEBUG] 2024-01-15 10:30:45 - 函数 calculate 执行成功,返回: 28
计算结果: 8, 28
10.5.6 类装饰器¶
使用类实现装饰器:
class CountCalls:
"""计数装饰器类"""
def __init__(self, func):
self.func = func
self.count = 0
functools.update_wrapper(self, func)
def __call__(self, *args, **kwargs):
self.count += 1
print(f"函数 {self.func.__name__} 被调用第 {self.count} 次")
return self.func(*args, **kwargs)
def get_count(self):
"""获取调用次数"""
return self.count
@CountCalls
def fibonacci(n):
"""计算斐波那契数"""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 测试类装饰器
print("测试类装饰器:")
result = fibonacci(5)
print(f"fibonacci(5) = {result}")
print(f"总调用次数: {fibonacci.get_count()}")
输出结果:
测试类装饰器:
函数 fibonacci 被调用第 1 次
函数 fibonacci 被调用第 2 次
函数 fibonacci 被调用第 3 次
函数 fibonacci 被调用第 4 次
函数 fibonacci 被调用第 5 次
函数 fibonacci 被调用第 6 次
函数 fibonacci 被调用第 7 次
函数 fibonacci 被调用第 8 次
函数 fibonacci 被调用第 9 次
函数 fibonacci 被调用第 10 次
函数 fibonacci 被调用第 11 次
函数 fibonacci 被调用第 12 次
函数 fibonacci 被调用第 13 次
函数 fibonacci 被调用第 14 次
函数 fibonacci 被调用第 15 次
fibonacci(5) = 5
总调用次数: 15
10.5.7 缓存装饰器¶
实现简单的缓存机制:
def simple_cache(func):
"""简单的缓存装饰器"""
cache = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 创建缓存键
key = str(args) + str(sorted(kwargs.items()))
if key in cache:
print(f"缓存命中: {func.__name__}{args}")
return cache[key]
print(f"计算结果: {func.__name__}{args}")
result = func(*args, **kwargs)
cache[key] = result
return result
# 添加清除缓存的方法
wrapper.clear_cache = lambda: cache.clear()
wrapper.cache_info = lambda: f"缓存大小: {len(cache)}"
return wrapper
@simple_cache
def expensive_calculation(n):
"""模拟耗时计算"""
time.sleep(0.1) # 模拟计算时间
return n * n * n
# 测试缓存装饰器
print("测试缓存装饰器:")
print(f"第一次调用: {expensive_calculation(5)}")
print(f"第二次调用: {expensive_calculation(5)}")
print(f"第三次调用: {expensive_calculation(3)}")
print(f"第四次调用: {expensive_calculation(5)}")
print(expensive_calculation.cache_info())
输出结果:
测试缓存装饰器:
计算结果: expensive_calculation(5,)
第一次调用: 125
缓存命中: expensive_calculation(5,)
第二次调用: 125
计算结果: expensive_calculation(3,)
第三次调用: 27
缓存命中: expensive_calculation(5,)
第四次调用: 125
缓存大小: 2
10.5.8 内置装饰器¶
Python提供了一些常用的内置装饰器:
class Person:
"""演示内置装饰器的类"""
def __init__(self, name, age):
self._name = name
self._age = age
@property
def name(self):
"""name属性的getter"""
print("获取name属性")
return self._name
@name.setter
def name(self, value):
"""name属性的setter"""
print(f"设置name属性为: {value}")
if not isinstance(value, str):
raise TypeError("姓名必须是字符串")
self._name = value
@staticmethod
def is_adult(age):
"""静态方法:判断是否成年"""
return age >= 18
@classmethod
def from_string(cls, person_str):
"""类方法:从字符串创建Person对象"""
name, age = person_str.split('-')
return cls(name, int(age))
def __str__(self):
return f"Person(name={self.name}, age={self._age})"
# 测试内置装饰器
print("测试内置装饰器:")
# 测试@property
person = Person("Alice", 25)
print(f"获取姓名: {person.name}")
person.name = "Bob"
print(f"修改后: {person}")
# 测试@staticmethod
print(f"18岁是否成年: {Person.is_adult(18)}")
print(f"16岁是否成年: {Person.is_adult(16)}")
# 测试@classmethod
person2 = Person.from_string("Charlie-30")
print(f"从字符串创建: {person2}")
输出结果:
测试内置装饰器:
获取name属性
获取姓名: Alice
设置name属性为: Bob
获取name属性
修改后: Person(name=Bob, age=25)
18岁是否成年: True
16岁是否成年: False
获取name属性
从字符串创建: Person(name=Charlie, age=30)
10.5.9 装饰器的应用场景¶
装饰器在以下场景中特别有用:
- 日志记录:自动记录函数调用信息
- 性能监控:测量函数执行时间和资源使用
- 权限检查:验证用户权限后再执行函数
- 缓存机制:缓存函数结果以提高性能
- 重试机制:在函数失败时自动重试
- 参数验证:检查函数参数的有效性
- 事务管理:自动处理数据库事务
- API限流:控制函数调用频率
装饰器是Python中实现横切关注点(Cross-cutting Concerns)的优雅方式,它让我们能够以声明式的方式为函数添加功能,保持代码的清洁和可维护性。
10.6 闭包(Closure)¶
闭包是函数式编程中的一个重要概念,它允许内部函数访问外部函数的变量,即使外部函数已经执行完毕。闭包在Python中有着广泛的应用,特别是在装饰器、回调函数和函数工厂等场景中。
10.6.1 闭包的概念¶
闭包具有以下核心特征:
- 嵌套函数:函数内部定义另一个函数
- 自由变量:内部函数引用外部函数的变量
- 词法作用域:变量的作用域由代码结构决定
- 变量持久化:外部函数执行完毕后,变量仍然存在
10.6.2 基本闭包示例¶
最简单的闭包实现:
# 基本闭包示例
def outer_function(x):
"""外部函数"""
print(f"外部函数接收参数: {x}")
def inner_function(y):
"""内部函数,形成闭包"""
print(f"内部函数接收参数: {y}")
print(f"访问外部变量: {x}")
return x + y
print("外部函数返回内部函数")
return inner_function
# 创建闭包
print("创建闭包:")
closure = outer_function(10)
print(f"闭包对象: {closure}")
print(f"闭包类型: {type(closure)}")
# 调用闭包
print("\n调用闭包:")
result = closure(5)
print(f"结果: {result}")
输出结果:
创建闭包:
外部函数接收参数: 10
外部函数返回内部函数
闭包对象: <function outer_function.<locals>.inner_function at 0x...>
闭包类型: <class 'function'>
调用闭包:
内部函数接收参数: 5
访问外部变量: 10
结果: 15
10.6.3 闭包变量的持久化¶
闭包能够保持外部函数变量的状态:
# 闭包变量持久化示例
def create_counter(initial_value=0):
"""创建计数器闭包"""
count = initial_value
def counter():
nonlocal count
count += 1
return count
return counter
# 创建多个独立的计数器
print("创建独立的计数器:")
counter1 = create_counter(0)
counter2 = create_counter(100)
print("counter1的调用:")
for i in range(3):
print(f"counter1(): {counter1()}")
print("\ncounter2的调用:")
for i in range(3):
print(f"counter2(): {counter2()}")
print("\n再次调用counter1:")
print(f"counter1(): {counter1()}")
输出结果:
创建独立的计数器:
counter1的调用:
counter1(): 1
counter1(): 2
counter1(): 3
counter2的调用:
counter2(): 101
counter2(): 102
counter2(): 103
再次调用counter1:
counter1(): 4
10.6.4 闭包实现装饰器¶
闭包是实现装饰器的基础:
# 使用闭包实现装饰器
def create_multiplier_decorator(factor):
"""创建乘法装饰器的工厂函数"""
print(f"创建乘法因子为 {factor} 的装饰器")
def decorator(func):
print(f"装饰函数 {func.__name__}")
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
multiplied_result = result * factor
print(f"原始结果: {result}, 乘以 {factor} 后: {multiplied_result}")
return multiplied_result
return wrapper
return decorator
# 使用闭包装饰器
@create_multiplier_decorator(3)
def calculate_area(length, width):
"""计算面积"""
area = length * width
print(f"计算面积: {length} × {width} = {area}")
return area
# 测试闭包装饰器
print("\n测试闭包装饰器:")
result = calculate_area(4, 5)
print(f"最终结果: {result}")
输出结果:
创建乘法因子为 3 的装饰器
装饰函数 calculate_area
测试闭包装饰器:
计算面积: 4 × 5 = 20
原始结果: 20, 乘以 3 后: 60
最终结果: 60
10.6.5 函数工厂¶
闭包可以用来创建具有特定行为的函数:
# 函数工厂示例
def create_validator(min_value, max_value):
"""创建数值验证器"""
def validator(value):
if not isinstance(value, (int, float)):
return False, f"值必须是数字,得到: {type(value).__name__}"
if value < min_value:
return False, f"值 {value} 小于最小值 {min_value}"
if value > max_value:
return False, f"值 {value} 大于最大值 {max_value}"
return True, f"值 {value} 有效"
# 添加验证器的描述信息
validator.description = f"验证范围: [{min_value}, {max_value}]"
return validator
# 创建不同的验证器
age_validator = create_validator(0, 120)
percentage_validator = create_validator(0, 100)
temperature_validator = create_validator(-273.15, 1000)
# 测试验证器
print("测试函数工厂创建的验证器:")
validators = [
(age_validator, "年龄验证器", [25, -5, 150]),
(percentage_validator, "百分比验证器", [85, 105, 50.5]),
(temperature_validator, "温度验证器", [25.5, -300, "abc"])
]
for validator, name, test_values in validators:
print(f"\n{name} ({validator.description}):")
for value in test_values:
is_valid, message = validator(value)
status = "✓" if is_valid else "✗"
print(f" {status} {message}")
输出结果:
测试函数工厂创建的验证器:
年龄验证器 (验证范围: [0, 120]):
✓ 值 25 有效
✗ 值 -5 小于最小值 0
✗ 值 150 大于最大值 120
百分比验证器 (验证范围: [0, 100]):
✓ 值 85 有效
✗ 值 105 大于最大值 100
✓ 值 50.5 有效
温度验证器 (验证范围: [-273.15, 1000]):
✓ 值 25.5 有效
✗ 值 -300 小于最小值 -273.15
✗ 值必须是数字,得到: str
10.6.6 配置化函数¶
闭包可以创建具有预设配置的函数:
# 配置化函数示例
def create_formatter(prefix="", suffix="", separator=" "):
"""创建格式化函数"""
def formatter(*args):
# 将所有参数转换为字符串并连接
content = separator.join(str(arg) for arg in args)
return f"{prefix}{content}{suffix}"
# 添加配置信息
formatter.config = {
'prefix': prefix,
'suffix': suffix,
'separator': separator
}
return formatter
# 创建不同配置的格式化函数
html_formatter = create_formatter("<p>", "</p>")
csv_formatter = create_formatter("", "", ",")
log_formatter = create_formatter("[LOG] ", " [END]")
bracket_formatter = create_formatter("(", ")", ", ")
# 测试配置化函数
print("测试配置化函数:")
formatters = [
(html_formatter, "HTML格式化器", ["Hello", "World"]),
(csv_formatter, "CSV格式化器", ["Name", "Age", "City"]),
(log_formatter, "日志格式化器", ["Error", "File not found"]),
(bracket_formatter, "括号格式化器", [1, 2, 3, 4])
]
for formatter, name, test_data in formatters:
result = formatter(*test_data)
config = formatter.config
print(f"\n{name}:")
print(f" 配置: {config}")
print(f" 输入: {test_data}")
print(f" 输出: {result}")
输出结果:
测试配置化函数:
HTML格式化器:
配置: {'prefix': '<p>', 'suffix': '</p>', 'separator': ' '}
输入: ['Hello', 'World']
输出: <p>Hello World</p>
CSV格式化器:
配置: {'prefix': '', 'suffix': '', 'separator': ','}
输入: ['Name', 'Age', 'City']
输出: Name,Age,City
日志格式化器:
配置: {'prefix': '[LOG] ', 'suffix': ' [END]', 'separator': ' '}
输入: ['Error', 'File not found']
输出: [LOG] Error File not found [END]
括号格式化器:
配置: {'prefix': '(', 'suffix': ')', 'separator': ', '}
输入: [1, 2, 3, 4]
输出: (1, 2, 3, 4)
10.6.7 事件处理闭包¶
闭包在事件处理中的应用:
# 事件处理闭包示例
def create_event_handler(event_type, handler_name):
"""创建事件处理器"""
call_count = 0
def handle_event(data):
nonlocal call_count
call_count += 1
timestamp = time.strftime("%H:%M:%S")
print(f"[{timestamp}] {handler_name} 处理 {event_type} 事件 (第{call_count}次)")
print(f" 事件数据: {data}")
return f"处理完成: {event_type} - {data}"
# 添加处理器信息
handle_event.info = {
'event_type': event_type,
'handler_name': handler_name,
'call_count': lambda: call_count
}
return handle_event
# 创建事件处理器
click_handler = create_event_handler("click", "按钮点击处理器")
keypress_handler = create_event_handler("keypress", "键盘输入处理器")
mouse_handler = create_event_handler("mouseover", "鼠标悬停处理器")
# 模拟事件处理
print("模拟事件处理:")
events = [
(click_handler, "button_1"),
(keypress_handler, "Enter"),
(click_handler, "button_2"),
(mouse_handler, "menu_item"),
(click_handler, "button_1"),
(keypress_handler, "Escape")
]
for handler, data in events:
result = handler(data)
print(f" 返回: {result}")
print()
# 显示处理器统计信息
print("处理器统计信息:")
handlers = [click_handler, keypress_handler, mouse_handler]
for handler in handlers:
info = handler.info
print(f"{info['handler_name']}: {info['call_count']()} 次调用")
输出结果:
模拟事件处理:
[10:30:45] 按钮点击处理器 处理 click 事件 (第1次)
事件数据: button_1
返回: 处理完成: click - button_1
[10:30:45] 键盘输入处理器 处理 keypress 事件 (第1次)
事件数据: Enter
返回: 处理完成: keypress - Enter
[10:30:45] 按钮点击处理器 处理 click 事件 (第2次)
事件数据: button_2
返回: 处理完成: click - button_2
[10:30:45] 鼠标悬停处理器 处理 mouseover 事件 (第1次)
事件数据: menu_item
返回: 处理完成: mouseover - menu_item
[10:30:45] 按钮点击处理器 处理 click 事件 (第3次)
事件数据: button_1
返回: 处理完成: click - button_1
[10:30:45] 键盘输入处理器 处理 keypress 事件 (第2次)
事件数据: Escape
返回: 处理完成: keypress - Escape
处理器统计信息:
按钮点击处理器: 3 次调用
键盘输入处理器: 2 次调用
鼠标悬停处理器: 1 次调用
10.6.8 闭包作用域链¶
理解闭包的作用域链:
# 闭包作用域链示例
global_var = "全局变量"
def level1_function(level1_var):
"""第一层函数"""
print(f"Level 1: {level1_var}")
def level2_function(level2_var):
"""第二层函数"""
print(f"Level 2: {level2_var}")
def level3_function(level3_var):
"""第三层函数,可以访问所有外层变量"""
print(f"Level 3: {level3_var}")
print(f"访问Level 2变量: {level2_var}")
print(f"访问Level 1变量: {level1_var}")
print(f"访问全局变量: {global_var}")
# 显示作用域链
return {
'level3': level3_var,
'level2': level2_var,
'level1': level1_var,
'global': global_var
}
return level3_function
return level2_function
# 创建嵌套闭包
print("创建嵌套闭包:")
closure_level2 = level1_function("Level1数据")
closure_level3 = closure_level2("Level2数据")
print("\n调用最内层闭包:")
scope_chain = closure_level3("Level3数据")
print(f"\n作用域链数据: {scope_chain}")
输出结果:
创建嵌套闭包:
Level 1: Level1数据
Level 2: Level2数据
调用最内层闭包:
Level 3: Level3数据
访问Level 2变量: Level2数据
访问Level 1变量: Level1数据
访问全局变量: 全局变量
作用域链数据: {'level3': 'Level3数据', 'level2': 'Level2数据', 'level1': 'Level1数据', 'global': '全局变量'}
10.6.9 nonlocal关键字¶
使用nonlocal修改外部函数的变量:
# nonlocal关键字示例
def create_bank_account(initial_balance):
"""创建银行账户闭包"""
balance = initial_balance
transaction_history = []
def deposit(amount):
"""存款"""
nonlocal balance
if amount > 0:
balance += amount
transaction_history.append(f"存款: +{amount}")
print(f"存款 {amount},当前余额: {balance}")
else:
print("存款金额必须大于0")
def withdraw(amount):
"""取款"""
nonlocal balance
if amount > 0:
if balance >= amount:
balance -= amount
transaction_history.append(f"取款: -{amount}")
print(f"取款 {amount},当前余额: {balance}")
else:
print(f"余额不足,当前余额: {balance}")
else:
print("取款金额必须大于0")
def get_balance():
"""查询余额"""
return balance
def get_history():
"""查询交易历史"""
return transaction_history.copy()
# 返回账户操作函数
return {
'deposit': deposit,
'withdraw': withdraw,
'balance': get_balance,
'history': get_history
}
# 创建银行账户
print("创建银行账户:")
account = create_bank_account(1000)
print(f"初始余额: {account['balance']()}")
# 进行一系列操作
print("\n进行银行操作:")
account['deposit'](500)
account['withdraw'](200)
account['withdraw'](1500) # 余额不足
account['deposit'](100)
account['withdraw'](300)
print(f"\n最终余额: {account['balance']()}")
print(f"交易历史: {account['history']()}")
输出结果:
创建银行账户:
初始余额: 1000
进行银行操作:
存款 500,当前余额: 1500
取款 200,当前余额: 1300
余额不足,当前余额: 1300
存款 100,当前余额: 1400
取款 300,当前余额: 1100
最终余额: 1100
交易历史: ['存款: +500', '取款: -200', '存款: +100', '取款: -300']
10.6.10 闭包陷阱¶
需要注意的闭包陷阱:
# 闭包陷阱示例
def create_functions_wrong():
"""错误的闭包创建方式"""
functions = []
for i in range(3):
# 错误:所有函数都会引用同一个变量i
def func():
return f"函数返回: {i}"
functions.append(func)
return functions
def create_functions_correct():
"""正确的闭包创建方式"""
functions = []
for i in range(3):
# 正确:使用默认参数捕获当前的i值
def func(x=i):
return f"函数返回: {x}"
functions.append(func)
return functions
def create_functions_lambda():
"""使用lambda的正确方式"""
return [lambda x=i: f"Lambda返回: {x}" for i in range(3)]
# 测试闭包陷阱
print("测试闭包陷阱:")
print("\n错误的闭包创建:")
wrong_functions = create_functions_wrong()
for idx, func in enumerate(wrong_functions):
print(f"函数{idx}: {func()}")
print("\n正确的闭包创建:")
correct_functions = create_functions_correct()
for idx, func in enumerate(correct_functions):
print(f"函数{idx}: {func()}")
print("\n使用Lambda的正确方式:")
lambda_functions = create_functions_lambda()
for idx, func in enumerate(lambda_functions):
print(f"Lambda{idx}: {func()}")
输出结果:
测试闭包陷阱:
错误的闭包创建:
函数0: 函数返回: 2
函数1: 函数返回: 2
函数2: 函数返回: 2
正确的闭包创建:
函数0: 函数返回: 0
函数1: 函数返回: 1
函数2: 函数返回: 2
使用Lambda的正确方式:
Lambda0: Lambda返回: 0
Lambda1: Lambda返回: 1
Lambda2: Lambda返回: 2
10.6.11 闭包的应用场景¶
闭包在以下场景中特别有用:
- 装饰器实现:保持装饰器的状态和配置
- 函数工厂:创建具有特定行为的函数
- 回调函数:保持回调函数的上下文信息
- 配置化函数:创建预配置的函数
- 状态保持:在函数调用间保持状态
- 数据封装:创建私有变量和方法
- 事件处理:保持事件处理器的状态
- 缓存机制:实现函数级别的缓存
闭包是Python中一个强大的特性,它为函数式编程提供了重要支持,使得代码更加灵活和模块化。理解闭包的工作原理对于掌握Python的高级特性至关重要。
10.7 上下文管理器(Context Manager)¶
上下文管理器是Python中用于资源管理的重要机制,它确保资源的正确获取和释放。通过with语句,上下文管理器提供了一种优雅的方式来处理需要清理的资源,如文件、网络连接、数据库连接等。
10.7.1 上下文管理器协议¶
上下文管理器需要实现两个特殊方法:
- __enter__():进入上下文时调用,返回资源对象
- __exit__(exc_type, exc_val, exc_tb):退出上下文时调用,处理清理工作
10.7.2 自定义上下文管理器类¶
使用类实现上下文管理器:
# 自定义计时器上下文管理器
import time
class Timer:
"""计时器上下文管理器"""
def __init__(self, name="操作"):
self.name = name
self.start_time = None
self.end_time = None
def __enter__(self):
"""进入上下文"""
print(f"开始执行 {self.name}...")
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文"""
self.end_time = time.time()
duration = self.end_time - self.start_time
if exc_type is None:
print(f"{self.name} 执行完成,耗时: {duration:.4f}秒")
else:
print(f"{self.name} 执行出错,耗时: {duration:.4f}秒")
print(f"异常类型: {exc_type.__name__}")
print(f"异常信息: {exc_val}")
# 返回False表示不抑制异常
return False
@property
def elapsed_time(self):
"""获取执行时间"""
if self.start_time and self.end_time:
return self.end_time - self.start_time
return None
# 使用计时器上下文管理器
print("测试计时器上下文管理器:")
# 正常执行
with Timer("数据处理") as timer:
# 模拟一些工作
time.sleep(0.1)
result = sum(range(1000000))
print(f"计算结果: {result}")
print(f"通过属性获取执行时间: {timer.elapsed_time:.4f}秒")
# 异常情况
print("\n测试异常情况:")
try:
with Timer("错误操作"):
time.sleep(0.05)
raise ValueError("模拟错误")
except ValueError as e:
print(f"捕获异常: {e}")
输出结果:
测试计时器上下文管理器:
开始执行 数据处理...
计算结果: 499999500000
数据处理 执行完成,耗时: 0.1234秒
通过属性获取执行时间: 0.1234秒
测试异常情况:
开始执行 错误操作...
错误操作 执行出错,耗时: 0.0567秒
异常类型: ValueError
异常信息: 模拟错误
捕获异常: 模拟错误
10.7.3 文件管理上下文管理器¶
创建安全的文件操作上下文管理器:
# 文件管理上下文管理器
import os
import tempfile
class FileManager:
"""文件管理上下文管理器"""
def __init__(self, filename, mode='r', encoding='utf-8', backup=False):
self.filename = filename
self.mode = mode
self.encoding = encoding
self.backup = backup
self.file = None
self.backup_file = None
def __enter__(self):
"""进入上下文,打开文件"""
print(f"打开文件: {self.filename} (模式: {self.mode})")
# 如果是写模式且需要备份
if 'w' in self.mode and self.backup and os.path.exists(self.filename):
self.backup_file = f"{self.filename}.backup"
print(f"创建备份文件: {self.backup_file}")
with open(self.filename, 'r', encoding=self.encoding) as src:
with open(self.backup_file, 'w', encoding=self.encoding) as dst:
dst.write(src.read())
try:
self.file = open(self.filename, self.mode, encoding=self.encoding)
return self.file
except Exception as e:
print(f"打开文件失败: {e}")
raise
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文,关闭文件"""
if self.file:
print(f"关闭文件: {self.filename}")
self.file.close()
if exc_type is not None:
print(f"文件操作出现异常: {exc_type.__name__}: {exc_val}")
# 如果有备份文件,询问是否恢复
if self.backup_file and os.path.exists(self.backup_file):
print("检测到备份文件,可以考虑恢复")
return False # 不抑制异常
# 测试文件管理器
print("测试文件管理上下文管理器:")
# 创建测试文件
test_file = "test_context.txt"
with open(test_file, 'w', encoding='utf-8') as f:
f.write("原始内容\n这是测试文件\n")
# 使用文件管理器读取
print("\n读取文件:")
with FileManager(test_file, 'r') as f:
content = f.read()
print(f"文件内容:\n{content}")
# 使用文件管理器写入(带备份)
print("\n写入文件(带备份):")
with FileManager(test_file, 'w', backup=True) as f:
f.write("新的内容\n这是修改后的文件\n")
# 验证修改
print("\n验证修改:")
with FileManager(test_file, 'r') as f:
new_content = f.read()
print(f"修改后的内容:\n{new_content}")
# 检查备份文件
backup_file = f"{test_file}.backup"
if os.path.exists(backup_file):
print(f"\n备份文件内容:")
with open(backup_file, 'r', encoding='utf-8') as f:
backup_content = f.read()
print(backup_content)
# 清理测试文件
os.remove(test_file)
if os.path.exists(backup_file):
os.remove(backup_file)
输出结果:
测试文件管理上下文管理器:
读取文件:
打开文件: test_context.txt (模式: r)
文件内容:
原始内容
这是测试文件
关闭文件: test_context.txt
写入文件(带备份):
打开文件: test_context.txt (模式: w)
创建备份文件: test_context.txt.backup
关闭文件: test_context.txt
验证修改:
打开文件: test_context.txt (模式: r)
修改后的内容:
新的内容
这是修改后的文件
关闭文件: test_context.txt
备份文件内容:
原始内容
这是测试文件
10.7.4 使用@contextmanager装饰器¶
使用contextlib.contextmanager装饰器简化上下文管理器的创建:
# 使用@contextmanager装饰器
from contextlib import contextmanager
import sqlite3
import tempfile
import os
@contextmanager
def database_transaction(db_path):
"""数据库事务上下文管理器"""
print(f"连接数据库: {db_path}")
conn = sqlite3.connect(db_path)
try:
print("开始事务")
conn.execute("BEGIN")
yield conn
print("提交事务")
conn.commit()
except Exception as e:
print(f"事务回滚: {e}")
conn.rollback()
raise
finally:
print("关闭数据库连接")
conn.close()
@contextmanager
def temporary_directory(prefix="temp_"):
"""临时目录上下文管理器"""
temp_dir = tempfile.mkdtemp(prefix=prefix)
print(f"创建临时目录: {temp_dir}")
try:
yield temp_dir
finally:
# 清理临时目录
import shutil
print(f"清理临时目录: {temp_dir}")
shutil.rmtree(temp_dir, ignore_errors=True)
@contextmanager
def change_directory(new_dir):
"""临时改变工作目录"""
old_dir = os.getcwd()
print(f"当前目录: {old_dir}")
print(f"切换到目录: {new_dir}")
try:
os.chdir(new_dir)
yield new_dir
finally:
print(f"恢复到原目录: {old_dir}")
os.chdir(old_dir)
# 测试@contextmanager装饰器
print("测试@contextmanager装饰器:")
# 测试数据库事务
print("\n1. 数据库事务测试:")
db_file = "test.db"
try:
with database_transaction(db_file) as conn:
# 创建表
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE
)
""")
# 插入数据
conn.execute("INSERT INTO users (name, email) VALUES (?, ?)",
("张三", "zhangsan@example.com"))
conn.execute("INSERT INTO users (name, email) VALUES (?, ?)",
("李四", "lisi@example.com"))
# 查询数据
cursor = conn.execute("SELECT * FROM users")
users = cursor.fetchall()
print(f"插入的用户: {users}")
except Exception as e:
print(f"数据库操作失败: {e}")
finally:
if os.path.exists(db_file):
os.remove(db_file)
# 测试临时目录
print("\n2. 临时目录测试:")
with temporary_directory("my_temp_") as temp_dir:
print(f"在临时目录中工作: {temp_dir}")
# 在临时目录中创建文件
test_file = os.path.join(temp_dir, "test.txt")
with open(test_file, 'w') as f:
f.write("这是临时文件")
print(f"创建文件: {test_file}")
print(f"文件存在: {os.path.exists(test_file)}")
print(f"临时目录已清理,文件存在: {os.path.exists(test_file) if 'test_file' in locals() else 'N/A'}")
# 测试目录切换
print("\n3. 目录切换测试:")
with change_directory(os.path.expanduser("~")):
current = os.getcwd()
print(f"当前工作目录: {current}")
# 列出当前目录的一些文件
files = os.listdir('.')[:5] # 只显示前5个
print(f"目录中的文件(前5个): {files}")
print(f"最终工作目录: {os.getcwd()}")
输出结果:
测试@contextmanager装饰器:
1. 数据库事务测试:
连接数据库: test.db
开始事务
插入的用户: [(1, '张三', 'zhangsan@example.com'), (2, '李四', 'lisi@example.com')]
提交事务
关闭数据库连接
2. 临时目录测试:
创建临时目录: C:\Users\...\temp_abc123
在临时目录中工作: C:\Users\...\temp_abc123
创建文件: C:\Users\...\temp_abc123\test.txt
文件存在: True
清理临时目录: C:\Users\...\temp_abc123
临时目录已清理,文件存在: False
3. 目录切换测试:
当前目录: C:\Users\yeyupiaoling/Python从入门到精通
切换到目录: C:\Users\yeyupiaoling
当前工作目录: C:\Users\yeyupiaoling
目录中的文件(前5个): ['Desktop', 'Documents', 'Downloads', 'Music', 'Pictures']
恢复到原目录: C:\Users\yeyupiaoling/Python从入门到精通
最终工作目录: C:\Users\yeyupiaoling/Python从入门到精通
10.7.5 contextlib模块的其他工具¶
contextlib模块提供了多个有用的上下文管理器:
# contextlib模块的其他工具
from contextlib import suppress, ExitStack, closing
import warnings
# 1. suppress - 抑制指定异常
print("1. suppress - 抑制异常:")
# 不使用suppress的情况
print("不使用suppress:")
try:
int("not_a_number")
except ValueError:
print("捕获到ValueError")
# 使用suppress
print("\n使用suppress:")
with suppress(ValueError):
result = int("not_a_number")
print("这行不会执行")
print("继续执行后续代码")
# suppress多个异常类型
print("\nsuppress多个异常:")
with suppress(ValueError, TypeError, KeyError):
data = {"key": "value"}
result = data["nonexistent_key"] + 10
print("异常被抑制,继续执行")
# 2. ExitStack - 管理多个上下文管理器
print("\n2. ExitStack - 管理多个上下文管理器:")
@contextmanager
def managed_resource(name):
"""模拟资源管理"""
print(f"获取资源: {name}")
try:
yield name
finally:
print(f"释放资源: {name}")
# 使用ExitStack管理多个资源
with ExitStack() as stack:
# 动态添加上下文管理器
resource1 = stack.enter_context(managed_resource("数据库连接"))
resource2 = stack.enter_context(managed_resource("文件句柄"))
resource3 = stack.enter_context(managed_resource("网络连接"))
print(f"使用资源: {resource1}, {resource2}, {resource3}")
# 可以根据条件添加更多资源
if True: # 某个条件
resource4 = stack.enter_context(managed_resource("缓存连接"))
print(f"额外资源: {resource4}")
print("所有资源已自动释放")
# 3. 嵌套上下文管理器
print("\n3. 嵌套上下文管理器:")
@contextmanager
def logging_context(operation):
"""日志记录上下文"""
print(f"[开始] {operation}")
start_time = time.time()
try:
yield
except Exception as e:
print(f"[错误] {operation}: {e}")
raise
finally:
end_time = time.time()
print(f"[结束] {operation} (耗时: {end_time - start_time:.3f}秒)")
# 嵌套使用多个上下文管理器
with logging_context("数据处理流程"):
with logging_context("数据加载"):
time.sleep(0.1)
print("加载数据完成")
with logging_context("数据转换"):
time.sleep(0.05)
print("转换数据完成")
with logging_context("数据保存"):
time.sleep(0.08)
print("保存数据完成")
# 4. 条件上下文管理器
print("\n4. 条件上下文管理器:")
@contextmanager
def conditional_context(condition, context_manager):
"""条件上下文管理器"""
if condition:
with context_manager as value:
yield value
else:
yield None
# 使用条件上下文管理器
for use_timer in [True, False]:
print(f"\n使用计时器: {use_timer}")
with conditional_context(use_timer, Timer("条件操作")) as timer:
time.sleep(0.05)
print("执行一些操作")
if timer:
print(f"操作耗时: {timer.elapsed_time}秒")
else:
print("未使用计时器")
输出结果:
1. suppress - 抑制异常:
不使用suppress:
捕获到ValueError
使用suppress:
继续执行后续代码
suppress多个异常:
异常被抑制,继续执行
2. ExitStack - 管理多个上下文管理器:
获取资源: 数据库连接
获取资源: 文件句柄
获取资源: 网络连接
使用资源: 数据库连接, 文件句柄, 网络连接
获取资源: 缓存连接
额外资源: 缓存连接
释放资源: 缓存连接
释放资源: 网络连接
释放资源: 文件句柄
释放资源: 数据库连接
所有资源已自动释放
3. 嵌套上下文管理器:
[开始] 数据处理流程
[开始] 数据加载
加载数据完成
[结束] 数据加载 (耗时: 0.101秒)
[开始] 数据转换
转换数据完成
[结束] 数据转换 (耗时: 0.051秒)
[开始] 数据保存
保存数据完成
[结束] 数据保存 (耗时: 0.081秒)
[结束] 数据处理流程 (耗时: 0.235秒)
4. 条件上下文管理器:
使用计时器: True
开始执行 条件操作...
执行一些操作
条件操作 执行完成,耗时: 0.0512秒
操作耗时: 0.0512秒
使用计时器: False
执行一些操作
未使用计时器
10.7.6 上下文管理器的应用场景¶
上下文管理器在以下场景中特别有用:
- 资源管理:文件、数据库连接、网络连接等
- 状态管理:临时改变系统状态,如工作目录、环境变量
- 异常处理:确保在异常情况下也能正确清理资源
- 性能监控:测量代码块的执行时间
- 事务管理:数据库事务的提交和回滚
- 锁管理:线程同步中的锁获取和释放
- 临时配置:临时修改配置参数
- 日志记录:为代码块添加结构化日志
上下文管理器是Python中实现”获取资源,使用资源,释放资源”模式的标准方法,它确保了资源的正确管理,提高了代码的健壮性和可维护性。
10.8 章节总结¶
本章深入探讨了Python的高级特性,这些特性是Python语言强大和优雅的重要体现:
核心特性回顾¶
- 列表推导式:提供了简洁的列表创建方式,相比传统循环更加Pythonic,性能更优
- 字典和集合推导式:扩展了推导式的概念,支持字典和集合的快速构建
- 生成器:实现惰性求值,节省内存,支持无限序列和管道处理
- 迭代器:提供统一的遍历接口,支持自定义迭代行为
- 装饰器:实现横切关注点的分离,如日志、缓存、权限检查等
- 闭包:支持函数式编程,实现数据封装和状态保持
- 上下文管理器:确保资源的正确管理,提供异常安全的代码结构
最佳实践建议¶
-
合理使用推导式:
- 优先使用推导式而非传统循环
- 避免过度复杂的嵌套推导式
- 考虑代码可读性和维护性 -
善用生成器:
- 处理大数据集时优先考虑生成器
- 利用生成器实现管道处理
- 合理使用yield和send方法 -
装饰器设计原则:
- 保持装饰器的单一职责
- 使用functools.wraps保持函数元信息
- 考虑装饰器的组合和顺序 -
闭包应用场景:
- 实现函数工厂和配置化函数
- 避免闭包陷阱,注意变量绑定时机
- 合理使用nonlocal关键字 -
上下文管理器使用:
- 优先使用with语句管理资源
- 自定义上下文管理器时确保异常安全
- 利用contextlib模块简化实现
性能考虑¶
- 推导式 vs 循环:推导式通常性能更好,但差异不大
- 生成器 vs 列表:生成器在内存使用上有显著优势
- 装饰器开销:装饰器会增加函数调用开销,但通常可以忽略
- 闭包性能:闭包访问外部变量比局部变量稍慢
学习建议¶
- 循序渐进:从简单的推导式开始,逐步掌握复杂特性
- 实践应用:在实际项目中应用这些特性,加深理解
- 阅读源码:研究优秀开源项目中这些特性的使用
- 性能测试:在关键代码路径上进行性能测试和优化
掌握这些Python高级特性将显著提升你的编程水平,让你能够编写更加优雅、高效和Pythonic的代码。这些特性不仅是语法糖,更是Python哲学”优雅胜于丑陋,简洁胜于复杂”的具体体现。