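Chapter 13: Web Development
Preface
This chapter works through the main areas of Python web development, from basic web concepts to practical use of the mainstream frameworks. It covers Flask (a lightweight framework), Django (a full-stack framework), and FastAPI (a modern, async-capable framework), along with core skills such as database access, front-end integration, and API design. With the code examples and project exercises in this chapter, readers should be able to build a complete web application on their own.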
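13.1 Web Development Basics
Web Application Architecture Overview
Web applications follow a client-server model: the client (usually a browser) sends a request to the server over HTTP, and the server processes it and returns a response. Modern web applications are commonly split into three layers:
- Presentation layer: the user interface and user interaction
- Business logic layer: business rules and data processing
- Data access layer: data storage and retrieval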
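The three layers can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: the names `Article`, `ArticleRepository`, `ArticleService`, and `render_article` are hypothetical and do not come from any framework, and the in-memory dictionary stands in for a real database.

```python
from dataclasses import dataclass


@dataclass
class Article:
    id: int
    title: str


class ArticleRepository:
    """Data access layer: the only code that touches storage."""

    def __init__(self):
        # An in-memory dict stands in for a database table.
        self._rows = {1: Article(1, "Hello, Web")}

    def get(self, article_id):
        return self._rows.get(article_id)


class ArticleService:
    """Business logic layer: applies rules and knows nothing about HTTP."""

    def __init__(self, repository):
        self._repository = repository

    def get_title(self, article_id):
        article = self._repository.get(article_id)
        if article is None:
            raise LookupError(f"article {article_id} not found")
        return article.title.upper()


def render_article(service, article_id):
    """Presentation layer: turns a result (or an error) into a response."""
    try:
        return 200, f"<h1>{service.get_title(article_id)}</h1>"
    except LookupError as exc:
        return 404, str(exc)


service = ArticleService(ArticleRepository())
print(render_article(service, 1))
print(render_article(service, 2))
```

Because each layer only talks to the one below it, the storage could later be swapped for a real database without touching the presentation code.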
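HTTP Protocol Basics
HTTP (HyperText Transfer Protocol) is the protocol that web communication is built on. A short Python script using the third-party requests library (pip install requests) shows what a request and a response look like: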
# http_demo.py
import requests

# Send a GET request (the timeout keeps the script from hanging on a slow network)
response = requests.get('https://httpbin.org/get', timeout=10)
print(f"Status code: {response.status_code}")
print(f"Response headers: {response.headers}")
print(f"Response body: {response.json()}")

# Send a POST request with a JSON body
data = {'name': 'Python学习者', 'skill': 'Web开发'}
response = requests.post('https://httpbin.org/post', json=data, timeout=10)
print(f"POST response: {response.json()}")

Running the script prints output similar to the following:
Status code: 200
Response headers: {'Date': 'Mon, 15 Jan 2024 10:30:00 GMT', 'Content-Type': 'application/json', 'Content-Length': '312'}
Response body: {'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.1'}, 'origin': '192.168.1.100', 'url': 'https://httpbin.org/get'}
POST response: {'args': {}, 'data': '{"name": "Python学习者", "skill": "Web开发"}', 'files': {}, 'form': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '52', 'Content-Type': 'application/json', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.1'}, 'json': {'name': 'Python学习者', 'skill': 'Web开发'}, 'origin': '192.168.1.100', 'url': 'https://httpbin.org/post'}
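To see both sides of the exchange without depending on an external service, the following sketch runs a tiny server and a client in one process using only the standard library. The `EchoHandler` class and the `fetch` helper are hypothetical names introduced for this illustration; binding to port 0 asks the operating system for any free port.

```python
import json
import threading
from http.client import HTTPConnection
from http.server import BaseHTTPRequestHandler, HTTPServer


class EchoHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Build a JSON body, then send status line, headers, and body in order.
        body = json.dumps({"path": self.path}).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the demo output quiet


def fetch(path):
    server = HTTPServer(("127.0.0.1", 0), EchoHandler)  # port 0: pick a free port
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        conn = HTTPConnection("127.0.0.1", server.server_address[1], timeout=5)
        conn.request("GET", path)
        response = conn.getresponse()
        result = (
            response.status,
            response.getheader("Content-Type"),
            json.loads(response.read()),
        )
        conn.close()
        return result
    finally:
        server.shutdown()
        server.server_close()


status, content_type, payload = fetch("/hello?lang=python")
print(status, content_type, payload)
```

The server side mirrors what a framework does for you: it writes a status code, a set of headers, and a body, which is exactly what the client then reads back.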
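The Python Web Development Ecosystem
Python has a rich web development ecosystem. The main frameworks compare as follows:
| Framework | Characteristics | Typical use | Learning curve |
|---|---|---|---|
| Flask | Lightweight, flexible | Small projects, API development | Low |
| Django | Full-stack, batteries included | Large projects, rapid development | Medium |
| FastAPI | Modern, high performance | API development, microservices | Medium |
| Tornado | Asynchronous, high concurrency | Real-time applications | High |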
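13.2 Flask: A Lightweight Framework
Installing Flask and Setting Up the Environment
First create a virtual environment and install Flask:
# Create a virtual environment
python -m venv flask_env
# Activate it (Windows)
flask_env\Scripts\activate
# Activate it (macOS/Linux)
source flask_env/bin/activate
# Install Flask
pip install Flask
After a successful install, the output looks similar to this: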
Collecting Flask
Downloading Flask-2.3.3-py3-none-any.whl (96 kB)
Collecting Werkzeug>=2.3.7
Downloading Werkzeug-2.3.7-py3-none-any.whl (242 kB)
Collecting Jinja2>=3.1.2
Downloading Jinja2-3.1.2-py3-none-any.whl (133 kB)
Collecting itsdangerous>=2.1.2
Downloading itsdangerous-2.1.2-py3-none-any.whl (15 kB)
Collecting click>=8.1.3
Downloading click-8.1.7-py3-none-any.whl (97 kB)
Installing collected packages: Werkzeug, Jinja2, itsdangerous, click, Flask
Successfully installed Flask-2.3.3 Jinja2-3.1.2 Werkzeug-2.3.7 click-8.1.7 itsdangerous-2.1.2
Your First Flask Application
Create a simple Flask application:
# app.py
from flask import Flask, render_template, request, jsonify
app = Flask(__name__)
# restful_api.py (excerpt)
# Assumes `app`, `db`, the `User` and `Post` models, and the marshmallow schemas
# (`user_schema`, `users_schema`, `post_schema`, `posts_schema`) are defined
# earlier in the same file.
from datetime import datetime

from flask import jsonify, request
from marshmallow import ValidationError


@app.route('/api/users', methods=['GET', 'POST'])
def users():
    if request.method == 'POST':
        try:
            user = user_schema.load(request.json)
            db.session.add(user)
            db.session.commit()
            return user_schema.jsonify(user), 201
        except ValidationError as err:
            return jsonify({'errors': err.messages}), 400

    # GET: return one page of users plus pagination metadata
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    pagination = User.query.paginate(
        page=page, per_page=per_page, error_out=False
    )
    return jsonify({
        'users': users_schema.dump(pagination.items),
        'pagination': {
            'page': pagination.page,
            'pages': pagination.pages,
            'per_page': pagination.per_page,
            'total': pagination.total
        }
    })


@app.route('/api/users/<int:user_id>', methods=['GET', 'PUT', 'DELETE'])
def user_detail(user_id):
    user = User.query.get_or_404(user_id)
    if request.method == 'GET':
        return user_schema.jsonify(user)
    elif request.method == 'PUT':
        try:
            # partial=True lets the client send only the fields it wants to change
            user = user_schema.load(request.json, instance=user, partial=True)
            db.session.commit()
            return user_schema.jsonify(user)
        except ValidationError as err:
            return jsonify({'errors': err.messages}), 400
    elif request.method == 'DELETE':
        db.session.delete(user)
        db.session.commit()
        return '', 204


@app.route('/api/posts', methods=['GET', 'POST'])
def posts():
    if request.method == 'POST':
        try:
            post = post_schema.load(request.json)
            db.session.add(post)
            db.session.commit()
            return post_schema.jsonify(post), 201
        except ValidationError as err:
            return jsonify({'errors': err.messages}), 400

    # GET: optional ?category= filter, then pagination
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    category = request.args.get('category')
    query = Post.query
    if category:
        query = query.filter(Post.category == category)
    pagination = query.paginate(
        page=page, per_page=per_page, error_out=False
    )
    return jsonify({
        'posts': posts_schema.dump(pagination.items),
        'pagination': {
            'page': pagination.page,
            'pages': pagination.pages,
            'per_page': pagination.per_page,
            'total': pagination.total
        }
    })


@app.route('/api/posts/<int:post_id>', methods=['GET', 'PUT', 'DELETE'])
def post_detail(post_id):
    post = Post.query.get_or_404(post_id)
    if request.method == 'GET':
        return post_schema.jsonify(post)
    elif request.method == 'PUT':
        try:
            post = post_schema.load(request.json, instance=post, partial=True)
            post.updated_at = datetime.utcnow()
            db.session.commit()
            return post_schema.jsonify(post)
        except ValidationError as err:
            return jsonify({'errors': err.messages}), 400
    elif request.method == 'DELETE':
        db.session.delete(post)
        db.session.commit()
        return '', 204


if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)
Run the RESTful API service:
python restful_api.py
The output looks similar to this:
* Running on http://127.0.0.1:5000
* Debug mode: on
* Restarting with stat
* Debugger is active!
* Debugger PIN: 345-678-901
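The `pagination` object in the list endpoints carries four numbers: the current page, the page count, the page size, and the total. As a rough sketch of how those values relate to one another, here is a plain-Python helper. The function name `paginate` and its return shape are assumptions made for this illustration; it only approximates what the ORM-level pagination does and is not its implementation.

```python
import math


def paginate(items, page=1, per_page=10):
    """Slice a list into one page and report the same metadata the API returns."""
    page = max(page, 1)
    total = len(items)
    pages = math.ceil(total / per_page) if total else 0
    start = (page - 1) * per_page
    return {
        "items": items[start:start + per_page],  # empty list when page is out of range
        "pagination": {
            "page": page,
            "pages": pages,
            "per_page": per_page,
            "total": total,
        },
    }


result = paginate(list(range(1, 26)), page=3, per_page=10)
print(result)
```

With 25 items and 10 per page, the third page holds the last five items and `pages` is 3; asking for a page past the end yields an empty list rather than an error, which is the behaviour the endpoints request with `error_out=False`.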
API Test Examples
Test the API with curl:
# Create a user
curl -X POST http://localhost:5000/api/users \
  -H "Content-Type: application/json" \
  -d '{"username": "testuser", "email": "test@example.com"}'
# List users
curl http://localhost:5000/api/users
# Create a post
curl -X POST http://localhost:5000/api/posts \
  -H "Content-Type: application/json" \
  -d '{"title": "Test post", "content": "This is a test post", "user_id": 1, "category": "test"}'
# List posts
curl http://localhost:5000/api/posts
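13.8 Deployment and Operations
Production Deployment
Deploying a Flask Application with Gunicorn
Install Gunicorn:
pip install gunicorn
Create the WSGI entry point:
# wsgi.py
from app import app

if __name__ == "__main__":
    app.run()

Start the application with Gunicorn:
gunicorn --bind 0.0.0.0:8000 --workers 4 wsgi:app
The output looks similar to this: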
[2024-01-15 15:30:00 +0000] [12345] [INFO] Starting gunicorn 20.1.0
[2024-01-15 15:30:00 +0000] [12345] [INFO] Listening at: http://0.0.0.0:8000 (12345)
[2024-01-15 15:30:00 +0000] [12345] [INFO] Using worker: sync
[2024-01-15 15:30:00 +0000] [12346] [INFO] Booting worker with pid: 12346
[2024-01-15 15:30:00 +0000] [12347] [INFO] Booting worker with pid: 12347
[2024-01-15 15:30:00 +0000] [12348] [INFO] Booting worker with pid: 12348
[2024-01-15 15:30:00 +0000] [12349] [INFO] Booting worker with pid: 12349
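The command-line flags above can also live in a configuration file, which Gunicorn accepts as a Python module. The sketch below is one possible layout, not the chapter's own configuration: the timeout value is an assumption, and the worker formula `(2 x cores) + 1` is the rule of thumb from the Gunicorn documentation rather than a measured optimum.

```python
# gunicorn.conf.py
import multiprocessing

# Address and port to listen on (same as --bind on the command line).
bind = "0.0.0.0:8000"

# Rule-of-thumb worker count: two per CPU core, plus one.
workers = multiprocessing.cpu_count() * 2 + 1

# Seconds a worker may stay silent before it is restarted (assumed value).
timeout = 30

# "-" sends the access log to stdout and the error log to stderr,
# which suits container deployments.
accesslog = "-"
errorlog = "-"
```

With this file in the project directory, the server could be started with `gunicorn -c gunicorn.conf.py wsgi:app`.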
Nginx Reverse Proxy Configuration
Create the Nginx configuration file:
# /etc/nginx/sites-available/flask_app
server {
    listen 80;
    server_name your-domain.com;

    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location /static {
        alias /path/to/your/app/static;
        expires 30d;
        add_header Cache-Control "public, immutable";
    }

    # Enable gzip compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_types text/plain text/css text/xml text/javascript application/javascript application/xml+rss application/json;
}
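Containerized Deployment with Docker
Create the Dockerfile:
# Dockerfile
FROM python:3.9-slim

# Set the working directory
WORKDIR /app

# Copy the dependency list first so this layer is cached between builds
COPY requirements.txt .

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the application port
EXPOSE 8000

# Create a non-root user and run as that user
RUN useradd --create-home --shell /bin/bash app \
    && chown -R app:app /app
USER app

# Start command
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "wsgi:app"]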
Create docker-compose.yml:
# docker-compose.yml
version: '3.8'

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://user:password@db:5432/blogdb
    depends_on:
      - db
    volumes:
      - ./logs:/app/logs

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=blogdb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - web

volumes:
  postgres_data:
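Build and run the containers:
# Build the images
docker-compose build
# Start the services in the background
docker-compose up -d
# Follow the web service logs
docker-compose logs -f web
The output looks similar to this: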
Building web
Step 1/8 : FROM python:3.9-slim
---> 1234567890ab
Step 2/8 : WORKDIR /app
---> Running in abcdef123456
Removing intermediate container abcdef123456
---> 234567890abc
...
Successfully built 345678901bcd
Successfully tagged flask_app_web:latest
Creating flask_app_db_1 ... done
Creating flask_app_web_1 ... done
Creating flask_app_nginx_1 ... done
13.9 Performance Optimization
Application Performance Optimization
Database Query Optimization
# Assumes `db` and the `Post` model from the Flask-SQLAlchemy example.
from sqlalchemy import text


# Before: the N+1 query problem
def get_posts_with_authors_slow():
    posts = Post.query.all()
    result = []
    for post in posts:
        result.append({
            'title': post.title,
            'author': post.author.username  # lazy load: one extra query per post
        })
    return result


# After: eager-load the author with a JOIN
def get_posts_with_authors_fast():
    posts = Post.query.options(db.joinedload(Post.author)).all()
    result = []
    for post in posts:
        result.append({
            'title': post.title,
            'author': post.author.username  # already loaded, no extra query
        })
    return result


# Raw SQL for a more complex query.
# SQLAlchemy 2.x rejects plain strings here, so the statement is wrapped in text().
# The table names must match your schema.
def get_post_statistics():
    result = db.session.execute(text("""
        SELECT
            u.username,
            COUNT(p.id) AS post_count,
            AVG(LENGTH(p.content)) AS avg_content_length
        FROM users u
        LEFT JOIN posts p ON u.id = p.user_id
        GROUP BY u.id, u.username
        ORDER BY post_count DESC
    """))
    return [{
        'username': row.username,
        'post_count': row.post_count,
        'avg_content_length': row.avg_content_length
    } for row in result]
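The cost of the N+1 pattern is easy to observe by counting the statements the database actually receives. The sketch below does this with the standard-library `sqlite3` module instead of the ORM, so it can run on its own; the two-table schema and the helper names are assumptions made for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, user_id INTEGER);
    INSERT INTO users VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO posts VALUES (1, 'A', 1), (2, 'B', 2), (3, 'C', 1);
""")

statements = []
conn.set_trace_callback(statements.append)  # record every SQL statement executed


def titles_with_authors_slow():
    # One query for the posts, then one more query per post for its author.
    rows = conn.execute("SELECT title, user_id FROM posts ORDER BY id").fetchall()
    return [
        (title, conn.execute(
            "SELECT username FROM users WHERE id = ?", (user_id,)
        ).fetchone()[0])
        for title, user_id in rows
    ]


def titles_with_authors_fast():
    # A single JOIN returns the same data in one round trip.
    return conn.execute(
        "SELECT p.title, u.username FROM posts p "
        "JOIN users u ON u.id = p.user_id ORDER BY p.id"
    ).fetchall()


slow = titles_with_authors_slow()
slow_count = len(statements)
statements.clear()
fast = titles_with_authors_fast()
fast_count = len(statements)
print(f"slow: {slow_count} statements, fast: {fast_count} statement(s)")
```

Both functions return the same rows; only the number of statements differs, and the gap grows linearly with the number of posts.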
Implementing a Caching Strategy
# cache.py
# Assumes `db`, `post_schema`, and the helper `create_new_post` are defined elsewhere.
import hashlib
import json
from functools import wraps

import redis
from flask import Flask, request
from flask_caching import Cache
from sqlalchemy import text

app = Flask(__name__)

# Configure Flask-Caching
app.config['CACHE_TYPE'] = 'RedisCache'
app.config['CACHE_REDIS_URL'] = 'redis://localhost:6379/0'
cache = Cache(app)

# Direct Redis connection used by the custom decorator below
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)


# Custom caching decorator
def cache_result(timeout=300, key_prefix=''):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Build the cache key from a stable digest. The built-in hash() is
            # randomized per process for strings, so keys would not match across
            # workers or restarts.
            raw = json.dumps([args, kwargs], sort_keys=True, default=str)
            digest = hashlib.sha256(raw.encode('utf-8')).hexdigest()
            cache_key = f"{key_prefix}:{f.__name__}:{digest}"

            # Try the cache first
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)

            # Run the function and cache its result
            result = f(*args, **kwargs)
            redis_client.setex(cache_key, timeout, json.dumps(result, default=str))
            return result
        return decorated_function
    return decorator


# An API endpoint that uses the cache
@app.route('/api/posts/popular')
@cache_result(timeout=600, key_prefix='popular_posts')
def get_popular_posts():
    # Stands in for an expensive query
    posts = db.session.execute(text("""
        SELECT p.*, u.username, COUNT(c.id) AS comment_count
        FROM posts p
        JOIN users u ON p.user_id = u.id
        LEFT JOIN comments c ON p.id = c.post_id
        GROUP BY p.id, u.username
        ORDER BY comment_count DESC, p.created_at DESC
        LIMIT 10
    """)).fetchall()
    return [{
        'id': post.id,
        'title': post.title,
        'author': post.username,
        'comment_count': post.comment_count
    } for post in posts]


# Cache invalidation
@app.route('/api/posts', methods=['POST'])
def create_post():
    # Post creation logic...
    post = create_new_post(request.json)

    # Clear the related cache entries. Redis DEL does not expand glob patterns,
    # so scan for the matching keys and delete them one by one.
    for key in redis_client.scan_iter(match='popular_posts:*'):
        redis_client.delete(key)
    return post_schema.jsonify(post), 201
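The caching decorator can be tried without a Redis server by swapping the store for a dictionary. The following is a simplified sketch of the same idea, not a replacement for the Redis version: `make_cache_key` and `ttl_cache` are hypothetical names, the dictionary lives in one process only, and the key is derived with `hashlib` so that it stays the same across processes.

```python
import hashlib
import json
import time
from functools import wraps


def make_cache_key(prefix, func_name, args, kwargs):
    """Derive a deterministic key from the call arguments."""
    payload = json.dumps([args, kwargs], sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}:{func_name}:{digest}"


def ttl_cache(timeout=300, key_prefix=""):
    store = {}  # key -> (expires_at, value); stands in for Redis in this sketch

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = make_cache_key(key_prefix, func.__name__, args, kwargs)
            entry = store.get(key)
            if entry is not None and entry[0] > time.monotonic():
                return entry[1]  # cache hit that has not expired yet
            value = func(*args, **kwargs)
            store[key] = (time.monotonic() + timeout, value)
            return value
        return wrapper
    return decorator


calls = []


@ttl_cache(timeout=60, key_prefix="demo")
def square(n):
    calls.append(n)  # records how often the real function runs
    return n * n


print(square(4), square(4), len(calls))
```

The second call to `square(4)` is served from the store, so the underlying function runs only once.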
Asynchronous Processing and Background Tasks
# tasks.py
# `User` and `Post` are the SQLAlchemy models defined earlier in the chapter;
# import them from wherever your project defines them.
import json
import time
from datetime import datetime

from celery import Celery
from flask import Flask, jsonify
from flask_mail import Mail, Message

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Mail settings (placeholders; load real credentials from the environment)
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = 'your-email@gmail.com'
app.config['MAIL_PASSWORD'] = 'your-password'
mail = Mail(app)

# Initialize Celery with an explicit broker and result backend
celery = Celery(
    app.name,
    broker=app.config['CELERY_BROKER_URL'],
    backend=app.config['CELERY_RESULT_BACKEND'],
)


@celery.task
def send_email_async(subject, recipients, body):
    """Send an email in the background."""
    with app.app_context():
        msg = Message(
            subject=subject,
            recipients=recipients,
            body=body
        )
        mail.send(msg)
        return f"Email sent to: {', '.join(recipients)}"


@celery.task
def generate_report_async(user_id):
    """Generate a user report in the background."""
    # Simulate a slow operation
    time.sleep(10)

    # The worker process has no request, so database access needs an app context
    with app.app_context():
        user = User.query.get(user_id)
        posts = Post.query.filter_by(user_id=user_id).all()
        report = {
            'user': user.username,
            'total_posts': len(posts),
            'total_words': sum(len(post.content.split()) for post in posts),
            'generated_at': datetime.utcnow().isoformat()
        }

        # Email the report
        send_email_async.delay(
            subject=f"User report - {user.username}",
            recipients=[user.email],
            body=f"Your report is ready:\n{json.dumps(report, indent=2, ensure_ascii=False)}"
        )
        return report


# API endpoints
@app.route('/api/users/<int:user_id>/report', methods=['POST'])
def request_user_report(user_id):
    # Queue the task and return immediately
    task = generate_report_async.delay(user_id)
    return jsonify({
        'message': 'Report generation started',
        'task_id': task.id,
        'status_url': f'/api/tasks/{task.id}'
    }), 202


@app.route('/api/tasks/<task_id>')
def get_task_status(task_id):
    task = generate_report_async.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'status': 'Task is waiting...'
        }
    elif task.state == 'PROGRESS':
        response = {
            'state': task.state,
            'status': task.info.get('status', ''),
            'progress': task.info.get('progress', 0)
        }
    elif task.state == 'SUCCESS':
        response = {
            'state': task.state,
            'result': task.result
        }
    else:
        response = {
            'state': task.state,
            'error': str(task.info)
        }
    return jsonify(response)
Start the Celery worker:
celery -A tasks.celery worker --loglevel=info
The output looks similar to this:
-------------- celery@DESKTOP-ABC123 v5.2.0 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.19041-SP0 2024-01-15 16:00:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x1234567890ab
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.generate_report_async
. tasks.send_email_async
[2024-01-15 16:00:00,123: INFO/MainProcess] Connected to redis://localhost:6379/0
[2024-01-15 16:00:00,124: INFO/MainProcess] mingle: searching for available workers
[2024-01-15 16:00:00,125: INFO/MainProcess] mingle: all alone
[2024-01-15 16:00:00,126: INFO/MainProcess] celery@DESKTOP-ABC123 ready.
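The submit-then-poll flow behind the report endpoints can be tried without a broker. The sketch below imitates it inside a single process with `concurrent.futures`; it is an analogy under stated assumptions rather than a Celery substitute, and `submit_task`, `task_status`, and `build_report` are hypothetical names. A real deployment would still need Celery (or a similar queue) to survive restarts and to spread work across machines.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
tasks = {}  # task_id -> Future; an in-process stand-in for a result backend


def submit_task(func, *args):
    """Queue the work and hand back an id the client can poll with."""
    task_id = uuid.uuid4().hex
    tasks[task_id] = executor.submit(func, *args)
    return task_id


def task_status(task_id):
    """Report the task state in a shape similar to the status endpoint."""
    future = tasks.get(task_id)
    if future is None:
        return {"state": "UNKNOWN"}
    if not future.done():
        return {"state": "PENDING"}
    if future.exception() is not None:
        return {"state": "FAILURE", "error": str(future.exception())}
    return {"state": "SUCCESS", "result": future.result()}


def build_report(user_id):
    return {"user_id": user_id, "total_posts": 3}


task_id = submit_task(build_report, 42)
tasks[task_id].result(timeout=5)  # wait here only so the demo is deterministic
print(task_status(task_id))
```

The web handler's job is limited to `submit_task` and `task_status`; the slow work happens elsewhere, which is why the endpoint can answer with 202 straight away.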
Monitoring and Logging
Application Monitoring Configuration
# monitoring.py
# Assumes the `Post` model and `posts_schema` are defined elsewhere.
import logging
import os
import time
from datetime import datetime
from functools import wraps

import psutil
from flask import Flask, request, g, jsonify
from prometheus_flask_exporter import PrometheusMetrics

app = Flask(__name__)

# Configure Prometheus metrics
metrics = PrometheusMetrics(app)
metrics.info('flask_app_info', 'Application info', version='1.0.0')

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


# Request logging hooks
@app.before_request
def before_request():
    g.start_time = time.time()
    logger.info(f"Request started: {request.method} {request.url}")


@app.after_request
def after_request(response):
    duration = time.time() - g.start_time
    logger.info(
        f"Request completed: {request.method} {request.url} "
        f"Status: {response.status_code} Duration: {duration:.3f}s"
    )
    return response


# System monitoring endpoints
@app.route('/health')
def health_check():
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'version': '1.0.0'
    })


@app.route('/metrics/system')
def system_metrics():
    return jsonify({
        'cpu_percent': psutil.cpu_percent(),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_percent': psutil.disk_usage('/').percent,
        'process_count': len(psutil.pids()),
        'load_average': os.getloadavg() if hasattr(os, 'getloadavg') else None
    })


# Custom monitoring decorator
def monitor_performance(operation_name):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            start_time = time.time()
            try:
                result = f(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"Operation {operation_name} completed in {duration:.3f}s")
                return result
            except Exception as e:
                duration = time.time() - start_time
                logger.error(f"Operation {operation_name} failed after {duration:.3f}s: {str(e)}")
                raise
        return decorated_function
    return decorator


# Using the monitoring decorator
@app.route('/api/posts')
@monitor_performance('get_posts')
def get_posts():
    posts = Post.query.all()
    return posts_schema.jsonify(posts)
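Summary
This chapter covered Python web development from basic concepts to practical application. The topics were:
- Web development basics: web application architecture, the HTTP protocol, and the Python web ecosystem
- Flask: using a lightweight framework, including routing, templates, and database integration
- Django: the full-stack framework's MTV (Model-Template-View) architecture, Django's variant of MVC, and the complete development workflow
- FastAPI: the features of a modern API framework and asynchronous programming
- Database operations: basic SQLite operations and the SQLAlchemy ORM
- Front-end integration: AJAX and data exchange between front end and back end
- API design: RESTful API design principles and best practices
- Deployment and operations: production deployment and containerization
- Performance optimization: caching strategies, asynchronous processing, and monitoring
After working through this chapter, readers should be able to:
- Choose a suitable web framework for a project
- Design and implement a complete web application
- Handle database operations and front-end/back-end interaction
- Deploy an application to a production environment
- Optimize application performance and monitor system state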
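Web development keeps evolving. Readers are encouraged to keep following new technologies and best practices and to keep building their skills on real projects.

# app.py
from flask import Flask, request, jsonify
from markupsafe import escape

app = Flask(__name__)


@app.route('/')
def home():
    return '<h1>Welcome to the world of Python web development!</h1>'


@app.route('/user/<name>')
def user_profile(name):
    # escape() keeps user-supplied text from being interpreted as HTML
    return f'<h1>User: {escape(name)}</h1><p>Welcome to your profile page!</p>'


@app.route('/api/data', methods=['GET', 'POST'])
def api_data():
    if request.method == 'POST':
        data = request.get_json()
        return jsonify({
            'message': 'Data received',
            'received_data': data,
            'status': 'success'
        })
    else:
        return jsonify({
            'message': 'Hello from Flask API',
            'version': '1.0',
            'endpoints': ['/api/data', '/user/<name>']
        })


if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)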
Run the Flask application:
python app.py
The output looks similar to this:
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:5000
* Running on http://192.168.1.100:5000
* Debug mode: on
* Restarting with stat
* Debugger is active!
* Debugger PIN: 123-456-789
The Flask Template System
Flask uses the Jinja2 template engine. Start by creating the template directory structure:
flask_project/
├── app.py
├── templates/
│   ├── base.html
│   ├── index.html
│   └── user.html
└── static/
    ├── css/
    │   └── style.css
    └── js/
        └── main.js
Create the base template:
<!-- templates/base.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{% block title %}Flask Web App{% endblock %}</title>
    <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
    <nav class="navbar">
        <div class="nav-container">
            <h1><a href="{{ url_for('home') }}">Flask Demo</a></h1>
            <ul class="nav-menu">
                <li><a href="{{ url_for('home') }}">Home</a></li>
                <li><a href="{{ url_for('about') }}">About</a></li>
            </ul>
        </div>
    </nav>
    <main class="container">
        {% block content %}{% endblock %}
    </main>
    <footer>
        <p>© 2024 Python Web Development Tutorial</p>
    </footer>
    <script src="{{ url_for('static', filename='js/main.js') }}"></script>
</body>
</html>
Create the home page template:
<!-- templates/index.html -->
{% extends "base.html" %}
{% block title %}Home - Flask Web App{% endblock %}
{% block content %}
<div class="hero">
    <h1>Welcome to Flask Web Development</h1>
    <p>This is a complete Flask application example</p>
    <div class="features">
        <div class="feature">
            <h3>Routing</h3>
            <p>Flexible URL route configuration</p>
        </div>
        <div class="feature">
            <h3>Template engine</h3>
            <p>The powerful Jinja2 template system</p>
        </div>
        <div class="feature">
            <h3>Database integration</h3>
            <p>SQLAlchemy ORM support</p>
        </div>
    </div>
    <form class="demo-form" method="POST" action="{{ url_for('submit_form') }}">
        <h3>User information form</h3>
        <input type="text" name="username" placeholder="Username" required>
        <input type="email" name="email" placeholder="Email" required>
        <textarea name="message" placeholder="Message" rows="4"></textarea>
        <button type="submit">Submit</button>
    </form>
</div>
{% endblock %}
Flask Database Integration
Install Flask-SQLAlchemy:
pip install Flask-SQLAlchemy
Create the database models:
# models.py
from datetime import datetime

from flask import Flask, render_template, request, redirect, url_for
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///blog.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = 'your-secret-key-here'
db = SQLAlchemy(app)


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # backref='author' adds a Post.author attribute pointing back to the user
    posts = db.relationship('Post', backref='author', lazy=True)

    def __repr__(self):
        return f'<User {self.username}>'


class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)

    def __repr__(self):
        return f'<Post {self.title}>'


# Routes and view functions
@app.route('/')
def index():
    posts = Post.query.order_by(Post.created_at.desc()).all()
    return render_template('index.html', posts=posts)


@app.route('/user/<username>')
def user_posts(username):
    user = User.query.filter_by(username=username).first_or_404()
    posts = Post.query.filter_by(author=user).order_by(Post.created_at.desc()).all()
    return render_template('user_posts.html', user=user, posts=posts)


@app.route('/create_post', methods=['GET', 'POST'])
def create_post():
    if request.method == 'POST':
        title = request.form['title']
        content = request.form['content']
        username = request.form['username']

        # Find the user, or create one if it does not exist
        user = User.query.filter_by(username=username).first()
        if not user:
            user = User(username=username, email=f"{username}@example.com")
            db.session.add(user)
            db.session.commit()

        # Create the post
        post = Post(title=title, content=content, author=user)
        db.session.add(post)
        db.session.commit()
        return redirect(url_for('index'))
    return render_template('create_post.html')


if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)
Run the application, which also creates the database:
python models.py
The output looks similar to this:
* Running on http://127.0.0.1:5000
* Debug mode: on
* Restarting with stat
* Debugger is active!
* Debugger PIN: 234-567-890
13.3 Django: A Full-Stack Framework
Installing Django and Creating a Project
Install Django:
pip install Django
Create a Django project:
django-admin startproject myblog
cd myblog
python manage.py startapp blog
The output looks similar to this:
C:\Users\Python\myblog>django-admin startproject myblog
C:\Users\Python\myblog>cd myblog
C:\Users\Python\myblog\myblog>python manage.py startapp blog
The project structure is as follows:
myblog/
├── manage.py
├── myblog/
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── blog/
    ├── __init__.py
    ├── admin.py
    ├── apps.py
    ├── models.py
    ├── tests.py
    └── views.py
Defining Django Models
Define the data models in blog/models.py:
# blog/models.py
from django.db import models
from django.contrib.auth.models import User
from django.urls import reverse


class Category(models.Model):
    name = models.CharField(max_length=100, unique=True)
    description = models.TextField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        verbose_name_plural = "Categories"

    def __str__(self):
        return self.name


class Post(models.Model):
    STATUS_CHOICES = [
        ('draft', 'Draft'),
        ('published', 'Published'),
    ]
    title = models.CharField(max_length=200)
    slug = models.SlugField(max_length=200, unique=True)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE, related_name='posts')
    category = models.ForeignKey(Category, on_delete=models.SET_NULL, null=True, blank=True)
    status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='draft')
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']  # newest first

    def __str__(self):
        return self.title

    def get_absolute_url(self):
        return reverse('blog:post_detail', kwargs={'slug': self.slug})


class Comment(models.Model):
    post = models.ForeignKey(Post, on_delete=models.CASCADE, related_name='comments')
    author_name = models.CharField(max_length=100)
    author_email = models.EmailField()
    content = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)
    is_approved = models.BooleanField(default=False)

    class Meta:
        ordering = ['created_at']

    def __str__(self):
        return f'Comment by {self.author_name} on {self.post.title}'
Django Views and URL Configuration
Create the view functions:
# blog/views.py
from django.shortcuts import render, get_object_or_404, redirect
from django.http import JsonResponse
from django.core.paginator import Paginator
from django.contrib import messages
from .models import Post, Category, Comment
from .forms import CommentForm


def post_list(request):
    posts = Post.objects.filter(status='published')
    category_id = request.GET.get('category')
    if category_id:
        posts = posts.filter(category_id=category_id)
    paginator = Paginator(posts, 5)  # five posts per page
    page_number = request.GET.get('page')
    page_obj = paginator.get_page(page_number)
    categories = Category.objects.all()
    context = {
        'page_obj': page_obj,
        'categories': categories,
        'current_category': category_id
    }
    return render(request, 'blog/post_list.html', context)


def post_detail(request, slug):
    post = get_object_or_404(Post, slug=slug, status='published')
    comments = post.comments.filter(is_approved=True)
    if request.method == 'POST':
        form = CommentForm(request.POST)
        if form.is_valid():
            # commit=False returns the unsaved object so the post can be attached first
            comment = form.save(commit=False)
            comment.post = post
            comment.save()
            messages.success(request, 'Comment submitted and awaiting moderation.')
            return redirect('blog:post_detail', slug=slug)
    else:
        form = CommentForm()
    context = {
        'post': post,
        'comments': comments,
        'form': form
    }
    return render(request, 'blog/post_detail.html', context)


def category_posts(request, category_id):
    category = get_object_or_404(Category, id=category_id)
    posts = Post.objects.filter(category=category, status='published')
    paginator = Paginator(posts, 5)
    page_number = request.GET.get('page')
    page_obj = paginator.get_page(page_number)
    context = {
        'category': category,
        'page_obj': page_obj
    }
    return render(request, 'blog/category_posts.html', context)

Configure the URL routes:
# blog/urls.py
from django.urls import path
from . import views

app_name = 'blog'
urlpatterns = [
    path('', views.post_list, name='post_list'),
    path('post/<slug:slug>/', views.post_detail, name='post_detail'),
    path('category/<int:category_id>/', views.category_posts, name='category_posts'),
]

# myblog/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include('blog.urls')),
]
Database Migrations
Run the database migrations:
python manage.py makemigrations
python manage.py migrate
The output looks similar to this:
Migrations for 'blog':
blog\migrations\0001_initial.py
- Create model Category
- Create model Post
- Create model Comment
Operations to perform:
Apply all migrations: admin, auth, blog, contenttypes, sessions
Running migrations:
Applying contenttypes.0001_initial... OK
Applying auth.0001_initial... OK
Applying admin.0001_initial... OK
Applying admin.0002_logentry_remove_auto_add... OK
Applying admin.0003_logentry_add_action_flag_choices... OK
Applying contenttypes.0002_remove_content_type_name... OK
Applying auth.0002_alter_permission_name_max_length... OK
Applying auth.0003_alter_user_email_max_length... OK
Applying auth.0004_alter_user_username_opts... OK
Applying auth.0005_alter_user_last_login_null... OK
Applying auth.0006_require_contenttypes_0002... OK
Applying auth.0007_alter_validators_add_error_messages... OK
Applying auth.0008_alter_user_username_max_length... OK
Applying auth.0009_alter_user_last_name_max_length... OK
Applying auth.0010_alter_group_name_max_length... OK
Applying auth.0011_update_proxy_permissions... OK
Applying auth.0012_alter_user_first_name_max_length... OK
Applying blog.0001_initial... OK
Applying sessions.0001_initial... OK
Create a superuser:
python manage.py createsuperuser
The output looks similar to this:
Username (leave blank to use 'admin'): admin
Email address: admin@example.com
Password:
Password (again):
Superuser created successfully.
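13.4 FastAPI: A Modern Framework
Installing FastAPI and a Basic Application
Install FastAPI and its related dependencies (the email extra is needed for the EmailStr type used below):
pip install fastapi uvicorn python-multipart "pydantic[email]"
Create your first FastAPI application: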
# main.py
from datetime import datetime
from typing import List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, ConfigDict, EmailStr

app = FastAPI(
    title="Python Web Development API",
    description="A complete FastAPI application example",
    version="1.0.0"
)
security = HTTPBearer()


# Data models
class UserBase(BaseModel):
    username: str
    email: EmailStr
    full_name: Optional[str] = None


class UserCreate(UserBase):
    password: str


class User(UserBase):
    # Pydantic v2 style; from_attributes allows building the model from ORM objects
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime


class PostBase(BaseModel):
    title: str
    content: str
    category: Optional[str] = None


class PostCreate(PostBase):
    pass


class Post(PostBase):
    model_config = ConfigDict(from_attributes=True)

    id: int
    author_id: int
    created_at: datetime
    updated_at: datetime


# In-memory stand-in for a database
fake_users_db = []
fake_posts_db = []
user_id_counter = 1
post_id_counter = 1


# Dependency injection
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    # A real application would verify a JWT here; this is simplified
    token = credentials.credentials
    if token != "valid-token":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return {"user_id": 1, "username": "testuser"}


# API routes
@app.get("/")
async def root():
    return {
        "message": "Welcome to FastAPI",
        "version": "1.0.0",
        "docs": "/docs",
        "redoc": "/redoc"
    }


@app.post("/users/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
    global user_id_counter
    # Reject duplicate usernames
    for existing_user in fake_users_db:
        if existing_user["username"] == user.username:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already exists"
            )
    new_user = {
        "id": user_id_counter,
        "username": user.username,
        "email": user.email,
        "full_name": user.full_name,
        "is_active": True,
        "created_at": datetime.now()
    }
    fake_users_db.append(new_user)
    user_id_counter += 1
    return new_user


@app.get("/users/", response_model=List[User])
async def get_users(skip: int = 0, limit: int = 10):
    return fake_users_db[skip : skip + limit]


@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
    for user in fake_users_db:
        if user["id"] == user_id:
            return user
    raise HTTPException(status_code=404, detail="User not found")


@app.post("/posts/", response_model=Post, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate, current_user: dict = Depends(get_current_user)):
    global post_id_counter
    new_post = {
        "id": post_id_counter,
        "title": post.title,
        "content": post.content,
        "category": post.category,
        "author_id": current_user["user_id"],
        "created_at": datetime.now(),
        "updated_at": datetime.now()
    }
    fake_posts_db.append(new_post)
    post_id_counter += 1
    return new_post


@app.get("/posts/", response_model=List[Post])
async def get_posts(skip: int = 0, limit: int = 10, category: Optional[str] = None):
    posts = fake_posts_db
    if category:
        posts = [post for post in posts if post.get("category") == category]
    return posts[skip : skip + limit]


@app.get("/posts/{post_id}", response_model=Post)
async def get_post(post_id: int):
    for post in fake_posts_db:
        if post["id"] == post_id:
            return post
    raise HTTPException(status_code=404, detail="Post not found")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
Run the FastAPI application:
uvicorn main:app --reload
The output looks similar to this:
INFO: Will watch for changes in these directories: ['C:\\Users\\Python\\fastapi_demo']
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [12345] using StatReload
INFO: Started server process [12346]
INFO: Waiting for application startup.
INFO: Application startup complete.
Visit http://127.0.0.1:8000/docs to see the automatically generated API documentation.
FastAPI Database Integration
Install SQLAlchemy and the database driver:
pip install sqlalchemy "databases[sqlite]" alembic
Create the database configuration:
# database.py
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import declarative_base, sessionmaker
from databases import Database

DATABASE_URL = "sqlite:///./fastapi_blog.db"

# SQLAlchemy (synchronous engine and session factory)
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Async query interface from the databases package
database = Database(DATABASE_URL)
metadata = MetaData()


def get_db():
    """Yield a session for one request and always close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
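The `get_db` function relies on a generator with `try`/`finally`: everything before `yield` is setup, and the `finally` block is cleanup that runs even if the caller fails. The same pattern can be tried in isolation with the standard library. In this sketch `get_connection`, `count_rows`, and the `events` list are hypothetical names used only to make the order of operations visible.

```python
import sqlite3
from contextlib import contextmanager

events = []  # records the order in which setup and cleanup happen


@contextmanager
def get_connection(path=":memory:"):
    conn = sqlite3.connect(path)
    events.append("opened")
    try:
        yield conn  # the caller's code runs while the generator is paused here
    finally:
        conn.close()
        events.append("closed")


def count_rows():
    with get_connection() as conn:
        conn.execute("CREATE TABLE t (x INTEGER)")
        conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,)])
        return conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]


print(count_rows(), events)
```

When a function like `get_db` is used with `Depends`, FastAPI drives the generator in a comparable way, so the session is closed once the request has been handled.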
Create the database models:
# models.py
from datetime import datetime

from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship

from database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True)
    email = Column(String(100), unique=True, index=True)
    full_name = Column(String(100))
    hashed_password = Column(String(100))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    posts = relationship("Post", back_populates="author")


class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), index=True)
    content = Column(Text)
    category = Column(String(50))
    author_id = Column(Integer, ForeignKey("users.id"))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    author = relationship("User", back_populates="posts")
13.5 Database operations
Basic SQLite operations
Create a complete database example:
# sqlite_demo.py
import sqlite3


class BlogDatabase:
    def __init__(self, db_name="blog.db"):
        self.db_name = db_name
        self.init_database()

    def init_database(self):
        """Create the tables if they do not exist yet."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()
        # Users table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Posts table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS posts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                content TEXT NOT NULL,
                author_id INTEGER,
                category TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (author_id) REFERENCES users (id)
            )
        ''')
        conn.commit()
        conn.close()
        print("数据库初始化完成")

    def create_user(self, username, email, password_hash):
        """Create a user; return the new ID, or None if the name or email is taken."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()
        try:
            cursor.execute(
                "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
                (username, email, password_hash)
            )
            conn.commit()
            user_id = cursor.lastrowid
            print(f"用户创建成功,ID: {user_id}")
            return user_id
        except sqlite3.IntegrityError as e:
            print(f"用户创建失败: {e}")
            return None
        finally:
            conn.close()

    def create_post(self, title, content, author_id, category=None):
        """Create a post and return its ID."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO posts (title, content, author_id, category) VALUES (?, ?, ?, ?)",
            (title, content, author_id, category)
        )
        conn.commit()
        post_id = cursor.lastrowid
        conn.close()
        print(f"文章创建成功,ID: {post_id}")
        return post_id

    def get_posts_with_authors(self):
        """Return all posts joined with their author, newest first."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()
        # CURRENT_TIMESTAMP has one-second resolution, so p.id breaks ties
        # between posts created within the same second
        cursor.execute('''
            SELECT p.id, p.title, p.content, p.category, p.created_at,
                   u.username, u.email
            FROM posts p
            JOIN users u ON p.author_id = u.id
            ORDER BY p.created_at DESC, p.id DESC
        ''')
        posts = cursor.fetchall()
        conn.close()
        return posts

    def search_posts(self, keyword):
        """Return posts whose title or content contains the keyword."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM posts WHERE title LIKE ? OR content LIKE ?",
            (f"%{keyword}%", f"%{keyword}%")
        )
        results = cursor.fetchall()
        conn.close()
        return results


# Usage example
if __name__ == "__main__":
    db = BlogDatabase()

    # Create users
    user1_id = db.create_user("张三", "zhangsan@example.com", "hashed_password_123")
    user2_id = db.create_user("李四", "lisi@example.com", "hashed_password_456")

    # Create posts
    if user1_id:
        db.create_post("Python Web开发入门", "这是一篇关于Python Web开发的文章...", user1_id, "技术")
        db.create_post("Flask框架详解", "Flask是一个轻量级的Web框架...", user1_id, "技术")
    if user2_id:
        db.create_post("Django实战项目", "使用Django开发一个完整的博客系统...", user2_id, "项目")

    # Query posts
    posts = db.get_posts_with_authors()
    print("\n所有文章:")
    for post in posts:
        print(f"ID: {post[0]}, 标题: {post[1]}, 作者: {post[5]}, 分类: {post[3]}")

    # Search posts
    search_results = db.search_posts("Flask")
    print("\n搜索结果:")
    for result in search_results:
        print(f"找到文章: {result[1]}")
Run the database example:
python sqlite_demo.py
The output looks similar to the following:
数据库初始化完成
用户创建成功,ID: 1
用户创建成功,ID: 2
文章创建成功,ID: 1
文章创建成功,ID: 2
文章创建成功,ID: 3
所有文章:
ID: 3, 标题: Django实战项目, 作者: 李四, 分类: 项目
ID: 2, 标题: Flask框架详解, 作者: 张三, 分类: 技术
ID: 1, 标题: Python Web开发入门, 作者: 张三, 分类: 技术
搜索结果:
找到文章: Flask框架详解
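Two details of the standard-library `sqlite3` module are worth knowing when working with code like the example above: rows can be read by column name instead of by position (`post[5]`), and SQLite enforces `FOREIGN KEY` constraints only after `PRAGMA foreign_keys = ON` has been issued on the connection. The following is a minimal sketch of both, assuming a simplified, hypothetical two-table schema in an in-memory database rather than the `blog.db` file used above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row            # rows become addressable by column name
conn.execute("PRAGMA foreign_keys = ON")  # SQLite does not enforce foreign keys by default

conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL);
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        title TEXT NOT NULL,
        author_id INTEGER REFERENCES users (id)
    );
""")

# `with conn` commits on success and rolls back if an exception is raised
with conn:
    conn.execute("INSERT INTO users (username) VALUES (?)", ("alice",))
    conn.execute("INSERT INTO posts (title, author_id) VALUES (?, ?)", ("Hello", 1))

row = conn.execute(
    "SELECT p.title, u.username FROM posts p JOIN users u ON p.author_id = u.id"
).fetchone()
title, username = row["title"], row["username"]
print(title, username)  # Hello alice

# A post that points at a non-existent user is rejected
try:
    with conn:
        conn.execute("INSERT INTO posts (title, author_id) VALUES (?, ?)", ("Orphan", 99))
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True
print(orphan_rejected)  # True

conn.close()
```

Named access keeps the calling code correct when the column order of a `SELECT` changes.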
Using the SQLAlchemy ORM
Create a complete ORM example:
# orm_demo.py
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, ForeignKey
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime

# Database configuration (echo=True logs every SQL statement)
engine = create_engine('sqlite:///orm_blog.db', echo=True)
Base = declarative_base()
Session = sessionmaker(bind=engine)


# Model definitions
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    posts = relationship("Post", back_populates="author")

    def __repr__(self):
        return f"<User(username='{self.username}', email='{self.email}')>"


class Post(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=datetime.utcnow)

    author = relationship("User", back_populates="posts")

    def __repr__(self):
        return f"<Post(title='{self.title}', author='{self.author.username if self.author else None}')>"


# Create the tables
Base.metadata.create_all(engine)


# Data manipulation functions
def create_sample_data():
    session = Session()

    # Create users
    user1 = User(username="python_dev", email="dev@python.com")
    user2 = User(username="web_master", email="master@web.com")
    session.add_all([user1, user2])
    session.commit()

    # Create posts
    post1 = Post(
        title="SQLAlchemy ORM教程",
        content="这是一篇详细的SQLAlchemy ORM使用教程...",
        author=user1
    )
    post2 = Post(
        title="Web开发最佳实践",
        content="分享一些Web开发的最佳实践和经验...",
        author=user1
    )
    post3 = Post(
        title="数据库设计原则",
        content="介绍关系型数据库设计的基本原则...",
        author=user2
    )
    session.add_all([post1, post2, post3])
    session.commit()
    print("示例数据创建完成")
    session.close()


def query_data():
    session = Session()

    # Query all users
    print("\n所有用户:")
    users = session.query(User).all()
    for user in users:
        print(f" {user}")

    # Query the posts of one user through the relationship
    print("\npython_dev的文章:")
    user = session.query(User).filter_by(username="python_dev").first()
    if user:
        for post in user.posts:
            print(f" {post.title} - {post.created_at}")

    # Join query
    print("\n所有文章(包含作者信息):")
    posts = session.query(Post).join(User).all()
    for post in posts:
        print(f" 《{post.title}》 by {post.author.username}")

    # Filtered query
    print("\n标题包含'教程'的文章:")
    tutorial_posts = session.query(Post).filter(Post.title.contains('教程')).all()
    for post in tutorial_posts:
        print(f" {post.title}")

    session.close()


if __name__ == "__main__":
    create_sample_data()
    query_data()
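Run the ORM example:
python orm_demo.py
The output looks similar to the following (abridged: because of echo=True, the INSERT and SELECT statements are logged as well and are omitted here):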
2024-01-15 14:30:00,123 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-01-15 14:30:00,124 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
2024-01-15 14:30:00,124 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-15 14:30:00,125 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("users")
2024-01-15 14:30:00,125 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-15 14:30:00,126 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("posts")
2024-01-15 14:30:00,126 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-15 14:30:00,127 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("posts")
2024-01-15 14:30:00,127 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-15 14:30:00,128 INFO sqlalchemy.engine.Engine
CREATE TABLE users (
id INTEGER NOT NULL,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
created_at DATETIME,
PRIMARY KEY (id),
UNIQUE (username),
UNIQUE (email)
)
2024-01-15 14:30:00,128 INFO sqlalchemy.engine.Engine [no key 0.00000s] ()
2024-01-15 14:30:00,129 INFO sqlalchemy.engine.Engine
CREATE TABLE posts (
id INTEGER NOT NULL,
title VARCHAR(200) NOT NULL,
content TEXT NOT NULL,
author_id INTEGER,
created_at DATETIME,
PRIMARY KEY (id),
FOREIGN KEY(author_id) REFERENCES users (id)
)
2024-01-15 14:30:00,129 INFO sqlalchemy.engine.Engine [no key 0.00000s] ()
2024-01-15 14:30:00,130 INFO sqlalchemy.engine.Engine COMMIT
示例数据创建完成
所有用户:
<User(username='python_dev', email='dev@python.com')>
<User(username='web_master', email='master@web.com')>
python_dev的文章:
SQLAlchemy ORM教程 - 2024-01-15 14:30:00.135000
Web开发最佳实践 - 2024-01-15 14:30:00.136000
所有文章(包含作者信息):
《SQLAlchemy ORM教程》 by python_dev
《Web开发最佳实践》 by python_dev
《数据库设计原则》 by web_master
标题包含'教程'的文章:
SQLAlchemy ORM教程
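13.6 Front-end integration
AJAX and API calls
Create a complete example of front-end/back-end interaction. The page calls the API at http://localhost:8000, so if the page itself is served from a different origin, the API must allow cross-origin requests (CORS):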
<!-- templates/ajax_demo.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AJAX API调用示例</title>
<style>
body {
font-family: 'Microsoft YaHei', sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}
.container {
background: white;
padding: 30px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.form-group {
margin-bottom: 20px;
}
label {
display: block;
margin-bottom: 5px;
font-weight: bold;
}
input, textarea {
width: 100%;
padding: 10px;
border: 1px solid #ddd;
border-radius: 5px;
font-size: 14px;
}
button {
background: #007bff;
color: white;
padding: 10px 20px;
border: none;
border-radius: 5px;
cursor: pointer;
margin-right: 10px;
}
button:hover {
background: #0056b3;
}
.posts {
margin-top: 30px;
}
.post {
background: #f8f9fa;
padding: 15px;
margin-bottom: 15px;
border-radius: 5px;
border-left: 4px solid #007bff;
}
.loading {
color: #666;
font-style: italic;
}
.error {
color: #dc3545;
background: #f8d7da;
padding: 10px;
border-radius: 5px;
margin: 10px 0;
}
.success {
color: #155724;
background: #d4edda;
padding: 10px;
border-radius: 5px;
margin: 10px 0;
}
</style>
</head>
<body>
<div class="container">
<h1>Python Web API 交互示例</h1>
<!-- 创建文章表单 -->
<form id="postForm">
<div class="form-group">
<label for="title">文章标题:</label>
<input type="text" id="title" name="title" required>
</div>
<div class="form-group">
<label for="content">文章内容:</label>
<textarea id="content" name="content" rows="5" required></textarea>
</div>
<div class="form-group">
<label for="category">分类:</label>
<input type="text" id="category" name="category">
</div>
<button type="submit">发布文章</button>
<button type="button" onclick="loadPosts()">刷新文章列表</button>
</form>
<div id="message"></div>
<!-- 文章列表 -->
<div class="posts">
<h2>文章列表</h2>
<div id="postsList">
<div class="loading">加载中...</div>
</div>
</div>
</div>
<script>
// API基础URL
const API_BASE = 'http://localhost:8000';
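// Show a status message; textContent keeps server-supplied text from being parsed as HTML
function showMessage(message, type = 'success') {
    const messageDiv = document.getElementById('message');
    const box = document.createElement('div');
    box.className = type;
    box.textContent = message;
    messageDiv.innerHTML = '';
    messageDiv.appendChild(box);
    setTimeout(() => {
        messageDiv.innerHTML = '';
    }, 3000);
}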
// 发送POST请求创建文章
document.getElementById('postForm').addEventListener('submit', async function(e) {
e.preventDefault();
const formData = new FormData(this);
const postData = {
title: formData.get('title'),
content: formData.get('content'),
category: formData.get('category') || null
};
try {
const response = await fetch(`${API_BASE}/posts/`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer valid-token'
},
body: JSON.stringify(postData)
});
if (response.ok) {
const result = await response.json();
showMessage('文章发布成功!', 'success');
this.reset();
loadPosts(); // 刷新文章列表
} else {
const error = await response.json();
showMessage(`发布失败: ${error.detail}`, 'error');
}
} catch (error) {
showMessage(`网络错误: ${error.message}`, 'error');
}
});
// 加载文章列表
async function loadPosts() {
const postsDiv = document.getElementById('postsList');
postsDiv.innerHTML = '<div class="loading">加载中...</div>';
try {
const response = await fetch(`${API_BASE}/posts/`);
if (response.ok) {
const posts = await response.json();
if (posts.length === 0) {
postsDiv.innerHTML = '<p>暂无文章</p>';
return;
}
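// Escape user-supplied text before inserting it as HTML (prevents XSS)
const escapeHtml = (text) => {
    const div = document.createElement('div');
    div.textContent = text ?? '';
    return div.innerHTML;
};
let html = '';
posts.forEach(post => {
    html += `
        <div class="post">
            <h3>${escapeHtml(post.title)}</h3>
            <p>${escapeHtml(post.content)}</p>
            <small>分类: ${escapeHtml(post.category || '未分类')} | 创建时间: ${new Date(post.created_at).toLocaleString()}</small>
        </div>
    `;
});
postsDiv.innerHTML = html;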
} else {
postsDiv.innerHTML = '<div class="error">加载文章失败</div>';
}
} catch (error) {
postsDiv.innerHTML = `<div class="error">网络错误: ${error.message}</div>`;
}
}
// 页面加载时获取文章列表
document.addEventListener('DOMContentLoaded', loadPosts);
// 使用Fetch API的高级示例
async function advancedApiCall() {
try {
// 并发请求多个API
const [usersResponse, postsResponse] = await Promise.all([
fetch(`${API_BASE}/users/`),
fetch(`${API_BASE}/posts/`)
]);
const users = await usersResponse.json();
const posts = await postsResponse.json();
console.log('用户数据:', users);
console.log('文章数据:', posts);
// 处理数据...
} catch (error) {
console.error('API调用失败:', error);
}
}
// 带有重试机制的API调用
async function apiCallWithRetry(url, options = {}, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
const response = await fetch(url, options);
if (response.ok) {
return await response.json();
}
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
} catch (error) {
if (i === maxRetries - 1) throw error;
console.log(`请求失败,${1000 * (i + 1)}ms后重试...`);
await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
}
}
}
</script>
</body>
</html>
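The `apiCallWithRetry` function above waits 1 s after the first failure and 2 s after the second, then gives up on the third. The same linear back-off can be sketched in Python for callers on the server side. The names `call_with_retry` and `flaky` are hypothetical, and the `sleep` parameter is an assumption added so that the delays can be recorded instead of actually waited for.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(func: Callable[[], T], max_retries: int = 3,
                    base_delay: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func(); on failure wait base_delay * attempt seconds and try again."""
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # out of attempts: propagate the last error
            sleep(base_delay * attempt)
    raise ValueError("max_retries must be at least 1")


# Demonstration with a hypothetical flaky operation that succeeds on the third call
attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


delays = []
result = call_with_retry(flaky, sleep=delays.append)  # record delays instead of sleeping
print(result, delays)  # ok [1.0, 2.0]
```

Like the JavaScript version, this sketch retries on every error. Real code would usually retry only on transient failures such as timeouts or 5xx responses.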
13.7 Web services and API design
RESTful API design in practice
Create a complete RESTful API example:
```python
# restful_api.py
from flask import Flask, request, jsonify, abort
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from marshmallow import ValidationError
from datetime import datetime
import os

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///restful_blog.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
ma = Marshmallow(app)


# Data models
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    posts = db.relationship('Post', backref='author', lazy=True, cascade='all, delete-orphan')


class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    category = db.Column(db.String(50))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


# Serialization schemas
class UserSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = User
        load_instance = True
        include_fk = True


class PostSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Post
        load_instance = True
        include_fk = True

    author = ma.Nested(UserSchema, only=['id', 'username'])


# Schema instances
user_schema = UserSchema()
users_schema = UserSchema(many=True)
post_schema = PostSchema()
posts_schema = PostSchema(many=True)


# Error handlers
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404


@app.errorhandler(400)
def bad_request(error):
    return jsonify({'error': 'Bad request'}), 400
@app.