TaoCarts 反向海淘系统微服务架构设计:1688自动代采与高并发处理实战
2026/5/6 2:37:30
大家可以多看源码,看一下同样的功能,代码是如何构造的;
我们通常所说的线程池是指Java中的ThreadPoolExecutor,下面将详细说明线程池的参数、实现原理以及如何实现一个简单的线程池。
ThreadPoolExecutor有7个核心参数:
线程池的主要工作流程如下:
线程池中的线程执行完任务后,会从工作队列中获取新的任务来执行。如果超过keepAliveTime仍然没有获取到新任务,并且当前线程数大于核心线程数,则该线程会被终止,直到线程数等于核心线程数。
Java提供了4种内置拒绝策略:
AbortPolicy:直接抛出RejectedExecutionException(默认)
CallerRunsPolicy:由调用者线程执行该任务
DiscardPolicy:直接丢弃任务,不抛异常
DiscardOldestPolicy:丢弃队列中最老的任务,然后重试执行
FixedThreadPool(固定大小线程池)
CachedThreadPool(缓存线程池)
SingleThreadExecutor(单线程线程池)
ScheduledThreadPool(定时任务线程池)
WorkStealingPool(工作窃取线程池,Java 8+)
ForkJoinPool(分治线程池,与WorkStealingPool相关)
下面分别介绍这些线程池的特点和适用场景。
Executors.newFixedThreadPool(int nThreads)Executors.newCachedThreadPool()Executors.newSingleThreadExecutor()Executors.newScheduledThreadPool(int corePoolSize)Executors.newWorkStealingPool(int parallelism), parallelism并行级别,默认为CPU核数。new ForkJoinPool(int parallelism)因此,在实际使用中,通常建议根据业务场景自定义线程池参数,使用ThreadPoolExecutor构造函数来创建,以便更精确地控制线程池的行为。
常见配置建议:
//中断空闲的线程 private void interruptIdleWorkers(boolean onlyOne){final ReentrantLock mainLock=this.mainLock;//专用锁 mainLock.lock();//添加专用锁 try{for(Worker w:workers){Thread t=w.thread;if(!t.isInterrupted()&&w.tryLock()){try{t.interrupt();//阻塞当前线程}finally{w.unlock();}}if(onlyOne)break;}}finally{mainLock.unlock();//解除专用锁代码}}public booleantryLock(){returnthis.tryAcquire(1);}protected boolean tryAcquire(int unused){if(this.compareAndSetState(0,1)){//cas线程安全 this.setExclusiveOwnerThread(Thread.currentThread());returntrue;}else{returnfalse;}}public voidunlock(){this.release(1);}public final boolean release(int arg){if(tryRelease(arg)){signalNext(head);returntrue;}returnfalse;}protected boolean tryRelease(int unused){setExclusiveOwnerThread(null);setState(0);returntrue;}// setExclusiveOwnerThread(Thread.currentThread());线程非安全,但是由于有cas锁机制是线程安全的;protected boolean tryAcquire(int unused){if(compareAndSetState(0,1)){setExclusiveOwnerThread(Thread.currentThread());returntrue;}returnfalse;}protected boolean tryRelease(int unused){this.setExclusiveOwnerThread((Thread)null);this.setState(0);returntrue;}//若 tryAcquire 和 tryRelease分别两个专项锁,则可能出现线程安全的问题;设计规则上是 若 tryAcquire 和 tryRelease 一块使用 关键因素 ReentrantLock 机制保障 两个方法都在 ReentrantLock 的同步机制控制下 tryAcquire 和 tryRelease 方法的调用都受到同一把锁的保护 互斥访问 ReentrantLock 确保同一时刻只有一个线程能执行临界区代码 无论是获取锁还是释放锁,都是在受保护的上下文中进行 内存可见性 ReentrantLock 的获取和释放操作建立了 happens-before 关系 保证了 setExclusiveOwnerThread 操作的内存可见性 源码方法 private volatile ThreadFactory threadFactory;/** * Sets the thread factory used to create new threads. * * @param threadFactory the new thread factory * @throws NullPointerExceptionifthreadFactory is null * @see#getThreadFactory*/ public void setThreadFactory(ThreadFactory threadFactory){Objects.requireNonNull(threadFactory,"threadFactory");this.threadFactory=threadFactory;//线程安全单独的赋值操作}/** * Initiates an orderlyshutdowninwhichpreviously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effectifalready shut down. * *<p>This method does notwaitforpreviously submitted tasks to * complete execution. Use{@link#awaitTermination awaitTermination}* todothat. */ public voidshutdown(){//关闭线程池 final ReentrantLock mainLock=this.mainLock;mainLock.lock();try{//如下多个操作(复合操作需要 手动方式枷锁) advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown();// hookforScheduledThreadPoolExecutor 钩子函数}finally{mainLock.unlock();}tryTerminate();}