Python实战:从Yahoo Finance抓取多股票历史数据与实时趋势可视化
2026/4/15 14:57:36
std::future长期以来缺乏对异步任务取消的原生支持,开发者不得不依赖外部标志位或第三方库实现取消逻辑。C++26引入了标准化的取消机制,显著增强了并发编程的可控性与资源管理能力。std::stop_token、std::stop_source和std::stop_callback与std::future集成,允许任务主动响应中断请求。// 示例:带取消支持的异步任务 #include <future> #include <thread> #include <iostream> void task_function(std::stop_token stoken) { for (int i = 0; i < 100; ++i) { if (stoken.stop_requested()) { std::cout << "任务被取消\n"; return; } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } std::cout << "任务完成\n"; } int main() { auto future = std::async(std::launch::async, [](std::stop_token stoken) { task_function(stoken); }); std::this_thread::sleep_for(std::chrono::milliseconds(50)); future.cancel(); // C++26 新增接口:触发取消 future.wait(); return 0; }上述代码展示了如何通过future.cancel()触发任务取消,底层自动传播停止信号至绑定的stop_token。| 特性 | C++23 及之前 | C++26 |
|---|---|---|
| 取消支持 | 无原生支持 | 内置 cancel() 方法 |
| 协作机制 | 手动轮询标志 | 集成 stop_token |
| 异常传播 | 仅结果/异常获取 | 支持取消状态传递 |
ctx, cancel := context.WithCancel(parent) go func() { defer cancel() select { case <-time.After(5 * time.Second): // 模拟耗时操作 case <-ctx.Done(): return // 接收到取消信号 } }()上述代码通过context.WithCancel创建可取消上下文,子协程监听ctx.Done()通道。一旦父上下文触发取消,所有派生协程将同步收到中断信号,形成树状传播路径。| 策略 | 响应延迟 | 资源回收 |
|---|---|---|
| 立即中断 | 低 | 即时 |
| 等待周期 | 高 | 延迟 |
token := source.Token() go func() { select { case <-token.Done(): fmt.Println("任务已被取消") } }() source.Cancel() // 触发所有关联 token上述代码中,`cancellation_token` 通过 `Done()` 返回一个只读通道,当 `cancellation_source.Cancel()` 被调用时,通道关闭,监听协程立即感知并执行清理逻辑。type TaskStatus int const ( Pending TaskStatus = iota Running Completed Failed ) type Task struct { ID string Status atomic.Value // 原子化状态存储 Done chan struct{} }上述代码定义了任务状态枚举及线程安全的状态字段。通过 `atomic.Value` 实现无锁读写,提升并发性能。`Done` 通道用于通知任务终止,支持外部协程等待结果。func (t *Task) Run(ctx context.Context) error { go func() { select { case <-ctx.Done(): t.Status.Store(Failed) case <-t.Done: t.Status.Store(Completed) } }() return nil }当上下文被取消时,自动触发状态变更,确保资源及时释放,实现响应式中断。ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { defer cleanupResources() // 确保异常或正常退出时均执行 select { case <-doWork(): case <-ctx.Done(): // 协作式响应取消 return } }()上述代码中,ctx.Done()提供非阻塞取消通道,cleanupResources在函数退出时必然执行,形成异常安全保证。取消行为由接收方主动响应,避免了竞态与状态破坏。std::promise prom; std::future fut = prom.get_future(); std::thread t([&prom]() { prom.set_value(42); // 完成异步操作 }); t.detach();该代码中,`prom.set_value(42)` 触发未来对象就绪,与标准库行为一致,保证了线程安全与异常传播的一致性。CancellationToken协作式取消异步操作:public async Task GetDataAsync(CancellationToken token) { var client = new HttpClient(); try { var response = await client.GetAsync("https://api.example.com/data", token); return await response.Content.ReadAsStringAsync(); } catch (OperationCanceledException) { // 处理取消请求 throw; } }该方法接受一个CancellationToken,当用户触发取消(如点击“取消”按钮),调用tokenSource.Cancel()通知任务终止。此机制确保资源及时释放,避免内存泄漏。CancellationTokenSource来管理生命周期Token传递给所有支持取消的异步方法Cancel()触发中断context实现超时控制与批量取消:ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() httpRequest(ctx, id) // 所有请求共享同一上下文 }(i) } wg.Wait()上述代码中,WithTimeout创建带超时的上下文,一旦超时,所有子任务接收到取消信号。调用cancel()可主动终止所有正在进行的请求。ctx.Done()通道ctx传递至底层请求(如http.NewRequestWithContext)ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) go func() { select { case <-slowComputation(): // 正常完成 case <-ctx.Done(): log.Println("任务超时,已中止") } }()该代码利用context.WithTimeout创建带时限的上下文,当slowComputation未在5秒内完成时,自动触发取消信号,实现安全中止。template class cancellable_task { std::packaged_task task; std::atomic* cancelled; public: template cancellable_task(F&& f) : task(std::forward(f)), cancelled(new std::atomic{false}) {} void operator()() { if (!cancelled->load()) task(); } void cancel() { cancelled->store(true); } std::future get_future() { return task.get_future(); } };上述代码中,`cancelled` 指针指向共享的原子布尔值,任务执行前检查该标志。调用 `cancel()` 可提前终止执行。`get_future()` 用于获取结果句柄,实现异步通信。Future<?> future = executor.submit(() -> { while (!Thread.currentThread().isInterrupted()) { // 执行阶段性工作 if (taskCompleted) break; Thread.sleep(100); // 响应中断 } }); // 外部触发取消 future.cancel(true);该代码通过轮询当前线程的中断状态实现协作式取消。cancel(true)会中断执行线程,配合sleep确保及时响应。| 状态 | 是否可取消 | 资源释放 |
|---|---|---|
| 运行中 | 是 | 需显式清理 |
| 已完成 | 否 | 自动释放 |
std::jthread是 C++20 引入的线程类,相较于std::thread,它支持自动的线程生命周期管理和协作式中断。
使用std::jthread时,无需手动调用join()或detach(),其析构函数会自动调用join(),避免资源泄漏。
#include <thread> #include <iostream> void worker() { std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "工作线程结束\n"; } int main() { std::jthread t(worker); // 自动管理生命周期 return 0; // 析构时自动 join }上述代码中,std::jthread在离开作用域时自动等待线程完成,简化了异常安全处理。
std::jthread内置std::stop_token,可响应中断请求;t.request_stop()发送停止信号;token.is_stop_requested()实现优雅退出。// 启用批量处理减少数据库往返 @Async public CompletableFuture<List<Result>> processBatch(List<Task> tasks) { List<Result> results = new ArrayList<>(); for (Task task : tasks) { results.add(expensiveOperation(task)); } return CompletableFuture.completedFuture(results); }上述异步批处理可降低上下文切换频率,提升 CPU 利用率。参数tasks建议控制在 50~200 之间,避免单批次过大导致 GC 停顿。// 实验性 Go 异步函数提案(非当前标准) async func FetchData(url string) []byte { resp := await http.Get(url) data := await resp.Body.Read() return data }| 协议 | 背压支持 | 跨语言兼容性 | 典型应用场景 |
|---|---|---|---|
| gRPC-Stream | 是 | 高 | 微服务间实时同步 |
| SSE | 有限 | 中 | Web 控制台日志推送 |