别再到处找数据了!用Python+Tushare Pro免费获取A股行情、财务、资金流数据(附完整代码)
2026/6/4 5:45:55 网站建设 项目流程

Python+Tushare Pro:构建个人金融数据库的实战指南

刚接触量化分析时,最头疼的就是数据获取。要么接口昂贵,要么数据不全,要么稳定性差。直到发现Tushare Pro这个宝藏工具——它不仅免费提供沪深股票全量数据,还包含行情、财务、资金流等核心维度。本文将手把手教你用Python+Tushare Pro搭建自动化金融数据库,并实现多因子选股分析。

1. 环境配置与基础准备

1.1 注册与Token获取

首先访问Tushare Pro官网完成注册。免费账户可获得基础权限,每日调用限额足够个人研究使用。注册成功后,在个人中心找到API Token,这是所有数据调用的通行证。

import tushare as ts TOKEN = "你的Token" # 替换为实际获取的字符串 ts.set_token(TOKEN) pro = ts.pro_api() # 初始化接口

1.2 安装与版本检查

通过pip安装最新版库,并验证版本兼容性:

pip install tushare --upgrade

检查安装是否成功:

print(f"当前Tushare版本:{ts.__version__}") # 输出示例:当前Tushare版本:1.2.3

2. 核心数据获取实战

2.1 股票基础信息抓取

获取全量股票列表是构建数据库的第一步:

stock_list = pro.stock_basic( exchange='', list_status='L', fields='ts_code,symbol,name,industry,list_date' ) print(stock_list.head())

关键参数说明:

  • exchange: 交易所代码(SSE/SZSE),空值表示全部
  • list_status: L-上市 D-退市 P-暂停上市
  • fields: 指定返回字段,减少不必要的数据传输

2.2 行情数据获取技巧

获取日K线数据时,推荐使用批量查询模式:

# 获取多只股票最近30个交易日数据 daily_data = pro.daily( ts_code='600519.SH,000858.SZ', start_date=(datetime.now()-timedelta(days=30)).strftime('%Y%m%d'), end_date=datetime.now().strftime('%Y%m%d') )

注意:Tushare的日期格式统一为YYYYMMDD,需特别注意格式转换

行情数据字段说明:

字段名说明类型
open开盘价float
high最高价float
low最低价float
close收盘价float
vol成交量(手)float
amount成交额(千元)float

2.3 财务数据深度获取

财务数据是量化分析的核心,Tushare提供完整的财务报表接口:

# 获取贵州茅台最新季度利润表 income = pro.income( ts_code='600519.SH', start_date='20220101', end_date='20221231', period='20220331' # 指定财报期 )

财务数据获取策略:

  1. 按季度获取避免数据量过大
  2. 使用period参数精准定位财报期
  3. 优先获取关键指标(ROE、毛利率等)

3. 数据存储与管理方案

3.1 本地CSV存储方案

最简单的存储方式是使用pandas直接保存CSV:

def save_to_csv(data, filename): data.to_csv(f'./data/{filename}.csv', index=False, encoding='utf_8_sig') # 示例:保存沪深300成分股数据 hs300 = pro.hs300() save_to_csv(hs300, 'hs300_constituents')

3.2 SQLite数据库方案

对于大量历史数据,推荐使用SQLite进行管理:

import sqlite3 from sqlalchemy import create_engine # 创建数据库引擎 engine = create_engine('sqlite:///quant.db') # 存储日线数据到数据库 daily_data.to_sql('daily_price', engine, if_exists='append', index=False)

数据库表结构设计建议:

  • 按数据类型分表(price, finance, index等)
  • 添加时间戳字段便于版本管理
  • 建立复合索引提高查询效率

3.3 自动化更新策略

实现定时更新的完整方案:

def auto_update(): # 获取最近一个交易日 trade_cal = pro.trade_cal(exchange='SSE', is_open='1') last_date = trade_cal.iloc[-1]['cal_date'] # 增量更新数据 new_data = pro.daily(trade_date=last_date) save_to_database(new_data) # 设置定时任务(每天17:00运行) schedule.every().day.at("17:00").do(auto_update)

4. 多因子分析实战案例

4.1 数据整合方法

构建分析数据集的关键步骤:

# 获取基础指标 basic = pro.daily_basic(trade_date='20230630') # 获取财务指标 finance = pro.fina_indicator(period='20220331') # 合并数据集 merged_data = pd.merge( basic, finance, on='ts_code', how='inner' )

4.2 简单选股策略实现

实现PE+ROE双因子筛选:

def select_stocks(data): # 条件筛选 condition = ( (data['pe'] < 15) & (data['pe'] > 0) & (data['roe'] > 0.15) ) return data[condition] good_stocks = select_stocks(merged_data) print(f"筛选出优质股票{len(good_stocks)}只")

4.3 策略回测框架

简易回测框架示例:

def backtest(stock_list, start_date, end_date): # 获取历史行情 all_data = [] for code in stock_list['ts_code']: hist = pro.daily( ts_code=code, start_date=start_date, end_date=end_date ) all_data.append(hist) # 计算组合收益 full_data = pd.concat(all_data) portfolio_return = full_data.groupby('trade_date')['pct_chg'].mean() return portfolio_return.cumsum() # 执行回测 returns = backtest(good_stocks, '20200101', '20221231')

5. 性能优化与高级技巧

5.1 批量请求优化

使用并行请求大幅提升数据获取效率:

from concurrent.futures import ThreadPoolExecutor def get_batch_data(codes): with ThreadPoolExecutor(max_workers=5) as executor: results = list(executor.map( lambda x: pro.daily(ts_code=x, start_date='20230101'), codes )) return pd.concat(results) # 示例:批量获取前100只股票数据 codes = stock_list.head(100)['ts_code'].tolist() batch_data = get_batch_data(codes)

5.2 缓存机制实现

使用磁盘缓存避免重复请求:

from functools import lru_cache import pickle import hashlib def cache_request(func): def wrapper(*args, **kwargs): # 生成唯一缓存key key = hashlib.md5(str(args+tuple(kwargs.items())).encode()).hexdigest() cache_file = f'./cache/{key}.pkl' # 检查缓存 if os.path.exists(cache_file): with open(cache_file, 'rb') as f: return pickle.load(f) # 执行请求并缓存 result = func(*args, **kwargs) with open(cache_file, 'wb') as f: pickle.dump(result, f) return result return wrapper # 装饰接口方法 pro.daily = cache_request(pro.daily)

5.3 异常处理策略

健壮的数据获取需要完善的错误处理:

def safe_query(func, max_retry=3, **kwargs): for i in range(max_retry): try: return func(**kwargs) except Exception as e: print(f"请求失败,重试 {i+1}/{max_retry}") time.sleep(2**i) # 指数退避 raise Exception("超过最大重试次数") # 安全调用示例 data = safe_query(pro.daily, ts_code='600519.SH', start_date='20230101')

在实际项目中,最耗时的部分是历史数据初始化。我的经验是分阶段获取:先抓取最近3年数据快速验证策略,待核心逻辑跑通后再补充完整历史数据。对于财务数据,特别注意财报披露的滞后性,一般季报在期末后1个月内才会完整更新。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询