1. 从零实现机器学习基础算法的必要性
在机器学习领域,调用现成的库(如scikit-learn)固然方便,但真正理解算法本质的开发者都会选择自己动手实现一遍。这就像学习烹饪时,从切菜开始准备食材比直接使用预制菜更能掌握料理的精髓。
自己动手实现基础算法有三个不可替代的价值:
- 深入理解数学原理:每个公式都会转化为具体的代码逻辑
- 掌握实现细节:了解算法在各种边界条件下的表现
- 培养工程能力:从理论到实践的完整闭环训练
Python作为机器学习的主流语言,其清晰的语法和丰富的科学计算生态(NumPy、Matplotlib)特别适合用来实现这些算法。下面我将带您完整实现5个最基础的机器学习算法,包括它们的数学推导和Python实现技巧。
2. 准备工作与环境搭建
2.1 基础工具链配置
推荐使用Python 3.8+环境,这是目前最稳定的版本。我们需要以下基础库:
pip install numpy matplotlib注意:虽然可以单独实现所有数学运算,但使用NumPy不仅能提高性能,其API设计也更贴近数学表达形式,方便我们验证实现的正确性。
2.2 测试数据集准备
为了方便验证算法效果,我们准备两个经典数据集:
- 线性回归测试数据:使用正弦函数加噪声生成
def make_regression_data(n_samples=100): X = np.linspace(0, 2*np.pi, n_samples) y = np.sin(X) + np.random.normal(0, 0.1, n_samples) return X.reshape(-1,1), y- 分类测试数据:使用sklearn的make_classification生成(仅用于测试,不用于算法实现)
from sklearn.datasets import make_classification X, y = make_classification(n_features=2, n_redundant=0, n_informative=2)3. 线性回归实现
3.1 数学原理推导
普通最小二乘法(OLS)的目标是最小化残差平方和: $$ \min_w |Xw - y|_2^2 $$
其解析解为: $$ w = (X^TX)^{-1}X^Ty $$
3.2 Python实现代码
class LinearRegression: def __init__(self): self.w = None def fit(self, X, y): # 添加偏置项 X = np.hstack([np.ones((X.shape[0], 1)), X]) # 计算解析解 self.w = np.linalg.inv(X.T @ X) @ X.T @ y def predict(self, X): X = np.hstack([np.ones((X.shape[0], 1)), X]) return X @ self.w3.3 梯度下降实现
当数据量较大时,可以使用梯度下降法:
def fit_gd(self, X, y, lr=0.01, n_iters=1000): X = np.hstack([np.ones((X.shape[0], 1)), X]) self.w = np.zeros(X.shape[1]) for _ in range(n_iters): grad = 2 * X.T @ (X @ self.w - y) / len(y) self.w -= lr * grad实操技巧:学习率(lr)通常从0.01开始尝试,观察损失函数下降曲线调整
4. 逻辑回归实现
4.1 Sigmoid函数与决策边界
逻辑回归使用sigmoid函数将线性输出映射到(0,1)区间: $$ \sigma(z) = \frac{1}{1+e^{-z}} $$
决策边界为: $$ w^Tx + b = 0 $$
4.2 交叉熵损失函数
损失函数定义为: $$ J(w) = -\frac{1}{m}\sum_{i=1}^m [y^{(i)}\log(h(x^{(i)})) + (1-y^{(i)})\log(1-h(x^{(i)}))] $$
其梯度为: $$ \nabla_w J(w) = \frac{1}{m}X^T(\sigma(Xw) - y) $$
4.3 Python实现
class LogisticRegression: def __init__(self, lr=0.01, n_iters=1000): self.lr = lr self.n_iters = n_iters self.w = None def _sigmoid(self, z): return 1 / (1 + np.exp(-z)) def fit(self, X, y): X = np.hstack([np.ones((X.shape[0], 1)), X]) self.w = np.zeros(X.shape[1]) for _ in range(self.n_iters): z = X @ self.w h = self._sigmoid(z) grad = X.T @ (h - y) / len(y) self.w -= self.lr * grad def predict_proba(self, X): X = np.hstack([np.ones((X.shape[0], 1)), X]) return self._sigmoid(X @ self.w) def predict(self, X, threshold=0.5): return (self.predict_proba(X) >= threshold).astype(int)5. K近邻算法实现
5.1 距离度量选择
常用的距离度量包括:
- 欧式距离:$\sqrt{\sum_{i=1}^n (x_i - y_i)^2}$
- 曼哈顿距离:$\sum_{i=1}^n |x_i - y_i|$
- 余弦相似度:$\frac{x \cdot y}{|x||y|}$
5.2 投票机制实现
class KNN: def __init__(self, k=3, metric='euclidean'): self.k = k self.metric = metric def _distance(self, a, b): if self.metric == 'euclidean': return np.sqrt(np.sum((a - b)**2)) elif self.metric == 'manhattan': return np.sum(np.abs(a - b)) def fit(self, X, y): self.X_train = X self.y_train = y def predict(self, X): preds = [] for x in X: # 计算所有训练样本的距离 distances = [self._distance(x, x_train) for x_train in self.X_train] # 获取最近的k个样本索引 k_indices = np.argsort(distances)[:self.k] # 获取对应标签 k_labels = self.y_train[k_indices] # 多数投票 pred = np.bincount(k_labels).argmax() preds.append(pred) return np.array(preds)性能优化提示:对于大数据集,可以使用KD树或Ball树加速近邻搜索
6. 朴素贝叶斯实现
6.1 概率计算公式
对于分类问题,我们需要计算: $$ P(y|x_1,...,x_n) = \frac{P(y)P(x_1|y)...P(x_n|y)}{P(x_1,...,x_n)} $$
由于分母相同,比较分子即可: $$ \hat{y} = \arg\max_y P(y)\prod_{i=1}^n P(x_i|y) $$
6.2 高斯朴素贝叶斯实现
class GaussianNB: def __init__(self): self.class_priors = None self.class_means = None self.class_vars = None def fit(self, X, y): self.classes = np.unique(y) n_classes = len(self.classes) n_features = X.shape[1] self.class_priors = np.zeros(n_classes) self.class_means = np.zeros((n_classes, n_features)) self.class_vars = np.zeros((n_classes, n_features)) for i, c in enumerate(self.classes): X_c = X[y == c] self.class_priors[i] = len(X_c) / len(X) self.class_means[i] = X_c.mean(axis=0) self.class_vars[i] = X_c.var(axis=0) def _gaussian_pdf(self, x, mean, var): return np.exp(-(x - mean)**2 / (2 * var)) / np.sqrt(2 * np.pi * var) def predict(self, X): posteriors = [] for i, c in enumerate(self.classes): prior = np.log(self.class_priors[i]) likelihood = np.sum(np.log(self._gaussian_pdf(X, self.class_means[i], self.class_vars[i])), axis=1) posterior = prior + likelihood posteriors.append(posterior) return self.classes[np.argmax(np.array(posteriors), axis=0)]7. 决策树实现
7.1 信息增益计算
使用信息增益选择最佳分割特征: $$ IG(D_p, f) = I(D_p) - \sum_{j=1}^m \frac{N_j}{N_p}I(D_j) $$
其中$I$可以是基尼系数或熵:
- 基尼系数:$I_G(t) = 1 - \sum_{i=1}^c p(i|t)^2$
- 熵:$I_E(t) = -\sum_{i=1}^c p(i|t)\log_2 p(i|t)$
7.2 递归构建决策树
class DecisionTree: def __init__(self, max_depth=None, criterion='gini'): self.max_depth = max_depth self.criterion = criterion self.tree = None def _gini(self, y): _, counts = np.unique(y, return_counts=True) probs = counts / len(y) return 1 - np.sum(probs**2) def _entropy(self, y): _, counts = np.unique(y, return_counts=True) probs = counts / len(y) return -np.sum(probs * np.log2(probs)) def _information_gain(self, X_col, y, split_threshold): parent_impurity = self._gini(y) if self.criterion == 'gini' else self._entropy(y) left_idx = X_col <= split_threshold right_idx = X_col > split_threshold n, n_left, n_right = len(y), sum(left_idx), sum(right_idx) if n_left == 0 or n_right == 0: return 0 child_impurity = (n_left/n)*self._gini(y[left_idx]) + (n_right/n)*self._gini(y[right_idx]) \ if self.criterion == 'gini' else \ (n_left/n)*self._entropy(y[left_idx]) + (n_right/n)*self._entropy(y[right_idx]) return parent_impurity - child_impurity def _best_split(self, X, y): best_gain = -1 best_feature, best_threshold = None, None for feature_idx in range(X.shape[1]): X_col = X[:, feature_idx] thresholds = np.unique(X_col) for threshold in thresholds: gain = self._information_gain(X_col, y, threshold) if gain > best_gain: best_gain = gain best_feature = feature_idx best_threshold = threshold return best_feature, best_threshold def _build_tree(self, X, y, depth=0): n_samples, n_features = X.shape n_classes = len(np.unique(y)) # 终止条件 if (self.max_depth is not None and depth >= self.max_depth) or n_classes == 1: return {'class': np.bincount(y).argmax()} # 寻找最佳分割 feature, threshold = self._best_split(X, y) if feature is None: return {'class': np.bincount(y).argmax()} # 递归构建子树 left_idx = X[:, feature] <= threshold right_idx = X[:, feature] > threshold left_subtree = self._build_tree(X[left_idx], y[left_idx], depth+1) right_subtree = self._build_tree(X[right_idx], y[right_idx], depth+1) return { 'feature': feature, 'threshold': threshold, 'left': left_subtree, 'right': right_subtree } def fit(self, X, y): self.tree = self._build_tree(X, y) def _predict_sample(self, x, node): if 'class' in node: return node['class'] if x[node['feature']] <= node['threshold']: return self._predict_sample(x, node['left']) else: return self._predict_sample(x, node['right']) def predict(self, X): return np.array([self._predict_sample(x, self.tree) for x in X])8. 算法评估与比较
8.1 评估指标实现
def accuracy_score(y_true, y_pred): return np.sum(y_true == y_pred) / len(y_true) def mean_squared_error(y_true, y_pred): return np.mean((y_true - y_pred)**2)8.2 交叉验证实现
def cross_val_score(model, X, y, cv=5, scoring='accuracy'): n_samples = len(X) fold_size = n_samples // cv indices = np.arange(n_samples) np.random.shuffle(indices) scores = [] for i in range(cv): start = i * fold_size end = (i + 1) * fold_size if i != cv - 1 else n_samples test_idx = indices[start:end] train_idx = np.concatenate([indices[:start], indices[end:]]) X_train, X_test = X[train_idx], X[test_idx] y_train, y_test = y[train_idx], y[test_idx] model.fit(X_train, y_train) y_pred = model.predict(X_test) if scoring == 'accuracy': score = accuracy_score(y_test, y_pred) elif scoring == 'mse': score = mean_squared_error(y_test, y_pred) scores.append(score) return np.array(scores)9. 工程实践中的优化技巧
9.1 数值稳定性处理
在逻辑回归中,sigmoid函数可以改写为数值稳定的版本:
def _sigmoid(self, z): # 防止数值溢出 z = np.clip(z, -500, 500) return 1 / (1 + np.exp(-z))9.2 向量化计算优化
以线性回归为例,矩阵运算比循环快100倍以上:
# 不好的实现 def predict(self, X): return np.array([np.dot(x, self.w) for x in X]) # 好的实现 def predict(self, X): return X @ self.w9.3 超参数调优策略
实现简单的网格搜索:
def grid_search(model, X, y, param_grid, cv=5): best_score = -np.inf best_params = None # 生成参数组合 keys, values = zip(*param_grid.items()) for params in itertools.product(*values): params_dict = dict(zip(keys, params)) model.set_params(**params_dict) scores = cross_val_score(model, X, y, cv=cv) mean_score = np.mean(scores) if mean_score > best_score: best_score = mean_score best_params = params_dict return best_params, best_score10. 扩展与进阶方向
掌握了这些基础算法实现后,可以考虑以下进阶方向:
- 正则化实现:为线性回归和逻辑回归添加L1/L2正则化项
- 多分类扩展:改造逻辑回归为softmax回归
- 核方法实现:为SVM添加核函数支持
- 集成方法:基于决策树实现随机森林和GBDT
- 神经网络:从零实现多层感知机
每个算法的完整实现通常需要考虑更多工程细节,比如:
- 异常值处理
- 缺失值处理
- 类别不平衡处理
- 在线学习支持
- 并行计算优化
这些实现虽然比直接调用库更复杂,但对理解算法本质和提升工程能力有着不可替代的作用。建议读者在理解本文代码的基础上,尝试为每个算法添加更多功能和优化。