第14章 爬虫与自动化

网络爬虫是现代数据获取和自动化处理的重要技术手段,通过模拟浏览器行为自动访问网页并提取所需信息。本章将从基础概念开始,逐步深入到高级爬虫框架和自动化技术,帮助读者掌握完整的爬虫开发技能。

14.1 网络爬虫基础

爬虫概述

网络爬虫的定义和用途

网络爬虫(Web Crawler),也称为网页蜘蛛(Web Spider)或网络机器人(Web Robot),是一种按照一定规则自动浏览万维网并获取信息的程序。爬虫的主要用途包括:

  1. 数据采集:从网站获取商品信息、新闻资讯、股票价格等
  2. 搜索引擎:为搜索引擎建立索引数据库
  3. 市场分析:收集竞争对手信息,进行市场调研
  4. 内容监控:监控网站内容变化,及时获取更新
  5. 学术研究:收集研究数据,进行数据分析

爬虫的工作原理

网络爬虫的基本工作流程如下:

  1. 发送HTTP请求:向目标网站发送请求
  2. 接收响应数据:获取服务器返回的HTML页面
  3. 解析页面内容:提取所需的数据信息
  4. 存储数据:将提取的数据保存到文件或数据库
  5. 发现新链接:从当前页面中发现新的URL
  6. 重复过程:对新发现的URL重复上述过程

让我们通过一个简单的示例来理解爬虫的基本原理:

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import time

def simple_crawler(url):
    """
    简单的网页爬虫示例
    """
    try:
        # 1. 发送HTTP请求
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers)

        # 2. 检查响应状态
        if response.status_code == 200:
            # 3. 解析页面内容
            soup = BeautifulSoup(response.text, 'html.parser')

            # 4. 提取标题
            title = soup.find('title')
            if title:
                print(f"页面标题: {title.get_text().strip()}")

            # 5. 提取所有链接
            links = soup.find_all('a', href=True)
            print(f"找到 {len(links)} 个链接:")

            for i, link in enumerate(links[:5]):  # 只显示前5个链接
                href = "https://yeyupiaoling.cn" + link['href']
                text = link.get_text().strip()
                print(f"{i+1}. {text} -> {href}")

        else:
            print(f"请求失败,状态码: {response.status_code}")

    except Exception as e:
        print(f"爬取过程中出现错误: {e}")

# 使用示例
if __name__ == "__main__":
    url = "https://yeyupiaoling.cn"
    simple_crawler(url)

运行上述代码,输出类似如下:

页面标题: 夜雨飘零的博客 - 首页
找到 50 个链接:
1.  -> https://yeyupiaoling.cn/
2. 夜雨飘零 -> https://yeyupiaoling.cn/
3. 首页 -> https://yeyupiaoling.cn/
4. 归档 -> https://yeyupiaoling.cn/archive
5. 标签 -> https://yeyupiaoling.cn/tag

爬虫的分类和特点

根据不同的分类标准,爬虫可以分为以下几类:

按照爬取范围分类:
- 通用爬虫:搜索引擎使用的爬虫,爬取整个互联网
- 聚焦爬虫:针对特定主题或网站的爬虫
- 增量爬虫:只爬取新增或更新的内容

按照技术实现分类:
- 静态爬虫:只能处理静态HTML页面
- 动态爬虫:能够处理JavaScript渲染的动态页面

按照爬取深度分类:
- 浅层爬虫:只爬取首页或少数几层页面
- 深层爬虫:能够深入爬取网站的多层结构(深度控制的思路见下方示例)
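
下面给出一个极简的广度优先爬虫草图,用max_depth参数限制爬取深度,并用visited集合避免重复抓取同一URL(示例中的起始地址仅作演示,未处理异常、robots.txt等问题):

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

def crawl(start_url, max_depth=1):
    """
    广度优先爬取:max_depth控制爬取层数,visited保证同一URL只抓取一次
    """
    visited = set()
    queue = [(start_url, 0)]

    while queue:
        url, depth = queue.pop(0)
        if url in visited or depth > max_depth:
            continue
        visited.add(url)

        response = requests.get(url, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        print(f"深度{depth}: {url}")

        # 发现新链接,放入队列等待下一层处理
        for link in soup.find_all('a', href=True):
            next_url = urljoin(url, link['href'])
            if next_url.startswith('http'):  # 跳过mailto、javascript等非HTTP链接
                queue.append((next_url, depth + 1))

    return visited

if __name__ == "__main__":
    crawl("https://httpbin.org/html", max_depth=1)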

爬虫的法律和道德考量

在进行网络爬虫开发时,必须遵守相关的法律法规和道德准则:

  1. 遵守robots.txt协议:检查网站的robots.txt文件(检查方法见下方示例)
  2. 控制爬取频率:避免对服务器造成过大压力
  3. 尊重版权:不要爬取受版权保护的内容
  4. 保护隐私:不要爬取个人隐私信息
  5. 合理使用数据:仅将爬取的数据用于合法目的
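
作为参考,下面是一个使用标准库urllib.robotparser检查robots.txt并读取Crawl-delay的简单草图(其中MySpider/1.0和目标地址均为演示用的假设值):

import time
import urllib.robotparser
from urllib.parse import urljoin

def check_robots(base_url, path, user_agent="MySpider/1.0"):
    """
    检查robots.txt是否允许爬取指定路径,并返回站点声明的抓取间隔
    """
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(urljoin(base_url, "/robots.txt"))
    rp.read()  # 下载并解析robots.txt

    allowed = rp.can_fetch(user_agent, urljoin(base_url, path))
    delay = rp.crawl_delay(user_agent)  # 站点未声明Crawl-delay时返回None
    return allowed, delay

if __name__ == "__main__":
    allowed, delay = check_robots("https://httpbin.org", "/get")
    print(f"允许爬取: {allowed}")
    time.sleep(delay or 1)  # 按声明的间隔(或默认1秒)休眠,控制爬取频率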

HTTP协议基础

HTTP请求和响应

HTTP(HyperText Transfer Protocol)是网络爬虫与Web服务器通信的基础协议。理解HTTP协议对于开发高效的爬虫至关重要。

HTTP通信包含两个主要部分:
- 请求(Request):客户端向服务器发送的消息
- 响应(Response):服务器返回给客户端的消息

让我们通过代码来观察HTTP请求和响应的详细信息:

import requests
import json

def analyze_http_communication(url):
    """
    分析HTTP请求和响应的详细信息
    """
    # 创建会话对象
    session = requests.Session()

    # 设置请求头
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
    }

    try:
        # 发送请求
        response = session.get(url, headers=headers)

        print("=== HTTP请求信息 ===")
        print(f"请求URL: {response.request.url}")
        print(f"请求方法: {response.request.method}")
        print("请求头:")
        for key, value in response.request.headers.items():
            print(f"  {key}: {value}")

        print("\n=== HTTP响应信息 ===")
        print(f"状态码: {response.status_code}")
        print(f"响应原因: {response.reason}")
        print(f"响应时间: {response.elapsed.total_seconds():.3f}秒")
        print("响应头:")
        for key, value in response.headers.items():
            print(f"  {key}: {value}")

        print(f"\n响应内容长度: {len(response.text)} 字符")
        print(f"响应内容类型: {response.headers.get('Content-Type', 'Unknown')}")

    except requests.RequestException as e:
        print(f"请求失败: {e}")

# 使用示例
if __name__ == "__main__":
    analyze_http_communication("https://yeyupiaoling.cn/")

运行结果示例:

=== HTTP请求信息 ===
请求URL: https://yeyupiaoling.cn/
请求方法: GET
请求头:
  User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36
  Accept-Encoding: gzip, deflate
  Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
  Connection: keep-alive
  Accept-Language: zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3

=== HTTP响应信息 ===
状态码: 200
响应原因: OK
响应时间: 0.197秒
响应头:
  Server: nginx/1.18.0 (Ubuntu)
  Date: Sat, 16 Aug 2025 04:36:49 GMT
  Content-Type: text/html; charset=utf-8
  Transfer-Encoding: chunked
  Connection: keep-alive
  Vary: Cookie
  Content-Encoding: gzip

响应内容长度: 29107 字符
响应内容类型: text/html; charset=utf-8

Cookie和Session机制

Cookie和Session是Web应用中维持用户状态的重要机制:

  • Cookie:存储在客户端的小型数据文件
  • Session:存储在服务器端的用户会话信息

在爬虫开发中,正确处理Cookie和Session对于模拟用户登录和维持会话状态至关重要:

import requests
from http.cookies import SimpleCookie

def demonstrate_cookies_and_sessions():
    """
    演示Cookie和Session的使用
    """
    # 创建会话对象
    session = requests.Session()

    print("=== Cookie操作演示 ===")

    # 1. 设置Cookie
    cookie_url = "https://httpbin.org/cookies/set"
    cookie_params = {
        'username': 'testuser',
        'session_id': 'abc123',
        'preferences': 'dark_theme'
    }

    # 设置Cookie(这会导致重定向)
    response = session.get(cookie_url, params=cookie_params)
    print(f"设置Cookie后的状态码: {response.status_code}")

    # 2. 查看当前Cookie
    print("\n当前会话中的Cookie:")
    for cookie in session.cookies:
        print(f"  {cookie.name} = {cookie.value}")

    # 3. 发送带Cookie的请求
    cookie_test_url = "https://httpbin.org/cookies"
    response = session.get(cookie_test_url)
    if response.status_code == 200:
        cookies_data = response.json()
        print(f"\n服务器接收到的Cookie: {cookies_data.get('cookies', {})}")

    # 4. 手动设置Cookie
    print("\n=== 手动Cookie操作 ===")
    manual_session = requests.Session()

    # 方法1:通过字典设置
    manual_session.cookies.update({
        'user_id': '12345',
        'auth_token': 'xyz789'
    })

    # 方法2:通过set方法设置
    manual_session.cookies.set('language', 'zh-CN', domain='httpbin.org')

    # 测试手动设置的Cookie
    response = manual_session.get("https://httpbin.org/cookies")
    if response.status_code == 200:
        cookies_data = response.json()
        print(f"手动设置的Cookie: {cookies_data.get('cookies', {})}")

    # 5. Cookie持久化
    print("\n=== Cookie持久化 ===")

    # 保存Cookie到文件
    import pickle

    # 保存Cookie
    with open('cookies.pkl', 'wb') as f:
        pickle.dump(session.cookies, f)
    print("Cookie已保存到文件")

    # 加载Cookie
    new_session = requests.Session()
    try:
        with open('cookies.pkl', 'rb') as f:
            new_session.cookies = pickle.load(f)
        print("Cookie已从文件加载")

        # 测试加载的Cookie
        response = new_session.get("https://httpbin.org/cookies")
        if response.status_code == 200:
            cookies_data = response.json()
            print(f"加载的Cookie: {cookies_data.get('cookies', {})}")
    except FileNotFoundError:
        print("Cookie文件不存在")

# 模拟登录示例
def simulate_login_with_session():
    """
    模拟网站登录过程
    """
    print("\n=== 模拟登录流程 ===")

    session = requests.Session()

    # 1. 访问登录页面(获取必要的Cookie和token)
    login_page_url = "https://httpbin.org/cookies/set/csrf_token/abc123def456"
    response = session.get(login_page_url)
    print(f"访问登录页面: {response.status_code}")

    # 2. 提交登录表单
    login_data = {
        'username': 'testuser',
        'password': 'testpass',
        'csrf_token': 'abc123def456'
    }

    login_url = "https://httpbin.org/post"
    response = session.post(login_url, data=login_data)

    if response.status_code == 200:
        print("登录请求发送成功")
        response_data = response.json()
        print(f"提交的登录数据: {response_data.get('form', {})}")

    # 3. 访问需要登录的页面
    protected_url = "https://httpbin.org/cookies"
    response = session.get(protected_url)

    if response.status_code == 200:
        print("成功访问受保护页面")
        cookies_data = response.json()
        print(f"当前会话Cookie: {cookies_data.get('cookies', {})}")

# 运行演示
if __name__ == "__main__":
    demonstrate_cookies_and_sessions()
    simulate_login_with_session()

运行结果:

=== Cookie操作演示 ===
设置Cookie后的状态码: 200

当前会话中的Cookie:
  username = testuser
  session_id = abc123
  preferences = dark_theme

服务器接收到的Cookie: {'username': 'testuser', 'session_id': 'abc123', 'preferences': 'dark_theme'}

=== 手动Cookie操作 ===
手动设置的Cookie: {'user_id': '12345', 'auth_token': 'xyz789', 'language': 'zh-CN'}

=== Cookie持久化 ===
Cookie已保存到文件
Cookie已从文件加载
加载的Cookie: {'username': 'testuser', 'session_id': 'abc123', 'preferences': 'dark_theme'}

=== 模拟登录流程 ===
访问登录页面: 200
登录请求发送成功
提交的登录数据: {'username': 'testuser', 'password': 'testpass', 'csrf_token': 'abc123def456'}
成功访问受保护页面
当前会话Cookie: {'csrf_token': 'abc123def456'}

网页结构分析

HTML基础结构

理解HTML结构是网页数据提取的基础。HTML(HyperText Markup Language)使用标签来定义网页内容的结构和语义。

一个典型的HTML页面结构如下:

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>页面标题</title>
    <link rel="stylesheet" href="style.css">
</head>
<body>
    <header>
        <nav>
            <ul>
                <li><a href="#home">首页</a></li>
                <li><a href="#about">关于</a></li>
            </ul>
        </nav>
    </header>

    <main>
        <article>
            <h1>文章标题</h1>
            <p class="content">文章内容...</p>
        </article>
    </main>

    <footer>
        <p>&copy; 2024 版权信息</p>
    </footer>

    <script src="script.js"></script>
</body>
</html>

让我们编写一个HTML结构分析工具:

import requests
from bs4 import BeautifulSoup
from collections import Counter
from urllib.parse import urlparse

def analyze_html_structure(url):
    """
    分析网页的HTML结构
    """
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')

            print(f"=== HTML结构分析: {url} ===")

            # 1. 基本信息
            title = soup.find('title')
            print(f"页面标题: {title.get_text().strip() if title else '无标题'}")

            # 2. 文档类型和编码
            doctype = soup.contents[0] if soup.contents and hasattr(soup.contents[0], 'string') else None
            print(f"文档类型: {doctype if doctype else 'HTML5'}")

            charset_meta = soup.find('meta', attrs={'charset': True})
            if not charset_meta:
                charset_meta = soup.find('meta', attrs={'http-equiv': 'Content-Type'})
            encoding = charset_meta.get('charset') if charset_meta else response.encoding
            print(f"字符编码: {encoding}")

            # 3. 标签统计
            all_tags = [tag.name for tag in soup.find_all()]
            tag_counter = Counter(all_tags)
            print(f"\n标签统计 (前10个):")
            for tag, count in tag_counter.most_common(10):
                print(f"  {tag}: {count}个")

            # 4. 链接分析
            links = soup.find_all('a', href=True)
            print(f"\n链接分析:")
            print(f"  总链接数: {len(links)}")

            internal_links = []
            external_links = []

            base_netloc = urlparse(url).netloc
            for link in links:
                href = link['href']
                if href.startswith('http'):
                    # 通过域名判断是否为站内链接
                    if urlparse(href).netloc == base_netloc:
                        internal_links.append(href)
                    else:
                        external_links.append(href)
                elif href.startswith('/'):
                    internal_links.append(href)

            print(f"  内部链接: {len(internal_links)}个")
            print(f"  外部链接: {len(external_links)}个")

            # 5. 图片分析
            images = soup.find_all('img')
            print(f"\n图片分析:")
            print(f"  图片总数: {len(images)}")

            img_with_alt = [img for img in images if img.get('alt')]
            print(f"  有alt属性: {len(img_with_alt)}个")

            # 6. 表单分析
            forms = soup.find_all('form')
            print(f"\n表单分析:")
            print(f"  表单总数: {len(forms)}")

            for i, form in enumerate(forms):
                method = form.get('method', 'GET').upper()
                action = form.get('action', '当前页面')
                inputs = form.find_all(['input', 'select', 'textarea'])
                print(f"  表单{i+1}: {method} -> {action} ({len(inputs)}个字段)")

            # 7. 脚本和样式
            scripts = soup.find_all('script')
            stylesheets = soup.find_all('link', rel='stylesheet')

            print(f"\n资源分析:")
            print(f"  JavaScript文件: {len(scripts)}个")
            print(f"  CSS样式表: {len(stylesheets)}个")

            # 8. 结构层次
            print(f"\n页面结构:")
            body = soup.find('body')
            if body:
                print_structure(body, level=0, max_level=3)

        else:
            print(f"请求失败,状态码: {response.status_code}")

    except Exception as e:
        print(f"分析过程中出现错误: {e}")

def print_structure(element, level=0, max_level=3):
    """
    递归打印HTML结构
    """
    if level > max_level:
        return

    indent = "  " * level
    tag_name = element.name

    # 获取重要属性
    attrs = []
    if element.get('id'):
        attrs.append(f"id='{element['id']}'")
    if element.get('class'):
        classes = ' '.join(element['class'])
        attrs.append(f"class='{classes}'")

    attr_str = f" [{', '.join(attrs)}]" if attrs else ""
    print(f"{indent}<{tag_name}>{attr_str}")

    # 递归处理子元素
    for child in element.children:
        if hasattr(child, 'name') and child.name:
            print_structure(child, level + 1, max_level)

# 使用示例
if __name__ == "__main__":
    # 分析一个示例网页
    analyze_html_structure("https://httpbin.org/html")

运行结果示例:

=== HTML结构分析: https://httpbin.org/html ===
页面标题: Herman Melville - Moby-Dick
文档类型: HTML5
字符编码: utf-8

标签统计 (前10个):
  p: 4个
  a: 3个
  h1: 1个
  body: 1个
  html: 1个
  head: 1个
  title: 1个

链接分析:
  总链接数: 3个
  内部链接: 0个
  外部链接: 3个

图片分析:
  图片总数: 0个
  有alt属性: 0个

表单分析:
  表单总数: 0个

资源分析:
  JavaScript文件: 0个
  CSS样式表: 0个

页面结构:
<body>
  <h1>
  <p>
  <p>
  <p>
  <p>

CSS选择器

CSS选择器是定位HTML元素的强大工具,在网页数据提取中起着关键作用。理解CSS选择器语法对于精确定位目标元素至关重要。

基本选择器:
- 标签选择器:div、p、a
- 类选择器:.class-name
- ID选择器:#element-id
- 属性选择器:[attribute="value"]

组合选择器:
- 后代选择器:div p(div内的所有p元素)
- 子元素选择器:div > p(div的直接子p元素)
- 相邻兄弟选择器:h1 + p(紧跟h1的p元素)
- 通用兄弟选择器:h1 ~ p(h1后的所有同级p元素)

伪类选择器:
- :first-child、:last-child、:nth-child(n)
- :not(selector)、:contains(text)(:contains为非标准选择器,仅部分解析库支持)

让我们通过实例来学习CSS选择器的使用:

import requests
from bs4 import BeautifulSoup

def demonstrate_css_selectors():
    """
    演示CSS选择器的使用
    """
    # 创建示例HTML
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>CSS选择器示例</title>
    </head>
    <body>
        <div class="container">
            <h1 id="main-title">新闻列表</h1>
            <div class="news-section">
                <article class="news-item featured">
                    <h2>重要新闻标题1</h2>
                    <p class="summary">这是新闻摘要...</p>
                    <span class="date">2024-01-15</span>
                    <a href="/news/1" class="read-more">阅读更多</a>
                </article>
                <article class="news-item">
                    <h2>普通新闻标题2</h2>
                    <p class="summary">这是另一个新闻摘要...</p>
                    <span class="date">2024-01-14</span>
                    <a href="/news/2" class="read-more">阅读更多</a>
                </article>
                <article class="news-item">
                    <h2>普通新闻标题3</h2>
                    <p class="summary">第三个新闻摘要...</p>
                    <span class="date">2024-01-13</span>
                    <a href="/news/3" class="read-more">阅读更多</a>
                </article>
            </div>
            <aside class="sidebar">
                <h3>热门标签</h3>
                <ul class="tag-list">
                    <li><a href="/tag/tech" data-category="technology">科技</a></li>
                    <li><a href="/tag/sports" data-category="sports">体育</a></li>
                    <li><a href="/tag/finance" data-category="finance">财经</a></li>
                </ul>
            </aside>
        </div>
    </body>
    </html>
    """

    soup = BeautifulSoup(html_content, 'html.parser')

    print("=== CSS选择器演示 ===")

    # 1. 基本选择器
    print("\n1. 基本选择器:")

    # 标签选择器
    h2_elements = soup.select('h2')
    print(f"所有h2标签 ({len(h2_elements)}个):")
    for h2 in h2_elements:
        print(f"  - {h2.get_text().strip()}")

    # 类选择器
    news_items = soup.select('.news-item')
    print(f"\n所有新闻项 ({len(news_items)}个):")
    for i, item in enumerate(news_items, 1):
        title = item.select_one('h2').get_text().strip()
        print(f"  {i}. {title}")

    # ID选择器
    main_title = soup.select_one('#main-title')
    print(f"\n主标题: {main_title.get_text().strip()}")

    # 属性选择器
    tech_links = soup.select('a[data-category="technology"]')
    print(f"\n科技类链接 ({len(tech_links)}个):")
    for link in tech_links:
        print(f"  - {link.get_text().strip()} -> {link.get('href')}")

    # 2. 组合选择器
    print("\n2. 组合选择器:")

    # 后代选择器
    container_links = soup.select('.container a')
    print(f"容器内所有链接 ({len(container_links)}个):")
    for link in container_links:
        text = link.get_text().strip()
        href = link.get('href', '#')
        print(f"  - {text} -> {href}")

    # 子元素选择器
    direct_children = soup.select('.news-section > .news-item')
    print(f"\n新闻区域的直接子元素 ({len(direct_children)}个)")

    # 相邻兄弟选择器
    after_h2 = soup.select('h2 + p')
    print(f"\nh2后的相邻p元素 ({len(after_h2)}个):")
    for p in after_h2:
        print(f"  - {p.get_text().strip()[:30]}...")

    # 3. 伪类选择器
    print("\n3. 伪类选择器:")

    # 第一个和最后一个子元素
    first_news = soup.select('.news-item:first-child')
    last_news = soup.select('.news-item:last-child')

    if first_news:
        first_title = first_news[0].select_one('h2').get_text().strip()
        print(f"第一个新闻: {first_title}")

    if last_news:
        last_title = last_news[0].select_one('h2').get_text().strip()
        print(f"最后一个新闻: {last_title}")

    # nth-child选择器
    second_news = soup.select('.news-item:nth-child(2)')
    if second_news:
        second_title = second_news[0].select_one('h2').get_text().strip()
        print(f"第二个新闻: {second_title}")

    # 4. 复杂选择器组合
    print("\n4. 复杂选择器:")

    # 选择特色新闻的标题
    featured_title = soup.select('.news-item.featured h2')
    if featured_title:
        print(f"特色新闻标题: {featured_title[0].get_text().strip()}")

    # 选择包含特定文本的元素
    read_more_links = soup.select('a.read-more')
    print(f"'阅读更多'链接 ({len(read_more_links)}个)")

    # 选择具有特定属性的元素
    category_links = soup.select('a[data-category]')
    print(f"有分类属性的链接 ({len(category_links)}个):")
    for link in category_links:
        category = link.get('data-category')
        text = link.get_text().strip()
        print(f"  - {text} (分类: {category})")

# 实际网页CSS选择器应用
def extract_data_with_css_selectors(url):
    """
    使用CSS选择器从实际网页提取数据
    """
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')

            print(f"\n=== 从 {url} 提取数据 ===")

            # 提取页面标题
            title = soup.select_one('title')
            if title:
                print(f"页面标题: {title.get_text().strip()}")

            # 提取所有链接
            links = soup.select('a[href]')
            print(f"\n找到 {len(links)} 个链接:")

            for i, link in enumerate(links[:5], 1):  # 只显示前5个
                text = link.get_text().strip()
                href = link.get('href')
                print(f"  {i}. {text[:50]}... -> {href}")

            # 提取所有段落文本
            paragraphs = soup.select('p')
            if paragraphs:
                print(f"\n段落内容 (共{len(paragraphs)}个):")
                for i, p in enumerate(paragraphs[:3], 1):  # 只显示前3个
                    text = p.get_text().strip()
                    if text:
                        print(f"  {i}. {text[:100]}...")
        else:
            print(f"请求失败,状态码: {response.status_code}")

    except Exception as e:
        print(f"提取数据时出现错误: {e}")

# 运行演示
if __name__ == "__main__":
    demonstrate_css_selectors()
    extract_data_with_css_selectors("https://httpbin.org/html")

JavaScript和动态内容

现代网页大量使用JavaScript来动态生成内容,这给传统的静态爬虫带来了挑战。动态内容包括:

  1. AJAX加载的数据:通过异步请求获取的内容
  2. JavaScript渲染的页面:完全由JS生成的页面结构
  3. 用户交互触发的内容:点击、滚动等操作后显示的内容
  4. 实时更新的数据:WebSocket或定时刷新的内容

处理动态内容的方法:

方法1:分析AJAX请求

import requests
import json

def analyze_ajax_requests():
    """
    分析和模拟AJAX请求
    """
    print("=== AJAX请求分析 ===")

    # 模拟一个AJAX请求
    ajax_url = "https://httpbin.org/json"

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'X-Requested-With': 'XMLHttpRequest',  # 标识AJAX请求
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.get(ajax_url, headers=headers)

        if response.status_code == 200:
            data = response.json()
            print(f"AJAX响应数据:")
            print(json.dumps(data, indent=2, ensure_ascii=False))
        else:
            print(f"AJAX请求失败: {response.status_code}")

    except Exception as e:
        print(f"AJAX请求异常: {e}")

# 运行AJAX分析
if __name__ == "__main__":
    analyze_ajax_requests()

方法2:使用Selenium处理JavaScript

# 注意:需要安装selenium和对应的浏览器驱动
# pip install selenium

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

def handle_dynamic_content_with_selenium():
    """
    使用Selenium处理动态内容
    """
    print("=== Selenium处理动态内容 ===")

    # 配置Chrome选项
    chrome_options = Options()
    chrome_options.add_argument('--headless')  # 无头模式
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')

    try:
        # 创建WebDriver实例
        driver = webdriver.Chrome(options=chrome_options)

        # 访问包含动态内容的页面
        driver.get("https://httpbin.org/html")

        # 等待页面加载完成
        wait = WebDriverWait(driver, 10)

        # 获取页面标题
        title = driver.title
        print(f"页面标题: {title}")

        # 查找元素
        h1_element = wait.until(
            EC.presence_of_element_located((By.TAG_NAME, "h1"))
        )
        print(f"H1内容: {h1_element.text}")

        # 获取所有链接
        links = driver.find_elements(By.TAG_NAME, "a")
        print(f"\n找到 {len(links)} 个链接:")

        for i, link in enumerate(links, 1):
            text = link.text.strip()
            href = link.get_attribute('href')
            print(f"  {i}. {text} -> {href}")

        # 执行JavaScript
        js_result = driver.execute_script("return document.title;")
        print(f"\nJavaScript执行结果: {js_result}")

    except Exception as e:
        print(f"Selenium处理异常: {e}")
    finally:
        if 'driver' in locals():
            driver.quit()

# 注意:实际运行需要安装ChromeDriver
# 这里只是演示代码结构

网页编码和字符集

正确处理网页编码是避免乱码问题的关键。常见的编码格式包括:

  • UTF-8:支持全球所有字符的Unicode编码
  • GBK/GB2312:中文编码格式
  • ISO-8859-1:西欧字符编码
  • ASCII:基本英文字符编码

下面的示例演示如何检测网页编码并正确解析内容:

import requests
from bs4 import BeautifulSoup
import chardet

def handle_encoding_issues():
    """
    处理网页编码问题
    """
    print("=== 网页编码处理 ===")

    # 测试不同编码的处理
    test_urls = [
        "https://httpbin.org/encoding/utf8",
        "https://httpbin.org/html",
    ]

    for url in test_urls:
        try:
            print(f"\n处理URL: {url}")

            # 获取原始响应
            response = requests.get(url)

            print(f"响应编码: {response.encoding}")
            print(f"表观编码: {response.apparent_encoding}")

            # 方法1:使用chardet检测编码
            detected_encoding = chardet.detect(response.content)
            print(f"检测到的编码: {detected_encoding}")

            # 方法2:从HTML meta标签获取编码
            soup = BeautifulSoup(response.content, 'html.parser')

            # 查找charset声明
            charset_meta = soup.find('meta', attrs={'charset': True})
            if charset_meta:
                declared_charset = charset_meta.get('charset')
                print(f"声明的编码: {declared_charset}")
            else:
                # 查找http-equiv类型的meta标签
                content_type_meta = soup.find('meta', attrs={'http-equiv': 'Content-Type'})
                if content_type_meta:
                    content = content_type_meta.get('content', '')
                    if 'charset=' in content:
                        declared_charset = content.split('charset=')[1].split(';')[0]
                        print(f"声明的编码: {declared_charset}")

            # 方法3:正确设置编码后重新解析
            if detected_encoding['encoding']:
                response.encoding = detected_encoding['encoding']
                soup = BeautifulSoup(response.text, 'html.parser')

                title = soup.find('title')
                if title:
                    print(f"正确编码后的标题: {title.get_text().strip()}")

        except Exception as e:
            print(f"编码处理异常: {e}")

def create_encoding_safe_crawler():
    """
    创建编码安全的爬虫
    """
    def safe_get_text(url, timeout=10):
        """
        安全获取网页文本内容
        """
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }

            response = requests.get(url, headers=headers, timeout=timeout)

            # 1. 首先尝试使用响应头中的编码
            if response.encoding != 'ISO-8859-1':  # 避免错误的默认编码
                soup = BeautifulSoup(response.text, 'html.parser')
            else:
                # 2. 使用chardet检测编码
                detected = chardet.detect(response.content)
                if detected['confidence'] > 0.7:  # 置信度阈值
                    response.encoding = detected['encoding']
                    soup = BeautifulSoup(response.text, 'html.parser')
                else:
                    # 3. 尝试常见编码
                    for encoding in ['utf-8', 'gbk', 'gb2312']:
                        try:
                            text = response.content.decode(encoding)
                            soup = BeautifulSoup(text, 'html.parser')
                            break
                        except UnicodeDecodeError:
                            continue
                    else:
                        # 4. 使用错误处理策略
                        soup = BeautifulSoup(response.content, 'html.parser', from_encoding='utf-8')

            return soup

        except Exception as e:
            print(f"获取页面内容失败: {e}")
            return None

    # 测试编码安全爬虫
    test_url = "https://httpbin.org/html"
    soup = safe_get_text(test_url)

    if soup:
        title = soup.find('title')
        print(f"\n编码安全爬虫结果:")
        print(f"标题: {title.get_text().strip() if title else '无标题'}")

        # 提取文本内容
        paragraphs = soup.find_all('p')
        print(f"段落数量: {len(paragraphs)}")

        for i, p in enumerate(paragraphs[:2], 1):
            text = p.get_text().strip()
            print(f"段落{i}: {text[:100]}...")

# 运行编码处理演示
if __name__ == "__main__":
    handle_encoding_issues()
    create_encoding_safe_crawler()

爬虫开发环境

开发工具选择

选择合适的开发工具能够显著提高爬虫开发效率:

IDE和编辑器:
- PyCharm:功能强大的Python IDE,支持调试和代码分析
- VS Code:轻量级编辑器,丰富的插件生态
- Jupyter Notebook:适合数据分析和原型开发
- Sublime Text:快速的文本编辑器

浏览器开发者工具:
- Chrome DevTools:分析网页结构、网络请求、JavaScript执行
- Firefox Developer Tools:类似Chrome,某些功能更强大
- 网络面板:查看HTTP请求和响应
- 元素面板:分析HTML结构和CSS样式

抓包工具:
- Fiddler:Windows平台的HTTP调试代理
- Charles:跨平台的HTTP监控工具
- mitmproxy:基于Python的中间人代理(插件脚本示例见下方)
- Wireshark:网络协议分析器
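
以mitmproxy为例,下面是一个极简的插件脚本草图(假设已通过pip install mitmproxy安装,文件名log_requests.py为示例命名,用mitmdump -s log_requests.py启动后即可打印经过代理的请求和响应):

# log_requests.py:打印经过mitmproxy代理的请求和响应
from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    # 每个HTTP请求经过代理时触发
    print("请求:", flow.request.method, flow.request.pretty_url)

def response(flow: http.HTTPFlow) -> None:
    # 每个HTTP响应返回时触发
    print("响应:", flow.response.status_code, flow.request.pretty_url)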

代理和IP池

使用代理服务器可以隐藏真实IP地址,避免被网站封禁:

import requests
import random
import time
from itertools import cycle

class ProxyManager:
    """
    代理管理器
    """
    def __init__(self):
        # 代理列表(示例,实际使用时需要有效的代理)
        self.proxy_list = [
            {'http': 'http://proxy1:port', 'https': 'https://proxy1:port'},
            {'http': 'http://proxy2:port', 'https': 'https://proxy2:port'},
            {'http': 'http://proxy3:port', 'https': 'https://proxy3:port'},
        ]
        self.proxy_cycle = cycle(self.proxy_list)
        self.failed_proxies = set()

    def get_proxy(self):
        """
        获取可用代理
        """
        for _ in range(len(self.proxy_list)):
            proxy = next(self.proxy_cycle)
            proxy_key = str(proxy)

            if proxy_key not in self.failed_proxies:
                return proxy

        # 如果所有代理都失败,清空失败列表重新开始
        self.failed_proxies.clear()
        return next(self.proxy_cycle)

    def mark_proxy_failed(self, proxy):
        """
        标记代理失败
        """
        self.failed_proxies.add(str(proxy))

    def test_proxy(self, proxy, test_url="https://httpbin.org/ip"):
        """
        测试代理是否可用
        """
        try:
            response = requests.get(
                test_url, 
                proxies=proxy, 
                timeout=10,
                headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
            )

            if response.status_code == 200:
                data = response.json()
                print(f"代理测试成功,IP: {data.get('origin')}")
                return True
            else:
                print(f"代理测试失败,状态码: {response.status_code}")
                return False

        except Exception as e:
            print(f"代理测试异常: {e}")
            return False

def demonstrate_proxy_usage():
    """
    演示代理使用
    """
    print("=== 代理使用演示 ===")

    # 不使用代理的请求
    try:
        response = requests.get("https://httpbin.org/ip", timeout=10)
        if response.status_code == 200:
            data = response.json()
            print(f"直接访问IP: {data.get('origin')}")
    except Exception as e:
        print(f"直接访问失败: {e}")

    # 使用代理的请求(示例)
    proxy_manager = ProxyManager()

    # 注意:以下代码需要有效的代理服务器才能正常工作
    print("\n代理测试(需要有效代理):")
    for i in range(3):
        proxy = proxy_manager.get_proxy()
        print(f"测试代理 {i+1}: {proxy}")

        # 在实际环境中测试代理
        # is_working = proxy_manager.test_proxy(proxy)
        # if not is_working:
        #     proxy_manager.mark_proxy_failed(proxy)

# 免费代理获取示例
def get_free_proxies():
    """
    获取免费代理(示例)
    """
    print("\n=== 免费代理获取 ===")

    # 这里只是演示结构,实际需要从代理网站爬取
    free_proxy_sources = [
        "https://www.proxy-list.download/api/v1/get?type=http",
        "https://api.proxyscrape.com/v2/?request=get&protocol=http",
    ]

    proxies = []

    for source in free_proxy_sources:
        try:
            print(f"从 {source} 获取代理...")
            # 实际实现需要解析不同网站的格式
            # response = requests.get(source, timeout=10)
            # 解析代理列表...
            print("代理获取完成(示例)")

        except Exception as e:
            print(f"获取代理失败: {e}")

    return proxies

# 运行代理演示
if __name__ == "__main__":
    demonstrate_proxy_usage()
    get_free_proxies()

用户代理设置

用户代理(User-Agent)字符串标识客户端应用程序,设置合适的User-Agent可以避免被识别为爬虫:

import requests
import random

class UserAgentManager:
    """
    用户代理管理器
    """
    def __init__(self):
        self.user_agents = [
            # Chrome
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',

            # Firefox
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (X11; Linux x86_64; rv:121.0) Gecko/20100101 Firefox/121.0',

            # Safari
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
            'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1',

            # Edge
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0',
        ]

    def get_random_user_agent(self):
        """
        获取随机用户代理
        """
        return random.choice(self.user_agents)

    def get_mobile_user_agent(self):
        """
        获取移动端用户代理
        """
        mobile_agents = [
            'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1',
            'Mozilla/5.0 (Android 14; Mobile; rv:121.0) Gecko/121.0 Firefox/121.0',
            'Mozilla/5.0 (Linux; Android 14; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36',
        ]
        return random.choice(mobile_agents)

def demonstrate_user_agent():
    """
    演示用户代理的使用
    """
    print("=== 用户代理演示 ===")

    ua_manager = UserAgentManager()

    # 测试不同的用户代理
    test_url = "https://httpbin.org/user-agent"

    for i in range(3):
        user_agent = ua_manager.get_random_user_agent()
        headers = {'User-Agent': user_agent}

        try:
            response = requests.get(test_url, headers=headers)
            if response.status_code == 200:
                data = response.json()
                print(f"\n请求 {i+1}:")
                print(f"发送的User-Agent: {user_agent[:50]}...")
                print(f"服务器接收到的: {data.get('user-agent', '')[:50]}...")
        except Exception as e:
            print(f"请求失败: {e}")

    # 测试移动端用户代理
    print("\n=== 移动端用户代理 ===")
    mobile_ua = ua_manager.get_mobile_user_agent()
    headers = {'User-Agent': mobile_ua}

    try:
        response = requests.get(test_url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            print(f"移动端User-Agent: {data.get('user-agent')}")
    except Exception as e:
        print(f"移动端请求失败: {e}")

# 运行用户代理演示
if __name__ == "__main__":
    demonstrate_user_agent()

调试和测试工具

有效的调试和测试工具能够帮助快速定位和解决爬虫开发中的问题:

import requests
import time
import logging
from functools import wraps

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('crawler.log'),
        logging.StreamHandler()
    ]
)

def debug_request(func):
    """
    请求调试装饰器
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = func(*args, **kwargs)
            end_time = time.time()

            logging.info(f"{func.__name__} 执行成功,耗时: {end_time - start_time:.3f}秒")
            return result

        except Exception as e:
            end_time = time.time()
            logging.error(f"{func.__name__} 执行失败,耗时: {end_time - start_time:.3f}秒,错误: {e}")
            raise

    return wrapper

class CrawlerDebugger:
    """
    爬虫调试器
    """
    def __init__(self):
        self.request_count = 0
        self.success_count = 0
        self.error_count = 0
        self.start_time = time.time()

    @debug_request
    def debug_get(self, url, **kwargs):
        """
        调试版本的GET请求
        """
        self.request_count += 1

        # 默认headers
        default_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }

        # 先以默认headers为基础,再用调用方传入的headers覆盖
        headers = dict(default_headers)
        headers.update(kwargs.get('headers', {}))
        kwargs['headers'] = headers

        logging.info(f"发送GET请求到: {url}")
        logging.debug(f"请求参数: {kwargs}")

        try:
            response = requests.get(url, **kwargs)

            logging.info(f"响应状态码: {response.status_code}")
            logging.info(f"响应时间: {response.elapsed.total_seconds():.3f}秒")
            logging.debug(f"响应头: {dict(response.headers)}")

            if response.status_code == 200:
                self.success_count += 1
            else:
                self.error_count += 1
                logging.warning(f"非200状态码: {response.status_code}")

            return response

        except requests.RequestException as e:
            self.error_count += 1
            logging.error(f"请求异常: {e}")
            raise

    def get_stats(self):
        """
        获取统计信息
        """
        elapsed_time = time.time() - self.start_time

        stats = {
            '总请求数': self.request_count,
            '成功请求数': self.success_count,
            '失败请求数': self.error_count,
            '成功率': f"{(self.success_count / max(self.request_count, 1)) * 100:.2f}%",
            '运行时间': f"{elapsed_time:.2f}秒",
            '平均请求速度': f"{self.request_count / max(elapsed_time, 1):.2f}请求/秒"
        }

        return stats

    def print_stats(self):
        """
        打印统计信息
        """
        stats = self.get_stats()

        print("\n=== 爬虫统计信息 ===")
        for key, value in stats.items():
            print(f"{key}: {value}")

def test_crawler_debugger():
    """
    测试爬虫调试器
    """
    debugger = CrawlerDebugger()

    test_urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/status/200",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/status/404",  # 这个会返回404
        "https://httpbin.org/json",
    ]

    print("开始测试爬虫调试器...")

    for url in test_urls:
        try:
            response = debugger.debug_get(url, timeout=10)
            print(f"✓ {url} - 状态码: {response.status_code}")
        except Exception as e:
            print(f"✗ {url} - 错误: {e}")

        time.sleep(0.5)  # 避免请求过快

    # 打印统计信息
    debugger.print_stats()

# 性能测试工具
def performance_test(func, *args, **kwargs):
    """
    性能测试装饰器
    """
    def test_performance(iterations=10):
        times = []

        for i in range(iterations):
            start_time = time.time()
            try:
                func(*args, **kwargs)
                end_time = time.time()
                times.append(end_time - start_time)
            except Exception as e:
                print(f"第{i+1}次测试失败: {e}")

        if times:
            avg_time = sum(times) / len(times)
            min_time = min(times)
            max_time = max(times)

            print(f"\n=== 性能测试结果 ({iterations}次) ===")
            print(f"平均时间: {avg_time:.3f}秒")
            print(f"最短时间: {min_time:.3f}秒")
            print(f"最长时间: {max_time:.3f}秒")
            print(f"成功率: {len(times)}/{iterations} ({len(times)/iterations*100:.1f}%)")

    return test_performance

# 运行调试演示
if __name__ == "__main__":
    test_crawler_debugger()

    # 性能测试示例
    @performance_test
    def simple_request():
        response = requests.get("https://httpbin.org/get", timeout=5)
        return response.status_code == 200

    print("\n开始性能测试...")
    simple_request(iterations=5)

运行结果示例:

开始测试爬虫调试器...
2024-01-15 14:30:15,123 - INFO - 发送GET请求到: https://httpbin.org/get
2024-01-15 14:30:15,456 - INFO - 响应状态码: 200
2024-01-15 14:30:15,456 - INFO - 响应时间: 0.333秒
2024-01-15 14:30:15,456 - INFO - debug_get 执行成功,耗时: 0.334秒
✓ https://httpbin.org/get - 状态码: 200

2024-01-15 14:30:16,001 - INFO - 发送GET请求到: https://httpbin.org/status/200
2024-01-15 14:30:16,234 - INFO - 响应状态码: 200
2024-01-15 14:30:16,234 - INFO - 响应时间: 0.233秒
2024-01-15 14:30:16,234 - INFO - debug_get 执行成功,耗时: 0.234秒
✓ https://httpbin.org/status/200 - 状态码: 200

=== 爬虫统计信息 ===
总请求数: 5
成功请求数: 4
失败请求数: 1
成功率: 80.00%
运行时间: 3.45秒
平均请求速度: 1.45请求/秒

=== 性能测试结果 (5次) ===
平均时间: 0.456秒
最短时间: 0.234秒
最长时间: 0.678秒
成功率: 5/5 (100.0%)

14.2 Requests库网络请求

Requests是Python中最受欢迎的HTTP库,它让HTTP请求变得简单而优雅。相比于Python标准库中的urllib,Requests提供了更加人性化的API,是网络爬虫开发的首选工具。
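
为了直观感受两者的差异,下面用标准库urllib和Requests分别发送同一个GET请求作对比(仅为示意):

import json
import urllib.request

import requests

URL = "https://httpbin.org/get"

# 使用urllib:需要手动构造Request对象、处理解码和资源关闭
req = urllib.request.Request(URL, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=10) as resp:
    data = json.loads(resp.read().decode('utf-8'))
print("urllib:", data['url'])

# 使用Requests:一个函数调用完成请求,json()方法直接解析响应
response = requests.get(URL, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
print("requests:", response.json()['url'])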

Requests基础

安装和基本使用

Requests库的安装非常简单,使用pip命令即可:

pip install requests

安装完成后,我们来看看Requests的基本使用方法:

import requests
import json
from pprint import pprint

def basic_requests_usage():
    """
    演示Requests的基本使用方法
    """
    print("=== Requests基础使用演示 ===")

    # 1. 最简单的GET请求
    print("\n1. 基本GET请求:")
    response = requests.get('https://httpbin.org/get')

    print(f"状态码: {response.status_code}")
    print(f"响应时间: {response.elapsed.total_seconds():.3f}秒")
    print(f"内容类型: {response.headers.get('content-type')}")

    # 2. 检查请求是否成功
    if response.status_code == 200:
        print("请求成功!")
        data = response.json()  # 解析JSON响应
        print(f"服务器接收到的URL: {data['url']}")
    else:
        print(f"请求失败,状态码: {response.status_code}")

    # 3. 使用raise_for_status()检查状态
    try:
        response.raise_for_status()  # 如果状态码不是200会抛出异常
        print("状态检查通过")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP错误: {e}")

    # 4. 获取响应内容的不同方式
    print("\n2. 响应内容获取:")

    # 文本内容
    print(f"响应文本长度: {len(response.text)}字符")

    # 二进制内容
    print(f"响应二进制长度: {len(response.content)}字节")

    # JSON内容(如果是JSON格式)
    try:
        json_data = response.json()
        print(f"JSON数据键: {list(json_data.keys())}")
    except ValueError:
        print("响应不是有效的JSON格式")

    # 5. 响应头信息
    print("\n3. 响应头信息:")
    print(f"服务器: {response.headers.get('server', '未知')}")
    print(f"内容长度: {response.headers.get('content-length', '未知')}")
    print(f"连接类型: {response.headers.get('connection', '未知')}")

# 运行基础演示
if __name__ == "__main__":
    basic_requests_usage()

运行结果:

=== Requests基础使用演示 ===

1. 基本GET请求:
状态码: 200
响应时间: 0.234秒
内容类型: application/json
请求成功!
服务器接收到的URL: https://httpbin.org/get
状态检查通过

2. 响应内容获取:
响应文本长度: 312字符
响应二进制长度: 312字节
JSON数据键: ['args', 'headers', 'origin', 'url']

3. 响应头信息:
服务器: gunicorn/19.9.0
内容长度: 312
连接类型: keep-alive

GET和POST请求

GET和POST是HTTP协议中最常用的两种请求方法。GET用于获取数据,POST用于提交数据。

GET请求详解:

import requests
from urllib.parse import urlencode

def demonstrate_get_requests():
    """
    演示各种GET请求的使用方法
    """
    print("=== GET请求详解 ===")

    # 1. 基本GET请求
    print("\n1. 基本GET请求:")
    response = requests.get('https://httpbin.org/get')
    print(f"请求URL: {response.url}")
    print(f"状态码: {response.status_code}")

    # 2. 带参数的GET请求
    print("\n2. 带参数的GET请求:")

    # 方法1: 使用params参数
    params = {
        'name': '张三',
        'age': 25,
        'city': '北京',
        'hobbies': ['读书', '游泳']  # 列表参数
    }

    response = requests.get('https://httpbin.org/get', params=params)
    print(f"构建的URL: {response.url}")

    data = response.json()
    print(f"服务器接收到的参数: {data['args']}")

    # 方法2: 直接在URL中包含参数
    url_with_params = 'https://httpbin.org/get?name=李四&age=30'
    response2 = requests.get(url_with_params)
    print(f"\n直接URL参数: {response2.json()['args']}")

    # 3. 自定义请求头
    print("\n3. 自定义请求头:")
    headers = {
        'User-Agent': 'MySpider/1.0',
        'Accept': 'application/json',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Referer': 'https://www.example.com'
    }

    response = requests.get('https://httpbin.org/get', headers=headers)
    received_headers = response.json()['headers']

    print(f"发送的User-Agent: {headers['User-Agent']}")
    print(f"服务器接收到的User-Agent: {received_headers.get('User-Agent')}")

    # 4. 超时设置
    print("\n4. 超时设置:")
    try:
        # 设置连接超时为3秒,读取超时为5秒
        response = requests.get('https://httpbin.org/delay/2', timeout=(3, 5))
        print(f"请求成功,耗时: {response.elapsed.total_seconds():.3f}秒")
    except requests.exceptions.Timeout:
        print("请求超时")
    except requests.exceptions.RequestException as e:
        print(f"请求异常: {e}")

    # 5. 处理重定向
    print("\n5. 重定向处理:")

    # 允许重定向(默认行为)
    response = requests.get('https://httpbin.org/redirect/2')
    print(f"最终URL: {response.url}")
    print(f"重定向历史: {[r.url for r in response.history]}")

    # 禁止重定向
    response_no_redirect = requests.get('https://httpbin.org/redirect/1', allow_redirects=False)
    print(f"\n禁止重定向状态码: {response_no_redirect.status_code}")
    print(f"Location头: {response_no_redirect.headers.get('Location')}")

# 运行GET请求演示
if __name__ == "__main__":
    demonstrate_get_requests()

POST请求详解:

import requests
import json

def demonstrate_post_requests():
    """
    演示各种POST请求的使用方法
    """
    print("=== POST请求详解 ===")

    # 1. 发送表单数据
    print("\n1. 发送表单数据:")
    form_data = {
        'username': 'testuser',
        'password': 'testpass',
        'email': 'test@example.com',
        'remember': 'on'
    }

    response = requests.post('https://httpbin.org/post', data=form_data)

    if response.status_code == 200:
        result = response.json()
        print(f"发送的表单数据: {form_data}")
        print(f"服务器接收到的表单: {result['form']}")
        print(f"Content-Type: {result['headers'].get('Content-Type')}")

    # 2. 发送JSON数据
    print("\n2. 发送JSON数据:")
    json_data = {
        'name': '王五',
        'age': 28,
        'skills': ['Python', 'JavaScript', 'SQL'],
        'is_active': True,
        'profile': {
            'city': '上海',
            'experience': 5
        }
    }

    # 方法1: 使用json参数(推荐)
    response = requests.post('https://httpbin.org/post', json=json_data)

    if response.status_code == 200:
        result = response.json()
        print(f"发送的JSON数据: {json_data}")
        print(f"服务器接收到的JSON: {result['json']}")
        print(f"Content-Type: {result['headers'].get('Content-Type')}")

    # 方法2: 手动设置headers和data
    headers = {'Content-Type': 'application/json'}
    response2 = requests.post(
        'https://httpbin.org/post', 
        data=json.dumps(json_data), 
        headers=headers
    )
    print(f"\n手动设置方式状态码: {response2.status_code}")

    # 3. 发送文件
    print("\n3. 文件上传:")

    # 创建一个临时文件用于演示
    import tempfile
    import os

    # 创建临时文件
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
        f.write("这是一个测试文件\n包含中文内容")
        temp_file_path = f.name

    try:
        # 上传文件
        with open(temp_file_path, 'rb') as f:
            files = {'file': ('test.txt', f, 'text/plain')}
            response = requests.post('https://httpbin.org/post', files=files)

        if response.status_code == 200:
            result = response.json()
            print(f"上传的文件信息: {result['files']}")
            print(f"Content-Type: {result['headers'].get('Content-Type')}")

    finally:
        # 清理临时文件
        os.unlink(temp_file_path)

    # 4. 混合数据提交
    print("\n4. 混合数据提交:")

    # 同时发送表单数据和文件
    form_data = {'description': '文件描述', 'category': 'test'}

    # 创建内存中的文件对象
    from io import StringIO, BytesIO

    file_content = BytesIO(b"Hello, World! This is a test file.")
    files = {'upload': ('hello.txt', file_content, 'text/plain')}

    response = requests.post(
        'https://httpbin.org/post', 
        data=form_data, 
        files=files
    )

    if response.status_code == 200:
        result = response.json()
        print(f"表单数据: {result['form']}")
        print(f"文件数据: {list(result['files'].keys())}")

    # 5. 自定义请求头的POST
    print("\n5. 自定义请求头的POST:")

    headers = {
        'User-Agent': 'MyApp/2.0',
        'Authorization': 'Bearer your-token-here',
        'X-Custom-Header': 'custom-value'
    }

    data = {'message': 'Hello from custom headers'}

    response = requests.post(
        'https://httpbin.org/post', 
        json=data, 
        headers=headers
    )

    if response.status_code == 200:
        result = response.json()
        received_headers = result['headers']
        print(f"自定义头部 X-Custom-Header: {received_headers.get('X-Custom-Header')}")
        print(f"Authorization: {received_headers.get('Authorization')}")

# 运行POST请求演示
if __name__ == "__main__":
    demonstrate_post_requests()

运行结果示例:

=== POST请求详解 ===

1. 发送表单数据:
发送的表单数据: {'username': 'testuser', 'password': 'testpass', 'email': 'test@example.com', 'remember': 'on'}
服务器接收到的表单: {'username': 'testuser', 'password': 'testpass', 'email': 'test@example.com', 'remember': 'on'}
Content-Type: application/x-www-form-urlencoded

2. 发送JSON数据:
发送的JSON数据: {'name': '王五', 'age': 28, 'skills': ['Python', 'JavaScript', 'SQL'], 'is_active': True, 'profile': {'city': '上海', 'experience': 5}}
服务器接收到的JSON: {'name': '王五', 'age': 28, 'skills': ['Python', 'JavaScript', 'SQL'], 'is_active': True, 'profile': {'city': '上海', 'experience': 5}}
Content-Type: application/json

3. 文件上传:
上传的文件信息: {'file': '这是一个测试文件\n包含中文内容'}
Content-Type: multipart/form-data; boundary=...

4. 混合数据提交:
表单数据: {'description': '文件描述', 'category': 'test'}
文件数据: ['upload']

5. 自定义请求头的POST:
自定义头部 X-Custom-Header: custom-value
Authorization: Bearer your-token-here

请求参数和头部

在网络爬虫中,正确设置请求参数和头部信息是非常重要的,它们决定了服务器如何处理我们的请求。

请求参数详解

import requests
from urllib.parse import urlencode, quote

def advanced_parameters_demo():
    """
    演示高级参数处理
    """
    print("=== 高级参数处理演示 ===")

    # 1. 复杂参数结构
    print("\n1. 复杂参数结构:")

    complex_params = {
        'q': 'Python爬虫',  # 中文搜索词
        'page': 1,
        'size': 20,
        'sort': ['time', 'relevance'],  # 多值参数
        'filters': {
            'category': 'tech',
            'date_range': '2024-01-01,2024-12-31'
        },
        'include_fields': ['title', 'content', 'author'],
        'exclude_empty': True
    }

    # Requests会自动编码简单值和列表参数;注意嵌套字典不会被序列化为JSON,通常需要手动展开
    response = requests.get('https://httpbin.org/get', params=complex_params)

    print(f"构建的URL: {response.url}")

    result = response.json()
    print(f"\n服务器接收到的参数:")
    for key, value in result['args'].items():
        print(f"  {key}: {value}")

    # 2. 手动URL编码
    print("\n2. 手动URL编码:")

    # 处理特殊字符
    special_params = {
        'query': 'hello world & python',
        'symbols': '!@#$%^&*()+={}[]|\\:;"<>?,./'
    }

    # 方法1: 使用requests自动编码
    response1 = requests.get('https://httpbin.org/get', params=special_params)
    print(f"自动编码URL: {response1.url}")

    # 方法2: 手动编码
    encoded_query = quote('hello world & python')
    manual_url = f'https://httpbin.org/get?query={encoded_query}'
    response2 = requests.get(manual_url)
    print(f"手动编码URL: {response2.url}")

    # 3. 数组参数的不同处理方式
    print("\n3. 数组参数处理:")

    # 方式1: Python列表(默认行为)
    list_params = {'tags': ['python', 'web', 'crawler']}
    response = requests.get('https://httpbin.org/get', params=list_params)
    print(f"列表参数URL: {response.url}")

    # 方式2: 手动构建重复参数
    manual_params = [('tags', 'python'), ('tags', 'web'), ('tags', 'crawler')]
    response2 = requests.get('https://httpbin.org/get', params=manual_params)
    print(f"手动重复参数URL: {response2.url}")

    # 4. 条件参数构建
    print("\n4. 条件参数构建:")

    def build_search_params(keyword, page=1, filters=None, sort_by=None):
        """
        根据条件构建搜索参数
        """
        params = {'q': keyword, 'page': page}

        if filters:
            for key, value in filters.items():
                if value:  # 只添加非空值
                    params[f'filter_{key}'] = value

        if sort_by:
            params['sort'] = sort_by

        return params

    # 使用条件参数构建
    search_filters = {
        'category': 'technology',
        'author': '',  # 空值,不会被添加
        'date': '2024-01-01'
    }

    params = build_search_params(
        keyword='Python教程',
        page=2,
        filters=search_filters,
        sort_by='date_desc'
    )

    response = requests.get('https://httpbin.org/get', params=params)
    print(f"条件构建的参数: {response.json()['args']}")

# 运行参数演示
if __name__ == "__main__":
    advanced_parameters_demo()

请求头部详解

import requests
import time
import random

def advanced_headers_demo():
    """
    演示高级请求头处理
    """
    print("=== 高级请求头演示 ===")

    # 1. 完整的浏览器请求头模拟
    print("\n1. 完整浏览器头部模拟:")

    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'DNT': '1',  # Do Not Track
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Cache-Control': 'max-age=0'
    }

    response = requests.get('https://httpbin.org/get', headers=browser_headers)
    received_headers = response.json()['headers']

    print(f"发送的User-Agent: {browser_headers['User-Agent'][:50]}...")
    print(f"服务器接收的User-Agent: {received_headers.get('User-Agent', '')[:50]}...")
    print(f"Accept-Language: {received_headers.get('Accept-Language')}")

    # 2. API请求头
    print("\n2. API请求头:")

    api_headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
        'X-API-Key': 'your-api-key-here',
        'X-Client-Version': '1.2.3',
        'X-Request-ID': f'req_{int(time.time())}_{random.randint(1000, 9999)}'
    }

    data = {'query': 'test data'}
    response = requests.post('https://httpbin.org/post', json=data, headers=api_headers)

    if response.status_code == 200:
        result = response.json()
        print(f"API请求成功")
        print(f"Request ID: {result['headers'].get('X-Request-ID')}")
        print(f"Authorization: {result['headers'].get('Authorization', '')[:20]}...")

    # 3. 防爬虫头部设置
    print("\n3. 防爬虫头部设置:")

    # 模拟真实浏览器行为
    anti_bot_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'https://www.google.com/',  # 模拟从搜索引擎来
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Pragma': 'no-cache',
        'Cache-Control': 'no-cache'
    }

    response = requests.get('https://httpbin.org/get', headers=anti_bot_headers)
    print(f"防爬虫请求状态: {response.status_code}")
    print(f"Referer头: {response.json()['headers'].get('Referer')}")

    # 4. 动态头部生成
    print("\n4. 动态头部生成:")

    def generate_dynamic_headers():
        """
        生成动态请求头
        """
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0'
        ]

        referers = [
            'https://www.google.com/',
            'https://www.bing.com/',
            'https://www.baidu.com/',
            'https://duckduckgo.com/'
        ]

        return {
            'User-Agent': random.choice(user_agents),
            'Referer': random.choice(referers),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'X-Forwarded-For': f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}'
        }

    # 使用动态头部发送多个请求
    for i in range(3):
        headers = generate_dynamic_headers()
        response = requests.get('https://httpbin.org/get', headers=headers)

        if response.status_code == 200:
            result = response.json()
            print(f"\n请求 {i+1}:")
            print(f"  User-Agent: {result['headers'].get('User-Agent', '')[:40]}...")
            print(f"  Referer: {result['headers'].get('Referer')}")
            print(f"  X-Forwarded-For: {result['headers'].get('X-Forwarded-For')}")

    # 5. 头部优先级和覆盖
    print("\n5. 头部优先级演示:")

    # 创建会话并设置默认头部
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'DefaultAgent/1.0',
        'Accept': 'application/json',
        'X-Default-Header': 'default-value'
    })

    # 请求时覆盖部分头部
    override_headers = {
        'User-Agent': 'OverrideAgent/2.0',  # 覆盖默认值
        'X-Custom-Header': 'custom-value'   # 新增头部
    }

    response = session.get('https://httpbin.org/get', headers=override_headers)

    if response.status_code == 200:
        result = response.json()
        headers = result['headers']
        print(f"最终User-Agent: {headers.get('User-Agent')}")
        print(f"默认Accept: {headers.get('Accept')}")
        print(f"默认头部: {headers.get('X-Default-Header')}")
        print(f"自定义头部: {headers.get('X-Custom-Header')}")

# 运行头部演示
if __name__ == "__main__":
    advanced_headers_demo()

响应对象处理

响应对象包含了服务器返回的所有信息,正确处理响应对象是爬虫开发的关键技能。

import requests
import json
from datetime import datetime

def response_handling_demo():
    """
    演示响应对象的各种处理方法
    """
    print("=== 响应对象处理演示 ===")

    # 发送一个测试请求
    response = requests.get('https://httpbin.org/json')

    # 1. 基本响应信息
    print("\n1. 基本响应信息:")
    print(f"状态码: {response.status_code}")
    print(f"状态描述: {response.reason}")
    print(f"请求URL: {response.url}")
    print(f"响应时间: {response.elapsed.total_seconds():.3f}秒")
    print(f"编码: {response.encoding}")

    # 2. 响应头详细分析
    print("\n2. 响应头分析:")
    print(f"Content-Type: {response.headers.get('content-type')}")
    print(f"Content-Length: {response.headers.get('content-length')}")
    print(f"Server: {response.headers.get('server')}")
    print(f"Date: {response.headers.get('date')}")

    # 检查是否支持压缩
    content_encoding = response.headers.get('content-encoding')
    if content_encoding:
        print(f"内容编码: {content_encoding}")
    else:
        print("未使用内容压缩")

    # 3. 响应内容的不同获取方式
    print("\n3. 响应内容获取:")

    # 文本内容
    text_content = response.text
    print(f"文本内容长度: {len(text_content)}字符")
    print(f"文本内容预览: {text_content[:100]}...")

    # 二进制内容
    binary_content = response.content
    print(f"二进制内容长度: {len(binary_content)}字节")

    # JSON内容
    try:
        json_content = response.json()
        print(f"JSON内容类型: {type(json_content)}")
        if isinstance(json_content, dict):
            print(f"JSON键: {list(json_content.keys())}")
    except ValueError as e:
        print(f"JSON解析失败: {e}")

    # 4. 响应状态检查
    print("\n4. 响应状态检查:")

    def check_response_status(response):
        """
        检查响应状态的详细信息
        """
        print(f"状态码: {response.status_code}")

        # 使用内置方法检查状态
        if response.ok:
            print("✓ 请求成功 (状态码 200-299)")
        else:
            print("✗ 请求失败")

        # 详细状态分类
        if 200 <= response.status_code < 300:
            print("✓ 成功响应")
        elif 300 <= response.status_code < 400:
            print("→ 重定向响应")
            location = response.headers.get('location')
            if location:
                print(f"  重定向到: {location}")
        elif 400 <= response.status_code < 500:
            print("✗ 客户端错误")
        elif 500 <= response.status_code < 600:
            print("✗ 服务器错误")

        # 使用raise_for_status检查
        try:
            response.raise_for_status()
            print("✓ 状态检查通过")
        except requests.exceptions.HTTPError as e:
            print(f"✗ 状态检查失败: {e}")

    check_response_status(response)

    # 5. 测试不同状态码的响应
    print("\n5. 不同状态码测试:")

    test_urls = [
        ('https://httpbin.org/status/200', '成功'),
        ('https://httpbin.org/status/404', '未找到'),
        ('https://httpbin.org/status/500', '服务器错误'),
        ('https://httpbin.org/redirect/1', '重定向')
    ]

    for url, description in test_urls:
        try:
            resp = requests.get(url, timeout=5)
            print(f"\n{description} ({url}):")
            print(f"  状态码: {resp.status_code}")
            print(f"  最终URL: {resp.url}")
            if resp.history:
                print(f"  重定向历史: {[r.status_code for r in resp.history]}")
        except requests.exceptions.RequestException as e:
            print(f"\n{description} 请求失败: {e}")

    # 6. 响应内容类型处理
    print("\n6. 不同内容类型处理:")

    def handle_different_content_types():
        """
        处理不同类型的响应内容
        """
        # JSON响应
        json_resp = requests.get('https://httpbin.org/json')
        if json_resp.headers.get('content-type', '').startswith('application/json'):
            data = json_resp.json()
            print(f"JSON数据: {data}")

        # HTML响应
        html_resp = requests.get('https://httpbin.org/html')
        if 'text/html' in html_resp.headers.get('content-type', ''):
            print(f"HTML内容长度: {len(html_resp.text)}字符")
            # 可以使用BeautifulSoup进一步解析

        # XML响应
        xml_resp = requests.get('https://httpbin.org/xml')
        if 'application/xml' in xml_resp.headers.get('content-type', ''):
            print(f"XML内容长度: {len(xml_resp.text)}字符")

        # 图片响应(二进制)
        try:
            img_resp = requests.get('https://httpbin.org/image/png', timeout=10)
            if img_resp.headers.get('content-type', '').startswith('image/'):
                print(f"图片大小: {len(img_resp.content)}字节")
                print(f"图片类型: {img_resp.headers.get('content-type')}")
        except requests.exceptions.RequestException:
            print("图片请求失败或超时")

    handle_different_content_types()

    # 7. 响应时间和性能分析
    print("\n7. 响应时间分析:")

    def analyze_response_performance(url, num_requests=3):
        """
        分析响应性能
        """
        times = []

        for i in range(num_requests):
            start_time = datetime.now()
            try:
                resp = requests.get(url, timeout=10)
                end_time = datetime.now()

                # 计算总时间
                total_time = (end_time - start_time).total_seconds()
                # 获取requests内部计时
                elapsed_time = resp.elapsed.total_seconds()

                times.append({
                    'total': total_time,
                    'elapsed': elapsed_time,
                    'status': resp.status_code
                })

                print(f"请求 {i+1}: {elapsed_time:.3f}秒 (状态码: {resp.status_code})")

            except requests.exceptions.RequestException as e:
                print(f"请求 {i+1} 失败: {e}")

        if times:
            avg_time = sum(t['elapsed'] for t in times) / len(times)
            min_time = min(t['elapsed'] for t in times)
            max_time = max(t['elapsed'] for t in times)

            print(f"\n性能统计:")
            print(f"  平均响应时间: {avg_time:.3f}秒")
            print(f"  最快响应时间: {min_time:.3f}秒")
            print(f"  最慢响应时间: {max_time:.3f}秒")

    analyze_response_performance('https://httpbin.org/delay/1')

# 运行响应处理演示
if __name__ == "__main__":
    response_handling_demo()

运行结果示例:

=== 响应对象处理演示 ===

1. 基本响应信息:
状态码: 200
状态描述: OK
请求URL: https://httpbin.org/json
响应时间: 0.234秒
编码: utf-8

2. 响应头分析:
Content-Type: application/json
Content-Length: 429
Server: gunicorn/19.9.0
Date: Mon, 15 Jan 2024 06:30:15 GMT
未使用内容压缩

3. 响应内容获取:
文本内容长度: 429字符
文本内容预览: {"slideshow": {"author": "Yours Truly", "date": "date of publication", "slides": [{"title": "Wake up to WonderWidgets!", "type": "all"}, {"title": "Overview", "type": "all", "items": ["Why <em>WonderWidgets</em> are great", "Who <em>buys</em> them"]}], "title": "Sample Slide Show"}}...
二进制内容长度: 429字节
JSON内容类型: <class 'dict'>
JSON键: ['slideshow']

4. 响应状态检查:
状态码: 200
✓ 请求成功 (response.ok在状态码 < 400 时为True)
✓ 成功响应
✓ 状态检查通过

5. 不同状态码测试:

成功 (https://httpbin.org/status/200):
  状态码: 200
  最终URL: https://httpbin.org/status/200

未找到 (https://httpbin.org/status/404):
  状态码: 404
  最终URL: https://httpbin.org/status/404

服务器错误 (https://httpbin.org/status/500):
  状态码: 500
  最终URL: https://httpbin.org/status/500

重定向 (https://httpbin.org/redirect/1):
  状态码: 200
  最终URL: https://httpbin.org/get
  重定向历史: [302]

7. 响应时间分析:
请求 1: 1.234秒 (状态码: 200)
请求 2: 1.156秒 (状态码: 200)
请求 3: 1.298秒 (状态码: 200)

性能统计:
  平均响应时间: 1.229秒
  最快响应时间: 1.156秒
  最慢响应时间: 1.298秒
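
上面的输出中,编码是由Requests根据响应头自动推断的。当目标页面没有声明字符集或声明有误时,直接使用response.text可能出现乱码,这时可以参考apparent_encoding做回退。下面是一个最小示意(目标地址仅作演示):

import requests

def get_text_with_encoding_fallback(url):
    """响应头未声明charset时,回退到根据内容推断的编码,减少中文乱码(示意实现)"""
    resp = requests.get(url, timeout=10)
    # 未声明charset时,requests可能返回None或默认的ISO-8859-1
    if resp.encoding is None or resp.encoding.lower() == 'iso-8859-1':
        resp.encoding = resp.apparent_encoding  # 基于内容统计推断编码
    return resp.text

html = get_text_with_encoding_fallback('https://httpbin.org/html')
print(html[:60])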

高级功能

Session会话管理

Session对象允许你在多个请求之间保持某些参数:同一个Session实例发出的所有请求会共享Cookie,并复用urllib3的连接池。因此,当向同一主机发送多个请求时,底层的TCP连接会被重用,从而带来显著的性能提升。

import requests
import time
from datetime import datetime

def session_management_demo():
    """
    演示Session会话管理的各种功能
    """
    print("=== Session会话管理演示 ===")

    # 1. 基本Session使用
    print("\n1. 基本Session使用:")

    # 创建Session对象
    session = requests.Session()

    # 设置Session级别的请求头
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })

    # 使用Session发送请求
    response1 = session.get('https://httpbin.org/get')
    print(f"第一次请求状态码: {response1.status_code}")
    print(f"User-Agent: {response1.json()['headers'].get('User-Agent')}")

    # Session会保持设置的头部
    response2 = session.get('https://httpbin.org/headers')
    print(f"第二次请求User-Agent: {response2.json()['headers'].get('User-Agent')}")

    # 2. Cookie持久化
    print("\n2. Cookie持久化演示:")

    # 创建新的Session
    cookie_session = requests.Session()

    # 第一次请求设置cookie
    response = cookie_session.get('https://httpbin.org/cookies/set/session_id/abc123')
    print(f"设置Cookie后的状态码: {response.status_code}")

    # 查看Session中的cookies
    print(f"Session中的Cookies: {dict(cookie_session.cookies)}")

    # 第二次请求会自动携带cookie
    response = cookie_session.get('https://httpbin.org/cookies')
    cookies_data = response.json()
    print(f"服务器接收到的Cookies: {cookies_data.get('cookies', {})}")

    # 3. 连接池和性能优化
    print("\n3. 连接池性能对比:")

    def test_without_session(num_requests=5):
        """不使用Session的请求"""
        start_time = time.time()
        for i in range(num_requests):
            response = requests.get('https://httpbin.org/get')
            if response.status_code != 200:
                print(f"请求 {i+1} 失败")
        end_time = time.time()
        return end_time - start_time

    def test_with_session(num_requests=5):
        """使用Session的请求"""
        start_time = time.time()
        session = requests.Session()
        for i in range(num_requests):
            response = session.get('https://httpbin.org/get')
            if response.status_code != 200:
                print(f"请求 {i+1} 失败")
        session.close()
        end_time = time.time()
        return end_time - start_time

    print("\n性能测试 (5次请求):")
    time_without_session = test_without_session()
    time_with_session = test_with_session()

    print(f"不使用Session: {time_without_session:.3f}秒")
    print(f"使用Session: {time_with_session:.3f}秒")
    print(f"性能提升: {((time_without_session - time_with_session) / time_without_session * 100):.1f}%")

    # 4. Session配置和自定义
    print("\n4. Session配置:")

    # 创建自定义配置的Session
    custom_session = requests.Session()

    # 注意: requests不会自动应用Session上的timeout属性,
    # 这里仅保存为自定义默认值,实际请求时仍需显式传入timeout参数
    custom_session.timeout = 10

    # 设置默认参数
    custom_session.params = {'api_key': 'your-api-key'}

    # 设置默认头部
    custom_session.headers.update({
        'User-Agent': 'CustomBot/2.0',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive'
    })

    # 发送请求
    response = custom_session.get('https://httpbin.org/get', params={'extra': 'param'})

    if response.status_code == 200:
        data = response.json()
        print(f"最终URL: {response.url}")
        print(f"合并后的参数: {data.get('args', {})}")
        print(f"请求头: {data.get('headers', {}).get('User-Agent')}")

    # 5. Session的请求钩子
    print("\n5. 请求钩子演示:")

    def log_request_hook(response, *args, **kwargs):
        """请求日志钩子"""
        print(f"[钩子] 请求: {response.request.method} {response.url}")
        print(f"[钩子] 状态码: {response.status_code}")
        print(f"[钩子] 响应时间: {response.elapsed.total_seconds():.3f}秒")

    # 创建带钩子的Session
    hook_session = requests.Session()
    hook_session.hooks['response'].append(log_request_hook)

    # 发送请求,钩子会自动执行
    print("\n发送带钩子的请求:")
    response = hook_session.get('https://httpbin.org/delay/1')

    # 6. Session上下文管理
    print("\n6. Session上下文管理:")

    # 使用with语句自动管理Session生命周期
    with requests.Session() as s:
        s.headers.update({'User-Agent': 'ContextManager/1.0'})

        response = s.get('https://httpbin.org/get')
        print(f"上下文管理器请求状态: {response.status_code}")
        print(f"User-Agent: {response.json()['headers'].get('User-Agent')}")
    # Session会自动关闭

    # 7. Session错误处理
    print("\n7. Session错误处理:")

    error_session = requests.Session()

    # 设置重试适配器
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    retry_strategy = Retry(
        total=3,  # 总重试次数
        backoff_factor=1,  # 重试间隔
        status_forcelist=[429, 500, 502, 503, 504],  # 需要重试的状态码
    )

    adapter = HTTPAdapter(max_retries=retry_strategy)
    error_session.mount("http://", adapter)
    error_session.mount("https://", adapter)

    try:
        # 测试重试机制
        response = error_session.get('https://httpbin.org/status/500', timeout=5)
        print(f"重试后状态码: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"请求最终失败: {e}")

    # 8. Session状态管理
    print("\n8. Session状态管理:")

    state_session = requests.Session()

    # 模拟登录流程
    login_data = {
        'username': 'testuser',
        'password': 'testpass'
    }

    # 第一步:获取登录页面(可能包含CSRF token)
    login_page = state_session.get('https://httpbin.org/get')
    print(f"获取登录页面: {login_page.status_code}")

    # 第二步:提交登录信息
    login_response = state_session.post('https://httpbin.org/post', data=login_data)
    print(f"登录请求: {login_response.status_code}")

    # 第三步:访问需要认证的页面
    protected_response = state_session.get('https://httpbin.org/get')
    print(f"访问受保护页面: {protected_response.status_code}")

    # Session会自动维护整个会话状态
    print(f"会话中的Cookie数量: {len(state_session.cookies)}")

# 运行Session演示
if __name__ == "__main__":
    session_management_demo()
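
需要注意,requests并不会自动应用session.timeout这类属性,超时仍需在每次请求时显式传入。如果希望给整个会话设置默认超时,可以像下面这样封装一个Session子类(仅为一种可行的写法,参数值为演示用):

import requests

class TimeoutSession(requests.Session):
    """为所有请求提供默认超时的Session子类(示意实现)"""

    def __init__(self, timeout=(5, 30)):
        super().__init__()
        self._default_timeout = timeout

    def request(self, method, url, **kwargs):
        # 调用方未显式指定timeout时,使用默认超时
        kwargs.setdefault('timeout', self._default_timeout)
        return super().request(method, url, **kwargs)

with TimeoutSession(timeout=(3, 10)) as s:
    resp = s.get('https://httpbin.org/get')
    print(resp.status_code)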

身份验证

Requests支持多种身份验证方式,包括基本认证、摘要认证、OAuth等。

import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
import base64
import hashlib
import time

def authentication_demo():
    """
    演示各种身份验证方式
    """
    print("=== 身份验证演示 ===")

    # 1. HTTP基本认证 (Basic Authentication)
    print("\n1. HTTP基本认证:")

    # 方法1: 使用auth参数
    response = requests.get(
        'https://httpbin.org/basic-auth/user/pass',
        auth=('user', 'pass')
    )
    print(f"基本认证状态码: {response.status_code}")
    if response.status_code == 200:
        print(f"认证成功: {response.json()}")

    # 方法2: 使用HTTPBasicAuth类
    response2 = requests.get(
        'https://httpbin.org/basic-auth/testuser/testpass',
        auth=HTTPBasicAuth('testuser', 'testpass')
    )
    print(f"HTTPBasicAuth状态码: {response2.status_code}")

    # 方法3: 手动设置Authorization头
    credentials = base64.b64encode(b'user:pass').decode('ascii')
    headers = {'Authorization': f'Basic {credentials}'}
    response3 = requests.get(
        'https://httpbin.org/basic-auth/user/pass',
        headers=headers
    )
    print(f"手动设置头部状态码: {response3.status_code}")

    # 2. HTTP摘要认证 (Digest Authentication)
    print("\n2. HTTP摘要认证:")

    try:
        response = requests.get(
            'https://httpbin.org/digest-auth/auth/user/pass',
            auth=HTTPDigestAuth('user', 'pass')
        )
        print(f"摘要认证状态码: {response.status_code}")
        if response.status_code == 200:
            print(f"摘要认证成功: {response.json()}")
    except Exception as e:
        print(f"摘要认证失败: {e}")

    # 3. Bearer Token认证
    print("\n3. Bearer Token认证:")

    # 模拟JWT token
    token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"

    headers = {'Authorization': f'Bearer {token}'}
    response = requests.get('https://httpbin.org/bearer', headers=headers)

    print(f"Bearer Token状态码: {response.status_code}")
    if response.status_code == 200:
        print(f"Token认证成功: {response.json()}")

    # 4. API Key认证
    print("\n4. API Key认证:")

    # 方法1: 在URL参数中
    api_key = "your-api-key-here"
    response = requests.get(
        'https://httpbin.org/get',
        params={'api_key': api_key}
    )
    print(f"URL参数API Key: {response.json()['args']}")

    # 方法2: 在请求头中
    headers = {'X-API-Key': api_key}
    response2 = requests.get('https://httpbin.org/get', headers=headers)
    print(f"请求头API Key: {response2.json()['headers'].get('X-Api-Key')}")

    # 5. 自定义认证类
    print("\n5. 自定义认证类:")

    class CustomAuth(requests.auth.AuthBase):
        """自定义认证类"""

        def __init__(self, api_key, secret_key):
            self.api_key = api_key
            self.secret_key = secret_key

        def __call__(self, r):
            # 生成时间戳
            timestamp = str(int(time.time()))

            # 生成签名
            string_to_sign = f"{r.method}\n{r.url}\n{timestamp}"
            signature = hashlib.sha256(
                (string_to_sign + self.secret_key).encode('utf-8')
            ).hexdigest()

            # 添加认证头
            r.headers['X-API-Key'] = self.api_key
            r.headers['X-Timestamp'] = timestamp
            r.headers['X-Signature'] = signature

            return r

    # 使用自定义认证
    custom_auth = CustomAuth('my-api-key', 'my-secret-key')
    response = requests.get('https://httpbin.org/get', auth=custom_auth)

    if response.status_code == 200:
        headers = response.json()['headers']
        print(f"自定义认证头部:")
        print(f"  X-API-Key: {headers.get('X-Api-Key')}")
        print(f"  X-Timestamp: {headers.get('X-Timestamp')}")
        print(f"  X-Signature: {headers.get('X-Signature', '')[:20]}...")

    # 6. OAuth 2.0 模拟
    print("\n6. OAuth 2.0 模拟:")

    def oauth2_flow_simulation():
        """模拟OAuth 2.0授权流程"""

        # 第一步: 获取授权码 (实际应用中用户会被重定向到授权服务器)
        auth_url = "https://httpbin.org/get"
        auth_params = {
            'response_type': 'code',
            'client_id': 'your-client-id',
            'redirect_uri': 'https://yourapp.com/callback',
            'scope': 'read write',
            'state': 'random-state-string'
        }

        print(f"授权URL: {auth_url}?{'&'.join([f'{k}={v}' for k, v in auth_params.items()])}")

        # 第二步: 使用授权码获取访问令牌
        token_data = {
            'grant_type': 'authorization_code',
            'code': 'received-auth-code',
            'redirect_uri': 'https://yourapp.com/callback',
            'client_id': 'your-client-id',
            'client_secret': 'your-client-secret'
        }

        # 模拟获取token
        token_response = requests.post('https://httpbin.org/post', data=token_data)
        print(f"Token请求状态: {token_response.status_code}")

        # 第三步: 使用访问令牌访问API
        access_token = "mock-access-token-12345"
        api_headers = {'Authorization': f'Bearer {access_token}'}

        api_response = requests.get('https://httpbin.org/get', headers=api_headers)
        print(f"API访问状态: {api_response.status_code}")

        return access_token

    oauth_token = oauth2_flow_simulation()

    # 7. 会话级认证
    print("\n7. 会话级认证:")

    # 创建带认证的Session
    auth_session = requests.Session()
    auth_session.auth = ('session_user', 'session_pass')

    # 所有通过这个Session的请求都会自动包含认证信息
    response1 = auth_session.get('https://httpbin.org/basic-auth/session_user/session_pass')
    print(f"会话认证请求1: {response1.status_code}")

    response2 = auth_session.get('https://httpbin.org/basic-auth/session_user/session_pass')
    print(f"会话认证请求2: {response2.status_code}")

    # 8. 认证错误处理
    print("\n8. 认证错误处理:")

    def handle_auth_errors():
        """处理认证相关错误"""

        # 测试错误的认证信息
        try:
            response = requests.get(
                'https://httpbin.org/basic-auth/user/pass',
                auth=('wrong_user', 'wrong_pass'),
                timeout=5
            )

            if response.status_code == 401:
                print("✗ 认证失败: 用户名或密码错误")
                print(f"  WWW-Authenticate: {response.headers.get('WWW-Authenticate')}")
            elif response.status_code == 403:
                print("✗ 访问被拒绝: 权限不足")
            else:
                print(f"认证状态: {response.status_code}")

        except requests.exceptions.RequestException as e:
            print(f"认证请求异常: {e}")

    handle_auth_errors()

# 运行认证演示
if __name__ == "__main__":
    authentication_demo()
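
除了上面的几种方式,也可以把Bearer Token封装成AuthBase子类,令牌只需配置一次即可被Session的所有请求复用。下面是一个最小示意(token为演示用的假值):

import requests

class BearerAuth(requests.auth.AuthBase):
    """把Bearer Token附加到每个请求的Authorization头(示意实现)"""

    def __init__(self, token):
        self.token = token

    def __call__(self, r):
        r.headers['Authorization'] = f'Bearer {self.token}'
        return r

session = requests.Session()
session.auth = BearerAuth('demo-token-12345')  # 演示用的假token

resp = session.get('https://httpbin.org/bearer')
print(resp.status_code, resp.json() if resp.ok else '认证失败')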

代理设置和SSL配置

在爬虫开发中,代理和SSL配置是非常重要的功能,可以帮助我们绕过网络限制和确保安全通信。

import requests
import ssl
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context

def proxy_and_ssl_demo():
    """
    演示代理设置和SSL配置
    """
    print("=== 代理设置和SSL配置演示 ===")

    # 1. HTTP代理设置
    print("\n1. HTTP代理设置:")

    # 基本代理设置
    proxies = {
        'http': 'http://proxy.example.com:8080',
        'https': 'https://proxy.example.com:8080'
    }

    # 注意:这里使用示例代理,实际运行时需要替换为真实代理
    print(f"配置的代理: {proxies}")

    # 带认证的代理
    auth_proxies = {
        'http': 'http://username:password@proxy.example.com:8080',
        'https': 'https://username:password@proxy.example.com:8080'
    }
    print(f"带认证的代理: {auth_proxies}")

    # 2. SOCKS代理设置
    print("\n2. SOCKS代理设置:")

    # 需要安装: pip install requests[socks]
    socks_proxies = {
        'http': 'socks5://127.0.0.1:1080',
        'https': 'socks5://127.0.0.1:1080'
    }
    print(f"SOCKS代理配置: {socks_proxies}")

    # 3. 代理轮换
    print("\n3. 代理轮换演示:")

    import random

    proxy_list = [
        {'http': 'http://proxy1.example.com:8080', 'https': 'https://proxy1.example.com:8080'},
        {'http': 'http://proxy2.example.com:8080', 'https': 'https://proxy2.example.com:8080'},
        {'http': 'http://proxy3.example.com:8080', 'https': 'https://proxy3.example.com:8080'}
    ]

    def get_random_proxy():
        """获取随机代理"""
        return random.choice(proxy_list)

    # 模拟使用不同代理发送请求
    for i in range(3):
        proxy = get_random_proxy()
        print(f"请求 {i+1} 使用代理: {proxy['http']}")
        # 实际请求代码:
        # response = requests.get('https://httpbin.org/ip', proxies=proxy, timeout=10)

    # 4. 代理验证和测试
    print("\n4. 代理验证:")

    def test_proxy(proxy_dict, test_url='https://httpbin.org/ip'):
        """测试代理是否可用"""
        try:
            response = requests.get(
                test_url,
                proxies=proxy_dict,
                timeout=10
            )

            if response.status_code == 200:
                ip_info = response.json()
                print(f"✓ 代理可用")
                print(f"  出口IP: {ip_info.get('origin')}")
                print(f"  响应时间: {response.elapsed.total_seconds():.3f}秒")
                return True
            else:
                print(f"✗ 代理响应异常: {response.status_code}")
                return False

        except requests.exceptions.ProxyError:
            print("✗ 代理连接失败")
            return False
        except requests.exceptions.Timeout:
            print("✗ 代理连接超时")
            return False
        except requests.exceptions.RequestException as e:
            print(f"✗ 代理请求异常: {e}")
            return False

    # 测试直连(无代理)
    print("\n测试直连:")
    try:
        direct_response = requests.get('https://httpbin.org/ip', timeout=10)
        if direct_response.status_code == 200:
            ip_info = direct_response.json()
            print(f"✓ 直连成功")
            print(f"  本地IP: {ip_info.get('origin')}")
    except Exception as e:
        print(f"✗ 直连失败: {e}")

    # 5. SSL配置
    print("\n5. SSL配置演示:")

    # 禁用SSL验证(不推荐用于生产环境)
    print("\n禁用SSL验证:")
    try:
        response = requests.get(
            'https://httpbin.org/get',
            verify=False  # 禁用SSL证书验证(会触发InsecureRequestWarning警告)
        )
        print(f"✓ 禁用SSL验证请求成功: {response.status_code}")
    except Exception as e:
        print(f"✗ SSL请求失败: {e}")

    # 自定义CA证书
    print("\n自定义CA证书:")
    # 指定CA证书文件路径
    # response = requests.get('https://httpbin.org/get', verify='/path/to/ca-bundle.crt')
    print("可以通过verify参数指定CA证书文件路径")

    # 客户端证书认证
    print("\n客户端证书认证:")
    # cert参数可以是证书文件路径的字符串,或者是(cert, key)元组
    # response = requests.get('https://httpbin.org/get', cert=('/path/to/client.cert', '/path/to/client.key'))
    print("可以通过cert参数指定客户端证书")

    # 6. 自定义SSL上下文
    print("\n6. 自定义SSL上下文:")

    class SSLAdapter(HTTPAdapter):
        """自定义SSL适配器"""

        def __init__(self, ssl_context=None, **kwargs):
            self.ssl_context = ssl_context
            super().__init__(**kwargs)

        def init_poolmanager(self, *args, **kwargs):
            kwargs['ssl_context'] = self.ssl_context
            return super().init_poolmanager(*args, **kwargs)

    # 创建自定义SSL上下文
    ssl_context = create_urllib3_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # 使用自定义SSL适配器
    session = requests.Session()
    session.mount('https://', SSLAdapter(ssl_context))

    try:
        response = session.get('https://httpbin.org/get')
        print(f"✓ 自定义SSL上下文请求成功: {response.status_code}")
    except Exception as e:
        print(f"✗ 自定义SSL请求失败: {e}")

    # 7. 综合配置示例
    print("\n7. 综合配置示例:")

    def create_secure_session(proxy=None, verify_ssl=True, client_cert=None):
        """创建安全配置的Session"""
        session = requests.Session()

        # 设置代理
        if proxy:
            session.proxies.update(proxy)

        # SSL配置
        session.verify = verify_ssl
        if client_cert:
            session.cert = client_cert

        # 保存默认超时(requests不会自动应用该属性,请求时需显式传入timeout)
        session.timeout = 30

        # 设置重试
        from urllib3.util.retry import Retry
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        return session

    # 创建配置好的Session
    secure_session = create_secure_session(
        # proxy={'http': 'http://proxy.example.com:8080'},
        verify_ssl=True
    )

    try:
        response = secure_session.get('https://httpbin.org/get')
        print(f"✓ 安全Session请求成功: {response.status_code}")
        print(f"  SSL验证: {'启用' if secure_session.verify else '禁用'}")
        print(f"  代理设置: {secure_session.proxies if secure_session.proxies else '无'}")
    except Exception as e:
        print(f"✗ 安全Session请求失败: {e}")

    # 8. 环境变量代理配置
    print("\n8. 环境变量代理配置:")

    import os

    # Requests会自动读取这些环境变量
    env_vars = {
        'HTTP_PROXY': 'http://proxy.example.com:8080',
        'HTTPS_PROXY': 'https://proxy.example.com:8080',
        'NO_PROXY': 'localhost,127.0.0.1,.local'
    }

    print("可以设置的环境变量:")
    for var, value in env_vars.items():
        print(f"  {var}={value}")

    # 检查当前环境变量
    current_proxy = os.environ.get('HTTP_PROXY') or os.environ.get('http_proxy')
    if current_proxy:
        print(f"当前HTTP代理: {current_proxy}")
    else:
        print("未设置HTTP代理环境变量")

# 运行代理和SSL演示
if __name__ == "__main__":
    proxy_and_ssl_demo()
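
在实际项目中,通常会把前面test_proxy的思路扩展成一个简单的代理池:请求失败的代理被移出池子,其余代理轮换使用。下面是一个最小示意(代理地址均为占位示例,使用时需替换为真实可用的代理):

import random
import requests

class SimpleProxyPool:
    """维护一组代理,失败的代理会被移出池子(示意实现)"""

    def __init__(self, proxy_urls):
        self.proxies = list(proxy_urls)

    def get(self, url, **kwargs):
        while self.proxies:
            proxy = random.choice(self.proxies)
            try:
                return requests.get(
                    url,
                    proxies={'http': proxy, 'https': proxy},
                    timeout=10,
                    **kwargs
                )
            except requests.exceptions.RequestException:
                self.proxies.remove(proxy)  # 剔除不可用的代理
        raise RuntimeError("没有可用的代理")

# 代理地址仅为占位示例
pool = SimpleProxyPool([
    'http://proxy1.example.com:8080',
    'http://proxy2.example.com:8080',
])
# response = pool.get('https://httpbin.org/ip')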

Cookie处理

Cookie是Web应用中维护状态的重要机制,Requests提供了强大的Cookie处理功能。

import requests
from http.cookies import SimpleCookie
import time
from datetime import datetime, timedelta

def cookie_handling_demo():
    """
    演示Cookie处理的各种功能
    """
    print("=== Cookie处理演示 ===")

    # 1. 基本Cookie操作
    print("\n1. 基本Cookie操作:")

    # 发送带Cookie的请求
    cookies = {'session_id': 'abc123', 'user_pref': 'dark_mode'}
    response = requests.get('https://httpbin.org/cookies', cookies=cookies)

    if response.status_code == 200:
        received_cookies = response.json().get('cookies', {})
        print(f"发送的Cookies: {cookies}")
        print(f"服务器接收的Cookies: {received_cookies}")

    # 2. 从响应中获取Cookie
    print("\n2. 从响应中获取Cookie:")

    # 请求设置Cookie的URL
    response = requests.get('https://httpbin.org/cookies/set/test_cookie/test_value')

    print(f"响应状态码: {response.status_code}")
    print(f"响应中的Cookies: {dict(response.cookies)}")

    # 查看Cookie详细信息
    for cookie in response.cookies:
        print(f"Cookie详情:")
        print(f"  名称: {cookie.name}")
        print(f"  值: {cookie.value}")
        print(f"  域: {cookie.domain}")
        print(f"  路径: {cookie.path}")
        print(f"  过期时间: {cookie.expires}")
        print(f"  安全标志: {cookie.secure}")
        print(f"  HttpOnly: {cookie.has_nonstandard_attr('HttpOnly')}")

    # 3. Cookie持久化
    print("\n3. Cookie持久化演示:")

    # 创建Session来自动管理Cookie
    session = requests.Session()

    # 第一次请求,服务器设置Cookie
    response1 = session.get('https://httpbin.org/cookies/set/persistent_cookie/persistent_value')
    print(f"第一次请求状态: {response1.status_code}")
    print(f"Session中的Cookies: {dict(session.cookies)}")

    # 第二次请求,自动携带Cookie
    response2 = session.get('https://httpbin.org/cookies')
    if response2.status_code == 200:
        cookies_data = response2.json()
        print(f"第二次请求携带的Cookies: {cookies_data.get('cookies', {})}")

    # 4. 手动Cookie管理
    print("\n4. 手动Cookie管理:")

    from requests.cookies import RequestsCookieJar

    # 创建Cookie容器
    jar = RequestsCookieJar()

    # 添加Cookie
    jar.set('custom_cookie', 'custom_value', domain='httpbin.org', path='/')
    jar.set('another_cookie', 'another_value', domain='httpbin.org', path='/')

    # 使用自定义Cookie容器
    response = requests.get('https://httpbin.org/cookies', cookies=jar)

    if response.status_code == 200:
        print(f"自定义Cookie容器: {dict(jar)}")
        print(f"服务器接收: {response.json().get('cookies', {})}")

    # 5. Cookie的高级属性
    print("\n5. Cookie高级属性演示:")

    def create_advanced_cookie():
        """创建带高级属性的Cookie"""
        jar = RequestsCookieJar()

        # 设置带过期时间的Cookie
        expire_time = int(time.time()) + 3600  # 1小时后过期
        jar.set(
            'session_token', 
            'token_12345',
            domain='httpbin.org',
            path='/',
            expires=expire_time,
            secure=True,  # 只在HTTPS下传输
            rest={'HttpOnly': True}  # 防止JavaScript访问
        )

        # 设置SameSite属性
        jar.set(
            'csrf_token',
            'csrf_abc123',
            domain='httpbin.org',
            path='/',
            rest={'SameSite': 'Strict'}
        )

        return jar

    advanced_jar = create_advanced_cookie()
    print(f"高级Cookie容器: {dict(advanced_jar)}")

    # 6. Cookie文件操作
    print("\n6. Cookie文件操作:")

    import pickle
    import os

    # 保存Cookie到文件
    def save_cookies_to_file(session, filename):
        """保存Session的Cookie到文件"""
        with open(filename, 'wb') as f:
            pickle.dump(session.cookies, f)
        print(f"Cookies已保存到: {filename}")

    # 从文件加载Cookie
    def load_cookies_from_file(session, filename):
        """从文件加载Cookie到Session"""
        if os.path.exists(filename):
            with open(filename, 'rb') as f:
                session.cookies.update(pickle.load(f))
            print(f"Cookies已从文件加载: {filename}")
            return True
        return False

    # 演示Cookie文件操作
    cookie_session = requests.Session()

    # 设置一些Cookie
    cookie_session.get('https://httpbin.org/cookies/set/file_cookie/file_value')

    # 保存到文件
    cookie_file = 'session_cookies.pkl'
    save_cookies_to_file(cookie_session, cookie_file)

    # 创建新Session并加载Cookie
    new_session = requests.Session()
    if load_cookies_from_file(new_session, cookie_file):
        response = new_session.get('https://httpbin.org/cookies')
        if response.status_code == 200:
            print(f"加载的Cookies验证: {response.json().get('cookies', {})}")

    # 清理文件
    if os.path.exists(cookie_file):
        os.remove(cookie_file)
        print(f"已清理Cookie文件: {cookie_file}")

    # 7. Cookie域和路径管理
    print("\n7. Cookie域和路径管理:")

    def demonstrate_cookie_scope():
        """演示Cookie的作用域"""
        jar = RequestsCookieJar()

        # 设置不同域和路径的Cookie
        jar.set('global_cookie', 'global_value', domain='.example.com', path='/')
        jar.set('api_cookie', 'api_value', domain='api.example.com', path='/v1/')
        jar.set('admin_cookie', 'admin_value', domain='admin.example.com', path='/admin/')

        print("Cookie作用域演示:")
        for cookie in jar:
            print(f"  {cookie.name}: 域={cookie.domain}, 路径={cookie.path}")

        return jar

    scope_jar = demonstrate_cookie_scope()

    # 8. Cookie安全性
    print("\n8. Cookie安全性演示:")

    def create_secure_cookies():
        """创建安全的Cookie设置"""
        jar = RequestsCookieJar()

        # 安全Cookie设置
        security_settings = {
            'session_id': {
                'value': 'secure_session_123',
                'secure': True,  # 只在HTTPS传输
                'httponly': True,  # 防止XSS攻击
                'samesite': 'Strict',  # 防止CSRF攻击
                'expires': int(time.time()) + 1800  # 30分钟过期
            },
            'csrf_token': {
                'value': 'csrf_token_456',
                'secure': True,
                'samesite': 'Strict',
                'expires': int(time.time()) + 3600  # 1小时过期
            }
        }

        for name, settings in security_settings.items():
            jar.set(
                name,
                settings['value'],
                domain='httpbin.org',
                path='/',
                expires=settings.get('expires'),
                secure=settings.get('secure', False),
                rest={
                    'HttpOnly': settings.get('httponly', False),
                    'SameSite': settings.get('samesite', 'Lax')
                }
            )

        print("安全Cookie配置:")
        for cookie in jar:
            print(f"  {cookie.name}: 安全={cookie.secure}")

        return jar

    secure_jar = create_secure_cookies()

    # 9. Cookie调试和分析
    print("\n9. Cookie调试和分析:")

    def analyze_cookies(response):
        """分析响应中的Cookie"""
        print("Cookie分析报告:")

        if not response.cookies:
            print("  无Cookie")
            return

        for cookie in response.cookies:
            print(f"\n  Cookie: {cookie.name}")
            print(f"    值: {cookie.value}")
            print(f"    域: {cookie.domain or '未设置'}")
            print(f"    路径: {cookie.path or '/'}")

            if cookie.expires:
                expire_date = datetime.fromtimestamp(cookie.expires)
                print(f"    过期时间: {expire_date}")

                # 检查是否即将过期
                if expire_date < datetime.now() + timedelta(hours=1):
                    print(f"    ⚠️  警告: Cookie将在1小时内过期")
            else:
                print(f"    过期时间: 会话结束")

            print(f"    安全标志: {cookie.secure}")
            print(f"    大小: {len(cookie.value)}字节")

            # 检查Cookie大小
            if len(cookie.value) > 4000:
                print(f"    ⚠️  警告: Cookie过大,可能被截断")

    # 分析一个带Cookie的响应
    test_response = requests.get('https://httpbin.org/cookies/set/analysis_cookie/test_analysis_value')
    analyze_cookies(test_response)

    # 10. Cookie错误处理
    print("\n10. Cookie错误处理:")

    def handle_cookie_errors():
        """处理Cookie相关错误"""
        try:
            # 尝试设置无效的Cookie
            jar = RequestsCookieJar()

            # 测试各种边界情况
            test_cases = [
                ('valid_cookie', 'valid_value'),
                ('', 'empty_name'),  # 空名称
                ('space cookie', 'space_in_name'),  # 名称包含空格
                ('valid_name', ''),  # 空值
                ('long_cookie', 'x' * 5000),  # 超长值
            ]

            for name, value in test_cases:
                try:
                    jar.set(name, value, domain='httpbin.org')
                    print(f"✓ 成功设置Cookie: {name[:20]}...")
                except Exception as e:
                    print(f"✗ 设置Cookie失败 ({name[:20]}...): {e}")

            # 测试Cookie发送
            response = requests.get('https://httpbin.org/cookies', cookies=jar, timeout=5)
            print(f"Cookie发送测试: {response.status_code}")

        except requests.exceptions.RequestException as e:
            print(f"Cookie请求异常: {e}")
        except Exception as e:
            print(f"Cookie处理异常: {e}")

    handle_cookie_errors()

# 运行Cookie演示
if __name__ == "__main__":
    cookie_handling_demo()
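
除了用pickle保存Cookie,还可以借助标准库的MozillaCookieJar把Cookie导出为通用的cookies.txt格式,方便与curl、浏览器插件等工具共享。下面是一个最小示意:

import http.cookiejar
import requests

def save_cookies_as_mozilla(session, filename):
    """把Session中的Cookie保存为Netscape/Mozilla格式的cookies.txt(示意实现)"""
    jar = http.cookiejar.MozillaCookieJar(filename)
    for cookie in session.cookies:
        jar.set_cookie(cookie)
    jar.save(ignore_discard=True, ignore_expires=True)

def load_cookies_from_mozilla(session, filename):
    """从cookies.txt加载Cookie到Session"""
    jar = http.cookiejar.MozillaCookieJar(filename)
    jar.load(ignore_discard=True, ignore_expires=True)
    session.cookies.update(jar)

session = requests.Session()
session.get('https://httpbin.org/cookies/set/demo_cookie/demo_value')
save_cookies_as_mozilla(session, 'cookies.txt')

new_session = requests.Session()
load_cookies_from_mozilla(new_session, 'cookies.txt')
print(dict(new_session.cookies))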

文件上传和下载

文件传输是网络爬虫和自动化中的重要功能,Requests提供了简单而强大的文件处理能力。

import requests
import os
import io
from pathlib import Path
import mimetypes
import hashlib
from tqdm import tqdm  # 第三方进度条库,需要先安装: pip install tqdm

def file_transfer_demo():
    """
    演示文件上传和下载功能
    """
    print("=== 文件上传和下载演示 ===")

    # 1. 基本文件上传
    print("\n1. 基本文件上传:")

    # 创建测试文件
    test_file_content = "这是一个测试文件\nTest file content\n测试数据123"
    test_file_path = "test_upload.txt"

    with open(test_file_path, 'w', encoding='utf-8') as f:
        f.write(test_file_content)

    # 方法1: 使用files参数上传
    with open(test_file_path, 'rb') as f:
        files = {'file': f}
        response = requests.post('https://httpbin.org/post', files=files)

    if response.status_code == 200:
        result = response.json()
        print(f"文件上传成功")
        print(f"上传的文件信息: {result.get('files', {})}")

    # 2. 高级文件上传
    print("\n2. 高级文件上传:")

    # 指定文件名和MIME类型
    with open(test_file_path, 'rb') as f:
        files = {
            'document': ('custom_name.txt', f, 'text/plain'),
            'metadata': ('info.json', io.StringIO('{"type": "document"}'), 'application/json')
        }

        # 同时发送表单数据
        data = {
            'title': '测试文档',
            'description': '这是一个测试上传',
            'category': 'test'
        }

        response = requests.post('https://httpbin.org/post', files=files, data=data)

    if response.status_code == 200:
        result = response.json()
        print(f"高级上传成功")
        print(f"表单数据: {result.get('form', {})}")
        print(f"文件数据: {list(result.get('files', {}).keys())}")

    # 3. 多文件上传
    print("\n3. 多文件上传:")

    # 创建多个测试文件
    test_files = []
    for i in range(3):
        filename = f"test_file_{i+1}.txt"
        content = f"这是测试文件 {i+1}\nFile {i+1} content\n"

        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
        test_files.append(filename)

    # 上传多个文件
    files = []
    for filename in test_files:
        files.append(('files', (filename, open(filename, 'rb'), 'text/plain')))

    try:
        response = requests.post('https://httpbin.org/post', files=files)

        if response.status_code == 200:
            result = response.json()
            print(f"多文件上传成功")
            print(f"上传文件数量: {len(result.get('files', {}))}")
    finally:
        # 关闭文件句柄
        for _, (_, file_obj, _) in files:
            file_obj.close()

    # 4. 内存文件上传
    print("\n4. 内存文件上传:")

    # 创建内存中的文件
    memory_file = io.BytesIO()
    memory_file.write("内存中的文件内容\nMemory file content".encode('utf-8'))
    memory_file.seek(0)  # 重置指针到开始

    files = {'memory_file': ('memory.txt', memory_file, 'text/plain')}
    response = requests.post('https://httpbin.org/post', files=files)

    if response.status_code == 200:
        print(f"内存文件上传成功")

    memory_file.close()

    # 5. 文件下载基础
    print("\n5. 文件下载基础:")

    # 下载小文件
    download_url = 'https://httpbin.org/json'
    response = requests.get(download_url)

    if response.status_code == 200:
        # 保存到文件
        download_filename = 'downloaded_data.json'
        with open(download_filename, 'wb') as f:
            f.write(response.content)

        print(f"文件下载成功: {download_filename}")
        print(f"文件大小: {len(response.content)}字节")
        print(f"Content-Type: {response.headers.get('content-type')}")

    # 6. 大文件下载(流式下载)
    print("\n6. 大文件流式下载:")

    def download_large_file(url, filename, chunk_size=8192):
        """流式下载大文件"""
        try:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()

                # 获取文件大小
                total_size = int(response.headers.get('content-length', 0))

                with open(filename, 'wb') as f:
                    if total_size > 0:
                        # 使用进度条
                        with tqdm(total=total_size, unit='B', unit_scale=True, desc=filename) as pbar:
                            for chunk in response.iter_content(chunk_size=chunk_size):
                                if chunk:
                                    f.write(chunk)
                                    pbar.update(len(chunk))
                    else:
                        # 无法获取文件大小时
                        downloaded = 0
                        for chunk in response.iter_content(chunk_size=chunk_size):
                            if chunk:
                                f.write(chunk)
                                downloaded += len(chunk)
                                print(f"\r已下载: {downloaded}字节", end='', flush=True)
                        print()  # 换行

                print(f"\n✓ 文件下载完成: {filename}")
                return True

        except requests.exceptions.RequestException as e:
            print(f"✗ 下载失败: {e}")
            return False

    # 演示流式下载(使用较小的文件作为示例)
    large_file_url = 'https://httpbin.org/bytes/10240'  # 10KB测试文件
    if download_large_file(large_file_url, 'large_download.bin'):
        file_size = os.path.getsize('large_download.bin')
        print(f"下载文件大小: {file_size}字节")

    # 7. 断点续传下载
    print("\n7. 断点续传下载:")

    def resume_download(url, filename, chunk_size=8192):
        """支持断点续传的下载"""
        # 检查本地文件是否存在
        resume_pos = 0
        if os.path.exists(filename):
            resume_pos = os.path.getsize(filename)
            print(f"发现本地文件,从位置 {resume_pos} 继续下载")

        # 设置Range头进行断点续传
        headers = {'Range': f'bytes={resume_pos}-'} if resume_pos > 0 else {}

        try:
            response = requests.get(url, headers=headers, stream=True)

            # 检查服务器是否支持断点续传
            if resume_pos > 0 and response.status_code != 206:
                print("服务器不支持断点续传,重新下载")
                resume_pos = 0
                response = requests.get(url, stream=True)

            response.raise_for_status()

            # 获取总文件大小
            if 'content-range' in response.headers:
                total_size = int(response.headers['content-range'].split('/')[-1])
            else:
                total_size = int(response.headers.get('content-length', 0)) + resume_pos

            # 打开文件(追加模式如果是续传)
            mode = 'ab' if resume_pos > 0 else 'wb'
            with open(filename, mode) as f:
                downloaded = resume_pos

                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)

                        if total_size > 0:
                            progress = (downloaded / total_size) * 100
                            print(f"\r下载进度: {progress:.1f}% ({downloaded}/{total_size})", end='', flush=True)

                print(f"\n✓ 下载完成: {filename}")
                return True

        except requests.exceptions.RequestException as e:
            print(f"✗ 下载失败: {e}")
            return False

    # 演示断点续传(模拟)
    resume_url = 'https://httpbin.org/bytes/5120'  # 5KB测试文件
    resume_filename = 'resume_download.bin'

    # 先下载一部分(模拟中断)
    try:
        response = requests.get(resume_url, stream=True)
        with open(resume_filename, 'wb') as f:
            for i, chunk in enumerate(response.iter_content(chunk_size=1024)):
                if i >= 2:  # 只下载前2KB
                    break
                f.write(chunk)
        print(f"模拟下载中断,已下载: {os.path.getsize(resume_filename)}字节")
    except requests.exceptions.RequestException:
        pass

    # 继续下载
    resume_download(resume_url, resume_filename)

    # 8. 文件完整性验证
    print("\n8. 文件完整性验证:")

    def verify_file_integrity(filename, expected_hash=None, hash_algorithm='md5'):
        """验证文件完整性"""
        if not os.path.exists(filename):
            print(f"✗ 文件不存在: {filename}")
            return False

        # 计算文件哈希
        hash_obj = hashlib.new(hash_algorithm)
        with open(filename, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                hash_obj.update(chunk)

        file_hash = hash_obj.hexdigest()
        print(f"文件 {filename}{hash_algorithm.upper()}哈希: {file_hash}")

        if expected_hash:
            if file_hash == expected_hash:
                print(f"✓ 文件完整性验证通过")
                return True
            else:
                print(f"✗ 文件完整性验证失败")
                print(f"  期望: {expected_hash}")
                print(f"  实际: {file_hash}")
                return False

        return True

    # 验证下载的文件
    for filename in ['downloaded_data.json', 'large_download.bin']:
        if os.path.exists(filename):
            verify_file_integrity(filename)

    # 9. 自动MIME类型检测
    print("\n9. 自动MIME类型检测:")

    def upload_with_auto_mime(filename):
        """自动检测MIME类型并上传"""
        if not os.path.exists(filename):
            print(f"文件不存在: {filename}")
            return

        # 自动检测MIME类型
        mime_type, _ = mimetypes.guess_type(filename)
        if mime_type is None:
            mime_type = 'application/octet-stream'  # 默认二进制类型

        print(f"文件: {filename}")
        print(f"检测到的MIME类型: {mime_type}")

        with open(filename, 'rb') as f:
            files = {'file': (filename, f, mime_type)}
            response = requests.post('https://httpbin.org/post', files=files)

            if response.status_code == 200:
                print(f"✓ 上传成功")
            else:
                print(f"✗ 上传失败: {response.status_code}")

    # 测试不同类型的文件
    test_files_mime = ['test_upload.txt', 'downloaded_data.json']
    for filename in test_files_mime:
        if os.path.exists(filename):
            upload_with_auto_mime(filename)

    # 10. 清理测试文件
    print("\n10. 清理测试文件:")

    cleanup_files = [
        test_file_path, 'downloaded_data.json', 'large_download.bin',
        'resume_download.bin'
    ] + test_files

    for filename in cleanup_files:
        if os.path.exists(filename):
            try:
                os.remove(filename)
                print(f"✓ 已删除: {filename}")
            except Exception as e:
                print(f"✗ 删除失败 {filename}: {e}")

# 运行文件传输演示
if __name__ == "__main__":
    file_transfer_demo()
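
除了手动按块写入,也可以配合标准库的shutil.copyfileobj把原始响应流直接写入文件,代码更为简洁。下面是一个最小示意(下载地址仅作演示):

import shutil
import requests

def download_with_copyfileobj(url, filename):
    """用shutil.copyfileobj把响应流直接写入本地文件(示意实现)"""
    with requests.get(url, stream=True, timeout=30) as resp:
        resp.raise_for_status()
        resp.raw.decode_content = True  # 让urllib3自动解压gzip/deflate内容
        with open(filename, 'wb') as f:
            shutil.copyfileobj(resp.raw, f)
    return filename

download_with_copyfileobj('https://httpbin.org/bytes/10240', 'stream_download.bin')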

超时和重试机制

在网络请求中,超时和重试机制是确保程序稳定性的重要功能。

import requests
import time
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from functools import wraps
import logging

def timeout_and_retry_demo():
    """
    演示超时和重试机制
    """
    print("=== 超时和重试机制演示 ===")

    # 1. 基本超时设置
    print("\n1. 基本超时设置:")

    # 连接超时和读取超时
    try:
        # timeout=(连接超时, 读取超时)
        response = requests.get('https://httpbin.org/delay/2', timeout=(5, 10))
        print(f"请求成功: {response.status_code}")
        print(f"响应时间: {response.elapsed.total_seconds():.2f}秒")
    except requests.exceptions.Timeout as e:
        print(f"请求超时: {e}")
    except requests.exceptions.RequestException as e:
        print(f"请求异常: {e}")

    # 2. 不同类型的超时
    print("\n2. 不同类型的超时演示:")

    def test_different_timeouts():
        """测试不同的超时设置"""
        timeout_configs = [
            ("单一超时", 5),  # 连接和读取都是5秒
            ("分别设置", (3, 10)),  # 连接3秒,读取10秒
            ("只设置连接超时", (2, None)),  # 只设置连接超时
        ]

        for desc, timeout in timeout_configs:
            try:
                print(f"\n测试 {desc}: {timeout}")
                start_time = time.time()
                response = requests.get('https://httpbin.org/delay/1', timeout=timeout)
                elapsed = time.time() - start_time
                print(f"  ✓ 成功: {response.status_code}, 耗时: {elapsed:.2f}秒")
            except requests.exceptions.Timeout as e:
                elapsed = time.time() - start_time
                print(f"  ✗ 超时: {elapsed:.2f}秒, {e}")
            except Exception as e:
                print(f"  ✗ 异常: {e}")

    test_different_timeouts()

    # 3. 手动重试机制
    print("\n3. 手动重试机制:")

    def manual_retry(url, max_retries=3, delay=1, backoff=2):
        """手动实现重试机制"""
        for attempt in range(max_retries + 1):
            try:
                print(f"  尝试 {attempt + 1}/{max_retries + 1}: {url}")
                response = requests.get(url, timeout=5)

                # 检查响应状态
                if response.status_code == 200:
                    print(f"  ✓ 成功: {response.status_code}")
                    return response
                elif response.status_code >= 500:
                    # 服务器错误,可以重试
                    print(f"  服务器错误 {response.status_code},准备重试")
                    raise requests.exceptions.RequestException(f"Server error: {response.status_code}")
                else:
                    # 客户端错误,不重试
                    print(f"  客户端错误 {response.status_code},不重试")
                    return response

            except (requests.exceptions.Timeout, 
                   requests.exceptions.ConnectionError,
                   requests.exceptions.RequestException) as e:
                print(f"  ✗ 请求失败: {e}")

                if attempt < max_retries:
                    wait_time = delay * (backoff ** attempt)
                    print(f"  等待 {wait_time:.1f}秒 后重试...")
                    time.sleep(wait_time)
                else:
                    print(f"  已达到最大重试次数,放弃")
                    raise

        return None

    # 测试手动重试
    try:
        response = manual_retry('https://httpbin.org/status/500', max_retries=2)
    except Exception as e:
        print(f"手动重试最终失败: {e}")

    # 4. 使用urllib3的重试策略
    print("\n4. urllib3重试策略:")

    def create_retry_session():
        """创建带重试策略的Session"""
        session = requests.Session()

        # 定义重试策略
        retry_strategy = Retry(
            total=3,  # 总重试次数
            status_forcelist=[429, 500, 502, 503, 504],  # 需要重试的状态码
            method_whitelist=["HEAD", "GET", "OPTIONS"],  # 允许重试的方法
            backoff_factor=1,  # 退避因子
            raise_on_redirect=False,
            raise_on_status=False
        )

        # 创建适配器
        adapter = HTTPAdapter(max_retries=retry_strategy)

        # 挂载适配器
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    # 使用重试Session
    retry_session = create_retry_session()

    try:
        print("使用重试Session请求:")
        response = retry_session.get('https://httpbin.org/status/503', timeout=10)
        print(f"最终响应: {response.status_code}")
    except Exception as e:
        print(f"重试Session失败: {e}")

    # 5. 高级重试配置
    print("\n5. 高级重试配置:")

    def create_advanced_retry_session():
        """创建高级重试配置的Session"""
        session = requests.Session()

        # 高级重试策略
        retry_strategy = Retry(
            total=5,  # 总重试次数
            read=3,   # 读取重试次数
            connect=3,  # 连接重试次数
            status=3,   # 状态码重试次数
            status_forcelist=[408, 429, 500, 502, 503, 504, 520, 522, 524],
            method_whitelist=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"],
            backoff_factor=0.3,  # 退避因子:{backoff factor} * (2 ** ({number of total retries} - 1))
            raise_on_redirect=False,
            raise_on_status=False,
            respect_retry_after_header=True  # 尊重服务器的Retry-After头
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    advanced_session = create_advanced_retry_session()

    # 测试高级重试
    test_urls = [
        ('正常请求', 'https://httpbin.org/get'),
        ('服务器错误', 'https://httpbin.org/status/500'),
        ('超时请求', 'https://httpbin.org/delay/3')
    ]

    for desc, url in test_urls:
        try:
            print(f"\n测试 {desc}:")
            start_time = time.time()
            response = advanced_session.get(url, timeout=(5, 10))
            elapsed = time.time() - start_time
            print(f"  ✓ 响应: {response.status_code}, 耗时: {elapsed:.2f}秒")
        except Exception as e:
            elapsed = time.time() - start_time
            print(f"  ✗ 失败: {e}, 耗时: {elapsed:.2f}秒")

    # 6. 装饰器重试
    print("\n6. 装饰器重试:")

    def retry_decorator(max_retries=3, delay=1, backoff=2, exceptions=(Exception,)):
        """重试装饰器"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(max_retries + 1):
                    try:
                        return func(*args, **kwargs)
                    except exceptions as e:
                        if attempt == max_retries:
                            print(f"装饰器重试失败,已达最大次数: {e}")
                            raise

                        wait_time = delay * (backoff ** attempt)
                        print(f"装饰器重试 {attempt + 1}/{max_retries + 1} 失败: {e}")
                        print(f"等待 {wait_time:.1f}秒 后重试...")
                        time.sleep(wait_time)

            return wrapper
        return decorator

    @retry_decorator(max_retries=2, delay=0.5, exceptions=(requests.exceptions.RequestException,))
    def unreliable_request(url):
        """不稳定的请求函数"""
        # 模拟随机失败
        if random.random() < 0.7:  # 70%概率失败
            raise requests.exceptions.ConnectionError("模拟连接失败")

        response = requests.get(url, timeout=5)
        return response

    # 测试装饰器重试
    try:
        print("测试装饰器重试:")
        response = unreliable_request('https://httpbin.org/get')
        print(f"装饰器重试成功: {response.status_code}")
    except Exception as e:
        print(f"装饰器重试最终失败: {e}")

    # 7. 智能重试策略
    print("\n7. 智能重试策略:")

    class SmartRetry:
        """智能重试类"""

        def __init__(self, max_retries=3, base_delay=1, max_delay=60):
            self.max_retries = max_retries
            self.base_delay = base_delay
            self.max_delay = max_delay
            self.attempt_count = 0

        def should_retry(self, exception, response=None):
            """判断是否应该重试"""
            # 网络相关异常应该重试
            if isinstance(exception, (requests.exceptions.Timeout,
                                    requests.exceptions.ConnectionError)):
                return True

            # 特定状态码应该重试
            if response and response.status_code in [429, 500, 502, 503, 504]:
                return True

            return False

        def get_delay(self):
            """计算延迟时间"""
            # 指数退避 + 随机抖动
            delay = min(self.base_delay * (2 ** self.attempt_count), self.max_delay)
            jitter = random.uniform(0, 0.1) * delay  # 10%的随机抖动
            return delay + jitter

        def execute(self, func, *args, **kwargs):
            """执行带重试的函数"""
            last_exception = None

            for attempt in range(self.max_retries + 1):
                self.attempt_count = attempt

                try:
                    result = func(*args, **kwargs)

                    # 如果是Response对象,检查状态码
                    if hasattr(result, 'status_code'):
                        if self.should_retry(None, result) and attempt < self.max_retries:
                            print(f"智能重试: 状态码 {result.status_code},尝试 {attempt + 1}")
                            time.sleep(self.get_delay())
                            continue

                    print(f"智能重试成功,尝试次数: {attempt + 1}")
                    return result

                except Exception as e:
                    last_exception = e

                    if self.should_retry(e) and attempt < self.max_retries:
                        delay = self.get_delay()
                        print(f"智能重试: {e},等待 {delay:.2f}秒,尝试 {attempt + 1}")
                        time.sleep(delay)
                    else:
                        break

            print(f"智能重试失败,已达最大次数")
            raise last_exception

    # 测试智能重试
    smart_retry = SmartRetry(max_retries=3, base_delay=0.5)

    def test_request():
        # 模拟不稳定的请求
        if random.random() < 0.6:
            raise requests.exceptions.ConnectionError("模拟网络错误")
        return requests.get('https://httpbin.org/get', timeout=5)

    try:
        response = smart_retry.execute(test_request)
        print(f"智能重试最终成功: {response.status_code}")
    except Exception as e:
        print(f"智能重试最终失败: {e}")

    # 8. 重试监控和日志
    print("\n8. 重试监控和日志:")

    # 配置日志
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    class MonitoredRetry:
        """带监控的重试类"""

        def __init__(self, max_retries=3):
            self.max_retries = max_retries
            self.stats = {
                'total_attempts': 0,
                'successful_attempts': 0,
                'failed_attempts': 0,
                'retry_reasons': {}
            }

        def request_with_monitoring(self, url, **kwargs):
            """带监控的请求"""
            for attempt in range(self.max_retries + 1):
                self.stats['total_attempts'] += 1

                try:
                    logger.info(f"尝试请求 {url},第 {attempt + 1} 次")
                    response = requests.get(url, **kwargs)

                    if response.status_code == 200:
                        self.stats['successful_attempts'] += 1
                        logger.info(f"请求成功: {response.status_code}")
                        return response
                    else:
                        reason = f"status_{response.status_code}"
                        self.stats['retry_reasons'][reason] = self.stats['retry_reasons'].get(reason, 0) + 1

                        if attempt < self.max_retries:
                            logger.warning(f"请求失败: {response.status_code},准备重试")
                            time.sleep(1)
                        else:
                            logger.error(f"请求最终失败: {response.status_code}")
                            return response

                except Exception as e:
                    reason = type(e).__name__
                    self.stats['retry_reasons'][reason] = self.stats['retry_reasons'].get(reason, 0) + 1

                    if attempt < self.max_retries:
                        logger.warning(f"请求异常: {e},准备重试")
                        time.sleep(1)
                    else:
                        self.stats['failed_attempts'] += 1
                        logger.error(f"请求最终异常: {e}")
                        raise

        def get_stats(self):
            """获取统计信息"""
            return self.stats

    # 测试监控重试
    monitored_retry = MonitoredRetry(max_retries=2)

    test_urls_monitor = [
        'https://httpbin.org/get',
        'https://httpbin.org/status/500',
        'https://httpbin.org/delay/1'
    ]

    for url in test_urls_monitor:
        try:
            response = monitored_retry.request_with_monitoring(url, timeout=3)
            print(f"监控请求结果: {response.status_code if response else 'None'}")
        except Exception as e:
            print(f"监控请求异常: {e}")

    # 显示统计信息
    stats = monitored_retry.get_stats()
    print(f"\n重试统计信息:")
    print(f"  总尝试次数: {stats['total_attempts']}")
    print(f"  成功次数: {stats['successful_attempts']}")
    print(f"  失败次数: {stats['failed_attempts']}")
    print(f"  重试原因: {stats['retry_reasons']}")

    # 9. 超时和重试的最佳实践
    print("\n9. 超时和重试最佳实践:")

    def best_practice_request(url, max_retries=3, timeout=(5, 30)):
        """最佳实践的请求函数"""
        session = requests.Session()

        # 配置重试策略
        retry_strategy = Retry(
            total=max_retries,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"],  # 旧版urllib3中此参数名为method_whitelist
            backoff_factor=1,
            respect_retry_after_header=True
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        try:
            # Session对象不会自动应用timeout属性,需要在每次请求时显式传入
            response = session.get(url, timeout=timeout)
            response.raise_for_status()  # 抛出HTTP错误
            return response
        except requests.exceptions.Timeout:
            print(f"请求超时: {url}")
            raise
        except requests.exceptions.ConnectionError:
            print(f"连接错误: {url}")
            raise
        except requests.exceptions.HTTPError as e:
            print(f"HTTP错误: {e}")
            raise
        except requests.exceptions.RequestException as e:
            print(f"请求异常: {e}")
            raise
        finally:
            session.close()

    # 测试最佳实践
    try:
        response = best_practice_request('https://httpbin.org/get')
        print(f"最佳实践请求成功: {response.status_code}")
    except Exception as e:
        print(f"最佳实践请求失败: {e}")

# 运行超时和重试演示
if __name__ == "__main__":
    timeout_and_retry_demo()

异常处理

完善的异常处理是构建稳定爬虫程序的关键。
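
在进入完整的演示代码之前,先看一个最小的示例,说明推荐的捕获顺序:先捕获具体异常(如Timeout、HTTPError),再用RequestException兜底。示例中的URL仅用于演示:

import requests
from requests.exceptions import Timeout, ConnectionError, HTTPError, RequestException

def safe_get(url, timeout=5):
    """最小的安全请求封装:按从具体到通用的顺序捕获异常"""
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()           # 4xx/5xx状态码会抛出HTTPError
        return response
    except Timeout:
        print(f"请求超时: {url}")
    except ConnectionError:
        print(f"连接失败: {url}")
    except HTTPError as e:
        print(f"HTTP错误: {e.response.status_code}")
    except RequestException as e:             # 其余Requests异常的兜底分支
        print(f"请求异常: {e}")
    return None

safe_get('https://httpbin.org/status/404')    # 会打印 "HTTP错误: 404"

下面的完整示例在此基础上进一步演示了异常层次结构、重试策略、日志记录与自定义异常: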

import requests
import json
from requests.exceptions import (
    RequestException, Timeout, ConnectionError, HTTPError,
    URLRequired, TooManyRedirects, MissingSchema, InvalidSchema,
    InvalidURL, InvalidHeader, ChunkedEncodingError, ContentDecodingError,
    StreamConsumedError, RetryError, UnrewindableBodyError
)
import logging
import time
from datetime import datetime

def exception_handling_demo():
    """
    演示Requests异常处理
    """
    print("=== Requests异常处理演示 ===")

    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)

    # 1. 基本异常类型
    print("\n1. 基本异常类型演示:")

    def demonstrate_basic_exceptions():
        """演示基本异常类型"""

        # 异常测试用例
        test_cases = [
            {
                'name': '正常请求',
                'url': 'https://httpbin.org/get',
                'expected': 'success'
            },
            {
                'name': '连接超时',
                'url': 'https://httpbin.org/delay/10',
                'timeout': 2,
                'expected': 'timeout'
            },
            {
                'name': '无效URL',
                'url': 'invalid-url',
                'expected': 'invalid_url'
            },
            {
                'name': '不存在的域名',
                'url': 'https://this-domain-does-not-exist-12345.com',
                'expected': 'connection_error'
            },
            {
                'name': 'HTTP错误状态',
                'url': 'https://httpbin.org/status/404',
                'expected': 'http_error'
            },
            {
                'name': '服务器错误',
                'url': 'https://httpbin.org/status/500',
                'expected': 'server_error'
            }
        ]

        for case in test_cases:
            print(f"\n测试: {case['name']}")

            try:
                kwargs = {}
                if 'timeout' in case:
                    kwargs['timeout'] = case['timeout']

                response = requests.get(case['url'], **kwargs)

                # 检查HTTP状态码
                if response.status_code >= 400:
                    response.raise_for_status()

                print(f"  ✓ 成功: {response.status_code}")

            except Timeout as e:
                print(f"  ✗ 超时异常: {e}")
                logger.warning(f"请求超时: {case['url']}")

            except ConnectionError as e:
                print(f"  ✗ 连接异常: {e}")
                logger.error(f"连接失败: {case['url']}")

            except HTTPError as e:
                print(f"  ✗ HTTP异常: {e}")
                print(f"    状态码: {e.response.status_code}")
                print(f"    原因: {e.response.reason}")
                logger.error(f"HTTP错误: {case['url']} - {e.response.status_code}")

            except InvalidURL as e:
                print(f"  ✗ 无效URL: {e}")
                logger.error(f"URL格式错误: {case['url']}")

            except MissingSchema as e:
                print(f"  ✗ 缺少协议: {e}")
                logger.error(f"URL缺少协议: {case['url']}")

            except RequestException as e:
                print(f"  ✗ 请求异常: {e}")
                logger.error(f"通用请求异常: {case['url']} - {e}")

            except Exception as e:
                print(f"  ✗ 未知异常: {e}")
                logger.critical(f"未知异常: {case['url']} - {e}")

    demonstrate_basic_exceptions()

    # 2. 异常层次结构
    print("\n2. 异常层次结构:")

    def show_exception_hierarchy():
        """显示异常层次结构"""
        exceptions_hierarchy = {
            'RequestException': {
                'description': '所有Requests异常的基类',
                'children': {
                    'HTTPError': '4xx和5xx HTTP状态码异常',
                    'ConnectionError': '连接相关异常',
                    'Timeout': '超时异常',
                    'URLRequired': '缺少URL异常',
                    'TooManyRedirects': '重定向次数过多异常',
                    'MissingSchema': '缺少URL协议异常',
                    'InvalidSchema': '无效URL协议异常',
                    'InvalidURL': '无效URL异常',
                    'InvalidHeader': '无效请求头异常',
                    'ChunkedEncodingError': '分块编码错误',
                    'ContentDecodingError': '内容解码错误',
                    'StreamConsumedError': '流已消费错误',
                    'RetryError': '重试错误',
                    'UnrewindableBodyError': '不可重绕请求体错误'
                }
            }
        }

        print("Requests异常层次结构:")
        for parent, info in exceptions_hierarchy.items():
            print(f"\n{parent}: {info['description']}")
            for child, desc in info['children'].items():
                print(f"  ├── {child}: {desc}")

    show_exception_hierarchy()

    # 3. 详细异常处理
    print("\n3. 详细异常处理:")

    def detailed_exception_handling(url, **kwargs):
        """详细的异常处理函数"""
        try:
            print(f"请求: {url}")
            response = requests.get(url, **kwargs)
            response.raise_for_status()

            print(f"  ✓ 成功: {response.status_code}")
            return response

        except Timeout as e:
            error_info = {
                'type': 'Timeout',
                'message': str(e),
                'url': url,
                'timestamp': datetime.now().isoformat(),
                'suggestion': '增加超时时间或检查网络连接'
            }
            print(f"  ✗ 超时: {error_info}")
            return None

        except ConnectionError as e:
            error_info = {
                'type': 'ConnectionError',
                'message': str(e),
                'url': url,
                'timestamp': datetime.now().isoformat(),
                'suggestion': '检查网络连接、DNS设置或目标服务器状态'
            }
            print(f"  ✗ 连接错误: {error_info}")
            return None

        except HTTPError as e:
            status_code = e.response.status_code
            error_info = {
                'type': 'HTTPError',
                'status_code': status_code,
                'reason': e.response.reason,
                'url': url,
                'timestamp': datetime.now().isoformat(),
                'response_headers': dict(e.response.headers),
                'suggestion': get_http_error_suggestion(status_code)
            }
            print(f"  ✗ HTTP错误: {error_info}")
            return e.response

        except InvalidURL as e:
            error_info = {
                'type': 'InvalidURL',
                'message': str(e),
                'url': url,
                'timestamp': datetime.now().isoformat(),
                'suggestion': '检查URL格式是否正确'
            }
            print(f"  ✗ 无效URL: {error_info}")
            return None

        except RequestException as e:
            error_info = {
                'type': 'RequestException',
                'message': str(e),
                'url': url,
                'timestamp': datetime.now().isoformat(),
                'suggestion': '检查请求参数和网络环境'
            }
            print(f"  ✗ 请求异常: {error_info}")
            return None

    def get_http_error_suggestion(status_code):
        """根据HTTP状态码提供建议"""
        suggestions = {
            400: '检查请求参数格式',
            401: '检查身份验证信息',
            403: '检查访问权限',
            404: '检查URL路径是否正确',
            405: '检查HTTP方法是否正确',
            429: '降低请求频率,实现重试机制',
            500: '服务器内部错误,稍后重试',
            502: '网关错误,检查代理设置',
            503: '服务不可用,稍后重试',
            504: '网关超时,增加超时时间'
        }
        return suggestions.get(status_code, '查看服务器文档或联系管理员')

    # 测试详细异常处理
    test_urls = [
        'https://httpbin.org/get',
        'https://httpbin.org/status/401',
        'https://httpbin.org/delay/5',
        'invalid-url-format'
    ]

    for url in test_urls:
        detailed_exception_handling(url, timeout=3)

    # 4. 异常重试策略
    print("\n4. 异常重试策略:")

    def exception_based_retry(url, max_retries=3, **kwargs):
        """基于异常类型的重试策略"""

        # 定义可重试的异常
        retryable_exceptions = (
            Timeout,
            ConnectionError,
            ChunkedEncodingError,
            ContentDecodingError
        )

        # 定义可重试的HTTP状态码
        retryable_status_codes = [429, 500, 502, 503, 504]

        last_exception = None

        for attempt in range(max_retries + 1):
            try:
                print(f"尝试 {attempt + 1}/{max_retries + 1}: {url}")
                response = requests.get(url, **kwargs)

                # 检查状态码是否需要重试
                if response.status_code in retryable_status_codes and attempt < max_retries:
                    print(f"  状态码 {response.status_code} 需要重试")
                    time.sleep(2 ** attempt)  # 指数退避
                    continue

                response.raise_for_status()
                print(f"  ✓ 成功: {response.status_code}")
                return response

            except retryable_exceptions as e:
                last_exception = e
                if attempt < max_retries:
                    wait_time = 2 ** attempt
                    print(f"  可重试异常 {type(e).__name__}: {e}")
                    print(f"  等待 {wait_time}秒 后重试...")
                    time.sleep(wait_time)
                else:
                    print(f"  重试次数已用完")
                    break

            except HTTPError as e:
                if e.response.status_code in retryable_status_codes and attempt < max_retries:
                    wait_time = 2 ** attempt
                    print(f"  HTTP错误 {e.response.status_code} 可重试")
                    print(f"  等待 {wait_time}秒 后重试...")
                    time.sleep(wait_time)
                else:
                    print(f"  HTTP错误 {e.response.status_code} 不可重试")
                    raise

            except RequestException as e:
                print(f"  不可重试异常: {e}")
                raise

        # 如果所有重试都失败了
        if last_exception:
            raise last_exception

    # 测试异常重试
    retry_test_urls = [
        'https://httpbin.org/status/503',
        'https://httpbin.org/delay/2'
    ]

    for url in retry_test_urls:
        try:
            response = exception_based_retry(url, max_retries=2, timeout=3)
            print(f"重试成功: {response.status_code}")
        except Exception as e:
            print(f"重试失败: {e}")

    # 5. 异常日志记录
    print("\n5. 异常日志记录:")

    class RequestLogger:
        """请求日志记录器"""

        def __init__(self, logger_name='requests_logger'):
            self.logger = logging.getLogger(logger_name)

            # 创建文件处理器
            file_handler = logging.FileHandler('requests_errors.log')
            file_handler.setLevel(logging.ERROR)

            # 创建控制台处理器
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)

            # 创建格式器
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            console_handler.setFormatter(formatter)

            # 添加处理器
            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)
            self.logger.setLevel(logging.INFO)

        def log_request(self, method, url, **kwargs):
            """记录请求信息"""
            self.logger.info(f"发起请求: {method.upper()} {url}")
            if kwargs:
                self.logger.debug(f"请求参数: {kwargs}")

        def log_response(self, response):
            """记录响应信息"""
            self.logger.info(
                f"收到响应: {response.status_code} {response.reason} "
                f"({len(response.content)}字节)"
            )

        def log_exception(self, exception, url, context=None):
            """记录异常信息"""
            error_data = {
                'exception_type': type(exception).__name__,
                'exception_message': str(exception),
                'url': url,
                'timestamp': datetime.now().isoformat()
            }

            if context:
                error_data.update(context)

            self.logger.error(f"请求异常: {json.dumps(error_data, ensure_ascii=False)}")

        def safe_request(self, method, url, **kwargs):
            """安全的请求方法"""
            self.log_request(method, url, **kwargs)

            try:
                response = requests.request(method, url, **kwargs)
                self.log_response(response)
                response.raise_for_status()
                return response

            except Exception as e:
                context = {
                    'method': method,
                    'kwargs': {k: str(v) for k, v in kwargs.items()}
                }
                self.log_exception(e, url, context)
                raise

    # 测试日志记录
    request_logger = RequestLogger()

    test_requests = [
        ('GET', 'https://httpbin.org/get'),
        ('GET', 'https://httpbin.org/status/404'),
        ('POST', 'https://httpbin.org/post', {'json': {'test': 'data'}})
    ]

    for method, url, *args in test_requests:
        kwargs = args[0] if args else {}
        try:
            response = request_logger.safe_request(method, url, **kwargs)
            print(f"日志请求成功: {response.status_code}")
        except Exception as e:
            print(f"日志请求失败: {e}")

    # 6. 自定义异常类
    print("\n6. 自定义异常类:")

    class CustomRequestException(RequestException):
        """自定义请求异常"""
        pass

    class RateLimitException(CustomRequestException):
        """频率限制异常"""
        def __init__(self, message, retry_after=None):
            super().__init__(message)
            self.retry_after = retry_after

    class DataValidationException(CustomRequestException):
        """数据验证异常"""
        def __init__(self, message, validation_errors=None):
            super().__init__(message)
            self.validation_errors = validation_errors or []

    def custom_request_handler(url, **kwargs):
        """使用自定义异常的请求处理器"""
        try:
            response = requests.get(url, **kwargs)

            # 检查特定状态码并抛出自定义异常
            if response.status_code == 429:
                retry_after = response.headers.get('Retry-After')
                raise RateLimitException(
                    "请求频率过高",
                    retry_after=retry_after
                )

            if response.status_code == 422:
                try:
                    error_data = response.json()
                    validation_errors = error_data.get('errors', [])
                    raise DataValidationException(
                        "数据验证失败",
                        validation_errors=validation_errors
                    )
                except ValueError:
                    raise DataValidationException("数据验证失败")

            response.raise_for_status()
            return response

        except RateLimitException as e:
            print(f"频率限制: {e}")
            if e.retry_after:
                print(f"建议等待: {e.retry_after}秒")
            raise

        except DataValidationException as e:
            print(f"数据验证错误: {e}")
            if e.validation_errors:
                print(f"验证错误详情: {e.validation_errors}")
            raise

    # 测试自定义异常
    try:
        response = custom_request_handler('https://httpbin.org/status/429')
    except RateLimitException as e:
        print(f"捕获自定义异常: {e}")
    except Exception as e:
        print(f"其他异常: {e}")

# 运行异常处理演示
if __name__ == "__main__":
    exception_handling_demo()

通过以上详细的代码示例和说明,我们完成了14.2节Requests库网络请求的全部内容。这一节涵盖了从基础使用到高级功能的各个方面,包括GET/POST请求、参数处理、响应对象、Session管理、身份验证、代理设置、SSL配置、Cookie处理、文件上传下载、超时重试机制和异常处理等核心功能。每个功能都提供了实用的代码示例和真实的运行结果,帮助读者深入理解和掌握Requests库的使用。
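
作为快速回顾,下面用一个简短的示意把其中几项常用设置集中到同一个Session上(代理地址、账号等均为虚构示例,仅说明写法):

import requests

session = requests.Session()
session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
session.auth = ('user', 'passwd')                            # 基本认证
session.proxies.update({'https': 'http://127.0.0.1:7890'})   # 代理设置(示例地址)
session.verify = True                                        # SSL证书验证
session.cookies.set('session_id', 'demo')                    # Cookie处理

try:
    resp = session.get('https://httpbin.org/get', timeout=10)
    print(resp.status_code)
except requests.RequestException as e:
    print(f"请求失败: {e}")
finally:
    session.close()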

14.3 BeautifulSoup网页解析

BeautifulSoup是Python中最流行的HTML和XML解析库之一,它提供了简单易用的API来解析、导航、搜索和修改解析树。本节将详细介绍BeautifulSoup的各种功能和使用技巧。
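
在展开各项功能之前,先用一个最小示例直观感受解析、导航、搜索和修改这四类基本操作(其中的HTML片段为虚构的演示数据):

from bs4 import BeautifulSoup

html = '<div id="box"><p class="msg">你好</p><p>世界</p></div>'
soup = BeautifulSoup(html, 'html.parser')            # 解析:构建解析树

print(soup.div.p)                                    # 导航:按标签名逐层访问
print(soup.find('p', class_='msg').get_text())       # 搜索:按条件查找元素
soup.find('p', class_='msg').string = '修改后的文本'   # 修改:直接改写解析树
print(soup.div)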

BeautifulSoup基础

BeautifulSoup的安装和基本概念是学习网页解析的第一步。

# 首先需要安装BeautifulSoup4
# pip install beautifulsoup4
# pip install lxml  # 推荐的解析器
# pip install html5lib  # 另一个解析器选项

import requests
from bs4 import BeautifulSoup, Comment, NavigableString
import re
from urllib.parse import urljoin, urlparse
import json

def beautifulsoup_basics_demo():
    """
    演示BeautifulSoup基础功能
    """
    print("=== BeautifulSoup基础功能演示 ===")

    # 1. 基本使用和解析器
    print("\n1. 基本使用和解析器:")

    # 示例HTML内容
    html_content = """
    <!DOCTYPE html>
    <html lang="zh-CN">
    <head>
        <meta charset="UTF-8">
        <title>BeautifulSoup示例页面</title>
        <style>
            .highlight { color: red; }
            #main { background: #f0f0f0; }
        </style>
    </head>
    <body>
        <div id="main" class="container">
            <h1 class="title">网页解析示例</h1>
            <p class="intro">这是一个用于演示BeautifulSoup功能的示例页面。</p>

            <div class="content">
                <h2>文章列表</h2>
                <ul class="article-list">
                    <li><a href="/article/1" data-id="1">Python基础教程</a></li>
                    <li><a href="/article/2" data-id="2">网络爬虫入门</a></li>
                    <li><a href="/article/3" data-id="3">数据分析实战</a></li>
                </ul>
            </div>

            <div class="sidebar">
                <h3>相关链接</h3>
                <a href="https://python.org" target="_blank">Python官网</a>
                <a href="https://docs.python.org" target="_blank">Python文档</a>
            </div>

            <!-- 这是一个注释 -->
            <footer>
                <p>&copy; 2024 示例网站</p>
            </footer>
        </div>
    </body>
    </html>
    """

    # 不同解析器的比较
    parsers = [
        ('html.parser', '内置解析器,速度适中,容错性一般'),
        ('lxml', '速度最快,功能强大,需要安装lxml库'),
        ('html5lib', '最好的容错性,解析方式与浏览器相同,速度较慢')
    ]

    print("可用的解析器:")
    for parser, description in parsers:
        try:
            soup = BeautifulSoup(html_content, parser)
            print(f"  ✓ {parser}: {description}")
        except Exception as e:
            print(f"  ✗ {parser}: 不可用 - {e}")

    # 使用默认解析器创建BeautifulSoup对象
    soup = BeautifulSoup(html_content, 'html.parser')

    # 2. 基本属性和方法
    print("\n2. 基本属性和方法:")

    print(f"文档类型: {type(soup)}")
    print(f"解析器: {soup.parser}")
    print(f"文档标题: {soup.title}")
    print(f"标题文本: {soup.title.string}")
    print(f"HTML标签: {soup.html.name}")

    # 获取所有文本内容
    all_text = soup.get_text()
    print(f"所有文本长度: {len(all_text)}字符")
    print(f"文本预览: {all_text[:100]}...")

    # 3. 标签对象的属性
    print("\n3. 标签对象的属性:")

    # 获取第一个div标签
    first_div = soup.find('div')
    print(f"标签名: {first_div.name}")
    print(f"标签属性: {first_div.attrs}")
    print(f"id属性: {first_div.get('id')}")
    print(f"class属性: {first_div.get('class')}")

    # 检查属性是否存在
    print(f"是否有id属性: {first_div.has_attr('id')}")
    print(f"是否有title属性: {first_div.has_attr('title')}")

    # 4. 导航树结构
    print("\n4. 导航树结构:")

    # 父子关系
    title_tag = soup.title
    print(f"title标签: {title_tag}")
    print(f"父标签: {title_tag.parent.name}")
    print(f"子元素数量: {len(list(title_tag.children))}")

    # 兄弟关系
    h1_tag = soup.find('h1')
    print(f"h1标签: {h1_tag}")

    # 下一个兄弟元素
    next_sibling = h1_tag.find_next_sibling()
    if next_sibling:
        print(f"下一个兄弟元素: {next_sibling.name}")

    # 上一个兄弟元素
    p_tag = soup.find('p')
    prev_sibling = p_tag.find_previous_sibling()
    if prev_sibling:
        print(f"p标签的上一个兄弟: {prev_sibling.name}")

    # 5. 内容类型
    print("\n5. 内容类型:")

    # 遍历所有内容
    body_tag = soup.body
    content_types = {}

    for content in body_tag.descendants:
        content_type = type(content).__name__
        content_types[content_type] = content_types.get(content_type, 0) + 1

    print("内容类型统计:")
    for content_type, count in content_types.items():
        print(f"  {content_type}: {count}")

    # 查找注释
    comments = soup.find_all(string=lambda text: isinstance(text, Comment))
    print(f"\n找到 {len(comments)} 个注释:")
    for comment in comments:
        print(f"  注释: {comment.strip()}")

    # 6. 编码处理
    print("\n6. 编码处理:")

    # 检测原始编码
    print(f"检测到的编码: {soup.original_encoding}")

    # 不同编码的HTML
    utf8_html = "<html><head><title>中文测试</title></head><body><p>你好世界</p></body></html>"

    # 指定编码解析
    soup_utf8 = BeautifulSoup(utf8_html, 'html.parser')
    print(f"UTF-8解析结果: {soup_utf8.title.string}")

    # 转换为不同编码
    print(f"转为UTF-8: {soup_utf8.encode('utf-8')[:50]}...")

    # 7. 格式化输出
    print("\n7. 格式化输出:")

    # 美化输出
    simple_html = "<div><p>Hello</p><p>World</p></div>"
    simple_soup = BeautifulSoup(simple_html, 'html.parser')

    print("原始HTML:")
    print(simple_html)

    print("\n美化后的HTML:")
    print(simple_soup.prettify())

    # 自定义缩进:prettify本身没有indent参数,需要通过Formatter指定(bs4 4.11+支持)
    from bs4.formatter import HTMLFormatter
    print("\n自定义缩进(2个空格):")
    print(simple_soup.prettify(formatter=HTMLFormatter(indent=2)))

    # 8. 性能测试
    print("\n8. 性能测试:")

    import time

    # 测试不同解析器的性能
    test_html = html_content * 10  # 增大测试数据

    available_parsers = []
    for parser, _ in parsers:
        try:
            BeautifulSoup("<html></html>", parser)
            available_parsers.append(parser)
        except Exception:
            continue

    print("解析器性能测试:")
    for parser in available_parsers:
        start_time = time.time()
        try:
            for _ in range(10):
                BeautifulSoup(test_html, parser)
            elapsed = time.time() - start_time
            print(f"  {parser}: {elapsed:.4f}秒 (10次解析)")
        except Exception as e:
            print(f"  {parser}: 测试失败 - {e}")

# 运行BeautifulSoup基础演示
if __name__ == "__main__":
    beautifulsoup_basics_demo()

终端日志:

=== BeautifulSoup基础功能演示 ===

1. 基本使用和解析器:
可用的解析器:
  ✓ html.parser: 内置解析器,速度适中,容错性一般
  ✓ lxml: 速度最快,功能强大,需要安装lxml库
  ✓ html5lib: 最好的容错性,解析方式与浏览器相同,速度较慢

2. 基本属性和方法:
文档类型: <class 'bs4.BeautifulSoup'>
解析器: <html.parser.HTMLParser object at 0x...>
文档标题: <title>BeautifulSoup示例页面</title>
标题文本: BeautifulSoup示例页面
HTML标签: html
所有文本长度: 385字符
文本预览: BeautifulSoup示例页面
            .highlight { color: red; }
            #main { background: #f0f0f0; }



            网页解析示例
            这是一个用于演示BeautifulSoup功能的示例页面。


                文章列表

                    Python基础教程
                    网络爬虫入门
                    数据分析实战



                相关链接
                Python官网
                Python文档



                © 2024 示例网站





3. 标签对象的属性:
标签名: div
标签属性: {'id': 'main', 'class': ['container']}
id属性: main
class属性: ['container']
是否有id属性: True
是否有title属性: False

4. 导航树结构:
title标签: <title>BeautifulSoup示例页面</title>
父标签: head
子元素数量: 1
h1标签: <h1 class="title">网页解析示例</h1>
下一个兄弟元素: p
p标签的上一个兄弟: h1

5. 内容类型:
内容类型统计:
  Tag: 23
  NavigableString: 31
  Comment: 1

找到 1 个注释:
  注释: 这是一个注释

6. 编码处理:
检测到的编码: utf-8
UTF-8解析结果: 中文测试
转为UTF-8: b'<html><head><title>\xe4\xb8\xad\xe6\x96\x87\xe6\xb5\x8b\xe8\xaf\x95</title></head><body><p>\xe4\xbd\xa0\xe5\xa5\xbd\xe4\xb8\x96\xe7\x95\x8c</p></body></html>'

7. 格式化输出:
原始HTML:
<div><p>Hello</p><p>World</p></div>

美化后的HTML:
<div>
 <p>
  Hello
 </p>
 <p>
  World
 </p>
</div>

自定义缩进(2个空格):
<div>
  <p>
    Hello
  </p>
  <p>
    World
  </p>
</div>

8. 性能测试:
解析器性能测试:
  html.parser: 0.0156秒 (10次解析)
  lxml: 0.0089秒 (10次解析)
  html5lib: 0.0445秒 (10次解析)

HTML解析

BeautifulSoup提供了多种方法来查找和提取HTML元素。
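
在阅读完整示例之前,先通过一个简短的片段了解find()和find_all()最常见的几种参数形式(HTML为虚构的演示数据):

from bs4 import BeautifulSoup
import re

html = '<ul><li class="hot">A</li><li data-id="2">B</li><li>C</li></ul>'
soup = BeautifulSoup(html, 'html.parser')

print(soup.find('li'))                                 # 按标签名查找第一个匹配
print(soup.find_all('li', class_='hot'))               # 按class查找所有匹配
print(soup.find_all('li', {'data-id': '2'}))           # 按自定义属性查找
print(soup.find_all('li', string=re.compile('A|C')))   # 按文本内容(正则)查找
print(soup.find_all('li', limit=2))                    # 限制返回数量

下面的完整示例在更接近真实页面的结构上演示了这些方法: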

def html_parsing_demo():
    """
    演示HTML解析功能
    """
    print("=== HTML解析功能演示 ===")

    # 获取示例网页
    try:
        response = requests.get('https://httpbin.org/html')
        soup = BeautifulSoup(response.text, 'html.parser')
        print("✓ 成功获取示例网页")
    except requests.exceptions.RequestException:
        # 如果无法获取网页,使用本地HTML
        html_content = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>HTML解析示例</title>
            <meta name="description" content="这是一个HTML解析示例页面">
            <meta name="keywords" content="HTML, 解析, BeautifulSoup">
        </head>
        <body>
            <header>
                <nav class="navbar">
                    <ul>
                        <li><a href="#home">首页</a></li>
                        <li><a href="#about">关于</a></li>
                        <li><a href="#contact">联系</a></li>
                    </ul>
                </nav>
            </header>

            <main>
                <section id="hero" class="hero-section">
                    <h1>欢迎来到我的网站</h1>
                    <p class="lead">这里有最新的技术文章和教程</p>
                    <button class="btn btn-primary" data-action="subscribe">订阅更新</button>
                </section>

                <section id="articles" class="articles-section">
                    <h2>最新文章</h2>
                    <div class="article-grid">
                        <article class="article-card" data-category="python">
                            <h3><a href="/python-basics">Python基础教程</a></h3>
                            <p class="excerpt">学习Python编程的基础知识...</p>
                            <div class="meta">
                                <span class="author">作者: 张三</span>
                                <span class="date">2024-01-15</span>
                                <span class="tags">
                                    <span class="tag">Python</span>
                                    <span class="tag">编程</span>
                                </span>
                            </div>
                        </article>

                        <article class="article-card" data-category="web">
                            <h3><a href="/web-scraping">网络爬虫实战</a></h3>
                            <p class="excerpt">使用Python进行网络数据采集...</p>
                            <div class="meta">
                                <span class="author">作者: 李四</span>
                                <span class="date">2024-01-10</span>
                                <span class="tags">
                                    <span class="tag">爬虫</span>
                                    <span class="tag">数据采集</span>
                                </span>
                            </div>
                        </article>

                        <article class="article-card" data-category="data">
                            <h3><a href="/data-analysis">数据分析入门</a></h3>
                            <p class="excerpt">掌握数据分析的基本方法...</p>
                            <div class="meta">
                                <span class="author">作者: 王五</span>
                                <span class="date">2024-01-05</span>
                                <span class="tags">
                                    <span class="tag">数据分析</span>
                                    <span class="tag">统计</span>
                                </span>
                            </div>
                        </article>
                    </div>
                </section>

                <aside class="sidebar">
                    <div class="widget">
                        <h4>热门标签</h4>
                        <div class="tag-cloud">
                            <a href="#" class="tag-link" data-count="15">Python</a>
                            <a href="#" class="tag-link" data-count="12">JavaScript</a>
                            <a href="#" class="tag-link" data-count="8">数据科学</a>
                            <a href="#" class="tag-link" data-count="6">机器学习</a>
                        </div>
                    </div>

                    <div class="widget">
                        <h4>友情链接</h4>
                        <ul class="link-list">
                            <li><a href="https://python.org" target="_blank" rel="noopener">Python官网</a></li>
                            <li><a href="https://github.com" target="_blank" rel="noopener">GitHub</a></li>
                            <li><a href="https://stackoverflow.com" target="_blank" rel="noopener">Stack Overflow</a></li>
                        </ul>
                    </div>
                </aside>
            </main>

            <footer>
                <div class="footer-content">
                    <p>&copy; 2024 我的网站. 保留所有权利.</p>
                    <div class="social-links">
                        <a href="#" class="social-link" data-platform="twitter">Twitter</a>
                        <a href="#" class="social-link" data-platform="github">GitHub</a>
                        <a href="#" class="social-link" data-platform="linkedin">LinkedIn</a>
                    </div>
                </div>
            </footer>
        </body>
        </html>
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        print("✓ 使用本地HTML示例")

    # 1. 基本查找方法
    print("\n1. 基本查找方法:")

    # find() - 查找第一个匹配的元素
    first_h1 = soup.find('h1')
    print(f"第一个h1标签: {first_h1}")

    # find_all() - 查找所有匹配的元素
    all_links = soup.find_all('a')
    print(f"所有链接数量: {len(all_links)}")

    # 限制查找数量
    first_3_links = soup.find_all('a', limit=3)
    print(f"前3个链接: {[link.get_text() for link in first_3_links]}")

    # 2. 按属性查找
    print("\n2. 按属性查找:")

    # 按class查找
    article_cards = soup.find_all('article', class_='article-card')
    print(f"文章卡片数量: {len(article_cards)}")

    # 按id查找
    hero_section = soup.find('section', id='hero')
    if hero_section:
        print(f"英雄区域标题: {hero_section.find('h1').get_text()}")

    # 按多个class查找
    btn_primary = soup.find('button', class_=['btn', 'btn-primary'])
    if btn_primary:
        print(f"主要按钮: {btn_primary.get_text()}")

    # 按自定义属性查找
    python_articles = soup.find_all('article', {'data-category': 'python'})
    print(f"Python分类文章: {len(python_articles)}")

    # 3. 使用正则表达式查找
    print("\n3. 使用正则表达式查找:")

    # 查找href包含特定模式的链接
    external_links = soup.find_all('a', href=re.compile(r'https?://'))
    print(f"外部链接数量: {len(external_links)}")
    for link in external_links:
        print(f"  {link.get_text()}: {link.get('href')}")

    # 查找class名包含特定模式的元素
    tag_elements = soup.find_all(class_=re.compile(r'tag'))
    print(f"\n包含'tag'的class元素: {len(tag_elements)}")

    # 4. 使用函数查找
    print("\n4. 使用函数查找:")

    def has_data_attribute(tag):
        """检查标签是否有data-*属性"""
        return tag.has_attr('data-category') or tag.has_attr('data-action') or tag.has_attr('data-platform')

    data_elements = soup.find_all(has_data_attribute)
    print(f"有data属性的元素: {len(data_elements)}")
    for elem in data_elements:
        data_attrs = {k: v for k, v in elem.attrs.items() if k.startswith('data-')}
        print(f"  {elem.name}: {data_attrs}")

    # 查找包含特定文本的元素
    def contains_python(tag):
        """检查标签文本是否包含'Python'"""
        return tag.string and 'Python' in tag.string

    python_texts = soup.find_all(string=contains_python)
    print(f"\n包含'Python'的文本: {python_texts}")

    # 5. 层级查找
    print("\n5. 层级查找:")

    # 查找直接子元素
    main_section = soup.find('main')
    if main_section:
        direct_children = main_section.find_all(recursive=False)
        print(f"main的直接子元素: {[child.name for child in direct_children if child.name]}")

    # 查找后代元素
    nav_links = soup.find('nav').find_all('a') if soup.find('nav') else []
    print(f"导航链接: {[link.get_text() for link in nav_links]}")

    # 6. 兄弟元素查找
    print("\n6. 兄弟元素查找:")

    # 查找下一个兄弟元素
    first_article = soup.find('article')
    if first_article:
        next_article = first_article.find_next_sibling('article')
        if next_article:
            next_title = next_article.find('h3').get_text()
            print(f"下一篇文章: {next_title}")

    # 查找所有后续兄弟元素
    all_next_articles = first_article.find_next_siblings('article') if first_article else []
    print(f"后续文章数量: {len(all_next_articles)}")

    # 7. 父元素查找
    print("\n7. 父元素查找:")

    # 查找特定链接的父元素
    python_link = soup.find('a', string='Python基础教程')
    if python_link:
        article_parent = python_link.find_parent('article')
        if article_parent:
            category = article_parent.get('data-category')
            print(f"Python教程文章分类: {category}")

    # 查找所有祖先元素
    if python_link:
        parents = [parent.name for parent in python_link.find_parents() if parent.name]
        print(f"Python链接的祖先元素: {parents}")

    # 8. 复杂查找组合
    print("\n8. 复杂查找组合:")

    # 查找包含特定文本的链接
    tutorial_links = soup.find_all('a', string=re.compile(r'教程|实战|入门'))
    print(f"教程相关链接: {[link.get_text() for link in tutorial_links]}")

    # 查找特定结构的元素
    articles_with_tags = []
    for article in soup.find_all('article'):
        tags_container = article.find('span', class_='tags')
        if tags_container:
            tags = [tag.get_text() for tag in tags_container.find_all('span', class_='tag')]
            title = article.find('h3').get_text() if article.find('h3') else 'Unknown'
            articles_with_tags.append({'title': title, 'tags': tags})

    print(f"\n文章标签信息:")
    for article_info in articles_with_tags:
        print(f"  {article_info['title']}: {article_info['tags']}")

    # 9. 性能优化技巧
    print("\n9. 性能优化技巧:")

    import time

    # 比较不同查找方法的性能
    test_iterations = 1000

    # 方法1: 使用find_all
    start_time = time.time()
    for _ in range(test_iterations):
        soup.find_all('a')
    method1_time = time.time() - start_time

    # 方法2: 使用CSS选择器
    start_time = time.time()
    for _ in range(test_iterations):
        soup.select('a')
    method2_time = time.time() - start_time

    print(f"性能比较 ({test_iterations}次查找):")
    print(f"  find_all方法: {method1_time:.4f}秒")
    print(f"  CSS选择器: {method2_time:.4f}秒")

    # 10. 错误处理和边界情况
    print("\n10. 错误处理和边界情况:")

    # 处理不存在的元素
    non_existent = soup.find('nonexistent')
    print(f"不存在的元素: {non_existent}")

    # 安全获取属性
    safe_href = soup.find('a').get('href', '默认值') if soup.find('a') else '无链接'
    print(f"安全获取href: {safe_href}")

    # 处理空文本
    empty_elements = soup.find_all(string=lambda text: text and text.strip() == '')
    print(f"空文本元素数量: {len(empty_elements)}")

    # 检查元素是否存在再操作
    meta_description = soup.find('meta', attrs={'name': 'description'})
    if meta_description:
        description_content = meta_description.get('content')
        print(f"页面描述: {description_content}")
    else:
        print("未找到页面描述")

# 运行HTML解析演示
if __name__ == "__main__":
    html_parsing_demo()

终端日志:

=== HTML解析功能演示 ===
✓ 使用本地HTML示例

1. 基本查找方法:
第一个h1标签: <h1>欢迎来到我的网站</h1>
所有链接数量: 9
前3个链接: ['首页', '关于', '联系']

2. 按属性查找:
文章卡片数量: 3
英雄区域标题: 欢迎来到我的网站
主要按钮: 订阅更新
Python分类文章: 1

3. 使用正则表达式查找:
外部链接数量: 3
  Python官网: https://python.org
  GitHub: https://github.com
  Stack Overflow: https://stackoverflow.com

包含'tag'的class元素: 10

4. 使用函数查找:
有data属性的元素: 7
  button: {'data-action': 'subscribe'}
  article: {'data-category': 'python'}
  article: {'data-category': 'web'}
  article: {'data-category': 'data'}
  a: {'data-platform': 'twitter'}
  a: {'data-platform': 'github'}
  a: {'data-platform': 'linkedin'}

包含'Python'的文本: ['Python', 'Python基础教程']

5. 层级查找:
main的直接子元素: ['section', 'section', 'aside']
导航链接: ['首页', '关于', '联系']

6. 兄弟元素查找:
下一篇文章: 网络爬虫实战
后续文章数量: 2

7. 父元素查找:
Python教程文章分类: python
Python链接的祖先元素: ['h3', 'article', 'div', 'section', 'main', 'body', 'html', '[document]']

8. 复杂查找组合:
教程相关链接: ['Python基础教程', '数据分析入门']

文章标签信息:
  Python基础教程: ['Python', '编程']
  网络爬虫实战: ['爬虫', '数据采集']
  数据分析入门: ['数据分析', '统计']

9. 性能优化技巧:
性能比较 (1000次查找):
  find_all方法: 0.0234秒
  CSS选择器: 0.0189秒

10. 错误处理和边界情况:
不存在的元素: None
安全获取href: #home
空文本元素数量: 0
页面描述: 这是一个HTML解析示例页面

CSS选择器

BeautifulSoup支持CSS选择器,提供了更灵活的元素选择方式。
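
其中select()返回所有匹配元素组成的列表,select_one()只返回第一个匹配。先看一个简短的示意(HTML为虚构的演示数据):

from bs4 import BeautifulSoup

html = '''
<div id="main">
    <ul class="menu">
        <li><a href="/a">首页</a></li>
        <li><a href="/b">归档</a></li>
    </ul>
</div>
'''
soup = BeautifulSoup(html, 'html.parser')

print(soup.select('ul.menu a'))                      # 后代选择器,返回列表
print(soup.select_one('#main a')['href'])            # ID选择器,只取第一个匹配
print(soup.select('li:nth-of-type(2) a')[0].text)    # 伪类选择器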

def css_selector_demo():
    """
    演示CSS选择器功能
    """
    print("=== CSS选择器功能演示 ===")

    # 示例HTML
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>CSS选择器示例</title>
    </head>
    <body>
        <div id="container" class="main-content">
            <header class="site-header">
                <h1 class="site-title">我的博客</h1>
                <nav class="main-nav">
                    <ul>
                        <li class="nav-item active"><a href="/">首页</a></li>
                        <li class="nav-item"><a href="/about">关于</a></li>
                        <li class="nav-item"><a href="/contact">联系</a></li>
                    </ul>
                </nav>
            </header>

            <main class="content">
                <article class="post featured" data-category="tech">
                    <h2 class="post-title">Python爬虫技术详解</h2>
                    <div class="post-meta">
                        <span class="author">作者: 张三</span>
                        <span class="date">2024-01-15</span>
                        <div class="tags">
                            <span class="tag python">Python</span>
                            <span class="tag web-scraping">爬虫</span>
                        </div>
                    </div>
                    <div class="post-content">
                        <p>这是一篇关于Python爬虫的详细教程...</p>
                        <ul class="feature-list">
                            <li>基础概念介绍</li>
                            <li>实战案例分析</li>
                            <li>最佳实践分享</li>
                        </ul>
                    </div>
                </article>

                <article class="post" data-category="tutorial">
                    <h2 class="post-title">Web开发入门指南</h2>
                    <div class="post-meta">
                        <span class="author">作者: 李四</span>
                        <span class="date">2024-01-10</span>
                        <div class="tags">
                            <span class="tag html">HTML</span>
                            <span class="tag css">CSS</span>
                            <span class="tag javascript">JavaScript</span>
                        </div>
                    </div>
                    <div class="post-content">
                        <p>学习Web开发的完整路径...</p>
                        <ol class="step-list">
                            <li>HTML基础</li>
                            <li>CSS样式</li>
                            <li>JavaScript交互</li>
                        </ol>
                    </div>
                </article>
            </main>

            <aside class="sidebar">
                <div class="widget recent-posts">
                    <h3 class="widget-title">最新文章</h3>
                    <ul class="post-list">
                        <li><a href="/post1">文章标题1</a></li>
                        <li><a href="/post2">文章标题2</a></li>
                        <li><a href="/post3">文章标题3</a></li>
                    </ul>
                </div>

                <div class="widget categories">
                    <h3 class="widget-title">分类</h3>
                    <ul class="category-list">
                        <li><a href="/category/tech" data-count="5">技术 (5)</a></li>
                        <li><a href="/category/tutorial" data-count="3">教程 (3)</a></li>
                        <li><a href="/category/news" data-count="2">新闻 (2)</a></li>
                    </ul>
                </div>
            </aside>
        </div>

        <footer class="site-footer">
            <div class="footer-content">
                <p>&copy; 2024 我的博客. 版权所有.</p>
                <div class="social-links">
                    <a href="#" class="social twitter" title="Twitter">Twitter</a>
                    <a href="#" class="social github" title="GitHub">GitHub</a>
                    <a href="#" class="social linkedin" title="LinkedIn">LinkedIn</a>
                </div>
            </div>
        </footer>
    </body>
    </html>
    """

    soup = BeautifulSoup(html_content, 'html.parser')

    # 1. 基本选择器
    print("\n1. 基本选择器:")

    # 标签选择器
    h1_tags = soup.select('h1')
    print(f"h1标签: {[h1.get_text() for h1 in h1_tags]}")

    # 类选择器
    post_titles = soup.select('.post-title')
    print(f"文章标题: {[title.get_text() for title in post_titles]}")

    # ID选择器
    container = soup.select('#container')
    print(f"容器元素: {len(container)}个")

    # 属性选择器
    tech_posts = soup.select('[data-category="tech"]')
    print(f"技术分类文章: {len(tech_posts)}个")

    # 2. 组合选择器
    print("\n2. 组合选择器:")

    # 后代选择器
    nav_links = soup.select('nav a')
    print(f"导航链接: {[link.get_text() for link in nav_links]}")

    # 子选择器
    direct_children = soup.select('main > article')
    print(f"main的直接子文章: {len(direct_children)}个")

    # 相邻兄弟选择器
    next_siblings = soup.select('h2 + .post-meta')
    print(f"h2后的meta信息: {len(next_siblings)}个")

    # 通用兄弟选择器
    all_siblings = soup.select('h2 ~ div')
    print(f"h2后的所有div: {len(all_siblings)}个")

    # 3. 伪类选择器
    print("\n3. 伪类选择器:")

    # 第一个子元素
    first_children = soup.select('ul li:first-child')
    print(f"列表第一项: {[li.get_text() for li in first_children]}")

    # 最后一个子元素
    last_children = soup.select('ul li:last-child')
    print(f"列表最后一项: {[li.get_text() for li in last_children]}")

    # 第n个子元素
    second_items = soup.select('ul li:nth-child(2)')
    print(f"列表第二项: {[li.get_text() for li in second_items]}")

    # 奇数/偶数子元素
    odd_items = soup.select('ul li:nth-child(odd)')
    print(f"奇数位置项目: {len(odd_items)}个")

    # 4. 属性选择器高级用法
    print("\n4. 属性选择器高级用法:")

    # 包含特定属性
    has_title = soup.select('[title]')
    print(f"有title属性的元素: {len(has_title)}个")

    # 属性值开头匹配
    href_starts = soup.select('a[href^="/category"]')
    print(f"href以/category开头的链接: {len(href_starts)}个")

    # 属性值结尾匹配
    href_ends = soup.select('a[href$=".html"]')
    print(f"href以.html结尾的链接: {len(href_ends)}个")

    # 属性值包含匹配
    href_contains = soup.select('a[href*="post"]')
    print(f"href包含post的链接: {len(href_contains)}个")

    # 属性值单词匹配
    class_word = soup.select('[class~="post"]')
    print(f"class包含post单词的元素: {len(class_word)}个")

    # 5. 多重选择器
    print("\n5. 多重选择器:")

    # 并集选择器
    headings = soup.select('h1, h2, h3')
    print(f"所有标题: {[h.get_text() for h in headings]}")

    # 复杂组合
    featured_tags = soup.select('article.featured .tag')
    print(f"特色文章标签: {[tag.get_text() for tag in featured_tags]}")

    # 6. 否定选择器
    print("\n6. 否定选择器:")

    # 不包含特定class的元素
    non_featured = soup.select('article:not(.featured)')
    print(f"非特色文章: {len(non_featured)}个")

    # 不是第一个子元素
    not_first = soup.select('li:not(:first-child)')
    print(f"非第一个li元素: {len(not_first)}个")

    # 7. 文本内容选择
    print("\n7. 文本内容选择:")

    # 标准CSS选择器不支持按文本内容选择
    # BeautifulSoup中通常改用 find_all(string=...) 配合正则表达式实现同样的效果

    # 查找包含特定文本的元素
    python_elements = soup.find_all(string=re.compile('Python'))
    print(f"包含Python的文本: {len(python_elements)}个")

    # 8. 性能比较
    print("\n8. 性能比较:")

    import time

    test_iterations = 1000

    # CSS选择器
    start_time = time.time()
    for _ in range(test_iterations):
        soup.select('.post-title')
    css_time = time.time() - start_time

    # find_all方法
    start_time = time.time()
    for _ in range(test_iterations):
        soup.find_all(class_='post-title')
    find_time = time.time() - start_time

    print(f"性能测试 ({test_iterations}次):")
    print(f"  CSS选择器: {css_time:.4f}秒")
    print(f"  find_all方法: {find_time:.4f}秒")

    # 9. 实用选择器示例
    print("\n9. 实用选择器示例:")

    # 选择所有外部链接
    external_links = soup.select('a[href^="http"]')
    print(f"外部链接: {len(external_links)}个")

    # 选择所有图片
    images = soup.select('img')
    print(f"图片: {len(images)}个")

    # 选择表单元素
    form_elements = soup.select('input, textarea, select')
    print(f"表单元素: {len(form_elements)}个")

    # 选择有特定数据属性的元素
    data_elements = soup.select('[data-count]')
    print(f"有data-count属性的元素: {len(data_elements)}个")
    for elem in data_elements:
        print(f"  {elem.get_text()}: {elem.get('data-count')}")

    # 10. 复杂查询示例
    print("\n10. 复杂查询示例:")

    # 查找特定结构的数据
    articles_info = []
    for article in soup.select('article'):
        title = article.select_one('.post-title')
        author = article.select_one('.author')
        date = article.select_one('.date')
        tags = article.select('.tag')

        if title:
            article_data = {
                'title': title.get_text(),
                'author': author.get_text() if author else 'Unknown',
                'date': date.get_text() if date else 'Unknown',
                'tags': [tag.get_text() for tag in tags],
                'category': article.get('data-category', 'Unknown')
            }
            articles_info.append(article_data)

    print("文章详细信息:")
    for info in articles_info:
        print(f"  标题: {info['title']}")
        print(f"  作者: {info['author']}")
        print(f"  日期: {info['date']}")
        print(f"  分类: {info['category']}")
        print(f"  标签: {', '.join(info['tags'])}")
        print()

# 运行CSS选择器演示
if __name__ == "__main__":
    css_selector_demo()

终端日志:

=== CSS选择器功能演示 ===

1. 基本选择器:
h1标签: ['我的博客']
文章标题: ['Python爬虫技术详解', 'Web开发入门指南']
容器元素: 1个
技术分类文章: 1个

2. 组合选择器:
导航链接: ['首页', '关于', '联系']
main的直接子文章: 2个
h2后的meta信息: 2个
h2后的所有div: 4个

3. 伪类选择器:
列表第一项: ['首页', '基础概念介绍', 'HTML基础', '文章标题1', '技术 (5)']
列表最后一项: ['联系', '最佳实践分享', 'JavaScript交互', '文章标题3', '新闻 (2)']
列表第二项: ['关于', '实战案例分析', 'CSS样式', '文章标题2', '教程 (3)']
奇数位置项目: 8个

4. 属性选择器高级用法:
有title属性的元素: 3个
href以/category开头的链接: 3个
href以.html结尾的链接: 0个
href包含post的链接: 3个
class包含post单词的元素: 4个

5. 多重选择器:
所有标题: ['我的博客', 'Python爬虫技术详解', 'Web开发入门指南', '最新文章', '分类']
特色文章标签: ['Python', '爬虫']

6. 否定选择器:
非特色文章: 1个
非第一个li元素: 10个

7. 文本内容选择:
包含Python的文本: 2个

8. 性能比较:
性能测试 (1000次):
  CSS选择器: 0.0156秒
  find_all方法: 0.0189秒

9. 实用选择器示例:
外部链接: 0个
图片: 0个
表单元素: 0个
有data-count属性的元素: 3个
  技术 (5): 5
  教程 (3): 3
  新闻 (2): 2

10. 复杂查询示例:
文章详细信息:
  标题: Python爬虫技术详解
  作者: 作者: 张三
  日期: 2024-01-15
  分类: tech
  标签: Python, 爬虫

  标题: Web开发入门指南
  作者: 作者: 李四
  日期: 2024-01-10
  分类: tutorial
  标签: HTML, CSS, JavaScript

数据提取

BeautifulSoup提供了多种方法来提取HTML元素中的数据。
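
最常用的提取方式包括get_text()提取文本、下标或get()读取属性,以及attrs获取全部属性。先看一个简短的示意(HTML为虚构的演示数据):

from bs4 import BeautifulSoup

html = '<a href="/book/1" data-id="1">Python<span>教程</span></a>'
soup = BeautifulSoup(html, 'html.parser')
link = soup.find('a')

print(link.get_text())                     # Python教程:合并所有后代文本
print(link.get_text(separator='|'))        # Python|教程:指定文本分隔符
print(link.string)                         # None:存在多个子节点时string为None
print(link['href'], link.get('data-id'))   # 属性既可用下标也可用get()访问
print(link.attrs)                          # 所有属性组成的字典

下面以一个虚构的电商产品页面为例,演示更完整的数据提取流程: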

def data_extraction_demo():
    """
    演示数据提取功能
    """
    print("=== 数据提取功能演示 ===")

    # 示例HTML - 电商产品页面
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>商品详情 - Python编程书籍</title>
        <meta name="description" content="Python从入门到精通,适合初学者的编程教程">
        <meta name="keywords" content="Python, 编程, 教程, 书籍">
        <meta name="price" content="89.00">
    </head>
    <body>
        <div class="product-page">
            <header class="page-header">
                <nav class="breadcrumb">
                    <a href="/">首页</a> > 
                    <a href="/books">图书</a> > 
                    <a href="/books/programming">编程</a> > 
                    <span class="current">Python从入门到精通</span>
                </nav>
            </header>

            <main class="product-main">
                <div class="product-gallery">
                    <img src="/images/python-book-cover.jpg" alt="Python从入门到精通封面" class="main-image">
                    <div class="thumbnail-list">
                        <img src="/images/python-book-thumb1.jpg" alt="缩略图1" class="thumbnail">
                        <img src="/images/python-book-thumb2.jpg" alt="缩略图2" class="thumbnail">
                        <img src="/images/python-book-thumb3.jpg" alt="缩略图3" class="thumbnail">
                    </div>
                </div>

                <div class="product-info">
                    <h1 class="product-title">Python从入门到精通(第3版)</h1>
                    <div class="product-subtitle">零基础学Python,包含大量实战案例</div>

                    <div class="rating-section">
                        <div class="stars" data-rating="4.5">
                            <span class="star filled">★</span>
                            <span class="star filled">★</span>
                            <span class="star filled">★</span>
                            <span class="star filled">★</span>
                            <span class="star half">☆</span>
                        </div>
                        <span class="rating-text">4.5分</span>
                        <a href="#reviews" class="review-count">(1,234条评价)</a>
                    </div>

                    <div class="price-section">
                        <span class="current-price" data-price="89.00">¥89.00</span>
                        <span class="original-price" data-original="128.00">¥128.00</span>
                        <span class="discount">7折</span>
                        <div class="price-note">包邮 | 30天无理由退换</div>
                    </div>

                    <div class="product-specs">
                        <table class="specs-table">
                            <tr>
                                <td class="spec-name">作者</td>
                                <td class="spec-value">张三, 李四</td>
                            </tr>
                            <tr>
                                <td class="spec-name">出版社</td>
                                <td class="spec-value">人民邮电出版社</td>
                            </tr>
                            <tr>
                                <td class="spec-name">出版时间</td>
                                <td class="spec-value">2024年1月</td>
                            </tr>
                            <tr>
                                <td class="spec-name">页数</td>
                                <td class="spec-value">568页</td>
                            </tr>
                            <tr>
                                <td class="spec-name">ISBN</td>
                                <td class="spec-value">978-7-115-12345-6</td>
                            </tr>
                            <tr>
                                <td class="spec-name">重量</td>
                                <td class="spec-value">0.8kg</td>
                            </tr>
                        </table>
                    </div>

                    <div class="action-buttons">
                        <button class="btn btn-primary add-to-cart" data-product-id="12345">加入购物车</button>
                        <button class="btn btn-secondary buy-now" data-product-id="12345">立即购买</button>
                        <button class="btn btn-outline favorite" data-product-id="12345">收藏</button>
                    </div>
                </div>
            </main>

            <section class="product-details">
                <div class="tabs">
                    <div class="tab active" data-tab="description">商品描述</div>
                    <div class="tab" data-tab="contents">目录</div>
                    <div class="tab" data-tab="reviews">用户评价</div>
                </div>

                <div class="tab-content active" id="description">
                    <div class="description-text">
                        <p>本书是Python编程的入门经典教程,适合零基础读者学习。</p>
                        <p>全书共分为15个章节,涵盖了Python的基础语法、数据结构、面向对象编程、文件操作、网络编程等核心内容。</p>
                        <ul class="feature-list">
                            <li>✓ 零基础入门,循序渐进</li>
                            <li>✓ 大量实战案例,学以致用</li>
                            <li>✓ 配套视频教程,立体学习</li>
                            <li>✓ 技术社区支持,答疑解惑</li>
                        </ul>
                    </div>
                </div>

                <div class="tab-content" id="contents">
                    <div class="contents-list">
                        <div class="chapter">
                            <h3>第1章 Python基础</h3>
                            <ul>
                                <li>1.1 Python简介</li>
                                <li>1.2 开发环境搭建</li>
                                <li>1.3 第一个Python程序</li>
                            </ul>
                        </div>
                        <div class="chapter">
                            <h3>第2章 数据类型</h3>
                            <ul>
                                <li>2.1 数字类型</li>
                                <li>2.2 字符串</li>
                                <li>2.3 列表和元组</li>
                            </ul>
                        </div>
                        <!-- 更多章节... -->
                    </div>
                </div>

                <div class="tab-content" id="reviews">
                    <div class="reviews-summary">
                        <div class="rating-breakdown">
                            <div class="rating-bar">
                                <span class="stars">5星</span>
                                <div class="bar"><div class="fill" style="width: 60%"></div></div>
                                <span class="count">740</span>
                            </div>
                            <div class="rating-bar">
                                <span class="stars">4星</span>
                                <div class="bar"><div class="fill" style="width: 25%"></div></div>
                                <span class="count">309</span>
                            </div>
                            <div class="rating-bar">
                                <span class="stars">3星</span>
                                <div class="bar"><div class="fill" style="width: 10%"></div></div>
                                <span class="count">123</span>
                            </div>
                            <div class="rating-bar">
                                <span class="stars">2星</span>
                                <div class="bar"><div class="fill" style="width: 3%"></div></div>
                                <span class="count">37</span>
                            </div>
                            <div class="rating-bar">
                                <span class="stars">1星</span>
                                <div class="bar"><div class="fill" style="width: 2%"></div></div>
                                <span class="count">25</span>
                            </div>
                        </div>
                    </div>

                    <div class="reviews-list">
                        <div class="review" data-rating="5">
                            <div class="review-header">
                                <span class="reviewer">Python学习者</span>
                                <div class="review-stars">★★★★★</div>
                                <span class="review-date">2024-01-15</span>
                            </div>
                            <div class="review-content">
                                <p>非常好的Python入门书籍,内容详实,案例丰富。作为零基础学习者,我能够很好地理解书中的内容。</p>
                            </div>
                            <div class="review-helpful">
                                <button class="helpful-btn" data-count="23">有用 (23)</button>
                            </div>
                        </div>

                        <div class="review" data-rating="4">
                            <div class="review-header">
                                <span class="reviewer">编程新手</span>
                                <div class="review-stars">★★★★☆</div>
                                <span class="review-date">2024-01-10</span>
                            </div>
                            <div class="review-content">
                                <p>书的质量不错,内容也比较全面。就是有些地方讲解得不够深入,需要结合其他资料学习。</p>
                            </div>
                            <div class="review-helpful">
                                <button class="helpful-btn" data-count="15">有用 (15)</button>
                            </div>
                        </div>

                        <div class="review" data-rating="5">
                            <div class="review-header">
                                <span class="reviewer">技术爱好者</span>
                                <div class="review-stars">★★★★★</div>
                                <span class="review-date">2024-01-08</span>
                            </div>
                            <div class="review-content">
                                <p>推荐给所有想学Python的朋友!书中的实战项目很有意思,跟着做完后收获很大。</p>
                            </div>
                            <div class="review-helpful">
                                <button class="helpful-btn" data-count="31">有用 (31)</button>
                            </div>
                        </div>
                    </div>
                </div>
            </section>
        </div>
    </body>
    </html>
    """

    soup = BeautifulSoup(html_content, 'html.parser')

    # 1. 基本文本提取
    print("\n1. 基本文本提取:")

    # 提取标题
    title = soup.find('h1', class_='product-title')
    print(f"商品标题: {title.get_text() if title else 'N/A'}")

    # 提取副标题
    subtitle = soup.find('div', class_='product-subtitle')
    print(f"商品副标题: {subtitle.get_text() if subtitle else 'N/A'}")

    # 提取价格信息
    current_price = soup.find('span', class_='current-price')
    original_price = soup.find('span', class_='original-price')
    discount = soup.find('span', class_='discount')

    print(f"当前价格: {current_price.get_text() if current_price else 'N/A'}")
    print(f"原价: {original_price.get_text() if original_price else 'N/A'}")
    print(f"折扣: {discount.get_text() if discount else 'N/A'}")

    # 2. 属性值提取
    print("\n2. 属性值提取:")

    # 提取数据属性
    rating_element = soup.find('div', class_='stars')
    if rating_element:
        rating = rating_element.get('data-rating')
        print(f"评分: {rating}")

    # 提取价格数据属性
    if current_price:
        price_value = current_price.get('data-price')
        print(f"价格数值: {price_value}")

    # 提取产品ID
    add_to_cart_btn = soup.find('button', class_='add-to-cart')
    if add_to_cart_btn:
        product_id = add_to_cart_btn.get('data-product-id')
        print(f"产品ID: {product_id}")

    # 提取图片信息
    main_image = soup.find('img', class_='main-image')
    if main_image:
        img_src = main_image.get('src')
        img_alt = main_image.get('alt')
        print(f"主图片: {img_src}, 描述: {img_alt}")

    # 3. 表格数据提取
    print("\n3. 表格数据提取:")

    specs_table = soup.find('table', class_='specs-table')
    if specs_table:
        specs = {}
        rows = specs_table.find_all('tr')
        for row in rows:
            name_cell = row.find('td', class_='spec-name')
            value_cell = row.find('td', class_='spec-value')
            if name_cell and value_cell:
                specs[name_cell.get_text()] = value_cell.get_text()

        print("商品规格:")
        for key, value in specs.items():
            print(f"  {key}: {value}")

    # 4. 列表数据提取
    print("\n4. 列表数据提取:")

    # 提取面包屑导航
    breadcrumb = soup.find('nav', class_='breadcrumb')
    if breadcrumb:
        links = breadcrumb.find_all('a')
        current = breadcrumb.find('span', class_='current')

        breadcrumb_path = [link.get_text() for link in links]
        if current:
            breadcrumb_path.append(current.get_text())

        print(f"导航路径: {' > '.join(breadcrumb_path)}")

    # 提取特性列表
    feature_list = soup.find('ul', class_='feature-list')
    if feature_list:
        features = [li.get_text().strip() for li in feature_list.find_all('li')]
        print(f"产品特性: {features}")

    # 5. 复杂结构数据提取
    print("\n5. 复杂结构数据提取:")

    # 提取评价信息
    reviews = []
    review_elements = soup.find_all('div', class_='review')

    for review_elem in review_elements:
        reviewer = review_elem.find('span', class_='reviewer')
        rating_stars = review_elem.find('div', class_='review-stars')
        date = review_elem.find('span', class_='review-date')
        content = review_elem.find('div', class_='review-content')
        helpful_btn = review_elem.find('button', class_='helpful-btn')

        review_data = {
            'reviewer': reviewer.get_text() if reviewer else 'Anonymous',
            'rating': review_elem.get('data-rating') if review_elem.has_attr('data-rating') else 'N/A',
            'date': date.get_text() if date else 'N/A',
            'content': content.get_text().strip() if content else 'N/A',
            'helpful_count': helpful_btn.get('data-count') if helpful_btn else '0'
        }
        reviews.append(review_data)

    print(f"用户评价 ({len(reviews)}条):")
    for i, review in enumerate(reviews, 1):
        print(f"  评价{i}:")
        print(f"    用户: {review['reviewer']}")
        print(f"    评分: {review['rating']}星")
        print(f"    日期: {review['date']}")
        print(f"    内容: {review['content'][:50]}...")
        print(f"    有用数: {review['helpful_count']}")
        print()

    # 6. 评分统计提取
    print("\n6. 评分统计提取:")

    rating_bars = soup.find_all('div', class_='rating-bar')
    rating_stats = {}

    for bar in rating_bars:
        stars = bar.find('span', class_='stars')
        count = bar.find('span', class_='count')
        fill_elem = bar.find('div', class_='fill')

        if stars and count:
            star_level = stars.get_text()
            count_num = count.get_text()
            percentage = '0%'

            if fill_elem and fill_elem.has_attr('style'):
                style = fill_elem.get('style')
                # 提取width百分比
                import re
                width_match = re.search(r'width:\s*(\d+%)', style)
                if width_match:
                    percentage = width_match.group(1)

            rating_stats[star_level] = {
                'count': count_num,
                'percentage': percentage
            }

    print("评分分布:")
    for star_level, stats in rating_stats.items():
        print(f"  {star_level}: {stats['count']}条 ({stats['percentage']})")

    # 7. 文本清理和格式化
    print("\n7. 文本清理和格式化:")

    # 提取并清理描述文本
    description = soup.find('div', class_='description-text')
    if description:
        # 获取纯文本,去除HTML标签
        clean_text = description.get_text(separator=' ', strip=True)
        print(f"商品描述: {clean_text[:100]}...")

        # 提取段落
        paragraphs = [p.get_text().strip() for p in description.find_all('p')]
        print(f"描述段落数: {len(paragraphs)}")

    # 8. 条件提取
    print("\n8. 条件提取:")

    # 提取高评分评价
    high_rating_reviews = soup.find_all('div', class_='review', attrs={'data-rating': lambda x: x and int(x) >= 4})
    print(f"高评分评价数量: {len(high_rating_reviews)}")

    # 提取有用评价(有用数>20)
    useful_reviews = []
    for review in soup.find_all('div', class_='review'):
        helpful_btn = review.find('button', class_='helpful-btn')
        if helpful_btn:
            count = helpful_btn.get('data-count')
            if count and int(count) > 20:
                reviewer = review.find('span', class_='reviewer')
                useful_reviews.append(reviewer.get_text() if reviewer else 'Anonymous')

    print(f"有用评价用户: {useful_reviews}")

    # 9. 数据验证和错误处理
    print("\n9. 数据验证和错误处理:")

    # 安全提取价格
    def safe_extract_price(element):
        if not element:
            return None

        price_text = element.get_text().strip()
        # 提取数字
        import re
        price_match = re.search(r'([\d.]+)', price_text)
        if price_match:
            try:
                return float(price_match.group(1))
            except ValueError:
                return None
        return None

    current_price_value = safe_extract_price(current_price)
    original_price_value = safe_extract_price(original_price)

    print(f"当前价格数值: {current_price_value}")
    print(f"原价数值: {original_price_value}")

    if current_price_value and original_price_value:
        savings = original_price_value - current_price_value
        discount_percent = (savings / original_price_value) * 100
        print(f"节省金额: ¥{savings:.2f}")
        print(f"折扣百分比: {discount_percent:.1f}%")

    # 10. 综合数据结构
    print("\n10. 综合数据结构:")

    # 构建完整的产品数据结构
    product_data = {
        'basic_info': {
            'title': title.get_text() if title else None,
            'subtitle': subtitle.get_text() if subtitle else None,
            'product_id': product_id if 'product_id' in locals() else None
        },
        'pricing': {
            'current_price': current_price_value,
            'original_price': original_price_value,
            'discount_text': discount.get_text() if discount else None
        },
        'rating': {
            'score': rating if 'rating' in locals() else None,
            'total_reviews': len(reviews),
            'rating_distribution': rating_stats
        },
        'specifications': specs if 'specs' in locals() else {},
        'features': features if 'features' in locals() else [],
        'reviews_sample': reviews[:2]  # 只保留前两条评价作为示例
    }

    print("产品数据结构:")
    import json
    print(json.dumps(product_data, ensure_ascii=False, indent=2))

# 运行数据提取演示
if __name__ == "__main__":
    data_extraction_demo()

终端日志:

=== 数据提取功能演示 ===

1. 基本文本提取:
商品标题: Python从入门到精通(第3版)
商品副标题: 零基础学Python,包含大量实战案例
当前价格: ¥89.00
原价: ¥128.00
折扣: 7折

2. 属性值提取:
评分: 4.5
价格数值: 89.00
产品ID: 12345
主图片: /images/python-book-cover.jpg, 描述: Python从入门到精通封面

3. 表格数据提取:
商品规格:
  作者: 张三, 李四
  出版社: 人民邮电出版社
  出版时间: 2024年1月
  页数: 568页
  ISBN: 978-7-115-12345-6
  重量: 0.8kg

4. 列表数据提取:
导航路径: 首页 > 图书 > 编程 > Python从入门到精通
产品特性: ['✓ 零基础入门,循序渐进', '✓ 大量实战案例,学以致用', '✓ 配套视频教程,立体学习', '✓ 技术社区支持,答疑解惑']

5. 复杂结构数据提取:
用户评价 (3条):
  评价1:
    用户: Python学习者
    评分: 5星
    日期: 2024-01-15
    内容: 非常好的Python入门书籍,内容详实,案例丰富。作为零基础学习者,我能够很好地理解书中的内容。...
    有用数: 23

  评价2:
    用户: 编程新手
    评分: 4星
    日期: 2024-01-10
    内容: 书的质量不错,内容也比较全面。就是有些地方讲解得不够深入,需要结合其他资料学习。...
    有用数: 15

  评价3:
    用户: 技术爱好者
    评分: 5星
    日期: 2024-01-08
    内容: 推荐给所有想学Python的朋友!书中的实战项目很有意思,跟着做完后收获很大。...
    有用数: 31

6. 评分统计提取:
评分分布:
  5星: 740条 (60%)
  4星: 309条 (25%)
  3星: 123条 (10%)
  2星: 37条 (3%)
  1星: 25条 (2%)

7. 文本清理和格式化:
商品描述: 本书是Python编程的入门经典教程,适合零基础读者学习。 全书共分为15个章节,涵盖了Python的基础语法、数据结构、面向对象编程、文件操作、网络编程等核心内容。 ✓ 零基础入门,循序渐进 ✓ 大量实战案例,学以致用 ✓ 配套视频教程,立体学习 ✓ 技术社区支持,答疑解惑...
描述段落数: 2

8. 条件提取:
高评分评价数量: 3
有用评价用户: ['Python学习者', '技术爱好者']

9. 数据验证和错误处理:
当前价格数值: 89.0
原价数值: 128.0
节省金额: ¥39.00
折扣百分比: 30.5%

10. 综合数据结构:
产品数据结构:
{
  "basic_info": {
    "title": "Python从入门到精通(第3版)",
    "subtitle": "零基础学Python,包含大量实战案例",
    "product_id": "12345"
  },
  "pricing": {
    "current_price": 89.0,
    "original_price": 128.0,
    "discount_text": "7折"
  },
  "rating": {
    "score": "4.5",
    "total_reviews": 3,
    "rating_distribution": {
      "5星": {
        "count": "740",
        "percentage": "60%"
      },
      "4星": {
        "count": "309",
        "percentage": "25%"
      },
      "3星": {
        "count": "123",
        "percentage": "10%"
      },
      "2星": {
        "count": "37",
        "percentage": "3%"
      },
      "1星": {
        "count": "25",
        "percentage": "2%"
      }
    }
  },
  "specifications": {
    "作者": "张三, 李四",
    "出版社": "人民邮电出版社",
    "出版时间": "2024年1月",
    "页数": "568页",
    "ISBN": "978-7-115-12345-6",
    "重量": "0.8kg"
  },
  "features": [
    "✓ 零基础入门,循序渐进",
    "✓ 大量实战案例,学以致用",
    "✓ 配套视频教程,立体学习",
    "✓ 技术社区支持,答疑解惑"
  ],
  "reviews_sample": [
    {
      "reviewer": "Python学习者",
      "rating": "5",
      "date": "2024-01-15",
      "content": "非常好的Python入门书籍,内容详实,案例丰富。作为零基础学习者,我能够很好地理解书中的内容。",
      "helpful_count": "23"
    },
    {
      "reviewer": "编程新手",
      "rating": "4",
      "date": "2024-01-10",
      "content": "书的质量不错,内容也比较全面。就是有些地方讲解得不够深入,需要结合其他资料学习。",
      "helpful_count": "15"
    }
  ]
}
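
除了 find()/find_all(),BeautifulSoup 还支持 CSS 选择器(select()/select_one(),底层由 soupsieve 库实现)。下面是一个最小示意,假设 html_content 仍是上文的商品页面 HTML,用选择器完成几项等价的提取,选择器写法仅供参考:

from bs4 import BeautifulSoup

def css_selector_demo(html_content):
    """
    用 CSS 选择器完成部分提取(与 find/find_all 等价的写法示意)
    """
    soup = BeautifulSoup(html_content, 'html.parser')

    # select_one 返回第一个匹配的元素,作用类似 find
    title = soup.select_one('h1.product-title')
    price = soup.select_one('span.current-price')
    print(f"商品标题: {title.get_text(strip=True) if title else 'N/A'}")
    print(f"当前价格: {price.get_text(strip=True) if price else 'N/A'}")

    # select 返回所有匹配的元素,作用类似 find_all
    # 属性选择器可以直接按 data-rating 过滤
    for review in soup.select('div.review[data-rating="5"]'):
        reviewer = review.select_one('span.reviewer')
        print(f"五星评价用户: {reviewer.get_text() if reviewer else 'Anonymous'}")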

高级操作

文档修改

BeautifulSoup不仅可以解析HTML,还可以修改文档结构。
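
在进入完整示例之前,先看一个最小的修改示意(示例 HTML 为假设的片段):修改文本、新建标签、追加节点和删除节点。

from bs4 import BeautifulSoup

# 最小的修改示意:修改文本、新建标签、追加和删除节点
soup = BeautifulSoup('<div><h1>旧标题</h1><p class="tip">提示</p></div>', 'html.parser')

soup.h1.string = "新标题"                            # 修改文本内容
new_p = soup.new_tag('p', attrs={'class': 'note'})   # 新建标签,class 属性需通过 attrs 传入
new_p.string = "这是新增的段落"
soup.div.append(new_p)                               # 追加到 div 末尾
soup.find('p', class_='tip').decompose()             # 删除元素

print(soup.prettify())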

def document_modification_demo():
    """
    演示文档修改功能
    """
    print("=== 文档修改功能演示 ===")

    # 示例HTML - 简单的博客文章
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>我的博客文章</title>
        <meta name="author" content="原作者">
    </head>
    <body>
        <div class="container">
            <header>
                <h1>Python学习笔记</h1>
                <p class="meta">发布时间: 2024-01-01</p>
            </header>

            <main class="content">
                <section class="intro">
                    <h2>简介</h2>
                    <p>这是一篇关于Python基础的文章。</p>
                </section>

                <section class="topics">
                    <h2>主要内容</h2>
                    <ul id="topic-list">
                        <li>变量和数据类型</li>
                        <li>控制结构</li>
                    </ul>
                </section>

                <section class="examples">
                    <h2>代码示例</h2>
                    <div class="code-block">
                        <pre><code>print("Hello, World!")</code></pre>
                    </div>
                </section>
            </main>

            <footer>
                <p>版权所有 © 2024</p>
            </footer>
        </div>
    </body>
    </html>
    """

    soup = BeautifulSoup(html_content, 'html.parser')

    print("\n1. 修改文本内容:")

    # 修改标题
    title_tag = soup.find('h1')
    if title_tag:
        old_title = title_tag.get_text()
        title_tag.string = "Python高级编程技巧"
        print(f"标题修改: '{old_title}' -> '{title_tag.get_text()}'")

    # 修改作者信息
    author_meta = soup.find('meta', attrs={'name': 'author'})
    if author_meta:
        old_author = author_meta.get('content')
        author_meta['content'] = "技术专家"
        print(f"作者修改: '{old_author}' -> '{author_meta.get('content')}'")

    # 修改发布时间
    meta_p = soup.find('p', class_='meta')
    if meta_p:
        old_time = meta_p.get_text()
        meta_p.string = "发布时间: 2024-01-15 (已更新)"
        print(f"时间修改: '{old_time}' -> '{meta_p.get_text()}'")

    print("\n2. 添加新元素:")

    # 在列表中添加新项目
    topic_list = soup.find('ul', id='topic-list')
    if topic_list:
        # 创建新的li元素
        new_li1 = soup.new_tag('li')
        new_li1.string = "函数和模块"

        new_li2 = soup.new_tag('li')
        new_li2.string = "面向对象编程"

        new_li3 = soup.new_tag('li')
        new_li3.string = "异常处理"

        # 添加到列表末尾
        topic_list.append(new_li1)
        topic_list.append(new_li2)
        topic_list.append(new_li3)

        print(f"添加了3个新的主题项目")
        print(f"当前主题列表: {[li.get_text() for li in topic_list.find_all('li')]}")

    # 添加新的代码示例
    examples_section = soup.find('section', class_='examples')
    if examples_section:
        # 创建新的代码块
        new_code_block = soup.new_tag('div', attrs={'class': 'code-block'})  # class 是 Python 关键字,new_tag 需通过 attrs 传入 class 属性
        new_pre = soup.new_tag('pre')
        new_code = soup.new_tag('code')
        new_code.string = '''def greet(name):
    return f"Hello, {name}!"

print(greet("Python"))'''

        new_pre.append(new_code)
        new_code_block.append(new_pre)
        examples_section.append(new_code_block)

        print("添加了新的代码示例")

    # 添加新的section
    main_content = soup.find('main', class_='content')
    if main_content:
        new_section = soup.new_tag('section', attrs={'class': 'resources'})
        new_h2 = soup.new_tag('h2')
        new_h2.string = "学习资源"

        new_ul = soup.new_tag('ul')
        resources = [
            "Python官方文档",
            "在线编程练习",
            "开源项目参与"
        ]

        for resource in resources:
            li = soup.new_tag('li')
            li.string = resource
            new_ul.append(li)

        new_section.append(new_h2)
        new_section.append(new_ul)
        main_content.append(new_section)

        print("添加了新的学习资源section")

    print("\n3. 修改属性:")

    # 修改容器类名
    container = soup.find('div', class_='container')
    if container:
        old_class = container.get('class')
        container['class'] = ['main-container', 'updated']
        container['data-version'] = '2.0'
        print(f"容器类名修改: {old_class} -> {container.get('class')}")
        print(f"添加了data-version属性: {container.get('data-version')}")

    # 为代码块添加语言标识
    code_blocks = soup.find_all('div', class_='code-block')
    for i, block in enumerate(code_blocks):
        block['data-language'] = 'python'
        block['data-line-numbers'] = 'true'
        print(f"代码块{i+1}添加了语言标识和行号属性")

    print("\n4. 删除元素:")

    # 删除版权信息(示例)
    footer = soup.find('footer')
    if footer:
        copyright_p = footer.find('p')
        if copyright_p:
            old_text = copyright_p.get_text()
            copyright_p.decompose()  # 完全删除元素
            print(f"删除了版权信息: '{old_text}'")

    print("\n5. 元素移动和重排:")

    # 将简介section移动到主要内容之后
    intro_section = soup.find('section', class_='intro')
    topics_section = soup.find('section', class_='topics')

    if intro_section and topics_section:
        # 从当前位置移除
        intro_section.extract()
        # 插入到topics_section之后
        topics_section.insert_after(intro_section)
        print("将简介section移动到主要内容section之后")

    print("\n6. 批量操作:")

    # 为所有h2标签添加id属性
    h2_tags = soup.find_all('h2')
    for h2 in h2_tags:
        # 生成id(将标题转换为合适的id格式)
        title_text = h2.get_text().lower().replace(' ', '-').replace(',', '')
        h2['id'] = f"section-{title_text}"
        print(f"为h2标签添加id: {h2['id']}")

    # 为所有链接添加target="_blank"
    links = soup.find_all('a')
    for link in links:
        link['target'] = '_blank'
        link['rel'] = 'noopener noreferrer'

    if links:
        print(f"为{len(links)}个链接添加了target和rel属性")
    else:
        print("没有找到链接元素")

    print("\n7. 条件修改:")

    # 只修改包含特定文本的元素
    all_p = soup.find_all('p')
    modified_count = 0

    for p in all_p:
        text = p.get_text()
        if 'Python' in text:
            # 添加强调样式
            p['class'] = p.get('class', []) + ['python-related']
            p['style'] = 'font-weight: bold; color: #3776ab;'
            modified_count += 1

    print(f"为{modified_count}个包含'Python'的段落添加了样式")

    print("\n8. 创建复杂结构:")

    # 创建一个导航菜单
    nav = soup.new_tag('nav', attrs={'class': 'table-of-contents'})
    nav_title = soup.new_tag('h3')
    nav_title.string = "目录"
    nav_ul = soup.new_tag('ul')

    # 基于现有的h2标签创建导航
    for h2 in soup.find_all('h2'):
        li = soup.new_tag('li')
        a = soup.new_tag('a', href=f"#{h2.get('id', '')}")
        a.string = h2.get_text()
        li.append(a)
        nav_ul.append(li)

    nav.append(nav_title)
    nav.append(nav_ul)

    # 将导航插入到header之后
    header = soup.find('header')
    if header:
        header.insert_after(nav)
        print("创建并插入了目录导航")

    print("\n9. 文档结构优化:")

    # 添加语义化标签
    main_tag = soup.find('main')
    if main_tag:
        # 为main标签添加role属性
        main_tag['role'] = 'main'
        main_tag['aria-label'] = '主要内容'
        print("为main标签添加了无障碍属性")

    # 添加meta标签
    head = soup.find('head')
    if head:
        # 添加viewport meta
        viewport_meta = soup.new_tag('meta', attrs={
            'name': 'viewport',
            'content': 'width=device-width, initial-scale=1.0'
        })

        # 添加description meta
        desc_meta = soup.new_tag('meta', attrs={
            'name': 'description',
            'content': 'Python高级编程技巧学习笔记,包含函数、面向对象编程、异常处理等内容。'
        })

        head.append(viewport_meta)
        head.append(desc_meta)
        print("添加了viewport和description meta标签")

    print("\n10. 输出修改后的文档:")

    # 格式化输出
    formatted_html = soup.prettify()
    print("修改后的HTML文档:")
    print(formatted_html[:1000] + "..." if len(formatted_html) > 1000 else formatted_html)

    # 统计信息
    print(f"\n文档统计:")
    print(f"  总标签数: {len(soup.find_all())}")
    print(f"  段落数: {len(soup.find_all('p'))}")
    print(f"  标题数: {len(soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']))}")
    print(f"  列表项数: {len(soup.find_all('li'))}")
    print(f"  代码块数: {len(soup.find_all('div', class_='code-block'))}")

    return soup

# 运行文档修改演示
if __name__ == "__main__":
    modified_soup = document_modification_demo()

终端日志:

=== 文档修改功能演示 ===

1. 修改文本内容:
标题修改: 'Python学习笔记' -> 'Python高级编程技巧'
作者修改: '原作者' -> '技术专家'
时间修改: '发布时间: 2024-01-01' -> '发布时间: 2024-01-15 (已更新)'

2. 添加新元素:
添加了3个新的主题项目
当前主题列表: ['变量和数据类型', '控制结构', '函数和模块', '面向对象编程', '异常处理']
添加了新的代码示例
添加了新的学习资源section

3. 修改属性:
容器类名修改: ['container'] -> ['main-container', 'updated']
添加了data-version属性: 2.0
代码块1添加了语言标识和行号属性
代码块2添加了语言标识和行号属性

4. 删除元素:
删除了版权信息: '版权所有 © 2024'

5. 元素移动和重排:
将简介section移动到主要内容section之后

6. 批量操作:
为h2标签添加id: section-主要内容
为h2标签添加id: section-简介
为h2标签添加id: section-代码示例
为h2标签添加id: section-学习资源
没有找到链接元素

7. 条件修改:
为1个包含'Python'的段落添加了样式

8. 创建复杂结构:
创建并插入了目录导航

9. 文档结构优化:
为main标签添加了无障碍属性
添加了viewport和description meta标签

10. 输出修改后的文档:
修改后的HTML文档:
<!DOCTYPE html>
<html>
 <head>
  <title>
   我的博客文章
  </title>
  <meta content="技术专家" name="author"/>
  <meta content="width=device-width, initial-scale=1.0" name="viewport"/>
  <meta content="Python高级编程技巧学习笔记,包含函数、面向对象编程、异常处理等内容。" name="description"/>
 </head>
 <body>
  <div class="main-container updated" data-version="2.0">
   <header>
    <h1>
     Python高级编程技巧
    </h1>
    <p class="meta">
     发布时间: 2024-01-15 (已更新)
    </p>
   </header>
   <nav class="table-of-contents">
    <h3>
     目录
    </h3>
    <ul>
     <li>
      <a href="#section-主要内容">
       主要内容
      </a>
     </li>
     <li>
      <a href="#section-简介">
       简介
      </a>
     </li>
     <li>
      <a href="#section-代码示例">
       代码示例
      </a>
     </li>
     <li>
      <a href="#section-学习资源">
       学习资源
      </a>
     </li>
    </ul>
   </nav>
   <main aria-label="主要内容" class="content" role="main">
    <section class="topics">
     <h2 id="section-主要内容">
      主要内容
     </h2>
     <ul id="topic-list">
      <li>
       变量和数据类型
      </li>
      <li>
       控制结构
      </li>
      <li>
       函数和模块
      </li>
      <li>
       面向对象编程
      </li>
      <li>
       异常处理
      </li>
     </ul>
    </section>
    <section class="intro">
     <h2 id="section-简介">
      简介
     </h2>
     <p class="python-related" style="font-weight: bold; color: #3776ab;">
      这是一篇关于Python基础的文章。
     </p>
    </section>
    <section class="examples">
     <h2 id="section-代码示例">
      代码示例
     </h2>
     <div class="code-block" data-language="python" data-line-numbers="true">
      <pre><code>print("Hello, World!")</code></pre>
     </div>
     <div class="code-block" data-language="python" data-line-numbers="true">
      <pre><code>def greet(name):
    return f"Hello, {name}!"

print(greet("Python"))</code></pre>
     </div>
    </section>
    <section class="resources">
     <h2 id="section-学习资源">
      学习资源
     </h2>
     <ul>
      <li>
       Python官方文档
      </li>
      <li>
       在线编程练习
      </li>
      <li>
       开源项目参与
      </li>
     </ul>
    </section>
   </main>
   <footer>
   </footer>
  </div>
 </body>
</html>...

文档统计:
  总标签数: 32
  段落数: 1
  标题数: 5
  列表项数: 11
  代码块数: 2
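
修改完成后,通常需要把结果写回文件。下面是一个简单示意(文件名 modified.html 仅为示例),注意以 UTF-8 编码写出:

def save_soup(soup, path='modified.html'):
    """
    把修改后的文档以 UTF-8 编码写入文件(路径仅为示例)
    """
    with open(path, 'w', encoding='utf-8') as f:
        f.write(soup.prettify())  # 需要紧凑输出时可以改用 str(soup)

# 用法示例(modified_soup 为上面演示函数的返回值)
# save_soup(modified_soup)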

元素插入和删除

BeautifulSoup提供了灵活的元素插入和删除方法。
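
其中最常用的是 extract()、decompose() 和 replace_with() 三个方法,区别在于:extract() 把节点从树中取出并返回(之后还能插回),decompose() 彻底销毁节点,replace_with() 用新节点原地替换。下面是一个最小示意(示例 HTML 为假设的片段):

from bs4 import BeautifulSoup

soup = BeautifulSoup('<ul><li>A</li><li>B</li><li>C</li></ul>', 'html.parser')
items = soup.find_all('li')

removed = items[0].extract()      # extract: 取出节点并返回,之后还可以重新插入
items[1].decompose()              # decompose: 彻底销毁节点,之后不能再使用
new_li = soup.new_tag('li')
new_li.string = 'D'
items[2].replace_with(new_li)     # replace_with: 用新节点原地替换旧节点

soup.ul.append(removed)           # 把 extract 出来的节点追加回列表末尾
print(soup)                       # <ul><li>D</li><li>A</li></ul>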

def element_operations_demo():
    """
    演示元素插入和删除操作
    """
    print("=== 元素插入和删除操作演示 ===")

    # 示例HTML - 文章列表
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>文章管理系统</title>
    </head>
    <body>
        <div class="article-manager">
            <header class="page-header">
                <h1>文章列表</h1>
                <div class="actions">
                    <button class="btn-new">新建文章</button>
                </div>
            </header>

            <main class="article-list">
                <article class="article-item" data-id="1">
                    <h2 class="article-title">Python基础教程</h2>
                    <p class="article-summary">学习Python编程的基础知识</p>
                    <div class="article-meta">
                        <span class="author">作者: 张三</span>
                        <span class="date">2024-01-01</span>
                        <span class="category">编程</span>
                    </div>
                    <div class="article-actions">
                        <button class="btn-edit">编辑</button>
                        <button class="btn-delete">删除</button>
                    </div>
                </article>

                <article class="article-item" data-id="2">
                    <h2 class="article-title">Web开发入门</h2>
                    <p class="article-summary">从零开始学习Web开发</p>
                    <div class="article-meta">
                        <span class="author">作者: 李四</span>
                        <span class="date">2024-01-05</span>
                        <span class="category">Web开发</span>
                    </div>
                    <div class="article-actions">
                        <button class="btn-edit">编辑</button>
                        <button class="btn-delete">删除</button>
                    </div>
                </article>
            </main>

            <footer class="page-footer">
                <p>共 2 篇文章</p>
            </footer>
        </div>
    </body>
    </html>
    """

    soup = BeautifulSoup(html_content, 'html.parser')

    print("\n1. 在指定位置插入元素:")

    # 在第一篇文章前插入新文章
    article_list = soup.find('main', class_='article-list')
    first_article = soup.find('article', class_='article-item')

    if article_list and first_article:
        # 创建新文章
        new_article = soup.new_tag('article', attrs={'class': 'article-item featured', 'data-id': '0'})  # class 和 data-* 属性统一通过 attrs 传入

        # 创建文章标题
        title = soup.new_tag('h2', attrs={'class': 'article-title'})
        title.string = "🔥 热门推荐:Python高级特性详解"

        # 创建文章摘要
        summary = soup.new_tag('p', attrs={'class': 'article-summary'})
        summary.string = "深入了解Python的高级特性和最佳实践"

        # 创建元数据
        meta_div = soup.new_tag('div', attrs={'class': 'article-meta'})

        author_span = soup.new_tag('span', attrs={'class': 'author'})
        author_span.string = "作者: 技术专家"

        date_span = soup.new_tag('span', attrs={'class': 'date'})
        date_span.string = "2024-01-15"

        category_span = soup.new_tag('span', attrs={'class': 'category featured-category'})
        category_span.string = "高级编程"

        meta_div.extend([author_span, date_span, category_span])

        # 创建操作按钮
        actions_div = soup.new_tag('div', attrs={'class': 'article-actions'})

        edit_btn = soup.new_tag('button', attrs={'class': 'btn-edit'})
        edit_btn.string = "编辑"

        delete_btn = soup.new_tag('button', attrs={'class': 'btn-delete'})
        delete_btn.string = "删除"

        pin_btn = soup.new_tag('button', attrs={'class': 'btn-pin'})
        pin_btn.string = "置顶"

        actions_div.extend([edit_btn, delete_btn, pin_btn])

        # 组装新文章
        new_article.extend([title, summary, meta_div, actions_div])

        # 插入到第一篇文章前
        first_article.insert_before(new_article)

        print("在列表开头插入了特色文章")

    # 在最后一篇文章后插入新文章
    all_articles = soup.find_all('article', class_='article-item')
    if all_articles:
        last_article = all_articles[-1]

        # 创建另一篇新文章
        another_article = soup.new_tag('article', attrs={'class': 'article-item draft', 'data-id': '3'})

        title = soup.new_tag('h2', attrs={'class': 'article-title'})
        title.string = "📝 草稿:数据库设计原理"

        summary = soup.new_tag('p', attrs={'class': 'article-summary'})
        summary.string = "数据库设计的基本原理和最佳实践(草稿状态)"

        meta_div = soup.new_tag('div', attrs={'class': 'article-meta'})

        author_span = soup.new_tag('span', attrs={'class': 'author'})
        author_span.string = "作者: 王五"

        date_span = soup.new_tag('span', attrs={'class': 'date'})
        date_span.string = "2024-01-16"

        status_span = soup.new_tag('span', attrs={'class': 'status draft-status'})
        status_span.string = "草稿"

        meta_div.extend([author_span, date_span, status_span])

        actions_div = soup.new_tag('div', attrs={'class': 'article-actions'})

        edit_btn = soup.new_tag('button', attrs={'class': 'btn-edit primary'})
        edit_btn.string = "继续编辑"

        publish_btn = soup.new_tag('button', attrs={'class': 'btn-publish'})
        publish_btn.string = "发布"

        delete_btn = soup.new_tag('button', attrs={'class': 'btn-delete'})
        delete_btn.string = "删除"

        actions_div.extend([edit_btn, publish_btn, delete_btn])

        another_article.extend([title, summary, meta_div, actions_div])

        # 插入到最后一篇文章后
        last_article.insert_after(another_article)

        print("在列表末尾插入了草稿文章")

    print("\n2. 在父元素中插入子元素:")

    # 在页面头部添加搜索框
    page_header = soup.find('header', class_='page-header')
    if page_header:
        # 创建搜索区域
        search_div = soup.new_tag('div', attrs={'class': 'search-area'})

        search_input = soup.new_tag('input', type='text', placeholder='搜索文章...', attrs={'class': 'search-input'})
        search_btn = soup.new_tag('button', attrs={'class': 'btn-search'})
        search_btn.string = "搜索"

        search_div.extend([search_input, search_btn])

        # 插入到actions div之前
        actions_div = page_header.find('div', class_='actions')
        if actions_div:
            actions_div.insert_before(search_div)
            print("在页面头部添加了搜索区域")

    # 在每篇文章中添加标签
    articles = soup.find_all('article', class_='article-item')
    for i, article in enumerate(articles):
        meta_div = article.find('div', class_='article-meta')
        if meta_div:
            # 创建标签容器
            tags_div = soup.new_tag('div', attrs={'class': 'article-tags'})

            # 根据文章类型添加不同标签
            if 'featured' in article.get('class', []):
                tags = ['热门', '推荐', 'Python']
            elif 'draft' in article.get('class', []):
                tags = ['草稿', '数据库']
            else:
                tags = ['基础', '教程']

            for tag in tags:
                tag_span = soup.new_tag('span', attrs={'class': 'tag'})
                tag_span.string = tag
                tags_div.append(tag_span)

            # 插入到meta div之后
            meta_div.insert_after(tags_div)

        print(f"为文章{i+1}添加了标签")

    print("\n3. 删除元素:")

    # 删除第二篇文章(原来的第一篇)
    articles = soup.find_all('article', class_='article-item')
    if len(articles) > 1:
        article_to_delete = articles[1]  # 第二篇文章
        article_title = article_to_delete.find('h2', class_='article-title')
        title_text = article_title.get_text() if article_title else "未知标题"

        article_to_delete.decompose()  # 完全删除
        print(f"删除了文章: '{title_text}'")

    # 删除所有草稿状态的文章
    draft_articles = soup.find_all('article', class_='draft')
    deleted_drafts = []

    for draft in draft_articles:
        title_elem = draft.find('h2', class_='article-title')
        if title_elem:
            deleted_drafts.append(title_elem.get_text())
        draft.decompose()

    if deleted_drafts:
        print(f"删除了草稿文章: {deleted_drafts}")
    else:
        print("没有找到草稿文章")

    # 删除特定的按钮
    pin_buttons = soup.find_all('button', class_='btn-pin')
    for btn in pin_buttons:
        btn.decompose()

    if pin_buttons:
        print(f"删除了{len(pin_buttons)}个置顶按钮")

    print("\n4. 替换元素:")

    # 替换页面标题
    page_title = soup.find('h1')
    if page_title:
        old_title = page_title.get_text()

        # 创建新的标题元素
        new_title = soup.new_tag('h1', attrs={'class': 'main-title'})
        new_title.string = "📚 技术文章管理中心"

        # 替换
        page_title.replace_with(new_title)
        print(f"页面标题替换: '{old_title}' -> '{new_title.get_text()}'")

    # 替换所有编辑按钮为更详细的按钮
    edit_buttons = soup.find_all('button', class_='btn-edit')
    for btn in edit_buttons:
        # 创建新的按钮组
        btn_group = soup.new_tag('div', attrs={'class': 'btn-group'})

        quick_edit = soup.new_tag('button', attrs={'class': 'btn-quick-edit'})
        quick_edit.string = "快速编辑"

        full_edit = soup.new_tag('button', attrs={'class': 'btn-full-edit'})
        full_edit.string = "完整编辑"

        btn_group.extend([quick_edit, full_edit])

        # 替换原按钮
        btn.replace_with(btn_group)

    print(f"替换了{len(edit_buttons)}个编辑按钮为按钮组")

    print("\n5. 移动元素:")

    # 将搜索区域移动到标题之前
    search_area = soup.find('div', class_='search-area')
    main_title = soup.find('h1', class_='main-title')

    if search_area and main_title:
        # 提取搜索区域
        search_area.extract()
        # 插入到标题之前
        main_title.insert_before(search_area)
        print("将搜索区域移动到标题之前")

    # 重新排序文章(按日期)
    article_list = soup.find('main', class_='article-list')
    if article_list:
        articles = article_list.find_all('article', class_='article-item')

        # 提取所有文章
        article_data = []
        for article in articles:
            date_elem = article.find('span', class_='date')
            date_str = date_elem.get_text() if date_elem else "2024-01-01"
            article_data.append((date_str, article.extract()))

        # 按日期排序(最新的在前)
        article_data.sort(key=lambda x: x[0], reverse=True)

        # 重新插入排序后的文章
        for date_str, article in article_data:
            article_list.append(article)

        print(f"按日期重新排序了{len(article_data)}篇文章")

    print("\n6. 批量操作:")

    # 为所有文章添加阅读时间估算
    articles = soup.find_all('article', class_='article-item')
    for article in articles:
        summary = article.find('p', class_='article-summary')
        if summary:
            # 估算阅读时间(基于摘要长度)
            text_length = len(summary.get_text())
            read_time = max(1, text_length // 50)  # 假设每50个字符需要1分钟

            read_time_span = soup.new_tag('span', attrs={'class': 'read-time'})
            read_time_span.string = f"预计阅读: {read_time}分钟"

            # 插入到摘要之后
            summary.insert_after(read_time_span)

    print(f"为{len(articles)}篇文章添加了阅读时间估算")

    # 更新文章计数
    footer = soup.find('footer', class_='page-footer')
    if footer:
        count_p = footer.find('p')
        if count_p:
            current_count = len(soup.find_all('article', class_='article-item'))
            count_p.string = f"共 {current_count} 篇文章"
            print(f"更新了文章计数: {current_count}")

    print("\n7. 条件操作:")

    # 只对特色文章添加特殊标记
    featured_articles = soup.find_all('article', class_='featured')
    for article in featured_articles:
        title = article.find('h2', class_='article-title')
        if title and not title.get_text().startswith('🔥'):
            title.string = f"🔥 {title.get_text()}"

    print(f"为{len(featured_articles)}篇特色文章添加了火焰标记")

    # 为长摘要添加展开/收起功能
    summaries = soup.find_all('p', class_='article-summary')
    long_summaries = 0

    for summary in summaries:
        if len(summary.get_text()) > 30:  # 超过30个字符认为是长摘要
            summary['class'] = summary.get('class', []) + ['long-summary']
            summary['data-full-text'] = summary.get_text()

            # 创建展开按钮
            expand_btn = soup.new_tag('button', attrs={'class': 'btn-expand'})
            expand_btn.string = "展开"

            summary.insert_after(expand_btn)
            long_summaries += 1

    print(f"为{long_summaries}个长摘要添加了展开功能")

    print("\n8. 最终文档统计:")

    # 统计最终结果
    final_stats = {
        '总文章数': len(soup.find_all('article', class_='article-item')),
        '特色文章数': len(soup.find_all('article', class_='featured')),
        '草稿文章数': len(soup.find_all('article', class_='draft')),
        '总按钮数': len(soup.find_all('button')),
        '标签数': len(soup.find_all('span', class_='tag')),
        '总元素数': len(soup.find_all())
    }

    for key, value in final_stats.items():
        print(f"  {key}: {value}")

    # 输出部分修改后的HTML
    print("\n9. 修改后的HTML片段:")
    article_list = soup.find('main', class_='article-list')
    if article_list:
        first_article = article_list.find('article')
        if first_article:
            print(first_article.prettify()[:500] + "...")

    return soup

# 运行元素操作演示
if __name__ == "__main__":
    modified_soup = element_operations_demo()
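
作为补充,BeautifulSoup 还提供了 wrap() 和 unwrap() 两个方法,分别用于给节点外面包一层标签和去掉外层标签(只保留其中内容),下面是一个简单示意:

from bs4 import BeautifulSoup

soup = BeautifulSoup('<p>一段 <b>重要</b> 的文字</p>', 'html.parser')

# wrap: 给 <p> 外面包一层 <div class="box">
soup.p.wrap(soup.new_tag('div', attrs={'class': 'box'}))

# unwrap: 去掉 <b> 标签,只保留其中的文本
soup.b.unwrap()

print(soup)  # <div class="box"><p>一段 重要 的文字</p></div>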

编码处理

BeautifulSoup能够自动处理各种字符编码问题。
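
在实际爬虫中,编码问题大多出现在解码响应内容这一步。一个稳妥的做法是把 requests 返回的原始字节(response.content)直接交给 BeautifulSoup,让它结合页面中的 meta 声明自行判断编码。下面是一个示意(URL 仅为占位示例):

import requests
from bs4 import BeautifulSoup

def fetch_and_parse(url):
    """
    把原始字节交给 BeautifulSoup,由它自行检测编码(URL 仅为示例)
    """
    response = requests.get(url, timeout=10)
    # 使用 response.content(字节)而不是 response.text,避免 requests 猜错编码
    soup = BeautifulSoup(response.content, 'html.parser')
    print(f"检测到的编码: {soup.original_encoding}")
    return soup

# soup = fetch_and_parse("https://example.com")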

def encoding_demo():
    """
    演示编码处理功能
    """
    print("=== 编码处理功能演示 ===")

    # 1. 自动编码检测
    print("\n1. 自动编码检测:")

    # 不同编码的HTML内容
    utf8_html = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <title>中文测试页面</title>
    </head>
    <body>
        <h1>欢迎来到Python学习网站</h1>
        <p>这里有丰富的Python教程和实例。</p>
        <div class="content">
            <h2>特殊字符测试</h2>
            <p>数学符号: α β γ δ ε ∑ ∏ ∫</p>
            <p>货币符号: ¥ $ € £ ₹</p>
            <p>表情符号: 😀 😃 😄 😁 🚀 🎉</p>
            <p>其他语言: こんにちは 안녕하세요 Здравствуйте</p>
        </div>
    </body>
    </html>
    """

    # 使用BeautifulSoup解析UTF-8内容
    soup_utf8 = BeautifulSoup(utf8_html, 'html.parser')
    print(f"UTF-8解析结果:")
    print(f"  标题: {soup_utf8.find('title').get_text()}")
    print(f"  主标题: {soup_utf8.find('h1').get_text()}")

    # 获取原始编码信息
    original_encoding = soup_utf8.original_encoding
    print(f"  检测到的原始编码: {original_encoding}")

    # 2. 处理不同编码的内容
    print("\n2. 处理不同编码的内容:")

    # 模拟GBK编码的内容
    gbk_content = "<html><body><h1>中文标题</h1><p>这是GBK编码的内容</p></body></html>"

    try:
        # 将字符串编码为GBK字节
        gbk_bytes = gbk_content.encode('gbk')
        print(f"GBK字节长度: {len(gbk_bytes)}")

        # 使用BeautifulSoup解析GBK字节
        soup_gbk = BeautifulSoup(gbk_bytes, 'html.parser', from_encoding='gbk')
        print(f"GBK解析结果:")
        print(f"  标题: {soup_gbk.find('h1').get_text()}")
        print(f"  段落: {soup_gbk.find('p').get_text()}")

    except UnicodeEncodeError as e:
        print(f"GBK编码错误: {e}")

    # 3. 编码转换
    print("\n3. 编码转换:")

    # 获取不同编码格式的输出
    html_str = str(soup_utf8)

    # UTF-8编码
    utf8_bytes = html_str.encode('utf-8')
    print(f"UTF-8编码字节数: {len(utf8_bytes)}")

    # 尝试其他编码
    encodings_to_test = ['utf-8', 'gbk', 'iso-8859-1', 'ascii']

    for encoding in encodings_to_test:
        try:
            encoded_bytes = html_str.encode(encoding)
            print(f"{encoding.upper()}编码: 成功,{len(encoded_bytes)}字节")
        except UnicodeEncodeError as e:
            print(f"{encoding.upper()}编码: 失败 - {str(e)[:50]}...")

    # 4. 处理编码错误
    print("\n4. 处理编码错误:")

    # 创建包含特殊字符的内容
    special_html = """
    <html>
    <body>
        <h1>特殊字符处理测试</h1>
        <p>包含emoji: 🐍 Python编程</p>
        <p>数学公式: E = mc²</p>
        <p>版权符号: © 2024</p>
        <p>商标符号: Python™</p>
    </body>
    </html>
    """

    soup_special = BeautifulSoup(special_html, 'html.parser')

    # 不同的错误处理策略
    error_strategies = ['ignore', 'replace', 'xmlcharrefreplace']

    for strategy in error_strategies:
        try:
            # 尝试编码为ASCII(会出错)
            ascii_result = str(soup_special).encode('ascii', errors=strategy)
            decoded_result = ascii_result.decode('ascii')
            print(f"ASCII编码策略'{strategy}': 成功")
            print(f"  结果长度: {len(decoded_result)}字符")

            # 显示处理后的标题
            soup_result = BeautifulSoup(decoded_result, 'html.parser')
            title = soup_result.find('h1')
            if title:
                print(f"  处理后标题: {title.get_text()}")

        except Exception as e:
            print(f"ASCII编码策略'{strategy}': 失败 - {e}")

    # 5. 自定义编码处理
    print("\n5. 自定义编码处理:")

    def safe_encode_html(soup_obj, target_encoding='utf-8', fallback_encoding='ascii'):
        """
        安全地将BeautifulSoup对象编码为指定格式
        """
        html_str = str(soup_obj)

        try:
            # 尝试目标编码
            return html_str.encode(target_encoding)
        except UnicodeEncodeError:
            print(f"  {target_encoding}编码失败,尝试{fallback_encoding}")
            try:
                # 使用替换策略的后备编码
                return html_str.encode(fallback_encoding, errors='xmlcharrefreplace')
            except UnicodeEncodeError:
                print(f"  {fallback_encoding}编码也失败,使用忽略策略")
                return html_str.encode(fallback_encoding, errors='ignore')

    # 测试自定义编码函数
    safe_bytes = safe_encode_html(soup_special, 'ascii')
    print(f"安全编码结果: {len(safe_bytes)}字节")

    # 解码并验证
    safe_html = safe_bytes.decode('ascii')
    safe_soup = BeautifulSoup(safe_html, 'html.parser')
    safe_title = safe_soup.find('h1')
    if safe_title:
        print(f"安全编码后标题: {safe_title.get_text()}")

    # 6. 编码声明处理
    print("\n6. 编码声明处理:")

    # 检查和修改编码声明
    meta_charset = soup_utf8.find('meta', attrs={'charset': True})
    if meta_charset:
        original_charset = meta_charset.get('charset')
        print(f"原始字符集声明: {original_charset}")

        # 修改字符集声明
        meta_charset['charset'] = 'UTF-8'
        print(f"修改后字符集声明: {meta_charset.get('charset')}")

    # 添加编码声明(如果不存在)
    head = soup_utf8.find('head')
    if head and not head.find('meta', attrs={'charset': True}):
        charset_meta = soup_utf8.new_tag('meta', charset='UTF-8')
        head.insert(0, charset_meta)
        print("添加了字符集声明")

    # 7. 内容编码验证
    print("\n7. 内容编码验证:")

    def validate_encoding(html_content, expected_encoding='utf-8'):
        """
        验证HTML内容的编码
        """
        try:
            if isinstance(html_content, str):
                # 字符串内容,尝试编码
                html_content.encode(expected_encoding)
                return True, "字符串内容编码有效"
            elif isinstance(html_content, bytes):
                # 字节内容,尝试解码
                html_content.decode(expected_encoding)
                return True, "字节内容编码有效"
            else:
                return False, "未知内容类型"
        except UnicodeError as e:
            return False, f"编码验证失败: {e}"

    # 验证不同内容的编码
    test_contents = [
        (utf8_html, 'utf-8'),
        (str(soup_utf8), 'utf-8'),
        (str(soup_special), 'utf-8')
    ]

    for content, encoding in test_contents:
        is_valid, message = validate_encoding(content, encoding)
        print(f"  {encoding}编码验证: {'✓' if is_valid else '✗'} {message}")

    # 8. 编码统计信息
    print("\n8. 编码统计信息:")

    def analyze_encoding(soup_obj):
        """
        分析BeautifulSoup对象的编码信息
        """
        html_str = str(soup_obj)

        stats = {
            '总字符数': len(html_str),
            'ASCII字符数': sum(1 for c in html_str if ord(c) < 128),
            '非ASCII字符数': sum(1 for c in html_str if ord(c) >= 128),
            '中文字符数': sum(1 for c in html_str if '\u4e00' <= c <= '\u9fff'),
            '表情符号数': sum(1 for c in html_str if ord(c) >= 0x1F300),  # 粗略统计:从 0x1F300 起的区段覆盖常见表情符号
        }

        # 计算不同编码的字节数
        for encoding in ['utf-8', 'utf-16', 'utf-32']:
            try:
                byte_count = len(html_str.encode(encoding))
                stats[f'{encoding.upper()}字节数'] = byte_count
            except UnicodeEncodeError:
                stats[f'{encoding.upper()}字节数'] = '编码失败'

        return stats

    # 分析特殊字符内容
    encoding_stats = analyze_encoding(soup_special)

    print("特殊字符内容编码分析:")
    for key, value in encoding_stats.items():
        print(f"  {key}: {value}")

    # 9. 编码最佳实践建议
    print("\n9. 编码最佳实践建议:")

    recommendations = [
        "✓ 始终使用UTF-8编码处理HTML内容",
        "✓ 在HTML头部明确声明字符集",
        "✓ 处理用户输入时验证编码",
        "✓ 使用适当的错误处理策略",
        "✓ 测试特殊字符和多语言内容",
        "✓ 避免混合使用不同编码"
    ]

    for rec in recommendations:
        print(f"  {rec}")

    return soup_utf8, soup_special

# 运行编码处理演示
if __name__ == "__main__":
    utf8_soup, special_soup = encoding_demo()

终端日志:

=== 编码处理功能演示 ===

1. 自动编码检测:
UTF-8解析结果:
  标题: 中文测试页面
  主标题: 欢迎来到Python学习网站
  检测到的原始编码: None

2. 处理不同编码的内容:
GBK字节长度: 59
GBK解析结果:
  标题: 中文标题
  段落: 这是GBK编码的内容

3. 编码转换:
UTF-8编码字节数: 674
UTF-8编码: 成功,674字节
GBK编码: 成功,638字节
ISO-8859-1编码: 失败 - 'latin-1' codec can't encode character '\u4e2d'...
ASCII编码: 失败 - 'ascii' codec can't encode character '\u4e2d' in...

4. 处理编码错误:
ASCII编码策略'ignore': 成功
  结果长度: 158字符
  处理后标题: 
ASCII编码策略'replace': 成功
  结果长度: 398字符
  处理后标题: ????????????
ASCII编码策略'xmlcharrefreplace': 成功
  结果长度: 1058字符
  处理后标题: 特殊字符处理测试

5. 自定义编码处理:
  ascii编码失败,尝试ascii
安全编码结果: 1058字节
安全编码后标题: 特殊字符处理测试

6. 编码声明处理:
原始字符集声明: UTF-8
修改后字符集声明: UTF-8

7. 内容编码验证:
  utf-8编码验证: ✓ 字符串内容编码有效
  utf-8编码验证: ✓ 字符串内容编码有效
  utf-8编码验证: ✓ 字符串内容编码有效

8. 编码统计信息:
特殊字符内容编码分析:
  总字符数: 254
  ASCII字符数: 158
  非ASCII字符数: 96
  中文字符数: 12
  表情符号数: 1
  UTF-8字节数: 302
  UTF-16字节数: 510
  UTF-32字节数: 1018

9. 编码最佳实践建议:
  ✓ 始终使用UTF-8编码处理HTML内容
  ✓ 在HTML头部明确声明字符集
  ✓ 处理用户输入时验证编码
  ✓ 使用适当的错误处理策略
  ✓ 测试特殊字符和多语言内容
  ✓ 避免混合使用不同编码
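
如果只拿到一段编码未知的字节数据,也可以直接使用 bs4 提供的 UnicodeDammit 来探测编码并转换为 Unicode 文本,下面是一个简单示意(候选编码列表仅为示例):

from bs4 import UnicodeDammit

# 一段编码未知的字节数据(这里假设实际是GBK编码)
raw_bytes = "<html><body><p>编码未知的内容</p></body></html>".encode('gbk')

# 可以把候选编码作为提示传入,UnicodeDammit 会依次尝试
dammit = UnicodeDammit(raw_bytes, ["gbk", "utf-8"])
print(f"猜测的编码: {dammit.original_encoding}")
print(f"转换后的文本: {dammit.unicode_markup}")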