开源巴西金融市场数据引擎:从数据管道到REST API的量化投资实践
2026/5/13 5:37:01 网站建设 项目流程

1. 项目概述:从零构建一个巴西金融市场数据引擎

如果你也研究过巴西的股票和房地产投资信托基金,你肯定和我一样,被获取结构化数据这件事折磨过。市面上要么是付费API,价格不菲;要么是零散的CSV文件,数据清洗和整合的工作量巨大。这个项目,b3-data-fetcher,就是在这种“受够了”的情绪下诞生的。它的核心目标很简单:打造一个完全开源、可以本地部署的数据管道和API服务,让你能自由、透明地分析和筛选巴西B3交易所的资产。

简单来说,它干了两件事:第一,作为数据管道,它自动抓取并处理来自巴西证券交易委员会和雅虎财经的官方数据;第二,作为一个计算引擎,它基于一套自定义的投资启发式算法,对资产进行评分和排序。最终,它会通过一个REST API把这些处理好的、富含洞见的数据提供给你。整个过程不依赖任何外部付费服务,数据源完全公开透明,你甚至可以自己验证每一个计算步骤。这不仅仅是另一个数据抓取脚本,而是一个旨在成为个人量化投资分析基石的“数据引擎”。

2. 核心架构与设计思路拆解

2.1 为什么选择“数据管道 + API”的模式?

在项目初期,我评估过几种方案。最简单的是写一个一次性脚本,跑完输出CSV。但这样每次分析都要重新运行,无法实时交互。另一种是直接做一个带界面的桌面应用,但这限制了自动化集成和二次开发的可能性。最终选择“数据管道 + REST API”的架构,主要基于以下几点考量:

  1. 解耦与复用性:数据获取、清洗、计算逻辑(管道部分)与数据呈现、服务接口(API部分)分离。这意味着你可以只使用管道部分,将处理好的数据导入你自己的数据库或分析工具;也可以直接调用API,快速构建前端仪表盘或移动应用。
  2. 自动化与可扩展性:管道可以配置为定时任务(例如,每天收盘后自动更新),确保数据新鲜度。API层则可以方便地添加缓存、认证、限流等企业级功能。
  3. 生态友好:REST API是当前服务间通信的事实标准,几乎任何编程语言或工具都能轻松调用,极大降低了使用门槛。

这个架构的核心思想是:将脏活、累活(数据清洗、映射、计算)在后台一次性做好,并通过一个干净、稳定的接口提供价值

2.2 技术栈选型背后的逻辑

项目当前的技术栈是Python+Poetry+FastAPI,并规划了PostgreSQLRedis。每一个选择都有其具体原因:

  • Python:在金融数据分析和原型快速开发领域,Python拥有无与伦比的生态优势。pandas用于数据清洗和计算,yfinance用于获取股价,requests处理网络请求,这些库成熟且高效。
  • Poetry:相比传统的requirements.txtvirtualenv,Poetry提供了更强大的依赖管理和打包能力。它能精确锁定依赖版本,确保项目在任何机器上都能以完全一致的环境运行,这对于数据项目的可复现性至关重要。
  • FastAPI:作为API框架,FastAPI的性能与NodeJS和Go的框架相当,且自动生成交互式API文档(OpenAPI)的特性,对于项目的可用性和社区推广有巨大帮助。其基于Python类型提示的声明式接口编写方式,也让代码更清晰、错误更少。
  • SQLite → PostgreSQL:开发初期使用SQLite,因为它无需额外服务,开箱即用,适合原型验证。但SQLite在高并发写入和复杂连接查询上存在瓶颈。因此,路线图中规划迁移到PostgreSQL,这是一个功能强大、稳定可靠的开源关系数据库,完全能满足生产环境的需求。
  • Redis:金融数据虽然变化,但在分钟甚至小时级别内是相对静态的。将处理后的资产列表、指标计算结果缓存到Redis中,可以极大减轻数据库压力和计算开销,将API响应时间从秒级降低到毫秒级。

这个技术栈组合,在开发效率、运行性能、可维护性和社区支持上取得了很好的平衡。

3. 数据管道:核心细节与实现解析

数据管道是整个项目的心脏,它负责从原始、杂乱的CSV文件到干净、可分析的结构化数据的全过程。这个过程远比一个简单的pd.read_csv复杂。

3.1 多源数据整合的挑战与解决方案

项目需要处理两个主要数据源:CVM(巴西证券交易委员会)的官方基金报表和雅虎财经的实时股价。最大的挑战在于数据关联:CVM数据通过CNPJ(巴西的公司税号)标识基金,而雅虎财经使用股票代码。两者之间没有直接的、稳定的公共映射表。

我的解决方案是引入一个自定义的映射文件(ativos.txt)。这个文件的格式很简单:

PVBI11, 12.345.678/0001-90 HGLG11, 98.765.432/0001-21 ...

第一列是代码,第二列是CNPJ。这个文件需要手动维护和更新,虽然听起来有点“糙”,但在当前阶段是最可靠、最可控的方式。未来可以考虑从B3官网或其他可靠来源定期爬取这个映射关系来实现自动化。

实操心得:在构建数据管道时,不要追求一步到位的全自动化。优先保证核心流程(数据获取、清洗、计算)的畅通,对于像“代码-CNPJ映射”这类边界但关键的问题,可以采用“半自动+人工校验”的过渡方案。一个可工作的、部分手动的系统,远比一个设计完美但无法运行的“全自动”系统有价值。

3.2 核心指标的计算逻辑与财务意义

管道不只是合并数据,更重要的是根据投资逻辑计算衍生指标。当前实现的启发式算法围绕几个核心指标展开:

  1. 每股净资产值:直接从CVM的fii_complemento等CSV中提取并计算。它代表了每份基金份额对应的底层资产净值,是评估基金价值的基础。
  2. 市净率:计算公式为当前股价 / 每股净资产值。这是判断资产是否“便宜”的关键指标。P/VP < 1 通常意味着股价低于其净资产,可能被低估;反之则可能被高估。
  3. 股息率:这里计算的是基于净资产值的月度股息率。首先从CVM数据中获取月度分红金额,然后计算月度分红 / 每股净资产值。这个指标反映了基金以其净资产为基础产生现金流的能力。
  4. 理论月度股息:计算公式为每股净资产值 × 月度股息率。这个值可以理解为,在当前净资产水平下,基金“应该”能分出的红利金额,是一个理论参考值。
  5. 机会分数:这是整个启发式算法的核心。一个简单的版本可以是(股息率 * 权重A) - (P/VP * 权重B)。通过调整权重,你可以表达自己的投资偏好:是更看重高股息,还是更看重低估值?这个分数用于对所有资产进行排序,分数越高,代表综合投资机会越好。
# 简化的计算示例 (非完整代码) import pandas as pd def calculate_metrics(df_cvm, df_yahoo, mapping_dict): """ df_cvm: 包含CNPJ, vp_cota (每股净资产), dy_mes (月度股息率) 的DataFrame df_yahoo: 包含ticker, price (当前价) 的DataFrame mapping_dict: {ticker: cnpj} 的映射字典 """ # 1. 通过映射字典合并数据 df_yahoo['cnpj'] = df_yahoo['ticker'].map(mapping_dict) df_merged = pd.merge(df_cvm, df_yahoo, on='cnpj', how='inner') # 2. 计算核心指标 df_merged['p_vp'] = df_merged['price'] / df_merged['vp_cota'] df_merged['dividend_r'] = df_merged['vp_cota'] * df_merged['dy_mes'] # 3. 计算简单的机会分数 (示例:更看重低P/VP和高DY) # 对P/VP进行归一化(值越小越好),对DY进行归一化(值越大越好) df_merged['p_vp_norm'] = (df_merged['p_vp'].max() - df_merged['p_vp']) / (df_merged['p_vp'].max() - df_merged['p_vp'].min()) df_merged['dy_norm'] = (df_merged['dy_mes'] - df_merged['dy_mes'].min()) / (df_merged['dy_mes'].max() - df_merged['dy_mes'].min()) df_merged['opportunity_score'] = 0.6 * df_merged['dy_norm'] + 0.4 * df_merged['p_vp_norm'] # 4. 排序 df_merged = df_merged.sort_values(by='opportunity_score', ascending=False) return df_merged[['ticker', 'vp_cota', 'dy_mes', 'price', 'p_vp', 'dividend_r', 'opportunity_score']]

3.3 缓存策略:平衡数据新鲜度与性能

金融数据并非每秒都在剧烈变化。频繁地从雅虎财经拉取数据可能触发限流,而重复计算所有指标也是一种浪费。因此,我实现了一个简单的文件缓存系统 (cache.mouras)。

缓存逻辑如下:

  1. 检查缓存:程序运行时,首先检查缓存文件是否存在,以及其创建时间。
  2. 判断时效:通过环境变量CACHE_TTL_MINUTES(例如设为30)设置缓存有效时间。如果缓存文件存在且未过期,则直接加载缓存数据。
  3. 更新缓存:如果缓存不存在或已过期,则执行完整的数据获取、处理和计算流程,并将结果序列化(如用JSON或Pickle格式)保存到缓存文件中,同时更新文件时间戳。

这个策略确保了在缓存有效期内,API的响应是瞬时的;而一旦数据“变旧”,系统会自动更新。未来迁移到Redis后,这个机制会更高效,并可以支持更复杂的缓存失效策略。

4. 从脚本到服务:FastAPI 的实现与演进

当前项目是一个命令行脚本,输出结果到终端。下一步的核心演进就是将其封装成HTTP API服务,让数据能够被远程、结构化地访问。

4.1 构建最小可行API

使用FastAPI,构建一个基础端点非常简单。首先,我们需要将数据处理逻辑模块化。假设我们有一个data_processor.py模块,里面有一个get_processed_assets()函数,它返回我们之前计算好的DataFrame或字典列表。

# main.py (演进为FastAPI应用入口) from fastapi import FastAPI, Query from typing import Optional, List import data_processor app = FastAPI(title="B3 Data Fetcher API", description="开源巴西金融市场数据API") @app.get("/assets") async def get_assets( asset_type: Optional[str] = Query(None, description="过滤资产类型,如 'FII'"), max_p_vp: Optional[float] = Query(None, description="P/VP最大值,例如 1.0"), min_dy: Optional[float] = Query(None, description="股息率最小值,例如 0.005 (0.5%)") ): """ 获取处理后的资产列表,支持基础过滤。 """ # 调用数据处理模块 assets = data_processor.get_processed_assets() # 应用过滤器 filtered_assets = assets if asset_type: # 假设数据中有 'type' 字段 filtered_assets = [a for a in filtered_assets if a.get('type') == asset_type] if max_p_vp is not None: filtered_assets = [a for a in filtered_assets if a.get('p_vp', float('inf')) <= max_p_vp] if min_dy is not None: filtered_assets = [a for a in filtered_assets if a.get('dy_mes', 0) >= min_dy] return {"count": len(filtered_assets), "assets": filtered_assets}

这个/assets端点已经具备了实用价值。通过查询参数,前端可以轻松实现“只显示P/VP小于1的FII”这样的筛选功能。FastAPI会自动为这个端点生成交互式API文档(通常位于/docs),极大方便了API的测试和使用。

4.2 数据结构化与数据库集成

目前数据可能存在于内存或临时文件中。为了持久化、历史查询和更高效的数据操作,引入数据库是必然的。路线图规划了从SQLite到PostgreSQL的迁移。

数据库模型设计思路

  • 资产表:存储资产基本信息,如代码、CNPJ、名称、类型。这是维度表。
  • 数据快照表:这是核心事实表。每天(或每次数据更新)为每个资产插入一条记录,包含当时的时间戳、股价、净资产值、股息率、计算出的P/VP、机会分数等所有指标。
  • 映射表:独立存储代码-CNPJ映射关系,方便维护更新。

这样的设计允许你:

  1. 查询任意资产在某个时间点的所有数据。
  2. 计算指标的历史趋势(例如,P/VP在过去一年的变化)。
  3. 高效地进行复杂的跨资产、跨时间段的筛选和聚合查询。

当API收到请求时,它将不再直接调用data_processor进行实时计算,而是转换为对数据库的查询。数据处理管道则作为一个独立的后台进程或定时任务,定期从源头获取新数据,计算后写入数据库。

4.3 性能优化:引入缓存层

即使有了数据库,一些高频、静态的查询(比如“当前机会分数最高的10个FII”)仍然值得缓存。这就是Redis的用武之地。

集成Redis后的API流程会变成这样:

  1. 收到请求,如GET /assets?type=FII&limit=10&sort=-opportunity_score
  2. 根据请求参数生成一个唯一的缓存键,例如assets:FII:top10:score
  3. 首先检查Redis中是否存在这个键。如果存在且未过期,直接返回缓存的值。
  4. 如果Redis中不存在,则查询数据库,获取结果。
  5. 将查询结果序列化后存入Redis,并设置一个合理的过期时间(例如60秒)。
  6. 返回结果。

这个简单的“旁路缓存”策略,对于读多写少的金融数据查询场景,能带来数量级的性能提升。FastAPI社区有成熟的库(如aioredis)可以方便地集成这一功能。

5. 部署、运维与未来扩展

5.1 本地开发与生产部署

项目使用Poetry管理依赖,这使得环境一致性得到保障。对于生产部署,一个常见的做法是使用Docker容器化。

一个简单的Dockerfile可能如下所示:

FROM python:3.11-slim WORKDIR /app # 复制依赖定义文件 COPY pyproject.toml poetry.lock ./ # 安装Poetry和项目依赖 RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-dev --no-interaction --no-ansi # 复制应用代码 COPY . . # 运行FastAPI应用 CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

结合docker-compose.yml,可以轻松定义并启动包含API服务、PostgreSQL数据库和Redis缓存在内的完整应用栈。云部署平台如Render、Railway或Google Cloud Run都原生支持Docker,部署过程会非常顺畅。

5.2 监控、日志与错误处理

一个健壮的服务离不开可观测性。在FastAPI中,可以集成像structlog这样的结构化日志库,将不同级别的日志(INFO、WARNING、ERROR)输出到控制台或日志文件,并包含请求ID、用户标识等上下文信息,便于问题追踪。

对于错误处理,FastAPI有强大的异常处理机制。你需要为可能出现的错误定义清晰的HTTP状态码和错误信息,例如:

  • 422 Unprocessable Entity:请求参数验证失败。
  • 503 Service Unavailable:上游数据源(如雅虎财经)暂时不可用或数据处理管道出错。
  • 404 Not Found:请求的特定资产不存在。

全局的异常处理器可以确保任何未捕获的异常都不会直接暴露给用户,而是返回一个通用的500 Internal Server Error和相关的请求ID,方便后台排查。

5.3 项目的未来可能性

这个项目的边界远不止于当前的功能。它的开源和模块化设计为社区贡献和个性化扩展留下了巨大空间:

  1. 更多数据源:除了CVM和雅虎财经,可以集成B3的官方数据接口、宏观经济指标、甚至社交媒体情绪数据。
  2. 更复杂的策略:当前是单一的启发式算法。未来可以设计成“策略插件”模式,允许用户选择或上传自己的评分算法(例如,加入动量因子、质量因子等)。
  3. 实时通知:结合消息队列(如Celery + RabbitMQ),当某个资产的指标达到用户预设的阈值时(如P/VP跌破0.8),自动发送邮件或App推送。
  4. 前端仪表盘:基于React或Vue.js构建一个轻量级前端,可视化展示资产排名、历史趋势对比、投资组合模拟等。
  5. 社区化数据维护:建立一个机制,让社区用户共同维护和校验“代码-CNPJ”映射文件,甚至贡献数据清洗规则。

这个项目的终极愿景,是成为一个由社区驱动、透明、可信赖的巴西金融市场数据基础设施。它始于一个简单的个人需求,但通过清晰的架构和开源协作,完全有潜力成长为一个对更多投资者、开发者和研究者都有价值的工具。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询