第10章 Python高级特性

Python作为一门现代化的编程语言,提供了许多高级特性来帮助开发者编写更加简洁、高效和优雅的代码。本章将深入探讨Python的核心高级特性,包括推导式、生成器、迭代器、装饰器、闭包和上下文管理器等。这些特性不仅能提高代码的可读性和性能,还体现了Python”优雅胜于丑陋”的设计哲学。

本系列文章所使用到的示例源码:Python从入门到精通示例代码

10.1 列表推导式

列表推导式(List Comprehension)是Python中最受欢迎的特性之一,它提供了一种简洁而强大的方式来创建列表。列表推导式不仅代码更加简洁,在很多情况下性能也优于传统的循环方式。

10.1.1 列表推导式的概念

列表推导式是一种基于现有可迭代对象创建新列表的简洁语法。它将循环和条件判断融合在一个表达式中,体现了函数式编程的思想。相比传统的for循环,列表推导式具有以下优势:

  • 简洁性:一行代码完成多行循环的功能
  • 可读性:表达式更接近自然语言的描述
  • 性能:在C语言层面优化,执行效率更高
  • 函数式风格:避免了显式的循环变量管理

10.1.2 基本语法结构

列表推导式的基本语法结构为:[expression for item in iterable]

# 基本列表推导式示例
numbers = [1, 2, 3, 4, 5]
squares = [x**2 for x in numbers]
print(f"原始列表: {numbers}")
print(f"平方列表: {squares}")

输出结果:

原始列表: [1, 2, 3, 4, 5]
平方列表: [1, 4, 9, 16, 25]

在这个例子中:
- x**2 是表达式部分,定义了如何转换每个元素
- for x in numbers 是迭代部分,定义了数据源和迭代变量
- 整个表达式返回一个新的列表

10.1.3 条件过滤

列表推导式可以包含条件判断,语法为:[expression for item in iterable if condition]

# 条件过滤示例
numbers = [1, 2, 3, 4, 5]
even_squares = [x**2 for x in numbers if x % 2 == 0]
print(f"偶数的平方: {even_squares}")

# 字符串处理
words = ['hello', 'world', 'python', 'programming']
capitalized = [word.capitalize() for word in words if len(word) > 5]
print(f"长度大于5的单词首字母大写: {capitalized}")

输出结果:

偶数的平方: [4, 16]
长度大于5的单词首字母大写: ['Python', 'Programming']

10.1.4 嵌套列表推导式

对于二维列表或更复杂的数据结构,可以使用嵌套的列表推导式:

# 嵌套列表推导式
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flattened = [num for row in matrix for num in row]
print(f"二维矩阵: {matrix}")
print(f"展平后: {flattened}")

输出结果:

二维矩阵: [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
展平后: [1, 2, 3, 4, 5, 6, 7, 8, 9]

10.1.5 性能对比

列表推导式通常比传统循环具有更好的性能:

import time

# 使用列表推导式
start_time = time.time()
list_comp_result = [x**2 for x in range(10000)]
list_comp_time = time.time() - start_time

# 使用传统循环
start_time = time.time()
loop_result = []
for x in range(10000):
    loop_result.append(x**2)
loop_time = time.time() - start_time

print(f"列表推导式耗时: {list_comp_time:.6f}秒")
print(f"传统循环耗时: {loop_time:.6f}秒")

输出结果:

列表推导式耗时: 0.001000
传统循环耗时: 0.000997

虽然在这个简单例子中性能差异不大,但在更复杂的场景中,列表推导式通常表现更好。

10.2 字典推导式与集合推导式

除了列表推导式,Python还提供了字典推导式和集合推导式,它们遵循相似的语法规则,但用于创建不同类型的数据结构。这些推导式同样具有简洁、高效的特点。

10.2.1 字典推导式的语法

字典推导式的基本语法为:{key: value for item in iterable}

# 基本字典推导式
numbers = [1, 2, 3, 4, 5]
square_dict = {x: x**2 for x in numbers}
print(f"数字及其平方的字典: {square_dict}")

# 条件过滤
even_square_dict = {x: x**2 for x in numbers if x % 2 == 0}
print(f"偶数及其平方的字典: {even_square_dict}")

输出结果:

数字及其平方的字典: {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
偶数及其平方的字典: {2: 4, 4: 16}

10.2.2 字符串处理与键值互换

字典推导式在字符串处理和数据转换中非常有用:

# 字符串处理
words = ['hello', 'world', 'python']
word_lengths = {word: len(word) for word in words}
print(f"单词长度字典: {word_lengths}")

# 字典键值互换
original_dict = {'a': 1, 'b': 2, 'c': 3}
swapped_dict = {v: k for k, v in original_dict.items()}
print(f"原字典: {original_dict}")
print(f"键值互换后: {swapped_dict}")

输出结果:

单词长度字典: {'hello': 5, 'world': 5, 'python': 6}
原字典: {'a': 1, 'b': 2, 'c': 3}
键值互换后: {1: 'a', 2: 'b', 3: 'c'}

10.2.3 集合推导式的语法

集合推导式的语法为:{expression for item in iterable},它会自动去除重复元素:

# 基本集合推导式
numbers = [1, 2, 2, 3, 3, 4, 5]
unique_squares = {x**2 for x in numbers}
print(f"原列表: {numbers}")
print(f"去重后的平方集合: {unique_squares}")

# 条件过滤
even_set = {x for x in numbers if x % 2 == 0}
print(f"偶数集合: {even_set}")

输出结果:

原列表: [1, 2, 2, 3, 3, 4, 5]
去重后的平方集合: {1, 4, 9, 16, 25}
偶数集合: {2, 4}

10.2.4 集合运算与嵌套推导式

集合推导式可以与集合运算结合,处理更复杂的逻辑:

# 集合运算
set1 = {1, 2, 3, 4, 5}
set2 = {4, 5, 6, 7, 8}
intersection_squares = {x**2 for x in set1 if x in set2}
print(f"集合1: {set1}")
print(f"集合2: {set2}")
print(f"交集元素的平方: {intersection_squares}")

# 嵌套推导式
matrix = [[1, 2], [3, 4], [5, 6]]
flattened_set = {num for row in matrix for num in row if num % 2 == 0}
print(f"矩阵: {matrix}")
print(f"偶数元素集合: {flattened_set}")

输出结果:

集合1: {1, 2, 3, 4, 5}
集合2: {4, 5, 6, 7, 8}
交集元素的平方: {16, 25}
矩阵: [[1, 2], [3, 4], [5, 6]]
偶数元素集合: {2, 4, 6}

10.2.5 实际应用场景

推导式在数据处理、统计分析等场景中非常实用:

# 数据统计
scores = [85, 92, 78, 96, 88, 76, 94, 89]
grade_distribution = {
    'A': len([s for s in scores if s >= 90]),
    'B': len([s for s in scores if 80 <= s < 90]),
    'C': len([s for s in scores if 70 <= s < 80]),
    'D': len([s for s in scores if s < 70])
}
print(f"成绩分布: {grade_distribution}")

# 配置处理
config_list = [('debug', 'true'), ('port', '8080'), ('host', 'localhost')]
config_dict = {k: v for k, v in config_list}
print(f"配置字典: {config_dict}")

输出结果:

成绩分布: {'A': 3, 'B': 3, 'C': 1, 'D': 1}
配置字典: {'debug': 'true', 'port': '8080', 'host': 'localhost'}

10.2.6 推导式的限制与最佳实践

虽然推导式功能强大,但也需要注意其使用边界:

# 可读性边界示例
# 好的推导式:简洁明了
even_squares = [x**2 for x in range(10) if x % 2 == 0]

# 不推荐的推导式:过于复杂
# complex_result = [func(x) for x in data if condition1(x) and condition2(x) for y in other_data if related(x, y)]

# 对于复杂逻辑,使用传统循环更清晰
complex_result = []
for x in data:
    if condition1(x) and condition2(x):
        for y in other_data:
            if related(x, y):
                complex_result.append(func(x))

最佳实践原则:
- 保持推导式的简洁性,一行代码应该易于理解
- 复杂的条件判断或多层嵌套时考虑使用传统循环
- 优先考虑代码的可读性而非简洁性
- 在性能敏感的场景中,推导式通常是更好的选择

10.3 生成器(Generator)

生成器是Python中一个非常重要的特性,它提供了一种优雅的方式来创建迭代器。生成器采用惰性求值的策略,只在需要时才计算值,这使得它在处理大量数据或无限序列时具有显著的内存优势。

10.3.1 生成器的概念

生成器具有以下核心特性:
- 惰性求值:只在需要时才计算下一个值
- 内存效率:不需要将所有值同时存储在内存中
- 迭代器协议:自动实现了__iter__()__next__()方法
- 状态保持:能够记住函数执行的状态

10.3.2 生成器函数

生成器函数使用yield关键字来产生值,而不是return

# 基本生成器函数
def simple_generator():
    print("开始生成")
    yield 1
    print("生成第二个值")
    yield 2
    print("生成第三个值")
    yield 3
    print("生成结束")

# 使用生成器
gen = simple_generator()
print("创建生成器对象")
for value in gen:
    print(f"获得值: {value}")

输出结果:

创建生成器对象
开始生成
获得值: 1
生成第二个值
获得值: 2
生成第三个值
获得值: 3
生成结束

10.3.3 斐波那契数列生成器

生成器特别适合生成数学序列:

# 斐波那契数列生成器
def fibonacci_generator(n):
    """生成前n个斐波那契数"""
    a, b = 0, 1
    count = 0
    while count < n:
        yield a
        a, b = b, a + b
        count += 1

# 使用斐波那契生成器
print("前10个斐波那契数:")
for num in fibonacci_generator(10):
    print(num, end=" ")
print()

输出结果:

10个斐波那契数:
0 1 1 2 3 5 8 13 21 34

10.3.4 生成器表达式

生成器表达式提供了一种更简洁的创建生成器的方式:

# 生成器表达式
squares_gen = (x**2 for x in range(5))
print(f"生成器对象: {squares_gen}")
print("生成器产生的值:")
for square in squares_gen:
    print(square, end=" ")
print()

输出结果:

生成器对象: <generator object <genexpr> at 0x...>
生成器产生的值:
0 1 4 9 16

10.3.5 内存使用对比

生成器相比列表推导式具有显著的内存优势:

import sys

# 列表推导式
list_comp = [x**2 for x in range(1000)]
list_size = sys.getsizeof(list_comp)

# 生成器表达式
gen_expr = (x**2 for x in range(1000))
gen_size = sys.getsizeof(gen_expr)

print(f"列表推导式内存使用: {list_size} 字节")
print(f"生成器表达式内存使用: {gen_size} 字节")
print(f"内存节省: {list_size - gen_size} 字节")
print(f"节省比例: {(list_size - gen_size) / list_size * 100:.1f}%")

输出结果:

列表推导式内存使用: 8856 字节
生成器表达式内存使用: 104 字节
内存节省: 8752 字节
节省比例: 98.8%

10.3.6 无限序列生成器

生成器可以创建理论上无限的序列:

# 无限计数器
def infinite_counter(start=0, step=1):
    """无限计数生成器"""
    current = start
    while True:
        yield current
        current += step

# 使用无限生成器(注意:需要手动停止)
counter = infinite_counter(1, 2)
print("前10个奇数:")
for i, num in enumerate(counter):
    if i >= 10:
        break
    print(num, end=" ")
print()

输出结果:

10个奇数:
1 3 5 7 9 11 13 15 17 19

10.3.7 生成器的send方法

生成器支持双向通信,可以通过send()方法向生成器发送值:

# 支持send的生成器
def echo_generator():
    """回声生成器,可以接收外部发送的值"""
    value = None
    while True:
        received = yield value
        if received is not None:
            value = f"Echo: {received}"
        else:
            value = "等待输入..."

# 使用send方法
echo_gen = echo_generator()
print(next(echo_gen))  # 启动生成器
print(echo_gen.send("Hello"))
print(echo_gen.send("World"))
print(next(echo_gen))

输出结果:

None
Echo: Hello
Echo: World
等待输入...

10.3.8 管道处理示例

生成器非常适合构建数据处理管道:

# 数据处理管道
def read_numbers():
    """模拟读取数字数据"""
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    for num in numbers:
        print(f"读取: {num}")
        yield num

def filter_even(numbers):
    """过滤偶数"""
    for num in numbers:
        if num % 2 == 0:
            print(f"过滤得到偶数: {num}")
            yield num

def square_numbers(numbers):
    """计算平方"""
    for num in numbers:
        result = num ** 2
        print(f"计算平方: {num}^2 = {result}")
        yield result

# 构建处理管道
pipeline = square_numbers(filter_even(read_numbers()))
print("开始处理管道:")
results = list(pipeline)
print(f"最终结果: {results}")

输出结果:

开始处理管道:
读取: 1
读取: 2
过滤得到偶数: 2
计算平方: 2^2 = 4
读取: 3
读取: 4
过滤得到偶数: 4
计算平方: 4^2 = 16
读取: 5
读取: 6
过滤得到偶数: 6
计算平方: 6^2 = 36
读取: 7
读取: 8
过滤得到偶数: 8
计算平方: 8^2 = 64
读取: 9
读取: 10
过滤得到偶数: 10
计算平方: 10^2 = 100
最终结果: [4, 16, 36, 64, 100]

10.3.9 生成器的应用场景

生成器在以下场景中特别有用:

  1. 大数据处理:逐行处理大文件而不需要将整个文件加载到内存
  2. 无限序列:生成数学序列、随机数序列等
  3. 管道处理:构建数据处理流水线
  4. 内存优化:在内存受限的环境中处理大量数据
  5. 协程基础:为异步编程提供基础支持

生成器是Python中实现高效、优雅代码的重要工具,掌握其使用方法对于编写高质量的Python程序至关重要。

10.4 迭代器(Iterator)

迭代器是Python中实现迭代协议的对象,它提供了一种统一的方式来遍历容器中的元素。理解迭代器的工作原理对于掌握Python的高级特性至关重要。

10.4.1 迭代器协议

迭代器协议包含两个核心方法:
- __iter__():返回迭代器对象本身
- __next__():返回下一个值,没有更多值时抛出StopIteration异常

# 自定义迭代器类
class Counter:
    """简单的计数器迭代器"""
    def __init__(self, start, end):
        self.current = start
        self.end = end

    def __iter__(self):
        return self

    def __next__(self):
        if self.current < self.end:
            num = self.current
            self.current += 1
            return num
        else:
            raise StopIteration

# 使用自定义迭代器
print("使用自定义计数器迭代器:")
counter = Counter(1, 5)
for num in counter:
    print(f"计数: {num}")

输出结果:

使用自定义计数器迭代器:
计数: 1
计数: 2
计数: 3
计数: 4

10.4.2 可迭代对象与迭代器的区别

可迭代对象和迭代器是两个不同的概念:

# 演示可迭代对象与迭代器的区别
class NumberList:
    """可迭代对象:包含数据,但不直接实现迭代逻辑"""
    def __init__(self, numbers):
        self.numbers = numbers

    def __iter__(self):
        return NumberIterator(self.numbers)

class NumberIterator:
    """迭代器:实现具体的迭代逻辑"""
    def __init__(self, numbers):
        self.numbers = numbers
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index < len(self.numbers):
            value = self.numbers[self.index]
            self.index += 1
            return value
        else:
            raise StopIteration

# 使用可迭代对象
number_list = NumberList([10, 20, 30, 40])
print("第一次迭代:")
for num in number_list:
    print(f"数字: {num}")

print("第二次迭代:")
for num in number_list:
    print(f"数字: {num}")

# 直接使用迭代器
print("直接使用迭代器:")
iterator = NumberIterator([1, 2, 3])
print(f"迭代器类型: {type(iterator)}")
print(f"是否为迭代器: {hasattr(iterator, '__next__')}")
print(f"是否为可迭代对象: {hasattr(iterator, '__iter__')}")

输出结果:

第一次迭代:
数字: 10
数字: 20
数字: 30
数字: 40
第二次迭代:
数字: 10
数字: 20
数字: 30
数字: 40
直接使用迭代器:
迭代器类型: <class '__main__.NumberIterator'>
是否为迭代器: True
是否为可迭代对象: True

10.4.3 斐波那契迭代器

实现一个更复杂的迭代器示例:

# 斐波那契迭代器
class FibonacciIterator:
    """斐波那契数列迭代器"""
    def __init__(self, max_count):
        self.max_count = max_count
        self.count = 0
        self.current = 0
        self.next_val = 1

    def __iter__(self):
        return self

    def __next__(self):
        if self.count < self.max_count:
            result = self.current
            self.current, self.next_val = self.next_val, self.current + self.next_val
            self.count += 1
            return result
        else:
            raise StopIteration

# 使用斐波那契迭代器
print("斐波那契数列迭代器:")
fib_iter = FibonacciIterator(8)
for fib_num in fib_iter:
    print(fib_num, end=" ")
print()

输出结果:

斐波那契数列迭代器:
0 1 1 2 3 5 8 13

10.4.4 内置迭代器工具

Python提供了许多内置的迭代器工具:

# enumerate() - 添加索引
fruits = ['apple', 'banana', 'orange']
print("enumerate示例:")
for index, fruit in enumerate(fruits, start=1):
    print(f"{index}. {fruit}")

# zip() - 并行迭代
names = ['Alice', 'Bob', 'Charlie']
ages = [25, 30, 35]
print("\nzip示例:")
for name, age in zip(names, ages):
    print(f"{name} is {age} years old")

# reversed() - 反向迭代
numbers = [1, 2, 3, 4, 5]
print("\nreversed示例:")
for num in reversed(numbers):
    print(num, end=" ")
print()

输出结果:

enumerate示例:
1. apple
2. banana
3. orange

zip示例:
Alice is 25 years old
Bob is 30 years old
Charlie is 35 years old

reversed示例:
5 4 3 2 1

10.4.5 迭代器的状态管理

迭代器会保持其内部状态,这意味着它们只能被消费一次:

# 迭代器状态管理示例
class StatefulIterator:
    """展示迭代器状态管理的示例"""
    def __init__(self, data):
        self.data = data
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index < len(self.data):
            value = self.data[self.index]
            self.index += 1
            print(f"返回索引 {self.index-1} 的值: {value}")
            return value
        else:
            print("迭代器已耗尽")
            raise StopIteration

# 演示迭代器状态
print("迭代器状态管理:")
iterator = StatefulIterator(['a', 'b', 'c'])

print("第一次迭代:")
for item in iterator:
    pass

print("第二次迭代:")
for item in iterator:
    print(f"获得: {item}")
else:
    print("第二次迭代没有产生任何值")

输出结果:

迭代器状态管理:
第一次迭代:
返回索引 0 的值: a
返回索引 1 的值: b
返回索引 2 的值: c
第二次迭代:
迭代器已耗尽
第二次迭代没有产生任何值

10.4.6 迭代器链

可以将多个迭代器连接起来形成迭代器链:

# 迭代器链示例
class ChainIterator:
    """连接多个可迭代对象的迭代器"""
    def __init__(self, *iterables):
        self.iterables = iterables
        self.current_iterable = 0
        self.current_iterator = iter(self.iterables[0]) if self.iterables else iter([])

    def __iter__(self):
        return self

    def __next__(self):
        while self.current_iterable < len(self.iterables):
            try:
                return next(self.current_iterator)
            except StopIteration:
                self.current_iterable += 1
                if self.current_iterable < len(self.iterables):
                    self.current_iterator = iter(self.iterables[self.current_iterable])
                else:
                    raise StopIteration
        raise StopIteration

# 使用迭代器链
print("迭代器链示例:")
chain = ChainIterator([1, 2, 3], ['a', 'b'], [10, 20])
for item in chain:
    print(item, end=" ")
print()

输出结果:

迭代器链示例:
1 2 3 a b 10 20

10.4.7 itertools模块简介

itertools模块提供了许多有用的迭代器工具:

import itertools

# count() - 无限计数
print("itertools.count示例:")
counter = itertools.count(10, 2)  # 从10开始,步长为2
for i, num in enumerate(counter):
    if i >= 5:
        break
    print(num, end=" ")
print()

# cycle() - 循环迭代
print("\nitertools.cycle示例:")
cycler = itertools.cycle(['A', 'B', 'C'])
for i, letter in enumerate(cycler):
    if i >= 8:
        break
    print(letter, end=" ")
print()

# chain() - 连接迭代器
print("\nitertools.chain示例:")
chained = itertools.chain([1, 2, 3], ['x', 'y'], [10, 20])
for item in chained:
    print(item, end=" ")
print()

输出结果:

itertools.count示例:
10 12 14 16 18

itertools.cycle示例:
A B C A B C A B

itertools.chain示例:
1 2 3 x y 10 20

10.4.8 迭代器的应用场景

迭代器在以下场景中特别有用:

  1. 内存效率:处理大量数据时避免一次性加载到内存
  2. 惰性求值:只在需要时才计算值
  3. 无限序列:表示理论上无限的数据序列
  4. 状态保持:在迭代过程中维护复杂的状态信息
  5. 协议统一:为不同类型的容器提供统一的遍历接口

迭代器是Python中一个基础而重要的概念,它为许多高级特性(如生成器、推导式等)提供了理论基础。

10.5 装饰器(Decorator)

装饰器是Python中一个强大而优雅的特性,它允许我们在不修改原函数代码的情况下,为函数添加额外的功能。装饰器体现了”开放-封闭原则”,即对扩展开放,对修改封闭。

10.5.1 装饰器的概念

装饰器基于以下Python特性:
- 函数是一等公民:函数可以作为参数传递和返回值
- 高阶函数:接受函数作为参数或返回函数的函数
- 闭包:内部函数可以访问外部函数的变量
- 语法糖@decorator语法使代码更简洁

10.5.2 简单装饰器

最基本的装饰器实现:

# 简单装饰器示例
def my_decorator(func):
    """简单的装饰器函数"""
    def wrapper():
        print("装饰器:函数执行前")
        result = func()
        print("装饰器:函数执行后")
        return result
    return wrapper

# 使用装饰器
@my_decorator
def say_hello():
    """被装饰的函数"""
    print("Hello, World!")
    return "greeting"

# 调用被装饰的函数
print("调用装饰后的函数:")
result = say_hello()
print(f"函数返回值: {result}")

输出结果:

调用装饰后的函数:
装饰器:函数执行前
Hello, World!
装饰器:函数执行后
函数返回值: greeting

10.5.3 计时装饰器

实用的性能监控装饰器:

import time
import functools

def timing_decorator(func):
    """计时装饰器,测量函数执行时间"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"函数 {func.__name__} 执行时间: {execution_time:.4f}秒")
        return result
    return wrapper

@timing_decorator
def slow_function():
    """模拟耗时操作"""
    print("执行耗时操作...")
    time.sleep(0.1)  # 模拟0.1秒的操作
    return "操作完成"

# 测试计时装饰器
print("测试计时装饰器:")
result = slow_function()
print(f"结果: {result}")

输出结果:

测试计时装饰器:
执行耗时操作...
函数 slow_function 执行时间: 0.1001
结果: 操作完成

10.5.4 带参数的装饰器

装饰器工厂允许我们创建可配置的装饰器:

def repeat(times):
    """重复执行装饰器工厂"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            results = []
            for i in range(times):
                print(f"第 {i+1} 次执行:")
                result = func(*args, **kwargs)
                results.append(result)
            return results
        return wrapper
    return decorator

@repeat(3)
def greet(name):
    """问候函数"""
    message = f"Hello, {name}!"
    print(message)
    return message

# 测试带参数的装饰器
print("测试重复执行装饰器:")
results = greet("Alice")
print(f"所有结果: {results}")

输出结果:

测试重复执行装饰器:
 1 次执行:
Hello, Alice!
 2 次执行:
Hello, Alice!
 3 次执行:
Hello, Alice!
所有结果: ['Hello, Alice!', 'Hello, Alice!', 'Hello, Alice!']

10.5.5 日志装饰器

实用的日志记录装饰器:

import functools
from datetime import datetime

def log_calls(level="INFO"):
    """日志装饰器工厂"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            args_str = ", ".join(map(str, args))
            kwargs_str = ", ".join(f"{k}={v}" for k, v in kwargs.items())
            params = ", ".join(filter(None, [args_str, kwargs_str]))

            print(f"[{level}] {timestamp} - 调用函数 {func.__name__}({params})")

            try:
                result = func(*args, **kwargs)
                print(f"[{level}] {timestamp} - 函数 {func.__name__} 执行成功,返回: {result}")
                return result
            except Exception as e:
                print(f"[ERROR] {timestamp} - 函数 {func.__name__} 执行失败: {e}")
                raise
        return wrapper
    return decorator

@log_calls("DEBUG")
def calculate(a, b, operation="add"):
    """计算函数"""
    if operation == "add":
        return a + b
    elif operation == "multiply":
        return a * b
    else:
        raise ValueError(f"不支持的操作: {operation}")

# 测试日志装饰器
print("测试日志装饰器:")
result1 = calculate(5, 3)
result2 = calculate(4, 7, operation="multiply")
print(f"计算结果: {result1}, {result2}")

输出结果:

测试日志装饰器:
[DEBUG] 2024-01-15 10:30:45 - 调用函数 calculate(5, 3, operation=add)
[DEBUG] 2024-01-15 10:30:45 - 函数 calculate 执行成功返回: 8
[DEBUG] 2024-01-15 10:30:45 - 调用函数 calculate(4, 7, operation=multiply)
[DEBUG] 2024-01-15 10:30:45 - 函数 calculate 执行成功返回: 28
计算结果: 8, 28

10.5.6 类装饰器

使用类实现装饰器:

class CountCalls:
    """计数装饰器类"""
    def __init__(self, func):
        self.func = func
        self.count = 0
        functools.update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        self.count += 1
        print(f"函数 {self.func.__name__} 被调用第 {self.count} 次")
        return self.func(*args, **kwargs)

    def get_count(self):
        """获取调用次数"""
        return self.count

@CountCalls
def fibonacci(n):
    """计算斐波那契数"""
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

# 测试类装饰器
print("测试类装饰器:")
result = fibonacci(5)
print(f"fibonacci(5) = {result}")
print(f"总调用次数: {fibonacci.get_count()}")

输出结果:

测试类装饰器:
函数 fibonacci 被调用第 1 
函数 fibonacci 被调用第 2 
函数 fibonacci 被调用第 3 
函数 fibonacci 被调用第 4 
函数 fibonacci 被调用第 5 
函数 fibonacci 被调用第 6 
函数 fibonacci 被调用第 7 
函数 fibonacci 被调用第 8 
函数 fibonacci 被调用第 9 
函数 fibonacci 被调用第 10 
函数 fibonacci 被调用第 11 
函数 fibonacci 被调用第 12 
函数 fibonacci 被调用第 13 
函数 fibonacci 被调用第 14 
函数 fibonacci 被调用第 15 
fibonacci(5) = 5
总调用次数: 15

10.5.7 缓存装饰器

实现简单的缓存机制:

def simple_cache(func):
    """简单的缓存装饰器"""
    cache = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # 创建缓存键
        key = str(args) + str(sorted(kwargs.items()))

        if key in cache:
            print(f"缓存命中: {func.__name__}{args}")
            return cache[key]

        print(f"计算结果: {func.__name__}{args}")
        result = func(*args, **kwargs)
        cache[key] = result
        return result

    # 添加清除缓存的方法
    wrapper.clear_cache = lambda: cache.clear()
    wrapper.cache_info = lambda: f"缓存大小: {len(cache)}"

    return wrapper

@simple_cache
def expensive_calculation(n):
    """模拟耗时计算"""
    time.sleep(0.1)  # 模拟计算时间
    return n * n * n

# 测试缓存装饰器
print("测试缓存装饰器:")
print(f"第一次调用: {expensive_calculation(5)}")
print(f"第二次调用: {expensive_calculation(5)}")
print(f"第三次调用: {expensive_calculation(3)}")
print(f"第四次调用: {expensive_calculation(5)}")
print(expensive_calculation.cache_info())

输出结果:

测试缓存装饰器:
计算结果: expensive_calculation(5,)
第一次调用: 125
缓存命中: expensive_calculation(5,)
第二次调用: 125
计算结果: expensive_calculation(3,)
第三次调用: 27
缓存命中: expensive_calculation(5,)
第四次调用: 125
缓存大小: 2

10.5.8 内置装饰器

Python提供了一些常用的内置装饰器:

class Person:
    """演示内置装饰器的类"""

    def __init__(self, name, age):
        self._name = name
        self._age = age

    @property
    def name(self):
        """name属性的getter"""
        print("获取name属性")
        return self._name

    @name.setter
    def name(self, value):
        """name属性的setter"""
        print(f"设置name属性为: {value}")
        if not isinstance(value, str):
            raise TypeError("姓名必须是字符串")
        self._name = value

    @staticmethod
    def is_adult(age):
        """静态方法:判断是否成年"""
        return age >= 18

    @classmethod
    def from_string(cls, person_str):
        """类方法:从字符串创建Person对象"""
        name, age = person_str.split('-')
        return cls(name, int(age))

    def __str__(self):
        return f"Person(name={self.name}, age={self._age})"

# 测试内置装饰器
print("测试内置装饰器:")

# 测试@property
person = Person("Alice", 25)
print(f"获取姓名: {person.name}")
person.name = "Bob"
print(f"修改后: {person}")

# 测试@staticmethod
print(f"18岁是否成年: {Person.is_adult(18)}")
print(f"16岁是否成年: {Person.is_adult(16)}")

# 测试@classmethod
person2 = Person.from_string("Charlie-30")
print(f"从字符串创建: {person2}")

输出结果:

测试内置装饰器:
获取name属性
获取姓名: Alice
设置name属性为: Bob
获取name属性
修改后: Person(name=Bob, age=25)
18岁是否成年: True
16岁是否成年: False
获取name属性
从字符串创建: Person(name=Charlie, age=30)

10.5.9 装饰器的应用场景

装饰器在以下场景中特别有用:

  1. 日志记录:自动记录函数调用信息
  2. 性能监控:测量函数执行时间和资源使用
  3. 权限检查:验证用户权限后再执行函数
  4. 缓存机制:缓存函数结果以提高性能
  5. 重试机制:在函数失败时自动重试
  6. 参数验证:检查函数参数的有效性
  7. 事务管理:自动处理数据库事务
  8. API限流:控制函数调用频率

装饰器是Python中实现横切关注点(Cross-cutting Concerns)的优雅方式,它让我们能够以声明式的方式为函数添加功能,保持代码的清洁和可维护性。

10.6 闭包(Closure)

闭包是函数式编程中的一个重要概念,它允许内部函数访问外部函数的变量,即使外部函数已经执行完毕。闭包在Python中有着广泛的应用,特别是在装饰器、回调函数和函数工厂等场景中。

10.6.1 闭包的概念

闭包具有以下核心特征:
- 嵌套函数:函数内部定义另一个函数
- 自由变量:内部函数引用外部函数的变量
- 词法作用域:变量的作用域由代码结构决定
- 变量持久化:外部函数执行完毕后,变量仍然存在

10.6.2 基本闭包示例

最简单的闭包实现:

# 基本闭包示例
def outer_function(x):
    """外部函数"""
    print(f"外部函数接收参数: {x}")

    def inner_function(y):
        """内部函数,形成闭包"""
        print(f"内部函数接收参数: {y}")
        print(f"访问外部变量: {x}")
        return x + y

    print("外部函数返回内部函数")
    return inner_function

# 创建闭包
print("创建闭包:")
closure = outer_function(10)
print(f"闭包对象: {closure}")
print(f"闭包类型: {type(closure)}")

# 调用闭包
print("\n调用闭包:")
result = closure(5)
print(f"结果: {result}")

输出结果:

创建闭包:
外部函数接收参数: 10
外部函数返回内部函数
闭包对象: <function outer_function.<locals>.inner_function at 0x...>
闭包类型: <class 'function'>

调用闭包:
内部函数接收参数: 5
访问外部变量: 10
结果: 15

10.6.3 闭包变量的持久化

闭包能够保持外部函数变量的状态:

# 闭包变量持久化示例
def create_counter(initial_value=0):
    """创建计数器闭包"""
    count = initial_value

    def counter():
        nonlocal count
        count += 1
        return count

    return counter

# 创建多个独立的计数器
print("创建独立的计数器:")
counter1 = create_counter(0)
counter2 = create_counter(100)

print("counter1的调用:")
for i in range(3):
    print(f"counter1(): {counter1()}")

print("\ncounter2的调用:")
for i in range(3):
    print(f"counter2(): {counter2()}")

print("\n再次调用counter1:")
print(f"counter1(): {counter1()}")

输出结果:

创建独立的计数器:
counter1的调用:
counter1(): 1
counter1(): 2
counter1(): 3

counter2的调用:
counter2(): 101
counter2(): 102
counter2(): 103

再次调用counter1:
counter1(): 4

10.6.4 闭包实现装饰器

闭包是实现装饰器的基础:

# 使用闭包实现装饰器
def create_multiplier_decorator(factor):
    """创建乘法装饰器的工厂函数"""
    print(f"创建乘法因子为 {factor} 的装饰器")

    def decorator(func):
        print(f"装饰函数 {func.__name__}")

        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            multiplied_result = result * factor
            print(f"原始结果: {result}, 乘以 {factor} 后: {multiplied_result}")
            return multiplied_result

        return wrapper
    return decorator

# 使用闭包装饰器
@create_multiplier_decorator(3)
def calculate_area(length, width):
    """计算面积"""
    area = length * width
    print(f"计算面积: {length} × {width} = {area}")
    return area

# 测试闭包装饰器
print("\n测试闭包装饰器:")
result = calculate_area(4, 5)
print(f"最终结果: {result}")

输出结果:

创建乘法因子为 3 的装饰器
装饰函数 calculate_area

测试闭包装饰器:
计算面积: 4 × 5 = 20
原始结果: 20, 乘以 3 后: 60
最终结果: 60

10.6.5 函数工厂

闭包可以用来创建具有特定行为的函数:

# 函数工厂示例
def create_validator(min_value, max_value):
    """创建数值验证器"""
    def validator(value):
        if not isinstance(value, (int, float)):
            return False, f"值必须是数字,得到: {type(value).__name__}"

        if value < min_value:
            return False, f"值 {value} 小于最小值 {min_value}"

        if value > max_value:
            return False, f"值 {value} 大于最大值 {max_value}"

        return True, f"值 {value} 有效"

    # 添加验证器的描述信息
    validator.description = f"验证范围: [{min_value}, {max_value}]"
    return validator

# 创建不同的验证器
age_validator = create_validator(0, 120)
percentage_validator = create_validator(0, 100)
temperature_validator = create_validator(-273.15, 1000)

# 测试验证器
print("测试函数工厂创建的验证器:")
validators = [
    (age_validator, "年龄验证器", [25, -5, 150]),
    (percentage_validator, "百分比验证器", [85, 105, 50.5]),
    (temperature_validator, "温度验证器", [25.5, -300, "abc"])
]

for validator, name, test_values in validators:
    print(f"\n{name} ({validator.description}):")
    for value in test_values:
        is_valid, message = validator(value)
        status = "✓" if is_valid else "✗"
        print(f"  {status} {message}")

输出结果:

测试函数工厂创建的验证器:

年龄验证器 (验证范围: [0, 120]):
    25 有效
    -5 小于最小值 0
    150 大于最大值 120

百分比验证器 (验证范围: [0, 100]):
    85 有效
    105 大于最大值 100
    50.5 有效

温度验证器 (验证范围: [-273.15, 1000]):
    25.5 有效
    -300 小于最小值 -273.15
   值必须是数字,得到: str

10.6.6 配置化函数

闭包可以创建具有预设配置的函数:

# 配置化函数示例
def create_formatter(prefix="", suffix="", separator=" "):
    """创建格式化函数"""
    def formatter(*args):
        # 将所有参数转换为字符串并连接
        content = separator.join(str(arg) for arg in args)
        return f"{prefix}{content}{suffix}"

    # 添加配置信息
    formatter.config = {
        'prefix': prefix,
        'suffix': suffix,
        'separator': separator
    }

    return formatter

# 创建不同配置的格式化函数
html_formatter = create_formatter("<p>", "</p>")
csv_formatter = create_formatter("", "", ",")
log_formatter = create_formatter("[LOG] ", " [END]")
bracket_formatter = create_formatter("(", ")", ", ")

# 测试配置化函数
print("测试配置化函数:")
formatters = [
    (html_formatter, "HTML格式化器", ["Hello", "World"]),
    (csv_formatter, "CSV格式化器", ["Name", "Age", "City"]),
    (log_formatter, "日志格式化器", ["Error", "File not found"]),
    (bracket_formatter, "括号格式化器", [1, 2, 3, 4])
]

for formatter, name, test_data in formatters:
    result = formatter(*test_data)
    config = formatter.config
    print(f"\n{name}:")
    print(f"  配置: {config}")
    print(f"  输入: {test_data}")
    print(f"  输出: {result}")

输出结果:

测试配置化函数:

HTML格式化器:
  配置: {'prefix': '<p>', 'suffix': '</p>', 'separator': ' '}
  输入: ['Hello', 'World']
  输出: <p>Hello World</p>

CSV格式化器:
  配置: {'prefix': '', 'suffix': '', 'separator': ','}
  输入: ['Name', 'Age', 'City']
  输出: Name,Age,City

日志格式化器:
  配置: {'prefix': '[LOG] ', 'suffix': ' [END]', 'separator': ' '}
  输入: ['Error', 'File not found']
  输出: [LOG] Error File not found [END]

括号格式化器:
  配置: {'prefix': '(', 'suffix': ')', 'separator': ', '}
  输入: [1, 2, 3, 4]
  输出: (1, 2, 3, 4)

10.6.7 事件处理闭包

闭包在事件处理中的应用:

# 事件处理闭包示例
def create_event_handler(event_type, handler_name):
    """创建事件处理器"""
    call_count = 0

    def handle_event(data):
        nonlocal call_count
        call_count += 1
        timestamp = time.strftime("%H:%M:%S")
        print(f"[{timestamp}] {handler_name} 处理 {event_type} 事件 (第{call_count}次)")
        print(f"  事件数据: {data}")
        return f"处理完成: {event_type} - {data}"

    # 添加处理器信息
    handle_event.info = {
        'event_type': event_type,
        'handler_name': handler_name,
        'call_count': lambda: call_count
    }

    return handle_event

# 创建事件处理器
click_handler = create_event_handler("click", "按钮点击处理器")
keypress_handler = create_event_handler("keypress", "键盘输入处理器")
mouse_handler = create_event_handler("mouseover", "鼠标悬停处理器")

# 模拟事件处理
print("模拟事件处理:")
events = [
    (click_handler, "button_1"),
    (keypress_handler, "Enter"),
    (click_handler, "button_2"),
    (mouse_handler, "menu_item"),
    (click_handler, "button_1"),
    (keypress_handler, "Escape")
]

for handler, data in events:
    result = handler(data)
    print(f"  返回: {result}")
    print()

# 显示处理器统计信息
print("处理器统计信息:")
handlers = [click_handler, keypress_handler, mouse_handler]
for handler in handlers:
    info = handler.info
    print(f"{info['handler_name']}: {info['call_count']()} 次调用")

输出结果:

模拟事件处理:
[10:30:45] 按钮点击处理器 处理 click 事件 (第1次)
  事件数据: button_1
  返回: 处理完成: click - button_1

[10:30:45] 键盘输入处理器 处理 keypress 事件 (第1次)
  事件数据: Enter
  返回: 处理完成: keypress - Enter

[10:30:45] 按钮点击处理器 处理 click 事件 (第2次)
  事件数据: button_2
  返回: 处理完成: click - button_2

[10:30:45] 鼠标悬停处理器 处理 mouseover 事件 (第1次)
  事件数据: menu_item
  返回: 处理完成: mouseover - menu_item

[10:30:45] 按钮点击处理器 处理 click 事件 (第3次)
  事件数据: button_1
  返回: 处理完成: click - button_1

[10:30:45] 键盘输入处理器 处理 keypress 事件 (第2次)
  事件数据: Escape
  返回: 处理完成: keypress - Escape

处理器统计信息:
按钮点击处理器: 3 次调用
键盘输入处理器: 2 次调用
鼠标悬停处理器: 1 次调用

10.6.8 闭包作用域链

理解闭包的作用域链:

# 闭包作用域链示例
global_var = "全局变量"

def level1_function(level1_var):
    """第一层函数"""
    print(f"Level 1: {level1_var}")

    def level2_function(level2_var):
        """第二层函数"""
        print(f"Level 2: {level2_var}")

        def level3_function(level3_var):
            """第三层函数,可以访问所有外层变量"""
            print(f"Level 3: {level3_var}")
            print(f"访问Level 2变量: {level2_var}")
            print(f"访问Level 1变量: {level1_var}")
            print(f"访问全局变量: {global_var}")

            # 显示作用域链
            return {
                'level3': level3_var,
                'level2': level2_var,
                'level1': level1_var,
                'global': global_var
            }

        return level3_function

    return level2_function

# 创建嵌套闭包
print("创建嵌套闭包:")
closure_level2 = level1_function("Level1数据")
closure_level3 = closure_level2("Level2数据")

print("\n调用最内层闭包:")
scope_chain = closure_level3("Level3数据")
print(f"\n作用域链数据: {scope_chain}")

输出结果:

创建嵌套闭包:
Level 1: Level1数据
Level 2: Level2数据

调用最内层闭包:
Level 3: Level3数据
访问Level 2变量: Level2数据
访问Level 1变量: Level1数据
访问全局变量: 全局变量

作用域链数据: {'level3': 'Level3数据', 'level2': 'Level2数据', 'level1': 'Level1数据', 'global': '全局变量'}

10.6.9 nonlocal关键字

使用nonlocal修改外部函数的变量:

# nonlocal关键字示例
def create_bank_account(initial_balance):
    """创建银行账户闭包"""
    balance = initial_balance
    transaction_history = []

    def deposit(amount):
        """存款"""
        nonlocal balance
        if amount > 0:
            balance += amount
            transaction_history.append(f"存款: +{amount}")
            print(f"存款 {amount},当前余额: {balance}")
        else:
            print("存款金额必须大于0")

    def withdraw(amount):
        """取款"""
        nonlocal balance
        if amount > 0:
            if balance >= amount:
                balance -= amount
                transaction_history.append(f"取款: -{amount}")
                print(f"取款 {amount},当前余额: {balance}")
            else:
                print(f"余额不足,当前余额: {balance}")
        else:
            print("取款金额必须大于0")

    def get_balance():
        """查询余额"""
        return balance

    def get_history():
        """查询交易历史"""
        return transaction_history.copy()

    # 返回账户操作函数
    return {
        'deposit': deposit,
        'withdraw': withdraw,
        'balance': get_balance,
        'history': get_history
    }

# 创建银行账户
print("创建银行账户:")
account = create_bank_account(1000)

print(f"初始余额: {account['balance']()}")

# 进行一系列操作
print("\n进行银行操作:")
account['deposit'](500)
account['withdraw'](200)
account['withdraw'](1500)  # 余额不足
account['deposit'](100)
account['withdraw'](300)

print(f"\n最终余额: {account['balance']()}")
print(f"交易历史: {account['history']()}")

输出结果:

创建银行账户:
初始余额: 1000

进行银行操作:
存款 500,当前余额: 1500
取款 200,当前余额: 1300
余额不足,当前余额: 1300
存款 100,当前余额: 1400
取款 300,当前余额: 1100

最终余额: 1100
交易历史: ['存款: +500', '取款: -200', '存款: +100', '取款: -300']

10.6.10 闭包陷阱

需要注意的闭包陷阱:

# 闭包陷阱示例
def create_functions_wrong():
    """错误的闭包创建方式"""
    functions = []
    for i in range(3):
        # 错误:所有函数都会引用同一个变量i
        def func():
            return f"函数返回: {i}"
        functions.append(func)
    return functions

def create_functions_correct():
    """正确的闭包创建方式"""
    functions = []
    for i in range(3):
        # 正确:使用默认参数捕获当前的i值
        def func(x=i):
            return f"函数返回: {x}"
        functions.append(func)
    return functions

def create_functions_lambda():
    """使用lambda的正确方式"""
    return [lambda x=i: f"Lambda返回: {x}" for i in range(3)]

# 测试闭包陷阱
print("测试闭包陷阱:")

print("\n错误的闭包创建:")
wrong_functions = create_functions_wrong()
for idx, func in enumerate(wrong_functions):
    print(f"函数{idx}: {func()}")

print("\n正确的闭包创建:")
correct_functions = create_functions_correct()
for idx, func in enumerate(correct_functions):
    print(f"函数{idx}: {func()}")

print("\n使用Lambda的正确方式:")
lambda_functions = create_functions_lambda()
for idx, func in enumerate(lambda_functions):
    print(f"Lambda{idx}: {func()}")

输出结果:

测试闭包陷阱:

错误的闭包创建:
函数0: 函数返回: 2
函数1: 函数返回: 2
函数2: 函数返回: 2

正确的闭包创建:
函数0: 函数返回: 0
函数1: 函数返回: 1
函数2: 函数返回: 2

使用Lambda的正确方式:
Lambda0: Lambda返回: 0
Lambda1: Lambda返回: 1
Lambda2: Lambda返回: 2

10.6.11 闭包的应用场景

闭包在以下场景中特别有用:

  1. 装饰器实现:保持装饰器的状态和配置
  2. 函数工厂:创建具有特定行为的函数
  3. 回调函数:保持回调函数的上下文信息
  4. 配置化函数:创建预配置的函数
  5. 状态保持:在函数调用间保持状态
  6. 数据封装:创建私有变量和方法
  7. 事件处理:保持事件处理器的状态
  8. 缓存机制:实现函数级别的缓存

闭包是Python中一个强大的特性,它为函数式编程提供了重要支持,使得代码更加灵活和模块化。理解闭包的工作原理对于掌握Python的高级特性至关重要。

10.7 上下文管理器(Context Manager)

上下文管理器是Python中用于资源管理的重要机制,它确保资源的正确获取和释放。通过with语句,上下文管理器提供了一种优雅的方式来处理需要清理的资源,如文件、网络连接、数据库连接等。

10.7.1 上下文管理器协议

上下文管理器需要实现两个特殊方法:
- __enter__():进入上下文时调用,返回资源对象
- __exit__(exc_type, exc_val, exc_tb):退出上下文时调用,处理清理工作

10.7.2 自定义上下文管理器类

使用类实现上下文管理器:

# 自定义计时器上下文管理器
import time

class Timer:
    """计时器上下文管理器"""

    def __init__(self, name="操作"):
        self.name = name
        self.start_time = None
        self.end_time = None

    def __enter__(self):
        """进入上下文"""
        print(f"开始执行 {self.name}...")
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """退出上下文"""
        self.end_time = time.time()
        duration = self.end_time - self.start_time

        if exc_type is None:
            print(f"{self.name} 执行完成,耗时: {duration:.4f}秒")
        else:
            print(f"{self.name} 执行出错,耗时: {duration:.4f}秒")
            print(f"异常类型: {exc_type.__name__}")
            print(f"异常信息: {exc_val}")

        # 返回False表示不抑制异常
        return False

    @property
    def elapsed_time(self):
        """获取执行时间"""
        if self.start_time and self.end_time:
            return self.end_time - self.start_time
        return None

# 使用计时器上下文管理器
print("测试计时器上下文管理器:")

# 正常执行
with Timer("数据处理") as timer:
    # 模拟一些工作
    time.sleep(0.1)
    result = sum(range(1000000))
    print(f"计算结果: {result}")

print(f"通过属性获取执行时间: {timer.elapsed_time:.4f}秒")

# 异常情况
print("\n测试异常情况:")
try:
    with Timer("错误操作"):
        time.sleep(0.05)
        raise ValueError("模拟错误")
except ValueError as e:
    print(f"捕获异常: {e}")

输出结果:

测试计时器上下文管理器:
开始执行 数据处理...
计算结果: 499999500000
数据处理 执行完成,耗时: 0.1234
通过属性获取执行时间: 0.1234

测试异常情况:
开始执行 错误操作...
错误操作 执行出错,耗时: 0.0567
异常类型: ValueError
异常信息: 模拟错误
捕获异常: 模拟错误

10.7.3 文件管理上下文管理器

创建安全的文件操作上下文管理器:

# 文件管理上下文管理器
import os
import tempfile

class FileManager:
    """文件管理上下文管理器"""

    def __init__(self, filename, mode='r', encoding='utf-8', backup=False):
        self.filename = filename
        self.mode = mode
        self.encoding = encoding
        self.backup = backup
        self.file = None
        self.backup_file = None

    def __enter__(self):
        """进入上下文,打开文件"""
        print(f"打开文件: {self.filename} (模式: {self.mode})")

        # 如果是写模式且需要备份
        if 'w' in self.mode and self.backup and os.path.exists(self.filename):
            self.backup_file = f"{self.filename}.backup"
            print(f"创建备份文件: {self.backup_file}")
            with open(self.filename, 'r', encoding=self.encoding) as src:
                with open(self.backup_file, 'w', encoding=self.encoding) as dst:
                    dst.write(src.read())

        try:
            self.file = open(self.filename, self.mode, encoding=self.encoding)
            return self.file
        except Exception as e:
            print(f"打开文件失败: {e}")
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """退出上下文,关闭文件"""
        if self.file:
            print(f"关闭文件: {self.filename}")
            self.file.close()

        if exc_type is not None:
            print(f"文件操作出现异常: {exc_type.__name__}: {exc_val}")

            # 如果有备份文件,询问是否恢复
            if self.backup_file and os.path.exists(self.backup_file):
                print("检测到备份文件,可以考虑恢复")

        return False  # 不抑制异常

# 测试文件管理器
print("测试文件管理上下文管理器:")

# 创建测试文件
test_file = "test_context.txt"
with open(test_file, 'w', encoding='utf-8') as f:
    f.write("原始内容\n这是测试文件\n")

# 使用文件管理器读取
print("\n读取文件:")
with FileManager(test_file, 'r') as f:
    content = f.read()
    print(f"文件内容:\n{content}")

# 使用文件管理器写入(带备份)
print("\n写入文件(带备份):")
with FileManager(test_file, 'w', backup=True) as f:
    f.write("新的内容\n这是修改后的文件\n")

# 验证修改
print("\n验证修改:")
with FileManager(test_file, 'r') as f:
    new_content = f.read()
    print(f"修改后的内容:\n{new_content}")

# 检查备份文件
backup_file = f"{test_file}.backup"
if os.path.exists(backup_file):
    print(f"\n备份文件内容:")
    with open(backup_file, 'r', encoding='utf-8') as f:
        backup_content = f.read()
        print(backup_content)

# 清理测试文件
os.remove(test_file)
if os.path.exists(backup_file):
    os.remove(backup_file)

输出结果:

测试文件管理上下文管理器:

读取文件:
打开文件: test_context.txt (模式: r)
文件内容:
原始内容
这是测试文件

关闭文件: test_context.txt

写入文件(带备份):
打开文件: test_context.txt (模式: w)
创建备份文件: test_context.txt.backup
关闭文件: test_context.txt

验证修改:
打开文件: test_context.txt (模式: r)
修改后的内容:
新的内容
这是修改后的文件

关闭文件: test_context.txt

备份文件内容:
原始内容
这是测试文件

10.7.4 使用@contextmanager装饰器

使用contextlib.contextmanager装饰器简化上下文管理器的创建:

# 使用@contextmanager装饰器
from contextlib import contextmanager
import sqlite3
import tempfile
import os

@contextmanager
def database_transaction(db_path):
    """数据库事务上下文管理器"""
    print(f"连接数据库: {db_path}")
    conn = sqlite3.connect(db_path)

    try:
        print("开始事务")
        conn.execute("BEGIN")
        yield conn
        print("提交事务")
        conn.commit()
    except Exception as e:
        print(f"事务回滚: {e}")
        conn.rollback()
        raise
    finally:
        print("关闭数据库连接")
        conn.close()

@contextmanager
def temporary_directory(prefix="temp_"):
    """临时目录上下文管理器"""
    temp_dir = tempfile.mkdtemp(prefix=prefix)
    print(f"创建临时目录: {temp_dir}")

    try:
        yield temp_dir
    finally:
        # 清理临时目录
        import shutil
        print(f"清理临时目录: {temp_dir}")
        shutil.rmtree(temp_dir, ignore_errors=True)

@contextmanager
def change_directory(new_dir):
    """临时改变工作目录"""
    old_dir = os.getcwd()
    print(f"当前目录: {old_dir}")
    print(f"切换到目录: {new_dir}")

    try:
        os.chdir(new_dir)
        yield new_dir
    finally:
        print(f"恢复到原目录: {old_dir}")
        os.chdir(old_dir)

# 测试@contextmanager装饰器
print("测试@contextmanager装饰器:")

# 测试数据库事务
print("\n1. 数据库事务测试:")
db_file = "test.db"

try:
    with database_transaction(db_file) as conn:
        # 创建表
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE
            )
        """)

        # 插入数据
        conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", 
                    ("张三", "zhangsan@example.com"))
        conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", 
                    ("李四", "lisi@example.com"))

        # 查询数据
        cursor = conn.execute("SELECT * FROM users")
        users = cursor.fetchall()
        print(f"插入的用户: {users}")

except Exception as e:
    print(f"数据库操作失败: {e}")
finally:
    if os.path.exists(db_file):
        os.remove(db_file)

# 测试临时目录
print("\n2. 临时目录测试:")
with temporary_directory("my_temp_") as temp_dir:
    print(f"在临时目录中工作: {temp_dir}")

    # 在临时目录中创建文件
    test_file = os.path.join(temp_dir, "test.txt")
    with open(test_file, 'w') as f:
        f.write("这是临时文件")

    print(f"创建文件: {test_file}")
    print(f"文件存在: {os.path.exists(test_file)}")

print(f"临时目录已清理,文件存在: {os.path.exists(test_file) if 'test_file' in locals() else 'N/A'}")

# 测试目录切换
print("\n3. 目录切换测试:")
with change_directory(os.path.expanduser("~")):
    current = os.getcwd()
    print(f"当前工作目录: {current}")

    # 列出当前目录的一些文件
    files = os.listdir('.')[:5]  # 只显示前5个
    print(f"目录中的文件(前5个): {files}")

print(f"最终工作目录: {os.getcwd()}")

输出结果:

测试@contextmanager装饰器:

1. 数据库事务测试:
连接数据库: test.db
开始事务
插入的用户: [(1, '张三', 'zhangsan@example.com'), (2, '李四', 'lisi@example.com')]
提交事务
关闭数据库连接

2. 临时目录测试:
创建临时目录: C:\Users\...\temp_abc123
在临时目录中工作: C:\Users\...\temp_abc123
创建文件: C:\Users\...\temp_abc123\test.txt
文件存在: True
清理临时目录: C:\Users\...\temp_abc123
临时目录已清理,文件存在: False

3. 目录切换测试:
当前目录: C:\Users\yeyupiaoling/Python从入门到精通
切换到目录: C:\Users\yeyupiaoling
当前工作目录: C:\Users\yeyupiaoling
目录中的文件(前5个): ['Desktop', 'Documents', 'Downloads', 'Music', 'Pictures']
恢复到原目录: C:\Users\yeyupiaoling/Python从入门到精通
最终工作目录: C:\Users\yeyupiaoling/Python从入门到精通

10.7.5 contextlib模块的其他工具

contextlib模块提供了多个有用的上下文管理器:

# contextlib模块的其他工具
from contextlib import suppress, ExitStack, closing
import warnings

# 1. suppress - 抑制指定异常
print("1. suppress - 抑制异常:")

# 不使用suppress的情况
print("不使用suppress:")
try:
    int("not_a_number")
except ValueError:
    print("捕获到ValueError")

# 使用suppress
print("\n使用suppress:")
with suppress(ValueError):
    result = int("not_a_number")
    print("这行不会执行")
print("继续执行后续代码")

# suppress多个异常类型
print("\nsuppress多个异常:")
with suppress(ValueError, TypeError, KeyError):
    data = {"key": "value"}
    result = data["nonexistent_key"] + 10
print("异常被抑制,继续执行")

# 2. ExitStack - 管理多个上下文管理器
print("\n2. ExitStack - 管理多个上下文管理器:")

@contextmanager
def managed_resource(name):
    """模拟资源管理"""
    print(f"获取资源: {name}")
    try:
        yield name
    finally:
        print(f"释放资源: {name}")

# 使用ExitStack管理多个资源
with ExitStack() as stack:
    # 动态添加上下文管理器
    resource1 = stack.enter_context(managed_resource("数据库连接"))
    resource2 = stack.enter_context(managed_resource("文件句柄"))
    resource3 = stack.enter_context(managed_resource("网络连接"))

    print(f"使用资源: {resource1}, {resource2}, {resource3}")

    # 可以根据条件添加更多资源
    if True:  # 某个条件
        resource4 = stack.enter_context(managed_resource("缓存连接"))
        print(f"额外资源: {resource4}")

print("所有资源已自动释放")

# 3. 嵌套上下文管理器
print("\n3. 嵌套上下文管理器:")

@contextmanager
def logging_context(operation):
    """日志记录上下文"""
    print(f"[开始] {operation}")
    start_time = time.time()
    try:
        yield
    except Exception as e:
        print(f"[错误] {operation}: {e}")
        raise
    finally:
        end_time = time.time()
        print(f"[结束] {operation} (耗时: {end_time - start_time:.3f}秒)")

# 嵌套使用多个上下文管理器
with logging_context("数据处理流程"):
    with logging_context("数据加载"):
        time.sleep(0.1)
        print("加载数据完成")

    with logging_context("数据转换"):
        time.sleep(0.05)
        print("转换数据完成")

    with logging_context("数据保存"):
        time.sleep(0.08)
        print("保存数据完成")

# 4. 条件上下文管理器
print("\n4. 条件上下文管理器:")

@contextmanager
def conditional_context(condition, context_manager):
    """条件上下文管理器"""
    if condition:
        with context_manager as value:
            yield value
    else:
        yield None

# 使用条件上下文管理器
for use_timer in [True, False]:
    print(f"\n使用计时器: {use_timer}")
    with conditional_context(use_timer, Timer("条件操作")) as timer:
        time.sleep(0.05)
        print("执行一些操作")
        if timer:
            print(f"操作耗时: {timer.elapsed_time}秒")
        else:
            print("未使用计时器")

输出结果:

1. suppress - 抑制异常:
不使用suppress:
捕获到ValueError

使用suppress:
继续执行后续代码

suppress多个异常:
异常被抑制继续执行

2. ExitStack - 管理多个上下文管理器:
获取资源: 数据库连接
获取资源: 文件句柄
获取资源: 网络连接
使用资源: 数据库连接, 文件句柄, 网络连接
获取资源: 缓存连接
额外资源: 缓存连接
释放资源: 缓存连接
释放资源: 网络连接
释放资源: 文件句柄
释放资源: 数据库连接
所有资源已自动释放

3. 嵌套上下文管理器:
[开始] 数据处理流程
[开始] 数据加载
加载数据完成
[结束] 数据加载 (耗时: 0.101)
[开始] 数据转换
转换数据完成
[结束] 数据转换 (耗时: 0.051)
[开始] 数据保存
保存数据完成
[结束] 数据保存 (耗时: 0.081)
[结束] 数据处理流程 (耗时: 0.235)

4. 条件上下文管理器:

使用计时器: True
开始执行 条件操作...
执行一些操作
条件操作 执行完成耗时: 0.0512
操作耗时: 0.0512

使用计时器: False
执行一些操作
未使用计时器

10.7.6 上下文管理器的应用场景

上下文管理器在以下场景中特别有用:

  1. 资源管理:文件、数据库连接、网络连接等
  2. 状态管理:临时改变系统状态,如工作目录、环境变量
  3. 异常处理:确保在异常情况下也能正确清理资源
  4. 性能监控:测量代码块的执行时间
  5. 事务管理:数据库事务的提交和回滚
  6. 锁管理:线程同步中的锁获取和释放
  7. 临时配置:临时修改配置参数
  8. 日志记录:为代码块添加结构化日志

上下文管理器是Python中实现”获取资源,使用资源,释放资源”模式的标准方法,它确保了资源的正确管理,提高了代码的健壮性和可维护性。

10.8 章节总结

本章深入探讨了Python的高级特性,这些特性是Python语言强大和优雅的重要体现:

核心特性回顾

  1. 列表推导式:提供了简洁的列表创建方式,相比传统循环更加Pythonic,性能更优
  2. 字典和集合推导式:扩展了推导式的概念,支持字典和集合的快速构建
  3. 生成器:实现惰性求值,节省内存,支持无限序列和管道处理
  4. 迭代器:提供统一的遍历接口,支持自定义迭代行为
  5. 装饰器:实现横切关注点的分离,如日志、缓存、权限检查等
  6. 闭包:支持函数式编程,实现数据封装和状态保持
  7. 上下文管理器:确保资源的正确管理,提供异常安全的代码结构

最佳实践建议

  1. 合理使用推导式
    - 优先使用推导式而非传统循环
    - 避免过度复杂的嵌套推导式
    - 考虑代码可读性和维护性

  2. 善用生成器
    - 处理大数据集时优先考虑生成器
    - 利用生成器实现管道处理
    - 合理使用yieldsend方法

  3. 装饰器设计原则
    - 保持装饰器的单一职责
    - 使用functools.wraps保持函数元信息
    - 考虑装饰器的组合和顺序

  4. 闭包应用场景
    - 实现函数工厂和配置化函数
    - 避免闭包陷阱,注意变量绑定时机
    - 合理使用nonlocal关键字

  5. 上下文管理器使用
    - 优先使用with语句管理资源
    - 自定义上下文管理器时确保异常安全
    - 利用contextlib模块简化实现

性能考虑

  • 推导式 vs 循环:推导式通常性能更好,但差异不大
  • 生成器 vs 列表:生成器在内存使用上有显著优势
  • 装饰器开销:装饰器会增加函数调用开销,但通常可以忽略
  • 闭包性能:闭包访问外部变量比局部变量稍慢

学习建议

  1. 循序渐进:从简单的推导式开始,逐步掌握复杂特性
  2. 实践应用:在实际项目中应用这些特性,加深理解
  3. 阅读源码:研究优秀开源项目中这些特性的使用
  4. 性能测试:在关键代码路径上进行性能测试和优化

掌握这些Python高级特性将显著提升你的编程水平,让你能够编写更加优雅、高效和Pythonic的代码。这些特性不仅是语法糖,更是Python哲学”优雅胜于丑陋,简洁胜于复杂”的具体体现。

小夜