简介
本文详细解析了Transformer架构的核心原理,包括位置编码、多头注意力和残差连接等。通过时间序列预测实战案例,展示了Encoder-Decoder结构的应用,并提供完整代码实现和可视化分析。文章解释了Transformer如何解决序列建模中的长依赖问题,使其成为大模型的关键架构。
首先,咱们先大概解释一下什么事Transformer,以及优势~
如果把机器学习比作让机器读书学习的过程,传统 RNN/LSTM 就像是从头到尾一页页地读书,读过的内容记在脑子里,但越往后,前面看的内容就模糊了(长依赖很难记住)。
那么,Transformer 则像是配备了一个全书搜索+多焦点注意力的读书系统:每当它要理解当前位置的内容时,它可以同时翻全书,聚焦到最相关的段落,并且可以开多个注意力焦点(多头注意力),从不同视角理解内容;结合多层堆叠,它就像一个推理深度不断加深的阅读者。
它好在哪里?
- 省事:不必一页页串行回忆,直接在全局范围检索相关信息,并行训练效率高。
- 能力强:多头注意力像多个专家,能同时从不同的关系维度建模。
- 可泛化:从语言到图像、语音、时间序列,它都能工作,已成为 AI 时代的通用基座模型方法论。
总之,大家可以这么理解:Transformer 是注意力机制驱动的并行神经网络架构,通过全局对齐与多头注意力,解决了序列建模中长依赖难、并行效率低的问题。
老规矩:如果大家觉得对你有所启发,不要忘记点赞或者转发,你们的支持是我持续的动力~
下面,会进行一个详细的阐述~
一、核心原理
模型的宏观结构
经典的 Transformer 由 Encoder-Decoder 两部分组成(也可只用 Encoder 或 Decoder):
- Encoder:输入序列经过多层自注意力 + 前馈网络的堆叠,得到上下文表示。
- Decoder:在预测时逐步看已预测的部分(并做未来信息屏蔽),对每个位置再次做自注意力,同时通过交叉注意力看 Encoder 的输出,从而生成输出序列。
最后,实战会使用一个小型的 Encoder-Decoder 结构进行时间序列多步预测。
位置编码
Positional Encoding
由于注意力本身对位置不敏感,需要注入位置信息。
经典位置编码使用固定的正弦-余弦函数:
- 其中 是序列位置, 是通道维度索引。
- 不同频率的正弦、余弦组合,使模型可以通过线性变换学习相对位置关系。
也可用可学习的位置嵌入,但正弦-余弦简单稳定,跨长度泛化好。
缩放点积注意力
Scaled Dot-Product Attention
给定查询向量 、键向量 、值向量 ,注意力本质是用 与 的相似度对 加权求和:
- 的形状通常是 , 是键向量维度。
- 用于缩放,避免点积过大导致梯度不稳定。
- 是可选的 mask(如解码器中的未来信息屏蔽,把不该看的位置赋值为 )。
多头注意力
Multi-Head Attention
不是只用一组 ,而是把通道分成多个子空间(头),每个头做一次注意力,再把各头结果拼接起来:
多头像多个专家,从不同子空间看问题,增强表达能力。
残差连接与层归一化
每个子层都有残差连接 + 层归一化:
残差帮助深层梯度流动,LayerNorm 稳定训练。
实现上可分Pre-Norm和Post-Norm,现代实践多用 Pre-Norm,收敛更稳。
前馈网络
Position-wise FFN
对每个位置独立的两层全连接网络,一般包含非线性激活(ReLU/GELU):
或使用 GELU 激活,效果更佳。
编码器与解码器
编码器层:自注意力 + FFN。
解码器层:自注意力(带未来信息屏蔽)+ 交叉注意力(看编码器输出)+ FFN。
在序列预测(如时间序列多步预测)中,常用教师强制,即训练时把真实前缀喂给解码器,测试时自回归生成。
损失函数与优化
本文实战使用多任务:
回归:预测未来值(MSE)
分类(辅助):识别序列的生成类型(正弦/方波/锯齿波),用交叉熵
总损失:
优化器:AdamW;学习率策略:Noam(Transformer 原版热身策略)
其中 是步数。
二、完整案例
我们构造一个数据集:包含三种模式(正弦、方波、锯齿波),长度为 60 的历史序列,预测未来 20 个点;同时预测该序列属于哪一种生成模式(辅助分类)。
模型:小型 Encoder-Decoder Transformer,数值输入经线性投影至 d_model,并叠加正弦位置编码。
训练:使用 AdamW + Noam 学习率计划;损失 = MSE + λ·CE。
import mathimport osimport randomfrom dataclasses import dataclassimport numpy as npimport torchimport torch.nn as nnfrom torch.utils.data import Dataset, DataLoaderfrom sklearn.manifold import TSNEimport matplotlib.pyplot as pltSEED = 42random.seed(SEED)np.random.seed(SEED)torch.manual_seed(SEED)device = torch.device('cuda'if torch.cuda.is_available() else'cpu')# 数据集构造def gen_sine(length, freq, amp, phase, noise_std=0.05): t = np.arange(length) y = amp * np.sin(2 * np.pi * freq * t / length + phase) y += np.random.normal(0, noise_std, size=length) return ydef gen_square(length, freq, amp, noise_std=0.05): t = np.arange(length) s = np.sign(np.sin(2 * np.pi * freq * t / length)) y = amp * s + np.random.normal(0, noise_std, size=length) return ydef gen_sawtooth(length, freq, amp, noise_std=0.05): t = np.arange(length) # sawtooth: 线性上升、骤降 frac = (t * freq / length) % 1.0 y = amp * (2 * frac - 1) y += np.random.normal(0, noise_std, size=length) return ydef gen_mixed_sequence(src_len=60, tgt_len=20): # 随机选择一种模式 mode = np.random.choice([0, 1, 2]) # 0: sine, 1: square, 2: sawtooth amp = np.random.uniform(0.5, 1.5) freq = np.random.uniform(1.0, 6.0) phase = np.random.uniform(0, np.pi) total_len = src_len + tgt_len if mode == 0: seq = gen_sine(total_len, freq, amp, phase, noise_std=0.06) elif mode == 1: seq = gen_square(total_len, freq, amp, noise_std=0.06) else: seq = gen_sawtooth(total_len, freq, amp, noise_std=0.06) # 叠加缓慢趋势项,增强难度 trend = np.linspace(0, np.random.uniform(-0.5, 0.5), total_len) seq = seq + trend src = seq[:src_len] tgt = seq[src_len:] return src.astype(np.float32), tgt.astype(np.float32), modeclass TimeSeriesDataset(Dataset): def __init__(self, n_samples=20000, src_len=60, tgt_len=20): self.src, self.tgt, self.mode = [], [], [] for _ in range(n_samples): s, t, m = gen_mixed_sequence(src_len, tgt_len) self.src.append(s) self.tgt.append(t) self.mode.append(m) self.src = np.array(self.src) # [N, src_len] self.tgt = np.array(self.tgt) # [N, tgt_len] self.mode = np.array(self.mode) # [N] def __len__(self): return len(self.src) def __getitem__(self, idx): # 返回输入序列、目标序列、类别 return self.src[idx], self.tgt[idx], self.mode[idx]@dataclassclass HParams: src_len: int = 60 tgt_len: int = 20 d_model: int = 64 nhead: int = 4 num_layers: int = 2 dim_ff: int = 128 dropout: float = 0.1 cls_weight: float = 0.2# 分类损失权重 batch_size: int = 128 epochs: int = 8 warmup_steps: int = 400 lr: float = 1.0# 初始lr将由Noam调节 weight_decay: float = 1e-4 grad_clip: float = 1.0hp = HParams()# 位置编码class PositionalEncoding(nn.Module): def __init__(self, d_model, max_len=1000): super().__init__() pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len).unsqueeze(1) div_term = torch.exp( torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model) ) pe[:, 0::2] = torch.sin(position.float() * div_term) pe[:, 1::2] = torch.cos(position.float() * div_term) self.register_buffer('pe', pe.unsqueeze(0)) # [1, max_len, d_model] def forward(self, x): # x: [B, L, d_model] L = x.size(1) return x + self.pe[:, :L, :]# 自定义 Transformer(暴露注意力权重)class TransformerBlock(nn.Module): def __init__(self, d_model, nhead, dim_ff, dropout=0.1): super().__init__() self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True) self.linear1 = nn.Linear(d_model, dim_ff) self.dropout = nn.Dropout(dropout) self.linear2 = nn.Linear(dim_ff, d_model) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.act = nn.GELU() self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) # 缓存注意力(仅最后一次前向的) self.last_self_attn_weights = None def forward(self, x, attn_mask=None, key_padding_mask=None): # Pre-Norm x_norm = self.norm1(x) attn_out, attn_weights = self.self_attn( x_norm, x_norm, x_norm, attn_mask=attn_mask, key_padding_mask=key_padding_mask, need_weights=True, average_attn_weights=False# 返回 [B, num_heads, L, L] ) self.last_self_attn_weights = attn_weights.detach() x = x + self.dropout1(attn_out) x_norm2 = self.norm2(x) ff = self.linear2(self.dropout(self.act(self.linear1(x_norm2)))) x = x + self.dropout2(ff) return xclass TransformerEncoder(nn.Module): def __init__(self, d_model, nhead, dim_ff, num_layers, dropout=0.1): super().__init__() self.layers = nn.ModuleList([ TransformerBlock(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward(self, x, attn_mask=None, key_padding_mask=None): last_attn = None for layer in self.layers: x = layer(x, attn_mask=attn_mask, key_padding_mask=key_padding_mask) last_attn = layer.last_self_attn_weights return x, last_attn # 返回最后一层的注意力权重class TransformerDecoderBlock(nn.Module): def __init__(self, d_model, nhead, dim_ff, dropout=0.1): super().__init__() self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True) self.cross_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.norm3 = nn.LayerNorm(d_model) self.linear1 = nn.Linear(d_model, dim_ff) self.linear2 = nn.Linear(dim_ff, d_model) self.dropout = nn.Dropout(dropout) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) self.dropout3 = nn.Dropout(dropout) self.act = nn.GELU() self.last_self_attn_weights = None self.last_cross_attn_weights = None def forward(self, x, mem, tgt_mask=None, tgt_key_padding_mask=None, mem_key_padding_mask=None): x_norm = self.norm1(x) self_attn_out, self_attn_w = self.self_attn( x_norm, x_norm, x_norm, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask, need_weights=True, average_attn_weights=False ) self.last_self_attn_weights = self_attn_w.detach() x = x + self.dropout1(self_attn_out) x_norm2 = self.norm2(x) cross_attn_out, cross_attn_w = self.cross_attn( x_norm2, mem, mem, key_padding_mask=mem_key_padding_mask, need_weights=True, average_attn_weights=False ) self.last_cross_attn_weights = cross_attn_w.detach() x = x + self.dropout2(cross_attn_out) x_norm3 = self.norm3(x) ff = self.linear2(self.dropout(self.act(self.linear1(x_norm3)))) x = x + self.dropout3(ff) return xclass TransformerDecoder(nn.Module): def __init__(self, d_model, nhead, dim_ff, num_layers, dropout=0.1): super().__init__() self.layers = nn.ModuleList([ TransformerDecoderBlock(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward(self, x, mem, tgt_mask=None, tgt_key_padding_mask=None, mem_key_padding_mask=None): last_self, last_cross = None, None for layer in self.layers: x = layer(x, mem, tgt_mask, tgt_key_padding_mask, mem_key_padding_mask) last_self = layer.last_self_attn_weights last_cross = layer.last_cross_attn_weights return x, last_self, last_crossclass TimeSeriesTransformer(nn.Module): def __init__(self, hp: HParams, n_classes=3): super().__init__() self.hp = hp self.src_proj = nn.Linear(1, hp.d_model) self.tgt_proj = nn.Linear(1, hp.d_model) self.pe = PositionalEncoding(hp.d_model, max_len=2000) self.encoder = TransformerEncoder(hp.d_model, hp.nhead, hp.dim_ff, hp.num_layers, hp.dropout) self.decoder = TransformerDecoder(hp.d_model, hp.nhead, hp.dim_ff, hp.num_layers, hp.dropout) self.out = nn.Linear(hp.d_model, 1) # 预测数值 # 辅助分类头(对 encoder 输出进行池化) self.cls_head = nn.Sequential( nn.LayerNorm(hp.d_model), nn.Linear(hp.d_model, hp.d_model), nn.GELU(), nn.Linear(hp.d_model, n_classes) ) def generate_square_subsequent_mask(self, L): # 上三角为 -inf,阻止看未来 mask = torch.triu(torch.ones(L, L, device=device), diagonal=1) mask = mask.masked_fill(mask == 1, float('-inf')) return mask def forward(self, src, tgt_in): # src: [B, src_len], tgt_in: [B, tgt_len](解码器输入,训练时为已知前缀的真实值) B, S = src.shape T = tgt_in.shape[1] src = src.unsqueeze(-1) # [B, S, 1] tgt_in = tgt_in.unsqueeze(-1) # [B, T, 1] src = self.src_proj(src) tgt = self.tgt_proj(tgt_in) src = self.pe(src) tgt = self.pe(tgt) mem, enc_attn = self.encoder(src) tgt_mask = self.generate_square_subsequent_mask(T) dec_out, dec_self_attn, cross_attn = self.decoder(tgt, mem, tgt_mask=tgt_mask) pred = self.out(dec_out).squeeze(-1) # [B, T] # 分类:对 encoder 输出做平均池化(也可用 CLS token) pooled = mem.mean(dim=1) # [B, d_model] logits = self.cls_head(pooled) # [B, n_classes] # 暴露注意力权重,便于可视化 self.last_enc_self_attn = enc_attn # [B, H, S, S] self.last_dec_self_attn = dec_self_attn # [B, H, T, T] self.last_cross_attn = cross_attn # [B, H, T, S] return pred, logits# 训练工具class NoamLR: def __init__(self, optimizer, d_model, warmup_steps): self.optimizer = optimizer self.d_model = d_model self.warmup_steps = warmup_steps self.step_num = 0 self.lrs = [] def step(self): self.step_num += 1 lr = (self.d_model ** -0.5) * min(self.step_num ** -0.5, self.step_num * (self.warmup_steps ** -1.5)) for pg in self.optimizer.param_groups: pg['lr'] = lr self.lrs.append(lr) def get_last_lr(self): return self.lrs[-1] if self.lrs else0.0def collate_fn(batch): # batch: list of (src, tgt, mode) src = torch.tensor([b[0] for b in batch], dtype=torch.float32) tgt = torch.tensor([b[1] for b in batch], dtype=torch.float32) mode = torch.tensor([b[2] for b in batch], dtype=torch.long) return src, tgt, modedef train_one_epoch(model, loader, optimizer, scheduler, hp): model.train() mse_loss_fn = nn.MSELoss() ce_loss_fn = nn.CrossEntropyLoss() total_loss, total_mse, total_ce = 0.0, 0.0, 0.0 for src, tgt, mode in loader: src, tgt, mode = src.to(device), tgt.to(device), mode.to(device) # 教师强制:tgt_in = 右移一位的真实序列,这里简化用全部真实序列做输入(预测对应步) tgt_in = torch.zeros_like(tgt) tgt_in[:, 1:] = tgt[:, :-1] # 右移一位,第一位为0(或可用最后一个src的值) # 也可尝试把第一位设为 src 的最后一个点: tgt_in[:, 0] = src[:, -1] pred, logits = model(src, tgt_in) mse_loss = mse_loss_fn(pred, tgt) ce_loss = ce_loss_fn(logits, mode) loss = mse_loss + hp.cls_weight * ce_loss optimizer.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), hp.grad_clip) optimizer.step() scheduler.step() total_loss += loss.item() * src.size(0) total_mse += mse_loss.item() * src.size(0) total_ce += ce_loss.item() * src.size(0) n = len(loader.dataset) return total_loss / n, total_mse / n, total_ce / n@torch.no_grad()def evaluate(model, loader, hp): model.eval() mse_loss_fn = nn.MSELoss() ce_loss_fn = nn.CrossEntropyLoss() total_loss, total_mse, total_ce = 0.0, 0.0, 0.0 correct, total = 0, 0 for src, tgt, mode in loader: src, tgt, mode = src.to(device), tgt.to(device), mode.to(device) # 验证也用教师强制方式 tgt_in = torch.zeros_like(tgt) tgt_in[:, 1:] = tgt[:, :-1] tgt_in[:, 0] = src[:, -1] pred, logits = model(src, tgt_in) mse_loss = mse_loss_fn(pred, tgt) ce_loss = ce_loss_fn(logits, mode) loss = mse_loss + hp.cls_weight * ce_loss total_loss += loss.item() * src.size(0) total_mse += mse_loss.item() * src.size(0) total_ce += ce_loss.item() * src.size(0) preds_cls = logits.argmax(dim=-1) correct += (preds_cls == mode).sum().item() total += mode.numel() n = len(loader.dataset) return total_loss / n, total_mse / n, total_ce / n, correct / totaldef build_dataloaders(): # 较大虚拟数据集(可根据硬件调参) train_set = TimeSeriesDataset(n_samples=18000, src_len=hp.src_len, tgt_len=hp.tgt_len) val_set = TimeSeriesDataset(n_samples=3000, src_len=hp.src_len, tgt_len=hp.tgt_len) test_set = TimeSeriesDataset(n_samples=3000, src_len=hp.src_len, tgt_len=hp.tgt_len) train_loader = DataLoader(train_set, batch_size=hp.batch_size, shuffle=True, num_workers=0, collate_fn=collate_fn) val_loader = DataLoader(val_set, batch_size=hp.batch_size, shuffle=False, num_workers=0, collate_fn=collate_fn) test_loader = DataLoader(test_set, batch_size=hp.batch_size, shuffle=False, num_workers=0, collate_fn=collate_fn) return train_loader, val_loader, test_loader# 主训练流程def main_train_and_analyze(): train_loader, val_loader, test_loader = build_dataloaders() model = TimeSeriesTransformer(hp).to(device) optimizer = torch.optim.AdamW(model.parameters(), lr=hp.lr, weight_decay=hp.weight_decay) scheduler = NoamLR(optimizer, d_model=hp.d_model, warmup_steps=hp.warmup_steps) history = {'train_loss': [], 'val_loss': [], 'train_mse': [], 'val_mse': [], 'train_ce': [], 'val_ce': [], 'val_acc': [], 'lr': []} print("Start training...") global_step = 0 for epoch in range(1, hp.epochs + 1): tr_loss, tr_mse, tr_ce = train_one_epoch(model, train_loader, optimizer, scheduler, hp) val_loss, val_mse, val_ce, val_acc = evaluate(model, val_loader, hp) history['train_loss'].append(tr_loss) history['val_loss'].append(val_loss) history['train_mse'].append(tr_mse) history['val_mse'].append(val_mse) history['train_ce'].append(tr_ce) history['val_ce'].append(val_ce) history['val_acc'].append(val_acc) history['lr'].append(scheduler.get_last_lr()) print(f"Epoch {epoch:02d}: " f"train_loss={tr_loss:.4f}, val_loss={val_loss:.4f}, " f"train_mse={tr_mse:.4f}, val_mse={val_mse:.4f}, " f"train_ce={tr_ce:.4f}, val_ce={val_ce:.4f}, val_acc={val_acc:.3f}, " f"lr={scheduler.get_last_lr():.6f}") # 图1:训练/验证损失曲线(线图) plt.figure() epochs = np.arange(1, hp.epochs + 1) plt.plot(epochs, history['train_loss'], marker='o', linewidth=2.5, label='Train Loss') plt.plot(epochs, history['val_loss'], marker='s', linewidth=2.5, label='Val Loss') plt.title('Training vs Validation Loss', fontsize=14) plt.xlabel('Epoch') plt.ylabel('Loss') plt.legend() plt.grid(alpha=0.3) plt.tight_layout() plt.show() # 图2:学习率随步数变化(线图) plt.figure() # 展示 Noam 曲线(history['lr'] 是每个 epoch 最后一步的lr;另绘制scheduler.lrs为步级曲线) plt.plot(np.arange(1, len(scheduler.lrs) + 1), scheduler.lrs, linewidth=2.0, label='Noam LR per step') plt.title('Learning Rate Schedule (Noam)', fontsize=14) plt.xlabel('Training Step') plt.ylabel('LR') plt.legend() plt.grid(alpha=0.3) plt.tight_layout() plt.show() # 用测试集做多角度可视化分析 # 抽取一个batch做注意力热力图 src_batch, tgt_batch, mode_batch = next(iter(test_loader)) src_batch, tgt_batch = src_batch.to(device), tgt_batch.to(device) # 验证/测试阶段:用教师强制来产生注意力权重(也可改为自回归采样) tgt_in = torch.zeros_like(tgt_batch) tgt_in[:, 1:] = tgt_batch[:, :-1] tgt_in[:, 0] = src_batch[:, -1] with torch.no_grad(): pred, logits = model(src_batch, tgt_in) # 图3:编码器自注意力热力图 # 选取第一条样本、第一个头 enc_attn = model.last_enc_self_attn # [B, H, S, S] if enc_attn isnotNone: attn_map = enc_attn[0, 0].detach().cpu().numpy() # [S, S] plt.figure() plt.imshow(attn_map, cmap='magma', aspect='auto') plt.colorbar() plt.title('Encoder Self-Attention Heatmap (Head 1, Sample 1)', fontsize=13) plt.xlabel('Key Position') plt.ylabel('Query Position') plt.tight_layout() plt.show() # 图4:注意力相对距离分布 # 统计 |i - j| 的加权分布,聚合 batch 后按 head 展示 if enc_attn isnotNone: attn = enc_attn.detach().cpu().numpy() # [B, H, S, S] B, H, S, _ = attn.shape distances = np.arange(S) head_distributions = [] for h in range(H): # 聚合 batch 和 query 维度 w = attn[:, h, :, :] # [B, S, S] # 计算距离权重 dist_weight = np.zeros(S, dtype=np.float64) total = 0.0 for i in range(S): for j in range(S): d = abs(i - j) val = w[:, i, j].sum() dist_weight[d] += val total += val dist_weight /= (total + 1e-8) head_distributions.append(dist_weight) # 绘制条形 + 平滑曲线 plt.figure() for h in range(H): color = plt.rcParams['axes.prop_cycle'].by_key()['color'][h % 10] plt.plot(distances, head_distributions[h], '-', linewidth=2.5, label=f'Head {h+1}', color=color) # 添加点标记 plt.scatter(distances, head_distributions[h], s=12, color=color, alpha=0.7) plt.title('Encoder Attention Distance Distribution by Head', fontsize=13) plt.xlabel('Absolute Distance |i - j|') plt.ylabel('Weighted Probability') plt.legend() plt.grid(alpha=0.3) plt.tight_layout() plt.show() # 图5:编码器嵌入的 t-SNE 可视化 # 取测试集一部分样本,取 encoder 输出做平均池化,然后做 t-SNE,颜色映射到类别 model.eval() embeds = [] labels = [] with torch.no_grad(): cnt = 0 for src, tgt, mode in test_loader: src, tgt = src.to(device), tgt.to(device) tgt_in = torch.zeros_like(tgt) tgt_in[:, 1:] = tgt[:, :-1] tgt_in[:, 0] = src[:, -1] # 前向,但我们只想拿编码器表示 src_ = model.src_proj(src.unsqueeze(-1)) src_ = model.pe(src_) mem, _ = model.encoder(src_) pooled = mem.mean(dim=1) # [B, d_model] embeds.append(pooled.cpu().numpy()) labels.append(mode.numpy()) cnt += src.size(0) if cnt >= 2000: # 限制样本数量,t-SNE更快 break embeds = np.concatenate(embeds, axis=0) labels = np.concatenate(labels, axis=0) tsne = TSNE(n_components=2, learning_rate='auto', init='pca', perplexity=30, random_state=SEED) emb2d = tsne.fit_transform(embeds) plt.figure() colors = ['#FF1493', '#00CED1', '#FFA500'] # 亮粉、青色、橙色 for c in [0, 1, 2]: idx = labels == c plt.scatter(emb2d[idx, 0], emb2d[idx, 1], s=16, c=colors[c], alpha=0.8, label=f'Class {c}') plt.title('t-SNE of Encoder Pooled Embeddings', fontsize=13) plt.xlabel('Dim 1') plt.ylabel('Dim 2') plt.legend() plt.grid(alpha=0.2) plt.tight_layout() plt.show() # 图6:多步预测的误差分布 # 在测试集上进行一次前向并统计每个预测步的绝对误差 abs_err_by_step = [] with torch.no_grad(): for src, tgt, mode in test_loader: src, tgt = src.to(device), tgt.to(device) tgt_in = torch.zeros_like(tgt) tgt_in[:, 1:] = tgt[:, :-1] tgt_in[:, 0] = src[:, -1] pred, _ = model(src, tgt_in) abs_err = (pred - tgt).abs().cpu().numpy() # [B, T] abs_err_by_step.append(abs_err) abs_err_by_step = np.concatenate(abs_err_by_step, axis=0) # [N, T] # 绘制小提琴图 plt.figure() parts = plt.violinplot([abs_err_by_step[:, t] for t in range(hp.tgt_len)], showmeans=True, showextrema=False) # 调整颜色鲜艳 for pc in parts['bodies']: pc.set_facecolor('#1E90FF') pc.set_edgecolor('#FF4500') pc.set_alpha(0.6) if'cmeans'in parts: parts['cmeans'].set_color('#FF4500') parts['cmeans'].set_linewidth(2.0) plt.title('Absolute Error Distribution per Forecast Horizon', fontsize=13) plt.xlabel('Horizon Step') plt.ylabel('Absolute Error') plt.grid(alpha=0.3) plt.tight_layout() plt.show() return historyif __name__ == '__main__': history = main_train_and_analyze()训练/验证损失曲线:
观察模型是否过拟合或欠拟合。两条曲线在后期收敛并接近,说明泛化尚可;若验证损失大幅上升,需加强正则或早停。
学习率随步数变化:
Noam 策略先升后降:前期热身避免过早陷入坏局部,后期降低学习率稳定收敛。曲线形状与论文一致,有助于稳定训练。
编码器注意力热力图:
行表示 Query 位置,列表示 Key 位置,颜色越亮表示越关注。可直观看出模型在对某一时刻进行编码时,更倾向于关注哪些历史时刻。对于周期性序列,可能出现对固定相位差位置的高关注。
注意力相对距离分布:
横轴是相对距离 |i-j|,纵轴为按注意力概率加权后的分布。不同头的分布形态可不同:
- 有的头偏好短距离(建模局部平滑/短期相关)
- 有的头偏好较长距离(建模周期/长期依赖)
这是多头=多视角的一个外在证据。
t-SNE 编码器嵌入二维可视化:
对编码器输出的平均池化向量做 t-SNE 可视化,按生成模式着色。若不同颜色簇较明显,说明编码器已学到区分不同动力学模态的表征。
多步预测误差分布:
横轴是预测步(1 到 20 步),纵轴是绝对误差。通常越远的步误差越大;小提琴的形状反映分布的离散程度。可以帮助你判断模型在多步预测上的退化速度与稳定性。
大家如果在后面想要扩展的方向,可以考虑如下的几点:
- 增加解码策略:从教师强制改为自回归生成;或蒸馏+scheduled sampling。
- 引入更复杂的数据生成机制:多频率混合、突变点、异常注入,评估鲁棒性。
- 模型层面:加深层数/维度;引入相对位置编码、因果卷积、分块注意力或 FlashAttention。
- 评估指标:MAE、SMAPE、DTW距离等;绘制校准图、残差自相关图。
三、常见问题
其实对于Transformer,大家过去的一些问题,进行了简单整理,大家结合上面的内容,进一步理解:
问:为什么要缩放点积注意力?
- 答:点积随着维度增大方差会变大,未经缩放的 softmax 可能进入梯度饱和区,训练不稳定。除以 可显著缓解。
问:多头越多越好吗?
- 答:头数多可增强表达,但会增加计算和显存,且头之间可能冗余。一般和 d_model 成比例,比如 d_model=64 用 4~8 头较常见,需调参。
问:位置编码用正弦还是可学习的?
- 答:正弦-余弦便宜且对超长泛化友好;可学习位置编码在固定长度范围内表现也好。对于时间序列,许多实践仍偏好正弦,或选相对位置编码(T5/DeBERTa 风格)。
问:时间序列强因果的场景,如何保证未来不泄漏?
- 答:解码器自注意力中使用未来屏蔽(上三角置 -inf),或在encoder-only场景用因果注意力。
四、总结
Transformer 的核心法术是注意力:全局检索 + 多头多视角,让模型能并行学习长依赖。
通过位置编码、残差+归一化、前馈网络、深层堆叠等模块,构成了一个稳定且强大的通用序列建模器。
后面几天的文章会围绕更多关于Transformer的细节内容,给大家分享出来~
五、如何系统的学习大模型 AI ?
由于新岗位的生产效率,要优于被取代岗位的生产效率,所以实际上整个社会的生产效率是提升的。
但是具体到个人,只能说是:
“最先掌握AI的人,将会比较晚掌握AI的人有竞争优势”。
这句话,放在计算机、互联网、移动互联网的开局时期,都是一样的道理。
我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。
我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑,所以在工作繁忙的情况下还是坚持各种整理和分享。但苦于知识传播途径有限,很多互联网行业朋友无法获得正确的资料得到学习提升,故此将并将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。
一直在更新,更多的大模型学习和面试资料已经上传带到CSDN的官方了,有需要的朋友可以扫描下方二维码免费领取【保证100%免费】👇👇
01.大模型风口已至:月薪30K+的AI岗正在批量诞生
2025年大模型应用呈现爆发式增长,根据工信部最新数据:
国内大模型相关岗位缺口达47万
初级工程师平均薪资28K(数据来源:BOSS直聘报告)
70%企业存在"能用模型不会调优"的痛点
真实案例:某二本机械专业学员,通过4个月系统学习,成功拿到某AI医疗公司大模型优化岗offer,薪资直接翻3倍!
02.大模型 AI 学习和面试资料
1️⃣ 提示词工程:把ChatGPT从玩具变成生产工具
2️⃣ RAG系统:让大模型精准输出行业知识
3️⃣ 智能体开发:用AutoGPT打造24小时数字员工
📦熬了三个大夜整理的《AI进化工具包》送你:
✔️ 大厂内部LLM落地手册(含58个真实案例)
✔️ 提示词设计模板库(覆盖12大应用场景)
✔️ 私藏学习路径图(0基础到项目实战仅需90天)
第一阶段(10天):初阶应用
该阶段让大家对大模型 AI有一个最前沿的认识,对大模型 AI 的理解超过 95% 的人,可以在相关讨论时发表高级、不跟风、又接地气的见解,别人只会和 AI 聊天,而你能调教 AI,并能用代码将大模型和业务衔接。
- 大模型 AI 能干什么?
- 大模型是怎样获得「智能」的?
- 用好 AI 的核心心法
- 大模型应用业务架构
- 大模型应用技术架构
- 代码示例:向 GPT-3.5 灌入新知识
- 提示工程的意义和核心思想
- Prompt 典型构成
- 指令调优方法论
- 思维链和思维树
- Prompt 攻击和防范
- …
第二阶段(30天):高阶应用
该阶段我们正式进入大模型 AI 进阶实战学习,学会构造私有知识库,扩展 AI 的能力。快速开发一个完整的基于 agent 对话机器人。掌握功能最强的大模型开发框架,抓住最新的技术进展,适合 Python 和 JavaScript 程序员。
- 为什么要做 RAG
- 搭建一个简单的 ChatPDF
- 检索的基础概念
- 什么是向量表示(Embeddings)
- 向量数据库与向量检索
- 基于向量检索的 RAG
- 搭建 RAG 系统的扩展知识
- 混合检索与 RAG-Fusion 简介
- 向量模型本地部署
- …
第三阶段(30天):模型训练
恭喜你,如果学到这里,你基本可以找到一份大模型 AI相关的工作,自己也能训练 GPT 了!通过微调,训练自己的垂直大模型,能独立训练开源多模态大模型,掌握更多技术方案。
到此为止,大概2个月的时间。你已经成为了一名“AI小子”。那么你还想往下探索吗?
- 为什么要做 RAG
- 什么是模型
- 什么是模型训练
- 求解器 & 损失函数简介
- 小实验2:手写一个简单的神经网络并训练它
- 什么是训练/预训练/微调/轻量化微调
- Transformer结构简介
- 轻量化微调
- 实验数据集的构建
- …
第四阶段(20天):商业闭环
对全球大模型从性能、吞吐量、成本等方面有一定的认知,可以在云端和本地等多种环境下部署大模型,找到适合自己的项目/创业方向,做一名被 AI 武装的产品经理。
- 硬件选型
- 带你了解全球大模型
- 使用国产大模型服务
- 搭建 OpenAI 代理
- 热身:基于阿里云 PAI 部署 Stable Diffusion
- 在本地计算机运行大模型
- 大模型的私有化部署
- 基于 vLLM 部署大模型
- 案例:如何优雅地在阿里云私有部署开源大模型
- 部署一套开源 LLM 项目
- 内容安全
- 互联网信息服务算法备案
- …
学习是一个过程,只要学习就会有挑战。天道酬勤,你越努力,就会成为越优秀的自己。
如果你能在15天内完成所有的任务,那你堪称天才。然而,如果你能完成 60-70% 的内容,你就已经开始具备成为一名大模型 AI 的正确特征了。