JUC基础

进程与线程

基本概念

进程是系统进行资源分配和调度的基本单位,是程序的实体,是资源分配最小单位。

线程操作系统能进行运算调度的最小单位,是进程中的实际运作单位。一个进程可以并发多个线程,执行不同的任务。

线程状态

java线程包含六种状态:NEW(新建)、RUNNABLE(准备就绪)、BLOCKED(阻塞)、WAITING(等待-一直等待)、TIMED_WAITING(等待-等待一段时间)、TERMINATED(终结)

wait 和 sleep

  1. sleep是Thread的静态方法;wait是Object的方法,任何对象实例都能调用。
  2. sleep不会释放锁,它也不需要占用锁;wait会释放锁并进入等待,直到被其他线程notify唤醒,但调用它的前提是当前线程占有锁(即代码要在synchronized中)
  3. 它们都可以被interrupt方法中断

并发和并行

并发:同一时间间隔内多个线程交替执行,实际上是宏观上并行,微观上串行

并行:同一时刻多个线程正在执行

管程

又叫监视器monitor,保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现,如synchronized关键字)。

他是一种程序结构,提供了一种资源互斥使用的机制,也就是锁机制。

用户线程和守护线程

用户线程:自定义线程:主线程结束了,用户线程还在运行,jvm还存活

守护线程:比如说垃圾回收open in new window线程:没有用户线程了,只有守护线程,jvm结束

public static void main(String[] args) {
  //使用Lambda 表达式实现这个接口,创建 线程t1
  Thread t1 = new Thread(() -> {
      //判断是否是守护线程,(后台运行的)
    System.out.println(Thread.currentThread().getName() + "::" + Thread.currentThread().isDaemon());
    while (true) {
        //主线程结束,程序还在运行,jvm 没停止
    }
}, "t1");
// 如果把他设置为守护线程 ,主线程结束这个程序没有用户线程了,结束了
t1.setDaemon(false);//默认是false
//启动线程
t1.start(); System.out.println(Thread.currentThread().getName() +"结束");
}

Lock接口

介绍

Lock 实现提供比使用 synchronized 方法和语句可以获得的更广泛的锁定操作。它们允许更灵活的结构化,可能具有完全不同的属性,并且可以支持多个相关联的对象 Condition 。

当在不同范围内发生锁定和解锁时,必须注意确保在锁定时执行的所有代码由 try-finally 或 try-catch 保护,以确保在必要时释放锁定。 Lock 实现提供了使用 synchronized 方法和语句的附加功能,如非阻塞尝试获取锁 tryLock(),尝试获取可被中断的锁 lockInterruptibly() ,以及尝试获取可以超时 tryLock(long, TimeUnit)。

Lock创建可重入锁(即当前拿到了锁的线程可以再次获取锁)

// 创建可重入锁
private final ReentrantLock lock = new ReentrantLock();
try {
  //上锁
  lock.lock();
//功能操作
  ...
}finally {
  //解锁
  lock.unlock();
}

创建线程的多种方式

继承Thread类

public class CreateThread extends Thread {
  @Override
  public void run() {
    // do something
  }
  
  public static void main(String[] args) {
    new CreateThread().start();
  }
}

由于java是单继承语言,所以不建议使用这种方法

实现runnable接口

先创建runnable的接口实现对象(建议直接使用lambda表达式创建匿名内部类),然后使用Thread类创建线程

public static void main(String[] args) {
  //使用Lambda 表达式实现runnable接口,创建 线程t1
  Thread t1 = new Thread(() -> {
    // do something
  }, "t1").start();
}

使用Callable接口

使用线程池

Lock vs synchronized

  • synchronized是java关键字,内置,而lock不是内置,是一个类,可以实现同步访问且比 synchronized中的方法更加丰富

  • synchronized不会手动释放锁,而lock需手动释放锁(不解锁会出现死锁,需要在 finally 块中释放锁)

  • lock等待锁的线程会相应中断,而synchronized不会相应,只会一直等待

  • 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到

  • Lock 可以提高多个线程进行读操作的效率(当多个线程竞争的时候)

线程间通信

介绍

线程间通信有两种实现方法:

关键字 synchronized 与 wait()/notify() 这两个方法一起使用可以实现等待/通知模式,wait会是当前线程执行等待,并释放锁;notify会随机唤醒一个正在等待的线程,notifyAll会唤醒所有线程,然后这些线程再次争夺锁。

Lock 接口中的 newContition() 方法返回 Condition 对象,Condition 类也可以实现等待/通知模式;使用 Condition 类可以进行选择性通知, Condition 比较常用的两个方法:

  • await() 会使当前线程等待,同时会释放锁,当其他线程调用 signal()时,线程会重新获得锁并继续执行
  • signal() 用于唤醒一个等待的线程

虚假唤醒问题

查找 JDK1.8 文档,在 Objectwait() 方法中有如下介绍

在一个参数版本中,中断和虚假唤醒是可能的,并且该方法应该始终在循环中使用

wait方法的特征是在哪里睡,唤醒后从哪里开始执行,所以如果不使用循环在唤醒后再一次判断条件,就可能出现条件不满足,却执行了相应的逻辑,从而造成虚假唤醒。

// 实现+1操作
public synchronized void incr() throws InterruptedException {
  // 操作:判断、干活、通知
  // 需要使用while循环,如果使用if就可能在多线程执行时,多个线程都执行了number++
  while (number != 0) {
    this.wait();
  }
  number++;
  System.out.print(Thread.currentThread().getName()+"::"+number);
  // 唤醒其他线程
  this.notifyAll();
}

Lock实现

public void incr() {
      // 上锁
      lock.lock();
      try {
          // 判断
          while (number != 0) {
              condition.await();
          }
          // 干活
          number++;
          System.out.print(Thread.currentThread().getName() + "::" + number);
          // 通知
          condition.signalAll();
      } catch (InterruptedException e) {
          e.printStackTrace();
      } finally {
          lock.unlock();
      }
  }

线程间定制化通信

其实就是设置标志位,每个线程等待不同的标志值,使用Lock创建不同的Condition,实现特定的线程唤醒

多个condition一般用于维护不同的线程角色,如生产者和消费者,生产者只唤醒生产者,消费者只唤醒消费者,每个condition维护了一个线程等待队列,调用await将线程加入队列尾部,signal唤醒队列首部的线程。

private Lock lock = new ReentrantLock();
// 创建三个Condition对象,为了定向唤醒线程
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();

public void Aprint(int loop) {
      //上锁
      lock.lock();
      try{
          // 判断
          while(flag!=1) {
              c1.await();
          }
          // do something
          flag = 2; //修改标志位,定向唤醒 线程b
          // 唤醒
          c2.signal();
      } catch (InterruptedException e) {
          e.printStackTrace();
      } finally {
          // 解锁
          lock.unlock();
      }
  }

集合的线程安全

介绍

ArrayList,HashSet(底层就是HashMap),HashMap等集合结构都没有实现线程安全,所以在有多个线程编辑和读取的时候,就会发生异常。

for (int i = 0; i < 10; i++) {
          new Thread(()->{
              // 向集合中添加内容
              list.add(UUID.randomUUID().toString().substring(0,8));
              // 从集合中取出内容
              System.out.println(list);
          },String.valueOf(i)).start();
      }

以上代码中可能就会出现ConcurrentModificationException,某个线程在 Collection 上进行迭代时,通常不允许另一个线性修改该Collection。通常在这些情况下,迭代的结果是不确定的。如果检测到这种行为,一些迭代器实现(包括JRE提供的所有通用collection实现)可能选择抛出此异常。

解决方案

1.Vector(和ArrayList类似)的底层实现中加了synchronized,是线程安全的

2.使用Collections,其提供了synchronizedList(List list) 方法,可以让其维护一个列表,保证其线程安全,synchronizedList(List list) 方法返回指定列表支持的同步(线程安全的)列表

public static <T> List<T> synchronizedList(List<T> list) {
  return (list instanceof RandomAccess ?
          new SynchronizedRandomAccessList<>(list) :
          new SynchronizedList<>(list));
}

3.使用CopyOnWriteArrayList,写时复制数组(最常用),特点如下:读并发,写独立,写的时候会先拷贝,修改完后进行合并替换。实际也是加了一个锁,修改的时候需要获取锁,但是读的时候不需要,其次还有CopyOnWriteArraySet对set提供写时复制线程安全。1.8提供了ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();对map进行线程安全保证。

多线程锁

对象锁和类锁

synchronized可以修饰普通方法,也可以修饰静态方法;修饰普通方法,锁对象是类的实例;修饰静态方法,锁的对象是类本身,也就是方法区中类的Class对象。

公平锁和非公平锁

公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。

  • 优点:所有的线程都能得到资源,不会饿死在队列中。
  • 缺点:吞吐量会下降很多,队列里面除了第一个线程,其他的线程都会阻塞,cpu唤醒阻塞线程的开销会很大。

非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。

  • 优点:可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。
  • 缺点:可能导致队列中间的线程一直获取不到锁或者长时间获取不到锁,导致饿死。

ReentrantLock用法: 在创建可重入锁时,想构造器中传入true,ReentrantLock 的构造器源码如下:

public ReentrantLock() {
  sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
  sync = fair ? new FairSync() : new NonfairSync();
}

可重入锁

可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁

public static void main(String[] args) {
new Thread(new Runnable() {
  @Override
  public void run() {
    synchronized (this) {
      // do something
        synchronized (this) {
          // do something
        }
    }
  }
}).start();

可重入锁有

  • synchronized
  • ReentrantLock,ReentrantLock 和 synchronized 不一样,需要手动释放锁,所以使用 ReentrantLock的时候一定要手动释放锁,并且加锁次数和释放次数要一样

可重入锁ReentrantLock基本用法

lock()

获取锁,有以下三种情况:

  • 锁空闲:直接获取锁并返回,同时设置锁持有者数量为:1;
  • 当前线程持有锁:直接获取锁并返回,同时锁持有者数量递增1;
  • 其他线程持有锁:当前线程会休眠等待,直至获取锁为止;

lockInterruptibly()

获取锁,逻辑和 lock() 方法一样,但这个方法在获取锁过程中能响应中断。

tryLock()

从关键字字面理解,这是在尝试获取锁,获取成功返回:true,获取失败返回:false, 这个方法不会等待,有以下三种情况:

  • 锁空闲:直接获取锁并返回:true,同时设置锁持有者数量为:1;
  • 当前线程持有锁:直接获取锁并返回:true,同时锁持有者数量递增1;
  • 其他线程持有锁:获取锁失败,返回:false;

4)tryLock(long timeout, TimeUnit unit)

逻辑和 tryLock() 差不多,只是这个方法是带时间的。

5)unlock()

释放锁,每次锁持有者数量递减 1,直到 0 为止。

6)newCondition

返回一个这个锁的 Condition 实例,可以实现 synchronized 关键字类似 wait/ notify 实现多线程通信的功能,不过这个比 wait/ notify 要更灵活,更强大!

死锁

两个或以上的进程因为争夺资源而造成互相等待资源的现象称为死锁

死锁产生原因:系统资源不足/系统资源分配不当/进程运行顺序不当

查看程序是否死锁

  • jps 类似于linux中的 ps -ef查看进程号
  • jstack 自带的堆栈跟踪工具,jstack + 进程号

死锁样例:

public static void main(String[] args) {
  new Thread(() -> {
    synchronized (a) {
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (Exception e) {
        e.printStackTrace();
      }
      synchronized (b) {
        // do something
      }
    }
  }, "a").start();
  new Thread(() -> {
    synchronized (b) {
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (Exception e) {
        e.printStackTrace();
      }
      synchronized (a) {
        // do something
      }
    }
  }, "b").start();
}

Callable接口

通过Thread和runnable接口都无法使线程返回结果,需要用到Callable

比较Runnable接口和Callable接口

Callable中的call()计算结果,如果无法计算结果,会抛出异常

Runnable中的run():使用实现接口Runnable的对象创建一个线程时,启动该线程将导致在独立执行的线程中调用该对象的run方法,没有返回值

Thread构造函数中无法传入Callable类型,所以无法通过new Thread创建线程,需要用到FutureTask类(是runnable实现类)

public static void main(String[] args) throws ExecutionException, InterruptedException {
    new Thread(()->{
      // do something
    }).start();
    FutureTask<String> task = new FutureTask<>(() -> {
        // do somrthing
        return "Callable接口返回值";
    });
    new Thread(task).start();
    System.out.println("Callable返回值:" + task.get());
}

JUC辅助类

减少计数CountDownLatch

CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句。

public static void main(String[] args) throws InterruptedException {
    // 创建CountDown对象并设置初始值
    CountDownLatch countDownLatch = new CountDownLatch(6);
    // 创建六个线程,模拟六个学生
    for (int i = 1; i <= 6; i++) {
        new Thread(()->{
          // do something
            // 计数 -1
            countDownLatch.countDown();
        },String.valueOf(i)).start();
    }
    // 等待,直到达到零
    countDownLatch.await();
    // done
}

循环栅栏CyclicBarrier

该类允许一组线程互相等待,直到到达某个公共屏障点,具体使用如下:

public static void main(String[] args) {
    // 每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句。
  CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
          // do something
      });
      for (int i = 1; i <= 7; i++) {
          new Thread(()->{
              // do something
              try {
                  // 计数 +1
                  cyclicBarrier.await();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          },String.valueOf(i)).start();
      }
  }
}

信号灯Semaphore

一个计数信号量,从概念上将,信号量维护了一个许可集,如有必要,在许可可用前会阻塞每一个acquire(),然后在获取该许可。每个release()添加一个许可,从而可能释放一个正在阻塞的获取者。实例如下:

public static void main(String[] args) {
    //创建Semaphore,设置许可数量
    Semaphore semaphore = new Semaphore(3);
    for (int i = 1; i <= 6; i++) {
        new Thread(()->{
            try {
                // 抢占
                semaphore.acquire();
                // do something
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放
                semaphore.release();
            }
        },String.valueOf(i)).start();
    }
}

读写锁

悲观锁和乐观锁

乐观锁:乐观锁在操作数据时非常乐观,认为别人不会同时修改数据。因此乐观锁不会上锁,只是在执行更新的时候判断一下在此期间别人是否修改了数据:如果别人修改了数据则放弃操作,否则执行操作。

悲观锁:悲观锁在操作数据时比较悲观,认为别人会同时修改数据。因此操作数据时直接把数据锁住,直到操作完成后才会释放锁;上锁期间其他人不能修改数据。

悲观锁的实现方式是加锁,加锁既可以是对代码块加锁(如Java的synchronized关键字),也可以是对数据加锁(如MySQL中的排它锁)。

乐观锁的实现方式主要有两种:CAS(compare and swap)机制和版本号机制

CAS包含了Compare和Swap两个操作,它又如何保证原子性呢? 答案是:CAS是由CPU支持的原子操作,其原子性是在硬件层面进行保证的。CAS存在ABA问题,别人改了后又改了回去,你不知道

读写锁介绍

一个资源可以被多个读线程访问,也可以被一个写线程访问,但不能同时存在读写线程,读写互斥,读读共享。读锁-共享锁,写锁-独占锁

创建读写锁对象 private ReadWriteLock rwLock = new ReentrantReadWriteLock(); 写锁 加锁 rwLock.writeLock().lock();,解锁为rwLock.writeLock().unlock(); 读锁 加锁 rwLock.readLock().lock();,解锁为rwLock.readLock().unlock();

写锁降级

如果是同一个线程,则可以先获取写锁,在获取读锁,这不会阻塞,这就是写锁降级,但是不能先获取了读锁,然后再拿写锁,这样会阻塞,拿不到写锁。

这是针对单个线程,如果是不同的线程,那么读写锁都是互斥的。不可能线程拿到了读锁,另一个线程拿到写锁。

用处:

如果只使用写锁,那么释放写锁之后,其他线程就会获取到写锁或读锁,使用锁降级可以在释放写锁前获取读锁,这样其他的线程就只能获取读锁,对这个数据进行读取,但是不能获取写锁进行修改,只有当前线程释放了读锁之后才可以进行修改。相对于只使用写锁,锁降级可以减少其他读线程的阻塞。因为在他释放读锁之前,别的线程可以读,但是那些写的线程就需要等待。

阻塞队列

介绍

阻塞队列是共享队列(多线程操作),一端输入,一端输出,不能无限放队列,满了之后就会进入阻塞,取出也同理

当队列是空的,从队列中获取元素的操作将会被阻塞 当队列是满的,从队列中添加元素的操作将会被阻塞 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增

BlockQueue是一个接口,它的父接口有: Collection, Iterable, Queue;它的子接口有:BlockingDeque , TransferQueue ;它的实现类有:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque,LinkedBlockingQueue,LinkedTransferQueue,PriorityBlockingQueue,SynchronousQueue

分类

ArrayBlockingQueue(常用)

基于数组的阻塞队列,由数组结构组成的有界阻塞队列

ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,无法并行

LinkedBlockingQueue(常用)

基于链表的阻塞队列,由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列

能够高效的处理并发数据,因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能

DelayQueue

使用优先级队列实现的延迟无界阻塞队列

DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。

DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞

PriorityBlockingQueue

基于优先级的阻塞队列,支持优先级排序的无界阻塞队列

不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者

SynchronousQueue

一种无缓冲的等待队列,相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区)

不存储元素的阻塞队列,也即单个元素的队列,其有两种不同的处理模式:公平模式和非公平模式。默认的非公平模式可能会使得有些生产者或是消费者的数据永远都得不到处理

不写了不写了,也用不到

核心API

检查实际就是返回队列第一个元素

方法类型抛出异常特殊值阻塞超时
插入add(e)offer()put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
检查element()peek()--
  • 抛出异常:当阻塞队列满时,再往队列里add插入元素会抛出IllegalStateException:Queue full;当阻塞队列空时,再往队列里remove移除元素会抛出NoSuchElementException
  • 特殊值:插入方法,成功true,失败false;移除方法,成功返回出队列的元素,列表里没有就返回null
  • 阻塞:当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程知道put数据或响应中断退出;当阻塞队列空时,消费者线程视图从队列里take元素,队列会一直阻塞消费者线程直到队列可用
  • 超时:当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出

线程池

介绍

线程池(英语:thread pool)一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度

线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超过数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

线程池的特点:

  • 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
  • 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

使用

Executors.newFixedThreadPool(int):一池N线程ExecutorService threadPool1 = Executors.newFixedThreadPool(5); //5个窗口

Executors.newSingleThreadExecutor():一池一线程ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); //一个窗口

Executors.newCachedThreadPool():一池可扩容根据需求创建线程ExecutorService threadPool3 = Executors.newCachedThreadPool();

执行线程:execute(),关闭线程:shutdown()

void execute(Runnable command);参数为Runnable接口类

原理

以上线程池的创建都依赖于new ThreadPoolExecutor()

他的构造函数有七个参数可以配置

int corePoolSize,常驻线程数量(核心)

int maximumPoolSize,最大线程数量

long keepAliveTime,TimeUnit unit,线程存活时间

BlockingQueue workQueue,阻塞队列(排队的线程放入)

ThreadFactory threadFactory,线程工厂,用于创建线程

RejectedExecutionHandler handler,拒绝测试(线程满了)

截屏2024-03-09 20.27.23

总结来说:先到常驻线程,满了之后再到阻塞队列进行等待,阻塞队列满了之后,在往外扩容线程,扩容线程不能大于最大线程数。大于最大线程数和阻塞队列之和后,会执行拒绝策略。

具体的拒绝策略有:

  • AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
  • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交当前任务
  • Policydiscard:该策略默默地丢弃无法处理的任务,不予任何处理也不抱出异常。如果允许任务丢失,这是最好的一种策略

自定义线程池

实际在开发中不允许使用Executors创建,而是通过ThreadPoolExecutor的方式,规避资源耗尽风险

说明:Executors 返回的线程池对象的弊端如下:

  1. FixedThreadPool 和 SingleThreadPool:允许的请求队列(阻塞队列)长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
  2. CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
ExecutorService threadPool = new ThreadPoolExecutor(
  // 常驻核心线程
      2,
  // 最大线程数量
      5,
  // 线程存活时间
      2L,
      TimeUnit.SECONDS,
  // 阻塞队列
      new ArrayBlockingQueue<>(3),
  // 线程工厂
      Executors.defaultThreadFactory(),
  // 拒绝策略
      new ThreadPoolExecutor.AbortPolicy()
);

Fork与Join分支

将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果。

ForkJoinTask:我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了两个子类:

  • RecursiveAction:用于没有返回结果的任务

  • RecursiveTask:用于有返回结果的任务

ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行

RecursiveTask:继承后可以实现递归(自己调自己)调用的任务,创建分支合并对象 通过该对象调用内部方法

使用实例如下:

class MyTask extends RecursiveTask<Integer> {

  // 拆分差值不能超过10
  private static final Integer VALUE  = 10;
  private int begin;
  private int end;
  private int result;

  public MyTask(int begin, int end){
      this.begin = begin;
      this.end = end;
  }

  @Override
  protected Integer compute() {
      // 判断两个值的差值是否大于10
      if((end-begin)<=10) {
          // 相加操作
          for (int i = begin; i <= end ; i++) {
              result = result + i;
          }
      } else {
          // 大于10 继续拆分
          int middle = (begin + end) / 2;
          // 拆分左边
          MyTask task01 = new MyTask(begin, middle);
          // 拆分右边
          MyTask task02 = new MyTask(middle + 1, end);
          // 调用方法拆分
          task01.fork();
          task02.fork();
        // or invokeAll(task01, task02);
          // 合并结果
          result = task01.join() + task02.join();
      }
      return result;
  }
}

public class ForkJoinTest {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      // 创建MyTask对象
      MyTask task = new MyTask(0, 100);
      // 创建分支合并池对象
      ForkJoinPool forkJoinPool = new ForkJoinPool();
      ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(task);
      // 获取最终合并之后结果
      Integer reslut = forkJoinTask.get();
      System.out.println(reslut);
  }
}
  1. 对一个任务调用join方法会阻塞调用方,直到该任务运行完成。因此,需要在两个子任务的计算都开始之后再调用它。否则,使用该框架计算会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。
  2. ForkJoin是通过多线程的方式进行处理任务,因此当数据量不是特别大的时候,没有必要使用ForkJoin。因为多线程会涉及到上下文的切换,当数据量不大的时候使用串行会比使用多线程快。

异步回调

CompletableFuture

CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息

  • 异步调用没有返回值方法runAsync
  • 异步调用有返回值方法supplyAsync
public class CompletableFutureTest {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      // 异步调用没有返回值
      CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
          // do something;
      });
      completableFuture.get();
      // 异步调用
      // mq消息队列
      CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(()->{
          // do something
          // 模拟异常
          int i = 10/0;
          return 1024;
      });
      // 完成之后调用
      completableFuture1.whenComplete((t,u)->{
          System.out.println("-----t:"+t);    // 方法的返回值
          System.out.println("-----u:"+u);    // 异常的返回信息
      }).get();
  }
}

Future vs CompletableFuture

Future在Java5就引入了。

优点:一定程度上让一个线程池内的任务异步执行了 缺点:传统回调最大的问题就是不能将控制流分离到不同的事件处理器中。例如主线程等待各个异步执行的线程返回的结果来做下一步操作,则必须阻塞在future.get()的地方等待结果返回。这时候又变成同步了。

CompletableFuture在Java8引入。

实现了Future和CompletionStage接口,保留了Future的优点,并且弥补了其不足。即异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。可见,这种方式才是我们需要的异步处理。一个控制流的多个异步事件处理能无缝的连接在一起。

Last Updated:
Contributors: liushun-ing