当前位置: 首页 > news >正文

emeinet亿玫网站建设网站建设课程设计报告

emeinet亿玫网站建设,网站建设课程设计报告,seo云优化,代理公司注册费用点击查看脑图目录地址,实时更新 1 什么是 JUC 1.1 JUC 简介 在 Java 中#xff0c;线程部分是一个重点#xff0c;本篇文章说的 JUC 也是关于线程的。JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包#xff0c;JDK 1.5 开始出现的。 1.2 进程与… 点击查看脑图目录地址,实时更新 1 什么是 JUC 1.1 JUC 简介 在 Java 中线程部分是一个重点本篇文章说的 JUC 也是关于线程的。JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包JDK 1.5 开始出现的。 1.2 进程与线程 进程Process 是计算机中的程序关于某数据集合上的一次运行活动是系统进行资源分配和调度的基本单位是操作系统结构的基础。 在当代面向线程设计的计算机结构中进程是线程的容器。程序是指令、数据及其组织形式的描述进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动是系统进行资源分配和调度的基本单位是操作系统结构的基础。程序是指令、数据及其组织形式的描述进程是程序的实体。 线程thread 是操作系统能够进行运算调度的最小单位。它被包含在进程之中是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流 一个进程中可以并发多个线程每条线程并行执行不同的任务。 总结来说: 进程指在系统中正在运行的一个应用程序程序一旦运行就是进程进程— —资源分配的最小单位。 线程系统分配处理器时间资源的基本单元或者说进程之内独立执行的一个单元执行流。线程——程序执行的最小单位。 1.3 线程的状态 1.3.1 线程状态枚举类 Thread.State public static enum State {NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITING,TERMINATED;}Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态 NEW: 初始状态线程被创建出来但没有被调用 start() 。 RUNNABLE: 运行状态线程被调用了 start()等待运行的状态。 BLOCKED 阻塞状态需要等待锁释放。 WAITING等待状态表示该线程需要等待其他线程做出一些特定动作通知或中断。 TIME_WAITING超时等待状态可以在指定的时间后自行返回而不是像 WAITING 那样一直等待。 TERMINATED终止状态表示该线程已经运行完毕。 由上图可以看出线程创建之后它将处于 NEW新建 状态调用 start() 方法后开始运行线程这时候处于 READY可运行 状态。可运行状态的线程获得了 CPU 时间片timeslice后就处于 RUNNING运行 状态。 在操作系统层面线程有 READY 和 RUNNING 状态而在 JVM 层面只能看到 RUNNABLE 状态所以 Java 系统一般将这两个状态统称为 RUNNABLE运行中 状态 。 为什么 JVM 没有区分这两种状态呢 现在的时分time-sharing多任务multi-task操作系统架构通常都是用所谓的“时间分片time quantum or time slice”方式进行抢占式preemptive轮转调度round-robin 式。这个时间分片通常是很小的一个线程一次最多只能在 CPU 上运行比如 10-20ms 的时间此时处于 running 状态也即大概只有 0.01 秒这一量级时间片用后就要被切换下来放入调度队列的末尾等待再次调度。也即回到 ready 状态。线程切换的如此之快区分这两种状态就没什么意义了。 当线程执行 wait()方法之后线程进入 WAITING等待 状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态。 TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制比如通过 sleeplong millis方法或 waitlong millis方法可以将线程置于 TIMED_WAITING 状态。当超时时间结束后线程将会返回到 RUNNABLE 状态。 当线程进入 synchronized 方法/块或者调用 wait 后被 notify重新进入 synchronized 方法/块但是锁被其它线程占有这个时候线程就会进入 BLOCKED阻塞 状态。 线程在执行完了 run()方法之后将会进入到 TERMINATED终止 状态。 参考 线程的几种状态你真的了解么 1.3.2 wait/sleep 的区别 sleep() 方法没有释放锁而 wait() 方法释放了锁 。wait() 通常被用于线程间交互/通信sleep()通常被用于暂停执行。wait() 方法被调用后线程不会自动苏醒需要别的线程调用同一个对象上的 notify()或者 notifyAll() 方法。sleep()方法执行完成后线程会自动苏醒或者也可以使用 wait(long timeout) 超时后线程会自动苏醒。sleep() 是 Thread 类的静态本地方法wait() 则是 Object 类的本地方法。为什么这样设计呢它们都可以被 interrupted 方法中断。 为什么 wait() 方法不定义在 Thread 中 wait() 是让获得对象锁的线程实现等待会自动释放当前线程占有的对象锁。每个对象Object都拥有对象锁既然要释放当前线程占有的对象锁并让其进入 WAITING 状态自然是要操作对应的对象Object而非当前的线程Thread。 类似的问题为什么 sleep() 方法定义在 Thread 中 因为 sleep() 是让当前线程暂停执行不涉及到对象类也不需要获得对象锁。 wait 详解java JUC 中 Object里wait()、notify() 实现原理及实战讲解 1.4 并发与并行 1.4.1 串行模式 串行表示所有任务都一一按先后顺序进行。串行意味着必须先装完一车柴才能 运送这车柴只有运送到了才能卸下这车柴并且只有完成了这整个三个步 骤才能进行下一个步骤。 串行是一次只能取得一个任务并执行这个任务。 1.4.2 并行模式 并行意味着可以同时取得多个任务并同时去执行所取得的这些任务。并行模 式相当于将长长的一条队列划分成了多条短队列所以并行缩短了任务队列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码从硬件角度上 则依赖于多核 CPU。 1.4.3 并发 并发(concurrent)指的是多个程序可以同时运行的现象更细化的是多进程可 以同时运行或者多指令可以同时运行。但这不是重点在描述并发的时候也不 会去扣这种字眼是否精确并发的重点在于它是一种现象, 并发描述的是多进程同时运行的现象。但实际上对于单核心 CPU 来说同一时刻只能运行一个线程。所以这里的同时运行表示的不是真的同一时刻有多个线程运行的现象这是并行的概念而是提供一种功能让用户看来多个程序同 时运行起来了但实际上这些程序中的进程不是一直霸占 CPU 的而是执行一会停一会。 要解决大并发问题通常是将大任务分解成多个小任务, 由于操作系统对进程的调度是随机的所以切分成多个小任务后可能会从任一小任务处执行。这可能会出现一些现象 可能出现一个小任务执行了多次还没开始下个任务的情况。这时一般会采用队列或类似的数据结构来存放各个小任务的成果 可能出现还没准备好第一步就执行第二步的可能。这时一般采用多路复用或异步的方式比如只有准备好产生了事件通知才执行某个任务。 可以多进程/多线程的方式并行执行这些小任务。也可以单进程/单线程执行这 些小任务这时很可能要配合多路复用才能达到较高的效率 1.4.4 小结(重点) 并发两个及两个以上的作业在同一 时间段 内执行。单核CPU根据时间片快速切换线程并行两个及两个以上的作业在同一 时刻 执行。 最关键的点是是否是 同时/同一时刻 执行。 1.4.5 同步和异步的区别 同步 发出一个调用之后在没有得到结果之前 该调用就不可以返回一直等待。异步 调用在发出之后不用等待返回结果该调用直接返回。 1.5 管程 管程(monitor)是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现)。但是这样并不能保证进程以设计的顺序执行。 JVM 中同步是基于进入和退出管程(monitor)对象实现的每个对象都会有一个管程 (monitor)对象管程(monitor)会随着 java 对象一同创建和销毁 执行线程首先要持有管程对象然后才能执行方法当方法完成之后会释放管程方法在执行时候会持有管程其他线程无法再获取同一个管程。 1.6 用户线程和守护线程 用户线程:平时用到的普通线程,自定义线程 守护线程:运行在后台,是一种特殊的线程,比如垃圾回收 通过thread.setDaemon(true)设置为守护线程,默认用户线程。调用 start之后 setDaemon 无效当主线程结束后,用户线程还在运行,JVM 存活 如果没有用户线程,都是守护线程,JVM 结束。守护线程不会保活进程 2. Lock 接口 2.1 Synchronized 2.1.1 Synchronized 关键字回顾 synchronized 是 Java 中的关键字是一种同步锁。它修饰的对象有以下几种 修饰一个代码块被修饰的代码块称为同步语句块其作用的范围是大括号{} 括起来的代码作用的对象是调用这个代码块的对象 修饰一个方法被修饰的方法称为同步方法其作用的范围是整个方法作用的对象是调用这个方法的对象 虽然可以使用 synchronized 来定义方法但 synchronized 并不属于方法定义的一部分因此synchronized 关键字不能被继承。如果在父类中的某个方法使用了 synchronized 关键字而在子类中覆盖了这个方法在子类中的这个方法默认情况下并不是同步的而必须显式地在子类的这个方法中加上synchronized 关键字才可以。当然还可以在子类方法中调用父类中相应的方法这样虽然子类中的方法不是同步的但子类调用了父类的同步方法因此子类的方法也就相当于同步了。 修改一个静态的方法其作用的范围是整个静态方法作用的对象是这个类的所有对象 修改一个类其作用的范围是 synchronized 后面括号括起来的部分作用主的对象是这个类的所有对象。 Synchronized 只锁对象对象头中保存着当前锁信息线程ID等。所有 Synchronized 只能锁对象或者类Class也是对象不能锁方法. Java中的每一个对象都可以作为锁 对于普通同步方法锁的是当前实例对象。 对于静态同步方法锁的是当前类的class对象。 对于同步方法块锁是Synchronized 括号里配置的对象 2.1.2 售票案例 class Ticket { //票数 private int number 30; //操作方法卖票 public synchronized void sale() { //判断是否有票 if(number 0) { System.out.println(Thread.currentThread().getName() : (number--) number); } } } 如果一个代码块被 synchronized 修饰了当一个线程获取了对应的锁并执行该代码块时其他线程便只能一直等待等待获取锁的线程释放锁而这里获取锁的线程释放锁只会有两种情况 1 获取锁的线程执行完了该代码块然后线程释放对锁的占有 2 线程执行发生异常此时 JVM 会让线程自动释放锁。 那么如果这个获取锁的线程由于要等待 IO 或者其他原因比如调用 sleep 方法被阻塞了但是又没有释放锁其他线程便只能干巴巴地等待试想一下这多么影响程序执行效率。 因此就需要有一种机制可以不让等待的线程一直无期限地等待下去比如只等待一定的时间或者能够响应中断通过 Lock 就可以办到。 2.2 什么是 Lock Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构可能具有非常不同的属性并且可能支持多个关联的条件对象。Lock 提供了比 synchronized 更多的功能。 2.2.1 Lock 接口 public interface Lock {void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();boolean tryLock(long time, TimeUnit unit) throws InterruptedException;void unlock();Condition newCondition(); }下面来逐个讲述 Lock 接口中每个方法的使用 2.2.2 lock lock()方法是平常使用得最多的一个方法就是用来获取锁。如果锁已被其他线程获取则进行等待。 采用 Lock必须主动去释放锁并且在发生异常时不会自动释放锁。因此一般来说使用 Lock 必须在 try{}catch{}块中进行并且将释放锁的操作放在 finally 块中进行以保证锁一定被被释放防止死锁的发生。通常使用 Lock 来进行同步的话是以下面这种形式去使用的 Lock lock ...; lock.lock(); try{//处理任务 }catch(Exception ex){ }finally{lock.unlock(); //释放锁 }2.2.3 newCondition 关键字 synchronized 与 wait()/notify()这两个方法一起使用可以实现等待/通知模式 Lock 锁的 newContition()方法返回 Condition 对象Condition 类 也可以实现等待/通知模式。 用 notify()通知时JVM 会随机唤醒某个等待的线程 使用 Condition 类可以进行选择性通知 Condition 比较常用的两个方法 await()会使当前线程等待,同时会释放锁,当其他线程调用signal()时,线程会重新获得锁并继续执行。signal()用于唤醒一个等待的线程。 注意在调用 Condition 的 await()/signal()方法前也需要线程持有相关 的 Lock 锁调用 await()后线程会释放这个锁在 singal()调用后会从当前 Condition 对象的等待队列中唤醒 一个线程唤醒的线程尝试获得锁 一旦获得锁成功就继续执行。 2.3 ReentrantLock ReentrantLock意思是“可重入锁”关于可重入锁的概念将在后面讲述。 ReentrantLock 是唯一实现了 Lock 接口的类并且 ReentrantLock 提供了更多的方法。下面通过一些实例看具体看一下如何使用。 public class Test {private ArrayListInteger arrayList new ArrayList();public static void main(String[] args) {final Test test new Test();new Thread() {public void run() {test.insert(Thread.currentThread());}}.start();new Thread() {public void run() {test.insert(Thread.currentThread());}}.start();}public void insert(Thread thread) {Lock lock new ReentrantLock(); //注意这个地方lock.lock();try {System.out.println(thread.getName() 得到了锁);for (int i 0; i 5; i) {arrayList.add(i);}} catch (Exception e) {// TODO: handle exception} finally {System.out.println(thread.getName() 释放了锁);lock.unlock();}} }2.4 ReadWriteLock ReadWriteLock 也是一个接口在它里面只定义了两个方法 public interface ReadWriteLock {/*** Returns the lock used for reading.** return the lock used for reading.*/Lock readLock();/*** Returns the lock used for writing.** return the lock used for writing.*/Lock writeLock(); }一个用来获取读锁一个用来获取写锁。也就是说将文件的读写操作分开分成 2 个锁来分配给线程从而使得多个线程可以同时进行读操作。下面的 ReentrantReadWriteLock 实现了 ReadWriteLock 接口。 下面通过几个例子来看一下 ReentrantReadWriteLock 具体用法。 假如有多个线程要同时进行读操作的话先看一下 synchronized 达到的效果 public class Test {private ReentrantReadWriteLock rwl newReentrantReadWriteLock();public static void main(String[] args) {final Test test new Test();new Thread() {public void run() {test.get(Thread.currentThread());}}.start();new Thread() {public void run() {test.get(Thread.currentThread());}}.start();}public synchronized void get(Thread thread) {long start System.currentTimeMillis();while (System.currentTimeMillis() - start 1) {System.out.println(thread.getName() 正在进行读操作);}System.out.println(thread.getName() 读操作完毕);} }而改成用读写锁的话 public class Test {private ReentrantReadWriteLock rwl newReentrantReadWriteLock();public static void main(String[] args) {final Test test new Test();new Thread() {public void run() {test.get(Thread.currentThread());};}.start();new Thread() {public void run() {test.get(Thread.currentThread());}}.start();}public void get(Thread thread) {rwl.readLock().lock();try {long start System.currentTimeMillis();while (System.currentTimeMillis() - start 1) {System.out.println(thread.getName() 正在进行读操作);}System.out.println(thread.getName() 读操作完毕);} finally {rwl.readLock().unlock();}} }说明 thread1 和 thread2 在同时进行读操作。这样就大大提升了读操作的效率。 注意: 如果有一个线程已经占用了读锁则此时其他线程如果要申请写锁则申请写锁的线程会一直等待释放读锁。 如果有一个线程已经占用了写锁则此时其他线程如果申请写锁或者读锁则 申请的线程会一直等待释放写锁。 2.5 小结(重点) Lock 和 synchronized 有以下几点不同 Lock 是一个接口而 synchronized 是 Java 中的关键字synchronized 是内 置的语言实现synchronized 在发生异常时会自动释放线程占有的锁因此不会导致死锁现象发生而 Lock 在发生异常时如果没有主动通过 unLock()去释放锁则很 可能造成死锁现象因此使用 Lock 时需要在 finally 块中释放锁Lock 可以让等待锁的线程响应中断而 synchronized 却不行使用 synchronized 时等待的线程会一直等待下去不能够响应中断通过 Lock 可以知道有没有成功获取锁而 synchronized 却无法办到。Lock 可以提高多个线程进行读操作的效率。synchronized底层采用的是objectMonitor,lock采用的AQS;synchronized只支持非公平锁,lock支持非公平锁和公平锁;synchronized使用了object类的wait和notify进行等待和唤醒, lock使用了condition接口进行等待和唤醒(await和signal); 在性能上来说如果竞争资源不激烈两者的性能是差不多的而当竞争资源 非常激烈时即有大量线程同时竞争此时 Lock 的性能要远远优于 synchronized。 3 线程间通信 线程间通信的模型有两种共享内存和消息传递以下方式都是基本这两种模型来实现的。我们来基本一道面试常见的题目来分析 场景一 两个线程一个线程对当前数值加 1另一个线程对当前数值减 1,要求用线程间通信 3.1 synchronized 方案 public class TestVolatile {/*** 交替加减* param args*/public static void main(String[] args){DemoClass demoClass new DemoClass();new Thread(() -{for (int i 0; i 5; i) {demoClass.increment();}}, 线程 A).start();new Thread(() -{for (int i 0; i 5; i) {demoClass.decrement();}}, 线程 B).start();} }class DemoClass{//加减对象private int number 0;/*** 加 1*/public synchronized void increment() {try {while (number ! 0){this.wait();}number;System.out.println(-------- Thread.currentThread().getName() 加一成功----------,值为: number);notifyAll();}catch (Exception e){e.printStackTrace();}}/*** 减一*/public synchronized void decrement(){try {while (number 0){this.wait();}number--;System.out.println(-------- Thread.currentThread().getName() 减一成功----------,值为: number);notifyAll();}catch (Exception e){e.printStackTrace();}} }3.2 Lock 方案 class DemoClass{//加减对象private int number 0;//声明锁private Lock lock new ReentrantLock();//声明钥匙private Condition condition lock.newCondition();/*** 加 1*/public void increment() {try {lock.lock();while (number ! 0){condition.await();}number;System.out.println(-------- Thread.currentThread().getName() 加一成功----------,值为: number);condition.signalAll();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}/*** 减一*/public void decrement(){try {lock.lock();while (number 0){condition.await();}number--;System.out.println(-------- Thread.currentThread().getName() 减一成功----------,值为: number);condition.signalAll();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}} }4 线程间定制化通信 4.1 案例介绍 问题: A 线程打印 5 次 AB 线程打印 10 次 BC 线程打印 15 次 C,按照 此顺序循环 10 轮 4.2 实现流程 class DemoClass {//通信对象:0--打印 A 1---打印 B 2----打印 Cprivate int number 0;//声明锁private Lock lock new ReentrantLock();//声明钥匙 Aprivate Condition conditionA lock.newCondition();//声明钥匙 Bprivate Condition conditionB lock.newCondition();//声明钥匙 Cprivate Condition conditionC lock.newCondition();/*** A 打印 5 次*/public void printA(int j) {try {lock.lock();while (number ! 0) {conditionA.await();}System.out.println(Thread.currentThread().getName() 输出 A,第 j 轮开始);//输出 5 次 Afor (int i 0; i 5; i) {System.out.println(A);}//开始打印 Bnumber 1;//唤醒 BconditionB.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}/*** B 打印 10 次*/public void printB(int j) {try {lock.lock();while (number ! 1) {conditionB.await();}System.out.println(Thread.currentThread().getName() 输出 B,第 j 轮开始);//输出 10 次 Bfor (int i 0; i 10; i) {System.out.println(B);}//开始打印 Cnumber 2;//唤醒 CconditionC.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}/*** C 打印 15 次*/public void printC(int j) {try {lock.lock();while (number ! 2) {conditionC.await();}System.out.println(Thread.currentThread().getName() 输出 C,第 j 轮开始);//输出 15 次 Cfor (int i 0; i 15; i) {System.out.println(C);}System.out.println(-----------------------------------------);//开始打印 Anumber 0;//唤醒 AconditionA.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}} }5 集合的线程安全 6.1 集合操作 NotSafeDemo public class NotSafeDemo {/*** 多个线程同时对集合进行修改** param args*/public static void main(String[] args) {List list new ArrayList();for (int i 0; i 100; i) {new Thread(() - {list.add(UUID.randomUUID().toString());System.out.println(list);}, 线程 i).start();}} }异常内容 java.util.ConcurrentModificationException 问题: 为什么会出现并发修改异常? /*** Appends the specified element to the end of this list.** param e element to be appended to this list* return tttrue/tt (as specified by {link Collection#add})*/public boolean add(E e) {ensureCapacityInternal(size 1); // Increments modCount!!elementData[size] e;return true;} 那么我们如何去解决 List 类型的线程安全问题? 5.2 Vector Vector 是矢量队列它是 JDK1.0 版本添加的类。继承于 AbstractList实现 了 List, RandomAccess, Cloneable 这些接口。 Vector 继承了 AbstractList 实现了 List所以它是一个队列支持相关的添加、删除、修改、遍历等功能。 Vector 实现了 RandmoAccess 接口即提供了随机访问功能。 RandmoAccess 是 java 中用来被 List 实现为 List 提供快速访问功能的。在 Vector 中我们即可以通过元素的序号快速获取元素对象这就是快速随机访 问。 Vector 实现了 Cloneable 接口即实现 clone()函数。它能被克隆。 和 ArrayList 不同Vector 中的操作是线程安全的。 NotSafeDemo 代码修改 public class NotSafeDemo {/*** 多个线程同时对集合进行修改** param args*/public static void main(String[] args) {List list new Vector();for (int i 0; i 100; i) {new Thread(() - {list.add(UUID.randomUUID().toString());System.out.println(list);}, 线程 i).start();}} }现在没有运行出现并发异常,为什么? 查看 Vector 的 add 方法 /*** Appends the specified element to the end of this Vector.** param e element to be appended to this Vector* return {code true} (as specified by {link Collection#add})* since 1.2*/public synchronized boolean add(E e) {modCount;ensureCapacityHelper(elementCount 1);elementData[elementCount] e;return true;}add 方法被 synchronized 同步修辞,线程安全!因此没有并发异常 5.3 Collections Collections 提供了方法 synchronizedList 保证 list 是同步线程安全的 public class NotSafeDemo {/*** 多个线程同时对集合进行修改* param args*/public static void main(String[] args) {List list Collections.synchronizedList(new ArrayList());for (int i 0; i 100; i) {new Thread(() -{list.add(UUID.randomUUID().toString());System.out.println(list);}, 线程 i).start();}} }没有并发修改异常 查看方法源码 public static T ListT synchronizedList(ListT list) {return (list instanceof RandomAccess ?new SynchronizedRandomAccessList(list) :new SynchronizedList(list));}5.4 CopyOnWriteArrayList(重点) 写时复制 首先我们对 CopyOnWriteArrayList 进行学习,其特点如下: 它相当于线程安全的 ArrayList。和 ArrayList 一样它是个可变数组但是和 ArrayList 不同的时它具有以下特性 它最适合于具有以下特征的应用程序List 大小通常保持很小只读操作远多于可变操作需要在遍历期间防止线程间的冲突。 它是线程安全的。 因为通常需要复制整个基础数组所以可变操作add()、set() 和 remove() 等等的开销很大。 迭代器支持 hasNext(), next()等不可变操作但不支持可变 remove()等操作。 使用迭代器进行遍历的速度很快并且不会与其他线程发生冲突。在构造迭代 器时迭代器依赖于不变的数组快照。 独占锁效率低采用读写分离思想解决 写线程获取到锁其他写线程阻塞 复制思想 当我们往一个容器添加元素的时候不直接往当前容器添加而是先将当前容器进行 Copy复制出一个新的容器然后新的容器里添加元素添加完元素之后再将原容器的引用指向新的容器。 这时候会抛出来一个新的问题也就是数据不一致的问题。如果写线程还没来得及写会内存其他的线程就会读到了脏数据。 这就是 CopyOnWriteArrayList 的思想和原理。就是拷贝一份。 NotSafeDemo 代码修改 public class NotSafeDemo {/*** 多个线程同时对集合进行修改* param args*/public static void main(String[] args) {List list Collections.synchronizedList(new ArrayList());for (int i 0; i 100; i) {new Thread(() -{list.add(UUID.randomUUID().toString());System.out.println(list);}, 线程 i).start();}} }没有线程安全问题 原因分析(重点):动态数组与线程安全 下面从“动态数组”和“线程安全”两个方面进一步对 CopyOnWriteArrayList 的原理进行说明。 “动态数组”机制 它内部有个“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据 时都会新建一个数组并将更新后的数据拷贝到新建的数组中最后再将该 数组赋值给“volatile 数组”, 这就是它叫做 CopyOnWriteArrayList 的原因由于它在“添加/修改/删除”数据时都会新建数组所以涉及到修改数据的 操作CopyOnWriteArrayList 效率很低但是单单只是进行遍历查找的话 效率比较高。 “线程安全”机制 通过 volatile 和互斥锁来实现的。通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时总能看到其它线程对该 volatile 变量最后的写入就这样通过 volatile 提供了“读取到的数据总是最新的”这个机制的保证。通过互斥锁来保护数据。在“添加/修改/删除”数据时会先“获取互斥锁” 再修改完毕之后先将数据更新到“volatile 数组”中然后再“释放互斥 锁”就达到了保护数据的目的。 5.5 小结(重点) 1.线程安全与线程不安全集合 集合类型中存在线程安全与线程不安全的两种,常见例如: ArrayList ----- Vector HashMap -----HashTable 但是以上都是通过 synchronized 关键字实现,效率较低 2.Collections 构建的线程安全集合 3.java.util.concurrent 并发包下 CopyOnWriteArrayList CopyOnWriteArraySet 类型,通过动态数组与线程安 全个方面保证线程安全 ConcurrentHashMap : 线程安全的 HashMap CopyOnWriteArrayList : 线程安全的 List在读多写少的场合性能非常好远远好于 Vector。 ConcurrentLinkedQueue : 高效的并发队列使用链表实现。可以看做一个线程安全的 LinkedList这是一个非阻塞队列。 BlockingQueue : 这是一个接口JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列非常适合用于作为数据共享的通道。 ConcurrentSkipListMap : 跳表的实现。这是一个 Map使用跳表的数据结构进行快速查找。 6 多线程锁 6.1公平锁和非公平锁 公平锁多个线程按照申请锁的顺序去获得锁线程会直接进入队列去排队永远都是队列的第一位才能得到锁。 优点所有的线程都能得到资源不会饿死在队列中。缺点吞吐量会下降很多队列里面除了第一个线程其他的线程都会阻塞cpu唤醒阻塞线程的开销会很大。 非公平锁多个线程去获取锁的时候会直接去尝试获取获取不到再去进入等待队列如果能获取到就直接获取到锁。 优点可以减少CPU唤醒线程的开销整体的吞吐效率会高点CPU也不必取唤醒所有线程会减少唤起线程的数量。缺点你们可能也发现了这样可能导致队列中间的线程一直获取不到锁或者长时间获取不到锁导致饿死。 Synchronized 默认 非公平锁不可修改。ReentrantLock 可根据入参修改new ReentrantLock(true); true 公平锁,false / 不传 非公平锁 6.2 可重入锁 可重入锁 就是一个线程不用释放可以重复的获取同一个锁n次只是在释放的时候也需要相应的释放n次。 Synchronized 和 Lock 多事可重入锁 6.3 死锁 所谓死锁是指多个进程在运行过程中因争夺同一资源而造成的一种僵局当进程处于这种僵持状态时若无外力作用它们都将无法再向前推进。 因此我们举个例子来描述如果此时有一个线程A按照先锁a再获得锁b的的顺序获得锁而在此同时又有另外一个线程B按照先锁b再锁a的顺序获得锁。如下代码所示 public class DeadLock {//创建两个对象static Object a new Object();static Object b new Object();public static void main(String[] args) {new Thread(()-{synchronized (a) {System.out.println(Thread.currentThread().getName() 持有锁a试图获取锁b);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}synchronized (b) {System.out.println(Thread.currentThread().getName() 获取锁b);}}},A).start();new Thread(()-{synchronized (b) {System.out.println(Thread.currentThread().getName() 持有锁b试图获取锁a);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}synchronized (a) {System.out.println(Thread.currentThread().getName() 获取锁a);}}},B).start();} }死锁产生的4个必要条件 产生死锁的必要条件 互斥条件进程要求对所分配的资源进行排它性控制即在一段时间内某资源仅为一进程所占用。请求和保持条件当进程因请求资源而阻塞时对已获得的资源保持不放。不剥夺条件进程已获得的资源在未使用完之前不能剥夺只能在使用完时由自己释放。环路等待条件在发生死锁时必然存在一个进程–资源的环形链。 解决死锁的基本方法 预防死锁 资源一次性分配一次性分配所有资源这样就不会再有请求了破坏请求条件只要有一个资源得不到分配也不给这个进程分配其他的资源破坏请保持条件可剥夺资源即当某进程获得了部分资源但得不到其它资源则释放已占有的资源破坏不可剥夺条件资源有序分配法系统给每类资源赋予一个编号每一个进程按编号递增的顺序请求资源释放则相反破坏环路等待条件 7 CallableFuture 接口 7.1 Callable 接口 目前我们学习了有两种创建线程的方法-一种是通过创建 Thread 类另一种是 通过使用 Runnable 创建线程。但是Runnable 缺少的一项功能是当线程终止时即 run完成时我们无法使线程返回结果。为了支持此功能 Java 中提供了 Callable 接口。 Callable 接口的特点如下(重点) 为了实现 Runnable需要实现不返回任何内容的 run方法而对于 Callable需要实现在完成时返回结果的 call方法。 call方法可以引发异常而 run则不能。 为实现 Callable 而必须重写 call 方法。 不能直接替换 runnable,因为 Thread 类的构造方法根本没有 Callable。 //创建新类 MyThread 实现 runnable 接口 class MyThread implements Runnable{Overridepublic void run() {} } //新类 MyThread2 实现 callable 接口 class MyThread2 implements CallableInteger {Overridepublic Integer call() throws Exception {return 200;} } 7.2 Future 接口 当 call()方法完成时结果必须存储在主线程已知的对象中以便主线程可 以知道该线程返回的结果。为此可以使用 Future 对象。 将 Future 视为保存结果的对象–它可能暂时不保存结果但将来会保存一旦 Callable 返回。Future 基本上是主线程可以跟踪进度以及其他线程的结果的 一种方式。要实现此接口必须重写 5 种方法这里列出了重要的方法,如下: public boolean cancelboolean mayInterrupt用于停止任务。 如果尚未启动它将停止任务。如果已启动则仅在 mayInterrupt 为 true 时才会中断任务。 public Object get抛出 InterruptedExceptionExecutionException用于获取任务的结果。 如果任务完成它将立即返回结果否则将等待任务完成然后返回结果。 public boolean isDone如果任务完成则返回 true否则返回 false 可以看到 Callable 和 Future 做两件事 Callable 与 Runnable 类似因为它封装了要在另一个线程上运行的任务而 Future 用于存储从另一个线程获得的结果。实际上future 也可以与 Runnable 一起使用。 要创建线程需要 Runnable。为了获得结果需要 future。 7.3 FutureTask Java 库具有具体的 FutureTask 类型该类型实现 Runnable 和 Future并方便地将两种功能组合在一起。 可以通过为其构造函数提供 Callable 来创建 FutureTask。然后将 FutureTask 对象提供给 Thread 的构造函数以创建 Thread 对象。因此间接地使用 Callable 创建线程。 核心原理:(重点) 在主线程中需要执行比较耗时的操作时但又不想阻塞主线程时可以把这些作业交给 Future 对象在后台完成 当主线程将来需要时就可以通过 Future 对象获得后台作业的计算结果或者执行状态。一般 FutureTask 多用于耗时的计算主线程可以在完成自己的任务后再去获取结果。仅在计算完成时才能检索结果如果计算尚未完成则阻塞 get 方法一旦计算完成就不能再重新开始或取消计算get 方法而获取结果只有在计算完成时获取否则会一直阻塞直到任务转入完 成状态然后会返回结果或者抛出异常get 只计算一次,因此 get 方法放到最后 7.4 使用 Callable 和 Future public class CallableDemo {/*** 实现 runnable 接口*/static class MyThread1 implements Runnable {/*** run 方法*/Overridepublic void run() {try {System.out.println(Thread.currentThread().getName() 线程进入了 run方法);} catch (Exception e) {e.printStackTrace();}}}/*** 实现 callable 接口*/static class MyThread2 implements Callable {/*** call 方法** return* throws Exception*/Overridepublic Long call() throws Exception {try {System.out.println(Thread.currentThread().getName() 线程进入了 call方法,开始准备睡觉);Thread.sleep(1000);System.out.println(Thread.currentThread().getName() 睡醒了);} catch (Exception e) {e.printStackTrace();}return System.currentTimeMillis();}}public static void main(String[] args) throws Exception {//声明 runableRunnable runable new MyThread1();//声明 callableCallable callable new MyThread2();//future-callableFutureTaskLong futureTask2 new FutureTask(callable);//线程二new Thread(futureTask2, 线程二).start();for (int i 0; i 10; i) {Long result1 futureTask2.get();System.out.println(result1);}//线程一new Thread(runable, 线程一).start();} }7.5 小结(重点) 在主线程中需要执行比较耗时的操作时但又不想阻塞主线程时可以把这些作业交给 Future 对象在后台完成, 当主线程将来需要时就可以通过 Future 对象获得后台作业的计算结果或者执行状态一般 FutureTask 多用于耗时的计算主线程可以在完成自己的任务后再去获取结果仅在计算完成时才能检索结果如果计算尚未完成则阻塞 get 方法。一旦计算完成就不能再重新开始或取消计算。get 方法而获取结果只有在计算完成 时获取否则会一直阻塞直到任务转入完成状态然后会返回结果或者抛出异常。只计算一次 8 JUC 三大辅助类 JUC 中提供了三种常用的辅助类通过这些辅助类可以很好的解决线程数量过 多时 Lock 锁的频繁操作。这三种辅助类为 CountDownLatch: 减少计数CyclicBarrier: 循环栅栏Semaphore: 信号灯 下面我们分别进行详细的介绍和学习 8.1 减少计数 CountDownLatch CountDownLatch 类可以设置一个计数器然后通过 countDown 方法来进行 减 1 的操作使用 await 方法等待计数器不大于 0然后继续执行 await 方法 之后的语句。 CountDownLatch 主要有两个方法当一个或多个线程调用 await 方法时这 些线程会阻塞其它线程调用 countDown 方法会将计数器减 1(调用 countDown 方法的线程 不会阻塞)当计数器的值变为 0 时因 await 方法阻塞的线程会被唤醒继续执行 场景: 6 个同学陆续离开教室后值班同学才可以关门。 CountDownLatchDemo public class CountDownLatchDemo {//6个同学陆续离开教室之后班长锁门public static void main(String[] args) throws InterruptedException {//创建CountDownLatch对象设置初始值CountDownLatch countDownLatch new CountDownLatch(6);//6个同学陆续离开教室之后for (int i 1; i 6; i) {new Thread(()-{System.out.println(Thread.currentThread().getName() 号同学离开了教室);//计数 -1countDownLatch.countDown();},String.valueOf(i)).start();}//等待countDownLatch.await();System.out.println(Thread.currentThread().getName() 班长锁门走人了);} }CountDownLatch 的原理是什么 CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候如果 state 不为 0那就证明任务还没有执行完毕await() 方法就会一直阻塞也就是说 await() 方法之后的语句不会被执行。然后CountDownLatch 会自旋 CAS 判断 state 0如果 state 0 的话就会释放所有等待的线程await() 方法之后的语句得到执行。 8.2 循环栅栏 CyclicBarrier CyclicBarrier 看英文单词可以看出大概就是循环阻塞的意思在使用中 CyclicBarrier 的构造方法第一个参数是目标障碍数每次执行 CyclicBarrier 一 次障碍数会加一如果达到了目标障碍数才会执行 cyclicBarrier.await()之后 的语句。可以将 CyclicBarrier 理解为加 1 操作 场景: 集齐 7 颗龙珠就可以召唤神龙 CyclicBarrierDemo public class CyclicBarrierDemo {//创建固定值private static final int NUMBER 7;public static void main(String[] args) {//创建CyclicBarrierCyclicBarrier cyclicBarrier new CyclicBarrier(NUMBER,()-{System.out.println(*****集齐7颗龙珠就可以召唤神龙);});//集齐七颗龙珠过程for (int i 1; i 7; i) {new Thread(()-{try {System.out.println(Thread.currentThread().getName() 星龙被收集到了);//等待cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();}},String.valueOf(i)).start();}} }CyclicBarrier 的原理是什么 CyclicBarrier 内部通过一个 count 变量作为计数器count 的初始值为 parties 属性的初始化值每当一个线程到了栅栏这里了那么就将计数器减 1。如果 count 值为 0 了表示这是这一代最后一个线程到达栅栏就尝试执行我们构造方法中输入的任务。 //每次拦截的线程数 private final int parties; //计数器 private int count;下面我们结合源码来简单看看。 1、CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties)其参数表示屏障拦截的线程数量每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障然后当前线程被阻塞。 public CyclicBarrier(int parties) {this(parties, null); }public CyclicBarrier(int parties, Runnable barrierAction) {if (parties 0) throw new IllegalArgumentException();this.parties parties;this.count parties;this.barrierCommand barrierAction; }其中parties 就代表了有拦截的线程的数量当拦截的线程数量达到这个值的时候就打开栅栏让所有线程通过。 2、当调用 CyclicBarrier 对象调用 await() 方法时实际上调用的是 dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的行为一样将线程挡住了当拦住的线程数量达到 parties 的值时栅栏才会打开线程才得以通过执行。 public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen} }dowait(false, 0L)方法源码分析如下 // 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。private int count;/*** Main barrier code, covering the various policies.*/private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock this.lock;// 锁住lock.lock();try {final Generation g generation;if (g.broken)throw new BrokenBarrierException();// 如果线程中断了抛出异常if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// cout减1int index --count;// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了也就是达到了可以执行await 方法之后的条件if (index 0) { // trippedboolean ranAction false;try {final Runnable command barrierCommand;if (command ! null)command.run();ranAction true;// 将 count 重置为 parties 属性的初始化值// 唤醒之前等待的线程// 下一波执行开始nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {if (!timed)trip.await();else if (nanos 0L)nanos trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g generation ! g.broken) {breakBarrier();throw ie;} else {// Were about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// belong to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g ! generation)return index;if (timed nanos 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}8.3 信号灯 Semaphore Semaphore 的构造方法中传入的第一个参数是最大信号量可以看成最大线程池每个信号量初始化为一个最多只能分发一个许可证。使用 acquire 方 法获得许可证release 方法释放许可 场景: 抢车位, 6 部汽车 3 个停车位 SemaphoreDemo public class SemaphoreDemo {public static void main(String[] args) {//创建Semaphore设置许可数量Semaphore semaphore new Semaphore(3);//模拟6辆汽车for (int i 1; i 6; i) {new Thread(()-{try {//抢占semaphore.acquire();System.out.println(Thread.currentThread().getName() 抢到了车位);//设置随机停车时间TimeUnit.SECONDS.sleep(new Random().nextInt(5));System.out.println(Thread.currentThread().getName() ------离开了车位);} catch (InterruptedException e) {e.printStackTrace();} finally {//释放车位semaphore.release();}},String.valueOf(i)).start();}} }Semaphore 的原理是什么 Semaphore 是共享锁的一种实现它默认构造 AQS 的 state 值为 permits你可以将 permits 的值理解为许可证的数量只有拿到许可证的线程才能执行。 调用semaphore.acquire() 线程尝试获取许可证如果 state 0 的话则表示可以获取成功。如果获取成功的话使用 CAS 操作去修改 state 的值 statestate-1。如果 state0 的话则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列挂起当前线程。 /*** 获取1个许可证*/ public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); } /*** 共享模式下获取许可证获取成功则返回失败则加入阻塞队列挂起线程*/ public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 尝试获取许可证arg为获取许可证个数当可用许可证数减当前获取的许可证数结果小于0,则创建一个节点加入阻塞队列挂起当前线程。if (tryAcquireShared(arg) 0)doAcquireSharedInterruptibly(arg); }调用semaphore.release(); 线程尝试释放许可证并使用 CAS 操作去修改 state 的值 statestate1。释放许可证成功之后同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 state 的值 statestate-1 如果 state0 则获取令牌成功否则重新进入阻塞队列挂起线程。 // 释放一个许可证 public void release() {sync.releaseShared(1); }// 释放共享锁同时会唤醒同步队列中的一个线程。 public final boolean releaseShared(int arg) {//释放共享锁if (tryReleaseShared(arg)) {//唤醒同步队列中的一个线程doReleaseShared();return true;}return false; }9 读写锁 9.1 读写锁介绍 现实中有这样一种场景对共享资源有读和写的操作且写操作没有读操作那么频繁。在没有写操作的时候多个线程同时读一个资源没有任何问题所以 应该允许多个线程同时读取共享资源但是如果一个线程想去写这些共享资源 就不应该允许其他线程对该资源进行读和写的操作了。 针对这种场景JAVA 的并发包提供了读写锁 ReentrantReadWriteLock 它表示两个锁一个是读操作相关的锁称为共享锁一个是写相关的锁称 为排他锁 线程进入读锁的前提条件 没有其他线程的写锁没有写请求, 或者 有写请求但调用线程和持有锁的线程是同一个(可重入锁)。 线程进入写锁的前提条件 没有其他线程的读锁没有其他线程的写锁 而读写锁有以下三个重要的特性 1公平选择性支持非公平默认和公平的锁获取方式吞吐量还是非公 平优于公平。2重进入读锁和写锁都支持线程重进入。3锁降级遵循获取写锁、获取读锁再释放写锁的次序写锁能够降级成为读锁。 9.2 ReentrantReadWriteLock public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {/*** 读锁*/private final ReentrantReadWriteLock.ReadLock readerLock;/*** 写锁*/private final ReentrantReadWriteLock.WriteLock writerLock;final Sync sync;/*** 使用默认非公平的排序属性创建一个新的* ReentrantReadWriteLock*/public ReentrantReadWriteLock() {this(false);}/*** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock*/public ReentrantReadWriteLock(boolean fair) {sync fair ? new FairSync() : new NonfairSync();readerLock new ReadLock(this);writerLock new WriteLock(this);}/*** 返回用于写入操作的锁*/public ReentrantReadWriteLock.WriteLock writeLock() {return writerLock;}/*** 返回用于读取操作的锁*/public ReentrantReadWriteLock.ReadLock readLock() {return readerLock;}abstract static class Sync extends AbstractQueuedSynchronizer {}static final class NonfairSync extends Sync {}static final class FairSync extends Sync {}public static class ReadLock implements Lock, java.io.Serializable {}public static class WriteLock implements Lock, java.io.Serializable {} }可以看到ReentrantReadWriteLock 实现了 ReadWriteLock 接口 ReadWriteLock 接口定义了获取读锁和写锁的规范具体需要实现类去实现 同时其还实现了 Serializable 接口表示可以进行序列化在源代码中可以看 到 ReentrantReadWriteLock 实现了自己的序列化逻辑。 9.3 入门案例 场景: 使用 ReentrantReadWriteLock 对一个 hashmap 进行读和写操作 //资源类 class MyCache {//创建map集合private volatile MapString,Object map new HashMap();//创建读写锁对象private ReadWriteLock rwLock new ReentrantReadWriteLock();//放数据public void put(String key,Object value) {//添加写锁rwLock.writeLock().lock();try {System.out.println(Thread.currentThread().getName() 正在写操作key);//暂停一会TimeUnit.MICROSECONDS.sleep(300);//放数据map.put(key,value);System.out.println(Thread.currentThread().getName() 写完了key);} catch (InterruptedException e) {e.printStackTrace();} finally {//释放写锁rwLock.writeLock().unlock();}}//取数据public Object get(String key) {//添加读锁rwLock.readLock().lock();Object result null;try {System.out.println(Thread.currentThread().getName() 正在读取操作key);//暂停一会TimeUnit.MICROSECONDS.sleep(300);result map.get(key);System.out.println(Thread.currentThread().getName() 取完了key);} catch (InterruptedException e) {e.printStackTrace();} finally {//释放读锁rwLock.readLock().unlock();}return result;} }public class ReadWriteLockDemo {public static void main(String[] args) throws InterruptedException {MyCache myCache new MyCache();//创建线程放数据for (int i 1; i 5; i) {final int num i;new Thread(()-{myCache.put(num,num);},String.valueOf(i)).start();}TimeUnit.MICROSECONDS.sleep(300);//创建线程取数据for (int i 1; i 5; i) {final int num i;new Thread(()-{myCache.get(num);},String.valueOf(i)).start();}} }锁降级 public class ReadWriteLockTest {private ReadWriteLock readWriteLock new ReentrantReadWriteLock();private Lock writeLock readWriteLock.writeLock();private Lock readLock readWriteLock.readLock();private volatile boolean updateFlag;/**锁降级*/public void test(){readLock.lock();if(!updateFlag){readLock.unlock();//肯定要先开释了读锁再去获取写锁如果间接获取写锁以后线程会被阻塞writeLock.lock();//step1,获取写锁try {if(!updateFlag){//批改数据逻辑略。updateFlag true;}readLock.lock();//step2,获取读锁排挤其它写锁批改数据}finally {writeLock.unlock();//step3,开释写锁。到这里锁降级实现}}try {//如果step2不先获取读锁在step3开释了写锁后其它线程会对数据进行批改//会使得上面‘读取数据逻辑’里呈现数据读取不精确的问题//读取数据逻辑略。}finally {readLock.unlock();step4}} }9.4 小结(重要) 在线程持有读锁的情况下该线程不能取得写锁(因为获取写锁的时候如果发 现当前的读锁被占用就马上获取失败不管读锁是不是被当前线程持有)。 在线程持有写锁的情况下该线程可以继续获取读锁获取读锁时如果发现写锁被占用只有写锁没有被当前线程占用的情况才会获取失败。 原因: 当线程获取读锁的时候可能有其他线程同时也在持有读锁因此不能把获取读锁的线程“升级”为写锁而对于获得写锁的线程它一定独占了读写 锁因此可以继续让它获取读锁当它同时获取了写锁和读锁后还可以先释放写锁继续持有读锁这样一个写锁就“降级”为了读锁。 10 阻塞队列 10.1 BlockingQueue 简介 Concurrent 包中BlockingQueue 很好的解决了多线程中如何高效安全 “传输”数据的问题。通过这些高效并且线程安全的队列类为我们快速搭建 高质量的多线程程序带来极大的便利。本文详细介绍了 BlockingQueue 家庭中的所有成员包括他们各自的功能以及常见使用场景。 阻塞队列顾名思义首先它是一个队列, 通过一个共享的队列可以使得数据 由队列的一端输入从另外一端输出 当队列是空的从队列中获取元素的操作将会被阻塞 当队列是满的从队列中添加元素的操作将会被阻塞 试图从空的队列中获取元素的线程将会被阻塞直到其他线程往空的队列插入新的元素 试图向已满的队列中添加新元素的线程将会被阻塞直到其他线程从队列中移除一个或多 个元素或者完全清空使队列变得空闲起来并后续新增 常用的队列主要有以下两种 先进先出FIFO先插入的队列的元素也最先出队列类似于排队的功能。 从某种程度上来说这种队列也体现了一种公平性后进先出LIFO后插入队列的元素最先出队列这种队列优先处理最近发 生的事件(栈) 在多线程领域所谓阻塞在某些情况下会挂起线程即阻塞一旦条件满足被挂起的线程又会自动被唤起 为什么需要 BlockingQueue 好处是我们不需要关心什么时候需要阻塞线程什么时候需要唤醒线程因为这一切 BlockingQueue 都给你一手包办了。 在 concurrent 包发布以前在多线程环境下我们每个程序员都必须去自己控制这些细 节尤其还要兼顾效率和线程安全而这会给我们的程序带来不小的复杂度。 多线程环境中通过队列可以很容易实现数据共享比如经典的“生产者”和 “消费者”模型中通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程另外又有若干个消费者线程。如果生产者线程需要把准 备好的数据共享给消费者线程利用队列的方式来传递数据就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内万一 发生数据处理速度不匹配的情况呢理想情况下如果生产者产出数据的速度大于消费者消费的速度并且当生产出来的数据累积到一定程度的时候那么生产者必须暂停等待一下阻塞生产者线程以便等待消费者线程把累积的 数据处理完毕反之亦然。 当队列中没有数据的情况下消费者端的所有线程都会被自动阻塞挂起 直到有数据放入队列当队列中填满数据的情况下生产者端的所有线程都会被自动阻塞挂起 直到队列中有空的位置线程被自动唤醒 10.2 BlockingQueue 核心方法 BlockingQueue 的核心方法 1.放入数据 offer(anObject):表示如果可能的话,将 anObject 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false.本方法不阻塞当前执行方法的线程offer(E o, long timeout, TimeUnit unit)可以设定等待的时间如果在指定的时间内还不能往队列中加入 BlockingQueue则返回失败put(anObject):把 anObject 加到 BlockingQueue 里,如果 BlockQueue 没有 空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续 2.获取数据 poll(time): 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 nullpoll(long timeout, TimeUnit unit)从 BlockingQueue 取出一个队首的对象 如果在指定时间内队列一旦有数据可取则立即返回队列中的数据。否则知 道时间超时还没有数据可取返回失败。take(): 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断 进入等待状态直到 BlockingQueue 有新的数据被加入;drainTo(): 一次性从 BlockingQueue 获取所有可用的数据对象还可以指定 获取数据的个数通过该方法可以提升获取数据效率不需要多次分批加 锁或释放锁。 10.3 入门案例 public class BlockingQueueDemo {public static void main(String[] args) throws InterruptedException {//创建阻塞队列BlockingQueueString blockingQueue new ArrayBlockingQueue(3);//第一组 // System.out.println(blockingQueue.add(a));System.out.println(blockingQueue.add(b));System.out.println(blockingQueue.add(c));//System.out.println(blockingQueue.element());//System.out.println(blockingQueue.add(w));System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());//第二组 // System.out.println(blockingQueue.offer(a)); // System.out.println(blockingQueue.offer(b)); // System.out.println(blockingQueue.offer(c)); // System.out.println(blockingQueue.offer(www)); // // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll());//第三组 // blockingQueue.put(a); // blockingQueue.put(b); // blockingQueue.put(c); // //blockingQueue.put(w); // // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take());//第四组System.out.println(blockingQueue.offer(a));System.out.println(blockingQueue.offer(b));System.out.println(blockingQueue.offer(c));System.out.println(blockingQueue.offer(w,3L, TimeUnit.SECONDS));}10.4 常见的 BlockingQueue 10.4.1 ArrayBlockingQueue(常用) 基于数组的阻塞队列实现在 ArrayBlockingQueue 内部维护了一个定长数组以便缓存队列中的数据对象这是一个常用的阻塞队列除了一个定长数组外ArrayBlockingQueue 内部还保存着两个整形变量分别标识着队列的头部和尾部在数组中的位置。 ArrayBlockingQueue 在生产者放入数据和消费者获取数据都是共用同一个锁对象由此也意味着两者无法真正并行运行这点尤其不同于 LinkedBlockingQueue按照实现原理来分析ArrayBlockingQueue 完全可以采用分离锁从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧以至于引入独立的锁机制除了给代码带来额外的复杂性外其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于前者在插入或删除元素时不会产生或销毁任何额外的对象实例而后者则会生成一个额外的 Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中其对于 GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时我们还可以控制对象的内部锁是否采用公平锁默认采用非公平锁。 一句话总结: 由数组结构组成的有界阻塞队列。 10.4.2 LinkedBlockingQueue(常用) 基于链表的阻塞队列同 ArrayListBlockingQueue 类似其内部也维持着一个数据缓冲队列该队列由一个链表构成当生产者往队列中放入一个数据时队列会从生产者手中获取数据并缓存在队列内部而生产者立即返回 只有当队列缓冲区达到最大值缓存容量时LinkedBlockingQueue 可以通过构造函数指定该值才会阻塞生产者队列直到消费者从队列中消费掉一份数据生产者线程会被唤醒反之对于消费者这端的处理也基于同样的原理。 而LinkedBlockingQueue 之所以能够高效的处理并发数据还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据以此来提高整个队列 的并发性能。 ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用 的阻塞队列一般情况下在处理多线程间的生产者消费者问题使用这两个 类足以。 一句话总结: 由链表结构组成的有界但大小默认值为 integer.MAX_VALUE阻塞队列。 10.4.3 DelayQueue DelayQueue 中的元素只有当其指定的延迟时间到了才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列因此往队列中插入数据的操作生产者永远不会被阻塞而只有获取数据的操作消费者才会被阻塞。 一句话总结: 使用优先级队列实现的延迟无界阻塞队列。 10.4.4 PriorityBlockingQueue 基于优先级的阻塞队列优先级的判断通过构造函数传入的 Compator 对象来决定但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者而只会在没有可消费的数据时阻塞数据的消费者。 因此使用的时候要特别注意生产者生产数据的速度绝对不能快于消费者消费 数据的速度否则时间一长会最终耗尽所有的可用堆内存空间。 在实现 PriorityBlockingQueue 时内部控制线程同步的锁采用的是公平锁。 一句话总结: 支持优先级排序的无界阻塞队列。 10.4.5 SynchronousQueue 一种无缓冲的等待队列类似于无中介的直接交易有点像原始社会中的生产者和消费者生产者拿着产品去集市销售给产品的最终消费者而消费者必须亲自去集市找到所要商品的直接生产者如果一方没有找到合适的目标那么对不起大家都在集市等待。相对于有缓冲的 BlockingQueue 来说少了一个中间经销商的环节缓冲区如果有经销商生产者直接把产品批发给经销商而无需在意经销商最终会将这些产品卖给那些消费者由于经销商可以库存一部分商品因此相对于直接交易模式总体来说采用中间经销商的模式 会吞吐量高一些可以批量买卖但另一方面又因为经销商的引入使得产品从生产者到消费者中间增加了额外的交易环节单个产品的及时响应性能可能会降低。 声明一个 SynchronousQueue 有两种不同的方式它们之间有着不太一样的行为。 公平模式和非公平模式的区别: 公平模式SynchronousQueue 会采用公平锁并配合一个 FIFO 队列来阻塞多余的生产者和消费者从而体系整体的公平策略非公平模式SynchronousQueue 默认SynchronousQueue 采用非公平 锁同时配合一个 LIFO 队列来管理多余的生产者和消费者而后一种模式 如果生产者和消费者的处理速度有差距则很容易出现饥渴的情况即可能有某些生产者或者是消费者的数据永远都得不到处理。 一句话总结: 不存储元素的阻塞队列也即单个元素的队列。 10.4.6 LinkedTransferQueue LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。 LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时如果队列不为空则直接取走数据若队列为空那就生成一个节点节点元素 为 null入队然后消费者线程被等待在这个节点上后面生产者线程入队时发现有一个元素为 null 的节点生产者线程就不入队了直接就将元素填充到该节点并唤醒该节点等待的线程被唤醒的消费者线程取走元素从调用的方法返回。 一句话总结: 由链表组成的无界阻塞队列。 10.4.7 LinkedBlockingDeque LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列即可以从队列的两端插入和移除元素。 对于一些指定的操作在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作这里的阻塞一般有两种情况。 插入元素时: 如果当前队列已满将会进入阻塞状态一直等到队列有空的位置时再将该元素插入该操作可以通过设置超时参数超时后返回 false 表示操作失败也可以不设置超时参数一直阻塞中断后抛出 InterruptedException 异常。 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素同样可 以通过设置超时参数 一句话总结: 由链表组成的双向阻塞队列 10.5 小结 在多线程领域 所谓阻塞在某些情况下会挂起线程即阻塞一旦条件满足被挂起的线程又会自动被唤起。为什么需要 BlockingQueue? 在 concurrent 包发布以前在多线程环境下 我们每个程序员都必须去自己控制这些细节尤其还要兼顾效率和线程安全 而这会给我们的程序带来不小的复杂度。使用后我们不需要关心什么时候需要阻塞线程什么时候需要唤醒线程因为这一切 BlockingQueue 都给你一手包办了 11 ThreadPool 线程池 11.1 线程池简介 线程池英语thread pool一种线程使用模式。线程过多会带来调度开销 进而影响缓存局部性和整体性能。而线程池维护着多个线程等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代 价。线程池不仅能够保证内核的充分利用还能防止过分调度。 线程池提供了一种限制和管理资源包括执行一个任务的方式。 每个线程池还维护一些基本统计信息例如已完成任务的数量。 这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。提高响应速度。当任务到达时任务可以不需要等到线程创建就能立即执行。提高线程的可管理性。线程是稀缺资源如果无限制的创建不仅会消耗系统资源还会降低系统的稳定性使用线程池可以进行统一的分配调优和监控。 Java 中的线程池是通过 Executor 框架实现的该框架中用到了 ExecutorExecutors ExecutorServiceThreadPoolExecutor 这几个类 11.2 线程池参数说明 /*** 用给定的初始参数创建一个新的ThreadPoolExecutor。*/public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量int maximumPoolSize,//线程池的最大线程数long keepAliveTime,//当线程数大于核心线程数时多余的空闲线程存活的最长时间TimeUnit unit,//时间单位BlockingQueueRunnable workQueue,//任务队列用来储存等待执行任务的队列ThreadFactory threadFactory,//线程工厂用来创建线程一般默认即可RejectedExecutionHandler handler//拒绝策略当提交的任务过多而不能及时处理时我们可以定制策略来处理任务) {if (corePoolSize 0 ||maximumPoolSize 0 ||maximumPoolSize corePoolSize ||keepAliveTime 0)throw new IllegalArgumentException();if (workQueue null || threadFactory null || handler null)throw new NullPointerException();this.corePoolSize corePoolSize;this.maximumPoolSize maximumPoolSize;this.workQueue workQueue;this.keepAliveTime unit.toNanos(keepAliveTime);this.threadFactory threadFactory;this.handler handler;}11.2.1 常用参数(重点) ThreadPoolExecutor 3 个最重要的参数 corePoolSize : 核心线程数定义了最小可以同时运行的线程数量。 maximumPoolSize : 当队列中存放的任务达到队列容量的时候当前可以同时运行的线程数量变为最大线程数。 workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数如果达到的话新任务就会被存放在队列中。 ThreadPoolExecutor其他常见参数: keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候如果这时没有新的任务提交核心线程外的线程不会立即销毁而是会等待直到等待的时间超过了 keepAliveTime才会被回收销毁unit : keepAliveTime 参数的时间单位。threadFactory :executor 创建新线程的时候会用到。handler :饱和策略。关于饱和策略下面单独介绍一下。 线程池的饱和策略有哪些 如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时ThreadPoolTaskExecutor 定义一些策略: ThreadPoolExecutor.AbortPolicy 抛出 RejectedExecutionException来拒绝新任务的处理。 ThreadPoolExecutor.CallerRunsPolicy 调用执行自己的线程运行任务也就是直接在调用execute方法的线程中运行(run)被拒绝的任务如果执行程序已关闭则会丢弃该任务。因此这种策略会降低对于新任务提交速度影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话你可以选择这个策略。 ThreadPoolExecutor.DiscardPolicy 不处理新任务直接丢弃掉。 ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求 举个例子 Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时此策略为我们提供可伸缩队列。这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出比较简单的原因这里就不贴代码了) 11.3 线程池的种类与创建 11.3.1 newCachedThreadPool 作用 创建一个可缓存线程池如果线程池长度超过处理需要可灵活回收空 闲线程若无可回收则新建线程. 特点: 线程池中数量没有固定可达到最大值Interger. MAX_VALUE线程池中的线程可进行缓存重复利用和回收回收默认时间为 1 分钟当线程池中没有可用线程会重新创建一个线程 创建方式 /** * 可缓存线程池 * return */ public static ExecutorService newCachedThreadPool(){/*** corePoolSize 线程池的核心线程数* maximumPoolSize 能容纳的最大线程数* keepAliveTime 空闲线程存活时间* unit 存活的时间单位* workQueue 存放提交但未执行任务的队列* threadFactory 创建线程的工厂类:可以省略* handler 等待队列满后的拒绝策略:可以省略*/return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); }场景: 适用于创建一个可无限扩大的线程池服务器负载压力较轻执行时间较 短任务多的场景 11.3.2 newFixedThreadPool 作用创建一个可重用固定线程数的线程池以共享的无界队列方式来运行这些线程。在任意点在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务则在有可用线程之前附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止那么一个新线程将代替它执行后续的任务如果需要。在某个线程被显式地关闭之前池 中的线程将一直存在。 特征 线程池中的线程处于一定的量可以很好的控制线程的并发量线程可以重复被使用在显示关闭之前都将一直存在超出一定量的线程被提交时候需在队列中等待 创建方式 /** * 固定长度线程池 * return */ public static ExecutorService newFixedThreadPool(){/*** corePoolSize 线程池的核心线程数* maximumPoolSize 能容纳的最大线程数* keepAliveTime 空闲线程存活时间* unit 存活的时间单位* workQueue 存放提交但未执行任务的队列* threadFactory 创建线程的工厂类:可以省略* handler 等待队列满后的拒绝策略:可以省略*/return new ThreadPoolExecutor(10,10,0L,TimeUnit.SECONDS,new LinkedBlockingQueue(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); }场景: 适用于可以预测线程数量的业务中或者服务器负载较重对线程数有严 格限制的场景 11.3.3 newSingleThreadExecutor(常用) 作用 创建一个使用单个 worker 线程的 Executor以无界队列方式来运行该线程。注意如果因为在关闭前的执行期间出现失败而终止了此单个线程 那么如果需要一个新线程将代替它执行后续的任务。可保证顺序地执行各个任务并且在任意给定的时间不会有多个线程是活动的。与其他等效的newFixedThreadPool 不同可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。 特征 线程池中最多执行 1 个线程之后提交的线程活动将会排在队列中以此执行 创建方式 /** * 单一线程池 * return */ public static ExecutorService newSingleThreadExecutor(){/*** corePoolSize 线程池的核心线程数* maximumPoolSize 能容纳的最大线程数* keepAliveTime 空闲线程存活时间* unit 存活的时间单位* workQueue 存放提交但未执行任务的队列* threadFactory 创建线程的工厂类:可以省略* handler 等待队列满后的拒绝策略:可以省略*/return new ThreadPoolExecutor(1,1,0L,TimeUnit.SECONDS,new LinkedBlockingQueue(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); }场景: 适用于需要保证顺序执行各个任务并且在任意时间点不会同时有多个线程的场景 11.3.4 newScheduleThreadPool(了解) 作用: 线程池支持定时以及周期性执行任务创建一个 corePoolSize 为传入参数最大线程数为整形的最大数的线程池 特征: 1线程池中具有指定数量的线程即便是空线程也将保留 2可定时或者延迟执行线程活动 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }场景: 适用于需要多个后台线程执行周期任务的场景 11.3.5 newWorkStealingPool jdk1.8 提供的线程池底层使用的是 ForkJoinPool实现创建一个拥有多个 任务队列的线程池可以减少连接数创建当前可用 cpu 核数的线程来并行执行任务。 创建方式: public static ExecutorService newWorkStealingPool(int parallelism) {/*** parallelism并行级别通常默认为 JVM 可用的处理器个数* factory用于创建 ForkJoinPool 中使用的线程。* handler用于处理工作线程未处理的异常默认为 null* asyncMode用于控制 WorkQueue 的工作模式:队列---反队列*/return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null,true); }场景: 适用于大耗时可并行执行的场景 11.4 线程池入门案例 public class ThreadPoolDemo1 {/*** 火车站 3 个售票口, 10 个用户买票** param args*/public static void main(String[] args) {//定时线程次:线程数量为 3---窗口数为 3ExecutorService threadService new ThreadPoolExecutor(3,3,60L,TimeUnit.SECONDS,new LinkedBlockingQueue(),Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());try {//10 个人买票for (int i 1; i 10; i) {threadService.execute(() - {try {System.out.println(Thread.currentThread().getName() 窗口,开始卖票);Thread.sleep(5000);System.out.println(Thread.currentThread().getName() 窗口买票结束);} catch (Exception e) {e.printStackTrace();}});}} catch (Exception e) {e.printStackTrace();} finally {//完成后结束threadService.shutdown();}} }11.5 线程池底层工作原理(重要) 在创建了线程池后线程池中的线程数为零 当调用 execute()方法添加一个请求任务时线程池会做出如下判断 2.1 如果正在运行的线程数量小于 corePoolSize那么马上创建线程运行这个任务 2.2 如果正在运行的线程数量大于或等于 corePoolSize那么将这个任务放入队列 2.3 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize, 那么还是要创建非核心线程立刻运行这个任务 2.4 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize那么线程池会启动饱和拒绝策略来执行。 当一个线程完成任务时它会从队列中取下一个任务来执行 当一个线程无事可做超过一定的时间keepAliveTime时线程会判断 4.1 如果当前运行的线程数大于 corePoolSize那么这个线程就被停掉。 4.2 所以线程池的所有任务完成后它最终会收缩到 corePoolSize 的大小。 11.6 注意事项(重要) 项目中创建多线程时使用常见的三种线程池创建方式单一、可变、定长都有一定问题原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用 LinkedBlockingQueue 实现的这个队列最大长度为 Integer.MAX_VALUE 容易导致 OOM。所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参数自定义线程池。 创建线程池推荐适用 ThreadPoolExecutor 及其 7 个参数手动创建 corePoolSize 线程池的核心线程数maximumPoolSize 能容纳的最大线程数keepAliveTime 空闲线程存活时间unit 存活的时间单位workQueue 存放提交但未执行任务的队列threadFactory 创建线程的工厂类handler 等待队列满后的拒绝策略 为什么不允许适用不允许 Executors.的方式手动创建线程池,如下图 12 Fork/Join 12.1 Fork/Join 框架简介 Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理最后将子 任务结果合并成最后的计算结果并进行输出。Fork/Join 框架要完成两件事情 Fork把一个复杂任务进行分拆大事化小 Join把分拆任务的结果进行合并 任务分割首先 Fork/Join 框架需要把大的任务分割成足够小的子任务如果 子任务比较大的话还要对子任务进行继续分割 执行任务并合并结果 分割的子任务分别放到双端队列里然后几个启动线程 分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里 启动一个线程从队列里取数据然后合并这些数据。 在 Java 的 Fork/Join 框架中使用两个类完成上述操作 ForkJoinTask:我们要使用 Fork/Join 框架首先需要创建一个 ForkJoin 任务。 该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类只需要继承它的子类Fork/Join 框架提供了两个子类 RecursiveAction用于没有返回结果的任务RecursiveTask:用于有返回结果的任务 ForkJoinPool: ForkJoinTask 需要通过 ForkJoinPool 来执行 RecursiveTask: 继承后可以实现递归(自己调自己)调用的任务 Fork/Join 框架的实现原理 ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成 ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool而 ForkJoinWorkerThread 负责执行这些任务。 12.2 Fork 方法 递归任务 继承后可以实现递归自己调自己调用任务 Fork 方法的实现原理 当我们调用 ForkJoinTask 的 fork 方法时程序会把任务放在 ForkJoinWorkerThread 的 pushTask 的 workQueue 中异步地执行这个任务然后立即返回结果。 public final ForkJoinTaskV fork() {Thread t;if ((t Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread) t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;}pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用 ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务。代码如下 final void push(ForkJoinTask? task) {ForkJoinTask?[] a;ForkJoinPool p;int b base, s top, n;if ((a array) ! null) { // ignore if queue removed int m a.length - 1; // fenced write for task visibilityU.putOrderedObject(a, ((m s) ASHIFT) ABASE, task);U.putOrderedInt(this, QTOP, s 1);if ((n s - b) 1) {if ((p pool) ! null)p.signalWork(p.workQueues, this);//执行} else if (n m) growArray();}}12.3 join 方法 Join 方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看 ForkJoinTask 的 join 方法的实现代码如下 public final V join() {int s;if ((s doJoin() DONE_MASK) ! NORMAL)reportException(s);return getRawResult();}它首先调用 doJoin 方法通过 doJoin()方法得到当前任务的状态来判断返回 什么结果任务状态有 4 种 已完成NORMAL、被取消CANCELLED、信号SIGNAL和出 现异常EXCEPTIONAL 如果任务状态是已完成则直接返回任务结果。如果任务状态是被取消则直接抛出 CancellationException如果任务状态是抛出异常则直接抛出对应的异常 让我们分析一下 doJoin 方法的实现 private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueuew;return (s status) 0 ? s :((t Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w (wt (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) (s doExec()) 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();}final int doExec() {int s; boolean completed;if ((s status) 0) {try {completed exec();} catch (Throwable rex) {return setExceptionalCompletion(rex);}if (completed)s setCompletion(NORMAL);}return s;}在 doJoin()方法流程如下: 首先通过查看任务的状态看任务是否已经执行完成如果执行完成则直接 返回任务状态如果没有执行完则从任务数组里取出任务并执行。如果任务顺利执行完成则设置任务状态为 NORMAL如果出现异常则记 录异常并将任务状态设置为 EXCEPTIONAL。 12.4 Fork/Join 框架的异常处理 ForkJoinTask 在执行的时候可能会抛出异常但是我们没办法在主线程里直接 捕获异常所以 ForkJoinTask 提供了 isCompletedAbnormally()方法来检查 任务是否已经抛出异常或已经被取消了并且可以通过 ForkJoinTask 的 getException 方法获取异常。 getException 方法返回 Throwable 对象如果任务被取消了则返回 CancellationException。如果任务没有完成或者没有抛出异常则返回 null。 12.5 入门案例 场景: 生成一个计算任务计算 123…1000,每 100 个数切分一个子任务 class MyTask extends RecursiveTaskInteger {//拆分差值不能超过10计算10以内运算private static final Integer VALUE 10;private int begin ;//拆分开始值private int end;//拆分结束值private int result ; //返回结果//创建有参数构造public MyTask(int begin,int end) {this.begin begin;this.end end;}//拆分和合并过程Overrideprotected Integer compute() {//判断相加两个数值是否大于10if((end-begin)VALUE) {//相加操作for (int i begin; i end; i) {result resulti;}} else {//进一步拆分//获取中间值int middle (beginend)/2;//拆分左边MyTask task01 new MyTask(begin,middle);//拆分右边MyTask task02 new MyTask(middle1,end);//调用方法拆分task01.fork();task02.fork();//合并结果result task01.join()task02.join();}return result;} }public class ForkJoinDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建MyTask对象MyTask myTask new MyTask(0,100);//创建分支合并池对象ForkJoinPool forkJoinPool new ForkJoinPool();ForkJoinTaskInteger forkJoinTask forkJoinPool.submit(myTask);//获取最终合并之后结果Integer result forkJoinTask.get();System.out.println(result);//关闭池对象forkJoinPool.shutdown();} }13 CompletableFuture (异步任务编排) 13.1 CompletableFuture 简介 CompletableFuture 在 Java 里面被用于异步编程异步通常意味着非阻塞可以使得我们的任务单独运行在与主线程分离的其他线程中并且通过回调可以在主线程中得到异步任务的执行状态是否完成和是否异常等信息。 CompletableFuture 实现了 Future, CompletionStage 接口实现了 Future 接口就可以兼容现在有线程池框架而 CompletionStage 接口才是异步编程的接口抽象里面定义多种异步方法通过这两者集合从而打造出了强大的CompletableFuture 类。 13.2 Future 与 CompletableFuture Futrue 在 Java 里面通常用来表示一个异步任务的引用比如我们将任务提交到线程池里面然后我们会得到一个 Futrue在 Future 里面有 isDone 方法来判断任务是否处理结束还有 get 方法可以一直阻塞直到任务结束然后获取结果但整体来说这种方式还是同步的因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。 Future 的主要缺点如下 1不支持手动完成 我提交了一个任务但是执行太慢了我通过其他路径已经获取到了任务结果 现在没法把这个任务结果通知到正在执行的线程所以必须主动取消或者一直等待它执行完成 2不支持进一步的非阻塞调用 通过 Future 的 get 方法会一直阻塞到任务完成但是想在获取任务之后执行额外的任务因为 Future 不支持回调函数所以无法实现这个功能。 3不支持链式调用 对于 Future 的执行结果我们想继续传到下一个 Future 处理使用从而形成 一个链式的 pipline 调用这在 Future 中是没法实现的。 4不支持多个 Future 合并 比如我们有 10 个 Future 并行执行我们想在所有的 Future 运行完毕之后 执行某些函数是没法通过 Future 实现的。 5不支持异常处理 Future 的 API 没有任何的异常处理的 api所以在异步运行时如果出了问题是不好定位的。 13.3 CompletableFuture的实战代码示例 具体使用参考 异步神器CompletableFuture让你的代码免受阻塞之苦
http://www.hkea.cn/news/14420106/

相关文章:

  • 深圳网站开发公司哪家好哪里卖网站域名
  • 网站横幅怎么更换网站建设要备案吗
  • 做网站注意的问题陕西省住房与建设厅网站
  • 门户网站用什么源码购买一个网站空间如何可以多个域名使用吗
  • 网站开发哪里有首商网官网
  • app扁平化设计网站模板花钱也可以哪些网站可以做推广广告
  • 用来做调查问卷的网站wordpress表单提交 阿里云邮箱
  • 大学《网站开发与应用》试题网站微信链接怎么做的
  • 网站界面设计工具网站设计师需要什么知识与技能
  • wordpress站点美化企业展厅设计公司哪个好看
  • asp 网站源码京东网站建设现状
  • 网页免费浏览网站wordpress网站科学主题
  • 免费手机网站商城厦门 网站建设 公司哪家好
  • 河北省建设工程安全生产监督管理网站杭州建设公司网站
  • 淮北做网站微博通 wordpress
  • 二手车网站开发过程网站开发 重庆
  • 青羊区网站建设公司wordpress创建自定义页面模板
  • 网站定制报价网站建设项目补充协议
  • 制作一个自适应网站源码最新网站建设方案
  • 企业做产品网站费用大概是多少群晖nas安装wordpress安装
  • 软件通网站建设百度抓取网站登录
  • 建设微信营销网站检测设备技术支持东莞网站建设
  • 台州网站开发公司小区网络设计方案
  • 青岛网站制作网站宁城网站建设公司
  • 网站透明背景杭州互联网网页设计公司
  • 什么网站流量高软件人员外包
  • 基础建设的网站有哪些内容厦门网站建设哪家强
  • 网站可以做软著吗网站续费协议
  • 做网站用百度百科的资料会侵权吗西安米德建站
  • 自己做网站用花钱吗购物网站模板下载