当前位置: 首页 > news >正文

万网如何建设购物网站公司logo墙设计图片

万网如何建设购物网站,公司logo墙设计图片,洛阳 网站建设公司哪家好,哈尔滨seo优化专注线程池 一、什么是线程池 在开发中#xff0c;为了提升效率的操作#xff0c;我们需要将一些业务采用多线程的方式去执行。 比如有一个比较大的任务#xff0c;可以将任务分成几块#xff0c;分别交给几个线程去执行#xff0c;最终做一个汇总就可以了。 比如做业务操…线程池 一、什么是线程池 在开发中为了提升效率的操作我们需要将一些业务采用多线程的方式去执行。 比如有一个比较大的任务可以将任务分成几块分别交给几个线程去执行最终做一个汇总就可以了。 比如做业务操作时需要发送短信或者是发送邮件这种操作也可以基于异步的方式完成这种异步的方式其实就是再构建一个线程去执行。 但是如果每次异步操作或者多线程操作都需要新创建一个线程使用完毕后线程再被销毁这样的话对系统造成一些额外的开销。在处理过程中到底由多线程处理了多少个任务以及每个线程的开销无法统计和管理。 所以咱们需要一个线程池机制来管理这些内容。线程池的概念和连接池类似都是在一个Java的集合中存储大量的线程对象每次需要执行异步操作或者多线程操作时不需要重新创建线程直接从集合中拿到线程对象直接执行方法就可以了。 JDK中就提供了线程池的类。 在线程池构建初期可以将任务提交到线程池中。会根据一定的机制来异步执行这个任务。 可能任务直接被执行任务可以暂时被存储起来了。等到有空闲线程再来处理。任务也可能被拒绝无法被执行。 JDK提供的线程池中记录了每个线程处理了多少个任务以及整个线程池处理了多少个任务。同时还可以针对任务执行前后做一些勾子函数的实现。可以在任务执行前后做一些日志信息这样可以多记录信息方便后面统计线程池执行任务时的一些内容参数等等…… 二、JDK自带的构建线程池的方式 JDK中基于Executors提供了很多种线程池 2.1 newFixedThreadPool 这个线程池的特别是线程数是固定的。 在Executors中第一个方法就是构建newFixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable()); }构建时需要给newFixedThreadPool方法提供一个nThreads的属性而这个属性其实就是当前线程池中线程的个数。当前线程池的本质其实就是使用ThreadPoolExecutor。 构建好当前线程池后线程个数已经固定好**线程是懒加载在构建之初线程并没有构建出来而是随着人任务的提交才会将线程在线程池中国构建出来**。如果线程没构建线程会待着任务执行被创建和执行。如果线程都已经构建好了此时任务会被放到LinkedBlockingQueue无界队列中存放等待线程从LinkedBlockingQueue中去take出任务然后执行。 测试功能效果 public static void main(String[] args) throws Exception {ExecutorService threadPool Executors.newFixedThreadPool(3);threadPool.execute(() - {System.out.println(1号任务 Thread.currentThread().getName() System.currentTimeMillis());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}});threadPool.execute(() - {System.out.println(2号任务 Thread.currentThread().getName() System.currentTimeMillis());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}});threadPool.execute(() - {System.out.println(3号任务 Thread.currentThread().getName() System.currentTimeMillis());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}); }2.2 newSingleThreadExecutor 这个线程池看名字就知道是单例线程池线程池中只有一个工作线程在处理任务 如果业务涉及到顺序消费可以采用newSingleThreadExecutor // 当前这里就是构建单例线程池的方式 public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService// 在内部依然是构建了ThreadPoolExecutor设置的线程个数为1// 当任务投递过来后第一个任务会被工作线程处理后续的任务会被扔到阻塞队列中// 投递到阻塞队列中任务的顺序就是工作线程处理的顺序// 当前这种线程池可以用作顺序处理的一些业务中(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable())); }static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {// 线程池的使用没有区别跟正常的ThreadPoolExecutor没区别FinalizableDelegatedExecutorService(ExecutorService executor) {super(executor);}// finalize是当前对象被GC干掉之前要执行的方法// 当前FinalizableDelegatedExecutorService的目的是为了在当前线程池被GC回收之前// 可以执行shutdownshutdown方法是将当前线程池停止并且干掉工作线程// 但是不能基于这种方式保证线程池一定会执行shutdown// finalize在执行时是守护线程这种线程无法保证一定可以执行完毕。// 在使用线程池时如果线程池是基于一个业务构建的在使用完毕之后一定要手动执行shutdown// 否则会造成JVM中一堆线程protected void finalize() {super.shutdown();} }测试单例线程池效果 public static void main(String[] args) throws Exception {ExecutorService threadPool Executors.newSingleThreadExecutor();threadPool.execute(() - {System.out.println(Thread.currentThread().getName() , 111);});threadPool.execute(() - {System.out.println(Thread.currentThread().getName() , 222);});threadPool.execute(() - {System.out.println(Thread.currentThread().getName() , 333);});threadPool.execute(() - {System.out.println(Thread.currentThread().getName() , 444);}); }测试线程池使用完毕后不执行shutdown的后果 如果是局部变量仅限当前线程池使用的线程池在使用完毕之后要记得执行shutdown避免线程无法结束 如果是全局的线程池很多业务都会到使用完毕后不要shutdown因为其他业务也要执行当前线程池 static ExecutorService threadPool Executors.newFixedThreadPool(200);public static void main(String[] args) throws Exception {newThreadPool();System.gc();Thread.sleep(5000);System.out.println(线程池被回收了);System.in.read(); }private static void newThreadPool(){for (int i 0; i 200; i) {final int a i;threadPool.execute(() - {System.out.println(a);});}threadPool.shutdown();for (int i 0; i 200; i) {final int a i;threadPool.execute(() - {System.out.println(a);});} }2.3 newCachedThreadPool 看名字好像是一个缓存的线程池查看一下构建的方式 public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueueRunnable()); }当第一次提交任务到线程池时会直接构建一个工作线程 这个工作线程带执行完人后60秒没有任务可以执行后会结束 如果在等待60秒期间有任务进来他会再次拿到这个任务去执行 如果后续提升任务时没有线程是空闲的那么就构建工作线程去执行。 最大的一个特点任务只要提交到当前的newCachedThreadPool中就必然有工作线程可以处理 代码测试效果 public static void main(String[] args) throws Exception {ExecutorService executorService Executors.newCachedThreadPool();for (int i 1; i 200; i) {final int j i;executorService.execute(() - {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() : j);});} }2.4 newScheduleThreadPool 首先看到名字就可以猜到当前线程池是一个定时任务的线程池而这个线程池就是可以以一定周期去执行一个任务或者是延迟多久执行一个任务一次 查看一下如何构建的。 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize); }基于这个方法可以看到构建的是ScheduledThreadPoolExecutor线程池 public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor{//.... }所以本质上还是正常线程池只不过在原来的线程池基础上实现了定时任务的功能 原理是基于DelayQueue实现的延迟执行。周期性执行是任务执行完毕后再次扔回到阻塞队列。 代码查看使用的方式和效果 public static void main(String[] args) throws Exception {ScheduledExecutorService pool Executors.newScheduledThreadPool(10);// 正常执行 // pool.execute(() - { // System.out.println(Thread.currentThread().getName() 1); // });// 延迟执行执行当前任务延迟5s后再执行 // pool.schedule(() - { // System.out.println(Thread.currentThread().getName() 2); // },5,TimeUnit.SECONDS);// 周期执行当前任务第一次延迟5s执行然后没3s执行一次// 这个方法在计算下次执行时间时是从任务刚刚开始时就计算。 // pool.scheduleAtFixedRate(() - { // try { // Thread.sleep(3000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println(System.currentTimeMillis() 3); // },2,1,TimeUnit.SECONDS);// 周期执行当前任务第一次延迟5s执行然后没3s执行一次// 这个方法在计算下次执行时间时会等待任务结束后再计算时间pool.scheduleWithFixedDelay(() - {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(System.currentTimeMillis() 3);},2,1,TimeUnit.SECONDS); }至于Executors提供的newSingleThreadScheduledExecutor单例的定时任务线程池就不说了。 一个线程的线程池可以延迟或者以一定的周期执行一个任务。 2.5 newWorkStealingPool 当前JDK提供构建线程池的方式newWorkStealingPool和之前的线程池很非常大的区别 之前定长单例缓存定时任务都基于ThreadPoolExecutor去实现的。 newWorkStealingPool是基于ForkJoinPool构建出来的 ThreadPoolExecutor的核心点 在ThreadPoolExecutor中只有一个阻塞队列存放当前任务 ForkJoinPool的核心特点 ForkJoinPool从名字上就能看出一些东西。当有一个特别大的任务时如果采用上述方式这个大任务只能会某一个线程去执行。ForkJoin第一个特点是可以将一个大任务拆分成多个小任务放到当前线程的阻塞队列中。其他的空闲线程就可以去处理有任务的线程的阻塞队列中的任务 来一个比较大的数组里面存满值计算总和 单线程处理一个任务 /** 非常大的数组 */ static int[] nums new int[1_000_000_000]; // 填充值 static{for (int i 0; i nums.length; i) {nums[i] (int) ((Math.random()) * 1000);} } public static void main(String[] args) {// 单线程累加10亿数据System.out.println(单线程计算数组总和);long start System.nanoTime();int sum 0;for (int num : nums) {sum num;}long end System.nanoTime();System.out.println(单线程运算结果为 sum 计算时间为 (end - start)); }多线程分而治之的方式处理 /** 非常大的数组 */ static int[] nums new int[1_000_000_000]; // 填充值 static{for (int i 0; i nums.length; i) {nums[i] (int) ((Math.random()) * 1000);} } public static void main(String[] args) {// 单线程累加10亿数据System.out.println(单线程计算数组总和);long start System.nanoTime();int sum 0;for (int num : nums) {sum num;}long end System.nanoTime();System.out.println(单线程运算结果为 sum 计算时间为 (end - start));// 多线程分而治之累加10亿数据// 在使用forkJoinPool时不推荐使用Runnable和Callable// 可以使用提供的另外两种任务的描述方式// Runnable(没有返回结果) - RecursiveAction// Callable(有返回结果) - RecursiveTaskForkJoinPool forkJoinPool (ForkJoinPool) Executors.newWorkStealingPool();System.out.println(分而治之计算数组总和);long forkJoinStart System.nanoTime();ForkJoinTaskInteger task forkJoinPool.submit(new SumRecursiveTask(0, nums.length - 1));Integer result task.join();long forkJoinEnd System.nanoTime();System.out.println(分而治之运算结果为 result 计算时间为 (forkJoinEnd - forkJoinStart)); }private static class SumRecursiveTask extends RecursiveTaskInteger{/** 指定一个线程处理哪个位置的数据 */private int start,end;private final int MAX_STRIDE 100_000_000;// 200_000_000 147964900// 100_000_000 145942100public SumRecursiveTask(int start, int end) {this.start start;this.end end;}Overrideprotected Integer compute() {// 在这个方法中需要设置好任务拆分的逻辑以及聚合的逻辑int sum 0;int stride end - start;if(stride MAX_STRIDE){// 可以处理任务for (int i start; i end; i) {sum nums[i];}}else{// 将任务拆分分而治之。int middle (start end) / 2;// 声明为2个任务SumRecursiveTask left new SumRecursiveTask(start, middle);SumRecursiveTask right new SumRecursiveTask(middle 1, end);// 分别执行两个任务left.fork();right.fork();// 等待结果并且获取sumsum left.join() right.join();}return sum;} }最终可以发现这种累加的操作中采用分而治之的方式效率提升了2倍多。 但是也不是所有任务都能拆分提升效率首先任务得大耗时要长。 三、ThreadPoolExecutor应用源码剖析 前面讲到的Executors中的构建线程池的方式大多数还是基于ThreadPoolExecutor去new出来的。 3.1 为什么要自定义线程池 首先ThreadPoolExecutor中一共提供了7个参数每个参数都是非常核心的属性在线程池去执行任务时每个参数都有决定性的作用。 但是如果直接采用JDK提供的方式去构建可以设置的核心参数最多就两个这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己去自定义线程池。手动的去new ThreadPoolExecutor设置他的一些核心属性。 自定义构建线程池可以细粒度的控制线程池去管理内存的属性并且针对一些参数的设置可能更好的在后期排查问题。 查看一下ThreadPoolExecutor提供的七个核心参数 public ThreadPoolExecutor(int corePoolSize, // 核心工作线程当前任务执行结束后不会被销毁int maximumPoolSize, // 最大工作线程代表当前线程池中一共可以有多少个工作线程long keepAliveTime, // 非核心工作线程在阻塞队列位置等待的时间TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位BlockingQueueRunnable workQueue, // 任务在没有核心工作线程处理时任务先扔到阻塞队列中ThreadFactory threadFactory, // 构建线程的线程工作可以设置thread的一些信息RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时执行当前的拒绝策略// 初始化线程池的操作 }3.2 ThreadPoolExecutor应用 手动new一下处理的方式还是执行execute或者submit方法。 JDK提供的几种拒绝策略 AbortPolicy当前拒绝策略会在无法处理任务时直接抛出一个异常public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException(Task r.toString() rejected from e.toString()); }CallerRunsPolicy当前拒绝策略会在线程池无法处理任务时将任务交给调用者处理public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();} }DiscardPolicy当前拒绝策略会在线程池无法处理任务时直接将任务丢弃掉public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }DiscardOldestPolicy当前拒绝策略会在线程池无法处理任务时将队列中最早的任务丢弃掉将当前任务再次尝试交给线程池处理public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);} }自定义Policy根据自己的业务可以将任务扔到数据库也可以做其他操作。private static class MyRejectedExecution implements RejectedExecutionHandler{Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println(根据自己的业务情况决定编写的代码);} }代码构建线程池并处理有无返回结果的任务 public static void main(String[] args) throws ExecutionException, InterruptedException {//1. 构建线程池ThreadPoolExecutor threadPool new ThreadPoolExecutor(2,5,10,TimeUnit.SECONDS,new ArrayBlockingQueue(5),new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);thread.setName(test-ThreadPoolExecutor);return thread;}},new MyRejectedExecution());//2. 让线程池处理任务,没返回结果threadPool.execute(() - {System.out.println(没有返回结果的任务);});//3. 让线程池处理有返回结果的任务FutureObject future threadPool.submit(new CallableObject() {Overridepublic Object call() throws Exception {System.out.println(我有返回结果);return 返回结果;}});Object result future.get();System.out.println(result);//4. 如果是局部变量的线程池记得用完要shutdownthreadPool.shutdown(); }private static class MyRejectedExecution implements RejectedExecutionHandler{Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println(根据自己的业务情况决定编写的代码);} }### 3.3 ThreadPoolExecutor源码剖析线程池的源码内容会比较多一点需要一点一点的去查看内部比较多。#### 3.3.1 ThreadPoolExecutor的核心属性核心属性主要就是ctl基于ctl拿到线程池的状态以及工作线程个数在整个线程池的执行流程中会基于ctl判断上述两个内容java // 当前是线程池的核心属性 // 当前的ctl其实就是一个int类型的数值内部是基于AtomicInteger套了一层进行运算时是原子性的。 // ctl表示着线程池中的2个核心状态 // 线程池的状态ctl的高3位表示线程池状态 // 工作线程的数量ctl的低29位表示工作线程的个数 private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));// Integer.SIZE在获取Integer的bit位个数 // 声明了一个常量COUNT_BITS 29 private static final int COUNT_BITS Integer.SIZE - 3; 00000000 00000000 00000000 00000001 00100000 00000000 00000000 00000000 00011111 11111111 11111111 11111111 // CAPACITY就是当前工作线程能记录的工作线程的最大个数 private static final int CAPACITY (1 COUNT_BITS) - 1;// 线程池状态的表示 // 当前五个状态中只有RUNNING状态代表线程池没问题可以正常接收任务处理 // 111代表RUNNING状态RUNNING可以处理任务并且处理阻塞队列中的任务。 private static final int RUNNING -1 COUNT_BITS; // 000代表SHUTDOWN状态不会接收新任务正在处理的任务正常进行阻塞队列的任务也会做完。 private static final int SHUTDOWN 0 COUNT_BITS; // 001代表STOP状态不会接收新任务正在处理任务的线程会被中断阻塞队列的任务一个不管。 private static final int STOP 1 COUNT_BITS; // 010代表TIDYING状态这个状态是否SHUTDOWN或者STOP转换过来的代表当前线程池马上关闭就是过渡状态。 private static final int TIDYING 2 COUNT_BITS; // 011代表TERMINATED状态这个状态是TIDYING状态转换过来的转换过来只需要执行一个terminated方法。 private static final int TERMINATED 3 COUNT_BITS;// 在使用下面这几个方法时需要传递ctl进来// 基于运算的特点保证只会拿到ctl高三位的值。 private static int runStateOf(int c) { return c ~CAPACITY; } // 基于运算的特点保证只会拿到ctl低29位的值。 private static int workerCountOf(int c) { return c CAPACITY; }线程池状态的特点以及转换的方式 3.3.2 ThreadPoolExecutor的有参构造 有参构造没啥说的记住核心线程个数是允许为0的。 // 有参构造。无论调用哪个有参构造最终都会执行当前的有参构造 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 健壮性校验// 核心线程个数是允许为0个的。// 最大线程数必须大于0最大线程数要大于等于核心线程数// 非核心线程的最大空闲时间可以等于0if (corePoolSize 0 ||maximumPoolSize 0 ||maximumPoolSize corePoolSize ||keepAliveTime 0)// 不满足要求就抛出参数异常throw new IllegalArgumentException();// 阻塞队列线程工厂拒绝策略都不允许为null为null就扔空指针异常if (workQueue null || threadFactory null || handler null)throw new NullPointerException();// 不要关注当前内容系统资源访问决策和线程池核心业务关系不大。this.acc System.getSecurityManager() null ? null : AccessController.getContext();// 各种赋值JUC包下几乎所有涉及到线程挂起的操作单位都用纳秒。// 有参构造的值都赋值给成员变量。// Doug Lea的习惯就是将成员变量作为局部变量单独操作。this.corePoolSize corePoolSize;this.maximumPoolSize maximumPoolSize;this.workQueue workQueue;this.keepAliveTime unit.toNanos(keepAliveTime);this.threadFactory threadFactory;this.handler handler; }3.3.3 ThreadPoolExecutor的execute方法 execute方法是提交任务到线程池的核心方法很重要 线程池的执行流程其实就是在说execute方法内部做了哪些判断 execute源码的分析 // 提交任务到线程池的核心方法 // command就是提交过来的任务 public void execute(Runnable command) {// 提交的任务不能为nullif (command null)throw new NullPointerException();// 获取核心属性ctl用于后面的判断int c ctl.get();// 如果工作线程个数小于核心线程数。// 满足要求添加核心工作线程if (workerCountOf(c) corePoolSize) {// addWorker(任务,是核心线程吗)// addWorker返回true代表添加工作线程成功// addWorker返回false代表添加工作线程失败// addWorker中会基于线程池状态以及工作线程个数做判断查看能否添加工作线程if (addWorker(command, true))// 工作线程构建出来了任务也交给command去处理了。return;// 说明线程池状态或者是工作线程个数发生了变化导致添加失败重新获取一次ctlc ctl.get();}// 添加核心工作线程失败往这走// 判断线程池状态是否是RUNNING如果是正常基于阻塞队列的offer方法将任务添加到阻塞队列if (isRunning(c) workQueue.offer(command)) {// 如果任务添加到阻塞队列成功走if内部// 如果任务在扔到阻塞队列之前线程池状态突然改变了。// 重新获取ctlint recheck ctl.get();// 如果线程池的状态不是RUNNING将任务从阻塞队列移除if (!isRunning(recheck) remove(command))// 并且直接拒绝策略reject(command);// 在这说明阻塞队列有我刚刚放进去的任务// 查看一下工作线程数是不是0个// 如果工作线程为0个需要添加一个非核心工作线程去处理阻塞队列中的任务// 发生这种情况有两种// 1. 构建线程池时核心线程数是0个。// 2. 即便有核心线程可以设置核心线程也允许超时设置allowCoreThreadTimeOut为true代表核心线程也可以超时else if (workerCountOf(recheck) 0)// 为了避免阻塞队列中的任务饥饿添加一个非核心工作线程去处理addWorker(null, false);}// 任务添加到阻塞队列失败// 构建一个非核心工作线程// 如果添加非核心工作线程成功直接完事告辞else if (!addWorker(command, false))// 添加失败执行决绝策略reject(command); }execute方法的完整执行流程图 3.3.4 ThreadPoolExecutor的addWorker方法 addWorker中主要分成两大块去看 第一块校验线程池的状态以及工作线程个数第二块添加工作线程并且启动工作线程 校验线程池的状态以及工作线程个数 // 添加工作线程之校验源码 private boolean addWorker(Runnable firstTask, boolean core) {// 外层for循环在校验线程池的状态// 内层for循环是在校验工作线程的个数// retry是给外层for循环添加一个标记是为了方便在内层for循坏跳出外层for循环retry:for (;;) {// 获取ctlint c ctl.get();// 拿到ctl的高3位的值int rs runStateOf(c); //线程池状态判断// 如果线程池状态是SHUTDOWN并且此时阻塞队列有任务工作线程个数为0添加一个工作线程去处理阻塞队列的任务// 判断线程池的状态是否大于等于SHUTDOWN如果满足说明线程池不是RUNNINGif (rs SHUTDOWN // 如果这三个条件都满足就代表是要添加非核心工作线程去处理阻塞队列任务// 如果三个条件有一个没满足返回false配合!就代表不需要添加!(rs SHUTDOWN firstTask null ! workQueue.isEmpty()))// 不需要添加工作线程return false;for (;;) { //工作线程个数判断 // 基于ctl拿到低29位的值代表当前工作线程个数 int wc workerCountOf(c);// 如果工作线程个数大于最大值了不可以添加了返回falseif (wc CAPACITY ||// 基于core来判断添加的是否是核心工作线程// 如果是核心基于corePoolSize去判断// 如果是非核心基于maximumPoolSize去判断wc (core ? corePoolSize : maximumPoolSize))// 代表不能添加工作线程个数不满足要求return false;// 针对ctl进行 1采用CAS的方式if (compareAndIncrementWorkerCount(c))// CAS成功后直接退出外层循环代表可以执行添加工作线程操作了。break retry;// 重新获取一次ctl的值c ctl.get(); // 判断重新获取到的ctl中表示的线程池状态跟之前的是否有区别// 如果状态不一样说明有变化重新的去判断线程池状态if (runStateOf(c) ! rs)// 跳出一次外层for循环continue retry;}}// 省略添加工作线程以及启动的过程 }添加工作线程并且启动工作线程 private boolean addWorker(Runnable firstTask, boolean core) {// 省略校验部分的代码// 添加工作线程以及启动工作线程~~~// 声明了三个变量// 工作线程启动了没默认falseboolean workerStarted false;// 工作线程添加了没默认falseboolean workerAdded false;// 工作线程默认为nullWorker w null;try {// 构建工作线程并且将任务传递进去w new Worker(firstTask);// 获取了Worker中的Thread对象final Thread t w.thread;// 判断Thread是否不为null在new Worker时内部会通过给予的ThreadFactory去构建Thread交给Worker// 一般如果为null代表ThreadFactory有问题。if (t ! null) {// 加锁保证使用workers成员变量以及对largestPoolSize赋值时保证线程安全final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 再次获取线程池状态。int rs runStateOf(ctl.get());// 再次判断// 如果满足 rs SHUTDOWN 说明线程池是RUNNING状态正常执行if代码块// 如果线程池状态为SHUTDOWN并且firstTask为null添加非核心工作处理阻塞队列任务if (rs SHUTDOWN ||(rs SHUTDOWN firstTask null)) {// 到这可以添加工作线程。// 校验ThreadFactory构建线程后不能自己启动线程如果启动了抛出异常if (t.isAlive()) throw new IllegalThreadStateException();// private final HashSetWorker workers new HashSetWorker();// 将new好的Worker添加到HashSet中。workers.add(w);// 获取了HashSet的size拿到工作线程个数int s workers.size();// largestPoolSize在记录最大线程个数的记录// 如果当前工作线程个数大于最大线程个数的记录就赋值if (s largestPoolSize)largestPoolSize s;// 添加工作线程成功workerAdded true;}} finally {mainLock.unlock();}// 如果工作线程添加成功if (workerAdded) {// 直接启动Worker中的线程t.start();// 启动工作线程成功workerStarted true;}}} finally {// 做补偿的操作如果工作线程启动失败将这个添加失败的工作线程处理掉if (!workerStarted)addWorkerFailed(w);}// 返回工作线程是否启动成功return workerStarted; } // 工作线程启动失败需要不的步长操作 private void addWorkerFailed(Worker w) {// 因为操作了workers需要加锁final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 如果w不为null之前Worker已经new出来了。if (w ! null)// 从HashSet中移除workers.remove(w);// 同时对ctl进行 - 1代表去掉了一个工作线程个数decrementWorkerCount();// 因为工作线程启动失败判断一下状态的问题是不是可以走TIDYING状态最终到TERMINATED状态了。tryTerminate();} finally {// 释放锁mainLock.unlock();} }3.3.5 ThreadPoolExecutor的Worker工作线程 Worker对象主要包含了两个内容 工作线程要执行任务工作线程可能会被中断控制中断 // Worker继承了AQS目的就是为了控制工作线程的中断。 // Worker实现了Runnable内部的Thread对象在执行start时必然要执行Worker中断额一些操作 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{// Worker管理任务 // 线程工厂构建的线程final Thread thread;// 当前Worker要执行的任务Runnable firstTask;// 记录当前工作线程处理了多少个任务。volatile long completedTasks;// 有参构造Worker(Runnable firstTask) {// 将State设置为-1代表当前不允许中断线程setState(-1); // 任务赋值this.firstTask firstTask;// 基于线程工作构建Thread并且传入的Runnable是Workerthis.thread getThreadFactory().newThread(this);}// 当thread执行start方法时调用的是Worker的run方法public void run() {// 任务执行时执行的是runWorker方法runWorker(this);}// Worker管理中断 // 当前方法是中断工作线程时执行的方法void interruptIfStarted() {Thread t;// 只有Worker中的state 0的时候可以中断工作线程if (getState() 0 (t thread) ! null !t.isInterrupted()) {try {// 如果状态正常并且线程未中断这边就中断线程t.interrupt();} catch (SecurityException ignore) {}}}protected boolean isHeldExclusively() {return getState() ! 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }}3.3.6 ThreadPoolExecutor的runWorker方法 runWorker就是让工作线程拿到任务去执行即可。 并且在内部也处理了在工作线程正常结束和异常结束时的处理方案 // 工作线程启动后执行的任务。 final void runWorker(Worker w) {// 拿到当前线程Thread wt Thread.currentThread();// 从worker对象中拿到任务Runnable task w.firstTask;// 将Worker中的firstTask置位空w.firstTask null;// 将Worker中的state置位0代表当前线程可以中断的w.unlock(); // allow interrupts// 判断工作线程是否是异常结束默认就是异常结束boolean completedAbruptly true;try {// 获取任务// 直接拿到第一个任务去执行// 如果第一个任务为null去阻塞队列中获取任务while (task ! null || (task getTask()) ! null) {// 执行了Worker的lock方法当前在lock时shutdown操作不能中断当前线程因为当前线程正在处理任务w.lock();// 比较ctl STOP,如果满足找个状态说明线程池已经到了STOP状态甚至已经要凉凉了// 线程池到STOP状态并且当前线程还没有中断确保线程是中断的进到if内部执行中断方法// if(runStateAtLeast(ctl.get(), STOP) !wt.isInterrupted()) {中断线程}// 如果线程池状态不是STOP确保线程不是中断的。// 如果发现线程中断标记位是true了再次查看线程池状态是大于STOP了再次中断线程// 这里其实就是做了一个事情如果线程池状态 STOP确保线程中断了。if ((runStateAtLeast(ctl.get(), STOP) || ( Thread.interrupted() runStateAtLeast(ctl.get(), STOP) )) !wt.isInterrupted())wt.interrupt();try {// 勾子函数在线程池中没有做任何的实现如果需要在线程池执行任务前后做一些额外的处理可以重写勾子函数// 前置勾子函数beforeExecute(wt, task);Throwable thrown null;try {// 执行任务。task.run();} catch (RuntimeException x) {thrown x; throw x;} catch (Error x) {thrown x; throw x;} catch (Throwable x) {thrown x; throw new Error(x);} finally {// 前后置勾子函数afterExecute(task, thrown);}} finally {// 任务执行完丢掉任务task null;// 当前工作线程处理的任务数1w.completedTasks;// 执行unlock方法此时shutdown方法才可以中断当前线程w.unlock();}}// 如果while循环结束正常走到这说明是正常结束// 正常结束的话在getTask中就会做一个额外的处理将ctl - 1代表工作线程没一个。completedAbruptly false;} finally {// 考虑干掉工作线程processWorkerExit(w, completedAbruptly);} } // 工作线程结束前要执行当前方法 private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果是异常结束if (completedAbruptly) // 将ctl - 1扣掉一个工作线程decrementWorkerCount();// 操作Worker为了线程安全加锁final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 当前工作线程处理的任务个数累加到线程池处理任务的个数属性中completedTaskCount w.completedTasks;// 将工作线程从hashSet中移除workers.remove(w);} finally {// 释放锁mainLock.unlock();}// 只要工作线程凉了查看是不是线程池状态改变了。tryTerminate();// 获取ctlint c ctl.get();// 判断线程池状态当前线程池要么是RUNNING要么是SHUTDOWNif (runStateLessThan(c, STOP)) {// 如果正常结束工作线程if (!completedAbruptly) {// 如果核心线程允许超时min 0否则就是核心线程个数int min allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果min 0可能会出现没有工作线程并且阻塞队列有任务没有线程处理if (min 0 ! workQueue.isEmpty())// 至少要有一个工作线程处理阻塞队列任务min 1;// 如果工作线程个数 大于等于1不怕没线程处理正常returnif (workerCountOf(c) min)return; }// 异常结束为了避免出现问题添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程addWorker(null, false);} }3.3.7 ThreadPoolExecutor的getTask方法 工作线程在去阻塞队列获取任务前要先查看线程池状态 如果状态没问题去阻塞队列take或者是poll任务 第二个循环时不但要判断线程池状态还要判断当前工作线程是否可以被干掉 // 当前方法就在阻塞队列中获取任务 // 前面半部分是判断当前工作线程是否可以返回null结束。 // 后半部分就是从阻塞队列中拿任务 private Runnable getTask() {// timeOut默认值是false。boolean timedOut false; // 死循环for (;;) {// 拿到ctlint c ctl.get();// 拿到线程池的状态int rs runStateOf(c);// 如果线程池状态是STOP没有必要处理阻塞队列任务直接返回null// 如果线程池状态是SHUTDOWN并且阻塞队列是空的直接返回nullif (rs SHUTDOWN (rs STOP || workQueue.isEmpty())) {// 如果可以返回null先扣减工作线程个数decrementWorkerCount();// 返回null结束runWorker的while循环return null;}// 基于ctl拿到工作线程个数int wc workerCountOf(c);// 核心线程允许超时timed为true// 工作线程个数大于核心线程数timed为trueboolean timed allowCoreThreadTimeOut || wc corePoolSize;if (// 如果工作线程个数大于最大线程数。一般情况不会满足把他看成false// 第二个判断代表只要工作线程数小于等于核心线程数必然为false// 即便工作线程个数大于核心线程数了此时第一次循环也不会为true因为timedOut默认值是false// 考虑第二次循环了因为循环内部必然有修改timeOut的位置(wc maximumPoolSize || (timed timedOut)) // 要么工作线程还有要么阻塞队列为空并且满足上述条件后工作线程才会走到if内部结束工作线程(wc 1 || workQueue.isEmpty())) {// 第二次循环才有可能到这。// 正常结束工作线程 - 1因为是CAS操作如果失败了重新走for循环if (compareAndDecrementWorkerCount(c))return null;continue;}// 工作线程从阻塞队列拿任务try {// 如果是核心线程timed是false如果是非核心线程timed就是trueRunnable r timed ?// 如果是非核心走poll方法拿任务等待一会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 如果是核心走take方法死等。workQueue.take();// 从阻塞队列拿到的任务不为null这边就正常返回任务去执行if (r ! null)return r;// 说明当前线程没拿到任务将timeOut设置为true在上面就可以返回null退出了。timedOut true;} catch (InterruptedException retry) {timedOut false;}} }3.3.8 ThreadPoolExecutor的关闭方法 首先查看shutdownNow方法可以从RUNNING状态转变为STOP // shutDownNow方法shutdownNow不会处理阻塞队列的任务将任务全部给你返回了。 public ListRunnable shutdownNow() {// 声明返回结果ListRunnable tasks;// 加锁final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 不关注这个方法……checkShutdownAccess();// 将线程池状态修改为STOPadvanceRunState(STOP);// 无论怎么直接中断工作线程。interruptWorkers();// 将阻塞队列的任务全部扔到List集合中。tasks drainQueue();} finally {// 释放锁mainLock.unlock();}tryTerminate();return tasks; }// 将线程池状态修改为STOP private void advanceRunState(int STOP) {// 死循环。for (;;) {// 获取ctl属性的值int c ctl.get();// 第一个判断如果当前线程池状态已经大于等于STOP了不管了告辞。if (runStateAtLeast(c, STOP) ||// 基于CAS将ctl从c修改为STOP状态不修改工作线程个数但是状态变为了STOP// 如果修改成功结束ctl.compareAndSet(c, ctlOf(STOP, workerCountOf(c))))break;} } // 无论怎么直接中断工作线程。 private void interruptWorkers() {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 遍历HashSet拿到所有的工作线程直接中断。for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();} } // 移除阻塞队列内容全部扔到List集合中 private ListRunnable drainQueue() {BlockingQueueRunnable q workQueue;ArrayListRunnable taskList new ArrayListRunnable();// 阻塞队列自带的直接清空阻塞队列内容扔到List集合q.drainTo(taskList);// 为了避免任务丢失重新判断是否需要编辑阻塞队列重新扔到Listif (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList; }// 查看当前线程池是否可以变为TERMINATED状态 final void tryTerminate() {// 死循环。for (;;) {// 拿到ctlint c ctl.get();// 如果是RUNNING直接告辞。// 如果状态已经大于等于TIDYING马上就要凉凉直接告辞。// 如果状态是SHUTDOWN但是阻塞队列还有任务直接告辞。if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) SHUTDOWN ! workQueue.isEmpty()))return;// 如果还有工作线程if (workerCountOf(c) ! 0) { // 再次中断工作线程interruptIdleWorkers(ONLY_ONE);// 告辞等你工作线程全完事我这再尝试进入到TERMINATED状态return;}// 加锁为了可以执行Condition的释放操作final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 将线程池状态修改为TIDYING状态如果成功继续往下走if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 这个方法是空的如果你需要在线程池关闭后做一些额外操作这里你可以自行实现terminated();} finally {// 最终修改为TERMINATED状态ctl.set(ctlOf(TERMINATED, 0));// 线程池提供了一个方法主线程在提交任务到线程池后是可以继续做其他操作的。// 咱们也可以让主线程提交任务后等待线程池处理完毕再做后续操作// 这里线程池凉凉后要唤醒哪些调用了awaitTermination方法的线程termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS} }再次shutdown方法可以从RUNNING状态转变为SHUTDOWN shutdown状态下不会中断正在干活的线程而且会处理阻塞队列中的任务 public void shutdown() {// 加锁。。final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 不看。checkShutdownAccess();// 里面是一个死循环将线程池状态修改为SHUTDOWNadvanceRunState(SHUTDOWN);// 中断空闲线程interruptIdleWorkers();// 说了这个是为了ScheduleThreadPoolExecutor准备的不管onShutdown(); } finally {mainLock.unlock();}// 尝试结束线程tryTerminate(); }// 中断空闲线程 private void interruptIdleWorkers(boolean onlyOne) {// 加锁final ReentrantLock mainLock this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t w.thread;// 如果线程没有中断那么就去获取Worker的锁基于tryLock可知不会中断正在干活的线程if (!t.isInterrupted() w.tryLock()) {try {// 会中断空闲线程t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();} }3.4 线程池的核心参数设计规则 线程池的使用难度不大难度在于线程池的参数并不好配置。 主要难点在于任务类型无法控制比如任务有CPU密集型还有IO密集型甚至还有混合型的。 因为IO咱们无法直接控制所以很多时间按照一些书上提供的一些方法是无法解决问题的。 《Java并发编程实践》 想调试出一个符合当前任务情况的核心参数最好的方式就是测试。 需要将项目部署到测试环境或者是沙箱环境中结果各种压测得到一个相对符合的参数。 如果每次修改项目都需要重新部署成本太高了。 此时咱们可以实现一个动态监控以及修改线程池的方案。 因为线程池的核心参数无非就是 corePoolSize核心线程数maximumPoolSize最大线程数workQueue工作队列 线程池中提供了获取核心信息的get方法同时也提供了动态修改核心属性的set方法。 也可以采用一些开源项目提供的方式去做监控和修改 比如hippo4j就可以对线程池进行监控而且可以和SpringBoot整合。 Github地址https://github.com/opengoofy/hippo4j 官方文档https://hippo4j.cn/docs/user_docs/intro 3.5 线程池处理任务的核心流程 基于addWorker添加工作线程的流程切入到整体处理任务的位置 四、ScheduleThreadPoolExecutor应用源码 4.1 ScheduleThreadPoolExecutor介绍 从名字上就可以看出当前线程池是用于执行定时任务的线程池。 Java比较早的定时任务工具是Timer类。但是Timer问题很多串行的不靠谱会影响到其他的任务执行。 其实除了Timer以及ScheduleThreadPoolExecutor之外正常在企业中一般会采用Quartz或者是SpringBoot提供的Schedule的方式去实现定时任务的功能。 ScheduleThreadPoolExecutor支持延迟执行以及周期性执行的功能。 4.2 ScheduleThreadPoolExecutor应用 定时任务线程池的有参构造 public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler); }发现ScheduleThreadPoolExecutor在构建时直接调用了父类的构造方法 ScheduleThreadPoolExecutor的父类就是ThreadPoolExecutor 首先ScheduleThreadPoolExecutor最多允许设置3个参数 核心线程数线程工厂拒绝策略 首先没有设置阻塞队列以及最大线程数和空闲时间以及单位 阻塞队列设置的是DelayedWorkQueue其实本质就是DelayQueue一个延迟队列。DelayQueue是一个无界队列。所以最大线程数以及非核心线程的空闲时间是不需要设置的。 代码落地使用 public static void main(String[] args) {//1. 构建定时任务线程池ScheduledThreadPoolExecutor pool new ScheduledThreadPoolExecutor(5,new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread t new Thread(r);return t;}},new ThreadPoolExecutor.AbortPolicy());//2. 应用ScheduledThreadPoolExecutor// 跟直接执行线程池的execute没啥区别pool.execute(() - {System.out.println(execute);});// 指定延迟时间执行System.out.println(System.currentTimeMillis());pool.schedule(() - {System.out.println(schedule);System.out.println(System.currentTimeMillis());},2, TimeUnit.SECONDS);// 指定第一次的延迟时间并且确认后期的周期执行时间周期时间是在任务开始时就计算// 周期性执行就是将执行完毕的任务再次社会好延迟时间并且重新扔到阻塞队列// 计算的周期执行也是在原有的时间上做累加不关注任务的执行时长。System.out.println(System.currentTimeMillis());pool.scheduleAtFixedRate(() - {System.out.println(scheduleAtFixedRate);System.out.println(System.currentTimeMillis());},2,3,TimeUnit.SECONDS);// // 指定第一次的延迟时间并且确认后期的周期执行时间周期时间是在任务结束后再计算下次的延迟时间System.out.println(System.currentTimeMillis());pool.scheduleWithFixedDelay(() - {System.out.println(scheduleWithFixedDelay);System.out.println(System.currentTimeMillis());try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}},2,3,TimeUnit.SECONDS);}4.3 ScheduleThreadPoolExecutor源码剖析 4.3.1 核心属性 后面的方法业务流程会涉及到这些属性。 // 这里是针对任务取消时的一些业务判断会用到的标记 private volatile boolean continueExistingPeriodicTasksAfterShutdown; private volatile boolean executeExistingDelayedTasksAfterShutdown true; private volatile boolean removeOnCancel false;// 计数器如果两个任务的执行时间节点一模一样根据这个序列来判断谁先执行 private static final AtomicLong sequencer new AtomicLong();// 这个方法是获取当前系统时间的毫秒值 final long now() {return System.nanoTime(); }// 内部类。核心类之一。 private class ScheduledFutureTaskVextends FutureTaskV implements RunnableScheduledFutureV {// 全局唯一的序列如果两个任务时间一直基于当前属性判断private final long sequenceNumber;// 任务执行的时间单位纳秒private long time;/*** period 0执行一次的延迟任务* period 0代表是At* period 0代表是With*/private final long period;// 周期性执行时需要将任务重新扔回阻塞队列基础当前属性拿到任务方便扔回阻塞队列RunnableScheduledFutureV outerTask this;/*** 构建schedule方法的任务*/ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time ns;this.period 0;this.sequenceNumber sequencer.getAndIncrement();}/*** 构建At和With任务的有参构造*/ ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time ns;this.period period;this.sequenceNumber sequencer.getAndIncrement();}} // 内部类。核心类之一。 static class DelayedWorkQueue extends AbstractQueueRunnable implements BlockingQueueRunnable { // 这个类就是DelayQueue不用过分关注如果没看过看阻塞队列中的优先级队列和延迟队列 4.3.2 schedule方法 execute方法也是调用的schedule方法只不过传入的延迟时间是0纳秒 schedule方法就是将任务和延迟时间封装到一起并且将任务扔到阻塞队列中再去创建工作线程去take阻塞队列。 // 延迟任务执行的方法。 // command任务 // delay延迟时间 // unit延迟时间的单位 public ScheduledFuture? schedule(Runnable command, long delay, TimeUnit unit) {// 健壮性校验。if (command null || unit null)throw new NullPointerException();// 将任务和延迟时间封装到一起最终组成ScheduledFutureTask// 要分成三个方法去看// triggerTime计算延迟时间。最终返回的是当前系统时间 延迟时间 // triggerTime就是将延迟时间转换为纳秒并且当前系统时间再做一些健壮性校验// ScheduledFutureTask有参构造将任务以及延迟时间封装到一起并且设置任务执行的方式// decorateTask当前方式是让用户基于自身情况可以动态修改任务的一个扩展口RunnableScheduledFuture? t decorateTask(command, new ScheduledFutureTaskVoid(command, null,triggerTime(delay, unit)));// 任务封装好执行delayedExecute方法去执行任务delayedExecute(t);// 返回FutureTaskreturn t; }// triggerTime做的事情 // 外部方法对延迟时间做校验如果小于0就直接设置为0 // 并且转换为纳秒单位 private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay 0) ? 0 : delay)); } // 将延迟时间当前系统时间 // 后面的校验是为了避免延迟时间超过Long的取值范围 long triggerTime(long delay) {return now() ((delay (Long.MAX_VALUE 1)) ? delay : overflowFree(delay)); }// ScheduledFutureTask有参构造 ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);// time就是任务要执行的时间this.time ns;// period,为0代表任务是延迟执行不是周期执行this.period 0;// 基于AtmoicLong生成的序列this.sequenceNumber sequencer.getAndIncrement(); }// delayedExecute 执行延迟任务的操作 private void delayedExecute(RunnableScheduledFuture? task) {// 查看当前线程池是否还是RUNNING状态如果不是RUNNING进到ifif (isShutdown())// 不是RUNNING。// 执行拒绝策略。reject(task);else {// 线程池状态是RUNNING// 直接让任务扔到延迟的阻塞队列中super.getQueue().add(task);// DCL的操作再次查看线程池状态// 如果线程池在添加任务到阻塞队列后状态不是RUNNINGif (isShutdown() // task.isPeriodic()现在反回的是false因为任务是延迟执行不是周期执行// 默认情况延迟队列中的延迟任务可以执行!canRunInCurrentRunState(task.isPeriodic()) // 从阻塞队列中移除任务。remove(task))task.cancel(false);else// 线程池状态正常任务可以执行ensurePrestart();} }// 线程池状态不为RUNNING查看任务是否可以执行 // 延迟执行periodicfalse // 周期执行periodictrue // continueExistingPeriodicTasksAfterShutdown周期执行任务默认为false // executeExistingDelayedTasksAfterShutdown延迟执行任务默认为true boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown); } // 当前情况shutdownOK为true final boolean isRunningOrShutdown(boolean shutdownOK) {int rs runStateOf(ctl.get());// 如果状态是RUNNING正常可以执行返回true// 如果状态是SHUTDOWN根据shutdownOK来决定return rs RUNNING || (rs SHUTDOWN shutdownOK); }// 任务可以正常执行后做的操作 void ensurePrestart() {// 拿到工作线程个数int wc workerCountOf(ctl.get());// 如果工作线程个数小于核心线程数if (wc corePoolSize)// 添加核心线程去处理阻塞队列中的任务addWorker(null, true);else if (wc 0)// 如果工作线程数为0核心线程数也为0这是添加一个非核心线程去处理阻塞队列任务addWorker(null, false); }4.3.3 At和With方法任务的run方法 这两个方法在源码层面上的第一个区别就是在计算周期时间时需要将这个值传递给period基于正负数在区别At和With 所以查看一个方法就ok查看At方法 // At方法 // command任务 // initialDelay第一次执行的延迟时间 // period任务的周期执行时间 // unit上面两个时间的单位 public ScheduledFuture? scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {// 健壮性校验if (command null || unit null)throw new NullPointerException();// 周期时间不能小于等于0.if (period 0)throw new IllegalArgumentException();// 将任务以及第一次的延迟时间和后续的周期时间封装好。ScheduledFutureTaskVoid sft new ScheduledFutureTaskVoid(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));// 扩展口可以对任务做修改。RunnableScheduledFutureVoid t decorateTask(command, sft);// 周期性任务需要在任务执行完毕后重新扔会到阻塞队列为了方便拿任务将任务设置到outerTask成员变量中sft.outerTask t;// 和schedule方法一样的方式// 如果任务刚刚扔到阻塞队列线程池状态变为SHUTDOWN默认情况当前任务不执行delayedExecute(t);return t; }// 延迟任务以及周期任务在执行时都会调用当前任务的run方法。 public void run() {// periodic false一次性延迟任务// periodic true周期任务boolean periodic isPeriodic();// 任务执行前会再次判断状态能否执行任务if (!canRunInCurrentRunState(periodic))cancel(false);// 判断是周期执行还是一次性任务else if (!periodic)// 一次性任务让工作线程直接执行command的逻辑ScheduledFutureTask.super.run();// 到这个else if说明任务是周期执行else if (ScheduledFutureTask.super.runAndReset()) {// 设置下次任务执行的时间setNextRunTime();// 将任务重新扔回线程池做处理reExecutePeriodic(outerTask);} } // 设置下次任务执行的时间 private void setNextRunTime() {// 拿到period值正数At负数Withlong p period;if (p 0)// 拿着之前的执行时间直接追加上周期时间time p;else// 如果走到else代表任务是With方式这种方式要重新计算延迟时间// 拿到当前系统时间追加上延迟时间time triggerTime(-p); }// 将任务重新扔回线程池做处理 void reExecutePeriodic(RunnableScheduledFuture? task) {// 如果状态ok可以执行if (canRunInCurrentRunState(true)) {// 将任务扔到延迟队列super.getQueue().add(task);// DCL判断线程池状态if (!canRunInCurrentRunState(true) remove(task))task.cancel(false);else// 添加工作线程ensurePrestart();} }
http://www.hkea.cn/news/14338239/

相关文章:

  • 中国城乡住建部建设部网站沈阳企业黄页免费
  • 仙居谁认识做网站的佛山做网站建设公司
  • 医美网站建设学做网站视频论坛
  • 淘宝流量网站城阳 网站建设
  • 电子网站建设设计wordpress4.7.3漏洞
  • 六安网站价格深圳做网站哪家
  • 地方门户网站模版网站建设和网站优化哪个重要
  • 深圳网站的公司代码网站有哪些
  • 昊杰南宫网站建设企业网站必备模块
  • phpcms v9 网站建设设计制作网络科技模板杭州行业网页设计公司
  • 北京做网站公司排界面做的最好的网站
  • 怎样搭建个人网站北京传媒公司
  • 莞城仿做网站网站模板套用湖南岚鸿
  • 企业网站改版新闻如何让百度收录自己的网站信息
  • 什么是交互式网站开发南昌网站设计哪个最好
  • 网站建设多少钱 小江网页设计德州网站建设哪一家好
  • 北海市建设局官方网站苏州网页关键词优化
  • 中文网站建设英文网站备案多少钱
  • 网站平台开发公司你了解网站建设吗 软文案例
  • 淮南网站建设公司为网站的特色功能设计各种模板
  • 网站怎么显示被kwordpress VIP账号插件
  • 网站建设合同的注意事项株洲网站建设公司
  • WordPress搭建手机网站简历免费模板可编辑word
  • 微小店网站建设平台wordpress 英文站赚钱
  • tp框架做展示网站网站怎么做才能得到更好的优化
  • 网站开发过程模型个人怎么找猎头公司推荐自己
  • 企业网站用织梦好吗西安有啥好玩的地方
  • 网站开发费用如何入账c#购物网站开发流程
  • 用什么IE打开建设银行网站给实体店老板做的网站
  • linux 做网站用哪个版本飞猪旅游的网站建设