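Chapter 13: Web Development

Preface

This chapter covers Python web development from basic concepts through practical use of the mainstream frameworks: Flask (lightweight), Django (full-stack), and FastAPI (modern, API-focused). Along the way it covers database access, front-end integration, and API design. The code examples and project exercises are meant to give readers what they need to build a complete web application on their own.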

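13.1 Web Development Basics

Web Application Architecture Overview

Web applications follow a client-server model: the client (usually a browser) sends a request to the server over HTTP, and the server processes the request and returns a response. Modern web applications are commonly split into three layers:

  1. Presentation layer: the user interface and user interaction
  2. Business logic layer: business rules and data processing
  3. Data access layer: data storage and retrieval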
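The three layers can be sketched in a few lines of plain Python. This is only an illustration: the names `Article`, `ArticleRepository`, `ArticleService`, and `render_article` are hypothetical and do not come from any framework, and the "database" is an in-memory dictionary.

```python
from dataclasses import dataclass


@dataclass
class Article:
    id: int
    title: str


class ArticleRepository:
    """Data access layer: stores and retrieves records (in memory here)."""

    def __init__(self):
        self._rows = {}

    def save(self, article):
        self._rows[article.id] = article

    def find(self, article_id):
        return self._rows.get(article_id)


class ArticleService:
    """Business logic layer: enforces rules before touching storage."""

    def __init__(self, repo):
        self._repo = repo

    def publish(self, article_id, title):
        title = title.strip()
        if not title:
            raise ValueError("title must not be empty")
        article = Article(article_id, title)
        self._repo.save(article)
        return article


def render_article(article):
    """Presentation layer: turns a domain object into what the user sees."""
    return f"<h1>{article.title}</h1>"


repo = ArticleRepository()
service = ArticleService(repo)
html = render_article(service.publish(1, "  Hello, Web  "))
print(html)
```

Each layer only talks to the one directly below it, which is what lets a real project swap the in-memory repository for a database without touching the presentation code.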

HTTP Basics

HTTP (HyperText Transfer Protocol) is the protocol that underlies communication on the Web. A short Python script makes the request/response cycle concrete:

# http_demo.py
import requests

# Send a GET request (the timeout keeps the script from hanging on a dead connection)
response = requests.get('https://httpbin.org/get', timeout=10)
print(f"Status code: {response.status_code}")
print(f"Response headers: {response.headers}")
print(f"Response body: {response.json()}")

# Send a POST request with a JSON body
data = {'name': 'Python学习者', 'skill': 'Web开发'}
response = requests.post('https://httpbin.org/post', json=data, timeout=10)
print(f"POST response: {response.json()}")

Running the code above produces output similar to the following:

Status code: 200
Response headers: {'Date': 'Mon, 15 Jan 2024 10:30:00 GMT', 'Content-Type': 'application/json', 'Content-Length': '312'}
Response body: {'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.1'}, 'origin': '192.168.1.100', 'url': 'https://httpbin.org/get'}
POST response: {'args': {}, 'data': '{"name": "Python学习者", "skill": "Web开发"}', 'files': {}, 'form': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '52', 'Content-Type': 'application/json', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.1'}, 'json': {'name': 'Python学习者', 'skill': 'Web开发'}, 'origin': '192.168.1.100', 'url': 'https://httpbin.org/post'}
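To see what actually travels over the wire, it can help to build and parse the raw messages by hand. The sketch below uses only the standard library and no network access; `build_request` and `parse_response` are hypothetical helper names, and the response bytes are a hand-written sample rather than real server output.

```python
import json


def build_request(method, path, host, payload=None):
    """Serialize an HTTP/1.1 request: request line, headers, blank line, body."""
    body = json.dumps(payload).encode("utf-8") if payload is not None else b""
    lines = [f"{method} {path} HTTP/1.1", f"Host: {host}"]
    if body:
        lines.append("Content-Type: application/json")
        lines.append(f"Content-Length: {len(body)}")
    head = "\r\n".join(lines) + "\r\n\r\n"
    return head.encode("ascii") + body


def parse_response(raw):
    """Split a raw HTTP/1.1 response into status code, headers, and body."""
    head, _, body = raw.partition(b"\r\n\r\n")
    status_line, *header_lines = head.decode("iso-8859-1").split("\r\n")
    _version, code, _reason = status_line.split(" ", 2)
    headers = dict(line.split(": ", 1) for line in header_lines)
    return int(code), headers, body


request_bytes = build_request("POST", "/post", "httpbin.org", {"skill": "web"})
print(request_bytes.decode("ascii"))

# A hand-written sample response, used only to exercise the parser
raw_response = (
    b"HTTP/1.1 200 OK\r\n"
    b"Content-Type: application/json\r\n"
    b"Content-Length: 16\r\n"
    b"\r\n"
    b'{"status": "ok"}'
)
status, headers, body = parse_response(raw_response)
print(status, headers["Content-Type"], json.loads(body))
```

Libraries such as requests do this serialization and parsing for you, along with connection handling, redirects, and encoding details that this sketch ignores.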

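The Python Web Development Ecosystem

Python has a rich web development ecosystem. The main frameworks compare as follows:

| Framework | Characteristics | Typical use cases | Learning difficulty |
|-----------|-----------------|-------------------|---------------------|
| Flask | Lightweight, flexible | Small projects, API development | |
| Django | Full-stack, feature-complete | Large projects, rapid development | |
| FastAPI | Modern, high performance | API development, microservices | |
| Tornado | Asynchronous, high concurrency | Real-time applications | |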

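13.2 Flask: A Lightweight Framework

Installing Flask and Setting Up the Environment

First create a virtual environment and install Flask:

# Create a virtual environment
python -m venv flask_env

# Activate the virtual environment (Windows)
flask_env\Scripts\activate

# Activate the virtual environment (macOS/Linux)
source flask_env/bin/activate

# Install Flask
pip install Flask

After a successful installation, the output looks something like this: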

Collecting Flask
  Downloading Flask-2.3.3-py3-none-any.whl (96 kB)
Collecting Werkzeug>=2.3.7
  Downloading Werkzeug-2.3.7-py3-none-any.whl (242 kB)
Collecting Jinja2>=3.1.2
  Downloading Jinja2-3.1.2-py3-none-any.whl (133 kB)
Collecting itsdangerous>=2.1.2
  Downloading itsdangerous-2.1.2-py3-none-any.whl (15 kB)
Collecting click>=8.1.3
  Downloading click-8.1.7-py3-none-any.whl (97 kB)
Installing collected packages: Werkzeug, Jinja2, itsdangerous, click, Flask
Successfully installed Flask-2.3.3 Jinja2-3.1.2 Werkzeug-2.3.7 click-8.1.7 itsdangerous-2.1.2

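A First Flask Application

Create a simple Flask application:

# app.py
from datetime import datetime

from flask import Flask, render_template, request, jsonify
from marshmallow import ValidationError

# Assumption: db, User, Post and the schema objects (user_schema, users_schema,
# post_schema, posts_schema) are defined in the application's model and schema
# modules, which this listing does not show.

app = Flask(__name__)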

@app.route('/api/users', methods=['GET', 'POST'])
def users():
    if request.method == 'POST':
        try:
            user = user_schema.load(request.json)
            db.session.add(user)
            db.session.commit()
            return user_schema.jsonify(user), 201
        except ValidationError as err:
            return jsonify({'errors': err.messages}), 400

    # GET request: return a paginated list of users
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    users = User.query.paginate(
        page=page, per_page=per_page, error_out=False
    )

    return jsonify({
        'users': users_schema.dump(users.items),
        'pagination': {
            'page': users.page,
            'pages': users.pages,
            'per_page': users.per_page,
            'total': users.total
        }
    })

@app.route('/api/users/<int:user_id>', methods=['GET', 'PUT', 'DELETE'])
def user_detail(user_id):
    user = User.query.get_or_404(user_id)

    if request.method == 'GET':
        return user_schema.jsonify(user)

    elif request.method == 'PUT':
        try:
            user = user_schema.load(request.json, instance=user, partial=True)
            db.session.commit()
            return user_schema.jsonify(user)
        except ValidationError as err:
            return jsonify({'errors': err.messages}), 400

    elif request.method == 'DELETE':
        db.session.delete(user)
        db.session.commit()
        return '', 204

@app.route('/api/posts', methods=['GET', 'POST'])
def posts():
    if request.method == 'POST':
        try:
            post = post_schema.load(request.json)
            db.session.add(post)
            db.session.commit()
            return post_schema.jsonify(post), 201
        except ValidationError as err:
            return jsonify({'errors': err.messages}), 400

    # GET request: return a paginated list of posts, optionally filtered by category
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    category = request.args.get('category')

    query = Post.query
    if category:
        query = query.filter(Post.category == category)

    posts = query.paginate(
        page=page, per_page=per_page, error_out=False
    )

    return jsonify({
        'posts': posts_schema.dump(posts.items),
        'pagination': {
            'page': posts.page,
            'pages': posts.pages,
            'per_page': posts.per_page,
            'total': posts.total
        }
    })

@app.route('/api/posts/<int:post_id>', methods=['GET', 'PUT', 'DELETE'])
def post_detail(post_id):
    post = Post.query.get_or_404(post_id)

    if request.method == 'GET':
        return post_schema.jsonify(post)

    elif request.method == 'PUT':
        try:
            post = post_schema.load(request.json, instance=post, partial=True)
            post.updated_at = datetime.utcnow()
            db.session.commit()
            return post_schema.jsonify(post)
        except ValidationError as err:
            return jsonify({'errors': err.messages}), 400

    elif request.method == 'DELETE':
        db.session.delete(post)
        db.session.commit()
        return '', 204

if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)
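The list endpoints above return a `pagination` object with `page`, `pages`, `per_page`, and `total`. As a rough illustration of how those numbers relate, here is a plain-Python sketch that paginates a list instead of a database query; `paginate` is a hypothetical helper, not the Flask-SQLAlchemy method of the same name.

```python
import math


def paginate(items, page=1, per_page=10):
    """Return one page of items plus the metadata the API responses expose."""
    page = max(page, 1)
    per_page = max(per_page, 1)
    total = len(items)
    pages = math.ceil(total / per_page)
    start = (page - 1) * per_page
    return {
        "items": items[start:start + per_page],
        "pagination": {
            "page": page,
            "pages": pages,
            "per_page": per_page,
            "total": total,
        },
    }


result = paginate(list(range(1, 26)), page=3, per_page=10)
print(result)
```

A page past the end yields an empty `items` list rather than an error, which mirrors the intent of `error_out=False` in the endpoints.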

Run the RESTful API service:

python restful_api.py

The output looks something like this:

 * Running on http://127.0.0.1:5000
 * Debug mode: on
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 345-678-901

Testing the API

Use curl to test the API:

# Create a user
curl -X POST http://localhost:5000/api/users \
  -H "Content-Type: application/json" \
  -d '{"username": "testuser", "email": "test@example.com"}'

# List users
curl http://localhost:5000/api/users

# Create a post
curl -X POST http://localhost:5000/api/posts \
  -H "Content-Type: application/json" \
  -d '{"title": "测试文章", "content": "这是一篇测试文章", "user_id": 1, "category": "测试"}'

# List posts
curl http://localhost:5000/api/posts

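13.8 Deployment and Operations

Production Deployment

Deploying a Flask Application with Gunicorn

Install Gunicorn:

pip install gunicorn

Create the WSGI entry point:

# wsgi.py
from app import app

if __name__ == "__main__":
    app.run()

Start the application with Gunicorn:

gunicorn --bind 0.0.0.0:8000 --workers 4 wsgi:app

The output looks something like this: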

[2024-01-15 15:30:00 +0000] [12345] [INFO] Starting gunicorn 20.1.0
[2024-01-15 15:30:00 +0000] [12345] [INFO] Listening at: http://0.0.0.0:8000 (12345)
[2024-01-15 15:30:00 +0000] [12345] [INFO] Using worker: sync
[2024-01-15 15:30:00 +0000] [12346] [INFO] Booting worker with pid: 12346
[2024-01-15 15:30:00 +0000] [12347] [INFO] Booting worker with pid: 12347
[2024-01-15 15:30:00 +0000] [12348] [INFO] Booting worker with pid: 12348
[2024-01-15 15:30:00 +0000] [12349] [INFO] Booting worker with pid: 12349

Nginx Reverse Proxy Configuration

Create the Nginx configuration file:

# /etc/nginx/sites-available/flask_app
server {
    listen 80;
    server_name your-domain.com;

    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location /static {
        alias /path/to/your/app/static;
        expires 30d;
        add_header Cache-Control "public, immutable";
    }

    # Enable gzip compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_types text/plain text/css text/xml text/javascript application/javascript application/xml+rss application/json;
}

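Containerized Deployment with Docker

Create the Dockerfile:

# Dockerfile
FROM python:3.9-slim

# Set the working directory
WORKDIR /app

# Copy the dependency list first so this layer is cached between builds
COPY requirements.txt .

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the application port
EXPOSE 8000

# Create a non-root user and run as that user
RUN useradd --create-home --shell /bin/bash app \
    && chown -R app:app /app
USER app

# Start command
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "wsgi:app"]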

Create docker-compose.yml:

# docker-compose.yml
version: '3.8'

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      # FLASK_ENV was removed in Flask 2.3; FLASK_DEBUG controls debug mode instead
      - FLASK_DEBUG=0
      - DATABASE_URL=postgresql://user:password@db:5432/blogdb
    depends_on:
      - db
    volumes:
      - ./logs:/app/logs

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=blogdb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - web

volumes:
  postgres_data:

Build and run the containers:

# Build the images
docker-compose build

# Start the services
docker-compose up -d

# Follow the logs
docker-compose logs -f web

The output looks something like this:

Building web
Step 1/8 : FROM python:3.9-slim
 ---> 1234567890ab
Step 2/8 : WORKDIR /app
 ---> Running in abcdef123456
Removing intermediate container abcdef123456
 ---> 234567890abc
...
Successfully built 345678901bcd
Successfully tagged flask_app_web:latest
Creating flask_app_db_1 ... done
Creating flask_app_web_1 ... done
Creating flask_app_nginx_1 ... done

13.9 Performance Optimization

Application Performance Optimization

Database Query Optimization

from sqlalchemy import text

# Before: the N+1 query problem
def get_posts_with_authors_slow():
    posts = Post.query.all()
    result = []
    for post in posts:
        result.append({
            'title': post.title,
            'author': post.author.username  # triggers one extra query per post
        })
    return result

# After: eager-load the authors with a JOIN
def get_posts_with_authors_fast():
    posts = Post.query.options(db.joinedload(Post.author)).all()
    result = []
    for post in posts:
        result.append({
            'title': post.title,
            'author': post.author.username  # already loaded, no extra query
        })
    return result

# Use raw SQL for complex queries
def get_post_statistics():
    # SQLAlchemy 2.0 requires raw SQL strings to be wrapped in text()
    result = db.session.execute(text("""
        SELECT
            u.username,
            COUNT(p.id) as post_count,
            AVG(LENGTH(p.content)) as avg_content_length
        FROM users u
        LEFT JOIN posts p ON u.id = p.user_id
        GROUP BY u.id, u.username
        ORDER BY post_count DESC
    """))

    return [{
        'username': row.username,
        'post_count': row.post_count,
        'avg_content_length': row.avg_content_length
    } for row in result]
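The cost of the N+1 pattern is easy to measure without an ORM. The sketch below uses the standard-library sqlite3 module with an in-memory database and counts the statements each approach issues; the schema and the `run` helper are assumptions made for the demonstration, not part of the chapter's models.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, user_id INTEGER);
    INSERT INTO users VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO posts VALUES (1, 'A', 1), (2, 'B', 1), (3, 'C', 2);
""")

query_count = 0


def run(sql, params=()):
    """Execute a statement and count it, so both approaches can be compared."""
    global query_count
    query_count += 1
    return conn.execute(sql, params)


def titles_with_authors_slow():
    # One query for the posts, then one more query per post for its author
    rows = run("SELECT title, user_id FROM posts ORDER BY id").fetchall()
    result = []
    for title, user_id in rows:
        author = run("SELECT username FROM users WHERE id = ?", (user_id,)).fetchone()[0]
        result.append((title, author))
    return result


def titles_with_authors_fast():
    # A single JOIN returns posts and authors together
    return run(
        "SELECT p.title, u.username FROM posts p "
        "JOIN users u ON u.id = p.user_id ORDER BY p.id"
    ).fetchall()


slow = titles_with_authors_slow()
slow_queries = query_count
query_count = 0
fast = titles_with_authors_fast()
fast_queries = query_count
print(slow_queries, fast_queries)
```

With three posts the slow version issues four statements and the JOIN version one; the gap grows linearly with the number of posts.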

Implementing a Caching Strategy

# cache.py
import hashlib
import json
from functools import wraps

import redis
from flask import Flask, request
from flask_caching import Cache
from sqlalchemy import text

app = Flask(__name__)

# Configure Flask-Caching
app.config['CACHE_TYPE'] = 'RedisCache'
app.config['CACHE_REDIS_URL'] = 'redis://localhost:6379/0'
cache = Cache(app)

# Direct Redis connection used by the custom decorator below
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Custom caching decorator
def cache_result(timeout=300, key_prefix=''):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Build the cache key from a stable digest: the built-in hash() of a
            # string differs between processes, so workers would not share entries
            raw_args = (str(args) + str(kwargs)).encode('utf-8')
            digest = hashlib.sha256(raw_args).hexdigest()
            cache_key = f"{key_prefix}:{f.__name__}:{digest}"

            # Try the cache first
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)

            # Run the function and cache its result
            result = f(*args, **kwargs)
            redis_client.setex(cache_key, timeout, json.dumps(result, default=str))

            return result
        return decorated_function
    return decorator

# API endpoint that uses the cache
@app.route('/api/posts/popular')
@cache_result(timeout=600, key_prefix='popular_posts')
def get_popular_posts():
    # Stand-in for an expensive query
    posts = db.session.execute(text("""
        SELECT p.*, u.username, COUNT(c.id) as comment_count
        FROM posts p
        JOIN users u ON p.user_id = u.id
        LEFT JOIN comments c ON p.id = c.post_id
        GROUP BY p.id
        ORDER BY comment_count DESC, p.created_at DESC
        LIMIT 10
    """)).fetchall()

    return [{
        'id': post.id,
        'title': post.title,
        'author': post.username,
        'comment_count': post.comment_count
    } for post in posts]

# Cache invalidation
@app.route('/api/posts', methods=['POST'])
def create_post():
    # Post-creation logic...
    post = create_new_post(request.json)

    # Clear the related cache entries. DELETE does not expand wildcards,
    # so iterate over the matching keys with SCAN instead.
    for key in redis_client.scan_iter(match='popular_posts:*'):
        redis_client.delete(key)

    return post_schema.jsonify(post), 201
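The cache-aside pattern used above (look up, compute on a miss, store with a timeout, invalidate on writes) can be tried without a Redis server. The following is a minimal in-memory sketch under that assumption: the `_store` dictionary stands in for Redis, and `make_key` and `invalidate` are hypothetical helpers.

```python
import hashlib
import json
import time
from functools import wraps

_store = {}  # cache_key -> (expires_at, value); stands in for Redis


def make_key(prefix, func_name, args, kwargs):
    """Derive a key that is identical across processes for the same arguments."""
    raw = json.dumps([args, kwargs], sort_keys=True, default=str)
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}:{func_name}:{digest}"


def cache_result(timeout=300, key_prefix="", clock=time.monotonic):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            key = make_key(key_prefix, f.__name__, args, kwargs)
            hit = _store.get(key)
            if hit is not None and hit[0] > clock():
                return hit[1]  # fresh entry: skip the expensive call
            value = f(*args, **kwargs)
            _store[key] = (clock() + timeout, value)
            return value
        return wrapper
    return decorator


def invalidate(prefix):
    """Drop every entry whose key starts with the given prefix."""
    for key in [k for k in _store if k.startswith(f"{prefix}:")]:
        del _store[key]


calls = []


@cache_result(timeout=60, key_prefix="popular_posts")
def popular_posts(limit):
    calls.append(limit)  # record each real computation
    return [f"post-{i}" for i in range(limit)]


first = popular_posts(2)
second = popular_posts(2)   # served from the cache
invalidate("popular_posts")
third = popular_posts(2)    # recomputed after invalidation
print(len(calls))
```

Hashing the arguments with SHA-256 rather than the built-in `hash()` matters once several worker processes share one cache, because string hashes are randomized per process.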

Asynchronous Processing and Background Tasks

# tasks.py
import json
import time
from datetime import datetime

from celery import Celery
from flask import Flask, jsonify
from flask_mail import Mail, Message

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Mail settings
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = 'your-email@gmail.com'
app.config['MAIL_PASSWORD'] = 'your-password'

mail = Mail(app)

# Initialize Celery with both the broker and the result backend
celery = Celery(
    app.name,
    broker=app.config['CELERY_BROKER_URL'],
    backend=app.config['CELERY_RESULT_BACKEND'],
)
celery.conf.update(app.config)

@celery.task
def send_email_async(subject, recipients, body):
    """Send an email asynchronously."""
    with app.app_context():
        msg = Message(
            subject=subject,
            recipients=recipients,
            body=body
        )
        mail.send(msg)
    return f"Email sent to: {', '.join(recipients)}"

@celery.task
def generate_report_async(user_id):
    """Generate a user report asynchronously."""
    # Simulate a slow operation
    time.sleep(10)

    # The database queries need an application context inside the worker
    with app.app_context():
        user = User.query.get(user_id)
        posts = Post.query.filter_by(user_id=user_id).all()

        report = {
            'user': user.username,
            'total_posts': len(posts),
            'total_words': sum(len(post.content.split()) for post in posts),
            'generated_at': datetime.utcnow().isoformat()
        }

        # Email the report
        send_email_async.delay(
            subject=f"User report - {user.username}",
            recipients=[user.email],
            body=f"Your report is ready:\n{json.dumps(report, indent=2, ensure_ascii=False)}"
        )

    return report

# API endpoints
@app.route('/api/users/<int:user_id>/report', methods=['POST'])
def request_user_report(user_id):
    # Start the asynchronous task
    task = generate_report_async.delay(user_id)

    return jsonify({
        'message': 'Report generation task started',
        'task_id': task.id,
        'status_url': f'/api/tasks/{task.id}'
    }), 202

@app.route('/api/tasks/<task_id>')
def get_task_status(task_id):
    task = generate_report_async.AsyncResult(task_id)

    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'status': 'Task is waiting...'
        }
    elif task.state == 'PROGRESS':
        response = {
            'state': task.state,
            'status': task.info.get('status', ''),
            'progress': task.info.get('progress', 0)
        }
    elif task.state == 'SUCCESS':
        response = {
            'state': task.state,
            'result': task.result
        }
    else:
        response = {
            'state': task.state,
            'error': str(task.info)
        }

    return jsonify(response)

Start the Celery worker:

celery -A tasks.celery worker --loglevel=info

The output looks something like this:

 -------------- celery@DESKTOP-ABC123 v5.2.0 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Windows-10-10.0.19041-SP0 2024-01-15 16:00:00
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1234567890ab
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.generate_report_async
  . tasks.send_email_async

[2024-01-15 16:00:00,123: INFO/MainProcess] Connected to redis://localhost:6379/0
[2024-01-15 16:00:00,124: INFO/MainProcess] mingle: searching for available workers
[2024-01-15 16:00:00,125: INFO/MainProcess] mingle: all alone
[2024-01-15 16:00:00,126: INFO/MainProcess] celery@DESKTOP-ABC123 ready.
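The submit-then-poll flow behind the report endpoints can be tried without a broker. The sketch below swaps Celery for the standard library's `ThreadPoolExecutor`, so it only illustrates the pattern and is not a substitute for a task queue; `submit_task`, `task_status`, and `build_report` are hypothetical names.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
tasks = {}  # task_id -> Future


def submit_task(func, *args):
    """Queue the work and hand back an id the client can poll with."""
    task_id = uuid.uuid4().hex
    tasks[task_id] = executor.submit(func, *args)
    return task_id


def task_status(task_id):
    """Report the task state in the same shape as the status endpoint."""
    future = tasks[task_id]
    if not future.done():
        return {"state": "PENDING"}
    error = future.exception()
    if error is not None:
        return {"state": "FAILURE", "error": str(error)}
    return {"state": "SUCCESS", "result": future.result()}


def build_report(user_id):
    return {"user_id": user_id, "total_posts": 3}


task_id = submit_task(build_report, 7)
tasks[task_id].result(timeout=5)  # wait here only so the demo is deterministic
status = task_status(task_id)
executor.shutdown()
print(status)
```

Unlike Celery, a thread pool lives inside the web process, so queued work is lost on restart and cannot be spread across machines.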

Monitoring and Logging

Application Monitoring Configuration

# monitoring.py
import logging
import os
import time
from datetime import datetime
from functools import wraps

import psutil
from flask import Flask, request, g, jsonify
from prometheus_flask_exporter import PrometheusMetrics

app = Flask(__name__)

# Configure Prometheus metrics
metrics = PrometheusMetrics(app)
metrics.info('flask_app_info', 'Application info', version='1.0.0')

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Request logging hooks
@app.before_request
def before_request():
    g.start_time = time.time()
    logger.info(f"Request started: {request.method} {request.url}")

@app.after_request
def after_request(response):
    duration = time.time() - g.start_time
    logger.info(
        f"Request completed: {request.method} {request.url} "
        f"Status: {response.status_code} Duration: {duration:.3f}s"
    )
    return response

# Health and system monitoring endpoints
@app.route('/health')
def health_check():
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'version': '1.0.0'
    })

@app.route('/metrics/system')
def system_metrics():
    return jsonify({
        'cpu_percent': psutil.cpu_percent(),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_percent': psutil.disk_usage('/').percent,
        'process_count': len(psutil.pids()),
        'load_average': os.getloadavg() if hasattr(os, 'getloadavg') else None
    })

# Custom performance-monitoring decorator
def monitor_performance(operation_name):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            start_time = time.time()
            try:
                result = f(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"Operation {operation_name} completed in {duration:.3f}s")
                return result
            except Exception as e:
                duration = time.time() - start_time
                logger.error(f"Operation {operation_name} failed after {duration:.3f}s: {str(e)}")
                raise
        return decorated_function
    return decorator

# Using the monitoring decorator
@app.route('/api/posts')
@monitor_performance('get_posts')
def get_posts():
    posts = Post.query.all()
    return posts_schema.jsonify(posts)

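Summary

This chapter covered Python web development from basic concepts to practical application. We looked at:

  1. Web development basics: web application architecture, the HTTP protocol, and the Python web ecosystem
  2. Flask: using a lightweight framework, including routing, templates, and database integration
  3. Django: the MVC-style (MTV) architecture of a full-stack framework and its complete development workflow
  4. FastAPI: the features of a modern API framework and asynchronous programming
  5. Database operations: basic SQLite operations and the SQLAlchemy ORM
  6. Front-end integration: AJAX and data exchange between front end and back end
  7. API design: RESTful API design principles and best practices
  8. Deployment and operations: production deployment and containerization
  9. Performance optimization: caching strategies, asynchronous processing, and monitoring

After working through this chapter, readers should be able to:
- Choose a suitable web framework for a project
- Design and implement a complete web application
- Handle database operations and front-end/back-end interaction
- Deploy an application to production
- Optimize application performance and monitor system state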

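Web development keeps evolving, so readers are encouraged to keep following new technologies and best practices and to keep building their skills on real projects.

# app.py
from flask import Flask, request, jsonify
from markupsafe import escape

app = Flask(__name__)

@app.route('/')
def home():
    return '<h1>Welcome to the world of Python web development!</h1>'

@app.route('/user/<name>')
def user_profile(name):
    # Escape the URL value before placing it in HTML
    return f'<h2>User: {escape(name)}</h2><p>Welcome to your personal home page!</p>'

@app.route('/api/data', methods=['GET', 'POST'])
def api_data():
    if request.method == 'POST':
        data = request.get_json()
        return jsonify({
            'message': 'Data received successfully',
            'received_data': data,
            'status': 'success'
        })
    else:
        return jsonify({
            'message': 'Hello from Flask API',
            'version': '1.0',
            'endpoints': ['/api/data', '/user/<name>']
        })

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)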

Run the Flask application:

python app.py

The output looks something like this:

 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.1.100:5000
 * Debug mode: on
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 123-456-789
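The route decorators in app.py bind URL rules to view functions. The toy router below sketches that idea in plain Python; it is a simplification for illustration and not how Flask is implemented. `MiniRouter` is a hypothetical class that handles only simple `<name>` placeholders and does not escape special characters in rules.

```python
import re


class MiniRouter:
    def __init__(self):
        self._routes = []  # list of (compiled pattern, view function)

    def route(self, rule):
        # Turn "<name>" placeholders into named regex groups
        pattern = re.compile("^" + re.sub(r"<(\w+)>", r"(?P<\1>[^/]+)", rule) + "$")

        def decorator(func):
            self._routes.append((pattern, func))
            return func

        return decorator

    def dispatch(self, path):
        # Call the first view whose pattern matches, passing captured values
        for pattern, func in self._routes:
            match = pattern.match(path)
            if match:
                return func(**match.groupdict())
        return "404 Not Found"


router = MiniRouter()


@router.route("/")
def home():
    return "home page"


@router.route("/user/<name>")
def user_profile(name):
    return f"profile of {name}"


print(router.dispatch("/user/alice"))
```

The value captured from the URL arrives as a keyword argument, which is why the view function's parameter name has to match the placeholder in the rule.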

Flask Template System

Flask uses the Jinja2 template engine. First create the template directory structure:

flask_project/
├── app.py
├── templates/
│   ├── base.html
│   ├── index.html
│   └── user.html
└── static/
    ├── css/
    │   └── style.css
    └── js/
        └── main.js

Create the base template:

<!-- templates/base.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{% block title %}Flask Web应用{% endblock %}</title>
    <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
    <nav class="navbar">
        <div class="nav-container">
            <h1><a href="{{ url_for('home') }}">Flask Demo</a></h1>
            <ul class="nav-menu">
                <li><a href="{{ url_for('home') }}">首页</a></li>
                <li><a href="{{ url_for('about') }}">关于</a></li>
            </ul>
        </div>
    </nav>

    <main class="container">
        {% block content %}{% endblock %}
    </main>

    <footer>
        <p>&copy; 2024 Python Web开发教程</p>
    </footer>

    <script src="{{ url_for('static', filename='js/main.js') }}"></script>
</body>
</html>

Create the home page template:

<!-- templates/index.html -->
{% extends "base.html" %}

{% block title %}首页 - Flask Web应用{% endblock %}

{% block content %}
<div class="hero">
    <h1>欢迎来到Flask Web开发</h1>
    <p>这是一个完整的Flask应用示例</p>

    <div class="features">
        <div class="feature">
            <h3>路由系统</h3>
            <p>灵活的URL路由配置</p>
        </div>
        <div class="feature">
            <h3>模板引擎</h3>
            <p>强大的Jinja2模板系统</p>
        </div>
        <div class="feature">
            <h3>数据库集成</h3>
            <p>SQLAlchemy ORM支持</p>
        </div>
    </div>

    <form class="demo-form" method="POST" action="{{ url_for('submit_form') }}">
        <h3>用户信息表单</h3>
        <input type="text" name="username" placeholder="用户名" required>
        <input type="email" name="email" placeholder="邮箱" required>
        <textarea name="message" placeholder="留言" rows="4"></textarea>
        <button type="submit">提交</button>
    </form>
</div>
{% endblock %}

Flask Database Integration

Install Flask-SQLAlchemy:

pip install Flask-SQLAlchemy

Define the database models:

# models.py
from datetime import datetime

from flask import Flask, render_template, request, redirect, url_for
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///blog.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = 'your-secret-key-here'

db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    posts = db.relationship('Post', backref='author', lazy=True)

    def __repr__(self):
        return f'<User {self.username}>'

class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)

    def __repr__(self):
        return f'<Post {self.title}>'

# Routes and view functions
@app.route('/')
def index():
    posts = Post.query.order_by(Post.created_at.desc()).all()
    return render_template('index.html', posts=posts)

@app.route('/user/<username>')
def user_posts(username):
    user = User.query.filter_by(username=username).first_or_404()
    posts = Post.query.filter_by(author=user).order_by(Post.created_at.desc()).all()
    return render_template('user_posts.html', user=user, posts=posts)

@app.route('/create_post', methods=['GET', 'POST'])
def create_post():
    if request.method == 'POST':
        title = request.form['title']
        content = request.form['content']
        username = request.form['username']

        # Look up the user, creating one if it does not exist
        user = User.query.filter_by(username=username).first()
        if not user:
            user = User(username=username, email=f"{username}@example.com")
            db.session.add(user)
            db.session.commit()

        # Create the post
        post = Post(title=title, content=content, author=user)
        db.session.add(post)
        db.session.commit()

        return redirect(url_for('index'))

    return render_template('create_post.html')

if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)

Run the application, which also creates the database:

python models.py

The output looks something like this:

 * Running on http://127.0.0.1:5000
 * Debug mode: on
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 234-567-890

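13.3 Django: A Full-Stack Framework

Installing Django and Creating a Project

Install Django:

pip install Django

Create a Django project:

django-admin startproject myblog
cd myblog
python manage.py startapp blog

These commands print nothing on success, so the console session looks something like this: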

C:\Users\Python\myblog>django-admin startproject myblog
C:\Users\Python\myblog>cd myblog
C:\Users\Python\myblog\myblog>python manage.py startapp blog

The project structure looks like this:

myblog/
├── manage.py
├── myblog/
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── blog/
    ├── __init__.py
    ├── admin.py
    ├── apps.py
    ├── models.py
    ├── tests.py
    └── views.py

Defining Django Models

Define the data models in blog/models.py:

# blog/models.py
from django.db import models
from django.contrib.auth.models import User
from django.urls import reverse

class Category(models.Model):
    name = models.CharField(max_length=100, unique=True)
    description = models.TextField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        verbose_name_plural = "Categories"

    def __str__(self):
        return self.name

class Post(models.Model):
    STATUS_CHOICES = [
        ('draft', '草稿'),
        ('published', '已发布'),
    ]

    title = models.CharField(max_length=200)
    slug = models.SlugField(max_length=200, unique=True)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE, related_name='posts')
    category = models.ForeignKey(Category, on_delete=models.SET_NULL, null=True, blank=True)
    status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='draft')
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']

    def __str__(self):
        return self.title

    def get_absolute_url(self):
        return reverse('blog:post_detail', kwargs={'slug': self.slug})

class Comment(models.Model):
    post = models.ForeignKey(Post, on_delete=models.CASCADE, related_name='comments')
    author_name = models.CharField(max_length=100)
    author_email = models.EmailField()
    content = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)
    is_approved = models.BooleanField(default=False)

    class Meta:
        ordering = ['created_at']

    def __str__(self):
        return f'Comment by {self.author_name} on {self.post.title}'

Django Views and URL Configuration

Create the view functions:

# blog/views.py
from django.shortcuts import render, get_object_or_404, redirect
from django.http import JsonResponse
from django.core.paginator import Paginator
from django.contrib import messages
from .models import Post, Category, Comment
from .forms import CommentForm

def post_list(request):
    posts = Post.objects.filter(status='published')
    category_id = request.GET.get('category')

    if category_id:
        posts = posts.filter(category_id=category_id)

    paginator = Paginator(posts, 5)  # show 5 posts per page
    page_number = request.GET.get('page')
    page_obj = paginator.get_page(page_number)

    categories = Category.objects.all()

    context = {
        'page_obj': page_obj,
        'categories': categories,
        'current_category': category_id
    }
    return render(request, 'blog/post_list.html', context)

def post_detail(request, slug):
    post = get_object_or_404(Post, slug=slug, status='published')
    comments = post.comments.filter(is_approved=True)

    if request.method == 'POST':
        form = CommentForm(request.POST)
        if form.is_valid():
            comment = form.save(commit=False)
            comment.post = post
            comment.save()
            messages.success(request, '评论提交成功,等待审核!')
            return redirect('blog:post_detail', slug=slug)
    else:
        form = CommentForm()

    context = {
        'post': post,
        'comments': comments,
        'form': form
    }
    return render(request, 'blog/post_detail.html', context)

def category_posts(request, category_id):
    category = get_object_or_404(Category, id=category_id)
    posts = Post.objects.filter(category=category, status='published')

    paginator = Paginator(posts, 5)
    page_number = request.GET.get('page')
    page_obj = paginator.get_page(page_number)

    context = {
        'category': category,
        'page_obj': page_obj
    }
    return render(request, 'blog/category_posts.html', context)

Configure URL routing:

# blog/urls.py
from django.urls import path
from . import views

app_name = 'blog'

urlpatterns = [
    path('', views.post_list, name='post_list'),
    path('post/<slug:slug>/', views.post_detail, name='post_detail'),
    path('category/<int:category_id>/', views.category_posts, name='category_posts'),
]

# myblog/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include('blog.urls')),
]

Database Migrations

Run the database migrations:

python manage.py makemigrations
python manage.py migrate

The output looks something like this:

Migrations for 'blog':
  blog\migrations\0001_initial.py
    - Create model Category
    - Create model Post
    - Create model Comment
Operations to perform:
  Apply all migrations: admin, auth, blog, contenttypes, sessions
Running migrations:
  Applying contenttypes.0001_initial... OK
  Applying auth.0001_initial... OK
  Applying admin.0001_initial... OK
  Applying admin.0002_logentry_remove_auto_add... OK
  Applying admin.0003_logentry_add_action_flag_choices... OK
  Applying contenttypes.0002_remove_content_type_name... OK
  Applying auth.0002_alter_permission_name_max_length... OK
  Applying auth.0003_alter_user_email_max_length... OK
  Applying auth.0004_alter_user_username_opts... OK
  Applying auth.0005_alter_user_last_login_null... OK
  Applying auth.0006_require_contenttypes_0002... OK
  Applying auth.0007_alter_validators_add_error_messages... OK
  Applying auth.0008_alter_user_username_max_length... OK
  Applying auth.0009_alter_user_last_name_max_length... OK
  Applying auth.0010_alter_group_name_max_length... OK
  Applying auth.0011_update_proxy_permissions... OK
  Applying auth.0012_alter_user_first_name_max_length... OK
  Applying blog.0001_initial... OK
  Applying sessions.0001_initial... OK

Create a superuser:

python manage.py createsuperuser

The output looks something like this:

Username (leave blank to use 'admin'): admin
Email address: admin@example.com
Password: 
Password (again): 
Superuser created successfully.

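13.4 FastAPI: A Modern Framework

Installing FastAPI and a Basic Application

Install FastAPI and its related dependencies (email-validator is required by Pydantic's EmailStr type, which the models in main.py use):

pip install fastapi uvicorn python-multipart email-validator

Create a first FastAPI application: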

# main.py
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, EmailStr
from typing import List, Optional
from datetime import datetime
import uvicorn

app = FastAPI(
    title="Python Web开发API",
    description="这是一个完整的FastAPI应用示例",
    version="1.0.0"
)

security = HTTPBearer()

# Data models
class UserBase(BaseModel):
    username: str
    email: EmailStr
    full_name: Optional[str] = None

class UserCreate(UserBase):
    password: str

class User(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    class Config:
        from_attributes = True

class PostBase(BaseModel):
    title: str
    content: str
    category: Optional[str] = None

class PostCreate(PostBase):
    pass

class Post(PostBase):
    id: int
    author_id: int
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True

# In-memory stand-in for a database
fake_users_db = []
fake_posts_db = []
user_id_counter = 1
post_id_counter = 1

# Dependency injection
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    # A real application would verify a JWT here; this check is simplified
    token = credentials.credentials
    if token != "valid-token":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return {"user_id": 1, "username": "testuser"}

# API routes
@app.get("/")
async def root():
    return {
        "message": "欢迎使用FastAPI",
        "version": "1.0.0",
        "docs": "/docs",
        "redoc": "/redoc"
    }

@app.post("/users/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
    global user_id_counter

    # Reject the request if the username is already taken
    for existing_user in fake_users_db:
        if existing_user["username"] == user.username:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="用户名已存在"
            )

    new_user = {
        "id": user_id_counter,
        "username": user.username,
        "email": user.email,
        "full_name": user.full_name,
        "is_active": True,
        "created_at": datetime.now()
    }
    fake_users_db.append(new_user)
    user_id_counter += 1

    return new_user

@app.get("/users/", response_model=List[User])
async def get_users(skip: int = 0, limit: int = 10):
    return fake_users_db[skip : skip + limit]

@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
    for user in fake_users_db:
        if user["id"] == user_id:
            return user
    raise HTTPException(status_code=404, detail="用户不存在")

@app.post("/posts/", response_model=Post, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate, current_user: dict = Depends(get_current_user)):
    global post_id_counter

    new_post = {
        "id": post_id_counter,
        "title": post.title,
        "content": post.content,
        "category": post.category,
        "author_id": current_user["user_id"],
        "created_at": datetime.now(),
        "updated_at": datetime.now()
    }
    fake_posts_db.append(new_post)
    post_id_counter += 1

    return new_post

@app.get("/posts/", response_model=List[Post])
async def get_posts(skip: int = 0, limit: int = 10, category: Optional[str] = None):
    posts = fake_posts_db
    if category:
        posts = [post for post in posts if post.get("category") == category]
    return posts[skip : skip + limit]

@app.get("/posts/{post_id}", response_model=Post)
async def get_post(post_id: int):
    for post in fake_posts_db:
        if post["id"] == post_id:
            return post
    raise HTTPException(status_code=404, detail="文章不存在")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Run the FastAPI application:

uvicorn main:app --reload

The output looks similar to the following:

INFO:     Will watch for changes in these directories: ['C:\\Users\\Python\\fastapi_demo']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [12345] using StatReload
INFO:     Started server process [12346]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Open http://127.0.0.1:8000/docs to see the automatically generated interactive API documentation.
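
The endpoints can also be exercised from a script instead of the browser. The following is a minimal sketch using only the standard library. The helper names build_create_post_request and send are hypothetical, and valid-token is the placeholder token accepted by get_current_user in main.py. Building the request needs no server; sending it assumes the uvicorn server from the previous step is listening on port 8000.

```python
import json
import urllib.request


def build_create_post_request(base_url, token, title, content, category=None):
    """Build (but do not send) a POST /posts/ request carrying a Bearer token.

    Hypothetical helper for illustration; the payload mirrors the PostCreate model.
    """
    payload = {"title": title, "content": content, "category": category}
    return urllib.request.Request(
        url=f"{base_url}/posts/",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


def send(req):
    """Send the request and decode the JSON body.

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses,
    for example 401 when the token is not accepted.
    """
    with urllib.request.urlopen(req) as resp:
        return resp.status, json.loads(resp.read().decode("utf-8"))
```

With the server running, send(build_create_post_request("http://127.0.0.1:8000", "valid-token", "Hello", "First post")) should return status 201 together with the created post.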

FastAPI Database Integration

Install SQLAlchemy and the database driver:

pip install sqlalchemy "databases[sqlite]" alembic

Create the database configuration:

# database.py
from sqlalchemy import create_engine, MetaData
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4;
# the old sqlalchemy.ext.declarative import path is deprecated
from sqlalchemy.orm import declarative_base, sessionmaker
from databases import Database

DATABASE_URL = "sqlite:///./fastapi_blog.db"

# SQLAlchemy
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Databases query builder
database = Database(DATABASE_URL)
metadata = MetaData()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
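
The try/finally around yield in get_db is what guarantees that the session is closed. FastAPI runs a generator dependency up to its yield, passes the yielded value to the path function, and resumes the generator afterwards, so the finally block runs even when the handler raises. The following is a minimal sketch of that control flow using only the standard library. FakeSession, events and handle_request are hypothetical stand-ins, not part of FastAPI or SQLAlchemy.

```python
from contextlib import contextmanager

events = []  # records the order of operations for illustration


class FakeSession:
    """Stand-in for a SQLAlchemy session (hypothetical, illustration only)."""

    def close(self):
        events.append("close")


def get_db():
    # Same shape as get_db in database.py: open, yield, always close
    db = FakeSession()
    events.append("open")
    try:
        yield db
    finally:
        db.close()


def handle_request(fail=False):
    # contextmanager() drives the generator much like a framework would:
    # run up to `yield`, hand over the session, then resume into `finally`.
    with contextmanager(get_db)() as db:
        events.append("handler")
        if fail:
            raise RuntimeError("simulated handler error")
        return db
```

Calling handle_request() records open, handler, close in that order, and handle_request(fail=True) records the same sequence before the exception propagates.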

Create the database models:

# models.py
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from datetime import datetime
from database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True)
    email = Column(String(100), unique=True, index=True)
    full_name = Column(String(100))
    hashed_password = Column(String(100))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    posts = relationship("Post", back_populates="author")

class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), index=True)
    content = Column(Text)
    category = Column(String(50))
    author_id = Column(Integer, ForeignKey("users.id"))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    author = relationship("User", back_populates="posts")
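
The hashed_password column is meant to hold a salted hash, never the plaintext password. The chapter does not show how that hash is produced, so the following is only a sketch under assumptions. It uses PBKDF2 from the standard library, and the function names, the salt$digest storage format and the iteration count are illustrative choices. A dedicated library such as passlib or bcrypt is a common alternative in production.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # assumed work factor; tune it for your hardware


def hash_password(password: str) -> str:
    """Return 'salt_hex$digest_hex' (97 characters, fits String(100))."""
    salt = os.urandom(16)  # fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate.hex(), digest_hex)
```

Because each call draws a new salt, hashing the same password twice gives two different strings, and only verify_password can match them.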

13.5 Database Operations

Basic SQLite Operations

A complete example of database operations:

# sqlite_demo.py
import sqlite3
from datetime import datetime
import json

class BlogDatabase:
    def __init__(self, db_name="blog.db"):
        self.db_name = db_name
        self.init_database()

    def init_database(self):
        """Create the tables if they do not exist yet."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()

        # Create the users table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Create the posts table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS posts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                content TEXT NOT NULL,
                author_id INTEGER,
                category TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (author_id) REFERENCES users (id)
            )
        ''')

        conn.commit()
        conn.close()
        print("数据库初始化完成")

    def create_user(self, username, email, password_hash):
        """Create a user and return its ID, or None if it already exists."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()

        try:
            cursor.execute(
                "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
                (username, email, password_hash)
            )
            conn.commit()
            user_id = cursor.lastrowid
            print(f"用户创建成功,ID: {user_id}")
            return user_id
        except sqlite3.IntegrityError as e:
            print(f"用户创建失败: {e}")
            return None
        finally:
            conn.close()

    def create_post(self, title, content, author_id, category=None):
        """Create a post and return its ID."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()

        cursor.execute(
            "INSERT INTO posts (title, content, author_id, category) VALUES (?, ?, ?, ?)",
            (title, content, author_id, category)
        )
        conn.commit()
        post_id = cursor.lastrowid
        conn.close()

        print(f"文章创建成功,ID: {post_id}")
        return post_id

    def get_posts_with_authors(self):
        """Return all posts joined with their author information."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()

        cursor.execute('''
            SELECT p.id, p.title, p.content, p.category, p.created_at,
                   u.username, u.email
            FROM posts p
            JOIN users u ON p.author_id = u.id
            -- CURRENT_TIMESTAMP has one-second resolution, so break ties by id
            ORDER BY p.created_at DESC, p.id DESC
        ''')

        posts = cursor.fetchall()
        conn.close()

        return posts

    def search_posts(self, keyword):
        """Return posts whose title or content contains the keyword."""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()

        cursor.execute(
            "SELECT * FROM posts WHERE title LIKE ? OR content LIKE ?",
            (f"%{keyword}%", f"%{keyword}%")
        )

        results = cursor.fetchall()
        conn.close()

        return results

# Usage example
if __name__ == "__main__":
    db = BlogDatabase()

    # Create users
    user1_id = db.create_user("张三", "zhangsan@example.com", "hashed_password_123")
    user2_id = db.create_user("李四", "lisi@example.com", "hashed_password_456")

    # Create posts
    if user1_id:
        db.create_post("Python Web开发入门", "这是一篇关于Python Web开发的文章...", user1_id, "技术")
        db.create_post("Flask框架详解", "Flask是一个轻量级的Web框架...", user1_id, "技术")

    if user2_id:
        db.create_post("Django实战项目", "使用Django开发一个完整的博客系统...", user2_id, "项目")

    # Query posts together with their authors
    posts = db.get_posts_with_authors()
    print("\n所有文章:")
    for post in posts:
        print(f"ID: {post[0]}, 标题: {post[1]}, 作者: {post[5]}, 分类: {post[3]}")

    # Search posts
    search_results = db.search_posts("Flask")
    print("\n搜索结果:")
    for result in search_results:
        print(f"找到文章: {result[1]}")

Run the database example:

python sqlite_demo.py

The output looks similar to the following:

数据库初始化完成
用户创建成功,ID: 1
用户创建成功,ID: 2
文章创建成功,ID: 1
文章创建成功,ID: 2
文章创建成功,ID: 3

所有文章:
ID: 3, 标题: Django实战项目, 作者: 李四, 分类: 项目
ID: 2, 标题: Flask框架详解, 作者: 张三, 分类: 技术
ID: 1, 标题: Python Web开发入门, 作者: 张三, 分类: 技术

搜索结果:
找到文章: Flask框架详解
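
Two details of the sqlite3 module are worth knowing when working with a schema like the one above. SQLite does not enforce FOREIGN KEY constraints unless foreign_keys is switched on for each connection, and rows can be read by column name instead of by index such as post[5]. The following is a minimal sketch of both. The open_connection helper, the in-memory database and the sample data are assumptions for illustration, and the tables are a reduced version of the ones in sqlite_demo.py.

```python
import sqlite3


def open_connection(db_name=":memory:"):
    """Hypothetical helper: named-column rows plus foreign key enforcement."""
    conn = sqlite3.connect(db_name)
    conn.row_factory = sqlite3.Row            # rows become addressable by column name
    conn.execute("PRAGMA foreign_keys = ON")  # off by default; applies to this connection only
    return conn


conn = open_connection()
conn.executescript("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL
    );
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        author_id INTEGER,
        FOREIGN KEY (author_id) REFERENCES users (id)
    );
""")

# "with conn" commits on success and rolls back if an exception escapes
with conn:
    conn.execute("INSERT INTO users (username) VALUES (?)", ("alice",))
    conn.execute("INSERT INTO posts (title, author_id) VALUES (?, ?)", ("Hello", 1))

row = conn.execute(
    "SELECT p.title, u.username FROM posts p JOIN users u ON p.author_id = u.id"
).fetchone()
print(row["title"], row["username"])  # prints: Hello alice

# A post that points at a non-existent user is now rejected
try:
    with conn:
        conn.execute("INSERT INTO posts (title, author_id) VALUES (?, ?)", ("Orphan", 999))
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

post_count = conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
```

Without the PRAGMA line the second insert would succeed and leave a post whose author_id refers to no user.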

Using the SQLAlchemy ORM

A complete ORM example:

# orm_demo.py
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, ForeignKey
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4;
# the old sqlalchemy.ext.declarative import path is deprecated
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime

# Database configuration (echo=True logs every SQL statement)
engine = create_engine('sqlite:///orm_blog.db', echo=True)
Base = declarative_base()
Session = sessionmaker(bind=engine)

# Model definitions
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    posts = relationship("Post", back_populates="author")

    def __repr__(self):
        return f"<User(username='{self.username}', email='{self.email}')>"

class Post(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=datetime.utcnow)

    author = relationship("User", back_populates="posts")

    def __repr__(self):
        return f"<Post(title='{self.title}', author='{self.author.username if self.author else None}')>"

# Create the tables
Base.metadata.create_all(engine)

# Data manipulation functions
def create_sample_data():
    session = Session()

    # Create users
    user1 = User(username="python_dev", email="dev@python.com")
    user2 = User(username="web_master", email="master@web.com")

    session.add_all([user1, user2])
    session.commit()

    # Create posts; assigning author= sets author_id through the relationship
    post1 = Post(
        title="SQLAlchemy ORM教程",
        content="这是一篇详细的SQLAlchemy ORM使用教程...",
        author=user1
    )

    post2 = Post(
        title="Web开发最佳实践",
        content="分享一些Web开发的最佳实践和经验...",
        author=user1
    )

    post3 = Post(
        title="数据库设计原则",
        content="介绍关系型数据库设计的基本原则...",
        author=user2
    )

    session.add_all([post1, post2, post3])
    session.commit()

    print("示例数据创建完成")
    session.close()

def query_data():
    session = Session()

    # Query all users
    print("\n所有用户:")
    users = session.query(User).all()
    for user in users:
        print(f"  {user}")

    # Query the posts of one specific user
    print("\npython_dev的文章:")
    user = session.query(User).filter_by(username="python_dev").first()
    if user:
        for post in user.posts:
            print(f"  {post.title} - {post.created_at}")

    # Join query
    print("\n所有文章(包含作者信息):")
    posts = session.query(Post).join(User).all()
    for post in posts:
        print(f"  《{post.title}》 by {post.author.username}")

    # Filtered query
    print("\n标题包含'教程'的文章:")
    tutorial_posts = session.query(Post).filter(Post.title.contains('教程')).all()
    for post in tutorial_posts:
        print(f"  {post.title}")

    session.close()

if __name__ == "__main__":
    create_sample_data()
    query_data()

Run the ORM example:

python orm_demo.py

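The output looks similar to the following (with echo=True, SQLAlchemy also logs every INSERT and SELECT statement; those log lines are omitted here):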

2024-01-15 14:30:00,123 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-01-15 14:30:00,124 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
2024-01-15 14:30:00,124 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-15 14:30:00,125 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("users")
2024-01-15 14:30:00,125 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-15 14:30:00,126 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("posts")
2024-01-15 14:30:00,126 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-15 14:30:00,127 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("posts")
2024-01-15 14:30:00,127 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-15 14:30:00,128 INFO sqlalchemy.engine.Engine 
CREATE TABLE users (
    id INTEGER NOT NULL, 
    username VARCHAR(50) NOT NULL, 
    email VARCHAR(100) NOT NULL, 
    created_at DATETIME, 
    PRIMARY KEY (id), 
    UNIQUE (username), 
    UNIQUE (email)
)

2024-01-15 14:30:00,128 INFO sqlalchemy.engine.Engine [no key 0.00000s] ()
2024-01-15 14:30:00,129 INFO sqlalchemy.engine.Engine 
CREATE TABLE posts (
    id INTEGER NOT NULL, 
    title VARCHAR(200) NOT NULL, 
    content TEXT NOT NULL, 
    author_id INTEGER, 
    created_at DATETIME, 
    PRIMARY KEY (id), 
    FOREIGN KEY(author_id) REFERENCES users (id)
)

2024-01-15 14:30:00,129 INFO sqlalchemy.engine.Engine [no key 0.00000s] ()
2024-01-15 14:30:00,130 INFO sqlalchemy.engine.Engine COMMIT
示例数据创建完成

所有用户:
  <User(username='python_dev', email='dev@python.com')>
  <User(username='web_master', email='master@web.com')>

python_dev的文章:
  SQLAlchemy ORM教程 - 2024-01-15 14:30:00.135000
  Web开发最佳实践 - 2024-01-15 14:30:00.136000

所有文章(包含作者信息):
  《SQLAlchemy ORM教程》 by python_dev
  《Web开发最佳实践》 by python_dev
  《数据库设计原则》 by web_master

标题包含'教程'的文章:
  SQLAlchemy ORM教程

13.6 Front-End Integration

AJAX and API Calls

A complete example of front-end and back-end interaction:

<!-- templates/ajax_demo.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>AJAX API调用示例</title>
    <style>
        body {
            font-family: 'Microsoft YaHei', sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
            background-color: #f5f5f5;
        }
        .container {
            background: white;
            padding: 30px;
            border-radius: 10px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
        }
        .form-group {
            margin-bottom: 20px;
        }
        label {
            display: block;
            margin-bottom: 5px;
            font-weight: bold;
        }
        input, textarea {
            width: 100%;
            padding: 10px;
            border: 1px solid #ddd;
            border-radius: 5px;
            font-size: 14px;
        }
        button {
            background: #007bff;
            color: white;
            padding: 10px 20px;
            border: none;
            border-radius: 5px;
            cursor: pointer;
            margin-right: 10px;
        }
        button:hover {
            background: #0056b3;
        }
        .posts {
            margin-top: 30px;
        }
        .post {
            background: #f8f9fa;
            padding: 15px;
            margin-bottom: 15px;
            border-radius: 5px;
            border-left: 4px solid #007bff;
        }
        .loading {
            color: #666;
            font-style: italic;
        }
        .error {
            color: #dc3545;
            background: #f8d7da;
            padding: 10px;
            border-radius: 5px;
            margin: 10px 0;
        }
        .success {
            color: #155724;
            background: #d4edda;
            padding: 10px;
            border-radius: 5px;
            margin: 10px 0;
        }
    </style>
</head>
<body>
    <div class="container">
        <h1>Python Web API 交互示例</h1>

        <!-- Form for creating a post -->
        <form id="postForm">
            <div class="form-group">
                <label for="title">文章标题:</label>
                <input type="text" id="title" name="title" required>
            </div>
            <div class="form-group">
                <label for="content">文章内容:</label>
                <textarea id="content" name="content" rows="5" required></textarea>
            </div>
            <div class="form-group">
                <label for="category">分类:</label>
                <input type="text" id="category" name="category">
            </div>
            <button type="submit">发布文章</button>
            <button type="button" onclick="loadPosts()">刷新文章列表</button>
        </form>

        <div id="message"></div>

        <!-- Post list -->
        <div class="posts">
            <h2>文章列表</h2>
            <div id="postsList">
                <div class="loading">加载中...</div>
            </div>
        </div>
    </div>

    <script>
        // Base URL of the API
        const API_BASE = 'http://localhost:8000';

        // Show a message that disappears after three seconds
        function showMessage(message, type = 'success') {
            const messageDiv = document.getElementById('message');
            messageDiv.innerHTML = `<div class="${type}">${message}</div>`;
            setTimeout(() => {
                messageDiv.innerHTML = '';
            }, 3000);
        }

        // Send a POST request to create a post
        document.getElementById('postForm').addEventListener('submit', async function(e) {
            e.preventDefault();

            const formData = new FormData(this);
            const postData = {
                title: formData.get('title'),
                content: formData.get('content'),
                category: formData.get('category') || null
            };

            try {
                const response = await fetch(`${API_BASE}/posts/`, {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                        'Authorization': 'Bearer valid-token'
                    },
                    body: JSON.stringify(postData)
                });

                if (response.ok) {
                    const result = await response.json();
                    showMessage('文章发布成功!', 'success');
                    this.reset();
                    loadPosts(); // refresh the post list
                } else {
                    const error = await response.json();
                    showMessage(`发布失败: ${error.detail}`, 'error');
                }
            } catch (error) {
                showMessage(`网络错误: ${error.message}`, 'error');
            }
        });

        // Load the post list
        async function loadPosts() {
            const postsDiv = document.getElementById('postsList');
            postsDiv.innerHTML = '<div class="loading">加载中...</div>';

            try {
                const response = await fetch(`${API_BASE}/posts/`);

                if (response.ok) {
                    const posts = await response.json();

                    if (posts.length === 0) {
                        postsDiv.innerHTML = '<p>暂无文章</p>';
                        return;
                    }

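                    // Build each post with textContent so that user-supplied
                    // text is never parsed as HTML (avoids script injection)
                    postsDiv.innerHTML = '';
                    posts.forEach(post => {
                        const postEl = document.createElement('div');
                        postEl.className = 'post';

                        const titleEl = document.createElement('h3');
                        titleEl.textContent = post.title;
                        const contentEl = document.createElement('p');
                        contentEl.textContent = post.content;
                        const metaEl = document.createElement('small');
                        metaEl.textContent = `分类: ${post.category || '未分类'} | 创建时间: ${new Date(post.created_at).toLocaleString()}`;

                        postEl.append(titleEl, contentEl, metaEl);
                        postsDiv.appendChild(postEl);
                    });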
                } else {
                    postsDiv.innerHTML = '<div class="error">加载文章失败</div>';
                }
            } catch (error) {
                postsDiv.innerHTML = `<div class="error">网络错误: ${error.message}</div>`;
            }
        }

        // Fetch the post list when the page has loaded
        document.addEventListener('DOMContentLoaded', loadPosts);

        // A more advanced Fetch API example
        async function advancedApiCall() {
            try {
                // Request several endpoints concurrently
                const [usersResponse, postsResponse] = await Promise.all([
                    fetch(`${API_BASE}/users/`),
                    fetch(`${API_BASE}/posts/`)
                ]);

                const users = await usersResponse.json();
                const posts = await postsResponse.json();

                console.log('用户数据:', users);
                console.log('文章数据:', posts);

                // Process the data...
            } catch (error) {
                console.error('API调用失败:', error);
            }
        }

        // API call with retries and a linearly growing delay
        async function apiCallWithRetry(url, options = {}, maxRetries = 3) {
            for (let i = 0; i < maxRetries; i++) {
                try {
                    const response = await fetch(url, options);
                    if (response.ok) {
                        return await response.json();
                    }
                    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
                } catch (error) {
                    if (i === maxRetries - 1) throw error;
                    console.log(`请求失败,${1000 * (i + 1)}ms后重试...`);
                    await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
                }
            }
        }
    </script>
</body>
</html>
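
The page above calls the API at http://localhost:8000. If the page itself is served from a different origin, the browser applies the same-origin policy, and the POST request, which carries an Authorization header and a JSON body, is preceded by a preflight OPTIONS request. The FastAPI application then has to answer with CORS headers. The following is a minimal configuration sketch using FastAPI's CORSMiddleware. The origin http://localhost:5000 is an assumption (for example a Flask development server that renders the template) and must be replaced with wherever the page is actually served.

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Assumed origin of the page; list every origin that should be allowed
ALLOWED_ORIGINS = ["http://localhost:5000"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_methods=["GET", "POST"],                    # methods the page uses
    allow_headers=["Authorization", "Content-Type"],  # headers sent by fetch()
)
```

In main.py the add_middleware call would go right after the existing app = FastAPI(...) statement. Listing explicit origins is a deliberate choice here, since a wildcard would let any site call the API from a visitor's browser.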

13.7 Web Services and API Design

RESTful API Design in Practice

A complete RESTful API example:
```python
# restful_api.py
from flask import Flask, request, jsonify, abort
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from marshmallow import ValidationError
from datetime import datetime
import os

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///restful_blog.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
ma = Marshmallow(app)

# Data models
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    posts = db.relationship('Post', backref='author', lazy=True, cascade='all, delete-orphan')

class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    category = db.Column(db.String(50))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

# Serialization schemas
class UserSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = User
        load_instance = True
        include_fk = True

class PostSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Post
        load_instance = True
        include_fk = True

    author = ma.Nested(UserSchema, only=['id', 'username'])

# Schema instances
user_schema = UserSchema()
users_schema = UserSchema(many=True)
post_schema = PostSchema()
posts_schema = PostSchema(many=True)

# Error handlers
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404

@app.errorhandler(400)
def bad_request(error):
    return jsonify({'error': 'Bad request'}), 400
```

小夜