1. Flask与MySQL数据库连接实战
在Web开发中,数据库操作是后端开发的核心环节。Flask作为轻量级Python Web框架,与MySQL的结合能够快速构建数据驱动的应用。我经历过多个从零搭建Flask-MySQL项目的完整周期,今天就把最实用的配置方法和ORM操作技巧整理出来。
连接Flask与MySQL主要解决三个关键问题:一是建立稳定的数据库通信渠道,二是通过ORM实现面向对象的数据操作,三是保证生产环境下的性能与安全。不同于简单的SQL语句拼接,ORM方式让开发者能用Python类和方法操作数据库,既避免了SQL注入风险,又提高了代码可维护性。
2. 环境准备与基础配置
2.1 必要组件安装
首先确保已安装Python 3.6+和MySQL 5.7+。通过pip安装核心依赖包:
pip install flask flask-sqlalchemy pymysql cryptography这里特别说明包选型原因:
flask-sqlalchemy:Flask官方推荐的ORM扩展pymysql:纯Python实现的MySQL驱动cryptography:用于安全连接加密
注意:不要使用已停止维护的MySQL-python驱动,它在Python3环境下存在兼容性问题。
2.2 数据库连接配置
在Flask应用实例中配置数据库URI:
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://username:password@localhost/db_name?charset=utf8mb4' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app)关键参数解析:
charset=utf8mb4:支持完整的Unicode字符(包括emoji)TRACK_MODIFICATIONS:禁用Flask-SQLAlchemy事件系统提升性能- 生产环境应将密码等敏感信息放入环境变量
3. 模型定义与表结构映射
3.1 基础模型示例
以用户管理系统为例,定义User模型:
class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) created_at = db.Column(db.DateTime, default=db.func.now()) def __repr__(self): return f'<User {self.username}>'字段类型选择建议:
- 字符串:String(length) 根据实际需求限制长度
- 数字:Integer/Float/Numeric 按精度需求选择
- 时间:DateTime 带时区建议使用TIMESTAMP
3.2 高级字段配置技巧
实际项目中常用的高级配置:
from sqlalchemy import text class Post(db.Model): __tablename__ = 'posts' id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(100), index=True) # 添加索引提高查询速度 content = db.Column(db.Text) # 长文本使用Text类型 status = db.Column(db.String(20), server_default='draft') # 服务端默认值 user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # 定义关系 author = db.relationship('User', backref=db.backref('posts', lazy='dynamic')) # 自定义查询方法 @classmethod def get_recent_posts(cls, days=7): return cls.query.filter( cls.created_at >= text(f'DATE_SUB(NOW(), INTERVAL {days} DAY)') ).all()4. 数据库迁移管理
4.1 Alembic迁移工具配置
Flask-Migrate是基于Alembic的数据库迁移扩展:
pip install flask-migrate初始化迁移环境:
from flask_migrate import Migrate migrate = Migrate(app, db)命令行操作:
flask db init # 初始化迁移仓库 flask db migrate -m "create user table" # 生成迁移脚本 flask db upgrade # 执行迁移4.2 迁移常见问题处理
- 合并冲突迁移:删除迁移目录中冲突的版本文件,重新生成
- 回滚迁移:
flask db downgrade指定版本号 - 生产环境迁移:务必先在测试环境验证迁移脚本
重要:不要在迁移脚本中直接操作生产数据,应该编写单独的数据迁移脚本
5. ORM增删改查全解析
5.1 创建记录
标准创建方式:
new_user = User(username='john', email='john@example.com') db.session.add(new_user) db.session.commit()批量创建优化:
users = [ User(username=f'user{i}', email=f'user{i}@test.com') for i in range(10) ] db.session.bulk_save_objects(users) db.session.commit()5.2 查询操作
基础查询方法:
# 获取全部 users = User.query.all() # 条件查询 admin = User.query.filter_by(username='admin').first() # 复杂查询 recent_users = User.query.filter( User.created_at > '2023-01-01' ).order_by(User.created_at.desc()).limit(5).all()性能优化技巧:
- 使用
.only()指定返回字段 - 关联查询时明确加载策略(lazy/joined/subquery)
- 分页使用
.paginate()方法
5.3 更新与删除
更新记录:
user = User.query.get(1) user.email = 'new@email.com' db.session.commit()批量更新:
User.query.filter_by(role='guest').update({'status': 'inactive'}) db.session.commit()删除记录:
user = User.query.get(1) db.session.delete(user) db.session.commit()6. 高级查询与性能优化
6.1 关联查询实践
一对多关系查询:
# 获取用户的所有文章 user = User.query.get(1) posts = user.posts.all() # 使用backref定义的关系 # 带条件的关联查询 recent_posts = Post.query.join(User).filter( User.username == 'john', Post.created_at > datetime.now() - timedelta(days=7) ).all()6.2 聚合查询
统计查询示例:
from sqlalchemy import func # 用户数量统计 user_count = db.session.query(func.count(User.id)).scalar() # 分组统计 post_stats = db.session.query( User.username, func.count(Post.id).label('post_count') ).join(Post).group_by(User.id).all()6.3 查询性能优化
- 使用EXPLAIN分析慢查询:
result = db.session.execute('EXPLAIN SELECT * FROM users WHERE ...')- 添加适当索引:
class User(db.Model): __table_args__ = ( db.Index('idx_username_email', 'username', 'email'), )- 启用查询缓存:
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = { 'pool_pre_ping': True, 'pool_recycle': 3600, 'pool_size': 20 }7. 事务管理与错误处理
7.1 事务基础操作
手动控制事务:
try: db.session.begin() # 执行多个操作 user1 = User(...) user2 = User(...) db.session.add_all([user1, user2]) db.session.commit() except Exception as e: db.session.rollback() raise e7.2 嵌套事务处理
使用SAVEPOINT实现嵌套事务:
try: db.session.begin_nested() # 操作1 db.session.commit() db.session.begin_nested() # 操作2 db.session.commit() except: db.session.rollback()7.3 常见错误处理
- 重复键错误:
from sqlalchemy.exc import IntegrityError try: db.session.commit() except IntegrityError as e: if 'Duplicate entry' in str(e): # 处理重复数据 db.session.rollback()- 连接超时处理:
from sqlalchemy.exc import OperationalError @app.teardown_request def shutdown_session(exception=None): if exception and isinstance(exception, OperationalError): db.session.remove()8. 生产环境最佳实践
8.1 连接池配置
推荐生产环境配置:
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = { 'pool_size': 20, 'max_overflow': 10, 'pool_timeout': 30, 'pool_recycle': 3600, 'pool_pre_ping': True }8.2 读写分离实现
使用SQLAlchemy的binds实现:
app.config['SQLALCHEMY_BINDS'] = { 'master': 'mysql+pymysql://master_user:pass@master_host/db', 'slave': 'mysql+pymysql://slave_user:pass@slave_host/db' } class User(db.Model): __bind_key__ = 'master' # 默认使用主库8.3 监控与日志
启用查询日志:
import logging logging.basicConfig() logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)性能监控建议:
- 使用Flask-SQLAlchemy的
get_debug_queries() - 集成APM工具如NewRelic或Datadog
我在实际项目中总结的几点经验:数据库连接字符串应该从环境变量读取而非硬编码;开发环境可以使用SQLite但生产必须用MySQL;ORM虽然方便但复杂查询还是需要手写优化SQL;迁移脚本必须经过严格测试才能在生产环境执行。