给出包含“核心贡献识别”、“关键公式”、“函数依赖关系”、“代码结构设计”等部分详尽的算法解析报告。
比如说《LLaMA-MoE: Building Mixture-of-Experts from LLaMA with Continual Pre-training》这篇讲基于现有密集型大语言模型构建Mixture-of-Experts (MoE)模型的方法。
选择这篇论文,无需指令,直接使用代码工具抽取
即可得到该篇论文的算法报告:
下载源码,即可得到可直接运行,足以满足测试需求的伪代码——源码支持 Python, MATLAB, C++等运行环境算法复现实验,源码包含注释的完整代码段(Data Loader, Model, Training Loop等),还提供 requirements.txt或依赖库说明。
供实验复现的伪代码如下:
''' 算法实现:LLaMA-MoE 构建与推理 论文:LLaMA-MoE: Building Mixture-of-Experts from LLaMA 核心贡献:提出首个从现有decoder-only密集大语言模型(LLaMA-2)构建混合专家模型(MoE)的框架, 通过分割FFN参数并持续预训练,在保持模型性能的同时大幅降低训练成本。 实现说明: - 单文件实现:所有代码在一个 .py 文件中 - 完整实现论文中的核心公式 (1)-(6) - 每个函数都标注对应的论文公式编号 - 包含详细注释,说明公式含义和实现细节 - 使用 NumPy 实现张量运算,便于理解和验证逻辑 ''' import numpy as np from dataclasses import dataclass from typing import List, Tuple, Optional # ============= 数据结构定义 ============= @dataclass class MoEConfig: ''' MoE 模型超参数配置 对应论文中的参数设置 ''' d_model: int = 4096 # 模型维度 d d_hidden: int = 11008 # FFN隐藏层维度 d_h (LLaMA通常为 8/3 * d_model) num_experts: int = 8 # 专家总数 N top_k: int = 2 # 激活的专家数 k batch_size: int = 2 # 批次大小 seq_len: int = 1024 # 序列长度 @dataclass class ExpertWeights: ''' 单个专家的权重集合 ''' W_up: np.ndarray # shape=(d, m) W_gate: np.ndarray # shape=(d, m) W_down: np.ndarray # shape=(m, d) # ============= 核心公式函数 ============= def llama_ffn_forward( x: np.ndarray, W_up: np.ndarray, W_gate: np.ndarray, W_down: np.ndarray ) -> np.ndarray: ''' 计算原始 LLaMA 的 FFN 输出 对应论文公式 (1): $$y = h W_{\mathrm{down}}, \quad h = x W_{\mathrm{up}} \odot \mathrm{Swish}(x W_{\mathrm{gate}})$$ 实现说明: - Swish 激活函数定义为 $x \cdot \text{sigmoid}(x)$ - 使用矩阵乘法 (@) 进行投影 - 使用逐元素乘法 (*) 进行门控融合 Args: x: 输入向量, shape=(batch_size, seq_len, d) W_up: 上投影权重矩阵, shape=(d, d_h) W_gate: 门控投影权重矩阵, shape=(d, d_h) W_down: 下投影权重矩阵, shape=(d_h, d) Returns: y: FFN层输出, shape=(batch_size, seq_len, d) ''' # 计算门控分支: x W_gate gate_out = x @ W_gate # 计算 Swish 激活: gate_out * sigmoid(gate_out) swish_act = gate_out * (1.0 / (1.0 + np.exp(-gate_out))) # 计算上投影分支: x W_up up_out = x @ W_up # 逐元素乘法 (Hadamard product): h = up_out \odot swish_act h = up_out * swish_act # 下投影: y = h W_down y = h @ W_down return y def moe_top_k_aggregate( expert_outputs: np.ndarray, gate_weights: np.ndarray, top_k_indices: np.ndarray ) -> np.ndarray: ''' MoE 层的 Top-k 聚合输出 对应论文公式 (2): $$y = \sum_{i \in \kappa} G(x)_i \cdot E_i(x)$$ 实现说明: - 利用高级索引从所有专家输出中提取 Top-k 专家的输出 - 利用高级索引提取对应的门控权重 - 进行加权求和得到最终输出 Args: expert_outputs: 所有专家的输出, shape=(batch_size, seq_len, num_experts, d) gate_weights: 门控网络的权重, shape=(batch_size, seq_len, num_experts) top_k_indices: 被选中的专家索引, shape=(batch_size, seq_len, top_k) Returns: y: MoE层聚合后的输出, shape=(batch_size, seq_len, d) ''' batch_size, seq_len, top_k = top_k_indices.shape d_model = expert_outputs.shape[-1] # 构建索引网格,用于从 expert_outputs 中提取数据 # batch_indices: (batch_size, 1, 1) -> 广播到 (batch_size, seq_len, top_k) batch_indices = np.arange(batch_size)[:, None, None] # seq_indices: (1, seq_len, 1) -> 广播到 (batch_size, seq_len, top_k) seq_indices = np.arange(seq_len)[None, :, None] # 1. 选出 Top-k 专家的输出 # expert_outputs[batch_indices, seq_indices, top_k_indices, :] # 结果形状: (batch_size, seq_len, top_k, d) selected_expert_outputs = expert_outputs[batch_indices, seq_indices, top_k_indices, :] # 2. 选出 Top-k 专家对应的权重 # gate_weights[batch_indices, seq_indices, top_k_indices] # 结果形状: (batch_size, seq_len, top_k) selected_gate_weights = gate_weights[batch_indices, seq_indices, top_k_indices] # 3. 加权求和 # 将权重维度扩展以便广播: (batch_size, seq_len, top_k, 1) weights_expanded = selected_gate_weights[..., None] # 加权: (batch_size, seq_len, top_k, d) weighted_outputs = selected_expert_outputs * weights_expanded # 在 top_k 维度上求和: (batch_size, seq_len, d) y = np.sum(weighted_outputs, axis=2) return y def slice_expert_weights( W_up: np.ndarray, W_gate: np.ndarray, W_down: np.ndarray, neuron_indices: List[int] ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: ''' 专家权重的分割 对应论文公式 (3): $$W_{\mathrm{up}}^{(j)} = W_{\mathrm{up}[:, S_j]}, \quad W_{\mathrm{gate}}^{(j)} = W_{\mathrm{gate}[:, S_j]}, \quad W_{\mathrm{down}}^{(j)} = W_{\mathrm{down}[S_j, :]}$$ 实现说明: - W_up 和 W_gate 对列(维度 d_h)进行切片 - W_down 对行(维度 d_h)进行切片 - neuron_indices 是第 j 个专家分配的神经元索引集合 $S_j$ Args: W_up: 原始上投影权重, shape=(d, d_h) W_gate: 原始门控权重, shape=(d, d_h) W_down: 原始下投影权重, shape=(d_h, d) neuron_indices: 第j个专家分配的神经元索引集合 Returns: expert_weights: 包含 (W_up_j, W_gate_j, W_down_j) 的元组 ''' # 将列表转换为 numpy 数组以便索引 indices = np.array(neuron_indices) # 切片操作 W_up_j = W_up[:, indices] W_gate_j = W_gate[:, indices] W_down_j = W_down[indices, :] return W_up_j, W_gate_j, W_down_j def expert_forward( x: np.ndarray, W_up_j: np.ndarray, W_gate_j: np.ndarray, W_down_j: np.ndarray ) -> np.ndarray: ''' 单个专家的输出计算 对应论文公式 (4): $$E_j(x) = h_j W_{\mathrm{down}}^{(j)}, \quad h_j = x W_{\mathrm{up}}^{(j)} \odot \mathrm{Swish}(x W_{\mathrm{gate}}^{(j)})$$ 实现说明: - 逻辑与公式1完全一致,只是使用了分割后的子权重矩阵 - 直接复用 llama_ffn_forward 的逻辑 Args: x: 输入向量, shape=(batch_size, seq_len, d) W_up_j: 第j个专家的上投影权重, shape=(d, m) W_gate_j: 第j个专家的门控权重, shape=(d, m) W_down_j: 第j个专家的下投影权重, shape=(m, d) Returns: output: 第j个专家的输出, shape=(batch_size, seq_len, d) ''' return llama_ffn_forward(x, W_up_j, W_gate_j, W_down_j) def compute_neuron_importance( hidden_states: np.ndarray, grad_loss: np.ndarray ) -> np.ndarray: ''' 神经元重要性计算(用于 Neuron-Sharing 方法) 对应论文公式 (5): $$v := v + \sum_{(x,y) \in D} \left| h \odot \nabla_h L(x,y) \right|$$ 实现说明: - 计算隐藏状态与梯度的逐元素乘积 - 取绝对值 - 在 batch 和 seq_len 维度上进行求和 Args: hidden_states: 中间隐藏状态 h, shape=(batch_size, seq_len, d_h) grad_loss: 损失对隐藏状态的梯度, shape=(batch_size, seq_len, d_h) Returns: importance_score: 重要性增量, shape=(d_h,) ''' # 逐元素乘积 product = hidden_states * grad_loss # 绝对值 abs_product = np.abs(product) # 在 batch (dim 0) 和 seq_len (dim 1) 上求和 importance_score = np.sum(abs_product, axis=(0, 1)) return importance_score def calculate_moe_scale_factor( num_experts: int, top_k: int ) -> float: ''' 专家输出重缩放因子计算 对应论文公式 (6): $$\text{scale factor} = \frac{N}{k}$$ 实现说明: - 简单的除法运算 - 用于平衡专家网络的输出幅度 Args: num_experts: 专家总数 N top_k: 激活的专家数 k Returns: scale: 缩放因子 ''' return float(num_experts) / float(top_k) # ============= 主算法类 ============= class LLaMAMoE: ''' LLaMA-MoE 实现类 核心思想: - 从密集的 LLaMA FFN 层构建 MoE 层 - 支持通过 IndependentRandom 策略分割神经元 - 实现 Top-k 路由和聚合 与基线对比: - 基线:原始密集 LLaMA FFN - 改进:将 FFN 参数分割为多个专家,通过门控网络激活部分专家 ''' def __init__(self, config: MoEConfig): ''' 初始化 LLaMA-MoE 模型 Args: config: 模型配置 ''' self.config = config # 初始化密集权重 (模拟从预训练模型加载) # 实际应用中这些权重来自 LLaMA checkpoint np.random.seed(42) self.W_up = np.random.randn(config.d_model, config.d_hidden) * 0.01 self.W_gate = np.random.randn(config.d_model, config.d_hidden) * 0.01 self.W_down = np.random.randn(config.d_hidden, config.d_model) * 0.01 # 存储专家权重列表 self.experts: List[ExpertWeights] = [] # 构建专家 self._build_experts() # 计算缩放因子 self.scale_factor = calculate_moe_scale_factor( config.num_experts, config.top_k ) def _build_experts(self): ''' 构建专家 (Expert Construction 阶段) 使用 IndependentRandom 策略:随机将 d_h 个神经元索引分成 n 个等长子集 ''' d_h = self.config.d_hidden n = self.config.num_experts m = d_h // n # 每个专家的神经元数 # 生成随机索引并打乱 indices = np.arange(d_h) np.random.shuffle(indices) # 分割索引给每个专家 for i in range(n): start_idx = i * m end_idx = (i + 1) * m expert_indices = indices[start_idx:end_idx] # 使用公式3分割权重 W_up_j, W_gate_j, W_down_j = slice_expert_weights( self.W_up, self.W_gate, self.W_down, expert_indices ) self.experts.append(ExpertWeights(W_up_j, W_gate_j, W_down_j)) print(f"成功构建 {n} 个专家,每个专家拥有 {m} 个神经元。") def forward( self, x: np.ndarray, gate_scores: Optional[np.ndarray] = None ) -> Tuple[np.ndarray, dict]: ''' MoE 前向传播 Args: x: 输入张量, shape=(batch_size, seq_len, d_model) gate_scores: 可选的门控网络得分,如果为 None 则随机生成用于演示 Returns: output: MoE 层输出, shape=(batch_size, seq_len, d_model) aux_info: 包含中间信息的字典 (用于调试或分析) ''' batch_size, seq_len, d_model = x.shape num_experts = self.config.num_experts top_k = self.config.top_k # 1. 模拟门控网络 # 实际中这里是一个线性层 + Softmax if gate_scores is None: # 随机生成 logits gate_logits = np.random.randn(batch_size, seq_len, num_experts) else: gate_logits = gate_scores # Softmax 归一化 # 减去最大值以保持数值稳定性 max_logits = np.max(gate_logits, axis=-1, keepdims=True) exp_logits = np.exp(gate_logits - max_logits) sum_exp = np.sum(exp_logits, axis=-1, keepdims=True) gate_weights = exp_logits / (sum_exp + 1e-9) # 2. 选择 Top-k 专家 # 获取分数最高的 k 个专家的索引 # np.argpartition 返回的是未排序的索引,这里为了简单直接用 argsort top_k_indices = np.argsort(gate_weights, axis=-1)[..., -top_k:] # 3. 计算所有专家的输出 # 注意:为了符合公式2的输入格式,这里计算所有专家的输出 # 在实际高效实现中,可以只计算被选中的专家 all_expert_outputs = np.zeros( (batch_size, seq_len, num_experts, d_model) ) for i, expert in enumerate(self.experts): # 使用公式4计算单个专家输出 expert_out = expert_forward( x, expert.W_up, expert.W_gate, expert.W_down ) all_expert_outputs[:, :, i, :] = expert_out # 4. 聚合输出 (使用公式2) y = moe_top_k_aggregate(all_expert_outputs, gate_weights, top_k_indices) # 5. 应用重缩放因子 (使用公式6) y = y * self.scale_factor aux_info = { "gate_weights": gate_weights, "top_k_indices": top_k_indices, "scale_factor": self.scale_factor } return y, aux_info def analyze_neuron_importance( self, x: np.ndarray, grad_loss: np.ndarray ) -> np.ndarray: ''' 分析神经元重要性 (用于 Neuron-Sharing 策略) 演示公式5的使用。 注意:这里需要获取中间隐藏状态 h。 为了演示,我们假设 grad_loss 是对 h 的梯度。 在实际训练中,这需要反向传播或 hook 机制。 ''' # 这里为了演示,我们重新计算密集 FFN 的 h # h = x W_up \odot Swish(x W_gate) gate_out = x @ self.W_gate swish_act = gate_out * (1.0 / (1.0 + np.exp(-gate_out))) up_out = x @ self.W_up h = up_out * swish_act # 使用公式5计算重要性 importance = compute_neuron_importance(h, grad_loss) return importance # ============= 使用示例 ============= def main(): ''' 使用示例 ''' print("=" * 60) print("LLaMA-MoE 算法实现演示") print("=" * 60) # 1. 创建配置 (使用较小的维度以便快速演示) config = MoEConfig( d_model=512, # 较小的模型维度 d_hidden=2048, # FFN 维度 (4x d_model) num_experts=4, # 4个专家 top_k=2, # 激活2个专家 batch_size=2, seq_len=128 ) print(f"\n模型配置:") print(f" 模型维度 (d): {config.d_model}") print(f" 隐藏层维度 (d_h): {config.d_hidden}") print(f" 专家数量 (N): {config.num_experts}") print(f" Top-k (k): {config.top_k}") # 2. 初始化模型 model = LLaMAMoE(config) # 3. 生成随机输入数据 np.random.seed(42) x = np.random.randn(config.batch_size, config.seq_len, config.d_model).astype(np.float32) print(f"\n输入数据 shape: {x.shape}") # 4. 执行前向传播 print("\n执行 MoE 前向传播...") output, aux_info = model.forward(x) print(f"输出数据 shape: {output.shape}") print(f"使用的缩放因子: {aux_info['scale_factor']:.2f}") # 打印第一个 token 的路由情况 print(f"\n第一个样本第一个 Token 的路由权重 (Top-{config.top_k}):") top_k_indices = aux_info['top_k_indices'][