3分钟掌握城通网盘终极直连解析技巧:告别广告,加速下载!
2026/4/9 20:36:23
关键词:多阶段同步、动态注册/注销、arrive/awaitAdvance、父子 Phaser、AQS、替代 CyclicBarrier、批处理/对账/并行编排
Phaser可以把它当成“升级版 CyclicBarrier”:
register()加人,也可以arriveAndDeregister()退人arrive之后,才会推进到下一阶段并唤醒等待者一句话:“可以动态增减参与者的多轮集合点/阶段门”。
当你发现CyclicBarrier不够用,通常是因为:
典型落地场景:
Phaser 内部维护(概念上):
phase:当前阶段号(从 0 开始递增)registeredParties:已注册参与者数量(本阶段需要到齐的人数)arrivedParties:当前阶段已经到达的人数unarrived = registered - arrived:还差多少人没到齐每个参与者完成当前阶段后调用:
arrive():到达但不等待arriveAndAwaitAdvance():到达并等待推进到下一阶段arriveAndDeregister():到达并退出后续阶段(动态减人)当unarrived == 0:
phase++Phaser 的实现非常“工程”:内部用原子变量/位运算把 phase、parties、unarrived 等打包,并结合 CAS;等待/唤醒机制类似于 AQS 的队列/park-unpark 思路(JDK 实现细节挺复杂,但你只要记住:它能在高并发下安全推进 phase)。
你可以继承 Phaser,重写:
protectedbooleanonAdvance(intphase,intregisteredParties)true表示终止 phaser(后续等待会立即返回)CyclicBarrierPhaser经验:
awaitAdvanceInterruptibly(phase, timeout, unit)做超时等待。给你 5 个常用模板:
场景:对账 4 阶段:拉取 → 计算 → 校验 → 落库。某些 worker 失败后不再参与后续阶段。
importjava.util.concurrent.*;importjava.util.*;publicclassReconcilePhasedJob{privatefinalExecutorServicepool;publicReconcilePhasedJob(ExecutorServicepool){this.pool=pool;}publicvoidrun(intworkers)throwsInterruptedException{Phaserphaser=newPhaser(workers){@OverrideprotectedbooleanonAdvance(intphase,intregisteredParties){System.out.println("[PHASE] advanced to phase="+(phase+1)+", registeredParties(next)="+registeredParties);// registeredParties==0 表示没人参与了,可以终止returnregisteredParties==0;}};CountDownLatchfinished=newCountDownLatch(workers);for(inti=0;i<workers;i++){finalintid=i;pool.submit(()->{try{if(!phaseFetch(id)){phaser.arriveAndDeregister();return;}phaser.arriveAndAwaitAdvance();if(!phaseCompute(id)){phaser.arriveAndDeregister();return;}phaser.arriveAndAwaitAdvance();if(!phaseVerify(id)){phaser.arriveAndDeregister();return;}phaser.arriveAndAwaitAdvance();phasePersist(id);phaser.arriveAndDeregister();// 最后阶段完成退出}finally{finished.countDown();}});}finished.await();System.out.println("[JOB] done");}privatebooleanphaseFetch(intid){sleep(120+id*10);returntrue;}privatebooleanphaseCompute(intid){sleep(80+id*5);returntrue;}privatebooleanphaseVerify(intid){sleep(60+id*3);// mock:某个 worker 验证失败returnid!=2;}privatevoidphasePersist(intid){sleep(50);}privatestaticvoidsleep(longms){try{Thread.sleep(ms);}catch(InterruptedExceptionignored){Thread.currentThread().interrupt();}}}这个模板的核心价值:
arriveAndDeregister()退出,后续阶段不会再等它(这就是 Phaser 秒杀 CyclicBarrier 的点)。场景:你先并行处理 N 个分片,处理过程中发现某个分片太大,需要拆出更多子任务继续处理。
importjava.util.concurrent.*;importjava.util.*;publicclassDynamicTaskPhaserDemo{privatefinalExecutorServicepool;publicDynamicTaskPhaserDemo(ExecutorServicepool){this.pool=pool;}publicvoidrun(List<String>shards)throwsInterruptedException{// 1 个“主控”参与者:用于等待所有任务结束Phaserphaser=newPhaser(1);for(Stringshard:shards){phaser.register();// 新任务加入pool.submit(()->{try{processShard(shard,phaser);}finally{phaser.arriveAndDeregister();// 任务结束退出}});}// 主控到达并等待所有任务完成(所有任务 deregister 后 parties 归 0)phaser.arriveAndDeregister();// 主控也退出// 注意:这里不需要额外 await,onAdvance 可终止,也可用下面的方式等待:// while (!phaser.isTerminated()) Thread.yield();}privatevoidprocessShard(Stringshard,Phaserphaser){// mock:遇到大分片,拆 2 个子任务if(shard.startsWith("BIG")){for(inti=0;i<2;i++){phaser.register();Stringsub=shard+"-sub"+i;pool.submit(()->{try{doWork(sub);}finally{phaser.arriveAndDeregister();}});}}doWork(shard);}privatevoiddoWork(Stringname){try{Thread.sleep(100);}catch(InterruptedExceptionignored){Thread.currentThread().interrupt();}System.out.println("done: "+name);}}注意点:
Phaser 的“正确用法”之一:自己拿到当前 phase,然后用带超时等待推进。
importjava.util.concurrent.*;publicclassPhaserTimeoutExample{publicvoiddemo(Phaserphaser)throwsException{intphase=phaser.getPhase();// 我到达本阶段(但不等)phaser.arrive();try{// 等待推进到下一阶段:可中断、可超时phaser.awaitAdvanceInterruptibly(phase,300,TimeUnit.MILLISECONDS);}catch(TimeoutExceptione){// 超时策略:降级/退出/报警System.err.println("phase wait timeout, phase="+phase);}}}场景:你有 3 个小组,每组内部要先对齐,然后各组 leader 再对齐推进全局。
importjava.util.concurrent.*;publicclassHierarchicalPhaserDemo{publicvoidrun()throwsInterruptedException{Phaserroot=newPhaser(3);// 3 个组 leaderPhasergroupA=newPhaser(root,3);// root 作为 parentPhasergroupB=newPhaser(root,3);PhasergroupC=newPhaser(root,3);ExecutorServicepool=Executors.newFixedThreadPool(9);runGroup(pool,"A",groupA);runGroup(pool,"B",groupB);runGroup(pool,"C",groupC);// 等 root 推进 1 个阶段(所有组 leader 到齐)intp0=root.getPhase();root.awaitAdvance(p0);System.out.println("[ROOT] all groups finished phase0");pool.shutdown();pool.awaitTermination(3,TimeUnit.SECONDS);}privatevoidrunGroup(ExecutorServicepool,Stringname,Phasergroup){for(inti=0;i<3;i++){finalintid=i;pool.submit(()->{doWork(name+"-"+id);group.arriveAndDeregister();// 组内到齐后,会自动通知 parent(root)});}}privatevoiddoWork(Stringtag){try{Thread.sleep(80);}catch(InterruptedExceptionignored){Thread.currentThread().interrupt();}System.out.println("done "+tag);}}理解要点:
场景:系统启动时预热模块数量不固定(可插拔),每个模块启动时 register,完成时 deregister。
importorg.springframework.boot.ApplicationRunner;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.concurrent.*;@ConfigurationpublicclassWarmupWithPhaserConfig{@Bean(destroyMethod="shutdown")publicExecutorServicewarmupPool(){returnExecutors.newFixedThreadPool(8);}@BeanpublicApplicationRunnerwarmupRunner(ExecutorServicewarmupPool){returnargs->{Phaserphaser=newPhaser(1);// 主控// 动态模块列表(示例)String[]modules={"cache","dict","rule","downstreamPing"};for(Stringm:modules){phaser.register();warmupPool.submit(()->{try{warmModule(m);}finally{phaser.arriveAndDeregister();}});}// 主控到达并等待所有模块完成(带超时)intphase=phaser.arrive();try{phaser.awaitAdvanceInterruptibly(phase,3,TimeUnit.SECONDS);System.out.println("[WARMUP] all modules done");}catch(TimeoutExceptione){System.err.println("[WARMUP] timeout -> start with degraded mode");}finally{phaser.arriveAndDeregister();// 主控退出}};}privatevoidwarmModule(Stringm){try{Thread.sleep(200);}catch(InterruptedExceptionignored){Thread.currentThread().interrupt();}System.out.println("warm done: "+m);}}register()必须有配对arriveAndDeregister()(不然必挂)awaitAdvanceInterruptibly(phase, timeout),别无限等onAdvance只做轻活(标记、计数、日志),重活丢线程池