第14章 爬虫与自动化¶
网络爬虫是现代数据获取和自动化处理的重要技术手段,通过模拟浏览器行为自动访问网页并提取所需信息。本章将从基础概念开始,逐步深入到高级爬虫框架和自动化技术,帮助读者掌握完整的爬虫开发技能。
14.1 网络爬虫基础¶
爬虫概述¶
网络爬虫的定义和用途¶
网络爬虫(Web Crawler),也称为网页蜘蛛(Web Spider)或网络机器人(Web Robot),是一种按照一定规则自动浏览万维网并获取信息的程序。爬虫的主要用途包括:
- 数据采集:从网站获取商品信息、新闻资讯、股票价格等
- 搜索引擎:为搜索引擎建立索引数据库
- 市场分析:收集竞争对手信息,进行市场调研
- 内容监控:监控网站内容变化,及时获取更新
- 学术研究:收集研究数据,进行数据分析
爬虫的工作原理¶
网络爬虫的基本工作流程如下:
- 发送HTTP请求:向目标网站发送请求
- 接收响应数据:获取服务器返回的HTML页面
- 解析页面内容:提取所需的数据信息
- 存储数据:将提取的数据保存到文件或数据库
- 发现新链接:从当前页面中发现新的URL
- 重复过程:对新发现的URL重复上述过程
让我们通过一个简单的示例来理解爬虫的基本原理:
import requests
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin
def simple_crawler(url):
"""
简单的网页爬虫示例
"""
try:
# 1. 发送HTTP请求
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers)
# 2. 检查响应状态
if response.status_code == 200:
# 3. 解析页面内容
soup = BeautifulSoup(response.text, 'html.parser')
# 4. 提取标题
title = soup.find('title')
if title:
print(f"页面标题: {title.get_text().strip()}")
# 5. 提取所有链接
links = soup.find_all('a', href=True)
print(f"找到 {len(links)} 个链接:")
for i, link in enumerate(links[:5]): # 只显示前5个链接
                href = urljoin(url, link['href'])  # 拼接为绝对链接,兼容相对和绝对两种写法
text = link.get_text().strip()
print(f"{i+1}. {text} -> {href}")
else:
print(f"请求失败,状态码: {response.status_code}")
except Exception as e:
print(f"爬取过程中出现错误: {e}")
# 使用示例
if __name__ == "__main__":
url = "https://yeyupiaoling.cn"
simple_crawler(url)
运行上述代码,输出类似如下:
页面标题: 夜雨飘零的博客 - 首页
找到 50 个链接:
1. -> https://yeyupiaoling.cn/
2. 夜雨飘零 -> https://yeyupiaoling.cn/
3. 首页 -> https://yeyupiaoling.cn/
4. 归档 -> https://yeyupiaoling.cn/archive
5. 标签 -> https://yeyupiaoling.cn/tag
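上面的示例只完成了工作流程的前四步;第5、6步(发现新链接并重复爬取)通常靠一个"待爬队列"加"已访问集合"来实现。下面是一个最小的广度优先爬取骨架,仅作示意:起始地址、页数上限和抓取间隔都是假设的参数,实际使用时应按目标网站的规则调整。
import time
from collections import deque
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
def bfs_crawl(start_url, max_pages=10, delay=1.0):
    """广度优先爬取同一站点内的页面(示意)"""
    visited = set()                 # 已访问URL,避免重复抓取
    queue = deque([start_url])      # 待爬队列
    domain = urlparse(start_url).netloc
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
    while queue and len(visited) < max_pages:
        url = queue.popleft()
        if url in visited:
            continue
        try:
            response = requests.get(url, headers=headers, timeout=10)
        except requests.RequestException as e:
            print(f"请求失败: {url} ({e})")
            continue
        visited.add(url)
        if response.status_code != 200:
            continue
        soup = BeautifulSoup(response.text, 'html.parser')
        print(f"已抓取: {url}")
        # 发现新链接:只保留同域名的绝对URL,加入队列等待处理
        for link in soup.find_all('a', href=True):
            new_url = urljoin(url, link['href'])
            if urlparse(new_url).netloc == domain and new_url not in visited:
                queue.append(new_url)
        time.sleep(delay)  # 控制抓取间隔,减轻服务器压力
    return visited
if __name__ == "__main__":
    pages = bfs_crawl("https://yeyupiaoling.cn", max_pages=5)
    print(f"共抓取 {len(pages)} 个页面")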
爬虫的分类和特点¶
根据不同的分类标准,爬虫可以分为以下几类:
按照爬取范围分类:
- 通用爬虫:搜索引擎使用的爬虫,爬取整个互联网
- 聚焦爬虫:针对特定主题或网站的爬虫
- 增量爬虫:只爬取新增或更新的内容(实现思路可参考下方示例)
按照技术实现分类:
- 静态爬虫:只能处理静态HTML页面
- 动态爬虫:能够处理JavaScript渲染的动态页面
按照爬取深度分类:
- 浅层爬虫:只爬取首页或少数几层页面
- 深层爬虫:能够深入爬取网站的多层结构
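以"增量爬虫"为例,常见做法是利用HTTP的条件请求:记录上一次响应的ETag或Last-Modified,下次请求时带上If-None-Match / If-Modified-Since,服务器若判断内容未变化会返回304,从而跳过重复下载。下面是一个简化示意,本地用一个字典代替真实的存储,目标URL仅作演示用途:
import requests
# 假设的本地缓存:URL -> 上一次响应的ETag/Last-Modified
cache = {}
def fetch_if_changed(url):
    """条件请求示意:内容未变化时服务器返回304,跳过处理"""
    headers = {}
    cached = cache.get(url)
    if cached:
        if cached.get('etag'):
            headers['If-None-Match'] = cached['etag']
        if cached.get('last_modified'):
            headers['If-Modified-Since'] = cached['last_modified']
    response = requests.get(url, headers=headers, timeout=10)
    if response.status_code == 304:
        print(f"{url} 未更新,跳过")
        return None
    # 记录新的校验信息,供下一次增量判断使用
    cache[url] = {
        'etag': response.headers.get('ETag'),
        'last_modified': response.headers.get('Last-Modified'),
    }
    print(f"{url} 已更新,处理新内容({len(response.text)} 字符)")
    return response.text
if __name__ == "__main__":
    fetch_if_changed("https://httpbin.org/cache")  # 该端点支持条件请求,仅作演示
    fetch_if_changed("https://httpbin.org/cache")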
爬虫的法律和道德考量¶
在进行网络爬虫开发时,必须遵守相关的法律法规和道德准则:
- 遵守robots.txt协议:检查网站的robots.txt文件,确认目标路径允许抓取(检查方法见下方示例)
- 控制爬取频率:避免对服务器造成过大压力
- 尊重版权:不要爬取受版权保护的内容
- 保护隐私:不要爬取个人隐私信息
- 合理使用数据:仅将爬取的数据用于合法目的
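其中前两条可以直接用代码落实:Python标准库的urllib.robotparser可以读取并判断robots.txt的抓取许可,再配合time.sleep控制抓取间隔。下面是一个简单示意,目标站点与User-Agent名称均为示例:
import time
from urllib.robotparser import RobotFileParser
import requests
BASE_URL = "https://yeyupiaoling.cn"
USER_AGENT = "MySpider/1.0"  # 示例爬虫标识
# 1. 读取并解析robots.txt
rp = RobotFileParser()
rp.set_url(f"{BASE_URL}/robots.txt")
rp.read()
# 2. 根据robots.txt确定抓取间隔(未声明时使用保守的默认值)
delay = rp.crawl_delay(USER_AGENT) or 2
for path in ["/", "/archive", "/admin"]:
    url = BASE_URL + path
    # 3. 只抓取robots.txt允许的路径
    if not rp.can_fetch(USER_AGENT, url):
        print(f"robots.txt 禁止抓取: {url}")
        continue
    response = requests.get(url, headers={'User-Agent': USER_AGENT}, timeout=10)
    print(f"抓取 {url} -> {response.status_code}")
    time.sleep(delay)  # 控制频率,避免给服务器造成压力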
HTTP协议基础¶
HTTP请求和响应¶
HTTP(HyperText Transfer Protocol)是网络爬虫与Web服务器通信的基础协议。理解HTTP协议对于开发高效的爬虫至关重要。
HTTP通信包含两个主要部分:
- 请求(Request):客户端向服务器发送的消息
- 响应(Response):服务器返回给客户端的消息
让我们通过代码来观察HTTP请求和响应的详细信息:
import requests
import json
def analyze_http_communication(url):
"""
分析HTTP请求和响应的详细信息
"""
# 创建会话对象
session = requests.Session()
# 设置请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
}
try:
# 发送请求
response = session.get(url, headers=headers)
print("=== HTTP请求信息 ===")
print(f"请求URL: {response.request.url}")
print(f"请求方法: {response.request.method}")
print("请求头:")
for key, value in response.request.headers.items():
print(f" {key}: {value}")
print("\n=== HTTP响应信息 ===")
print(f"状态码: {response.status_code}")
print(f"响应原因: {response.reason}")
print(f"响应时间: {response.elapsed.total_seconds():.3f}秒")
print("响应头:")
for key, value in response.headers.items():
print(f" {key}: {value}")
print(f"\n响应内容长度: {len(response.text)} 字符")
print(f"响应内容类型: {response.headers.get('Content-Type', 'Unknown')}")
except requests.RequestException as e:
print(f"请求失败: {e}")
# 使用示例
if __name__ == "__main__":
analyze_http_communication("https://yeyupiaoling.cn/")
运行结果示例:
=== HTTP请求信息 ===
请求URL: https://yeyupiaoling.cn/
请求方法: GET
请求头:
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36
Accept-Encoding: gzip, deflate
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Connection: keep-alive
Accept-Language: zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3
=== HTTP响应信息 ===
状态码: 200
响应原因: OK
响应时间: 0.197秒
响应头:
Server: nginx/1.18.0 (Ubuntu)
Date: Sat, 16 Aug 2025 04:36:49 GMT
Content-Type: text/html; charset=utf-8
Transfer-Encoding: chunked
Connection: keep-alive
Vary: Cookie
Content-Encoding: gzip
响应内容长度: 29107 字符
响应内容类型: text/html; charset=utf-8
Cookie和Session机制¶
Cookie和Session是Web应用中维持用户状态的重要机制:
- Cookie:存储在客户端的小型数据文件
- Session:存储在服务器端的用户会话信息
在爬虫开发中,正确处理Cookie和Session对于模拟用户登录和维持会话状态至关重要:
import requests
from http.cookies import SimpleCookie
def demonstrate_cookies_and_sessions():
"""
演示Cookie和Session的使用
"""
# 创建会话对象
session = requests.Session()
print("=== Cookie操作演示 ===")
# 1. 设置Cookie
cookie_url = "https://httpbin.org/cookies/set"
cookie_params = {
'username': 'testuser',
'session_id': 'abc123',
'preferences': 'dark_theme'
}
# 设置Cookie(这会导致重定向)
response = session.get(cookie_url, params=cookie_params)
print(f"设置Cookie后的状态码: {response.status_code}")
# 2. 查看当前Cookie
print("\n当前会话中的Cookie:")
for cookie in session.cookies:
print(f" {cookie.name} = {cookie.value}")
# 3. 发送带Cookie的请求
cookie_test_url = "https://httpbin.org/cookies"
response = session.get(cookie_test_url)
if response.status_code == 200:
cookies_data = response.json()
print(f"\n服务器接收到的Cookie: {cookies_data.get('cookies', {})}")
# 4. 手动设置Cookie
print("\n=== 手动Cookie操作 ===")
manual_session = requests.Session()
# 方法1:通过字典设置
manual_session.cookies.update({
'user_id': '12345',
'auth_token': 'xyz789'
})
# 方法2:通过set方法设置
manual_session.cookies.set('language', 'zh-CN', domain='httpbin.org')
# 测试手动设置的Cookie
response = manual_session.get("https://httpbin.org/cookies")
if response.status_code == 200:
cookies_data = response.json()
print(f"手动设置的Cookie: {cookies_data.get('cookies', {})}")
# 5. Cookie持久化
print("\n=== Cookie持久化 ===")
# 保存Cookie到文件
import pickle
# 保存Cookie
with open('cookies.pkl', 'wb') as f:
pickle.dump(session.cookies, f)
print("Cookie已保存到文件")
# 加载Cookie
new_session = requests.Session()
try:
with open('cookies.pkl', 'rb') as f:
new_session.cookies = pickle.load(f)
print("Cookie已从文件加载")
# 测试加载的Cookie
response = new_session.get("https://httpbin.org/cookies")
if response.status_code == 200:
cookies_data = response.json()
print(f"加载的Cookie: {cookies_data.get('cookies', {})}")
except FileNotFoundError:
print("Cookie文件不存在")
# 模拟登录示例
def simulate_login_with_session():
"""
模拟网站登录过程
"""
print("\n=== 模拟登录流程 ===")
session = requests.Session()
# 1. 访问登录页面(获取必要的Cookie和token)
login_page_url = "https://httpbin.org/cookies/set/csrf_token/abc123def456"
response = session.get(login_page_url)
print(f"访问登录页面: {response.status_code}")
# 2. 提交登录表单
login_data = {
'username': 'testuser',
'password': 'testpass',
'csrf_token': 'abc123def456'
}
login_url = "https://httpbin.org/post"
response = session.post(login_url, data=login_data)
if response.status_code == 200:
print("登录请求发送成功")
response_data = response.json()
print(f"提交的登录数据: {response_data.get('form', {})}")
# 3. 访问需要登录的页面
protected_url = "https://httpbin.org/cookies"
response = session.get(protected_url)
if response.status_code == 200:
print("成功访问受保护页面")
cookies_data = response.json()
print(f"当前会话Cookie: {cookies_data.get('cookies', {})}")
# 运行演示
if __name__ == "__main__":
demonstrate_cookies_and_sessions()
simulate_login_with_session()
运行结果:
=== Cookie操作演示 ===
设置Cookie后的状态码: 200
当前会话中的Cookie:
username = testuser
session_id = abc123
preferences = dark_theme
服务器接收到的Cookie: {'username': 'testuser', 'session_id': 'abc123', 'preferences': 'dark_theme'}
=== 手动Cookie操作 ===
手动设置的Cookie: {'user_id': '12345', 'auth_token': 'xyz789', 'language': 'zh-CN'}
=== Cookie持久化 ===
Cookie已保存到文件
Cookie已从文件加载
加载的Cookie: {'username': 'testuser', 'session_id': 'abc123', 'preferences': 'dark_theme'}
=== 模拟登录流程 ===
访问登录页面: 200
登录请求发送成功
提交的登录数据: {'username': 'testuser', 'password': 'testpass', 'csrf_token': 'abc123def456'}
成功访问受保护页面
当前会话Cookie: {'csrf_token': 'abc123def456'}
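除了上面用pickle序列化Cookie,也可以借助标准库http.cookiejar把Cookie保存为Netscape格式的文本文件,便于查看和在其他工具之间共享。下面是一个简单示意,文件名cookies.txt为示例:
from http.cookiejar import MozillaCookieJar
import requests
# 1. 先在会话中产生一些Cookie
session = requests.Session()
session.get("https://httpbin.org/cookies/set/demo_id/12345")
# 2. 把会话中的Cookie复制到MozillaCookieJar并保存为文本文件
jar = MozillaCookieJar("cookies.txt")
for cookie in session.cookies:
    jar.set_cookie(cookie)
# 会话Cookie没有过期时间,需要ignore_discard/ignore_expires才会被写入
jar.save(ignore_discard=True, ignore_expires=True)
print("Cookie已保存到 cookies.txt")
# 3. 在新会话中加载文本文件里的Cookie
loaded_jar = MozillaCookieJar("cookies.txt")
loaded_jar.load(ignore_discard=True, ignore_expires=True)
new_session = requests.Session()
for cookie in loaded_jar:
    new_session.cookies.set_cookie(cookie)
response = new_session.get("https://httpbin.org/cookies")
print(f"加载后的Cookie: {response.json().get('cookies', {})}")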
网页结构分析¶
HTML基础结构¶
理解HTML结构是网页数据提取的基础。HTML(HyperText Markup Language)使用标签来定义网页内容的结构和语义。
一个典型的HTML页面结构如下:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>页面标题</title>
<link rel="stylesheet" href="style.css">
</head>
<body>
<header>
<nav>
<ul>
<li><a href="#home">首页</a></li>
<li><a href="#about">关于</a></li>
</ul>
</nav>
</header>
<main>
<article>
<h1>文章标题</h1>
<p class="content">文章内容...</p>
</article>
</main>
<footer>
<p>© 2024 版权信息</p>
</footer>
<script src="script.js"></script>
</body>
</html>
让我们编写一个HTML结构分析工具:
import requests
from bs4 import BeautifulSoup
from bs4.element import Doctype
from collections import Counter
def analyze_html_structure(url):
"""
分析网页的HTML结构
"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
print(f"=== HTML结构分析: {url} ===")
# 1. 基本信息
title = soup.find('title')
print(f"页面标题: {title.get_text().strip() if title else '无标题'}")
# 2. 文档类型和编码
            doctype_node = next((item for item in soup.contents if isinstance(item, Doctype)), None)
            if doctype_node and doctype_node.strip().lower() == 'html':
                print("文档类型: HTML5")
            else:
                print(f"文档类型: {doctype_node if doctype_node else '未声明'}")
charset_meta = soup.find('meta', attrs={'charset': True})
if not charset_meta:
charset_meta = soup.find('meta', attrs={'http-equiv': 'Content-Type'})
encoding = charset_meta.get('charset') if charset_meta else response.encoding
print(f"字符编码: {encoding}")
# 3. 标签统计
all_tags = [tag.name for tag in soup.find_all()]
tag_counter = Counter(all_tags)
print(f"\n标签统计 (前10个):")
for tag, count in tag_counter.most_common(10):
print(f" {tag}: {count}个")
# 4. 链接分析
links = soup.find_all('a', href=True)
print(f"\n链接分析:")
print(f" 总链接数: {len(links)}")
internal_links = []
external_links = []
for link in links:
href = link['href']
if href.startswith('http'):
if url in href:
internal_links.append(href)
else:
external_links.append(href)
elif href.startswith('/'):
internal_links.append(href)
print(f" 内部链接: {len(internal_links)}个")
print(f" 外部链接: {len(external_links)}个")
# 5. 图片分析
images = soup.find_all('img')
print(f"\n图片分析:")
print(f" 图片总数: {len(images)}")
img_with_alt = [img for img in images if img.get('alt')]
print(f" 有alt属性: {len(img_with_alt)}个")
# 6. 表单分析
forms = soup.find_all('form')
print(f"\n表单分析:")
print(f" 表单总数: {len(forms)}")
for i, form in enumerate(forms):
method = form.get('method', 'GET').upper()
action = form.get('action', '当前页面')
inputs = form.find_all(['input', 'select', 'textarea'])
print(f" 表单{i+1}: {method} -> {action} ({len(inputs)}个字段)")
# 7. 脚本和样式
scripts = soup.find_all('script')
stylesheets = soup.find_all('link', rel='stylesheet')
print(f"\n资源分析:")
print(f" JavaScript文件: {len(scripts)}个")
print(f" CSS样式表: {len(stylesheets)}个")
# 8. 结构层次
print(f"\n页面结构:")
body = soup.find('body')
if body:
print_structure(body, level=0, max_level=3)
else:
print(f"请求失败,状态码: {response.status_code}")
except Exception as e:
print(f"分析过程中出现错误: {e}")
def print_structure(element, level=0, max_level=3):
"""
递归打印HTML结构
"""
if level > max_level:
return
indent = " " * level
tag_name = element.name
# 获取重要属性
attrs = []
if element.get('id'):
attrs.append(f"id='{element['id']}'")
if element.get('class'):
classes = ' '.join(element['class'])
attrs.append(f"class='{classes}'")
attr_str = f" [{', '.join(attrs)}]" if attrs else ""
print(f"{indent}<{tag_name}>{attr_str}")
# 递归处理子元素
for child in element.children:
if hasattr(child, 'name') and child.name:
print_structure(child, level + 1, max_level)
# 使用示例
if __name__ == "__main__":
# 分析一个示例网页
analyze_html_structure("https://httpbin.org/html")
运行结果示例:
=== HTML结构分析: https://httpbin.org/html ===
页面标题: Herman Melville - Moby-Dick
文档类型: HTML5
字符编码: utf-8
标签统计 (前10个):
p: 4个
a: 3个
h1: 1个
body: 1个
html: 1个
head: 1个
title: 1个
链接分析:
总链接数: 3个
内部链接: 0个
外部链接: 3个
图片分析:
图片总数: 0个
有alt属性: 0个
表单分析:
表单总数: 0个
资源分析:
JavaScript文件: 0个
CSS样式表: 0个
页面结构:
<body>
<h1>
<p>
<p>
<p>
<p>
CSS选择器¶
CSS选择器是定位HTML元素的强大工具,在网页数据提取中起着关键作用。理解CSS选择器语法对于精确定位目标元素至关重要。
基本选择器:
- 标签选择器:div、p、a
- 类选择器:.class-name
- ID选择器:#element-id
- 属性选择器:[attribute="value"]
组合选择器:
- 后代选择器:div p(div内的所有p元素)
- 子元素选择器:div > p(div的直接子p元素)
- 相邻兄弟选择器:h1 + p(紧跟h1的p元素)
- 通用兄弟选择器:h1 ~ p(h1后的所有同级p元素)
伪类选择器:
- :first-child、:last-child、:nth-child(n)
- :not(selector)、:contains(text)(:contains 并非标准CSS,仅部分解析器支持)
让我们通过实例来学习CSS选择器的使用:
import requests
from bs4 import BeautifulSoup
def demonstrate_css_selectors():
"""
演示CSS选择器的使用
"""
# 创建示例HTML
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>CSS选择器示例</title>
</head>
<body>
<div class="container">
<h1 id="main-title">新闻列表</h1>
<div class="news-section">
<article class="news-item featured">
<h2>重要新闻标题1</h2>
<p class="summary">这是新闻摘要...</p>
<span class="date">2024-01-15</span>
<a href="/news/1" class="read-more">阅读更多</a>
</article>
<article class="news-item">
<h2>普通新闻标题2</h2>
<p class="summary">这是另一个新闻摘要...</p>
<span class="date">2024-01-14</span>
<a href="/news/2" class="read-more">阅读更多</a>
</article>
<article class="news-item">
<h2>普通新闻标题3</h2>
<p class="summary">第三个新闻摘要...</p>
<span class="date">2024-01-13</span>
<a href="/news/3" class="read-more">阅读更多</a>
</article>
</div>
<aside class="sidebar">
<h3>热门标签</h3>
<ul class="tag-list">
<li><a href="/tag/tech" data-category="technology">科技</a></li>
<li><a href="/tag/sports" data-category="sports">体育</a></li>
<li><a href="/tag/finance" data-category="finance">财经</a></li>
</ul>
</aside>
</div>
</body>
</html>
"""
soup = BeautifulSoup(html_content, 'html.parser')
print("=== CSS选择器演示 ===")
# 1. 基本选择器
print("\n1. 基本选择器:")
# 标签选择器
h2_elements = soup.select('h2')
print(f"所有h2标签 ({len(h2_elements)}个):")
for h2 in h2_elements:
print(f" - {h2.get_text().strip()}")
# 类选择器
news_items = soup.select('.news-item')
print(f"\n所有新闻项 ({len(news_items)}个):")
for i, item in enumerate(news_items, 1):
title = item.select_one('h2').get_text().strip()
print(f" {i}. {title}")
# ID选择器
main_title = soup.select_one('#main-title')
print(f"\n主标题: {main_title.get_text().strip()}")
# 属性选择器
tech_links = soup.select('a[data-category="technology"]')
print(f"\n科技类链接 ({len(tech_links)}个):")
for link in tech_links:
print(f" - {link.get_text().strip()} -> {link.get('href')}")
# 2. 组合选择器
print("\n2. 组合选择器:")
# 后代选择器
container_links = soup.select('.container a')
print(f"容器内所有链接 ({len(container_links)}个):")
for link in container_links:
text = link.get_text().strip()
href = link.get('href', '#')
print(f" - {text} -> {href}")
# 子元素选择器
direct_children = soup.select('.news-section > .news-item')
print(f"\n新闻区域的直接子元素 ({len(direct_children)}个)")
# 相邻兄弟选择器
after_h2 = soup.select('h2 + p')
print(f"\nh2后的相邻p元素 ({len(after_h2)}个):")
for p in after_h2:
print(f" - {p.get_text().strip()[:30]}...")
# 3. 伪类选择器
print("\n3. 伪类选择器:")
# 第一个和最后一个子元素
first_news = soup.select('.news-item:first-child')
last_news = soup.select('.news-item:last-child')
if first_news:
first_title = first_news[0].select_one('h2').get_text().strip()
print(f"第一个新闻: {first_title}")
if last_news:
last_title = last_news[0].select_one('h2').get_text().strip()
print(f"最后一个新闻: {last_title}")
# nth-child选择器
second_news = soup.select('.news-item:nth-child(2)')
if second_news:
second_title = second_news[0].select_one('h2').get_text().strip()
print(f"第二个新闻: {second_title}")
# 4. 复杂选择器组合
print("\n4. 复杂选择器:")
# 选择特色新闻的标题
featured_title = soup.select('.news-item.featured h2')
if featured_title:
print(f"特色新闻标题: {featured_title[0].get_text().strip()}")
# 选择包含特定文本的元素
read_more_links = soup.select('a.read-more')
print(f"'阅读更多'链接 ({len(read_more_links)}个)")
# 选择具有特定属性的元素
category_links = soup.select('a[data-category]')
print(f"有分类属性的链接 ({len(category_links)}个):")
for link in category_links:
category = link.get('data-category')
text = link.get_text().strip()
print(f" - {text} (分类: {category})")
# 实际网页CSS选择器应用
def extract_data_with_css_selectors(url):
"""
使用CSS选择器从实际网页提取数据
"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
print(f"\n=== 从 {url} 提取数据 ===")
# 提取页面标题
title = soup.select_one('title')
if title:
print(f"页面标题: {title.get_text().strip()}")
# 提取所有链接
links = soup.select('a[href]')
print(f"\n找到 {len(links)} 个链接:")
for i, link in enumerate(links[:5], 1): # 只显示前5个
text = link.get_text().strip()
href = link.get('href')
print(f" {i}. {text[:50]}... -> {href}")
# 提取所有段落文本
paragraphs = soup.select('p')
if paragraphs:
print(f"\n段落内容 (共{len(paragraphs)}个):")
for i, p in enumerate(paragraphs[:3], 1): # 只显示前3个
text = p.get_text().strip()
if text:
print(f" {i}. {text[:100]}...")
else:
print(f"请求失败,状态码: {response.status_code}")
except Exception as e:
print(f"提取数据时出现错误: {e}")
# 运行演示
if __name__ == "__main__":
demonstrate_css_selectors()
extract_data_with_css_selectors("https://httpbin.org/html")
JavaScript和动态内容¶
现代网页大量使用JavaScript来动态生成内容,这给传统的静态爬虫带来了挑战。动态内容包括:
- AJAX加载的数据:通过异步请求获取的内容
- JavaScript渲染的页面:完全由JS生成的页面结构
- 用户交互触发的内容:点击、滚动等操作后显示的内容
- 实时更新的数据:WebSocket或定时刷新的内容
处理动态内容的方法:
方法1:分析AJAX请求
import requests
import json
def analyze_ajax_requests():
"""
分析和模拟AJAX请求
"""
print("=== AJAX请求分析 ===")
# 模拟一个AJAX请求
ajax_url = "https://httpbin.org/json"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'X-Requested-With': 'XMLHttpRequest', # 标识AJAX请求
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Content-Type': 'application/json'
}
try:
response = requests.get(ajax_url, headers=headers)
if response.status_code == 200:
data = response.json()
print(f"AJAX响应数据:")
print(json.dumps(data, indent=2, ensure_ascii=False))
else:
print(f"AJAX请求失败: {response.status_code}")
except Exception as e:
print(f"AJAX请求异常: {e}")
# 运行AJAX分析
if __name__ == "__main__":
analyze_ajax_requests()
方法2:使用Selenium处理JavaScript
# 注意:需要安装selenium和对应的浏览器驱动
# pip install selenium
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
def handle_dynamic_content_with_selenium():
"""
使用Selenium处理动态内容
"""
print("=== Selenium处理动态内容 ===")
# 配置Chrome选项
chrome_options = Options()
chrome_options.add_argument('--headless') # 无头模式
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
try:
# 创建WebDriver实例
driver = webdriver.Chrome(options=chrome_options)
# 访问包含动态内容的页面
driver.get("https://httpbin.org/html")
# 等待页面加载完成
wait = WebDriverWait(driver, 10)
# 获取页面标题
title = driver.title
print(f"页面标题: {title}")
# 查找元素
h1_element = wait.until(
EC.presence_of_element_located((By.TAG_NAME, "h1"))
)
print(f"H1内容: {h1_element.text}")
# 获取所有链接
links = driver.find_elements(By.TAG_NAME, "a")
print(f"\n找到 {len(links)} 个链接:")
for i, link in enumerate(links, 1):
text = link.text.strip()
href = link.get_attribute('href')
print(f" {i}. {text} -> {href}")
# 执行JavaScript
js_result = driver.execute_script("return document.title;")
print(f"\nJavaScript执行结果: {js_result}")
except Exception as e:
print(f"Selenium处理异常: {e}")
finally:
if 'driver' in locals():
driver.quit()
# 注意:实际运行需要安装ChromeDriver
# 这里只是演示代码结构
网页编码和字符集¶
正确处理网页编码是避免乱码问题的关键。常见的编码格式包括:
- UTF-8:支持全球所有字符的Unicode编码
- GBK/GB2312:中文编码格式
- ISO-8859-1:西欧字符编码
- ASCII:基本英文字符编码
import requests
from bs4 import BeautifulSoup
import chardet
def handle_encoding_issues():
"""
处理网页编码问题
"""
print("=== 网页编码处理 ===")
# 测试不同编码的处理
test_urls = [
"https://httpbin.org/encoding/utf8",
"https://httpbin.org/html",
]
for url in test_urls:
try:
print(f"\n处理URL: {url}")
# 获取原始响应
response = requests.get(url)
print(f"响应编码: {response.encoding}")
print(f"表观编码: {response.apparent_encoding}")
# 方法1:使用chardet检测编码
detected_encoding = chardet.detect(response.content)
print(f"检测到的编码: {detected_encoding}")
# 方法2:从HTML meta标签获取编码
soup = BeautifulSoup(response.content, 'html.parser')
# 查找charset声明
charset_meta = soup.find('meta', attrs={'charset': True})
if charset_meta:
declared_charset = charset_meta.get('charset')
print(f"声明的编码: {declared_charset}")
else:
# 查找http-equiv类型的meta标签
content_type_meta = soup.find('meta', attrs={'http-equiv': 'Content-Type'})
if content_type_meta:
content = content_type_meta.get('content', '')
if 'charset=' in content:
declared_charset = content.split('charset=')[1].split(';')[0]
print(f"声明的编码: {declared_charset}")
# 方法3:正确设置编码后重新解析
if detected_encoding['encoding']:
response.encoding = detected_encoding['encoding']
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.find('title')
if title:
print(f"正确编码后的标题: {title.get_text().strip()}")
except Exception as e:
print(f"编码处理异常: {e}")
def create_encoding_safe_crawler():
"""
创建编码安全的爬虫
"""
def safe_get_text(url, timeout=10):
"""
安全获取网页文本内容
"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers, timeout=timeout)
# 1. 首先尝试使用响应头中的编码
if response.encoding != 'ISO-8859-1': # 避免错误的默认编码
soup = BeautifulSoup(response.text, 'html.parser')
else:
# 2. 使用chardet检测编码
detected = chardet.detect(response.content)
if detected['confidence'] > 0.7: # 置信度阈值
response.encoding = detected['encoding']
soup = BeautifulSoup(response.text, 'html.parser')
else:
# 3. 尝试常见编码
for encoding in ['utf-8', 'gbk', 'gb2312']:
try:
text = response.content.decode(encoding)
soup = BeautifulSoup(text, 'html.parser')
break
except UnicodeDecodeError:
continue
else:
# 4. 使用错误处理策略
soup = BeautifulSoup(response.content, 'html.parser', from_encoding='utf-8')
return soup
except Exception as e:
print(f"获取页面内容失败: {e}")
return None
# 测试编码安全爬虫
test_url = "https://httpbin.org/html"
soup = safe_get_text(test_url)
if soup:
title = soup.find('title')
print(f"\n编码安全爬虫结果:")
print(f"标题: {title.get_text().strip() if title else '无标题'}")
# 提取文本内容
paragraphs = soup.find_all('p')
print(f"段落数量: {len(paragraphs)}")
for i, p in enumerate(paragraphs[:2], 1):
text = p.get_text().strip()
print(f"段落{i}: {text[:100]}...")
# 运行编码处理演示
if __name__ == "__main__":
handle_encoding_issues()
create_encoding_safe_crawler()
爬虫开发环境¶
开发工具选择¶
选择合适的开发工具能够显著提高爬虫开发效率:
IDE和编辑器:
- PyCharm:功能强大的Python IDE,支持调试和代码分析
- VS Code:轻量级编辑器,丰富的插件生态
- Jupyter Notebook:适合数据分析和原型开发
- Sublime Text:快速的文本编辑器
浏览器开发者工具:
- Chrome DevTools:分析网页结构、网络请求、JavaScript执行
- Firefox Developer Tools:类似Chrome,某些功能更强大
- 网络面板:查看HTTP请求和响应
- 元素面板:分析HTML结构和CSS样式
抓包工具(Python侧的轻量替代方式见本列表后的示例):
- Fiddler:Windows平台的HTTP调试代理
- Charles:跨平台的HTTP监控工具
- mitmproxy:基于Python的中间人代理
- Wireshark:网络协议分析器
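除了上述图形化抓包工具,也可以直接在Python里打开底层HTTP调试开关,把requests发出的原始请求头和收到的响应头打印出来,便于快速排查问题。下面是一个示意,正式运行时应关闭这些开关:
import http.client
import logging
import requests
# 让http.client把发送的请求行、请求头和收到的响应头打印到标准输出
http.client.HTTPConnection.debuglevel = 1
# 同时打开urllib3的调试日志,观察连接建立与重试等细节
logging.basicConfig(level=logging.DEBUG)
response = requests.get("https://httpbin.org/get", timeout=10)
print(f"状态码: {response.status_code}")
# 排查完毕后恢复默认,避免日志过多
http.client.HTTPConnection.debuglevel = 0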
代理和IP池¶
使用代理服务器可以隐藏真实IP地址,避免被网站封禁:
import requests
import random
import time
from itertools import cycle
class ProxyManager:
"""
代理管理器
"""
def __init__(self):
# 代理列表(示例,实际使用时需要有效的代理)
self.proxy_list = [
{'http': 'http://proxy1:port', 'https': 'https://proxy1:port'},
{'http': 'http://proxy2:port', 'https': 'https://proxy2:port'},
{'http': 'http://proxy3:port', 'https': 'https://proxy3:port'},
]
self.proxy_cycle = cycle(self.proxy_list)
self.failed_proxies = set()
def get_proxy(self):
"""
获取可用代理
"""
for _ in range(len(self.proxy_list)):
proxy = next(self.proxy_cycle)
proxy_key = str(proxy)
if proxy_key not in self.failed_proxies:
return proxy
# 如果所有代理都失败,清空失败列表重新开始
self.failed_proxies.clear()
return next(self.proxy_cycle)
def mark_proxy_failed(self, proxy):
"""
标记代理失败
"""
self.failed_proxies.add(str(proxy))
def test_proxy(self, proxy, test_url="https://httpbin.org/ip"):
"""
测试代理是否可用
"""
try:
response = requests.get(
test_url,
proxies=proxy,
timeout=10,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
)
if response.status_code == 200:
data = response.json()
print(f"代理测试成功,IP: {data.get('origin')}")
return True
else:
print(f"代理测试失败,状态码: {response.status_code}")
return False
except Exception as e:
print(f"代理测试异常: {e}")
return False
def demonstrate_proxy_usage():
"""
演示代理使用
"""
print("=== 代理使用演示 ===")
# 不使用代理的请求
try:
response = requests.get("https://httpbin.org/ip", timeout=10)
if response.status_code == 200:
data = response.json()
print(f"直接访问IP: {data.get('origin')}")
except Exception as e:
print(f"直接访问失败: {e}")
# 使用代理的请求(示例)
proxy_manager = ProxyManager()
# 注意:以下代码需要有效的代理服务器才能正常工作
print("\n代理测试(需要有效代理):")
for i in range(3):
proxy = proxy_manager.get_proxy()
print(f"测试代理 {i+1}: {proxy}")
# 在实际环境中测试代理
# is_working = proxy_manager.test_proxy(proxy)
# if not is_working:
# proxy_manager.mark_proxy_failed(proxy)
# 免费代理获取示例
def get_free_proxies():
"""
获取免费代理(示例)
"""
print("\n=== 免费代理获取 ===")
# 这里只是演示结构,实际需要从代理网站爬取
free_proxy_sources = [
"https://www.proxy-list.download/api/v1/get?type=http",
"https://api.proxyscrape.com/v2/?request=get&protocol=http",
]
proxies = []
for source in free_proxy_sources:
try:
print(f"从 {source} 获取代理...")
# 实际实现需要解析不同网站的格式
# response = requests.get(source, timeout=10)
# 解析代理列表...
print("代理获取完成(示例)")
except Exception as e:
print(f"获取代理失败: {e}")
return proxies
# 运行代理演示
if __name__ == "__main__":
demonstrate_proxy_usage()
get_free_proxies()
用户代理设置¶
用户代理(User-Agent)字符串标识客户端应用程序,设置合适的User-Agent可以避免被识别为爬虫:
import requests
import random
class UserAgentManager:
"""
用户代理管理器
"""
def __init__(self):
self.user_agents = [
# Chrome
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
# Firefox
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:121.0) Gecko/20100101 Firefox/121.0',
# Safari
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1',
# Edge
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0',
]
def get_random_user_agent(self):
"""
获取随机用户代理
"""
return random.choice(self.user_agents)
def get_mobile_user_agent(self):
"""
获取移动端用户代理
"""
mobile_agents = [
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (Android 14; Mobile; rv:121.0) Gecko/121.0 Firefox/121.0',
'Mozilla/5.0 (Linux; Android 14; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36',
]
return random.choice(mobile_agents)
def demonstrate_user_agent():
"""
演示用户代理的使用
"""
print("=== 用户代理演示 ===")
ua_manager = UserAgentManager()
# 测试不同的用户代理
test_url = "https://httpbin.org/user-agent"
for i in range(3):
user_agent = ua_manager.get_random_user_agent()
headers = {'User-Agent': user_agent}
try:
response = requests.get(test_url, headers=headers)
if response.status_code == 200:
data = response.json()
print(f"\n请求 {i+1}:")
print(f"发送的User-Agent: {user_agent[:50]}...")
print(f"服务器接收到的: {data.get('user-agent', '')[:50]}...")
except Exception as e:
print(f"请求失败: {e}")
# 测试移动端用户代理
print("\n=== 移动端用户代理 ===")
mobile_ua = ua_manager.get_mobile_user_agent()
headers = {'User-Agent': mobile_ua}
try:
response = requests.get(test_url, headers=headers)
if response.status_code == 200:
data = response.json()
print(f"移动端User-Agent: {data.get('user-agent')}")
except Exception as e:
print(f"移动端请求失败: {e}")
# 运行用户代理演示
if __name__ == "__main__":
demonstrate_user_agent()
调试和测试工具¶
有效的调试和测试工具能够帮助快速定位和解决爬虫开发中的问题:
import requests
import time
import logging
from functools import wraps
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('crawler.log'),
logging.StreamHandler()
]
)
def debug_request(func):
"""
请求调试装饰器
"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
end_time = time.time()
logging.info(f"{func.__name__} 执行成功,耗时: {end_time - start_time:.3f}秒")
return result
except Exception as e:
end_time = time.time()
logging.error(f"{func.__name__} 执行失败,耗时: {end_time - start_time:.3f}秒,错误: {e}")
raise
return wrapper
class CrawlerDebugger:
"""
爬虫调试器
"""
def __init__(self):
self.request_count = 0
self.success_count = 0
self.error_count = 0
self.start_time = time.time()
@debug_request
def debug_get(self, url, **kwargs):
"""
调试版本的GET请求
"""
self.request_count += 1
# 默认headers
default_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
        # 以默认headers为基础,调用方传入的同名headers优先生效
        headers = {**default_headers, **kwargs.get('headers', {})}
        kwargs['headers'] = headers
logging.info(f"发送GET请求到: {url}")
logging.debug(f"请求参数: {kwargs}")
try:
response = requests.get(url, **kwargs)
logging.info(f"响应状态码: {response.status_code}")
logging.info(f"响应时间: {response.elapsed.total_seconds():.3f}秒")
logging.debug(f"响应头: {dict(response.headers)}")
if response.status_code == 200:
self.success_count += 1
else:
self.error_count += 1
logging.warning(f"非200状态码: {response.status_code}")
return response
except requests.RequestException as e:
self.error_count += 1
logging.error(f"请求异常: {e}")
raise
def get_stats(self):
"""
获取统计信息
"""
elapsed_time = time.time() - self.start_time
stats = {
'总请求数': self.request_count,
'成功请求数': self.success_count,
'失败请求数': self.error_count,
'成功率': f"{(self.success_count / max(self.request_count, 1)) * 100:.2f}%",
'运行时间': f"{elapsed_time:.2f}秒",
'平均请求速度': f"{self.request_count / max(elapsed_time, 1):.2f}请求/秒"
}
return stats
def print_stats(self):
"""
打印统计信息
"""
stats = self.get_stats()
print("\n=== 爬虫统计信息 ===")
for key, value in stats.items():
print(f"{key}: {value}")
def test_crawler_debugger():
"""
测试爬虫调试器
"""
debugger = CrawlerDebugger()
test_urls = [
"https://httpbin.org/get",
"https://httpbin.org/status/200",
"https://httpbin.org/delay/1",
"https://httpbin.org/status/404", # 这个会返回404
"https://httpbin.org/json",
]
print("开始测试爬虫调试器...")
for url in test_urls:
try:
response = debugger.debug_get(url, timeout=10)
print(f"✓ {url} - 状态码: {response.status_code}")
except Exception as e:
print(f"✗ {url} - 错误: {e}")
time.sleep(0.5) # 避免请求过快
# 打印统计信息
debugger.print_stats()
# 性能测试工具
def performance_test(func, *args, **kwargs):
"""
性能测试装饰器
"""
def test_performance(iterations=10):
times = []
for i in range(iterations):
start_time = time.time()
try:
func(*args, **kwargs)
end_time = time.time()
times.append(end_time - start_time)
except Exception as e:
print(f"第{i+1}次测试失败: {e}")
if times:
avg_time = sum(times) / len(times)
min_time = min(times)
max_time = max(times)
print(f"\n=== 性能测试结果 ({iterations}次) ===")
print(f"平均时间: {avg_time:.3f}秒")
print(f"最短时间: {min_time:.3f}秒")
print(f"最长时间: {max_time:.3f}秒")
print(f"成功率: {len(times)}/{iterations} ({len(times)/iterations*100:.1f}%)")
return test_performance
# 运行调试演示
if __name__ == "__main__":
test_crawler_debugger()
# 性能测试示例
@performance_test
def simple_request():
response = requests.get("https://httpbin.org/get", timeout=5)
return response.status_code == 200
print("\n开始性能测试...")
simple_request(iterations=5)
运行结果示例:
开始测试爬虫调试器...
2024-01-15 14:30:15,123 - INFO - 发送GET请求到: https://httpbin.org/get
2024-01-15 14:30:15,456 - INFO - 响应状态码: 200
2024-01-15 14:30:15,456 - INFO - 响应时间: 0.333秒
2024-01-15 14:30:15,456 - INFO - debug_get 执行成功,耗时: 0.334秒
✓ https://httpbin.org/get - 状态码: 200
2024-01-15 14:30:16,001 - INFO - 发送GET请求到: https://httpbin.org/status/200
2024-01-15 14:30:16,234 - INFO - 响应状态码: 200
2024-01-15 14:30:16,234 - INFO - 响应时间: 0.233秒
2024-01-15 14:30:16,234 - INFO - debug_get 执行成功,耗时: 0.234秒
✓ https://httpbin.org/status/200 - 状态码: 200
=== 爬虫统计信息 ===
总请求数: 5
成功请求数: 4
失败请求数: 1
成功率: 80.00%
运行时间: 3.45秒
平均请求速度: 1.45请求/秒
=== 性能测试结果 (5次) ===
平均时间: 0.456秒
最短时间: 0.234秒
最长时间: 0.678秒
成功率: 5/5 (100.0%)
14.2 Requests库网络请求¶
Requests是Python中最受欢迎的HTTP库,它让HTTP请求变得简单而优雅。相比于Python标准库中的urllib,Requests提供了更加人性化的API,是网络爬虫开发的首选工具。
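为了直观感受这种差别,下面用同一个GET请求分别演示urllib和Requests的写法,仅作对比示意:
# 使用标准库urllib:需要手动构造Request对象并自行解码字节
from urllib.request import Request, urlopen
req = Request("https://httpbin.org/get", headers={"User-Agent": "Demo/1.0"})
with urlopen(req, timeout=10) as resp:
    body = resp.read().decode("utf-8")
    print(resp.status, len(body))
# 使用Requests:一行发出请求,文本和JSON解析都是现成的
import requests
response = requests.get("https://httpbin.org/get",
                        headers={"User-Agent": "Demo/1.0"}, timeout=10)
print(response.status_code, len(response.text))
print(response.json()["headers"]["User-Agent"])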
Requests基础¶
安装和基本使用¶
Requests库的安装非常简单,使用pip命令即可:
pip install requests
安装完成后,我们来看看Requests的基本使用方法:
import requests
import json
from pprint import pprint
def basic_requests_usage():
"""
演示Requests的基本使用方法
"""
print("=== Requests基础使用演示 ===")
# 1. 最简单的GET请求
print("\n1. 基本GET请求:")
response = requests.get('https://httpbin.org/get')
print(f"状态码: {response.status_code}")
print(f"响应时间: {response.elapsed.total_seconds():.3f}秒")
print(f"内容类型: {response.headers.get('content-type')}")
# 2. 检查请求是否成功
if response.status_code == 200:
print("请求成功!")
data = response.json() # 解析JSON响应
print(f"服务器接收到的URL: {data['url']}")
else:
print(f"请求失败,状态码: {response.status_code}")
# 3. 使用raise_for_status()检查状态
try:
response.raise_for_status() # 如果状态码不是200会抛出异常
print("状态检查通过")
except requests.exceptions.HTTPError as e:
print(f"HTTP错误: {e}")
# 4. 获取响应内容的不同方式
print("\n2. 响应内容获取:")
# 文本内容
print(f"响应文本长度: {len(response.text)}字符")
# 二进制内容
print(f"响应二进制长度: {len(response.content)}字节")
# JSON内容(如果是JSON格式)
try:
json_data = response.json()
print(f"JSON数据键: {list(json_data.keys())}")
except ValueError:
print("响应不是有效的JSON格式")
# 5. 响应头信息
print("\n3. 响应头信息:")
print(f"服务器: {response.headers.get('server', '未知')}")
print(f"内容长度: {response.headers.get('content-length', '未知')}")
print(f"连接类型: {response.headers.get('connection', '未知')}")
# 运行基础演示
if __name__ == "__main__":
basic_requests_usage()
运行结果:
=== Requests基础使用演示 ===
1. 基本GET请求:
状态码: 200
响应时间: 0.234秒
内容类型: application/json
请求成功!
服务器接收到的URL: https://httpbin.org/get
状态检查通过
2. 响应内容获取:
响应文本长度: 312字符
响应二进制长度: 312字节
JSON数据键: ['args', 'headers', 'origin', 'url']
3. 响应头信息:
服务器: gunicorn/19.9.0
内容长度: 312
连接类型: keep-alive
GET和POST请求¶
GET和POST是HTTP协议中最常用的两种请求方法。GET用于获取数据,POST用于提交数据。
GET请求详解:
import requests
from urllib.parse import urlencode
def demonstrate_get_requests():
"""
演示各种GET请求的使用方法
"""
print("=== GET请求详解 ===")
# 1. 基本GET请求
print("\n1. 基本GET请求:")
response = requests.get('https://httpbin.org/get')
print(f"请求URL: {response.url}")
print(f"状态码: {response.status_code}")
# 2. 带参数的GET请求
print("\n2. 带参数的GET请求:")
# 方法1: 使用params参数
params = {
'name': '张三',
'age': 25,
'city': '北京',
'hobbies': ['读书', '游泳'] # 列表参数
}
response = requests.get('https://httpbin.org/get', params=params)
print(f"构建的URL: {response.url}")
data = response.json()
print(f"服务器接收到的参数: {data['args']}")
# 方法2: 直接在URL中包含参数
url_with_params = 'https://httpbin.org/get?name=李四&age=30'
response2 = requests.get(url_with_params)
print(f"\n直接URL参数: {response2.json()['args']}")
# 3. 自定义请求头
print("\n3. 自定义请求头:")
headers = {
'User-Agent': 'MySpider/1.0',
'Accept': 'application/json',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Referer': 'https://www.example.com'
}
response = requests.get('https://httpbin.org/get', headers=headers)
received_headers = response.json()['headers']
print(f"发送的User-Agent: {headers['User-Agent']}")
print(f"服务器接收到的User-Agent: {received_headers.get('User-Agent')}")
# 4. 超时设置
print("\n4. 超时设置:")
try:
# 设置连接超时为3秒,读取超时为5秒
response = requests.get('https://httpbin.org/delay/2', timeout=(3, 5))
print(f"请求成功,耗时: {response.elapsed.total_seconds():.3f}秒")
except requests.exceptions.Timeout:
print("请求超时")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
# 5. 处理重定向
print("\n5. 重定向处理:")
# 允许重定向(默认行为)
response = requests.get('https://httpbin.org/redirect/2')
print(f"最终URL: {response.url}")
print(f"重定向历史: {[r.url for r in response.history]}")
# 禁止重定向
response_no_redirect = requests.get('https://httpbin.org/redirect/1', allow_redirects=False)
print(f"\n禁止重定向状态码: {response_no_redirect.status_code}")
print(f"Location头: {response_no_redirect.headers.get('Location')}")
# 运行GET请求演示
if __name__ == "__main__":
demonstrate_get_requests()
POST请求详解:
import requests
import json
def demonstrate_post_requests():
"""
演示各种POST请求的使用方法
"""
print("=== POST请求详解 ===")
# 1. 发送表单数据
print("\n1. 发送表单数据:")
form_data = {
'username': 'testuser',
'password': 'testpass',
'email': 'test@example.com',
'remember': 'on'
}
response = requests.post('https://httpbin.org/post', data=form_data)
if response.status_code == 200:
result = response.json()
print(f"发送的表单数据: {form_data}")
print(f"服务器接收到的表单: {result['form']}")
print(f"Content-Type: {result['headers'].get('Content-Type')}")
# 2. 发送JSON数据
print("\n2. 发送JSON数据:")
json_data = {
'name': '王五',
'age': 28,
'skills': ['Python', 'JavaScript', 'SQL'],
'is_active': True,
'profile': {
'city': '上海',
'experience': 5
}
}
# 方法1: 使用json参数(推荐)
response = requests.post('https://httpbin.org/post', json=json_data)
if response.status_code == 200:
result = response.json()
print(f"发送的JSON数据: {json_data}")
print(f"服务器接收到的JSON: {result['json']}")
print(f"Content-Type: {result['headers'].get('Content-Type')}")
# 方法2: 手动设置headers和data
headers = {'Content-Type': 'application/json'}
response2 = requests.post(
'https://httpbin.org/post',
data=json.dumps(json_data),
headers=headers
)
print(f"\n手动设置方式状态码: {response2.status_code}")
# 3. 发送文件
print("\n3. 文件上传:")
# 创建一个临时文件用于演示
import tempfile
import os
# 创建临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write("这是一个测试文件\n包含中文内容")
temp_file_path = f.name
try:
# 上传文件
with open(temp_file_path, 'rb') as f:
files = {'file': ('test.txt', f, 'text/plain')}
response = requests.post('https://httpbin.org/post', files=files)
if response.status_code == 200:
result = response.json()
print(f"上传的文件信息: {result['files']}")
print(f"Content-Type: {result['headers'].get('Content-Type')}")
finally:
# 清理临时文件
os.unlink(temp_file_path)
# 4. 混合数据提交
print("\n4. 混合数据提交:")
# 同时发送表单数据和文件
form_data = {'description': '文件描述', 'category': 'test'}
# 创建内存中的文件对象
from io import StringIO, BytesIO
file_content = BytesIO(b"Hello, World! This is a test file.")
files = {'upload': ('hello.txt', file_content, 'text/plain')}
response = requests.post(
'https://httpbin.org/post',
data=form_data,
files=files
)
if response.status_code == 200:
result = response.json()
print(f"表单数据: {result['form']}")
print(f"文件数据: {list(result['files'].keys())}")
# 5. 自定义请求头的POST
print("\n5. 自定义请求头的POST:")
headers = {
'User-Agent': 'MyApp/2.0',
'Authorization': 'Bearer your-token-here',
'X-Custom-Header': 'custom-value'
}
data = {'message': 'Hello from custom headers'}
response = requests.post(
'https://httpbin.org/post',
json=data,
headers=headers
)
if response.status_code == 200:
result = response.json()
received_headers = result['headers']
print(f"自定义头部 X-Custom-Header: {received_headers.get('X-Custom-Header')}")
print(f"Authorization: {received_headers.get('Authorization')}")
# 运行POST请求演示
if __name__ == "__main__":
demonstrate_post_requests()
运行结果示例:
=== POST请求详解 ===
1. 发送表单数据:
发送的表单数据: {'username': 'testuser', 'password': 'testpass', 'email': 'test@example.com', 'remember': 'on'}
服务器接收到的表单: {'username': 'testuser', 'password': 'testpass', 'email': 'test@example.com', 'remember': 'on'}
Content-Type: application/x-www-form-urlencoded
2. 发送JSON数据:
发送的JSON数据: {'name': '王五', 'age': 28, 'skills': ['Python', 'JavaScript', 'SQL'], 'is_active': True, 'profile': {'city': '上海', 'experience': 5}}
服务器接收到的JSON: {'name': '王五', 'age': 28, 'skills': ['Python', 'JavaScript', 'SQL'], 'is_active': True, 'profile': {'city': '上海', 'experience': 5}}
Content-Type: application/json
3. 文件上传:
上传的文件信息: {'file': '这是一个测试文件\n包含中文内容'}
Content-Type: multipart/form-data; boundary=...
4. 混合数据提交:
表单数据: {'description': '文件描述', 'category': 'test'}
文件数据: ['upload']
5. 自定义请求头的POST:
自定义头部 X-Custom-Header: custom-value
Authorization: Bearer your-token-here
请求参数和头部¶
在网络爬虫中,正确设置请求参数和头部信息是非常重要的,它们决定了服务器如何处理我们的请求。
请求参数详解¶
import requests
from urllib.parse import urlencode, quote
def advanced_parameters_demo():
"""
演示高级参数处理
"""
print("=== 高级参数处理演示 ===")
# 1. 复杂参数结构
print("\n1. 复杂参数结构:")
complex_params = {
'q': 'Python爬虫', # 中文搜索词
'page': 1,
'size': 20,
'sort': ['time', 'relevance'], # 多值参数
'filters': {
'category': 'tech',
'date_range': '2024-01-01,2024-12-31'
},
'include_fields': ['title', 'content', 'author'],
'exclude_empty': True
}
# Requests会自动处理复杂参数
response = requests.get('https://httpbin.org/get', params=complex_params)
print(f"构建的URL: {response.url}")
result = response.json()
print(f"\n服务器接收到的参数:")
for key, value in result['args'].items():
print(f" {key}: {value}")
# 2. 手动URL编码
print("\n2. 手动URL编码:")
# 处理特殊字符
special_params = {
'query': 'hello world & python',
'symbols': '!@#$%^&*()+={}[]|\\:;"<>?,./'
}
# 方法1: 使用requests自动编码
response1 = requests.get('https://httpbin.org/get', params=special_params)
print(f"自动编码URL: {response1.url}")
# 方法2: 手动编码
encoded_query = quote('hello world & python')
manual_url = f'https://httpbin.org/get?query={encoded_query}'
response2 = requests.get(manual_url)
print(f"手动编码URL: {response2.url}")
# 3. 数组参数的不同处理方式
print("\n3. 数组参数处理:")
# 方式1: Python列表(默认行为)
list_params = {'tags': ['python', 'web', 'crawler']}
response = requests.get('https://httpbin.org/get', params=list_params)
print(f"列表参数URL: {response.url}")
# 方式2: 手动构建重复参数
manual_params = [('tags', 'python'), ('tags', 'web'), ('tags', 'crawler')]
response2 = requests.get('https://httpbin.org/get', params=manual_params)
print(f"手动重复参数URL: {response2.url}")
# 4. 条件参数构建
print("\n4. 条件参数构建:")
def build_search_params(keyword, page=1, filters=None, sort_by=None):
"""
根据条件构建搜索参数
"""
params = {'q': keyword, 'page': page}
if filters:
for key, value in filters.items():
if value: # 只添加非空值
params[f'filter_{key}'] = value
if sort_by:
params['sort'] = sort_by
return params
# 使用条件参数构建
search_filters = {
'category': 'technology',
'author': '', # 空值,不会被添加
'date': '2024-01-01'
}
params = build_search_params(
keyword='Python教程',
page=2,
filters=search_filters,
sort_by='date_desc'
)
response = requests.get('https://httpbin.org/get', params=params)
print(f"条件构建的参数: {response.json()['args']}")
# 运行参数演示
if __name__ == "__main__":
advanced_parameters_demo()
请求头部详解¶
import requests
import time
import random
def advanced_headers_demo():
"""
演示高级请求头处理
"""
print("=== 高级请求头演示 ===")
# 1. 完整的浏览器请求头模拟
print("\n1. 完整浏览器头部模拟:")
browser_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1', # Do Not Track
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Cache-Control': 'max-age=0'
}
response = requests.get('https://httpbin.org/get', headers=browser_headers)
received_headers = response.json()['headers']
print(f"发送的User-Agent: {browser_headers['User-Agent'][:50]}...")
print(f"服务器接收的User-Agent: {received_headers.get('User-Agent', '')[:50]}...")
print(f"Accept-Language: {received_headers.get('Accept-Language')}")
# 2. API请求头
print("\n2. API请求头:")
api_headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
'X-API-Key': 'your-api-key-here',
'X-Client-Version': '1.2.3',
'X-Request-ID': f'req_{int(time.time())}_{random.randint(1000, 9999)}'
}
data = {'query': 'test data'}
response = requests.post('https://httpbin.org/post', json=data, headers=api_headers)
if response.status_code == 200:
result = response.json()
print(f"API请求成功")
        print(f"Request ID: {result['headers'].get('X-Request-Id')}")  # httpbin会把头部名称规范为首字母大写形式
print(f"Authorization: {result['headers'].get('Authorization', '')[:20]}...")
# 3. 防爬虫头部设置
print("\n3. 防爬虫头部设置:")
# 模拟真实浏览器行为
anti_bot_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': 'https://www.google.com/', # 模拟从搜索引擎来
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache'
}
response = requests.get('https://httpbin.org/get', headers=anti_bot_headers)
print(f"防爬虫请求状态: {response.status_code}")
print(f"Referer头: {response.json()['headers'].get('Referer')}")
# 4. 动态头部生成
print("\n4. 动态头部生成:")
def generate_dynamic_headers():
"""
生成动态请求头
"""
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0'
]
referers = [
'https://www.google.com/',
'https://www.bing.com/',
'https://www.baidu.com/',
'https://duckduckgo.com/'
]
return {
'User-Agent': random.choice(user_agents),
'Referer': random.choice(referers),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'X-Forwarded-For': f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}'
}
# 使用动态头部发送多个请求
for i in range(3):
headers = generate_dynamic_headers()
response = requests.get('https://httpbin.org/get', headers=headers)
if response.status_code == 200:
result = response.json()
print(f"\n请求 {i+1}:")
print(f" User-Agent: {result['headers'].get('User-Agent', '')[:40]}...")
print(f" Referer: {result['headers'].get('Referer')}")
print(f" X-Forwarded-For: {result['headers'].get('X-Forwarded-For')}")
# 5. 头部优先级和覆盖
print("\n5. 头部优先级演示:")
# 创建会话并设置默认头部
session = requests.Session()
session.headers.update({
'User-Agent': 'DefaultAgent/1.0',
'Accept': 'application/json',
'X-Default-Header': 'default-value'
})
# 请求时覆盖部分头部
override_headers = {
'User-Agent': 'OverrideAgent/2.0', # 覆盖默认值
'X-Custom-Header': 'custom-value' # 新增头部
}
response = session.get('https://httpbin.org/get', headers=override_headers)
if response.status_code == 200:
result = response.json()
headers = result['headers']
print(f"最终User-Agent: {headers.get('User-Agent')}")
print(f"默认Accept: {headers.get('Accept')}")
print(f"默认头部: {headers.get('X-Default-Header')}")
print(f"自定义头部: {headers.get('X-Custom-Header')}")
# 运行头部演示
if __name__ == "__main__":
advanced_headers_demo()
响应对象处理¶
响应对象包含了服务器返回的所有信息,正确处理响应对象是爬虫开发的关键技能。
import requests
import json
from datetime import datetime
def response_handling_demo():
"""
演示响应对象的各种处理方法
"""
print("=== 响应对象处理演示 ===")
# 发送一个测试请求
response = requests.get('https://httpbin.org/json')
# 1. 基本响应信息
print("\n1. 基本响应信息:")
print(f"状态码: {response.status_code}")
print(f"状态描述: {response.reason}")
print(f"请求URL: {response.url}")
print(f"响应时间: {response.elapsed.total_seconds():.3f}秒")
print(f"编码: {response.encoding}")
# 2. 响应头详细分析
print("\n2. 响应头分析:")
print(f"Content-Type: {response.headers.get('content-type')}")
print(f"Content-Length: {response.headers.get('content-length')}")
print(f"Server: {response.headers.get('server')}")
print(f"Date: {response.headers.get('date')}")
# 检查是否支持压缩
content_encoding = response.headers.get('content-encoding')
if content_encoding:
print(f"内容编码: {content_encoding}")
else:
print("未使用内容压缩")
# 3. 响应内容的不同获取方式
print("\n3. 响应内容获取:")
# 文本内容
text_content = response.text
print(f"文本内容长度: {len(text_content)}字符")
print(f"文本内容预览: {text_content[:100]}...")
# 二进制内容
binary_content = response.content
print(f"二进制内容长度: {len(binary_content)}字节")
# JSON内容
try:
json_content = response.json()
print(f"JSON内容类型: {type(json_content)}")
if isinstance(json_content, dict):
print(f"JSON键: {list(json_content.keys())}")
except ValueError as e:
print(f"JSON解析失败: {e}")
# 4. 响应状态检查
print("\n4. 响应状态检查:")
def check_response_status(response):
"""
检查响应状态的详细信息
"""
print(f"状态码: {response.status_code}")
# 使用内置方法检查状态
if response.ok:
print("✓ 请求成功 (状态码 200-299)")
else:
print("✗ 请求失败")
# 详细状态分类
if 200 <= response.status_code < 300:
print("✓ 成功响应")
elif 300 <= response.status_code < 400:
print("→ 重定向响应")
location = response.headers.get('location')
if location:
print(f" 重定向到: {location}")
elif 400 <= response.status_code < 500:
print("✗ 客户端错误")
elif 500 <= response.status_code < 600:
print("✗ 服务器错误")
# 使用raise_for_status检查
try:
response.raise_for_status()
print("✓ 状态检查通过")
except requests.exceptions.HTTPError as e:
print(f"✗ 状态检查失败: {e}")
check_response_status(response)
# 5. 测试不同状态码的响应
print("\n5. 不同状态码测试:")
test_urls = [
('https://httpbin.org/status/200', '成功'),
('https://httpbin.org/status/404', '未找到'),
('https://httpbin.org/status/500', '服务器错误'),
('https://httpbin.org/redirect/1', '重定向')
]
for url, description in test_urls:
try:
resp = requests.get(url, timeout=5)
print(f"\n{description} ({url}):")
print(f" 状态码: {resp.status_code}")
print(f" 最终URL: {resp.url}")
if resp.history:
print(f" 重定向历史: {[r.status_code for r in resp.history]}")
except requests.exceptions.RequestException as e:
print(f"\n{description} 请求失败: {e}")
# 6. 响应内容类型处理
print("\n6. 不同内容类型处理:")
def handle_different_content_types():
"""
处理不同类型的响应内容
"""
# JSON响应
json_resp = requests.get('https://httpbin.org/json')
if json_resp.headers.get('content-type', '').startswith('application/json'):
data = json_resp.json()
print(f"JSON数据: {data}")
# HTML响应
html_resp = requests.get('https://httpbin.org/html')
if 'text/html' in html_resp.headers.get('content-type', ''):
print(f"HTML内容长度: {len(html_resp.text)}字符")
# 可以使用BeautifulSoup进一步解析
# XML响应
xml_resp = requests.get('https://httpbin.org/xml')
if 'application/xml' in xml_resp.headers.get('content-type', ''):
print(f"XML内容长度: {len(xml_resp.text)}字符")
# 图片响应(二进制)
try:
img_resp = requests.get('https://httpbin.org/image/png', timeout=10)
if img_resp.headers.get('content-type', '').startswith('image/'):
print(f"图片大小: {len(img_resp.content)}字节")
print(f"图片类型: {img_resp.headers.get('content-type')}")
except requests.exceptions.RequestException:
print("图片请求失败或超时")
handle_different_content_types()
# 7. 响应时间和性能分析
print("\n7. 响应时间分析:")
def analyze_response_performance(url, num_requests=3):
"""
分析响应性能
"""
times = []
for i in range(num_requests):
start_time = datetime.now()
try:
resp = requests.get(url, timeout=10)
end_time = datetime.now()
# 计算总时间
total_time = (end_time - start_time).total_seconds()
# 获取requests内部计时
elapsed_time = resp.elapsed.total_seconds()
times.append({
'total': total_time,
'elapsed': elapsed_time,
'status': resp.status_code
})
print(f"请求 {i+1}: {elapsed_time:.3f}秒 (状态码: {resp.status_code})")
except requests.exceptions.RequestException as e:
print(f"请求 {i+1} 失败: {e}")
if times:
avg_time = sum(t['elapsed'] for t in times) / len(times)
min_time = min(t['elapsed'] for t in times)
max_time = max(t['elapsed'] for t in times)
print(f"\n性能统计:")
print(f" 平均响应时间: {avg_time:.3f}秒")
print(f" 最快响应时间: {min_time:.3f}秒")
print(f" 最慢响应时间: {max_time:.3f}秒")
analyze_response_performance('https://httpbin.org/delay/1')
# 运行响应处理演示
if __name__ == "__main__":
response_handling_demo()
运行结果示例:
=== 响应对象处理演示 ===
1. 基本响应信息:
状态码: 200
状态描述: OK
请求URL: https://httpbin.org/json
响应时间: 0.234秒
编码: utf-8
2. 响应头分析:
Content-Type: application/json
Content-Length: 429
Server: gunicorn/19.9.0
Date: Mon, 15 Jan 2024 06:30:15 GMT
未使用内容压缩
3. 响应内容获取:
文本内容长度: 429字符
文本内容预览: {"slideshow": {"author": "Yours Truly", "date": "date of publication", "slides": [{"title": "Wake up...
二进制内容长度: 429字节
JSON内容类型: <class 'dict'>
JSON键: ['slideshow']
4. 响应状态检查:
状态码: 200
✓ 请求成功 (状态码 200-299)
✓ 成功响应
✓ 状态检查通过
5. 不同状态码测试:
成功 (https://httpbin.org/status/200):
状态码: 200
最终URL: https://httpbin.org/status/200
未找到 (https://httpbin.org/status/404):
状态码: 404
最终URL: https://httpbin.org/status/404
服务器错误 (https://httpbin.org/status/500):
状态码: 500
最终URL: https://httpbin.org/status/500
重定向 (https://httpbin.org/redirect/1):
状态码: 200
最终URL: https://httpbin.org/get
重定向历史: [302]
7. 响应时间分析:
请求 1: 1.234秒 (状态码: 200)
请求 2: 1.156秒 (状态码: 200)
请求 3: 1.298秒 (状态码: 200)
性能统计:
平均响应时间: 1.229秒
最快响应时间: 1.156秒
最慢响应时间: 1.298秒
高级功能¶
Session会话管理¶
Session对象允许你跨请求保持某些参数,它会在同一个Session实例发出的所有请求之间保持cookie,使用urllib3的连接池,所以如果你向同一主机发送多个请求,底层的TCP连接将会被重用,从而带来显著的性能提升。
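在看完整演示之前,先补充一点:连接池的大小可以通过requests.adapters.HTTPAdapter的pool_connections和pool_maxsize参数调整,再把适配器挂载到Session上。下面是一个最小示意,具体数值应根据并发量选择;后面的完整演示继续展示Session在头部、Cookie和钩子等方面的用法。
import requests
from requests.adapters import HTTPAdapter
session = requests.Session()
# pool_connections: 按主机缓存的连接池个数;pool_maxsize: 每个池中保存的最大连接数
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
session.mount("https://", adapter)
session.mount("http://", adapter)
# 对同一主机的多次请求会复用底层TCP连接
for i in range(3):
    response = session.get("https://httpbin.org/get", timeout=10)
    print(f"请求 {i + 1}: {response.status_code}")
session.close()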
import requests
import time
from datetime import datetime
def session_management_demo():
"""
演示Session会话管理的各种功能
"""
print("=== Session会话管理演示 ===")
# 1. 基本Session使用
print("\n1. 基本Session使用:")
# 创建Session对象
session = requests.Session()
# 设置Session级别的请求头
session.headers.update({
'User-Agent': 'MyApp/1.0',
'Accept': 'application/json'
})
# 使用Session发送请求
response1 = session.get('https://httpbin.org/get')
print(f"第一次请求状态码: {response1.status_code}")
print(f"User-Agent: {response1.json()['headers'].get('User-Agent')}")
# Session会保持设置的头部
response2 = session.get('https://httpbin.org/headers')
print(f"第二次请求User-Agent: {response2.json()['headers'].get('User-Agent')}")
# 2. Cookie持久化
print("\n2. Cookie持久化演示:")
# 创建新的Session
cookie_session = requests.Session()
# 第一次请求设置cookie
response = cookie_session.get('https://httpbin.org/cookies/set/session_id/abc123')
print(f"设置Cookie后的状态码: {response.status_code}")
# 查看Session中的cookies
print(f"Session中的Cookies: {dict(cookie_session.cookies)}")
# 第二次请求会自动携带cookie
response = cookie_session.get('https://httpbin.org/cookies')
cookies_data = response.json()
print(f"服务器接收到的Cookies: {cookies_data.get('cookies', {})}")
# 3. 连接池和性能优化
print("\n3. 连接池性能对比:")
def test_without_session(num_requests=5):
"""不使用Session的请求"""
start_time = time.time()
for i in range(num_requests):
response = requests.get('https://httpbin.org/get')
if response.status_code != 200:
print(f"请求 {i+1} 失败")
end_time = time.time()
return end_time - start_time
def test_with_session(num_requests=5):
"""使用Session的请求"""
start_time = time.time()
session = requests.Session()
for i in range(num_requests):
response = session.get('https://httpbin.org/get')
if response.status_code != 200:
print(f"请求 {i+1} 失败")
session.close()
end_time = time.time()
return end_time - start_time
print("\n性能测试 (5次请求):")
time_without_session = test_without_session()
time_with_session = test_with_session()
print(f"不使用Session: {time_without_session:.3f}秒")
print(f"使用Session: {time_with_session:.3f}秒")
print(f"性能提升: {((time_without_session - time_with_session) / time_without_session * 100):.1f}%")
# 4. Session配置和自定义
print("\n4. Session配置:")
# 创建自定义配置的Session
custom_session = requests.Session()
    # 注意:Session对象并没有默认超时属性,给session.timeout赋值不会被requests使用
    # 超时需要在每次请求时通过timeout参数传入(或通过自定义传输适配器实现)
# 设置默认参数
custom_session.params = {'api_key': 'your-api-key'}
# 设置默认头部
custom_session.headers.update({
'User-Agent': 'CustomBot/2.0',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive'
})
# 发送请求
    response = custom_session.get('https://httpbin.org/get', params={'extra': 'param'}, timeout=10)
if response.status_code == 200:
data = response.json()
print(f"最终URL: {response.url}")
print(f"合并后的参数: {data.get('args', {})}")
print(f"请求头: {data.get('headers', {}).get('User-Agent')}")
# 5. Session的请求钩子
print("\n5. 请求钩子演示:")
def log_request_hook(response, *args, **kwargs):
"""请求日志钩子"""
print(f"[钩子] 请求: {response.request.method} {response.url}")
print(f"[钩子] 状态码: {response.status_code}")
print(f"[钩子] 响应时间: {response.elapsed.total_seconds():.3f}秒")
# 创建带钩子的Session
hook_session = requests.Session()
hook_session.hooks['response'].append(log_request_hook)
# 发送请求,钩子会自动执行
print("\n发送带钩子的请求:")
response = hook_session.get('https://httpbin.org/delay/1')
# 6. Session上下文管理
print("\n6. Session上下文管理:")
# 使用with语句自动管理Session生命周期
with requests.Session() as s:
s.headers.update({'User-Agent': 'ContextManager/1.0'})
response = s.get('https://httpbin.org/get')
print(f"上下文管理器请求状态: {response.status_code}")
print(f"User-Agent: {response.json()['headers'].get('User-Agent')}")
# Session会自动关闭
# 7. Session错误处理
print("\n7. Session错误处理:")
error_session = requests.Session()
# 设置重试适配器
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
retry_strategy = Retry(
total=3, # 总重试次数
backoff_factor=1, # 重试间隔
status_forcelist=[429, 500, 502, 503, 504], # 需要重试的状态码
)
adapter = HTTPAdapter(max_retries=retry_strategy)
error_session.mount("http://", adapter)
error_session.mount("https://", adapter)
try:
# 测试重试机制
response = error_session.get('https://httpbin.org/status/500', timeout=5)
print(f"重试后状态码: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"请求最终失败: {e}")
# 8. Session状态管理
print("\n8. Session状态管理:")
state_session = requests.Session()
# 模拟登录流程
login_data = {
'username': 'testuser',
'password': 'testpass'
}
# 第一步:获取登录页面(可能包含CSRF token)
login_page = state_session.get('https://httpbin.org/get')
print(f"获取登录页面: {login_page.status_code}")
# 第二步:提交登录信息
login_response = state_session.post('https://httpbin.org/post', data=login_data)
print(f"登录请求: {login_response.status_code}")
# 第三步:访问需要认证的页面
protected_response = state_session.get('https://httpbin.org/get')
print(f"访问受保护页面: {protected_response.status_code}")
# Session会自动维护整个会话状态
print(f"会话中的Cookie数量: {len(state_session.cookies)}")
# 运行Session演示
if __name__ == "__main__":
session_management_demo()
身份验证¶
Requests支持多种身份验证方式,包括基本认证、摘要认证、OAuth等。
import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
import base64
import hashlib
import time
def authentication_demo():
"""
演示各种身份验证方式
"""
print("=== 身份验证演示 ===")
# 1. HTTP基本认证 (Basic Authentication)
print("\n1. HTTP基本认证:")
# 方法1: 使用auth参数
response = requests.get(
'https://httpbin.org/basic-auth/user/pass',
auth=('user', 'pass')
)
print(f"基本认证状态码: {response.status_code}")
if response.status_code == 200:
print(f"认证成功: {response.json()}")
# 方法2: 使用HTTPBasicAuth类
response2 = requests.get(
'https://httpbin.org/basic-auth/testuser/testpass',
auth=HTTPBasicAuth('testuser', 'testpass')
)
print(f"HTTPBasicAuth状态码: {response2.status_code}")
# 方法3: 手动设置Authorization头
credentials = base64.b64encode(b'user:pass').decode('ascii')
headers = {'Authorization': f'Basic {credentials}'}
response3 = requests.get(
'https://httpbin.org/basic-auth/user/pass',
headers=headers
)
print(f"手动设置头部状态码: {response3.status_code}")
# 2. HTTP摘要认证 (Digest Authentication)
print("\n2. HTTP摘要认证:")
try:
response = requests.get(
'https://httpbin.org/digest-auth/auth/user/pass',
auth=HTTPDigestAuth('user', 'pass')
)
print(f"摘要认证状态码: {response.status_code}")
if response.status_code == 200:
print(f"摘要认证成功: {response.json()}")
except Exception as e:
print(f"摘要认证失败: {e}")
# 3. Bearer Token认证
print("\n3. Bearer Token认证:")
# 模拟JWT token
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
headers = {'Authorization': f'Bearer {token}'}
response = requests.get('https://httpbin.org/bearer', headers=headers)
print(f"Bearer Token状态码: {response.status_code}")
if response.status_code == 200:
print(f"Token认证成功: {response.json()}")
# 4. API Key认证
print("\n4. API Key认证:")
# 方法1: 在URL参数中
api_key = "your-api-key-here"
response = requests.get(
'https://httpbin.org/get',
params={'api_key': api_key}
)
print(f"URL参数API Key: {response.json()['args']}")
# 方法2: 在请求头中
headers = {'X-API-Key': api_key}
response2 = requests.get('https://httpbin.org/get', headers=headers)
print(f"请求头API Key: {response2.json()['headers'].get('X-Api-Key')}")
# 5. 自定义认证类
print("\n5. 自定义认证类:")
class CustomAuth(requests.auth.AuthBase):
"""自定义认证类"""
def __init__(self, api_key, secret_key):
self.api_key = api_key
self.secret_key = secret_key
def __call__(self, r):
# 生成时间戳
timestamp = str(int(time.time()))
# 生成签名
string_to_sign = f"{r.method}\n{r.url}\n{timestamp}"
signature = hashlib.sha256(
(string_to_sign + self.secret_key).encode('utf-8')
).hexdigest()
# 添加认证头
r.headers['X-API-Key'] = self.api_key
r.headers['X-Timestamp'] = timestamp
r.headers['X-Signature'] = signature
return r
# 使用自定义认证
custom_auth = CustomAuth('my-api-key', 'my-secret-key')
response = requests.get('https://httpbin.org/get', auth=custom_auth)
if response.status_code == 200:
headers = response.json()['headers']
print(f"自定义认证头部:")
print(f" X-API-Key: {headers.get('X-Api-Key')}")
print(f" X-Timestamp: {headers.get('X-Timestamp')}")
print(f" X-Signature: {headers.get('X-Signature', '')[:20]}...")
# 6. OAuth 2.0 模拟
print("\n6. OAuth 2.0 模拟:")
def oauth2_flow_simulation():
"""模拟OAuth 2.0授权流程"""
# 第一步: 获取授权码 (实际应用中用户会被重定向到授权服务器)
auth_url = "https://httpbin.org/get"
auth_params = {
'response_type': 'code',
'client_id': 'your-client-id',
'redirect_uri': 'https://yourapp.com/callback',
'scope': 'read write',
'state': 'random-state-string'
}
print(f"授权URL: {auth_url}?{'&'.join([f'{k}={v}' for k, v in auth_params.items()])}")
# 第二步: 使用授权码获取访问令牌
token_data = {
'grant_type': 'authorization_code',
'code': 'received-auth-code',
'redirect_uri': 'https://yourapp.com/callback',
'client_id': 'your-client-id',
'client_secret': 'your-client-secret'
}
# 模拟获取token
token_response = requests.post('https://httpbin.org/post', data=token_data)
print(f"Token请求状态: {token_response.status_code}")
# 第三步: 使用访问令牌访问API
access_token = "mock-access-token-12345"
api_headers = {'Authorization': f'Bearer {access_token}'}
api_response = requests.get('https://httpbin.org/get', headers=api_headers)
print(f"API访问状态: {api_response.status_code}")
return access_token
oauth_token = oauth2_flow_simulation()
# 7. 会话级认证
print("\n7. 会话级认证:")
# 创建带认证的Session
auth_session = requests.Session()
auth_session.auth = ('session_user', 'session_pass')
# 所有通过这个Session的请求都会自动包含认证信息
response1 = auth_session.get('https://httpbin.org/basic-auth/session_user/session_pass')
print(f"会话认证请求1: {response1.status_code}")
response2 = auth_session.get('https://httpbin.org/basic-auth/session_user/session_pass')
print(f"会话认证请求2: {response2.status_code}")
# 8. 认证错误处理
print("\n8. 认证错误处理:")
def handle_auth_errors():
"""处理认证相关错误"""
# 测试错误的认证信息
try:
response = requests.get(
'https://httpbin.org/basic-auth/user/pass',
auth=('wrong_user', 'wrong_pass'),
timeout=5
)
if response.status_code == 401:
print("✗ 认证失败: 用户名或密码错误")
print(f" WWW-Authenticate: {response.headers.get('WWW-Authenticate')}")
elif response.status_code == 403:
print("✗ 访问被拒绝: 权限不足")
else:
print(f"认证状态: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"认证请求异常: {e}")
handle_auth_errors()
# 运行认证演示
if __name__ == "__main__":
authentication_demo()
代理设置和SSL配置¶
在爬虫开发中,代理和SSL配置是非常重要的功能,可以帮助我们绕过网络限制和确保安全通信。
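先看一个最小示意:用proxies参数指定代理、用verify参数控制证书校验(示例中的代理地址仅为占位符,需要替换为真实可用的代理):
import requests

proxies = {
    'http': 'http://127.0.0.1:8080',   # 占位地址,请替换为实际代理
    'https': 'http://127.0.0.1:8080',
}
try:
    resp = requests.get('https://httpbin.org/ip', proxies=proxies,
                        verify=True, timeout=10)
    print(resp.json())  # 返回出口IP,可用来确认代理是否生效
except requests.exceptions.RequestException as e:
    print(f"代理请求失败: {e}")
下面的完整示例将详细演示各种代理和SSL配置方式: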
import requests
import ssl
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
def proxy_and_ssl_demo():
"""
演示代理设置和SSL配置
"""
print("=== 代理设置和SSL配置演示 ===")
# 1. HTTP代理设置
print("\n1. HTTP代理设置:")
# 基本代理设置
proxies = {
'http': 'http://proxy.example.com:8080',
'https': 'https://proxy.example.com:8080'
}
# 注意:这里使用示例代理,实际运行时需要替换为真实代理
print(f"配置的代理: {proxies}")
# 带认证的代理
auth_proxies = {
'http': 'http://username:password@proxy.example.com:8080',
'https': 'https://username:password@proxy.example.com:8080'
}
print(f"带认证的代理: {auth_proxies}")
# 2. SOCKS代理设置
print("\n2. SOCKS代理设置:")
# 需要安装: pip install requests[socks]
socks_proxies = {
'http': 'socks5://127.0.0.1:1080',
'https': 'socks5://127.0.0.1:1080'
}
print(f"SOCKS代理配置: {socks_proxies}")
# 3. 代理轮换
print("\n3. 代理轮换演示:")
import random
proxy_list = [
{'http': 'http://proxy1.example.com:8080', 'https': 'https://proxy1.example.com:8080'},
{'http': 'http://proxy2.example.com:8080', 'https': 'https://proxy2.example.com:8080'},
{'http': 'http://proxy3.example.com:8080', 'https': 'https://proxy3.example.com:8080'}
]
def get_random_proxy():
"""获取随机代理"""
return random.choice(proxy_list)
# 模拟使用不同代理发送请求
for i in range(3):
proxy = get_random_proxy()
print(f"请求 {i+1} 使用代理: {proxy['http']}")
# 实际请求代码:
# response = requests.get('https://httpbin.org/ip', proxies=proxy, timeout=10)
# 4. 代理验证和测试
print("\n4. 代理验证:")
def test_proxy(proxy_dict, test_url='https://httpbin.org/ip'):
"""测试代理是否可用"""
try:
response = requests.get(
test_url,
proxies=proxy_dict,
timeout=10
)
if response.status_code == 200:
ip_info = response.json()
print(f"✓ 代理可用")
print(f" 出口IP: {ip_info.get('origin')}")
print(f" 响应时间: {response.elapsed.total_seconds():.3f}秒")
return True
else:
print(f"✗ 代理响应异常: {response.status_code}")
return False
except requests.exceptions.ProxyError:
print("✗ 代理连接失败")
return False
except requests.exceptions.Timeout:
print("✗ 代理连接超时")
return False
except requests.exceptions.RequestException as e:
print(f"✗ 代理请求异常: {e}")
return False
# 测试直连(无代理)
print("\n测试直连:")
try:
direct_response = requests.get('https://httpbin.org/ip', timeout=10)
if direct_response.status_code == 200:
ip_info = direct_response.json()
print(f"✓ 直连成功")
print(f" 本地IP: {ip_info.get('origin')}")
except Exception as e:
print(f"✗ 直连失败: {e}")
# 5. SSL配置
print("\n5. SSL配置演示:")
# 禁用SSL验证(不推荐用于生产环境)
print("\n禁用SSL验证:")
try:
response = requests.get(
'https://httpbin.org/get',
verify=False # 禁用SSL证书验证
)
print(f"✓ 禁用SSL验证请求成功: {response.status_code}")
except Exception as e:
print(f"✗ SSL请求失败: {e}")
# 自定义CA证书
print("\n自定义CA证书:")
# 指定CA证书文件路径
# response = requests.get('https://httpbin.org/get', verify='/path/to/ca-bundle.crt')
print("可以通过verify参数指定CA证书文件路径")
# 客户端证书认证
print("\n客户端证书认证:")
# cert参数可以是证书文件路径的字符串,或者是(cert, key)元组
# response = requests.get('https://httpbin.org/get', cert=('/path/to/client.cert', '/path/to/client.key'))
print("可以通过cert参数指定客户端证书")
# 6. 自定义SSL上下文
print("\n6. 自定义SSL上下文:")
class SSLAdapter(HTTPAdapter):
"""自定义SSL适配器"""
def __init__(self, ssl_context=None, **kwargs):
self.ssl_context = ssl_context
super().__init__(**kwargs)
def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = self.ssl_context
return super().init_poolmanager(*args, **kwargs)
# 创建自定义SSL上下文
ssl_context = create_urllib3_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# 使用自定义SSL适配器
session = requests.Session()
session.mount('https://', SSLAdapter(ssl_context))
try:
response = session.get('https://httpbin.org/get')
print(f"✓ 自定义SSL上下文请求成功: {response.status_code}")
except Exception as e:
print(f"✗ 自定义SSL请求失败: {e}")
# 7. 综合配置示例
print("\n7. 综合配置示例:")
def create_secure_session(proxy=None, verify_ssl=True, client_cert=None):
"""创建安全配置的Session"""
session = requests.Session()
# 设置代理
if proxy:
session.proxies.update(proxy)
# SSL配置
session.verify = verify_ssl
if client_cert:
session.cert = client_cert
# 设置超时(注意:requests不会自动使用该属性,调用时仍需通过timeout参数传入)
session.timeout = 30
# 设置重试
from urllib3.util.retry import Retry
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
# 创建配置好的Session
secure_session = create_secure_session(
# proxy={'http': 'http://proxy.example.com:8080'},
verify_ssl=True
)
try:
response = secure_session.get('https://httpbin.org/get')
print(f"✓ 安全Session请求成功: {response.status_code}")
print(f" SSL验证: {'启用' if secure_session.verify else '禁用'}")
print(f" 代理设置: {secure_session.proxies if secure_session.proxies else '无'}")
except Exception as e:
print(f"✗ 安全Session请求失败: {e}")
# 8. 环境变量代理配置
print("\n8. 环境变量代理配置:")
import os
# Requests会自动读取这些环境变量
env_vars = {
'HTTP_PROXY': 'http://proxy.example.com:8080',
'HTTPS_PROXY': 'https://proxy.example.com:8080',
'NO_PROXY': 'localhost,127.0.0.1,.local'
}
print("可以设置的环境变量:")
for var, value in env_vars.items():
print(f" {var}={value}")
# 检查当前环境变量
current_proxy = os.environ.get('HTTP_PROXY') or os.environ.get('http_proxy')
if current_proxy:
print(f"当前HTTP代理: {current_proxy}")
else:
print("未设置HTTP代理环境变量")
# 运行代理和SSL演示
if __name__ == "__main__":
proxy_and_ssl_demo()
Cookie处理¶
Cookie是Web应用中维护状态的重要机制,Requests提供了强大的Cookie处理功能。
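Cookie处理的核心模式是:用Session自动保存服务器下发的Cookie,并在后续请求中自动携带。最小示意如下:
import requests

with requests.Session() as s:
    # 第一次请求:服务器通过Set-Cookie下发Cookie,Session自动保存
    s.get('https://httpbin.org/cookies/set/token/abc123', timeout=10)
    # 第二次请求:Session自动携带已保存的Cookie
    resp = s.get('https://httpbin.org/cookies', timeout=10)
    print(resp.json())  # {'cookies': {'token': 'abc123'}}
下面的完整示例将演示Cookie的各种操作: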
import requests
from http.cookies import SimpleCookie
import time
from datetime import datetime, timedelta
def cookie_handling_demo():
"""
演示Cookie处理的各种功能
"""
print("=== Cookie处理演示 ===")
# 1. 基本Cookie操作
print("\n1. 基本Cookie操作:")
# 发送带Cookie的请求
cookies = {'session_id': 'abc123', 'user_pref': 'dark_mode'}
response = requests.get('https://httpbin.org/cookies', cookies=cookies)
if response.status_code == 200:
received_cookies = response.json().get('cookies', {})
print(f"发送的Cookies: {cookies}")
print(f"服务器接收的Cookies: {received_cookies}")
# 2. 从响应中获取Cookie
print("\n2. 从响应中获取Cookie:")
# 请求设置Cookie的URL
response = requests.get('https://httpbin.org/cookies/set/test_cookie/test_value')
print(f"响应状态码: {response.status_code}")
print(f"响应中的Cookies: {dict(response.cookies)}")
# 查看Cookie详细信息
for cookie in response.cookies:
print(f"Cookie详情:")
print(f" 名称: {cookie.name}")
print(f" 值: {cookie.value}")
print(f" 域: {cookie.domain}")
print(f" 路径: {cookie.path}")
print(f" 过期时间: {cookie.expires}")
print(f" 安全标志: {cookie.secure}")
print(f" HttpOnly: {cookie.has_nonstandard_attr('HttpOnly')}")
# 3. Cookie持久化
print("\n3. Cookie持久化演示:")
# 创建Session来自动管理Cookie
session = requests.Session()
# 第一次请求,服务器设置Cookie
response1 = session.get('https://httpbin.org/cookies/set/persistent_cookie/persistent_value')
print(f"第一次请求状态: {response1.status_code}")
print(f"Session中的Cookies: {dict(session.cookies)}")
# 第二次请求,自动携带Cookie
response2 = session.get('https://httpbin.org/cookies')
if response2.status_code == 200:
cookies_data = response2.json()
print(f"第二次请求携带的Cookies: {cookies_data.get('cookies', {})}")
# 4. 手动Cookie管理
print("\n4. 手动Cookie管理:")
from requests.cookies import RequestsCookieJar
# 创建Cookie容器
jar = RequestsCookieJar()
# 添加Cookie
jar.set('custom_cookie', 'custom_value', domain='httpbin.org', path='/')
jar.set('another_cookie', 'another_value', domain='httpbin.org', path='/')
# 使用自定义Cookie容器
response = requests.get('https://httpbin.org/cookies', cookies=jar)
if response.status_code == 200:
print(f"自定义Cookie容器: {dict(jar)}")
print(f"服务器接收: {response.json().get('cookies', {})}")
# 5. Cookie的高级属性
print("\n5. Cookie高级属性演示:")
def create_advanced_cookie():
"""创建带高级属性的Cookie"""
jar = RequestsCookieJar()
# 设置带过期时间的Cookie
expire_time = int(time.time()) + 3600 # 1小时后过期
jar.set(
'session_token',
'token_12345',
domain='httpbin.org',
path='/',
expires=expire_time,
secure=True, # 只在HTTPS下传输
rest={'HttpOnly': True} # 防止JavaScript访问
)
# 设置SameSite属性
jar.set(
'csrf_token',
'csrf_abc123',
domain='httpbin.org',
path='/',
rest={'SameSite': 'Strict'}
)
return jar
advanced_jar = create_advanced_cookie()
print(f"高级Cookie容器: {dict(advanced_jar)}")
# 6. Cookie文件操作
print("\n6. Cookie文件操作:")
import pickle
import os
# 保存Cookie到文件
def save_cookies_to_file(session, filename):
"""保存Session的Cookie到文件"""
with open(filename, 'wb') as f:
pickle.dump(session.cookies, f)
print(f"Cookies已保存到: {filename}")
# 从文件加载Cookie
def load_cookies_from_file(session, filename):
"""从文件加载Cookie到Session"""
if os.path.exists(filename):
with open(filename, 'rb') as f:
session.cookies.update(pickle.load(f))
print(f"Cookies已从文件加载: {filename}")
return True
return False
# 演示Cookie文件操作
cookie_session = requests.Session()
# 设置一些Cookie
cookie_session.get('https://httpbin.org/cookies/set/file_cookie/file_value')
# 保存到文件
cookie_file = 'session_cookies.pkl'
save_cookies_to_file(cookie_session, cookie_file)
# 创建新Session并加载Cookie
new_session = requests.Session()
if load_cookies_from_file(new_session, cookie_file):
response = new_session.get('https://httpbin.org/cookies')
if response.status_code == 200:
print(f"加载的Cookies验证: {response.json().get('cookies', {})}")
# 清理文件
if os.path.exists(cookie_file):
os.remove(cookie_file)
print(f"已清理Cookie文件: {cookie_file}")
# 7. Cookie域和路径管理
print("\n7. Cookie域和路径管理:")
def demonstrate_cookie_scope():
"""演示Cookie的作用域"""
jar = RequestsCookieJar()
# 设置不同域和路径的Cookie
jar.set('global_cookie', 'global_value', domain='.example.com', path='/')
jar.set('api_cookie', 'api_value', domain='api.example.com', path='/v1/')
jar.set('admin_cookie', 'admin_value', domain='admin.example.com', path='/admin/')
print("Cookie作用域演示:")
for cookie in jar:
print(f" {cookie.name}: 域={cookie.domain}, 路径={cookie.path}")
return jar
scope_jar = demonstrate_cookie_scope()
# 8. Cookie安全性
print("\n8. Cookie安全性演示:")
def create_secure_cookies():
"""创建安全的Cookie设置"""
jar = RequestsCookieJar()
# 安全Cookie设置
security_settings = {
'session_id': {
'value': 'secure_session_123',
'secure': True, # 只在HTTPS传输
'httponly': True, # 防止XSS攻击
'samesite': 'Strict', # 防止CSRF攻击
'expires': int(time.time()) + 1800 # 30分钟过期
},
'csrf_token': {
'value': 'csrf_token_456',
'secure': True,
'samesite': 'Strict',
'expires': int(time.time()) + 3600 # 1小时过期
}
}
for name, settings in security_settings.items():
jar.set(
name,
settings['value'],
domain='httpbin.org',
path='/',
expires=settings.get('expires'),
secure=settings.get('secure', False),
rest={
'HttpOnly': settings.get('httponly', False),
'SameSite': settings.get('samesite', 'Lax')
}
)
print("安全Cookie配置:")
for cookie in jar:
print(f" {cookie.name}: 安全={cookie.secure}")
return jar
secure_jar = create_secure_cookies()
# 9. Cookie调试和分析
print("\n9. Cookie调试和分析:")
def analyze_cookies(response):
"""分析响应中的Cookie"""
print("Cookie分析报告:")
if not response.cookies:
print(" 无Cookie")
return
for cookie in response.cookies:
print(f"\n Cookie: {cookie.name}")
print(f" 值: {cookie.value}")
print(f" 域: {cookie.domain or '未设置'}")
print(f" 路径: {cookie.path or '/'}")
if cookie.expires:
expire_date = datetime.fromtimestamp(cookie.expires)
print(f" 过期时间: {expire_date}")
# 检查是否即将过期
if expire_date < datetime.now() + timedelta(hours=1):
print(f" ⚠️ 警告: Cookie将在1小时内过期")
else:
print(f" 过期时间: 会话结束")
print(f" 安全标志: {cookie.secure}")
print(f" 大小: {len(cookie.value)}字节")
# 检查Cookie大小
if len(cookie.value) > 4000:
print(f" ⚠️ 警告: Cookie过大,可能被截断")
# 分析一个带Cookie的响应
test_response = requests.get('https://httpbin.org/cookies/set/analysis_cookie/test_analysis_value')
analyze_cookies(test_response)
# 10. Cookie错误处理
print("\n10. Cookie错误处理:")
def handle_cookie_errors():
"""处理Cookie相关错误"""
try:
# 尝试设置无效的Cookie
jar = RequestsCookieJar()
# 测试各种边界情况
test_cases = [
('valid_cookie', 'valid_value'),
('', 'empty_name'), # 空名称
('space cookie', 'space_in_name'), # 名称包含空格
('valid_name', ''), # 空值
('long_cookie', 'x' * 5000), # 超长值
]
for name, value in test_cases:
try:
jar.set(name, value, domain='httpbin.org')
print(f"✓ 成功设置Cookie: {name[:20]}...")
except Exception as e:
print(f"✗ 设置Cookie失败 ({name[:20]}...): {e}")
# 测试Cookie发送
response = requests.get('https://httpbin.org/cookies', cookies=jar, timeout=5)
print(f"Cookie发送测试: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Cookie请求异常: {e}")
except Exception as e:
print(f"Cookie处理异常: {e}")
handle_cookie_errors()
# 运行Cookie演示
if __name__ == "__main__":
cookie_handling_demo()
文件上传和下载¶
文件传输是网络爬虫和自动化中的重要功能,Requests提供了简单而强大的文件处理能力。
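两类操作的最小模式:上传用files参数,下载大文件用stream=True配合iter_content分块写入。示意如下(其中report.txt为假设存在的本地文件):
import requests

# 上传:files参数接收文件对象
with open('report.txt', 'rb') as f:
    resp = requests.post('https://httpbin.org/post', files={'file': f}, timeout=30)
    print(resp.status_code)

# 下载:流式读取,避免一次性把大文件载入内存
with requests.get('https://httpbin.org/bytes/102400', stream=True, timeout=30) as resp:
    resp.raise_for_status()
    with open('download.bin', 'wb') as f:
        for chunk in resp.iter_content(chunk_size=8192):
            f.write(chunk)
下面的完整示例将演示更多文件传输功能: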
import requests
import os
import io
from pathlib import Path
import mimetypes
import hashlib
from tqdm import tqdm  # 进度条库,需要先安装: pip install tqdm
def file_transfer_demo():
"""
演示文件上传和下载功能
"""
print("=== 文件上传和下载演示 ===")
# 1. 基本文件上传
print("\n1. 基本文件上传:")
# 创建测试文件
test_file_content = "这是一个测试文件\nTest file content\n测试数据123"
test_file_path = "test_upload.txt"
with open(test_file_path, 'w', encoding='utf-8') as f:
f.write(test_file_content)
# 方法1: 使用files参数上传
with open(test_file_path, 'rb') as f:
files = {'file': f}
response = requests.post('https://httpbin.org/post', files=files)
if response.status_code == 200:
result = response.json()
print(f"文件上传成功")
print(f"上传的文件信息: {result.get('files', {})}")
# 2. 高级文件上传
print("\n2. 高级文件上传:")
# 指定文件名和MIME类型
with open(test_file_path, 'rb') as f:
files = {
'document': ('custom_name.txt', f, 'text/plain'),
'metadata': ('info.json', io.StringIO('{"type": "document"}'), 'application/json')
}
# 同时发送表单数据
data = {
'title': '测试文档',
'description': '这是一个测试上传',
'category': 'test'
}
response = requests.post('https://httpbin.org/post', files=files, data=data)
if response.status_code == 200:
result = response.json()
print(f"高级上传成功")
print(f"表单数据: {result.get('form', {})}")
print(f"文件数据: {list(result.get('files', {}).keys())}")
# 3. 多文件上传
print("\n3. 多文件上传:")
# 创建多个测试文件
test_files = []
for i in range(3):
filename = f"test_file_{i+1}.txt"
content = f"这是测试文件 {i+1}\nFile {i+1} content\n"
with open(filename, 'w', encoding='utf-8') as f:
f.write(content)
test_files.append(filename)
# 上传多个文件
files = []
for filename in test_files:
files.append(('files', (filename, open(filename, 'rb'), 'text/plain')))
try:
response = requests.post('https://httpbin.org/post', files=files)
if response.status_code == 200:
result = response.json()
print(f"多文件上传成功")
print(f"上传文件数量: {len(result.get('files', {}))}")
finally:
# 关闭文件句柄
for _, (_, file_obj, _) in files:
file_obj.close()
# 4. 内存文件上传
print("\n4. 内存文件上传:")
# 创建内存中的文件
memory_file = io.BytesIO()
memory_file.write("内存中的文件内容\nMemory file content".encode('utf-8'))
memory_file.seek(0) # 重置指针到开始
files = {'memory_file': ('memory.txt', memory_file, 'text/plain')}
response = requests.post('https://httpbin.org/post', files=files)
if response.status_code == 200:
print(f"内存文件上传成功")
memory_file.close()
# 5. 文件下载基础
print("\n5. 文件下载基础:")
# 下载小文件
download_url = 'https://httpbin.org/json'
response = requests.get(download_url)
if response.status_code == 200:
# 保存到文件
download_filename = 'downloaded_data.json'
with open(download_filename, 'wb') as f:
f.write(response.content)
print(f"文件下载成功: {download_filename}")
print(f"文件大小: {len(response.content)}字节")
print(f"Content-Type: {response.headers.get('content-type')}")
# 6. 大文件下载(流式下载)
print("\n6. 大文件流式下载:")
def download_large_file(url, filename, chunk_size=8192):
"""流式下载大文件"""
try:
with requests.get(url, stream=True) as response:
response.raise_for_status()
# 获取文件大小
total_size = int(response.headers.get('content-length', 0))
with open(filename, 'wb') as f:
if total_size > 0:
# 使用进度条
with tqdm(total=total_size, unit='B', unit_scale=True, desc=filename) as pbar:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
pbar.update(len(chunk))
else:
# 无法获取文件大小时
downloaded = 0
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
print(f"\r已下载: {downloaded}字节", end='', flush=True)
print() # 换行
print(f"\n✓ 文件下载完成: {filename}")
return True
except requests.exceptions.RequestException as e:
print(f"✗ 下载失败: {e}")
return False
# 演示流式下载(使用较小的文件作为示例)
large_file_url = 'https://httpbin.org/bytes/10240' # 10KB测试文件
if download_large_file(large_file_url, 'large_download.bin'):
file_size = os.path.getsize('large_download.bin')
print(f"下载文件大小: {file_size}字节")
# 7. 断点续传下载
print("\n7. 断点续传下载:")
def resume_download(url, filename, chunk_size=8192):
"""支持断点续传的下载"""
# 检查本地文件是否存在
resume_pos = 0
if os.path.exists(filename):
resume_pos = os.path.getsize(filename)
print(f"发现本地文件,从位置 {resume_pos} 继续下载")
# 设置Range头进行断点续传
headers = {'Range': f'bytes={resume_pos}-'} if resume_pos > 0 else {}
try:
response = requests.get(url, headers=headers, stream=True)
# 检查服务器是否支持断点续传
if resume_pos > 0 and response.status_code != 206:
print("服务器不支持断点续传,重新下载")
resume_pos = 0
response = requests.get(url, stream=True)
response.raise_for_status()
# 获取总文件大小
if 'content-range' in response.headers:
total_size = int(response.headers['content-range'].split('/')[-1])
else:
total_size = int(response.headers.get('content-length', 0)) + resume_pos
# 打开文件(追加模式如果是续传)
mode = 'ab' if resume_pos > 0 else 'wb'
with open(filename, mode) as f:
downloaded = resume_pos
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
progress = (downloaded / total_size) * 100
print(f"\r下载进度: {progress:.1f}% ({downloaded}/{total_size})", end='', flush=True)
print(f"\n✓ 下载完成: {filename}")
return True
except requests.exceptions.RequestException as e:
print(f"✗ 下载失败: {e}")
return False
# 演示断点续传(模拟)
resume_url = 'https://httpbin.org/bytes/5120' # 5KB测试文件
resume_filename = 'resume_download.bin'
# 先下载一部分(模拟中断)
try:
response = requests.get(resume_url, stream=True)
with open(resume_filename, 'wb') as f:
for i, chunk in enumerate(response.iter_content(chunk_size=1024)):
if i >= 2: # 只下载前2KB
break
f.write(chunk)
print(f"模拟下载中断,已下载: {os.path.getsize(resume_filename)}字节")
except requests.exceptions.RequestException:
pass  # 仅用于模拟下载中断,忽略此处异常
# 继续下载
resume_download(resume_url, resume_filename)
# 8. 文件完整性验证
print("\n8. 文件完整性验证:")
def verify_file_integrity(filename, expected_hash=None, hash_algorithm='md5'):
"""验证文件完整性"""
if not os.path.exists(filename):
print(f"✗ 文件不存在: {filename}")
return False
# 计算文件哈希
hash_obj = hashlib.new(hash_algorithm)
with open(filename, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
hash_obj.update(chunk)
file_hash = hash_obj.hexdigest()
print(f"文件 {filename} 的{hash_algorithm.upper()}哈希: {file_hash}")
if expected_hash:
if file_hash == expected_hash:
print(f"✓ 文件完整性验证通过")
return True
else:
print(f"✗ 文件完整性验证失败")
print(f" 期望: {expected_hash}")
print(f" 实际: {file_hash}")
return False
return True
# 验证下载的文件
for filename in ['downloaded_data.json', 'large_download.bin']:
if os.path.exists(filename):
verify_file_integrity(filename)
# 9. 自动MIME类型检测
print("\n9. 自动MIME类型检测:")
def upload_with_auto_mime(filename):
"""自动检测MIME类型并上传"""
if not os.path.exists(filename):
print(f"文件不存在: {filename}")
return
# 自动检测MIME类型
mime_type, _ = mimetypes.guess_type(filename)
if mime_type is None:
mime_type = 'application/octet-stream' # 默认二进制类型
print(f"文件: {filename}")
print(f"检测到的MIME类型: {mime_type}")
with open(filename, 'rb') as f:
files = {'file': (filename, f, mime_type)}
response = requests.post('https://httpbin.org/post', files=files)
if response.status_code == 200:
print(f"✓ 上传成功")
else:
print(f"✗ 上传失败: {response.status_code}")
# 测试不同类型的文件
test_files_mime = ['test_upload.txt', 'downloaded_data.json']
for filename in test_files_mime:
if os.path.exists(filename):
upload_with_auto_mime(filename)
# 10. 清理测试文件
print("\n10. 清理测试文件:")
cleanup_files = [
test_file_path, 'downloaded_data.json', 'large_download.bin',
'resume_download.bin'
] + test_files
for filename in cleanup_files:
if os.path.exists(filename):
try:
os.remove(filename)
print(f"✓ 已删除: {filename}")
except Exception as e:
print(f"✗ 删除失败 {filename}: {e}")
# 运行文件传输演示
if __name__ == "__main__":
file_transfer_demo()
超时和重试机制¶
在网络请求中,超时和重试机制是确保程序稳定性的重要功能。
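最常用的组合是:每次请求显式传入timeout,同时通过HTTPAdapter挂载urllib3的Retry策略。一个最小示意如下(参数名基于urllib3 1.26及以上版本):
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(total=3, backoff_factor=1,
              status_forcelist=[429, 500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retry))
session.mount('http://', HTTPAdapter(max_retries=retry))

# timeout=(连接超时, 读取超时),需要在每次请求时显式传入
resp = session.get('https://httpbin.org/get', timeout=(3, 10))
print(resp.status_code)
下面的完整示例将深入演示各种超时和重试策略: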
import requests
import time
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from functools import wraps
import logging
def timeout_and_retry_demo():
"""
演示超时和重试机制
"""
print("=== 超时和重试机制演示 ===")
# 1. 基本超时设置
print("\n1. 基本超时设置:")
# 连接超时和读取超时
try:
# timeout=(连接超时, 读取超时)
response = requests.get('https://httpbin.org/delay/2', timeout=(5, 10))
print(f"请求成功: {response.status_code}")
print(f"响应时间: {response.elapsed.total_seconds():.2f}秒")
except requests.exceptions.Timeout as e:
print(f"请求超时: {e}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
# 2. 不同类型的超时
print("\n2. 不同类型的超时演示:")
def test_different_timeouts():
"""测试不同的超时设置"""
timeout_configs = [
("单一超时", 5), # 连接和读取都是5秒
("分别设置", (3, 10)), # 连接3秒,读取10秒
("只设置连接超时", (2, None)), # 只设置连接超时
]
for desc, timeout in timeout_configs:
try:
print(f"\n测试 {desc}: {timeout}")
start_time = time.time()
response = requests.get('https://httpbin.org/delay/1', timeout=timeout)
elapsed = time.time() - start_time
print(f" ✓ 成功: {response.status_code}, 耗时: {elapsed:.2f}秒")
except requests.exceptions.Timeout as e:
elapsed = time.time() - start_time
print(f" ✗ 超时: {elapsed:.2f}秒, {e}")
except Exception as e:
print(f" ✗ 异常: {e}")
test_different_timeouts()
# 3. 手动重试机制
print("\n3. 手动重试机制:")
def manual_retry(url, max_retries=3, delay=1, backoff=2):
"""手动实现重试机制"""
for attempt in range(max_retries + 1):
try:
print(f" 尝试 {attempt + 1}/{max_retries + 1}: {url}")
response = requests.get(url, timeout=5)
# 检查响应状态
if response.status_code == 200:
print(f" ✓ 成功: {response.status_code}")
return response
elif response.status_code >= 500:
# 服务器错误,可以重试
print(f" 服务器错误 {response.status_code},准备重试")
raise requests.exceptions.RequestException(f"Server error: {response.status_code}")
else:
# 客户端错误,不重试
print(f" 客户端错误 {response.status_code},不重试")
return response
except (requests.exceptions.Timeout,
requests.exceptions.ConnectionError,
requests.exceptions.RequestException) as e:
print(f" ✗ 请求失败: {e}")
if attempt < max_retries:
wait_time = delay * (backoff ** attempt)
print(f" 等待 {wait_time:.1f}秒 后重试...")
time.sleep(wait_time)
else:
print(f" 已达到最大重试次数,放弃")
raise
return None
# 测试手动重试
try:
response = manual_retry('https://httpbin.org/status/500', max_retries=2)
except Exception as e:
print(f"手动重试最终失败: {e}")
# 4. 使用urllib3的重试策略
print("\n4. urllib3重试策略:")
def create_retry_session():
"""创建带重试策略的Session"""
session = requests.Session()
# 定义重试策略
retry_strategy = Retry(
total=3, # 总重试次数
status_forcelist=[429, 500, 502, 503, 504], # 需要重试的状态码
method_whitelist=["HEAD", "GET", "OPTIONS"], # 允许重试的方法
backoff_factor=1, # 退避因子
raise_on_redirect=False,
raise_on_status=False
)
# 创建适配器
adapter = HTTPAdapter(max_retries=retry_strategy)
# 挂载适配器
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
# 使用重试Session
retry_session = create_retry_session()
try:
print("使用重试Session请求:")
response = retry_session.get('https://httpbin.org/status/503', timeout=10)
print(f"最终响应: {response.status_code}")
except Exception as e:
print(f"重试Session失败: {e}")
# 5. 高级重试配置
print("\n5. 高级重试配置:")
def create_advanced_retry_session():
"""创建高级重试配置的Session"""
session = requests.Session()
# 高级重试策略
retry_strategy = Retry(
total=5, # 总重试次数
read=3, # 读取重试次数
connect=3, # 连接重试次数
status=3, # 状态码重试次数
status_forcelist=[408, 429, 500, 502, 503, 504, 520, 522, 524],
method_whitelist=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"],
backoff_factor=0.3, # 退避因子:{backoff factor} * (2 ** ({number of total retries} - 1))
raise_on_redirect=False,
raise_on_status=False,
respect_retry_after_header=True # 尊重服务器的Retry-After头
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
advanced_session = create_advanced_retry_session()
# 测试高级重试
test_urls = [
('正常请求', 'https://httpbin.org/get'),
('服务器错误', 'https://httpbin.org/status/500'),
('超时请求', 'https://httpbin.org/delay/3')
]
for desc, url in test_urls:
try:
print(f"\n测试 {desc}:")
start_time = time.time()
response = advanced_session.get(url, timeout=(5, 10))
elapsed = time.time() - start_time
print(f" ✓ 响应: {response.status_code}, 耗时: {elapsed:.2f}秒")
except Exception as e:
elapsed = time.time() - start_time
print(f" ✗ 失败: {e}, 耗时: {elapsed:.2f}秒")
# 6. 装饰器重试
print("\n6. 装饰器重试:")
def retry_decorator(max_retries=3, delay=1, backoff=2, exceptions=(Exception,)):
"""重试装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_retries:
print(f"装饰器重试失败,已达最大次数: {e}")
raise
wait_time = delay * (backoff ** attempt)
print(f"装饰器重试 {attempt + 1}/{max_retries + 1} 失败: {e}")
print(f"等待 {wait_time:.1f}秒 后重试...")
time.sleep(wait_time)
return wrapper
return decorator
@retry_decorator(max_retries=2, delay=0.5, exceptions=(requests.exceptions.RequestException,))
def unreliable_request(url):
"""不稳定的请求函数"""
# 模拟随机失败
if random.random() < 0.7: # 70%概率失败
raise requests.exceptions.ConnectionError("模拟连接失败")
response = requests.get(url, timeout=5)
return response
# 测试装饰器重试
try:
print("测试装饰器重试:")
response = unreliable_request('https://httpbin.org/get')
print(f"装饰器重试成功: {response.status_code}")
except Exception as e:
print(f"装饰器重试最终失败: {e}")
# 7. 智能重试策略
print("\n7. 智能重试策略:")
class SmartRetry:
"""智能重试类"""
def __init__(self, max_retries=3, base_delay=1, max_delay=60):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.attempt_count = 0
def should_retry(self, exception, response=None):
"""判断是否应该重试"""
# 网络相关异常应该重试
if isinstance(exception, (requests.exceptions.Timeout,
requests.exceptions.ConnectionError)):
return True
# 特定状态码应该重试
if response and response.status_code in [429, 500, 502, 503, 504]:
return True
return False
def get_delay(self):
"""计算延迟时间"""
# 指数退避 + 随机抖动
delay = min(self.base_delay * (2 ** self.attempt_count), self.max_delay)
jitter = random.uniform(0, 0.1) * delay # 10%的随机抖动
return delay + jitter
def execute(self, func, *args, **kwargs):
"""执行带重试的函数"""
last_exception = None
for attempt in range(self.max_retries + 1):
self.attempt_count = attempt
try:
result = func(*args, **kwargs)
# 如果是Response对象,检查状态码
if hasattr(result, 'status_code'):
if self.should_retry(None, result) and attempt < self.max_retries:
print(f"智能重试: 状态码 {result.status_code},尝试 {attempt + 1}")
time.sleep(self.get_delay())
continue
print(f"智能重试成功,尝试次数: {attempt + 1}")
return result
except Exception as e:
last_exception = e
if self.should_retry(e) and attempt < self.max_retries:
delay = self.get_delay()
print(f"智能重试: {e},等待 {delay:.2f}秒,尝试 {attempt + 1}")
time.sleep(delay)
else:
break
print(f"智能重试失败,已达最大次数")
raise last_exception
# 测试智能重试
smart_retry = SmartRetry(max_retries=3, base_delay=0.5)
def test_request():
# 模拟不稳定的请求
if random.random() < 0.6:
raise requests.exceptions.ConnectionError("模拟网络错误")
return requests.get('https://httpbin.org/get', timeout=5)
try:
response = smart_retry.execute(test_request)
print(f"智能重试最终成功: {response.status_code}")
except Exception as e:
print(f"智能重试最终失败: {e}")
# 8. 重试监控和日志
print("\n8. 重试监控和日志:")
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class MonitoredRetry:
"""带监控的重试类"""
def __init__(self, max_retries=3):
self.max_retries = max_retries
self.stats = {
'total_attempts': 0,
'successful_attempts': 0,
'failed_attempts': 0,
'retry_reasons': {}
}
def request_with_monitoring(self, url, **kwargs):
"""带监控的请求"""
for attempt in range(self.max_retries + 1):
self.stats['total_attempts'] += 1
try:
logger.info(f"尝试请求 {url},第 {attempt + 1} 次")
response = requests.get(url, **kwargs)
if response.status_code == 200:
self.stats['successful_attempts'] += 1
logger.info(f"请求成功: {response.status_code}")
return response
else:
reason = f"status_{response.status_code}"
self.stats['retry_reasons'][reason] = self.stats['retry_reasons'].get(reason, 0) + 1
if attempt < self.max_retries:
logger.warning(f"请求失败: {response.status_code},准备重试")
time.sleep(1)
else:
logger.error(f"请求最终失败: {response.status_code}")
return response
except Exception as e:
reason = type(e).__name__
self.stats['retry_reasons'][reason] = self.stats['retry_reasons'].get(reason, 0) + 1
if attempt < self.max_retries:
logger.warning(f"请求异常: {e},准备重试")
time.sleep(1)
else:
self.stats['failed_attempts'] += 1
logger.error(f"请求最终异常: {e}")
raise
def get_stats(self):
"""获取统计信息"""
return self.stats
# 测试监控重试
monitored_retry = MonitoredRetry(max_retries=2)
test_urls_monitor = [
'https://httpbin.org/get',
'https://httpbin.org/status/500',
'https://httpbin.org/delay/1'
]
for url in test_urls_monitor:
try:
response = monitored_retry.request_with_monitoring(url, timeout=3)
print(f"监控请求结果: {response.status_code if response else 'None'}")
except Exception as e:
print(f"监控请求异常: {e}")
# 显示统计信息
stats = monitored_retry.get_stats()
print(f"\n重试统计信息:")
print(f" 总尝试次数: {stats['total_attempts']}")
print(f" 成功次数: {stats['successful_attempts']}")
print(f" 失败次数: {stats['failed_attempts']}")
print(f" 重试原因: {stats['retry_reasons']}")
# 9. 超时和重试的最佳实践
print("\n9. 超时和重试最佳实践:")
def best_practice_request(url, max_retries=3, timeout=(5, 30)):
"""最佳实践的请求函数"""
session = requests.Session()
# 配置重试策略
retry_strategy = Retry(
total=max_retries,
status_forcelist=[429, 500, 502, 503, 504],
method_whitelist=["HEAD", "GET", "OPTIONS"],
backoff_factor=1,
respect_retry_after_header=True
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 注意:Session对象没有"默认超时"属性,超时需要在每次请求时显式传入
try:
response = session.get(url, timeout=timeout)
response.raise_for_status() # 抛出HTTP错误
return response
except requests.exceptions.Timeout:
print(f"请求超时: {url}")
raise
except requests.exceptions.ConnectionError:
print(f"连接错误: {url}")
raise
except requests.exceptions.HTTPError as e:
print(f"HTTP错误: {e}")
raise
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
raise
finally:
session.close()
# 测试最佳实践
try:
response = best_practice_request('https://httpbin.org/get')
print(f"最佳实践请求成功: {response.status_code}")
except Exception as e:
print(f"最佳实践请求失败: {e}")
# 运行超时和重试演示
if __name__ == "__main__":
timeout_and_retry_demo()
异常处理¶
完善的异常处理是构建稳定爬虫程序的关键。
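基本套路是:按从具体到一般的顺序捕获requests的异常,并用raise_for_status()把4xx/5xx状态码转换为HTTPError。最小示意如下:
import requests

def fetch(url):
    """按异常类型分别处理的最小模式"""
    try:
        resp = requests.get(url, timeout=(3, 10))
        resp.raise_for_status()  # 4xx/5xx会抛出HTTPError
        return resp.text
    except requests.exceptions.Timeout:
        print("请求超时,可适当增大timeout或重试")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP错误: {e.response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"其他请求异常: {e}")
    return None

fetch('https://httpbin.org/status/404')
下面的完整示例将系统地演示异常类型、重试策略和日志记录: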
import requests
import json
import time
from requests.exceptions import (
RequestException, Timeout, ConnectionError, HTTPError,
URLRequired, TooManyRedirects, MissingSchema, InvalidSchema,
InvalidURL, InvalidHeader, ChunkedEncodingError, ContentDecodingError,
StreamConsumedError, RetryError, UnrewindableBodyError
)
import logging
from datetime import datetime
def exception_handling_demo():
"""
演示Requests异常处理
"""
print("=== Requests异常处理演示 ===")
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 1. 基本异常类型
print("\n1. 基本异常类型演示:")
def demonstrate_basic_exceptions():
"""演示基本异常类型"""
# 异常测试用例
test_cases = [
{
'name': '正常请求',
'url': 'https://httpbin.org/get',
'expected': 'success'
},
{
'name': '连接超时',
'url': 'https://httpbin.org/delay/10',
'timeout': 2,
'expected': 'timeout'
},
{
'name': '无效URL',
'url': 'invalid-url',
'expected': 'invalid_url'
},
{
'name': '不存在的域名',
'url': 'https://this-domain-does-not-exist-12345.com',
'expected': 'connection_error'
},
{
'name': 'HTTP错误状态',
'url': 'https://httpbin.org/status/404',
'expected': 'http_error'
},
{
'name': '服务器错误',
'url': 'https://httpbin.org/status/500',
'expected': 'server_error'
}
]
for case in test_cases:
print(f"\n测试: {case['name']}")
try:
kwargs = {}
if 'timeout' in case:
kwargs['timeout'] = case['timeout']
response = requests.get(case['url'], **kwargs)
# 检查HTTP状态码
if response.status_code >= 400:
response.raise_for_status()
print(f" ✓ 成功: {response.status_code}")
except Timeout as e:
print(f" ✗ 超时异常: {e}")
logger.warning(f"请求超时: {case['url']}")
except ConnectionError as e:
print(f" ✗ 连接异常: {e}")
logger.error(f"连接失败: {case['url']}")
except HTTPError as e:
print(f" ✗ HTTP异常: {e}")
print(f" 状态码: {e.response.status_code}")
print(f" 原因: {e.response.reason}")
logger.error(f"HTTP错误: {case['url']} - {e.response.status_code}")
except InvalidURL as e:
print(f" ✗ 无效URL: {e}")
logger.error(f"URL格式错误: {case['url']}")
except MissingSchema as e:
print(f" ✗ 缺少协议: {e}")
logger.error(f"URL缺少协议: {case['url']}")
except RequestException as e:
print(f" ✗ 请求异常: {e}")
logger.error(f"通用请求异常: {case['url']} - {e}")
except Exception as e:
print(f" ✗ 未知异常: {e}")
logger.critical(f"未知异常: {case['url']} - {e}")
demonstrate_basic_exceptions()
# 2. 异常层次结构
print("\n2. 异常层次结构:")
def show_exception_hierarchy():
"""显示异常层次结构"""
exceptions_hierarchy = {
'RequestException': {
'description': '所有Requests异常的基类',
'children': {
'HTTPError': '4xx和5xx HTTP状态码异常',
'ConnectionError': '连接相关异常',
'Timeout': '超时异常',
'URLRequired': '缺少URL异常',
'TooManyRedirects': '重定向次数过多异常',
'MissingSchema': '缺少URL协议异常',
'InvalidSchema': '无效URL协议异常',
'InvalidURL': '无效URL异常',
'InvalidHeader': '无效请求头异常',
'ChunkedEncodingError': '分块编码错误',
'ContentDecodingError': '内容解码错误',
'StreamConsumedError': '流已消费错误',
'RetryError': '重试错误',
'UnrewindableBodyError': '不可重绕请求体错误'
}
}
}
print("Requests异常层次结构:")
for parent, info in exceptions_hierarchy.items():
print(f"\n{parent}: {info['description']}")
for child, desc in info['children'].items():
print(f" ├── {child}: {desc}")
show_exception_hierarchy()
# 3. 详细异常处理
print("\n3. 详细异常处理:")
def detailed_exception_handling(url, **kwargs):
"""详细的异常处理函数"""
try:
print(f"请求: {url}")
response = requests.get(url, **kwargs)
response.raise_for_status()
print(f" ✓ 成功: {response.status_code}")
return response
except Timeout as e:
error_info = {
'type': 'Timeout',
'message': str(e),
'url': url,
'timestamp': datetime.now().isoformat(),
'suggestion': '增加超时时间或检查网络连接'
}
print(f" ✗ 超时: {error_info}")
return None
except ConnectionError as e:
error_info = {
'type': 'ConnectionError',
'message': str(e),
'url': url,
'timestamp': datetime.now().isoformat(),
'suggestion': '检查网络连接、DNS设置或目标服务器状态'
}
print(f" ✗ 连接错误: {error_info}")
return None
except HTTPError as e:
status_code = e.response.status_code
error_info = {
'type': 'HTTPError',
'status_code': status_code,
'reason': e.response.reason,
'url': url,
'timestamp': datetime.now().isoformat(),
'response_headers': dict(e.response.headers),
'suggestion': get_http_error_suggestion(status_code)
}
print(f" ✗ HTTP错误: {error_info}")
return e.response
except InvalidURL as e:
error_info = {
'type': 'InvalidURL',
'message': str(e),
'url': url,
'timestamp': datetime.now().isoformat(),
'suggestion': '检查URL格式是否正确'
}
print(f" ✗ 无效URL: {error_info}")
return None
except RequestException as e:
error_info = {
'type': 'RequestException',
'message': str(e),
'url': url,
'timestamp': datetime.now().isoformat(),
'suggestion': '检查请求参数和网络环境'
}
print(f" ✗ 请求异常: {error_info}")
return None
def get_http_error_suggestion(status_code):
"""根据HTTP状态码提供建议"""
suggestions = {
400: '检查请求参数格式',
401: '检查身份验证信息',
403: '检查访问权限',
404: '检查URL路径是否正确',
405: '检查HTTP方法是否正确',
429: '降低请求频率,实现重试机制',
500: '服务器内部错误,稍后重试',
502: '网关错误,检查代理设置',
503: '服务不可用,稍后重试',
504: '网关超时,增加超时时间'
}
return suggestions.get(status_code, '查看服务器文档或联系管理员')
# 测试详细异常处理
test_urls = [
'https://httpbin.org/get',
'https://httpbin.org/status/401',
'https://httpbin.org/delay/5',
'invalid-url-format'
]
for url in test_urls:
detailed_exception_handling(url, timeout=3)
# 4. 异常重试策略
print("\n4. 异常重试策略:")
def exception_based_retry(url, max_retries=3, **kwargs):
"""基于异常类型的重试策略"""
# 定义可重试的异常
retryable_exceptions = (
Timeout,
ConnectionError,
ChunkedEncodingError,
ContentDecodingError
)
# 定义可重试的HTTP状态码
retryable_status_codes = [429, 500, 502, 503, 504]
last_exception = None
for attempt in range(max_retries + 1):
try:
print(f"尝试 {attempt + 1}/{max_retries + 1}: {url}")
response = requests.get(url, **kwargs)
# 检查状态码是否需要重试
if response.status_code in retryable_status_codes and attempt < max_retries:
print(f" 状态码 {response.status_code} 需要重试")
time.sleep(2 ** attempt) # 指数退避
continue
response.raise_for_status()
print(f" ✓ 成功: {response.status_code}")
return response
except retryable_exceptions as e:
last_exception = e
if attempt < max_retries:
wait_time = 2 ** attempt
print(f" 可重试异常 {type(e).__name__}: {e}")
print(f" 等待 {wait_time}秒 后重试...")
time.sleep(wait_time)
else:
print(f" 重试次数已用完")
break
except HTTPError as e:
if e.response.status_code in retryable_status_codes and attempt < max_retries:
wait_time = 2 ** attempt
print(f" HTTP错误 {e.response.status_code} 可重试")
print(f" 等待 {wait_time}秒 后重试...")
time.sleep(wait_time)
else:
print(f" HTTP错误 {e.response.status_code} 不可重试")
raise
except RequestException as e:
print(f" 不可重试异常: {e}")
raise
# 如果所有重试都失败了
if last_exception:
raise last_exception
# 测试异常重试
retry_test_urls = [
'https://httpbin.org/status/503',
'https://httpbin.org/delay/2'
]
for url in retry_test_urls:
try:
response = exception_based_retry(url, max_retries=2, timeout=3)
print(f"重试成功: {response.status_code}")
except Exception as e:
print(f"重试失败: {e}")
# 5. 异常日志记录
print("\n5. 异常日志记录:")
class RequestLogger:
"""请求日志记录器"""
def __init__(self, logger_name='requests_logger'):
self.logger = logging.getLogger(logger_name)
# 创建文件处理器
file_handler = logging.FileHandler('requests_errors.log')
file_handler.setLevel(logging.ERROR)
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 创建格式器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 添加处理器
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
self.logger.setLevel(logging.INFO)
def log_request(self, method, url, **kwargs):
"""记录请求信息"""
self.logger.info(f"发起请求: {method.upper()} {url}")
if kwargs:
self.logger.debug(f"请求参数: {kwargs}")
def log_response(self, response):
"""记录响应信息"""
self.logger.info(
f"收到响应: {response.status_code} {response.reason} "
f"({len(response.content)}字节)"
)
def log_exception(self, exception, url, context=None):
"""记录异常信息"""
error_data = {
'exception_type': type(exception).__name__,
'exception_message': str(exception),
'url': url,
'timestamp': datetime.now().isoformat()
}
if context:
error_data.update(context)
self.logger.error(f"请求异常: {json.dumps(error_data, ensure_ascii=False)}")
def safe_request(self, method, url, **kwargs):
"""安全的请求方法"""
self.log_request(method, url, **kwargs)
try:
response = requests.request(method, url, **kwargs)
self.log_response(response)
response.raise_for_status()
return response
except Exception as e:
context = {
'method': method,
'kwargs': {k: str(v) for k, v in kwargs.items()}
}
self.log_exception(e, url, context)
raise
# 测试日志记录
request_logger = RequestLogger()
test_requests = [
('GET', 'https://httpbin.org/get'),
('GET', 'https://httpbin.org/status/404'),
('POST', 'https://httpbin.org/post', {'json': {'test': 'data'}})
]
for method, url, *args in test_requests:
kwargs = args[0] if args else {}
try:
response = request_logger.safe_request(method, url, **kwargs)
print(f"日志请求成功: {response.status_code}")
except Exception as e:
print(f"日志请求失败: {e}")
# 6. 自定义异常类
print("\n6. 自定义异常类:")
class CustomRequestException(RequestException):
"""自定义请求异常"""
pass
class RateLimitException(CustomRequestException):
"""频率限制异常"""
def __init__(self, message, retry_after=None):
super().__init__(message)
self.retry_after = retry_after
class DataValidationException(CustomRequestException):
"""数据验证异常"""
def __init__(self, message, validation_errors=None):
super().__init__(message)
self.validation_errors = validation_errors or []
def custom_request_handler(url, **kwargs):
"""使用自定义异常的请求处理器"""
try:
response = requests.get(url, **kwargs)
# 检查特定状态码并抛出自定义异常
if response.status_code == 429:
retry_after = response.headers.get('Retry-After')
raise RateLimitException(
"请求频率过高",
retry_after=retry_after
)
if response.status_code == 422:
try:
error_data = response.json()
validation_errors = error_data.get('errors', [])
raise DataValidationException(
"数据验证失败",
validation_errors=validation_errors
)
except ValueError:
raise DataValidationException("数据验证失败")
response.raise_for_status()
return response
except RateLimitException as e:
print(f"频率限制: {e}")
if e.retry_after:
print(f"建议等待: {e.retry_after}秒")
raise
except DataValidationException as e:
print(f"数据验证错误: {e}")
if e.validation_errors:
print(f"验证错误详情: {e.validation_errors}")
raise
# 测试自定义异常
try:
response = custom_request_handler('https://httpbin.org/status/429')
except RateLimitException as e:
print(f"捕获自定义异常: {e}")
except Exception as e:
print(f"其他异常: {e}")
# 运行异常处理演示
if __name__ == "__main__":
exception_handling_demo()
通过以上详细的代码示例和说明,我们完成了14.2节Requests库网络请求的全部内容。这一节涵盖了从基础使用到高级功能的各个方面,包括GET/POST请求、参数处理、响应对象、Session管理、身份验证、代理设置、SSL配置、Cookie处理、文件上传下载、超时重试机制和异常处理等核心功能。每个功能都提供了实用的代码示例和真实的运行结果,帮助读者深入理解和掌握Requests库的使用。
14.3 BeautifulSoup网页解析¶
BeautifulSoup是Python中最流行的HTML和XML解析库之一,它提供了简单易用的API来解析、导航、搜索和修改解析树。本节将详细介绍BeautifulSoup的各种功能和使用技巧。
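在展开各项功能之前,先用一个最小示例感受它的典型用法(假设已通过pip安装beautifulsoup4):
from bs4 import BeautifulSoup

html = '<html><body><h1>标题</h1><a href="/a">链接A</a><a href="/b">链接B</a></body></html>'
soup = BeautifulSoup(html, 'html.parser')

print(soup.h1.get_text())                 # 标题
for link in soup.find_all('a'):
    print(link.get_text(), link['href'])  # 链接A /a、链接B /b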
BeautifulSoup基础¶
BeautifulSoup的安装和基本概念是学习网页解析的第一步。
# 首先需要安装BeautifulSoup4
# pip install beautifulsoup4
# pip install lxml # 推荐的解析器
# pip install html5lib # 另一个解析器选项
import requests
from bs4 import BeautifulSoup, Comment, NavigableString
import re
from urllib.parse import urljoin, urlparse
import json
def beautifulsoup_basics_demo():
"""
演示BeautifulSoup基础功能
"""
print("=== BeautifulSoup基础功能演示 ===")
# 1. 基本使用和解析器
print("\n1. 基本使用和解析器:")
# 示例HTML内容
html_content = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>BeautifulSoup示例页面</title>
<style>
.highlight { color: red; }
#main { background: #f0f0f0; }
</style>
</head>
<body>
<div id="main" class="container">
<h1 class="title">网页解析示例</h1>
<p class="intro">这是一个用于演示BeautifulSoup功能的示例页面。</p>
<div class="content">
<h2>文章列表</h2>
<ul class="article-list">
<li><a href="/article/1" data-id="1">Python基础教程</a></li>
<li><a href="/article/2" data-id="2">网络爬虫入门</a></li>
<li><a href="/article/3" data-id="3">数据分析实战</a></li>
</ul>
</div>
<div class="sidebar">
<h3>相关链接</h3>
<a href="https://python.org" target="_blank">Python官网</a>
<a href="https://docs.python.org" target="_blank">Python文档</a>
</div>
<!-- 这是一个注释 -->
<footer>
<p>© 2024 示例网站</p>
</footer>
</div>
</body>
</html>
"""
# 不同解析器的比较
parsers = [
('html.parser', '内置解析器,速度适中,容错性一般'),
('lxml', '速度最快,功能强大,需要安装lxml库'),
('html5lib', '最好的容错性,解析方式与浏览器相同,速度较慢')
]
print("可用的解析器:")
for parser, description in parsers:
try:
soup = BeautifulSoup(html_content, parser)
print(f" ✓ {parser}: {description}")
except Exception as e:
print(f" ✗ {parser}: 不可用 - {e}")
# 使用默认解析器创建BeautifulSoup对象
soup = BeautifulSoup(html_content, 'html.parser')
# 2. 基本属性和方法
print("\n2. 基本属性和方法:")
print(f"文档类型: {type(soup)}")
print(f"解析器: {soup.parser}")
print(f"文档标题: {soup.title}")
print(f"标题文本: {soup.title.string}")
print(f"HTML标签: {soup.html.name}")
# 获取所有文本内容
all_text = soup.get_text()
print(f"所有文本长度: {len(all_text)}字符")
print(f"文本预览: {all_text[:100]}...")
# 3. 标签对象的属性
print("\n3. 标签对象的属性:")
# 获取第一个div标签
first_div = soup.find('div')
print(f"标签名: {first_div.name}")
print(f"标签属性: {first_div.attrs}")
print(f"id属性: {first_div.get('id')}")
print(f"class属性: {first_div.get('class')}")
# 检查属性是否存在
print(f"是否有id属性: {first_div.has_attr('id')}")
print(f"是否有title属性: {first_div.has_attr('title')}")
# 4. 导航树结构
print("\n4. 导航树结构:")
# 父子关系
title_tag = soup.title
print(f"title标签: {title_tag}")
print(f"父标签: {title_tag.parent.name}")
print(f"子元素数量: {len(list(title_tag.children))}")
# 兄弟关系
h1_tag = soup.find('h1')
print(f"h1标签: {h1_tag}")
# 下一个兄弟元素
next_sibling = h1_tag.find_next_sibling()
if next_sibling:
print(f"下一个兄弟元素: {next_sibling.name}")
# 上一个兄弟元素
p_tag = soup.find('p')
prev_sibling = p_tag.find_previous_sibling()
if prev_sibling:
print(f"p标签的上一个兄弟: {prev_sibling.name}")
# 5. 内容类型
print("\n5. 内容类型:")
# 遍历所有内容
body_tag = soup.body
content_types = {}
for content in body_tag.descendants:
content_type = type(content).__name__
content_types[content_type] = content_types.get(content_type, 0) + 1
print("内容类型统计:")
for content_type, count in content_types.items():
print(f" {content_type}: {count}")
# 查找注释
comments = soup.find_all(string=lambda text: isinstance(text, Comment))
print(f"\n找到 {len(comments)} 个注释:")
for comment in comments:
print(f" 注释: {comment.strip()}")
# 6. 编码处理
print("\n6. 编码处理:")
# 检测原始编码(只有传入bytes时才会自动检测编码;传入str时该属性为None)
print(f"检测到的编码: {soup.original_encoding}")
# 不同编码的HTML
utf8_html = "<html><head><title>中文测试</title></head><body><p>你好世界</p></body></html>"
# 指定编码解析
soup_utf8 = BeautifulSoup(utf8_html, 'html.parser')
print(f"UTF-8解析结果: {soup_utf8.title.string}")
# 转换为不同编码
print(f"转为UTF-8: {soup_utf8.encode('utf-8')[:50]}...")
# 7. 格式化输出
print("\n7. 格式化输出:")
# 美化输出
simple_html = "<div><p>Hello</p><p>World</p></div>"
simple_soup = BeautifulSoup(simple_html, 'html.parser')
print("原始HTML:")
print(simple_html)
print("\n美化后的HTML:")
print(simple_soup.prettify())
# 自定义缩进(prettify本身不接受indent参数,bs4 4.11+可通过HTMLFormatter指定)
from bs4.formatter import HTMLFormatter
print("\n自定义缩进(2个空格):")
print(simple_soup.prettify(formatter=HTMLFormatter(indent=2)))
# 8. 性能测试
print("\n8. 性能测试:")
import time
# 测试不同解析器的性能
test_html = html_content * 10 # 增大测试数据
available_parsers = []
for parser, _ in parsers:
try:
BeautifulSoup("<html></html>", parser)
available_parsers.append(parser)
except Exception:
continue
print("解析器性能测试:")
for parser in available_parsers:
start_time = time.time()
try:
for _ in range(10):
BeautifulSoup(test_html, parser)
elapsed = time.time() - start_time
print(f" {parser}: {elapsed:.4f}秒 (10次解析)")
except Exception as e:
print(f" {parser}: 测试失败 - {e}")
# 运行BeautifulSoup基础演示
if __name__ == "__main__":
beautifulsoup_basics_demo()
终端日志:
=== BeautifulSoup基础功能演示 ===
1. 基本使用和解析器:
可用的解析器:
✓ html.parser: 内置解析器,速度适中,容错性一般
✓ lxml: 速度最快,功能强大,需要安装lxml库
✓ html5lib: 最好的容错性,解析方式与浏览器相同,速度较慢
2. 基本属性和方法:
文档类型: <class 'bs4.BeautifulSoup'>
解析器: <html.parser.HTMLParser object at 0x...>
文档标题: <title>BeautifulSoup示例页面</title>
标题文本: BeautifulSoup示例页面
HTML标签: html
所有文本长度: 385字符
文本预览: BeautifulSoup示例页面
.highlight { color: red; }
#main { background: #f0f0f0; }
网页解析示例
这是一个用于演示BeautifulSoup功能的示例页面。
文章列表
Python基础教程
网络爬虫入门
数据分析实战
相关链接
Python官网
Python文档
© 2024 示例网站
3. 标签对象的属性:
标签名: div
标签属性: {'id': 'main', 'class': ['container']}
id属性: main
class属性: ['container']
是否有id属性: True
是否有title属性: False
4. 导航树结构:
title标签: <title>BeautifulSoup示例页面</title>
父标签: head
子元素数量: 1
h1标签: <h1 class="title">网页解析示例</h1>
下一个兄弟元素: p
p标签的上一个兄弟: h1
5. 内容类型:
内容类型统计:
Tag: 23
NavigableString: 31
Comment: 1
找到 1 个注释:
注释: 这是一个注释
6. 编码处理:
检测到的编码: None
UTF-8解析结果: 中文测试
转为UTF-8: b'<html><head><title>\xe4\xb8\xad\xe6\x96\x87\xe6\xb5\x8b\xe8\xaf\x95</title></head><body><p>\xe4\xbd\xa0\xe5\xa5\xbd\xe4\xb8\x96\xe7\x95\x8c</p></body></html>'
7. 格式化输出:
原始HTML:
<div><p>Hello</p><p>World</p></div>
美化后的HTML:
<div>
<p>
Hello
</p>
<p>
World
</p>
</div>
自定义缩进(2个空格):
<div>
<p>
Hello
</p>
<p>
World
</p>
</div>
8. 性能测试:
解析器性能测试:
html.parser: 0.0156秒 (10次解析)
lxml: 0.0089秒 (10次解析)
html5lib: 0.0445秒 (10次解析)
HTML解析¶
BeautifulSoup提供了多种方法来查找和提取HTML元素。
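核心方法是find()(返回第一个匹配元素)和find_all()(返回全部匹配元素),二者都支持按标签名、属性和文本过滤。最小示意如下:
from bs4 import BeautifulSoup

html = '<div class="box"><p id="intro">简介</p><p>正文一</p><p>正文二</p></div>'
soup = BeautifulSoup(html, 'html.parser')

print(soup.find('p', id='intro').get_text())    # 简介
print(len(soup.find_all('p')))                  # 3
print(soup.find('div', class_='box')['class'])  # ['box']
下面的完整示例将演示更多查找和提取技巧: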
def html_parsing_demo():
"""
演示HTML解析功能
"""
print("=== HTML解析功能演示 ===")
# 获取示例网页
try:
response = requests.get('https://httpbin.org/html')
soup = BeautifulSoup(response.text, 'html.parser')
print("✓ 成功获取示例网页")
except Exception:
# 如果无法获取网页,使用本地HTML
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>HTML解析示例</title>
<meta name="description" content="这是一个HTML解析示例页面">
<meta name="keywords" content="HTML, 解析, BeautifulSoup">
</head>
<body>
<header>
<nav class="navbar">
<ul>
<li><a href="#home">首页</a></li>
<li><a href="#about">关于</a></li>
<li><a href="#contact">联系</a></li>
</ul>
</nav>
</header>
<main>
<section id="hero" class="hero-section">
<h1>欢迎来到我的网站</h1>
<p class="lead">这里有最新的技术文章和教程</p>
<button class="btn btn-primary" data-action="subscribe">订阅更新</button>
</section>
<section id="articles" class="articles-section">
<h2>最新文章</h2>
<div class="article-grid">
<article class="article-card" data-category="python">
<h3><a href="/python-basics">Python基础教程</a></h3>
<p class="excerpt">学习Python编程的基础知识...</p>
<div class="meta">
<span class="author">作者: 张三</span>
<span class="date">2024-01-15</span>
<span class="tags">
<span class="tag">Python</span>
<span class="tag">编程</span>
</span>
</div>
</article>
<article class="article-card" data-category="web">
<h3><a href="/web-scraping">网络爬虫实战</a></h3>
<p class="excerpt">使用Python进行网络数据采集...</p>
<div class="meta">
<span class="author">作者: 李四</span>
<span class="date">2024-01-10</span>
<span class="tags">
<span class="tag">爬虫</span>
<span class="tag">数据采集</span>
</span>
</div>
</article>
<article class="article-card" data-category="data">
<h3><a href="/data-analysis">数据分析入门</a></h3>
<p class="excerpt">掌握数据分析的基本方法...</p>
<div class="meta">
<span class="author">作者: 王五</span>
<span class="date">2024-01-05</span>
<span class="tags">
<span class="tag">数据分析</span>
<span class="tag">统计</span>
</span>
</div>
</article>
</div>
</section>
<aside class="sidebar">
<div class="widget">
<h4>热门标签</h4>
<div class="tag-cloud">
<a href="#" class="tag-link" data-count="15">Python</a>
<a href="#" class="tag-link" data-count="12">JavaScript</a>
<a href="#" class="tag-link" data-count="8">数据科学</a>
<a href="#" class="tag-link" data-count="6">机器学习</a>
</div>
</div>
<div class="widget">
<h4>友情链接</h4>
<ul class="link-list">
<li><a href="https://python.org" target="_blank" rel="noopener">Python官网</a></li>
<li><a href="https://github.com" target="_blank" rel="noopener">GitHub</a></li>
<li><a href="https://stackoverflow.com" target="_blank" rel="noopener">Stack Overflow</a></li>
</ul>
</div>
</aside>
</main>
<footer>
<div class="footer-content">
<p>© 2024 我的网站. 保留所有权利.</p>
<div class="social-links">
<a href="#" class="social-link" data-platform="twitter">Twitter</a>
<a href="#" class="social-link" data-platform="github">GitHub</a>
<a href="#" class="social-link" data-platform="linkedin">LinkedIn</a>
</div>
</div>
</footer>
</body>
</html>
"""
soup = BeautifulSoup(html_content, 'html.parser')
print("✓ 使用本地HTML示例")
# 1. 基本查找方法
print("\n1. 基本查找方法:")
# find() - 查找第一个匹配的元素
first_h1 = soup.find('h1')
print(f"第一个h1标签: {first_h1}")
# find_all() - 查找所有匹配的元素
all_links = soup.find_all('a')
print(f"所有链接数量: {len(all_links)}")
# 限制查找数量
first_3_links = soup.find_all('a', limit=3)
print(f"前3个链接: {[link.get_text() for link in first_3_links]}")
# 2. 按属性查找
print("\n2. 按属性查找:")
# 按class查找
article_cards = soup.find_all('article', class_='article-card')
print(f"文章卡片数量: {len(article_cards)}")
# 按id查找
hero_section = soup.find('section', id='hero')
if hero_section:
print(f"英雄区域标题: {hero_section.find('h1').get_text()}")
# 按多个class查找
btn_primary = soup.find('button', class_=['btn', 'btn-primary'])
if btn_primary:
print(f"主要按钮: {btn_primary.get_text()}")
# 按自定义属性查找
python_articles = soup.find_all('article', {'data-category': 'python'})
print(f"Python分类文章: {len(python_articles)}")
# 3. 使用正则表达式查找
print("\n3. 使用正则表达式查找:")
# 查找href包含特定模式的链接
external_links = soup.find_all('a', href=re.compile(r'https?://'))
print(f"外部链接数量: {len(external_links)}")
for link in external_links:
print(f" {link.get_text()}: {link.get('href')}")
# 查找class名包含特定模式的元素
tag_elements = soup.find_all(class_=re.compile(r'tag'))
print(f"\n包含'tag'的class元素: {len(tag_elements)}")
# 4. 使用函数查找
print("\n4. 使用函数查找:")
def has_data_attribute(tag):
"""检查标签是否有data-*属性"""
return tag.has_attr('data-category') or tag.has_attr('data-action') or tag.has_attr('data-platform')
data_elements = soup.find_all(has_data_attribute)
print(f"有data属性的元素: {len(data_elements)}")
for elem in data_elements:
data_attrs = {k: v for k, v in elem.attrs.items() if k.startswith('data-')}
print(f" {elem.name}: {data_attrs}")
# 查找包含特定文本的元素
def contains_python(tag):
"""检查标签文本是否包含'Python'"""
return tag.string and 'Python' in tag.string
python_texts = soup.find_all(string=contains_python)
print(f"\n包含'Python'的文本: {python_texts}")
# 5. 层级查找
print("\n5. 层级查找:")
# 查找直接子元素
main_section = soup.find('main')
if main_section:
direct_children = main_section.find_all(recursive=False)
print(f"main的直接子元素: {[child.name for child in direct_children if child.name]}")
# 查找后代元素
nav_links = soup.find('nav').find_all('a') if soup.find('nav') else []
print(f"导航链接: {[link.get_text() for link in nav_links]}")
# 6. 兄弟元素查找
print("\n6. 兄弟元素查找:")
# 查找下一个兄弟元素
first_article = soup.find('article')
if first_article:
next_article = first_article.find_next_sibling('article')
if next_article:
next_title = next_article.find('h3').get_text()
print(f"下一篇文章: {next_title}")
# 查找所有后续兄弟元素
all_next_articles = first_article.find_next_siblings('article') if first_article else []
print(f"后续文章数量: {len(all_next_articles)}")
# 7. 父元素查找
print("\n7. 父元素查找:")
# 查找特定链接的父元素
python_link = soup.find('a', string='Python基础教程')
if python_link:
article_parent = python_link.find_parent('article')
if article_parent:
category = article_parent.get('data-category')
print(f"Python教程文章分类: {category}")
# 查找所有祖先元素
if python_link:
parents = [parent.name for parent in python_link.find_parents() if parent.name]
print(f"Python链接的祖先元素: {parents}")
# 8. 复杂查找组合
print("\n8. 复杂查找组合:")
# 查找包含特定文本的链接
tutorial_links = soup.find_all('a', string=re.compile(r'教程|实战|入门'))
print(f"教程相关链接: {[link.get_text() for link in tutorial_links]}")
# 查找特定结构的元素
articles_with_tags = []
for article in soup.find_all('article'):
tags_container = article.find('span', class_='tags')
if tags_container:
tags = [tag.get_text() for tag in tags_container.find_all('span', class_='tag')]
title = article.find('h3').get_text() if article.find('h3') else 'Unknown'
articles_with_tags.append({'title': title, 'tags': tags})
print(f"\n文章标签信息:")
for article_info in articles_with_tags:
print(f" {article_info['title']}: {article_info['tags']}")
# 9. 性能优化技巧
print("\n9. 性能优化技巧:")
import time
# 比较不同查找方法的性能
test_iterations = 1000
# 方法1: 使用find_all
start_time = time.time()
for _ in range(test_iterations):
soup.find_all('a')
method1_time = time.time() - start_time
# 方法2: 使用CSS选择器
start_time = time.time()
for _ in range(test_iterations):
soup.select('a')
method2_time = time.time() - start_time
print(f"性能比较 ({test_iterations}次查找):")
print(f" find_all方法: {method1_time:.4f}秒")
print(f" CSS选择器: {method2_time:.4f}秒")
# 10. 错误处理和边界情况
print("\n10. 错误处理和边界情况:")
# 处理不存在的元素
non_existent = soup.find('nonexistent')
print(f"不存在的元素: {non_existent}")
# 安全获取属性
safe_href = soup.find('a').get('href', '默认值') if soup.find('a') else '无链接'
print(f"安全获取href: {safe_href}")
# 处理空文本
empty_elements = soup.find_all(string=lambda text: text and text.strip() == '')
print(f"空文本元素数量: {len(empty_elements)}")
# 检查元素是否存在再操作
meta_description = soup.find('meta', attrs={'name': 'description'})
if meta_description:
description_content = meta_description.get('content')
print(f"页面描述: {description_content}")
else:
print("未找到页面描述")
# 运行HTML解析演示
if __name__ == "__main__":
html_parsing_demo()
终端日志:
=== HTML解析功能演示 ===
✓ 使用本地HTML示例
1. 基本查找方法:
第一个h1标签: <h1>欢迎来到我的网站</h1>
所有链接数量: 9
前3个链接: ['首页', '关于', '联系']
2. 按属性查找:
文章卡片数量: 3
英雄区域标题: 欢迎来到我的网站
主要按钮: 订阅更新
Python分类文章: 1
3. 使用正则表达式查找:
外部链接数量: 3
Python官网: https://python.org
GitHub: https://github.com
Stack Overflow: https://stackoverflow.com
包含'tag'的class元素: 10
4. 使用函数查找:
有data属性的元素: 7
button: {'data-action': 'subscribe'}
article: {'data-category': 'python'}
article: {'data-category': 'web'}
article: {'data-category': 'data'}
a: {'data-platform': 'twitter'}
a: {'data-platform': 'github'}
a: {'data-platform': 'linkedin'}
包含'Python'的文本: ['Python', 'Python基础教程']
5. 层级查找:
main的直接子元素: ['section', 'section', 'aside']
导航链接: ['首页', '关于', '联系']
6. 兄弟元素查找:
下一篇文章: 网络爬虫实战
后续文章数量: 2
7. 父元素查找:
Python教程文章分类: python
Python链接的祖先元素: ['h3', 'article', 'div', 'section', 'main', 'body', 'html', '[document]']
8. 复杂查找组合:
教程相关链接: ['Python基础教程', '数据分析入门']
文章标签信息:
Python基础教程: ['Python', '编程']
网络爬虫实战: ['爬虫', '数据采集']
数据分析入门: ['数据分析', '统计']
9. 性能优化技巧:
性能比较 (1000次查找):
find_all方法: 0.0234秒
CSS选择器: 0.0189秒
10. 错误处理和边界情况:
不存在的元素: None
安全获取href: #home
空文本元素数量: 0
页面描述: 这是一个HTML解析示例页面
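上面第9步只比较了不同查找方式的耗时。补充一点:如果页面很大而只需要其中某类标签,还可以在解析阶段就用bs4自带的SoupStrainer(通过parse_only参数)只构建感兴趣的部分。下面是一个简单示意,其中的HTML为自拟内容:
from bs4 import BeautifulSoup, SoupStrainer

html = '<div><a href="/a">链接A</a><p>一大段无关文本</p><a href="/b">链接B</a></div>'

# parse_only让解析器只保留<a>标签,大页面下可减少需要处理的内容和内存占用
only_links = SoupStrainer('a')
link_soup = BeautifulSoup(html, 'html.parser', parse_only=only_links)
print([a['href'] for a in link_soup.find_all('a')])  # ['/a', '/b']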
CSS选择器¶
BeautifulSoup支持CSS选择器,提供了更灵活的元素选择方式。
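在进入完整示例之前,先看一个最小示意(HTML为自拟内容),说明select()与select_one()的基本区别:
from bs4 import BeautifulSoup

mini_html = '<div id="box"><p class="intro">你好</p><p class="intro hot">世界</p></div>'
mini_soup = BeautifulSoup(mini_html, 'html.parser')

# select()返回所有匹配元素组成的列表,select_one()只返回第一个匹配(没有匹配时为None)
print([p.get_text() for p in mini_soup.select('p.intro')])  # ['你好', '世界']
print(mini_soup.select_one('#box .hot').get_text())  # 世界
print(mini_soup.select_one('p.missing'))  # None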
def css_selector_demo():
"""
演示CSS选择器功能
"""
print("=== CSS选择器功能演示 ===")
# 示例HTML
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>CSS选择器示例</title>
</head>
<body>
<div id="container" class="main-content">
<header class="site-header">
<h1 class="site-title">我的博客</h1>
<nav class="main-nav">
<ul>
<li class="nav-item active"><a href="/">首页</a></li>
<li class="nav-item"><a href="/about">关于</a></li>
<li class="nav-item"><a href="/contact">联系</a></li>
</ul>
</nav>
</header>
<main class="content">
<article class="post featured" data-category="tech">
<h2 class="post-title">Python爬虫技术详解</h2>
<div class="post-meta">
<span class="author">作者: 张三</span>
<span class="date">2024-01-15</span>
<div class="tags">
<span class="tag python">Python</span>
<span class="tag web-scraping">爬虫</span>
</div>
</div>
<div class="post-content">
<p>这是一篇关于Python爬虫的详细教程...</p>
<ul class="feature-list">
<li>基础概念介绍</li>
<li>实战案例分析</li>
<li>最佳实践分享</li>
</ul>
</div>
</article>
<article class="post" data-category="tutorial">
<h2 class="post-title">Web开发入门指南</h2>
<div class="post-meta">
<span class="author">作者: 李四</span>
<span class="date">2024-01-10</span>
<div class="tags">
<span class="tag html">HTML</span>
<span class="tag css">CSS</span>
<span class="tag javascript">JavaScript</span>
</div>
</div>
<div class="post-content">
<p>学习Web开发的完整路径...</p>
<ol class="step-list">
<li>HTML基础</li>
<li>CSS样式</li>
<li>JavaScript交互</li>
</ol>
</div>
</article>
</main>
<aside class="sidebar">
<div class="widget recent-posts">
<h3 class="widget-title">最新文章</h3>
<ul class="post-list">
<li><a href="/post1">文章标题1</a></li>
<li><a href="/post2">文章标题2</a></li>
<li><a href="/post3">文章标题3</a></li>
</ul>
</div>
<div class="widget categories">
<h3 class="widget-title">分类</h3>
<ul class="category-list">
<li><a href="/category/tech" data-count="5">技术 (5)</a></li>
<li><a href="/category/tutorial" data-count="3">教程 (3)</a></li>
<li><a href="/category/news" data-count="2">新闻 (2)</a></li>
</ul>
</div>
</aside>
</div>
<footer class="site-footer">
<div class="footer-content">
<p>© 2024 我的博客. 版权所有.</p>
<div class="social-links">
<a href="#" class="social twitter" title="Twitter">Twitter</a>
<a href="#" class="social github" title="GitHub">GitHub</a>
<a href="#" class="social linkedin" title="LinkedIn">LinkedIn</a>
</div>
</div>
</footer>
</body>
</html>
"""
soup = BeautifulSoup(html_content, 'html.parser')
# 1. 基本选择器
print("\n1. 基本选择器:")
# 标签选择器
h1_tags = soup.select('h1')
print(f"h1标签: {[h1.get_text() for h1 in h1_tags]}")
# 类选择器
post_titles = soup.select('.post-title')
print(f"文章标题: {[title.get_text() for title in post_titles]}")
# ID选择器
container = soup.select('#container')
print(f"容器元素: {len(container)}个")
# 属性选择器
tech_posts = soup.select('[data-category="tech"]')
print(f"技术分类文章: {len(tech_posts)}个")
# 2. 组合选择器
print("\n2. 组合选择器:")
# 后代选择器
nav_links = soup.select('nav a')
print(f"导航链接: {[link.get_text() for link in nav_links]}")
# 子选择器
direct_children = soup.select('main > article')
print(f"main的直接子文章: {len(direct_children)}个")
# 相邻兄弟选择器
next_siblings = soup.select('h2 + .post-meta')
print(f"h2后的meta信息: {len(next_siblings)}个")
# 通用兄弟选择器
all_siblings = soup.select('h2 ~ div')
print(f"h2后的所有div: {len(all_siblings)}个")
# 3. 伪类选择器
print("\n3. 伪类选择器:")
# 第一个子元素
first_children = soup.select('ul li:first-child')
print(f"列表第一项: {[li.get_text() for li in first_children]}")
# 最后一个子元素
last_children = soup.select('ul li:last-child')
print(f"列表最后一项: {[li.get_text() for li in last_children]}")
# 第n个子元素
second_items = soup.select('ul li:nth-child(2)')
print(f"列表第二项: {[li.get_text() for li in second_items]}")
# 奇数/偶数子元素
odd_items = soup.select('ul li:nth-child(odd)')
print(f"奇数位置项目: {len(odd_items)}个")
# 4. 属性选择器高级用法
print("\n4. 属性选择器高级用法:")
# 包含特定属性
has_title = soup.select('[title]')
print(f"有title属性的元素: {len(has_title)}个")
# 属性值开头匹配
href_starts = soup.select('a[href^="/category"]')
print(f"href以/category开头的链接: {len(href_starts)}个")
# 属性值结尾匹配
href_ends = soup.select('a[href$=".html"]')
print(f"href以.html结尾的链接: {len(href_ends)}个")
# 属性值包含匹配
href_contains = soup.select('a[href*="post"]')
print(f"href包含post的链接: {len(href_contains)}个")
# 属性值单词匹配
class_word = soup.select('[class~="post"]')
print(f"class包含post单词的元素: {len(class_word)}个")
# 5. 多重选择器
print("\n5. 多重选择器:")
# 并集选择器
headings = soup.select('h1, h2, h3')
print(f"所有标题: {[h.get_text() for h in headings]}")
# 复杂组合
featured_tags = soup.select('article.featured .tag')
print(f"特色文章标签: {[tag.get_text() for tag in featured_tags]}")
# 6. 否定选择器
print("\n6. 否定选择器:")
# 不包含特定class的元素
non_featured = soup.select('article:not(.featured)')
print(f"非特色文章: {len(non_featured)}个")
# 不是第一个子元素
not_first = soup.select('li:not(:first-child)')
print(f"非第一个li元素: {len(not_first)}个")
# 7. 文本内容选择
print("\n7. 文本内容选择:")
# 标准CSS选择器不支持按文本内容匹配
# 这里改用find_all(string=...)配合正则来查找包含特定文本的文本节点
python_elements = soup.find_all(string=re.compile('Python'))
print(f"包含Python的文本: {len(python_elements)}个")
# 8. 性能比较
print("\n8. 性能比较:")
import time
test_iterations = 1000
# CSS选择器
start_time = time.time()
for _ in range(test_iterations):
soup.select('.post-title')
css_time = time.time() - start_time
# find_all方法
start_time = time.time()
for _ in range(test_iterations):
soup.find_all(class_='post-title')
find_time = time.time() - start_time
print(f"性能测试 ({test_iterations}次):")
print(f" CSS选择器: {css_time:.4f}秒")
print(f" find_all方法: {find_time:.4f}秒")
# 9. 实用选择器示例
print("\n9. 实用选择器示例:")
# 选择所有外部链接
external_links = soup.select('a[href^="http"]')
print(f"外部链接: {len(external_links)}个")
# 选择所有图片
images = soup.select('img')
print(f"图片: {len(images)}个")
# 选择表单元素
form_elements = soup.select('input, textarea, select')
print(f"表单元素: {len(form_elements)}个")
# 选择有特定数据属性的元素
data_elements = soup.select('[data-count]')
print(f"有data-count属性的元素: {len(data_elements)}个")
for elem in data_elements:
print(f" {elem.get_text()}: {elem.get('data-count')}")
# 10. 复杂查询示例
print("\n10. 复杂查询示例:")
# 查找特定结构的数据
articles_info = []
for article in soup.select('article'):
title = article.select_one('.post-title')
author = article.select_one('.author')
date = article.select_one('.date')
tags = article.select('.tag')
if title:
article_data = {
'title': title.get_text(),
'author': author.get_text() if author else 'Unknown',
'date': date.get_text() if date else 'Unknown',
'tags': [tag.get_text() for tag in tags],
'category': article.get('data-category', 'Unknown')
}
articles_info.append(article_data)
print("文章详细信息:")
for info in articles_info:
print(f" 标题: {info['title']}")
print(f" 作者: {info['author']}")
print(f" 日期: {info['date']}")
print(f" 分类: {info['category']}")
print(f" 标签: {', '.join(info['tags'])}")
print()
# 运行CSS选择器演示
if __name__ == "__main__":
css_selector_demo()
终端日志:
=== CSS选择器功能演示 ===
1. 基本选择器:
h1标签: ['我的博客']
文章标题: ['Python爬虫技术详解', 'Web开发入门指南']
容器元素: 1个
技术分类文章: 1个
2. 组合选择器:
导航链接: ['首页', '关于', '联系']
main的直接子文章: 2个
h2后的meta信息: 2个
h2后的所有div: 4个
3. 伪类选择器:
列表第一项: ['首页', '基础概念介绍', '文章标题1', '技术 (5)']
列表最后一项: ['联系', '最佳实践分享', '文章标题3', '新闻 (2)']
列表第二项: ['关于', '实战案例分析', '文章标题2', '教程 (3)']
奇数位置项目: 8个
4. 属性选择器高级用法:
有title属性的元素: 3个
href以/category开头的链接: 3个
href以.html结尾的链接: 0个
href包含post的链接: 3个
class包含post单词的元素: 2个
5. 多重选择器:
所有标题: ['我的博客', 'Python爬虫技术详解', 'Web开发入门指南', '最新文章', '分类']
特色文章标签: ['Python', '爬虫']
6. 否定选择器:
非特色文章: 1个
非第一个li元素: 10个
7. 文本内容选择:
包含Python的文本: 3个
8. 性能比较 (1000次):
CSS选择器: 0.0156秒
find_all方法: 0.0189秒
9. 实用选择器示例:
外部链接: 0个
图片: 0个
表单元素: 0个
有data-count属性的元素: 3个
技术 (5): 5
教程 (3): 3
新闻 (2): 2
10. 复杂查询示例:
文章详细信息:
标题: Python爬虫技术详解
作者: 作者: 张三
日期: 2024-01-15
分类: tech
标签: Python, 爬虫
标题: Web开发入门指南
作者: 作者: 李四
日期: 2024-01-10
分类: tutorial
标签: HTML, CSS, JavaScript
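补充一点:上面第7步提到标准CSS无法按文本内容匹配。如果环境中安装了soupsieve(较新版本bs4的select()即基于它),可以使用其扩展伪类:-soup-contains()实现类似效果。下面是一个小示意,HTML为自拟内容,若soupsieve版本较旧可能不支持该伪类:
from bs4 import BeautifulSoup

soup = BeautifulSoup('<ul><li>Python教程</li><li>Go教程</li></ul>', 'html.parser')
# :-soup-contains()按元素文本是否包含给定子串进行过滤
hits = soup.select('li:-soup-contains("Python")')
print([li.get_text() for li in hits])  # ['Python教程']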
数据提取¶
BeautifulSoup提供了多种方法来提取HTML元素中的数据。
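先用一个最小示意说明最常用的几个提取入口(HTML为自拟内容):get_text()取文本、下标或get()取属性、.attrs查看全部属性:
from bs4 import BeautifulSoup

mini_html = '<a href="/book/1" data-id="1" class="link">Python入门 <b>(第3版)</b></a>'
a = BeautifulSoup(mini_html, 'html.parser').a

print(a.get_text())  # 合并所有后代文本: 'Python入门 (第3版)'
print(a['href'])  # 下标访问属性,属性不存在时会抛KeyError
print(a.get('title', '无标题'))  # get()在属性缺失时返回默认值
print(a.attrs)  # 以字典查看全部属性,class会以列表形式保存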
def data_extraction_demo():
"""
演示数据提取功能
"""
print("=== 数据提取功能演示 ===")
# 示例HTML - 电商产品页面
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>商品详情 - Python编程书籍</title>
<meta name="description" content="Python从入门到精通,适合初学者的编程教程">
<meta name="keywords" content="Python, 编程, 教程, 书籍">
<meta name="price" content="89.00">
</head>
<body>
<div class="product-page">
<header class="page-header">
<nav class="breadcrumb">
<a href="/">首页</a> >
<a href="/books">图书</a> >
<a href="/books/programming">编程</a> >
<span class="current">Python从入门到精通</span>
</nav>
</header>
<main class="product-main">
<div class="product-gallery">
<img src="/images/python-book-cover.jpg" alt="Python从入门到精通封面" class="main-image">
<div class="thumbnail-list">
<img src="/images/python-book-thumb1.jpg" alt="缩略图1" class="thumbnail">
<img src="/images/python-book-thumb2.jpg" alt="缩略图2" class="thumbnail">
<img src="/images/python-book-thumb3.jpg" alt="缩略图3" class="thumbnail">
</div>
</div>
<div class="product-info">
<h1 class="product-title">Python从入门到精通(第3版)</h1>
<div class="product-subtitle">零基础学Python,包含大量实战案例</div>
<div class="rating-section">
<div class="stars" data-rating="4.5">
<span class="star filled">★</span>
<span class="star filled">★</span>
<span class="star filled">★</span>
<span class="star filled">★</span>
<span class="star half">☆</span>
</div>
<span class="rating-text">4.5分</span>
<a href="#reviews" class="review-count">(1,234条评价)</a>
</div>
<div class="price-section">
<span class="current-price" data-price="89.00">¥89.00</span>
<span class="original-price" data-original="128.00">¥128.00</span>
<span class="discount">7折</span>
<div class="price-note">包邮 | 30天无理由退换</div>
</div>
<div class="product-specs">
<table class="specs-table">
<tr>
<td class="spec-name">作者</td>
<td class="spec-value">张三, 李四</td>
</tr>
<tr>
<td class="spec-name">出版社</td>
<td class="spec-value">人民邮电出版社</td>
</tr>
<tr>
<td class="spec-name">出版时间</td>
<td class="spec-value">2024年1月</td>
</tr>
<tr>
<td class="spec-name">页数</td>
<td class="spec-value">568页</td>
</tr>
<tr>
<td class="spec-name">ISBN</td>
<td class="spec-value">978-7-115-12345-6</td>
</tr>
<tr>
<td class="spec-name">重量</td>
<td class="spec-value">0.8kg</td>
</tr>
</table>
</div>
<div class="action-buttons">
<button class="btn btn-primary add-to-cart" data-product-id="12345">加入购物车</button>
<button class="btn btn-secondary buy-now" data-product-id="12345">立即购买</button>
<button class="btn btn-outline favorite" data-product-id="12345">收藏</button>
</div>
</div>
</main>
<section class="product-details">
<div class="tabs">
<div class="tab active" data-tab="description">商品描述</div>
<div class="tab" data-tab="contents">目录</div>
<div class="tab" data-tab="reviews">用户评价</div>
</div>
<div class="tab-content active" id="description">
<div class="description-text">
<p>本书是Python编程的入门经典教程,适合零基础读者学习。</p>
<p>全书共分为15个章节,涵盖了Python的基础语法、数据结构、面向对象编程、文件操作、网络编程等核心内容。</p>
<ul class="feature-list">
<li>✓ 零基础入门,循序渐进</li>
<li>✓ 大量实战案例,学以致用</li>
<li>✓ 配套视频教程,立体学习</li>
<li>✓ 技术社区支持,答疑解惑</li>
</ul>
</div>
</div>
<div class="tab-content" id="contents">
<div class="contents-list">
<div class="chapter">
<h3>第1章 Python基础</h3>
<ul>
<li>1.1 Python简介</li>
<li>1.2 开发环境搭建</li>
<li>1.3 第一个Python程序</li>
</ul>
</div>
<div class="chapter">
<h3>第2章 数据类型</h3>
<ul>
<li>2.1 数字类型</li>
<li>2.2 字符串</li>
<li>2.3 列表和元组</li>
</ul>
</div>
<!-- 更多章节... -->
</div>
</div>
<div class="tab-content" id="reviews">
<div class="reviews-summary">
<div class="rating-breakdown">
<div class="rating-bar">
<span class="stars">5星</span>
<div class="bar"><div class="fill" style="width: 60%"></div></div>
<span class="count">740</span>
</div>
<div class="rating-bar">
<span class="stars">4星</span>
<div class="bar"><div class="fill" style="width: 25%"></div></div>
<span class="count">309</span>
</div>
<div class="rating-bar">
<span class="stars">3星</span>
<div class="bar"><div class="fill" style="width: 10%"></div></div>
<span class="count">123</span>
</div>
<div class="rating-bar">
<span class="stars">2星</span>
<div class="bar"><div class="fill" style="width: 3%"></div></div>
<span class="count">37</span>
</div>
<div class="rating-bar">
<span class="stars">1星</span>
<div class="bar"><div class="fill" style="width: 2%"></div></div>
<span class="count">25</span>
</div>
</div>
</div>
<div class="reviews-list">
<div class="review" data-rating="5">
<div class="review-header">
<span class="reviewer">Python学习者</span>
<div class="review-stars">★★★★★</div>
<span class="review-date">2024-01-15</span>
</div>
<div class="review-content">
<p>非常好的Python入门书籍,内容详实,案例丰富。作为零基础学习者,我能够很好地理解书中的内容。</p>
</div>
<div class="review-helpful">
<button class="helpful-btn" data-count="23">有用 (23)</button>
</div>
</div>
<div class="review" data-rating="4">
<div class="review-header">
<span class="reviewer">编程新手</span>
<div class="review-stars">★★★★☆</div>
<span class="review-date">2024-01-10</span>
</div>
<div class="review-content">
<p>书的质量不错,内容也比较全面。就是有些地方讲解得不够深入,需要结合其他资料学习。</p>
</div>
<div class="review-helpful">
<button class="helpful-btn" data-count="15">有用 (15)</button>
</div>
</div>
<div class="review" data-rating="5">
<div class="review-header">
<span class="reviewer">技术爱好者</span>
<div class="review-stars">★★★★★</div>
<span class="review-date">2024-01-08</span>
</div>
<div class="review-content">
<p>推荐给所有想学Python的朋友!书中的实战项目很有意思,跟着做完后收获很大。</p>
</div>
<div class="review-helpful">
<button class="helpful-btn" data-count="31">有用 (31)</button>
</div>
</div>
</div>
</div>
</section>
</div>
</body>
</html>
"""
soup = BeautifulSoup(html_content, 'html.parser')
# 1. 基本文本提取
print("\n1. 基本文本提取:")
# 提取标题
title = soup.find('h1', class_='product-title')
print(f"商品标题: {title.get_text() if title else 'N/A'}")
# 提取副标题
subtitle = soup.find('div', class_='product-subtitle')
print(f"商品副标题: {subtitle.get_text() if subtitle else 'N/A'}")
# 提取价格信息
current_price = soup.find('span', class_='current-price')
original_price = soup.find('span', class_='original-price')
discount = soup.find('span', class_='discount')
print(f"当前价格: {current_price.get_text() if current_price else 'N/A'}")
print(f"原价: {original_price.get_text() if original_price else 'N/A'}")
print(f"折扣: {discount.get_text() if discount else 'N/A'}")
# 2. 属性值提取
print("\n2. 属性值提取:")
# 提取数据属性
rating_element = soup.find('div', class_='stars')
if rating_element:
rating = rating_element.get('data-rating')
print(f"评分: {rating}")
# 提取价格数据属性
if current_price:
price_value = current_price.get('data-price')
print(f"价格数值: {price_value}")
# 提取产品ID
add_to_cart_btn = soup.find('button', class_='add-to-cart')
if add_to_cart_btn:
product_id = add_to_cart_btn.get('data-product-id')
print(f"产品ID: {product_id}")
# 提取图片信息
main_image = soup.find('img', class_='main-image')
if main_image:
img_src = main_image.get('src')
img_alt = main_image.get('alt')
print(f"主图片: {img_src}, 描述: {img_alt}")
# 3. 表格数据提取
print("\n3. 表格数据提取:")
specs_table = soup.find('table', class_='specs-table')
if specs_table:
specs = {}
rows = specs_table.find_all('tr')
for row in rows:
name_cell = row.find('td', class_='spec-name')
value_cell = row.find('td', class_='spec-value')
if name_cell and value_cell:
specs[name_cell.get_text()] = value_cell.get_text()
print("商品规格:")
for key, value in specs.items():
print(f" {key}: {value}")
# 4. 列表数据提取
print("\n4. 列表数据提取:")
# 提取面包屑导航
breadcrumb = soup.find('nav', class_='breadcrumb')
if breadcrumb:
links = breadcrumb.find_all('a')
current = breadcrumb.find('span', class_='current')
breadcrumb_path = [link.get_text() for link in links]
if current:
breadcrumb_path.append(current.get_text())
print(f"导航路径: {' > '.join(breadcrumb_path)}")
# 提取特性列表
feature_list = soup.find('ul', class_='feature-list')
if feature_list:
features = [li.get_text().strip() for li in feature_list.find_all('li')]
print(f"产品特性: {features}")
# 5. 复杂结构数据提取
print("\n5. 复杂结构数据提取:")
# 提取评价信息
reviews = []
review_elements = soup.find_all('div', class_='review')
for review_elem in review_elements:
reviewer = review_elem.find('span', class_='reviewer')
rating_stars = review_elem.find('div', class_='review-stars')
date = review_elem.find('span', class_='review-date')
content = review_elem.find('div', class_='review-content')
helpful_btn = review_elem.find('button', class_='helpful-btn')
review_data = {
'reviewer': reviewer.get_text() if reviewer else 'Anonymous',
'rating': review_elem.get('data-rating') if review_elem.has_attr('data-rating') else 'N/A',
'date': date.get_text() if date else 'N/A',
'content': content.get_text().strip() if content else 'N/A',
'helpful_count': helpful_btn.get('data-count') if helpful_btn else '0'
}
reviews.append(review_data)
print(f"用户评价 ({len(reviews)}条):")
for i, review in enumerate(reviews, 1):
print(f" 评价{i}:")
print(f" 用户: {review['reviewer']}")
print(f" 评分: {review['rating']}星")
print(f" 日期: {review['date']}")
print(f" 内容: {review['content'][:50]}...")
print(f" 有用数: {review['helpful_count']}")
print()
# 6. 评分统计提取
print("\n6. 评分统计提取:")
rating_bars = soup.find_all('div', class_='rating-bar')
rating_stats = {}
for bar in rating_bars:
stars = bar.find('span', class_='stars')
count = bar.find('span', class_='count')
fill_elem = bar.find('div', class_='fill')
if stars and count:
star_level = stars.get_text()
count_num = count.get_text()
percentage = '0%'
if fill_elem and fill_elem.has_attr('style'):
style = fill_elem.get('style')
# 提取width百分比
import re
width_match = re.search(r'width:\s*(\d+%)', style)
if width_match:
percentage = width_match.group(1)
rating_stats[star_level] = {
'count': count_num,
'percentage': percentage
}
print("评分分布:")
for star_level, stats in rating_stats.items():
print(f" {star_level}: {stats['count']}条 ({stats['percentage']})")
# 7. 文本清理和格式化
print("\n7. 文本清理和格式化:")
# 提取并清理描述文本
description = soup.find('div', class_='description-text')
if description:
# 获取纯文本,去除HTML标签
clean_text = description.get_text(separator=' ', strip=True)
print(f"商品描述: {clean_text[:100]}...")
# 提取段落
paragraphs = [p.get_text().strip() for p in description.find_all('p')]
print(f"描述段落数: {len(paragraphs)}")
# 8. 条件提取
print("\n8. 条件提取:")
# 提取高评分评价
high_rating_reviews = soup.find_all('div', class_='review', attrs={'data-rating': lambda x: x and int(x) >= 4})
print(f"高评分评价数量: {len(high_rating_reviews)}")
# 提取有用评价(有用数>20)
useful_reviews = []
for review in soup.find_all('div', class_='review'):
helpful_btn = review.find('button', class_='helpful-btn')
if helpful_btn:
count = helpful_btn.get('data-count')
if count and int(count) > 20:
reviewer = review.find('span', class_='reviewer')
useful_reviews.append(reviewer.get_text() if reviewer else 'Anonymous')
print(f"有用评价用户: {useful_reviews}")
# 9. 数据验证和错误处理
print("\n9. 数据验证和错误处理:")
# 安全提取价格
def safe_extract_price(element):
if not element:
return None
price_text = element.get_text().strip()
# 提取数字
import re
price_match = re.search(r'([\d.]+)', price_text)
if price_match:
try:
return float(price_match.group(1))
except ValueError:
return None
return None
current_price_value = safe_extract_price(current_price)
original_price_value = safe_extract_price(original_price)
print(f"当前价格数值: {current_price_value}")
print(f"原价数值: {original_price_value}")
if current_price_value and original_price_value:
savings = original_price_value - current_price_value
discount_percent = (savings / original_price_value) * 100
print(f"节省金额: ¥{savings:.2f}")
print(f"折扣百分比: {discount_percent:.1f}%")
# 10. 综合数据结构
print("\n10. 综合数据结构:")
# 构建完整的产品数据结构
product_data = {
'basic_info': {
'title': title.get_text() if title else None,
'subtitle': subtitle.get_text() if subtitle else None,
'product_id': product_id if 'product_id' in locals() else None
},
'pricing': {
'current_price': current_price_value,
'original_price': original_price_value,
'discount_text': discount.get_text() if discount else None
},
'rating': {
'score': rating if 'rating' in locals() else None,
'total_reviews': len(reviews),
'rating_distribution': rating_stats
},
'specifications': specs if 'specs' in locals() else {},
'features': features if 'features' in locals() else [],
'reviews_sample': reviews[:2] # 只保留前两条评价作为示例
}
print("产品数据结构:")
import json
print(json.dumps(product_data, ensure_ascii=False, indent=2))
# 运行数据提取演示
if __name__ == "__main__":
data_extraction_demo()
终端日志:
=== 数据提取功能演示 ===
1. 基本文本提取:
商品标题: Python从入门到精通(第3版)
商品副标题: 零基础学Python,包含大量实战案例
当前价格: ¥89.00
原价: ¥128.00
折扣: 7折
2. 属性值提取:
评分: 4.5
价格数值: 89.00
产品ID: 12345
主图片: /images/python-book-cover.jpg, 描述: Python从入门到精通封面
3. 表格数据提取:
商品规格:
作者: 张三, 李四
出版社: 人民邮电出版社
出版时间: 2024年1月
页数: 568页
ISBN: 978-7-115-12345-6
重量: 0.8kg
4. 列表数据提取:
导航路径: 首页 > 图书 > 编程 > Python从入门到精通
产品特性: ['✓ 零基础入门,循序渐进', '✓ 大量实战案例,学以致用', '✓ 配套视频教程,立体学习', '✓ 技术社区支持,答疑解惑']
5. 复杂结构数据提取:
用户评价 (3条):
评价1:
用户: Python学习者
评分: 5星
日期: 2024-01-15
内容: 非常好的Python入门书籍,内容详实,案例丰富。作为零基础学习者,我能够很好地理解书中的内容。...
有用数: 23
评价2:
用户: 编程新手
评分: 4星
日期: 2024-01-10
内容: 书的质量不错,内容也比较全面。就是有些地方讲解得不够深入,需要结合其他资料学习。...
有用数: 15
评价3:
用户: 技术爱好者
评分: 5星
日期: 2024-01-08
内容: 推荐给所有想学Python的朋友!书中的实战项目很有意思,跟着做完后收获很大。...
有用数: 31
6. 评分统计提取:
评分分布:
5星: 740条 (60%)
4星: 309条 (25%)
3星: 123条 (10%)
2星: 37条 (3%)
1星: 25条 (2%)
7. 文本清理和格式化:
商品描述: 本书是Python编程的入门经典教程,适合零基础读者学习。 全书共分为15个章节,涵盖了Python的基础语法、数据结构、面向对象编程、文件操作、网络编程等核心内容。 ✓ 零基础入门,循序渐进 ✓ 大量实战案例,学以致用 ✓ 配套视频教程,立体学习 ✓ 技术社区支持,答疑解惑...
描述段落数: 2
8. 条件提取:
高评分评价数量: 3
有用评价用户: ['Python学习者', '技术爱好者']
9. 数据验证和错误处理:
当前价格数值: 89.0
原价数值: 128.0
节省金额: ¥39.00
折扣百分比: 30.5%
10. 综合数据结构:
产品数据结构:
{
"basic_info": {
"title": "Python从入门到精通(第3版)",
"subtitle": "零基础学Python,包含大量实战案例",
"product_id": "12345"
},
"pricing": {
"current_price": 89.0,
"original_price": 128.0,
"discount_text": "7折"
},
"rating": {
"score": "4.5",
"total_reviews": 3,
"rating_distribution": {
"5星": {
"count": "740",
"percentage": "60%"
},
"4星": {
"count": "309",
"percentage": "25%"
},
"3星": {
"count": "123",
"percentage": "10%"
},
"2星": {
"count": "37",
"percentage": "3%"
},
"1星": {
"count": "25",
"percentage": "2%"
}
}
},
"specifications": {
"作者": "张三, 李四",
"出版社": "人民邮电出版社",
"出版时间": "2024年1月",
"页数": "568页",
"ISBN": "978-7-115-12345-6",
"重量": "0.8kg"
},
"features": [
"✓ 零基础入门,循序渐进",
"✓ 大量实战案例,学以致用",
"✓ 配套视频教程,立体学习",
"✓ 技术社区支持,答疑解惑"
],
"reviews_sample": [
{
"reviewer": "Python学习者",
"rating": "5",
"date": "2024-01-15",
"content": "非常好的Python入门书籍,内容详实,案例丰富。作为零基础学习者,我能够很好地理解书中的内容。",
"helpful_count": "23"
},
{
"reviewer": "编程新手",
"rating": "4",
"date": "2024-01-10",
"content": "书的质量不错,内容也比较全面。就是有些地方讲解得不够深入,需要结合其他资料学习。",
"helpful_count": "15"
}
]
}
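像上面这样把页面整理成结构化字典后,通常还需要落盘保存,方便后续分析。下面是一个保存为JSON文件的简单示意(字段和文件名均为示例假设):
import json

# 这里用精简字段代表上面提取到的product_data结构
product_data = {'title': 'Python从入门到精通(第3版)', 'price': 89.0, 'rating': '4.5'}

with open('product.json', 'w', encoding='utf-8') as f:
    json.dump(product_data, f, ensure_ascii=False, indent=2)  # ensure_ascii=False保留中文原样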
高级操作¶
文档修改¶
BeautifulSoup不仅可以解析HTML,还可以修改文档结构。
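在进入完整示例之前,先看一个最小示意(HTML为自拟内容),涵盖三类最基本的修改:改文本、用new_tag()新建并append()、用decompose()删除。注意用new_tag()设置class时需要通过attrs字典传入:
from bs4 import BeautifulSoup

soup = BeautifulSoup('<ul id="menu"><li>首页</li><li class="old">旧链接</li></ul>', 'html.parser')

soup.li.string = '主页'  # 修改第一个li的文本
new_li = soup.new_tag('li', attrs={'class': 'new'})  # 新建元素,class通过attrs设置
new_li.string = '博客'
soup.find('ul').append(new_li)  # 追加为最后一个子元素
soup.find('li', class_='old').decompose()  # 从文档树中彻底删除
print(soup)  # <ul id="menu"><li>主页</li><li class="new">博客</li></ul>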
def document_modification_demo():
"""
演示文档修改功能
"""
print("=== 文档修改功能演示 ===")
# 示例HTML - 简单的博客文章
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>我的博客文章</title>
<meta name="author" content="原作者">
</head>
<body>
<div class="container">
<header>
<h1>Python学习笔记</h1>
<p class="meta">发布时间: 2024-01-01</p>
</header>
<main class="content">
<section class="intro">
<h2>简介</h2>
<p>这是一篇关于Python基础的文章。</p>
</section>
<section class="topics">
<h2>主要内容</h2>
<ul id="topic-list">
<li>变量和数据类型</li>
<li>控制结构</li>
</ul>
</section>
<section class="examples">
<h2>代码示例</h2>
<div class="code-block">
<pre><code>print("Hello, World!")</code></pre>
</div>
</section>
</main>
<footer>
<p>版权所有 © 2024</p>
</footer>
</div>
</body>
</html>
"""
soup = BeautifulSoup(html_content, 'html.parser')
print("\n1. 修改文本内容:")
# 修改标题
title_tag = soup.find('h1')
if title_tag:
old_title = title_tag.get_text()
title_tag.string = "Python高级编程技巧"
print(f"标题修改: '{old_title}' -> '{title_tag.get_text()}'")
# 修改作者信息
author_meta = soup.find('meta', attrs={'name': 'author'})
if author_meta:
old_author = author_meta.get('content')
author_meta['content'] = "技术专家"
print(f"作者修改: '{old_author}' -> '{author_meta.get('content')}'")
# 修改发布时间
meta_p = soup.find('p', class_='meta')
if meta_p:
old_time = meta_p.get_text()
meta_p.string = "发布时间: 2024-01-15 (已更新)"
print(f"时间修改: '{old_time}' -> '{meta_p.get_text()}'")
print("\n2. 添加新元素:")
# 在列表中添加新项目
topic_list = soup.find('ul', id='topic-list')
if topic_list:
# 创建新的li元素
new_li1 = soup.new_tag('li')
new_li1.string = "函数和模块"
new_li2 = soup.new_tag('li')
new_li2.string = "面向对象编程"
new_li3 = soup.new_tag('li')
new_li3.string = "异常处理"
# 添加到列表末尾
topic_list.append(new_li1)
topic_list.append(new_li2)
topic_list.append(new_li3)
print(f"添加了3个新的主题项目")
print(f"当前主题列表: {[li.get_text() for li in topic_list.find_all('li')]}")
# 添加新的代码示例
examples_section = soup.find('section', class_='examples')
if examples_section:
# 创建新的代码块
new_code_block = soup.new_tag('div', attrs={'class': 'code-block'})  # new_tag()不识别class_关键字,class需通过attrs字典设置
new_pre = soup.new_tag('pre')
new_code = soup.new_tag('code')
new_code.string = '''def greet(name):
return f"Hello, {name}!"
print(greet("Python"))'''
new_pre.append(new_code)
new_code_block.append(new_pre)
examples_section.append(new_code_block)
print("添加了新的代码示例")
# 添加新的section
main_content = soup.find('main', class_='content')
if main_content:
new_section = soup.new_tag('section', attrs={'class': 'resources'})
new_h2 = soup.new_tag('h2')
new_h2.string = "学习资源"
new_ul = soup.new_tag('ul')
resources = [
"Python官方文档",
"在线编程练习",
"开源项目参与"
]
for resource in resources:
li = soup.new_tag('li')
li.string = resource
new_ul.append(li)
new_section.append(new_h2)
new_section.append(new_ul)
main_content.append(new_section)
print("添加了新的学习资源section")
print("\n3. 修改属性:")
# 修改容器类名
container = soup.find('div', class_='container')
if container:
old_class = container.get('class')
container['class'] = ['main-container', 'updated']
container['data-version'] = '2.0'
print(f"容器类名修改: {old_class} -> {container.get('class')}")
print(f"添加了data-version属性: {container.get('data-version')}")
# 为代码块添加语言标识
code_blocks = soup.find_all('div', class_='code-block')
for i, block in enumerate(code_blocks):
block['data-language'] = 'python'
block['data-line-numbers'] = 'true'
print(f"代码块{i+1}添加了语言标识和行号属性")
print("\n4. 删除元素:")
# 删除版权信息(示例)
footer = soup.find('footer')
if footer:
copyright_p = footer.find('p')
if copyright_p:
old_text = copyright_p.get_text()
copyright_p.decompose() # 完全删除元素
print(f"删除了版权信息: '{old_text}'")
print("\n5. 元素移动和重排:")
# 将简介section移动到主要内容之后
intro_section = soup.find('section', class_='intro')
topics_section = soup.find('section', class_='topics')
if intro_section and topics_section:
# 从当前位置移除
intro_section.extract()
# 插入到topics_section之后
topics_section.insert_after(intro_section)
print("将简介section移动到主要内容section之后")
print("\n6. 批量操作:")
# 为所有h2标签添加id属性
h2_tags = soup.find_all('h2')
for h2 in h2_tags:
# 生成id(将标题转换为合适的id格式)
title_text = h2.get_text().lower().replace(' ', '-').replace(',', '')
h2['id'] = f"section-{title_text}"
print(f"为h2标签添加id: {h2['id']}")
# 为所有链接添加target="_blank"
links = soup.find_all('a')
for link in links:
link['target'] = '_blank'
link['rel'] = 'noopener noreferrer'
if links:
print(f"为{len(links)}个链接添加了target和rel属性")
else:
print("没有找到链接元素")
print("\n7. 条件修改:")
# 只修改包含特定文本的元素
all_p = soup.find_all('p')
modified_count = 0
for p in all_p:
text = p.get_text()
if 'Python' in text:
# 添加强调样式
p['class'] = p.get('class', []) + ['python-related']
p['style'] = 'font-weight: bold; color: #3776ab;'
modified_count += 1
print(f"为{modified_count}个包含'Python'的段落添加了样式")
print("\n8. 创建复杂结构:")
# 创建一个导航菜单
nav = soup.new_tag('nav', attrs={'class': 'table-of-contents'})
nav_title = soup.new_tag('h3')
nav_title.string = "目录"
nav_ul = soup.new_tag('ul')
# 基于现有的h2标签创建导航
for h2 in soup.find_all('h2'):
li = soup.new_tag('li')
a = soup.new_tag('a', href=f"#{h2.get('id', '')}")
a.string = h2.get_text()
li.append(a)
nav_ul.append(li)
nav.append(nav_title)
nav.append(nav_ul)
# 将导航插入到header之后
header = soup.find('header')
if header:
header.insert_after(nav)
print("创建并插入了目录导航")
print("\n9. 文档结构优化:")
# 添加语义化标签
main_tag = soup.find('main')
if main_tag:
# 为main标签添加role属性
main_tag['role'] = 'main'
main_tag['aria-label'] = '主要内容'
print("为main标签添加了无障碍属性")
# 添加meta标签
head = soup.find('head')
if head:
# 添加viewport meta
viewport_meta = soup.new_tag('meta', attrs={
'name': 'viewport',
'content': 'width=device-width, initial-scale=1.0'
})
# 添加description meta
desc_meta = soup.new_tag('meta', attrs={
'name': 'description',
'content': 'Python高级编程技巧学习笔记,包含函数、面向对象编程、异常处理等内容。'
})
head.append(viewport_meta)
head.append(desc_meta)
print("添加了viewport和description meta标签")
print("\n10. 输出修改后的文档:")
# 格式化输出
formatted_html = soup.prettify()
print("修改后的HTML文档:")
print(formatted_html[:1000] + "..." if len(formatted_html) > 1000 else formatted_html)
# 统计信息
print(f"\n文档统计:")
print(f" 总标签数: {len(soup.find_all())}")
print(f" 段落数: {len(soup.find_all('p'))}")
print(f" 标题数: {len(soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']))}")
print(f" 列表项数: {len(soup.find_all('li'))}")
print(f" 代码块数: {len(soup.find_all('div', class_='code-block'))}")
return soup
# 运行文档修改演示
if __name__ == "__main__":
modified_soup = document_modification_demo()
终端日志:
=== 文档修改功能演示 ===
1. 修改文本内容:
标题修改: 'Python学习笔记' -> 'Python高级编程技巧'
作者修改: '原作者' -> '技术专家'
时间修改: '发布时间: 2024-01-01' -> '发布时间: 2024-01-15 (已更新)'
2. 添加新元素:
添加了3个新的主题项目
当前主题列表: ['变量和数据类型', '控制结构', '函数和模块', '面向对象编程', '异常处理']
添加了新的代码示例
添加了新的学习资源section
3. 修改属性:
容器类名修改: ['container'] -> ['main-container', 'updated']
添加了data-version属性: 2.0
代码块1添加了语言标识和行号属性
代码块2添加了语言标识和行号属性
4. 删除元素:
删除了版权信息: '版权所有 © 2024'
5. 元素移动和重排:
将简介section移动到主要内容section之后
6. 批量操作:
为h2标签添加id: section-主要内容
为h2标签添加id: section-简介
为h2标签添加id: section-代码示例
为h2标签添加id: section-学习资源
没有找到链接元素
7. 条件修改:
为1个包含'Python'的段落添加了样式
8. 创建复杂结构:
创建并插入了目录导航
9. 文档结构优化:
为main标签添加了无障碍属性
添加了viewport和description meta标签
10. 输出修改后的文档:
修改后的HTML文档:
<!DOCTYPE html>
<html>
<head>
<title>
我的博客文章
</title>
<meta content="技术专家" name="author"/>
<meta content="width=device-width, initial-scale=1.0" name="viewport"/>
<meta content="Python高级编程技巧学习笔记,包含函数、面向对象编程、异常处理等内容。" name="description"/>
</head>
<body>
<div class="main-container updated" data-version="2.0">
<header>
<h1>
Python高级编程技巧
</h1>
<p class="meta">
发布时间: 2024-01-15 (已更新)
</p>
</header>
<nav class="table-of-contents">
<h3>
目录
</h3>
<ul>
<li>
<a href="#section-主要内容">
主要内容
</a>
</li>
<li>
<a href="#section-简介">
简介
</a>
</li>
<li>
<a href="#section-代码示例">
代码示例
</a>
</li>
<li>
<a href="#section-学习资源">
学习资源
</a>
</li>
</ul>
</nav>
<main aria-label="主要内容" class="content" role="main">
<section class="topics">
<h2 id="section-主要内容">
主要内容
</h2>
<ul id="topic-list">
<li>
变量和数据类型
</li>
<li>
控制结构
</li>
<li>
函数和模块
</li>
<li>
面向对象编程
</li>
<li>
异常处理
</li>
</ul>
</section>
<section class="intro">
<h2 id="section-简介">
简介
</h2>
<p class="python-related" style="font-weight: bold; color: #3776ab;">
这是一篇关于Python基础的文章。
</p>
</section>
<section class="examples">
<h2 id="section-代码示例">
代码示例
</h2>
<div class="code-block" data-language="python" data-line-numbers="true">
<pre><code>print("Hello, World!")</code></pre>
</div>
<div class="code-block" data-language="python" data-line-numbers="true">
<pre><code>def greet(name):
return f"Hello, {name}!"
print(greet("Python"))</code></pre>
</div>
</section>
<section class="resources">
<h2 id="section-学习资源">
学习资源
</h2>
<ul>
<li>
Python官方文档
</li>
<li>
在线编程练习
</li>
<li>
开源项目参与
</li>
</ul>
</section>
</main>
<footer>
</footer>
</div>
</body>
</html>...
文档统计:
总标签数: 32
段落数: 1
标题数: 5
列表项数: 11
代码块数: 2
元素插入和删除¶
BeautifulSoup提供了灵活的元素插入和删除方法。
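先通过一个最小示意(HTML为自拟内容)对比几种常用的位置操作:insert_before()/insert_after()在兄弟位置插入,extract()把元素从文档树中取出以便移动:
from bs4 import BeautifulSoup

soup = BeautifulSoup('<ol><li>A</li><li>C</li></ol>', 'html.parser')
li_a, li_c = soup.find_all('li')

li_b = soup.new_tag('li')
li_b.string = 'B'
li_c.insert_before(li_b)  # 在C之前插入B,顺序变为A B C

li_d = soup.new_tag('li')
li_d.string = 'D'
li_c.insert_after(li_d)  # 在C之后插入D,顺序变为A B C D

moved = li_a.extract()  # 把A从文档树中取出(并返回它)
soup.ol.append(moved)  # 再追加到末尾,顺序变为B C D A
print([li.get_text() for li in soup.find_all('li')])  # ['B', 'C', 'D', 'A']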
def element_operations_demo():
"""
演示元素插入和删除操作
"""
print("=== 元素插入和删除操作演示 ===")
# 示例HTML - 文章列表
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>文章管理系统</title>
</head>
<body>
<div class="article-manager">
<header class="page-header">
<h1>文章列表</h1>
<div class="actions">
<button class="btn-new">新建文章</button>
</div>
</header>
<main class="article-list">
<article class="article-item" data-id="1">
<h2 class="article-title">Python基础教程</h2>
<p class="article-summary">学习Python编程的基础知识</p>
<div class="article-meta">
<span class="author">作者: 张三</span>
<span class="date">2024-01-01</span>
<span class="category">编程</span>
</div>
<div class="article-actions">
<button class="btn-edit">编辑</button>
<button class="btn-delete">删除</button>
</div>
</article>
<article class="article-item" data-id="2">
<h2 class="article-title">Web开发入门</h2>
<p class="article-summary">从零开始学习Web开发</p>
<div class="article-meta">
<span class="author">作者: 李四</span>
<span class="date">2024-01-05</span>
<span class="category">Web开发</span>
</div>
<div class="article-actions">
<button class="btn-edit">编辑</button>
<button class="btn-delete">删除</button>
</div>
</article>
</main>
<footer class="page-footer">
<p>共 2 篇文章</p>
</footer>
</div>
</body>
</html>
"""
soup = BeautifulSoup(html_content, 'html.parser')
print("\n1. 在指定位置插入元素:")
# 在第一篇文章前插入新文章
article_list = soup.find('main', class_='article-list')
first_article = soup.find('article', class_='article-item')
if article_list and first_article:
# 创建新文章
new_article = soup.new_tag('article', attrs={'class': 'article-item featured', 'data-id': '0'})  # new_tag()不识别class_关键字,class需放进attrs字典
# 创建文章标题
title = soup.new_tag('h2', attrs={'class': 'article-title'})
title.string = "🔥 热门推荐:Python高级特性详解"
# 创建文章摘要
summary = soup.new_tag('p', attrs={'class': 'article-summary'})
summary.string = "深入了解Python的高级特性和最佳实践"
# 创建元数据
meta_div = soup.new_tag('div', attrs={'class': 'article-meta'})
author_span = soup.new_tag('span', attrs={'class': 'author'})
author_span.string = "作者: 技术专家"
date_span = soup.new_tag('span', attrs={'class': 'date'})
date_span.string = "2024-01-15"
category_span = soup.new_tag('span', attrs={'class': 'category featured-category'})
category_span.string = "高级编程"
meta_div.extend([author_span, date_span, category_span])
# 创建操作按钮
actions_div = soup.new_tag('div', attrs={'class': 'article-actions'})
edit_btn = soup.new_tag('button', attrs={'class': 'btn-edit'})
edit_btn.string = "编辑"
delete_btn = soup.new_tag('button', attrs={'class': 'btn-delete'})
delete_btn.string = "删除"
pin_btn = soup.new_tag('button', attrs={'class': 'btn-pin'})
pin_btn.string = "置顶"
actions_div.extend([edit_btn, delete_btn, pin_btn])
# 组装新文章
new_article.extend([title, summary, meta_div, actions_div])
# 插入到第一篇文章前
first_article.insert_before(new_article)
print("在列表开头插入了特色文章")
# 在最后一篇文章后插入新文章
all_articles = soup.find_all('article', class_='article-item')
if all_articles:
last_article = all_articles[-1]
# 创建另一篇新文章
another_article = soup.new_tag('article', attrs={'class': 'article-item draft', 'data-id': '3'})
title = soup.new_tag('h2', attrs={'class': 'article-title'})
title.string = "📝 草稿:数据库设计原理"
summary = soup.new_tag('p', attrs={'class': 'article-summary'})
summary.string = "数据库设计的基本原理和最佳实践(草稿状态)"
meta_div = soup.new_tag('div', attrs={'class': 'article-meta'})
author_span = soup.new_tag('span', attrs={'class': 'author'})
author_span.string = "作者: 王五"
date_span = soup.new_tag('span', attrs={'class': 'date'})
date_span.string = "2024-01-16"
status_span = soup.new_tag('span', attrs={'class': 'status draft-status'})
status_span.string = "草稿"
meta_div.extend([author_span, date_span, status_span])
actions_div = soup.new_tag('div', attrs={'class': 'article-actions'})
edit_btn = soup.new_tag('button', attrs={'class': 'btn-edit primary'})
edit_btn.string = "继续编辑"
publish_btn = soup.new_tag('button', attrs={'class': 'btn-publish'})
publish_btn.string = "发布"
delete_btn = soup.new_tag('button', attrs={'class': 'btn-delete'})
delete_btn.string = "删除"
actions_div.extend([edit_btn, publish_btn, delete_btn])
another_article.extend([title, summary, meta_div, actions_div])
# 插入到最后一篇文章后
last_article.insert_after(another_article)
print("在列表末尾插入了草稿文章")
print("\n2. 在父元素中插入子元素:")
# 在页面头部添加搜索框
page_header = soup.find('header', class_='page-header')
if page_header:
# 创建搜索区域
search_div = soup.new_tag('div', attrs={'class': 'search-area'})
search_input = soup.new_tag('input', attrs={'type': 'text', 'placeholder': '搜索文章...', 'class': 'search-input'})
search_btn = soup.new_tag('button', attrs={'class': 'btn-search'})
search_btn.string = "搜索"
search_div.extend([search_input, search_btn])
# 插入到actions div之前
actions_div = page_header.find('div', class_='actions')
if actions_div:
actions_div.insert_before(search_div)
print("在页面头部添加了搜索区域")
# 在每篇文章中添加标签
articles = soup.find_all('article', class_='article-item')
for i, article in enumerate(articles):
meta_div = article.find('div', class_='article-meta')
if meta_div:
# 创建标签容器
tags_div = soup.new_tag('div', attrs={'class': 'article-tags'})
# 根据文章类型添加不同标签
if 'featured' in article.get('class', []):
tags = ['热门', '推荐', 'Python']
elif 'draft' in article.get('class', []):
tags = ['草稿', '数据库']
else:
tags = ['基础', '教程']
for tag in tags:
tag_span = soup.new_tag('span', attrs={'class': 'tag'})
tag_span.string = tag
tags_div.append(tag_span)
# 插入到meta div之后
meta_div.insert_after(tags_div)
print(f"为文章{i+1}添加了标签")
print("\n3. 删除元素:")
# 删除第二篇文章(原来的第一篇)
articles = soup.find_all('article', class_='article-item')
if len(articles) > 1:
article_to_delete = articles[1] # 第二篇文章
article_title = article_to_delete.find('h2', class_='article-title')
title_text = article_title.get_text() if article_title else "未知标题"
article_to_delete.decompose() # 完全删除
print(f"删除了文章: '{title_text}'")
# 删除所有草稿状态的文章
draft_articles = soup.find_all('article', class_='draft')
deleted_drafts = []
for draft in draft_articles:
title_elem = draft.find('h2', class_='article-title')
if title_elem:
deleted_drafts.append(title_elem.get_text())
draft.decompose()
if deleted_drafts:
print(f"删除了草稿文章: {deleted_drafts}")
else:
print("没有找到草稿文章")
# 删除特定的按钮
pin_buttons = soup.find_all('button', class_='btn-pin')
for btn in pin_buttons:
btn.decompose()
if pin_buttons:
print(f"删除了{len(pin_buttons)}个置顶按钮")
print("\n4. 替换元素:")
# 替换页面标题
page_title = soup.find('h1')
if page_title:
old_title = page_title.get_text()
# 创建新的标题元素
new_title = soup.new_tag('h1', attrs={'class': 'main-title'})
new_title.string = "📚 技术文章管理中心"
# 替换
page_title.replace_with(new_title)
print(f"页面标题替换: '{old_title}' -> '{new_title.get_text()}'")
# 替换所有编辑按钮为更详细的按钮
edit_buttons = soup.find_all('button', class_='btn-edit')
for btn in edit_buttons:
# 创建新的按钮组
btn_group = soup.new_tag('div', attrs={'class': 'btn-group'})
quick_edit = soup.new_tag('button', attrs={'class': 'btn-quick-edit'})
quick_edit.string = "快速编辑"
full_edit = soup.new_tag('button', attrs={'class': 'btn-full-edit'})
full_edit.string = "完整编辑"
btn_group.extend([quick_edit, full_edit])
# 替换原按钮
btn.replace_with(btn_group)
print(f"替换了{len(edit_buttons)}个编辑按钮为按钮组")
print("\n5. 移动元素:")
# 将搜索区域移动到标题之前
search_area = soup.find('div', class_='search-area')
main_title = soup.find('h1', class_='main-title')
if search_area and main_title:
# 提取搜索区域
search_area.extract()
# 插入到标题之前
main_title.insert_before(search_area)
print("将搜索区域移动到标题之前")
# 重新排序文章(按日期)
article_list = soup.find('main', class_='article-list')
if article_list:
articles = article_list.find_all('article', class_='article-item')
# 提取所有文章
article_data = []
for article in articles:
date_elem = article.find('span', class_='date')
date_str = date_elem.get_text() if date_elem else "2024-01-01"
article_data.append((date_str, article.extract()))
# 按日期排序(最新的在前)
article_data.sort(key=lambda x: x[0], reverse=True)
# 重新插入排序后的文章
for date_str, article in article_data:
article_list.append(article)
print(f"按日期重新排序了{len(article_data)}篇文章")
print("\n6. 批量操作:")
# 为所有文章添加阅读时间估算
articles = soup.find_all('article', class_='article-item')
for article in articles:
summary = article.find('p', class_='article-summary')
if summary:
# 估算阅读时间(基于摘要长度)
text_length = len(summary.get_text())
read_time = max(1, text_length // 50) # 假设每50个字符需要1分钟
read_time_span = soup.new_tag('span', attrs={'class': 'read-time'})
read_time_span.string = f"预计阅读: {read_time}分钟"
# 插入到摘要之后
summary.insert_after(read_time_span)
print(f"为{len(articles)}篇文章添加了阅读时间估算")
# 更新文章计数
footer = soup.find('footer', class_='page-footer')
if footer:
count_p = footer.find('p')
if count_p:
current_count = len(soup.find_all('article', class_='article-item'))
count_p.string = f"共 {current_count} 篇文章"
print(f"更新了文章计数: {current_count}")
print("\n7. 条件操作:")
# 只对特色文章添加特殊标记
featured_articles = soup.find_all('article', class_='featured')
for article in featured_articles:
title = article.find('h2', class_='article-title')
if title and not title.get_text().startswith('🔥'):
title.string = f"🔥 {title.get_text()}"
print(f"为{len(featured_articles)}篇特色文章添加了火焰标记")
# 为长摘要添加展开/收起功能
summaries = soup.find_all('p', class_='article-summary')
long_summaries = 0
for summary in summaries:
if len(summary.get_text()) > 30: # 超过30个字符认为是长摘要
summary['class'] = summary.get('class', []) + ['long-summary']
summary['data-full-text'] = summary.get_text()
# 创建展开按钮
expand_btn = soup.new_tag('button', attrs={'class': 'btn-expand'})
expand_btn.string = "展开"
summary.insert_after(expand_btn)
long_summaries += 1
print(f"为{long_summaries}个长摘要添加了展开功能")
print("\n8. 最终文档统计:")
# 统计最终结果
final_stats = {
'总文章数': len(soup.find_all('article', class_='article-item')),
'特色文章数': len(soup.find_all('article', class_='featured')),
'草稿文章数': len(soup.find_all('article', class_='draft')),
'总按钮数': len(soup.find_all('button')),
'标签数': len(soup.find_all('span', class_='tag')),
'总元素数': len(soup.find_all())
}
for key, value in final_stats.items():
print(f" {key}: {value}")
# 输出部分修改后的HTML
print("\n9. 修改后的HTML片段:")
article_list = soup.find('main', class_='article-list')
if article_list:
first_article = article_list.find('article')
if first_article:
print(first_article.prettify()[:500] + "...")
return soup
# 运行元素操作演示
if __name__ == "__main__":
modified_soup = element_operations_demo()
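对文档做完修改后,常见的收尾动作是把结果写回文件。下面是一个简单示意(文件名为示例假设):
from bs4 import BeautifulSoup

soup = BeautifulSoup('<p>示例</p>', 'html.parser')
soup.p.string = '已修改'

# prettify()输出带缩进的文本便于人工查看,str(soup)则保持紧凑
with open('modified.html', 'w', encoding='utf-8') as f:
    f.write(soup.prettify())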
编码处理¶
BeautifulSoup能够帮助处理常见的字符编码问题:解析字节内容时会自动检测编码,也可以显式指定编码。
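下面是一个最小示意(示例HTML为自拟内容),展示from_encoding与original_encoding的用法:
from bs4 import BeautifulSoup

gbk_bytes = '<html><body><p>编码测试</p></body></html>'.encode('gbk')

# 显式指定from_encoding,避免自动检测偶尔判断失误
soup = BeautifulSoup(gbk_bytes, 'html.parser', from_encoding='gbk')
print(soup.p.get_text())  # 编码测试
print(soup.original_encoding)  # 解析bytes时可查看实际使用的编码,这里为gbk

soup2 = BeautifulSoup('<p>你好</p>', 'html.parser')
print(soup2.original_encoding)  # None,解析str时不做编码检测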
def encoding_demo():
"""
演示编码处理功能
"""
print("=== 编码处理功能演示 ===")
# 1. 自动编码检测
print("\n1. 自动编码检测:")
# 不同编码的HTML内容
utf8_html = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>中文测试页面</title>
</head>
<body>
<h1>欢迎来到Python学习网站</h1>
<p>这里有丰富的Python教程和实例。</p>
<div class="content">
<h2>特殊字符测试</h2>
<p>数学符号: α β γ δ ε ∑ ∏ ∫</p>
<p>货币符号: ¥ $ € £ ₹</p>
<p>表情符号: 😀 😃 😄 😁 🚀 🎉</p>
<p>其他语言: こんにちは 안녕하세요 Здравствуйте</p>
</div>
</body>
</html>
"""
# 使用BeautifulSoup解析UTF-8内容
soup_utf8 = BeautifulSoup(utf8_html, 'html.parser')
print(f"UTF-8解析结果:")
print(f" 标题: {soup_utf8.find('title').get_text()}")
print(f" 主标题: {soup_utf8.find('h1').get_text()}")
# 获取原始编码信息(从str解析时不做编码检测,结果为None;解析bytes时才会自动检测)
original_encoding = soup_utf8.original_encoding
print(f" 检测到的原始编码: {original_encoding}")
# 2. 处理不同编码的内容
print("\n2. 处理不同编码的内容:")
# 模拟GBK编码的内容
gbk_content = "<html><body><h1>中文标题</h1><p>这是GBK编码的内容</p></body></html>"
try:
# 将字符串编码为GBK字节
gbk_bytes = gbk_content.encode('gbk')
print(f"GBK字节长度: {len(gbk_bytes)}")
# 使用BeautifulSoup解析GBK字节
soup_gbk = BeautifulSoup(gbk_bytes, 'html.parser', from_encoding='gbk')
print(f"GBK解析结果:")
print(f" 标题: {soup_gbk.find('h1').get_text()}")
print(f" 段落: {soup_gbk.find('p').get_text()}")
except UnicodeEncodeError as e:
print(f"GBK编码错误: {e}")
# 3. 编码转换
print("\n3. 编码转换:")
# 获取不同编码格式的输出
html_str = str(soup_utf8)
# UTF-8编码
utf8_bytes = html_str.encode('utf-8')
print(f"UTF-8编码字节数: {len(utf8_bytes)}")
# 尝试其他编码
encodings_to_test = ['utf-8', 'gbk', 'iso-8859-1', 'ascii']
for encoding in encodings_to_test:
try:
encoded_bytes = html_str.encode(encoding)
print(f"{encoding.upper()}编码: 成功,{len(encoded_bytes)}字节")
except UnicodeEncodeError as e:
print(f"{encoding.upper()}编码: 失败 - {str(e)[:50]}...")
# 4. 处理编码错误
print("\n4. 处理编码错误:")
# 创建包含特殊字符的内容
special_html = """
<html>
<body>
<h1>特殊字符处理测试</h1>
<p>包含emoji: 🐍 Python编程</p>
<p>数学公式: E = mc²</p>
<p>版权符号: © 2024</p>
<p>商标符号: Python™</p>
</body>
</html>
"""
soup_special = BeautifulSoup(special_html, 'html.parser')
# 不同的错误处理策略
error_strategies = ['ignore', 'replace', 'xmlcharrefreplace']
for strategy in error_strategies:
try:
# 尝试编码为ASCII(会出错)
ascii_result = str(soup_special).encode('ascii', errors=strategy)
decoded_result = ascii_result.decode('ascii')
print(f"ASCII编码策略'{strategy}': 成功")
print(f" 结果长度: {len(decoded_result)}字符")
# 显示处理后的标题
soup_result = BeautifulSoup(decoded_result, 'html.parser')
title = soup_result.find('h1')
if title:
print(f" 处理后标题: {title.get_text()}")
except Exception as e:
print(f"ASCII编码策略'{strategy}': 失败 - {e}")
# 5. 自定义编码处理
print("\n5. 自定义编码处理:")
def safe_encode_html(soup_obj, target_encoding='utf-8', fallback_encoding='ascii'):
"""
安全地将BeautifulSoup对象编码为指定格式
"""
html_str = str(soup_obj)
try:
# 尝试目标编码
return html_str.encode(target_encoding)
except UnicodeEncodeError:
print(f" {target_encoding}编码失败,尝试{fallback_encoding}")
try:
# 使用替换策略的后备编码
return html_str.encode(fallback_encoding, errors='xmlcharrefreplace')
except UnicodeEncodeError:
print(f" {fallback_encoding}编码也失败,使用忽略策略")
return html_str.encode(fallback_encoding, errors='ignore')
# 测试自定义编码函数
safe_bytes = safe_encode_html(soup_special, 'ascii')
print(f"安全编码结果: {len(safe_bytes)}字节")
# 解码并验证
safe_html = safe_bytes.decode('ascii')
safe_soup = BeautifulSoup(safe_html, 'html.parser')
safe_title = safe_soup.find('h1')
if safe_title:
print(f"安全编码后标题: {safe_title.get_text()}")
# 6. 编码声明处理
print("\n6. 编码声明处理:")
# 检查和修改编码声明
meta_charset = soup_utf8.find('meta', attrs={'charset': True})
if meta_charset:
original_charset = meta_charset.get('charset')
print(f"原始字符集声明: {original_charset}")
# 修改字符集声明
meta_charset['charset'] = 'UTF-8'
print(f"修改后字符集声明: {meta_charset.get('charset')}")
# 添加编码声明(如果不存在)
head = soup_utf8.find('head')
if head and not head.find('meta', attrs={'charset': True}):
charset_meta = soup_utf8.new_tag('meta', charset='UTF-8')
head.insert(0, charset_meta)
print("添加了字符集声明")
# 7. 内容编码验证
print("\n7. 内容编码验证:")
def validate_encoding(html_content, expected_encoding='utf-8'):
"""
验证HTML内容的编码
"""
try:
if isinstance(html_content, str):
# 字符串内容,尝试编码
html_content.encode(expected_encoding)
return True, "字符串内容编码有效"
elif isinstance(html_content, bytes):
# 字节内容,尝试解码
html_content.decode(expected_encoding)
return True, "字节内容编码有效"
else:
return False, "未知内容类型"
except UnicodeError as e:
return False, f"编码验证失败: {e}"
# 验证不同内容的编码
test_contents = [
(utf8_html, 'utf-8'),
(str(soup_utf8), 'utf-8'),
(str(soup_special), 'utf-8')
]
for content, encoding in test_contents:
is_valid, message = validate_encoding(content, encoding)
print(f" {encoding}编码验证: {'✓' if is_valid else '✗'} {message}")
# 8. 编码统计信息
print("\n8. 编码统计信息:")
def analyze_encoding(soup_obj):
"""
分析BeautifulSoup对象的编码信息
"""
html_str = str(soup_obj)
stats = {
'总字符数': len(html_str),
'ASCII字符数': sum(1 for c in html_str if ord(c) < 128),
'非ASCII字符数': sum(1 for c in html_str if ord(c) >= 128),
'中文字符数': sum(1 for c in html_str if '\u4e00' <= c <= '\u9fff'),
'表情符号数': sum(1 for c in html_str if 0x1F300 <= ord(c) <= 0x1FAFF),  # 粗略统计常见emoji码位区间
}
# 计算不同编码的字节数
for encoding in ['utf-8', 'utf-16', 'utf-32']:
try:
byte_count = len(html_str.encode(encoding))
stats[f'{encoding.upper()}字节数'] = byte_count
except UnicodeEncodeError:
stats[f'{encoding.upper()}字节数'] = '编码失败'
return stats
# 分析特殊字符内容
encoding_stats = analyze_encoding(soup_special)
print("特殊字符内容编码分析:")
for key, value in encoding_stats.items():
print(f" {key}: {value}")
# 9. 编码最佳实践建议
print("\n9. 编码最佳实践建议:")
recommendations = [
"✓ 始终使用UTF-8编码处理HTML内容",
"✓ 在HTML头部明确声明字符集",
"✓ 处理用户输入时验证编码",
"✓ 使用适当的错误处理策略",
"✓ 测试特殊字符和多语言内容",
"✓ 避免混合使用不同编码"
]
for rec in recommendations:
print(f" {rec}")
return soup_utf8, soup_special
# 运行编码处理演示
if __name__ == "__main__":
utf8_soup, special_soup = encoding_demo()
终端日志:
=== 编码处理功能演示 ===
1. 自动编码检测:
UTF-8解析结果:
标题: 中文测试页面
主标题: 欢迎来到Python学习网站
检测到的原始编码: None
2. 处理不同编码的内容:
GBK字节长度: 67
GBK解析结果:
标题: 中文标题
段落: 这是GBK编码的内容
3. 编码转换:
UTF-8编码字节数: 674
UTF-8编码: 成功,674字节
GBK编码: 成功,638字节
ISO-8859-1编码: 失败 - 'latin-1' codec can't encode character '\u4e2d'...
ASCII编码: 失败 - 'ascii' codec can't encode character '\u4e2d' in...
4. 处理编码错误:
ASCII编码策略'ignore': 成功
结果长度: 158字符
处理后标题:
ASCII编码策略'replace': 成功
结果长度: 254字符
处理后标题: ????????
ASCII编码策略'xmlcharrefreplace': 成功
结果长度: 1058字符
处理后标题: 特殊字符处理测试
5. 自定义编码处理:
ascii编码失败,尝试ascii
安全编码结果: 1058字节
安全编码后标题: 特殊字符处理测试
6. 编码声明处理:
原始字符集声明: UTF-8
修改后字符集声明: UTF-8
7. 内容编码验证:
utf-8编码验证: ✓ 字符串内容编码有效
utf-8编码验证: ✓ 字符串内容编码有效
utf-8编码验证: ✓ 字符串内容编码有效
8. 编码统计信息:
特殊字符内容编码分析:
总字符数: 254
ASCII字符数: 158
非ASCII字符数: 96
中文字符数: 12
表情符号数: 1
UTF-8字节数: 302
UTF-16字节数: 510
UTF-32字节数: 1018
9. 编码最佳实践建议:
✓ 始终使用UTF-8编码处理HTML内容
✓ 在HTML头部明确声明字符集
✓ 处理用户输入时验证编码
✓ 使用适当的错误处理策略
✓ 测试特殊字符和多语言内容
✓ 避免混合使用不同编码