
AI 索引推荐引擎让机器学会该在哪个列上建索引一、索引选择的困境DBA 的经验 vs 数据的真相数据库索引设计是性能优化的核心环节但也是最依赖经验的环节。一个有经验的 DBA 能根据查询模式判断哪些列需要索引哪些列适合组合索引组合索引的列顺序如何排列。但这种经验有三个问题不可规模化、不可量化、不可传承。更棘手的是索引本身有代价。每个索引占用存储空间每次写入需要同步更新所有索引。一张表上建 20 个索引写入性能可能下降 5-10 倍。索引不是越多越好而是要找到查询加速收益和写入维护成本之间的最优平衡点。AI 索引推荐引擎的目标是基于历史查询负载和数据分布特征自动推荐最优的索引方案并量化每个索引的预期收益和成本。这不是替代 DBA而是给 DBA 提供数据驱动的决策依据。二、索引推荐的核心算法从负载分析到代价建模AI 索引推荐的流程可以分为四个阶段负载采集、候选索引生成、代价评估、方案推荐。flowchart TD A[慢查询日志 审计日志] -- B[查询模式提取br/WHERE/JOIN/GROUP BY/ORDER BY] B -- C[候选索引生成br/枚举单列/组合索引] C -- D[代价评估模型br/What-if 优化器] D -- E[收益-成本权衡br/查询加速 vs 写入开销] E -- F[推荐方案输出br/索引列表 预期收益] subgraph 负载分析层 A B end sub索引候选层 C end subgraph 代价评估层 D E end subgraph 推荐输出层 F end负载采集是起点。需要从slow_query_log和performance_schema中提取查询模式包括每个查询的执行频率、平均执行时间、扫描行数、返回行数。关键是要区分频繁但已优化的查询和偶尔但极慢的查询——后者可能更需要索引。候选索引生成采用启发式规则从 WHERE 条件中提取等值谓词列和范围谓词列从 JOIN 条件中提取关联列从 GROUP BY 和 ORDER BY 中提取排序列。组合索引的列顺序遵循等值列在前、范围列在后的原则。代价评估是核心。采用 What-if 优化器的方式假设某个索引存在重新估算查询的代价对比无索引时的代价差值就是该索引对这个查询的收益。对所有查询的收益求和减去索引的写入维护成本得到净收益。三、生产级实现基于 What-if 优化器的索引推荐引擎from dataclasses import dataclass, field from typing import List, Dict, Tuple from itertools import combinations dataclass class QueryProfile: 查询画像从慢查询日志中提取的关键特征 query_id: str # 查询指纹 sql_template: str # SQL 模板参数化后 frequency: int # 每日执行次数 avg_execution_ms: float # 平均执行时间 avg_rows_examined: int # 平均扫描行数 avg_rows_returned: int # 平均返回行数 where_columns: List[str] # WHERE 条件涉及的列 join_columns: List[str] # JOIN 条件涉及的列 group_by_columns: List[str] # GROUP BY 涉及的列 order_by_columns: List[str] # ORDER BY 涉及的列 dataclass class IndexCandidate: 索引候选方案 table_name: str columns: Tuple[str, ...] # 索引列有序 index_type: str # btree / hash / bloom estimated_size_mb: float # 预估索引大小 benefit_queries: Dict[str, float] field(default_factorydict) # 查询ID - 收益(ms) write_cost_per_row: float 0.0 # 每行写入的额外开销(ms) class IndexRecommender: AI 索引推荐引擎 核心逻辑基于查询负载和 What-if 代价评估 推荐净收益最大的索引方案 def __init__(self, table_stats: Dict): self.table_stats table_stats # 表统计信息 def generate_candidates(self, queries: List[QueryProfile], max_columns: int 3) - List[IndexCandidate]: 生成候选索引列表 策略 1. 单列索引WHERE/JOIN 中的高频列 2. 组合索引等值列 范围列最多 max_columns 列 3. 列顺序等值列在前范围列在后 candidates [] # 统计每列的查询频率 column_freq: Dict[str, int] {} for q in queries: for col in q.where_columns q.join_columns: column_freq[col] column_freq.get(col, 0) q.frequency # 生成单列候选 for col, freq in sorted(column_freq.items(), keylambda x: -x[1]): candidates.append(IndexCandidate( table_nameself._get_table(col), columns(col,), index_typebtree, estimated_size_mbself._estimate_index_size(col), )) # 生成组合候选等值列 范围列 eq_cols set() range_cols set() for q in queries: eq_cols.update(q.where_columns) range_cols.update(q.order_by_columns) range_cols.update(q.group_by_columns) for eq_col in eq_cols: for range_col in range_cols: if eq_col ! range_col: combo (eq_col, range_col) candidates.append(IndexCandidate( table_nameself._get_table(eq_col), columnscombo, index_typebtree, estimated_size_mbself._estimate_index_size(combo), )) return candidates def evaluate_benefit(self, candidate: IndexCandidate, queries: List[QueryProfile]) - float: 评估索引候选的净收益 净收益 查询加速收益总和 - 写入维护成本 查询加速收益 sum(查询频率 * 单次加速时间) 写入维护成本 写入频率 * 单行额外开销 total_benefit 0.0 for q in queries: # 判断该索引是否对此查询有效 relevant False if set(candidate.columns) set(q.where_columns): relevant True if set(candidate.columns) set(q.join_columns): relevant True if not relevant: continue # What-if 估算假设索引存在时的扫描行数 # 简化模型索引过滤率 索引列基数 / 表行数 estimated_selectivity self._estimate_selectivity(candidate.columns) estimated_rows q.avg_rows_examined * estimated_selectivity # 加速收益 原扫描行数 - 索引过滤后扫描行数 speedup (q.avg_rows_examined - estimated_rows) / q.avg_rows_examined benefit_ms q.avg_execution_ms * speedup * q.frequency candidate.benefit_queries[q.query_id] benefit_ms total_benefit benefit_ms # 写入维护成本 write_freq self._get_write_frequency(candidate.table_name) write_cost write_freq * candidate.write_cost_per_row * 86400 # 每日成本 return total_benefit - write_cost def recommend(self, queries: List[QueryProfile], max_indexes: int 5) - List[IndexCandidate]: 推荐最优索引方案 算法 1. 生成所有候选索引 2. 评估每个候选的净收益 3. 按净收益降序排列 4. 去除被更优索引覆盖的子集 5. 返回 Top-N 推荐 candidates self.generate_candidates(queries) # 评估净收益 scored [] for c in candidates: net_benefit self.evaluate_benefit(c, queries) if net_benefit 0: scored.append((c, net_benefit)) # 按净收益降序排列 scored.sort(keylambda x: -x[1]) # 去除被覆盖的子集索引 # 如果索引 (a, b) 已入选则 (a) 被覆盖不再推荐 recommended [] for c, score in scored: dominated False for rc, _ in recommended: if c.columns rc.columns[:len(c.columns)]: dominated True break if not dominated: recommended.append((c, score)) return [c for c, _ in recommended[:max_indexes]] def _estimate_selectivity(self, columns: Tuple[str, ...]) - float: 估算索引列组合的过滤率 selectivity 1.0 for col in columns: ndv self.table_stats.get(col, {}).get(ndv, 1000) total_rows self.table_stats.get(col, {}).get(total_rows, 1000000) selectivity * min(1.0, ndv / total_rows) return selectivity def _estimate_index_size(self, columns: Tuple[str, ...]) - float: 估算索引大小MB # 简化估算每行索引约 30 字节 total_rows self.table_stats.get(columns[0], {}).get(total_rows, 1000000) return total_rows * 30 / 1024 / 1024 def _get_table(self, column: str) - str: 获取列所属表名 return self.table_stats.get(column, {}).get(table, unknown) def _get_write_frequency(self, table_name: str) - float: 获取表的每秒写入频率 return self.table_stats.get(table_name, {}).get(write_per_second, 100.0)四、AI 索引推荐的盲区数据倾斜、查询漂移与组合爆炸数据倾斜下的选择性误判。AI 推荐引擎用 NDVNumber of Distinct Values估算过滤率但 NDV 无法反映数据分布的倾斜程度。一列可能有 10000 个不同值但 90% 的行集中在前 10 个值。这种情况下索引对高频值的过滤效果远低于估算值。解决方案是引入直方图统计用 Top-N 频率修正过滤率估算。查询漂移导致索引失效。AI 推荐基于历史查询负载但业务查询模式会变化。一个为 Q1 查询模式设计的索引到 Q3 可能完全无用反而增加写入开销。必须建立索引使用率监控定期淘汰低效索引。组合索引的搜索空间爆炸。一张 20 列的表3 列组合索引的候选数是 C(20,3) 1140。考虑列顺序后是 P(20,3) 6840。对每个候选做 What-if 评估计算量巨大。实际生产中需要用贪心算法或遗传算法做近似搜索但近似解可能遗漏全局最优。五、总结AI 索引推荐引擎的核心价值是将索引选择从经验驱动转向数据驱动。但它的推荐结果必须经过人工审核因为机器无法理解业务语义——某些索引虽然查询收益低但服务于关键业务路径不能被算法淘汰。落地路线建议部署慢查询日志采集管道积累至少 2 周的查询负载数据从单列索引推荐切入验证 What-if 代价评估的准确率引入直方图统计修正数据倾斜场景下的过滤率估算建立索引使用率监控每月自动淘汰使用率低于 1% 的索引组合索引推荐限制在 3 列以内避免搜索空间爆炸AI 推荐结果必须经 DBA 审核后才能上线不可自动执行 DDL