IT策士 10余年一线大厂经验,专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章,助你少走弯路。
这是本系列的收官之战。前面 29 篇文章,我们从安装连接一路深入到 InnoDB 内核、锁机制、主从复制、分库分表。今天,我们将所有知识融会贯通,完整设计一个电商订单核心系统,从表结构设计到索引优化,从分区归档到全链路压测与慢 SQL 调优,让你真正体验一把“架构师视角”的数据库实战。
1. 业务需求与架构概览
核心场景:用户浏览商品 → 下单 → 扣库存 → 生成订单 → 查询订单列表。
数据规模预估:
用户数:500 万
商品数:10 万
日均订单:50 万单
订单表年增量:约 1.8 亿行
技术目标:
核心下单接口 P99 < 100ms
订单查询(带条件分页)P99 < 200ms
每日归档历史订单,保持热表在 3 个月内
2. 搭建测试环境
importmysql.connectorimporttimeimportrandomimportthreading from datetimeimportdate, timedelta conn=mysql.connector.connect(host="127.0.0.1",port=3306,user="root",password="MyNewPass123!",database="shop")cursor=conn.cursor()# ============ 用户表 ============cursor.execute("DROP TABLE IF EXISTS users")cursor.execute(""" CREATE TABLEusers(idINT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50)NOT NULL, phone VARCHAR(20)NOT NULL, city VARCHAR(20), vip_level TINYINT DEFAULT0, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE INDEX idx_phone(phone), INDEX idx_city(city))ENGINE=InnoDB DEFAULTCHARSET=utf8mb4""")# ============ 商品表 ============cursor.execute("DROP TABLE IF EXISTS products")cursor.execute(""" CREATE TABLE products(idINT AUTO_INCREMENT PRIMARY KEY, title VARCHAR(200)NOT NULL, price DECIMAL(10,2)NOT NULL, stock INT NOT NULL DEFAULT0, category VARCHAR(30), attributes JSON, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_category(category), INDEX idx_category_price(category, price))ENGINE=InnoDB DEFAULTCHARSET=utf8mb4""")# ============ 订单主表(按月份 RANGE 分区) ============cursor.execute("DROP TABLE IF EXISTS orders")cursor.execute(""" CREATE TABLE orders(idBIGINT AUTO_INCREMENT, user_id INT NOT NULL, total_amount DECIMAL(10,2)NOT NULL, status TINYINT DEFAULT0COMMENT'0待支付,1已支付,2已发货,3已完成,4已取消', created_date DATE NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(id, created_date), INDEX idx_user(user_id), INDEX idx_user_date(user_id, created_date), INDEX idx_status_date(status, created_date))ENGINE=InnoDB DEFAULTCHARSET=utf8mb4 PARTITION BY RANGE(TO_DAYS(created_date))(PARTITION p202501 VALUES LESS THAN(TO_DAYS('2025-02-01')), PARTITION p202502 VALUES LESS THAN(TO_DAYS('2025-03-01')), PARTITION p202503 VALUES LESS THAN(TO_DAYS('2025-04-01')), PARTITION p202504 VALUES LESS THAN(TO_DAYS('2025-05-01')), PARTITION p202505 VALUES LESS THAN(TO_DAYS('2025-06-01')), PARTITION p202506 VALUES LESS THAN(TO_DAYS('2025-07-01')), PARTITION p202507 VALUES LESS THAN(TO_DAYS('2025-08-01')), PARTITION p_future VALUES LESS THAN MAXVALUE)""")# ============ 订单明细表 ============cursor.execute("DROP TABLE IF EXISTS order_items")cursor.execute(""" CREATE TABLE order_items(idBIGINT AUTO_INCREMENT PRIMARY KEY, order_id BIGINT NOT NULL, product_id INT NOT NULL, product_title VARCHAR(200)NOT NULL, price DECIMAL(10,2)NOT NULL, quantity INT NOT NULL, INDEX idx_order(order_id))ENGINE=InnoDB DEFAULTCHARSET=utf8mb4""")print("✅ 四张核心表创建完毕")3. 表设计复盘:每个决策的依据
4. 插入测试数据:模拟 3 个月运营
print("⏳ 正在生成测试数据...")# 插入用户users_batch=[]foriinrange(1,5001):# 5000 用户phone=f"138{10000000 + i}"city=random.choice(["北京","上海","深圳","杭州","成都","武汉"])users_batch.append((f"user_{i}",phone,city,random.randint(0,3)))cursor.executemany("INSERT INTO users (username, phone, city, vip_level) VALUES (%s,%s,%s,%s)", users_batch)conn.commit()print(f"✅ 插入了 5000 用户")# 插入商品categories=["电脑外设","音频设备","手机配件","办公用品","智能家居"]products_batch=[]foriinrange(1,1001):# 1000 商品title=f"product_{i}"price=round(random.uniform(10,5000),2)stock=random.randint(0,500)cat=random.choice(categories)attrs=f'{{"brand":"brand_{i%50}","color":"black","weight":{round(random.uniform(0.1,5),2)}}}'products_batch.append((title,price,stock,cat,attrs))cursor.executemany("INSERT INTO products (title, price, stock, category, attributes) VALUES (%s,%s,%s,%s,%s)", products_batch)conn.commit()print(f"✅ 插入了 1000 商品")# 插入订单(最近3个月,共10万条)total_orders=100000batch_size=2000start_date=date(2025,4,1)statuses=[0,1,1,1,2,2,3,4]# 加权分布foriinrange(0, total_orders, batch_size): orders_batch=[]items_batch=[]forjinrange(batch_size): order_id=1000000+ i + j user_id=random.randint(1,5000)order_date=start_date + timedelta(days=random.randint(0,90))status=random.choice(statuses)total=0# 每个订单 1-3 个商品for_inrange(random.randint(1,3)): pid=random.randint(1,1000)qty=random.randint(1,5)price=round(random.uniform(10,5000),2)total+=price * qty items_batch.append((order_id,pid,f"product_{pid}",price,qty))orders_batch.append((order_id,user_id,round(total,2),status,order_date,order_date))cursor.executemany("INSERT INTO orders (id, user_id, total_amount, status, created_date, created_at) VALUES (%s,%s,%s,%s,%s,%s)", orders_batch)cursor.executemany("INSERT INTO order_items (order_id, product_id, product_title, price, quantity) VALUES (%s,%s,%s,%s,%s)", items_batch)conn.commit()print(f" 订单: {min(i+batch_size, total_orders)}/{total_orders}",end="\r")print("\n✅ 订单数据插入完毕")5. 核心业务 SQL 设计
5.1 下单(需事务 + 行锁)
def create_order(conn, user_id, items):"""items:[(product_id, quantity),...]""" conn.autocommit=False cursor=conn.cursor()try: total=0order_items=[]forpid, qtyinitems:# 锁定库存行cursor.execute("SELECT stock, price, title FROM products WHERE id = %s FOR UPDATE",(pid,))stock, price, title=cursor.fetchone()ifstock<qty: raise Exception(f"商品 {title} 库存不足")total+=price * qty order_items.append((pid,title,price,qty))# 创建订单order_date=date.today()cursor.execute("INSERT INTO orders (user_id, total_amount, status, created_date) VALUES (%s,%s,0,%s)",(user_id, total, order_date))order_id=cursor.lastrowid# 批量插入明细forpid, title, price, qtyinorder_items: cursor.execute("INSERT INTO order_items (order_id, product_id, product_title, price, quantity) VALUES (%s,%s,%s,%s,%s)",(order_id, pid, title, price, qty))cursor.execute("UPDATE products SET stock = stock - %s WHERE id = %s",(qty, pid))conn.commit()returnorder_id except Exception as e: conn.rollback()raise e# 测试下单try: order_id=create_order(conn,1001,[(1,2),(50,1)])print(f"✅ 下单成功,订单号: {order_id}")except Exception as e: print(f"❌ 下单失败: {e}")5.2 用户订单列表(分页 + 覆盖索引)
def user_orders(conn, user_id,page=1,page_size=10):"""用户订单列表,使用覆盖索引 + 延迟关联优化深分页""" cursor=conn.cursor()offset=(page -1)* page_size# 利用 idx_user_date 覆盖索引,先查主键再回表sql=""" SELECT o.id, o.total_amount, o.status, o.created_at FROM orders o INNER JOIN(SELECTidFROM orders WHERE user_id=%s ORDER BY created_date DESC LIMIT %s OFFSET %s)AS tmp ON o.id=tmp.id ORDER BY o.created_at DESC""" cursor.execute(sql,(user_id, page_size, offset))returncursor.fetchall()orders=user_orders(conn,1001,1,5)print(f"用户 1001 的订单(前5条):")foroinorders: print(f" 订单#{o[0]} 金额¥{o[1]} 状态{o[2]} 时间{o[3]}")5.3 管理后台:按日期范围查询订单(分区裁剪)
def admin_search_orders(conn, start_date, end_date,status=None,page=1,page_size=20):"""按日期范围查询,利用分区裁剪""" cursor=conn.cursor()offset=(page -1)* page_size where="WHERE created_date BETWEEN %s AND %s"params=[start_date, end_date]ifstatus is not None: where+=" AND status = %s"params.append(status)sql=f"SELECT id, user_id, total_amount, status, created_at FROM orders {where} ORDER BY created_date DESC LIMIT %s OFFSET %s"params.extend([page_size, offset])cursor.execute(sql, params)returncursor.fetchall()result=admin_search_orders(conn,'2025-06-01','2025-06-30',status=1)print(f"6月已支付订单(前5条):{len(result)}条返回")6. 全链路压测:发现性能瓶颈
用 Python 多线程模拟 100 并发用户同时查询订单和下单:
def stress_test():"""模拟100并发:80% 查询 +20% 下单""" def worker(uid): conn_t=mysql.connector.connect(host="127.0.0.1",port=3306,user="root",password="MyNewPass123!",database="shop")cursor_t=conn_t.cursor()for_inrange(10):ifrandom.random()<0.8:# 查询订单cursor_t.execute("SELECT id FROM orders WHERE user_id = %s ORDER BY created_date DESC LIMIT 10",(uid,))cursor_t.fetchall()else:# 模拟下单try: create_order(conn_t, uid,[(random.randint(1,1000),1)])except: pass cursor_t.close()conn_t.close()threads=[]start=time.time()foriinrange(100): t=threading.Thread(target=worker,args=(random.randint(1,5000),))threads.append(t)t.start()fortinthreads: t.join()elapsed=time.time()- start print(f"\n📊 压测结果: 100 并发 × 10 次操作 = 1000 次请求, 总耗时 {elapsed:.2f} 秒")print(f" 平均 QPS: {1000/elapsed:.0f}")stress_test()7. 慢查询分析与优化
打开慢查询日志后,用pt-query-digest分析(见第23篇),发现以下典型问题:
7.1 问题1:按商品类别查询时出现 Using filesort
cursor.execute(""" EXPLAIN SELECT * FROM products WHERE category='电脑外设'ORDER BY price DESC LIMIT10""")print("❌ 优化前:")forrowincursor.fetchall(): print(f" type={row[3]}, key={row[6]}, Extra={row[10]}")优化:idx_category_price联合索引已包含 ORDER BY 列,但方向需一致。验证:
cursor.execute(""" EXPLAIN SELECT * FROM products WHERE category='电脑外设'ORDER BY price DESC LIMIT10""")print("✅ 优化后:")forrowincursor.fetchall(): print(f" type={row[3]}, key={row[6]}, Extra={row[10]}")若 Extra 显示Using index condition; Backward index scan,说明 MySQL 8.0 可反向扫描降序索引。
7.2 问题2:未用上 partition pruning
# 错误:函数包裹分区键cursor.execute(""" EXPLAIN SELECT COUNT(*)FROM orders WHERE MONTH(created_date)=6""")print("❌ 函数包裹分区键(全分区扫描):")forrowincursor.fetchall(): print(f" partitions={row[4]}")优化:用范围查询替代:
cursor.execute(""" EXPLAIN SELECT COUNT(*)FROM orders WHERE created_date>='2025-06-01'AND created_date<'2025-07-01'""")print("✅ 范围查询(分区裁剪):")forrowincursor.fetchall(): print(f" partitions={row[4]}")7.3 问题3:深分页全表扫描
用户翻到第 10000 页时,LIMIT 10 OFFSET 100000仍然扫描前 100000 行。
优化:延迟关联(已在user_orders函数中使用)。更激进的方案是游标分页——前端传最后一笔订单 ID,用WHERE id < last_id ORDER BY id DESC LIMIT 10。
8. 归档策略
每天凌晨用事件调度器(或外部 Cron)将 3 个月前的订单迁移到归档表:
def archive_old_orders(conn, before_date):"""将指定日期前的订单归档""" cursor=conn.cursor()# 创建归档表(如果不存在)cursor.execute("CREATE TABLE IF NOT EXISTS orders_archive LIKE orders")# 分区表不能直接 INSERT...SELECT 跨分区,需逐分区处理# 简化:用 DELETE + INSERT 逐批处理cursor.execute(""" INSERT INTO orders_archive SELECT * FROM orders WHERE created_date<%s""",(before_date,))cursor.execute("DELETE FROM orders WHERE created_date < %s",(before_date,))conn.commit()print(f"✅ 已归档 {cursor.rowcount} 条订单到 orders_archive")# 归档 3 个月前的数据archive_old_orders(conn,'2025-05-01')分区带来的额外好处:如果可以按分区整体归档,直接用ALTER TABLE orders TRUNCATE PARTITION p202504,零写入放大。
9. 最终架构总览
┌──────────────┐ │ Python 应用 │ └──────┬───────┘ │ ┌────────────┴────────────┐ │ ProxySQL(6033)│ │ 读写分离 + 查询路由 │ └────────────┬────────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐ │ Master │ │ Slave1 │ │ Slave2 │ │(写)│──1:N─│(读)│──1:N─│(读)│ └───────────┘ └───────────┘ └───────────┘ │ │ binlog ▼ ┌───────────┐ │ 归档脚本 │ ← 每日cron│ orders → │ │ orders_archive │ └───────────┘关键组件总结:
10. 动手试试:优化你的电商系统
调整索引:在
orders表上尝试不同联合索引顺序,用 EXPLAIN 对比不同查询的性能。模拟压测:将并发数提升到 500,观察 QPS 和慢查询变化,找出下一个瓶颈。
实现游标分页:改造用户订单列表接口,用
WHERE id < last_id代替OFFSET。增加归档自动化:用 MySQL Event 定时执行归档存储过程(参考第11篇)。
11. 系列总结与进阶路线图
恭喜你完成了《MySQL 从入门到精通:Python 开发者实战》全部 30 篇文章!
你已经掌握的能力:
数据库基础操作与 DDL/DML 安全准则
索引原理(B+Tree)与性能优化
事务、锁、MVCC 的底层机制
主从复制、高可用架构与 ProxySQL 读写分离
分库分表、全局 ID 生成与分布式事务
全链路压测与性能诊断
进阶学习路线:
当前水平 ──→ 深入 MySQL 源码(InnoDB 存储引擎实现) │ ├──→ 云原生数据库(TiDB、OceanBase) │ ├──→ 数据仓库与 OLAP(ClickHouse、Doris) │ └──→ 架构师思维:容量规划、容灾演练、成本优化持续学习的建议:
阅读 MySQL 官方文档和 release notes,关注 8.0 新特性
订阅 Percona Blog、MySQL Server Team Blog
参与开源项目(ShardingSphere、Vitess 等),贡献代码或文档
感谢你陪伴到这最后一篇。数据库的世界广袤而深邃,30 篇文章只是打开了一扇门。真正的精通,来自于生产环境的锤炼与持续的好奇心。
想了解更多还可以去各个平台搜索「IT策士」,一起升级 IT 思维 !