系列第4篇:Python+Go构建企业级AI Agent实战指南(4/13)
标签:DeerFlow | Skill开发 | 数据分析 | Python | 源码解析
一、开篇:Skill是Agent的"超能力"
如果把AI Agent比作一个职场新人,那么**Skill(技能)**就是他的专业能力:
- 会Excel → 数据处理Skill
- 会Python → 编程Skill
- 会写报告 → 文案Skill
DeerFlow的Skill机制,让Agent的能力扩展变得像安装APP一样简单。本文将深入Skill.md开发,带你构建一个生产级的数据分析Agent。
二、Skill.md文件详解
2.1 Skill.md结构
# Skill.md 标准结构 ## 基本信息 name: Skill名称 description: Skill功能描述 version: 版本号 author: 作者 ## 能力列表 (capabilities) 定义Agent能做什么 ## 工作流程 (workflow) 定义Agent如何执行任务 ## 约束条件 (constraints) 定义边界和限制 ## 示例 (examples) 提供使用示例2.2 完整Skill.md示例
# sales_analyzer/Skill.md ## 基本信息 name: SalesAnalyzer description: 专业的销售数据分析智能体,提供销售趋势分析、预测和洞察 version: 2.0.0 author: AI Team ## 能力列表 capabilities: - name: load_sales_data description: 加载销售数据,支持CSV、Excel格式 parameters: - name: file_path type: string required: true description: 数据文件路径 - name: date_column type: string required: false default: "date" description: 日期列名称 returns: type: dataframe description: 加载的数据框 - name: analyze_trends description: 分析销售趋势,包括同比、环比、移动平均线 parameters: - name: df type: dataframe required: true - name: metric type: string required: true description: 分析指标,如sales、revenue - name: period type: string enum: [daily, weekly, monthly] default: "monthly" returns: type: dict description: 趋势分析结果 - name: forecast_sales description: 使用 Prophet 模型预测未来销售 parameters: - name: df type: dataframe required: true - name: periods type: integer required: false default: 30 description: 预测天数 returns: type: dataframe description: 预测结果 - name: generate_insights description: 基于分析结果生成业务洞察 parameters: - name: analysis_results type: dict required: true returns: type: list description: 洞察列表 ## 工作流程 workflow: 1. 接收用户的数据文件路径和分析需求 2. 验证文件格式和完整性 3. 加载数据并执行基础清洗 4. 根据需求选择分析维度(趋势/预测/对比) 5. 执行分析并生成可视化 6. 提取关键洞察和业务建议 7. 整合输出完整的分析报告 ## 约束条件 constraints: - 数据文件大小不超过500MB - 支持格式:CSV、Excel (.xlsx, .xls) - 必须包含日期列 - 数值列不能包含非数字字符 - 预测功能需要至少3个月历史数据 ## 示例 examples: - input: "分析sales_2024.csv的销售趋势" output: | 📊 销售趋势分析报告 1. 整体趋势:2024年销售额同比增长23% 2. 季节性:Q4为销售旺季,占全年35% 3. 异常点:3月份销售额下降15%,建议关注 [附:趋势图、预测图] - input: "预测下个月的销售额" output: | 🔮 销售预测 基于历史数据,预测下月销售额: - 预测值:$125,000 - 置信区间:$118,000 - $132,000 - 环比预期:+8%三、Skill的Python实现
3.1 项目结构
sales_analyzer/ ├── Skill.md # 技能定义文件 ├── __init__.py ├── agent.py # Agent主类 ├── tools/ │ ├── __init__.py │ ├── data_loader.py # 数据加载工具 │ ├── analyzer.py # 分析工具 │ ├── forecaster.py # 预测工具 │ └── visualizer.py # 可视化工具 ├── prompts/ │ └── analysis_prompt.txt └── tests/ └── test_analyzer.py3.2 完整代码实现
# sales_analyzer/agent.py import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Optional import matplotlib.pyplot as plt import seaborn as sns from prophet import Prophet import warnings warnings.filterwarnings('ignore') from deerflow import Agent, Tool from deerflow.types import ToolResult plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS'] plt.rcParams['axes.unicode_minus'] = False class SalesAnalyzerAgent(Agent): """ 销售数据分析智能体 提供销售趋势分析、预测和洞察生成功能 """ def __init__(self, config: Optional[Dict] = None): super().__init__() self.config = config or {} self.data = None self.setup_tools() def setup_tools(self): """注册所有工具""" @Tool( name="load_sales_data", description="加载销售数据文件" ) def load_data(file_path: str, date_column: str = "date") -> ToolResult: """加载并验证销售数据""" try: # 根据扩展名选择读取方式 if file_path.endswith('.csv'): df = pd.read_csv(file_path) elif file_path.endswith(('.xlsx', '.xls')): df = pd.read_excel(file_path) else: return ToolResult.error("不支持的文件格式,请使用CSV或Excel") # 验证日期列 if date_column not in df.columns: return ToolResult.error(f"未找到日期列: {date_column}") # 转换日期格式 df[date_column] = pd.to_datetime(df[date_column]) df = df.sort_values(date_column) # 存储数据 self.data = df # 返回数据摘要 summary = { "rows": len(df), "columns": list(df.columns), "date_range": f"{df[date_column].min()} 至 {df[date_column].max()}", "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist() } return ToolResult.success(summary) except Exception as e: return ToolResult.error(f"数据加载失败: {str(e)}") @Tool( name="analyze_trends", description="分析销售趋势" ) def analyze_trends( metric: str = "sales", period: str = "monthly" ) -> ToolResult: """分析销售趋势指标""" if self.data is None: return ToolResult.error("请先加载数据") if metric not in self.data.columns: return ToolResult.error(f"未找到指标列: {metric}") try: df = self.data.copy() # 按周期聚合 if period == "daily": df_grouped = df.groupby(df['date'].dt.date)[metric].sum() elif period == "weekly": df_grouped = df.groupby(df['date'].dt.to_period('W'))[metric].sum() else: # monthly df_grouped = df.groupby(df['date'].dt.to_period('M'))[metric].sum() # 计算关键指标 total = df_grouped.sum() mean = df_grouped.mean() growth_rate = (df_grouped.iloc[-1] - df_grouped.iloc[0]) / df_grouped.iloc[0] * 100 # 计算移动平均线 ma_7 = df_grouped.rolling(window=7, min_periods=1).mean() ma_30 = df_grouped.rolling(window=30, min_periods=1).mean() # 找出峰值和谷值 peak_idx = df_grouped.idxmax() valley_idx = df_grouped.idxmin() result = { "period": period, "metric": metric, "total": round(total, 2), "average": round(mean, 2), "growth_rate": round(growth_rate, 2), "peak": {"period": str(peak_idx), "value": round(df_grouped.max(), 2)}, "valley": {"period": str(valley_idx), "value": round(df_grouped.min(), 2)}, "trend_data": df_grouped.to_dict() } return ToolResult.success(result) except Exception as e: return ToolResult.error(f"趋势分析失败: {str(e)}") @Tool( name="forecast_sales", description="预测未来销售趋势" ) def forecast_sales( metric: str = "sales", periods: int = 30 ) -> ToolResult: """使用Prophet模型进行销售预测""" if self.data is None: return ToolResult.error("请先加载数据") try: # 准备Prophet格式数据 df_prophet = self.data.rename(columns={ 'date': 'ds', metric: 'y' })[['ds', 'y']] # 创建并训练模型 model = Prophet( yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=False ) model.fit(df_prophet) # 生成预测 future = model.make_future_dataframe(periods=periods) forecast = model.predict(future) # 提取预测结果 forecast_future = forecast[forecast['ds'] > df_prophet['ds'].max()] result = { "forecast_periods": periods, "predictions": [ { "date": row['ds'].strftime('%Y-%m-%d'), "value": round(row['yhat'], 2), "lower": round(row['yhat_lower'], 2), "upper": round(row['yhat_upper'], 2) } for _, row in forecast_future.head(10).iterrows() ], "trend_direction": "up" if forecast['trend'].iloc[-1] > forecast['trend'].iloc[-30] else "down" } return ToolResult.success(result) except Exception as e: return ToolResult.error(f"预测失败: {str(e)}") @Tool( name="generate_report", description="生成完整的分析报告" ) def generate_report( trend_analysis: Dict, forecast: Optional[Dict] = None ) -> ToolResult: """生成Markdown格式的分析报告""" report = f"""# 📊 销售数据分析报告 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M')} ## 1. 数据概览 - **分析指标**: {trend_analysis['metric']} - **时间周期**: {trend_analysis['period']} - **总销售额**: ¥{trend_analysis['total']:,.2f} - **平均销售额**: ¥{trend_analysis['average']:,.2f} ## 2. 趋势分析 ### 2.1 整体趋势 - **增长率**: {trend_analysis['growth_rate']:+.2f}% - **趋势方向**: {'📈 上升' if trend_analysis['growth_rate'] > 0 else '📉 下降'} ### 2.2 关键节点 - **销售峰值**: ¥{trend_analysis['peak']['value']:,.2f} ({trend_analysis['peak']['period']}) - **销售谷值**: ¥{trend_analysis['valley']['value']:,.2f} ({trend_analysis['valley']['period']}) """ if forecast: report += f""" ## 3. 销售预测 ### 3.1 未来{trend_analysis['period']}预测 - **趋势方向**: {'📈 预期上升' if forecast['trend_direction'] == 'up' else '📉 预期下降'} - **预测周期**: {forecast['forecast_periods']} 天 ### 3.2 近期预测值 | 日期 | 预测值 | 置信区间下限 | 置信区间上限 | |------|--------|-------------|-------------| """ for pred in forecast['predictions'][:5]: report += f"| {pred['date']} | ¥{pred['value']:,.2f} | ¥{pred['lower']:,.2f} | ¥{pred['upper']:,.2f} |\n" report += """ ## 4. 业务建议 基于以上分析,建议关注以下方面: 1. **趋势把握**: 根据增长趋势调整库存和人员配置 2. **季节性**: 关注销售峰值周期,提前做好营销准备 3. **异常监控**: 建立预警机制,及时发现销售异常 --- *报告由 SalesAnalyzer Agent 自动生成* """ return ToolResult.success({"report": report}) # 注册所有工具 self.register_tool(load_data) self.register_tool(analyze_trends) self.register_tool(forecast_sales) self.register_tool(generate_report) async def run_analysis(self, file_path: str, metric: str = "sales") -> str: """执行完整的分析流程""" print("🚀 开始销售数据分析...") # 1. 加载数据 print("📖 加载数据...") load_result = await self.call_tool("load_sales_data", file_path=file_path) if not load_result.success: return f"❌ 数据加载失败: {load_result.data}" print(f"✅ 数据加载成功: {load_result.data}") # 2. 趋势分析 print("📊 分析趋势...") trend_result = await self.call_tool("analyze_trends", metric=metric) if not trend_result.success: return f"❌ 趋势分析失败: {trend_result.data}" print("✅ 趋势分析完成") # 3. 销售预测 print("🔮 预测未来趋势...") forecast_result = await self.call_tool("forecast_sales", metric=metric, periods=30) if not forecast_result.success: print(f"⚠️ 预测失败: {forecast_result.data}") forecast_result = None else: print("✅ 预测完成") # 4. 生成报告 print("📝 生成报告...") report_result = await self.call_tool( "generate_report", trend_analysis=trend_result.data, forecast=forecast_result.data if forecast_result else None ) if report_result.success: # 保存报告 output_path = f"sales_report_{datetime.now().strftime('%Y%m%d_%H%M')}.md" with open(output_path, 'w', encoding='utf-8') as f: f.write(report_result.data['report']) print(f"✅ 报告已保存: {output_path}") return report_result.data['report'] else: return f"❌ 报告生成失败: {report_result.data}" # 使用示例 async def main(): # 创建示例数据 np.random.seed(42) dates = pd.date_range('2024-01-01', periods=365, freq='D') base_sales = 1000 trend = np.linspace(0, 500, 365) seasonal = 200 * np.sin(2 * np.pi * np.arange(365) / 365.25) noise = np.random.normal(0, 100, 365) sales = base_sales + trend + seasonal + noise sample_df = pd.DataFrame({ 'date': dates, 'sales': sales.astype(int), 'orders': (sales / 50).astype(int) }) sample_df.to_csv('sample_sales.csv', index=False) print("✅ 示例数据已创建: sample_sales.csv") # 创建Agent并执行分析 agent = SalesAnalyzerAgent() report = await agent.run_analysis('sample_sales.csv', metric='sales') print("\n" + "="*60) print("📄 报告预览:") print("="*60) print(report[:1000]) print("...") if __name__ == "__main__": import asyncio asyncio.run(main())四、测试与调试
4.1 单元测试
# tests/test_analyzer.py import pytest import pandas as pd from sales_analyzer.agent import SalesAnalyzerAgent @pytest.fixture def sample_data(): """创建测试数据""" dates = pd.date_range('2024-01-01', periods=100, freq='D') return pd.DataFrame({ 'date': dates, 'sales': range(100), 'revenue': [x * 10 for x in range(100)] }) @pytest.fixture def agent(): return SalesAnalyzerAgent() @pytest.mark.asyncio async def test_load_data(agent, tmp_path): """测试数据加载""" # 创建临时CSV csv_path = tmp_path / "test.csv" sample_data().to_csv(csv_path, index=False) result = await agent.call_tool("load_sales_data", file_path=str(csv_path)) assert result.success assert result.data['rows'] == 100 @pytest.mark.asyncio async def test_analyze_trends(agent, tmp_path): """测试趋势分析""" # 先加载数据 csv_path = tmp_path / "test.csv" sample_data().to_csv(csv_path, index=False) await agent.call_tool("load_sales_data", file_path=str(csv_path)) # 测试分析 result = await agent.call_tool("analyze_trends", metric="sales") assert result.success assert 'growth_rate' in result.data五、结语
通过本文,你已经掌握了:
- ✅ Skill.md文件的完整结构和编写规范
- ✅ DeerFlow Agent的Python实现
- ✅ 工具注册和调用机制
- ✅ 完整的数据分析Agent开发流程
下一篇,我们将探讨Python与国产大模型的深度集成。
系列文章导航:← 3. Python构建AI多智能体系统实战 4.Python技能开发实战(本文) 5. Python与国产大模型的深度集成 →
本文首发于CSDN,转载请注明出处。
标签:DeerFlow | Skill开发 | 数据分析 | Python | 源码解析