Java 多线程和并发

进程和线程

进程是程序在某个数据集合上的一次运行活动,也是操作系统进行资源分配和保护的基本单位。

线程是进程的一个执行单元,是进程内的调度实体、比进程更小的独立运行的基本单位。线程也被称为轻量级进程。

进程和线程的区别:

  • 进程是操作系统资源分配的基本单位,而线程是处理器任务调度和执行的基本单位

  • 资源开销:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程可以看做轻量级的进程,同一类线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小

  • 包含关系:如果一个进程内有多个线程,则执行过程不是一条线的,而是多条线(线程)共同完成的;线程是进程的一部分,所以线程也被称为轻权进程或者轻量级进程

  • 内存分配:同一进程的线程共享本进程的地址空间和资源,而进程之间的地址空间和资源是相互独立的

  • 影响关系:一个进程崩溃后,在保护模式下不会对其他进程产生影响,但是一个线程崩溃整个进程都死掉。所以多进程要比多线程健壮

  • 执行过程:每个独立的进程有程序运行的入口、顺序执行序列和程序出口。但是线程不能独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制,两者均可并发执行

类和接口(后续更新)

Thread

public class Demo {
    public static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("MyThread");
        }
    }
    public static void main(String[] args) {
        Thread myThread = new MyThread();
        myThread.start();
    }
}

Runnable

public class Demo {
    public static class MyThread implements Runnable {
        @Override
        public void run() {
            System.out.println("MyThread");
        }
    }
    public static void main(String[] args) {
        new MyThread().start();
        // Java 8 函数式编程,可以省略MyThread类
        new Thread(() -> {
            System.out.println("Java 8 匿名内部类");
        }).start();
    }
}

Callable

// 自定义 Callable
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        // 模拟计算需要一秒
        Thread.sleep(1000);
        return 2;
    }
    public static void main(String args[]){
        // 使用
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        // 注意调用 get 方法会阻塞当前线程,直到得到结果
        // 所以实际编码中建议使用可以设置超时时间的重载 get 方法
        System.out.println(result.get());
    }
}

Output:

2

Future & FutureTask

Future 接口是用来获取异步计算结果的,说白了就是对具体的 Runnable 或者 Callable 对象任务执行的结果进行获取(get)、取消(cancel)、判断是否完成等操作。

在 Future 接口中声明了 5 个方法:

  • cancel(): 用来取消任务,如果取消任务成功则返回 true,如果取消任务失败则返回 false
  • isCancelled(): 表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true
  • isDone(): 表示任务是否已经完成,若任务完成,则返回 true
  • get(): 获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
  • get(long timeout, TimeUnit unit): 获取执行结果,如果在指定时间内,还没获取到结果,就直接返回 null

也就是说 Future 提供了三种功能:
1)判断任务是否完成;
2)能够中断任务;
3)能够获取任务执行结果。

因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了 FutureTask。

案例:获取执行结果

1、使用 Callable + Future 获取执行结果

public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println("主线程在执行任务");

        try {
            System.out.println("task 运行结果: " + result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("所有任务执行完毕");
    }

    static class Task implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("子线程在进行计算");
            Thread.sleep(3000);
            int sum = 0;
            for (int i = 0; i < 100; i++)
                sum += i;
            return sum;
        }
    }
}

Output:

子线程在进行计算
主线程在执行任务
task运行结果4950
所有任务执行完毕

2、使用 Callable + FutureTask 获取执行结果

public class Main {
    public static void main(String[] args) {
        // 第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<>(task);
        executor.submit(futureTask);
        executor.shutdown();

        // 第二种方式,和第一种方式效果是类似的
//        Task task = new Task();
//        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
//        Thread thread = new Thread(futureTask);
//        thread.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println("主线程在执行任务");

        try {
            System.out.println("task 运行结果: " + futureTask.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("所有任务执行完毕");
    }

    static class Task implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("子线程在进行计算");
            Thread.sleep(3000);
            int sum = 0;
            for (int i = 0; i < 100; i++)
                sum += i;
            return sum;
        }
    }
}

线程组和线程优先级

Thread testThread = new Thread(() -> {
    System.out.println("testThread当前线程组名字:" + Thread.currentThread().getThreadGroup().getName());
    System.out.println("testThread线程名字:" + Thread.currentThread().getName());
});
testThread.start();
System.out.println("执行main方法的线程名字:" + Thread.currentThread().getName());

Output:

执行main方法线程名字: main
testThread当前线程组名字: main
testThread线程名字: Thread-0

Thread a = new Thread();
System.out.println("我是默认线程优先级:" + a.getPriority());
Thread b = new Thread();
b.setPriority(10);
System.out.println("我是设置过的线程优先级:" + b.getPriority());

Output:

我是默认线程优先级:5
我是设置过的线程优先级:10

Java 中的优先级来说不是特别的可靠,Java 程序中对线程所设置的优先级只是给操作系统一个建议,操作系统不一定会采纳。而真正的调用顺序,是由操作系统的线程调度算法决定的。

线程的状态和转化

操作系统线程主要有以下三种状态:

  • 就绪(ready):线程正在等待使用 CPU,经调度程序调用之后可进入 running 状态
  • 执行(running):线程正在使用 CPU
  • 等待(waiting):线程经过等待事件的调用或者正在等待其他资源(如 I/O)

Java 线程(Thread.State)的六个状态:

public enum State {
    NEW, // 尚未启动,还没有调用 start() 方法
    RUNNABLE, // 当前线程正在运行中
    BLOCKED, // 阻塞状态。处于 BLOCKED 状态的线程正等待锁的释放以进入同步区
    WAITING, // 等待状态
    TIMED_WAITING, // 超时等待状态
    TERMINATED; // 终止状态
}

线程状态转换图:

image-20221025172234951

Java 线程间的通信

Java 中锁的概念都是基于对象的,所以我们又经常称它为对象锁。一个锁同一时间只能被一个线程持有。也就是说,一个锁如果被一个线程持有,那其他线程如果需要得到这个锁,就得等这个线程释放这个锁。

锁和同步 synchronized(Object obj);

public class ObjectLock {
    private static Object lock = new Object();
    static class ThreadA implements Runnable {
        @Override
        public void run() {
            synchronized(lock) {
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread A " + i);
                }
            }
        }
    }
    static class ThreadB implements Runnable {
        @Override
        public void run() {
            synchronized(lock) {
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread B " + i);
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(10);
        new Thread(new ThreadB()).start();
    }
}

等待/通知 wait(), notify(), notifyAll()

public class WaitAndNotify {
    private static Object lock = new Object();
    static class ThreadA implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("ThreadA: " + i);
                        lock.notify();
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                lock.notify();
            }
        }
    }
    static class ThreadB implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("ThreadB: " + i);
                        lock.notify();
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                lock.notify();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(1000);
        new Thread(new ThreadB()).start();
    }
}

Output:

ThreadA: 0
ThreadB: 0
ThreadA: 1
ThreadB: 1
ThreadA: 2
ThreadB: 2
ThreadA: 3
ThreadB: 3
ThreadA: 4
ThreadB: 4

信号量 volatile

public class Signal {
    private static volatile int signal = 0;
    static class ThreadA implements Runnable {
        @Override
        public void run() {
            while (signal < 5) {
                if (signal % 2 == 0) {
                    System.out.println("threadA: " + signal);
                    synchronized (this) {
                        signal++;
                    }
                }
            }
        }
    }
    static class ThreadB implements Runnable {
        @Override
        public void run() {
            while (signal < 5) {
                if (signal % 2 == 1) {
                    System.out.println("threadB: " + signal);
                    synchronized (this) {
                        signal = signal + 1;
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(1000);
        new Thread(new ThreadB()).start();
    }
}

Output:

threadA: 0
threadB: 1
threadA: 2
threadB: 3
threadA: 4

管道通信 PipedReader, PipedWriter

public class Pipe {
    static class ReaderThread implements Runnable {
        private PipedReader reader;
        public ReaderThread(PipedReader reader) {
            this.reader = reader;
        }
        @Override
        public void run() {
            System.out.println("this is reader");
            int receive = 0;
            try {
                while ((receive = reader.read()) != -1) {
                    System.out.print((char)receive);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    static class WriterThread implements Runnable {
        private PipedWriter writer;
        public WriterThread(PipedWriter writer) {
            this.writer = writer;
        }
        @Override
        public void run() {
            System.out.println("this is writer");
            int receive = 0;
            try {
                writer.write("test");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws IOException, InterruptedException {
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();
        writer.connect(reader); // 这里注意一定要连接,才能通信
        new Thread(new ReaderThread(reader)).start();
        Thread.sleep(1000);
        new Thread(new WriterThread(writer)).start();
    }
}

Output:

this is reader
this is writer
test

其他通信方法

  1. join()

    public class Join {
        static class ThreadA implements Runnable {
            @Override
            public void run() {
                try {
                    System.out.println("我是子线程,我先睡一秒");
                    Thread.sleep(1000);
                    System.out.println("我是子线程,我睡完了一秒");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(new ThreadA());
            thread.start();
            thread.join();
            System.out.println("如果不加 join 方法,我会先被打出来,加了就不一样了");
        }
    }
  2. sleep()

    wait() 与 sleep() 的区别:

    • sleep 是不会释放当前的锁的,而 wait 会

    • wait 可以指定时间,也可以不指定;sleep 必须指定时间

    • wait 释放 CPU 资源,同时释放锁;sleep 释放 CPU 资源,但是不释放锁,所以易死锁

    • wait 必须放在同步块或同步方法中,而 sleep 可以在任意位置

ThreadLocal 类

ThreadLocal<String> threadLocal;
threadLocal.set("A");
threadLocal.get();

指令重排序

重排序

计算机在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排。

  • 编译器优化重排
  • 指令并行重排
  • 内存系统重排

指令重排可以保证串行语义一致,但是没有义务保证多线程间的语义也一致。

顺序一致性模型

  • 一个线程中的所有操作必须按照程序的顺序(即 Java 代码的顺序)来执行
  • 不管程序是否同步,所有线程都只能看到一个单一的操作执行顺序。即在顺序一致性模型中,每个操作必须是原子性的,且立刻对所有线程可见

Java 内存模型(JMM)

JMM 是一种规范,目的是解决由于多线程通过共享内存进行通信时,存在的本地内存数据不一致、编译器会对代码指令重排序、处理器会对代码乱序执行等带来的问题。

Java 内存模型(JMM)的效果:

  • 同步程序:在不改变(正确同步的)程序执行结果的前提下,尽量为编译器和处理器的优化打开方便之门
  • 未同步程序:JMM 没有保证未同步程序的执行结果与该程序在顺序一致性中执行结果一致。因为如果要保证执行结果一致,那么 JMM 需要禁止大量的优化,对程序的执行性能会产生很大的影响

happens-before 规则

一方面,程序员需要 JMM 提供一个强的内存模型来编写代码;另一方面,编译器和处理器希望 JMM 对它们的束缚越少越好,这样它们就可以最可能多的做优化来提高性能,希望的是一个弱的内存模型。

happens-before关系的定义如下:

  1. 如果一个操作 happens-before 另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前
  2. 两个操作之间存在 happens-before 关系,并不意味着 Java 平台的具体实现必须要按照 happens-before 关系指定的顺序来执行。如果重排序之后的执行结果与按 happens-before 关系来执行的结果一致,那么 JMM 也允许这样的重排序

总之,如果操作A happens-before 操作B,那么操作A 在内存上所做的操作对操作B 都是可见的,不管它们在不在一个线程。

volatile

在 Java 中,volatile 主要有以下两个功能:

  • 保证变量的内存可见性
  • 禁止 volatile 变量与普通变量重排序(JSR133 提出,Java 5 开始才有这个“增强的 volatile 内存语义”)
public class VolatileExample {
    int a = 0;
    volatile boolean flag = false;
    public void writer() {
        a = 1; // step 1
        flag = true; // step 2
    }
    public void reader() {
        if (flag) { // step 3
            System.out.println(a); // step 4
        }
    }
}

当一个线程对 volatile 修饰的变量进行写操作(step 2)时,JMM 会立即把该线程对应的本地内存中的共享变量的值刷新到主内存;当一个线程对 volatile 修饰的变量进行读操作(step 3)时,JMM 会把立即该线程对应的本地内存置为无效,从主内存中读取共享变量的值。

内存屏障

内存屏障有两个作用:

  1. 阻止屏障两侧的指令重排序
  2. 强制把写缓冲区/高速缓存中的脏数据等写回主内存,或者让缓存中相应的数据失效

编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。编译器选择了一个比较保守的 JMM 内存屏障插入策略,这样可以保证在任何处理器平台,任何程序中都能得到正确的 volatile 内存语义。这个策略是:

  • 在每个 volatile 写操作前插入一个 StoreStore 屏障
  • 在每个 volatile 写操作后插入一个 StoreLoad 屏障
  • 在每个 volatile 读操作后插入一个 LoadLoad 屏障
  • 在每个 volatile 读操作后再插入一个 LoadStore 屏障
image-20221115110714037

在保证内存可见性这一点上,volatile 有着与锁相同的内存语义,所以可以作为一个“轻量级”的锁来使用。但由于 volatile 仅仅保证对单个 volatile 变量的读/写具有原子性,而锁可以保证整个临界区代码的执行具有原子性。所以在功能上,锁比 volatile 更强大;在性能上,volatile 更有优势。

synchronized

我们通常使用 synchronized 关键字来给一段代码或一个方法上锁。它通常有以下三种形式:

/* 关键字在实例方法上,锁为当前实例 */
public synchronized void instanceLock() {
    // code
}
/* 关键字在静态方法上,锁为当前 Class 对象 */
public static synchronized void classLock() {
    // code
}
/* 关键字在代码块上,锁为括号里面的对象 */
public void blockLock() {
    Object o = new Object();
    synchronized (o) {
        // code
    }
}

Java 中的锁

Java 多线程的锁都是基于对象的,Java 中的每一个对象都可以作为一个锁。

不同级别的锁

Java 6 及其以后,一个对象有四种锁状态,它们由低到高依次是:

  • 无锁状态
  • 偏向锁状态
  • 轻量级锁状态
  • 重量级锁状态
优点 缺点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 只有一个线程访问同步块
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到锁,竞争的线程使用自旋会消耗 CPU 追求响应时间,同步块执行速度非常快
重量级锁 线程竞争不使用自旋,不会消耗 CPU 线程阻塞,响应时间缓慢 追求吞吐量,同步块执行速度较长

乐观锁与悲观锁

悲观锁就是我们常说的锁。对于悲观锁来说,它总是认为每次访问共享资源时会发生冲突,所以必须对每次数据操作加上锁,以保证临界区的程序同一时间只能有一个线程在执行。

乐观锁又称为“无锁”。乐观锁总是假设对共享资源的访问没有冲突,线程可以不停地执行,无需加锁也无需等待。而一旦多个线程发生冲突,乐观锁通常是使用一种称为 CAS 的技术来保证线程执行的安全性。
由于无锁操作中没有锁的存在,因此不可能出现死锁的情况,也就是说乐观锁天生免疫死锁。乐观锁多用于“读多写少”的环境,避免频繁加锁影响性能;而悲观锁多用于“写多读少”的环境,避免频繁失败和重试影响性能。

CAS (Compare And Swap) 比较并交换

在 CAS 中,有这样三个值:

  • V: 要更新的变量 var
  • E: 预期值 expected
  • N: 新值 new

判断 V 是否等于 E,如果等于,将 V 的值设置为 N;如果不等,说明已经有其它线程更新了 V,则当前线程放弃更新,什么都不做。预期值 E 本质上指的是“旧值”。

CAS 是一种原子操作,它是一种系统原语,是一条 CPU 的原子指令。当多个线程同时使用 CAS 操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。

AQS - 抽象队列同步器

AQS (AbstractQueuedSynchronizer) 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的同步器。

相关链接:

线程池原理

使用线程池主要有以下三个原因:

  1. 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程
  2. 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃(主要原因)
  3. 可以对线程做统一管理

ThreadPoolExecutor 构造函数的参数:

  • int corePoolSize - 该线程池中核心线程数最大值

    核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干,而非核心线程如果长时间的闲置,就会被销毁。

  • int maximumPoolSize - 线程池中线程总数最大值

    该值等于核心线程数量 + 非核心线程数量

  • long keepAliveTime - 非核心线程闲置超时时长

    非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置 allowCoreThreadTimeOut(true),则会也作用于核心线程。

  • TimeUnit unit - keepAliveTime 的单位

  • BlockingQueue workQueue - 阻塞队列,维护着等待执行的 Runnable 任务对象

  • (非必须)ThreadFactory threadFactory - 创建线程的工厂

    用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。

  • (非必须)RejectedExecutionHandler handler - 拒绝处理策略

    线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为:
    1)ThreadPoolExecutor.AbortPolicy: 默认拒绝处理策略,丢弃任务并抛出 RejectedExecutionException 异常
    2)ThreadPoolExecutor.DiscardPolicy: 丢弃新来的任务,但是不抛出异常
    3)ThreadPoolExecutor.DiscardOldestPolicy: 丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)
    4)ThreadPoolExecutor.CallerRunsPolicy: 由调用线程处理该任务

线程池的几种状态(RUNNING、SHURDOWN、STOP、TIDYING 、TERMINATED):

  • 线程池创建后处于 RUNNING 状态.

  • 调用 shutdown() 方法后处于 SHUTDOWN 状态,线程池不能接受新的任务,清除一些空闲 worker,会等待阻塞队列的任务完成.

  • 调用 shutdownNow() 方法后处于 STOP 状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的 size 也为 0.

  • 当所有的任务已终止,ctl 记录的“任务数量”为 0,线程池会变为 TIDYING 状态。接着会执行 terminated() 函数.

    ThreadPoolExecutor 中有一个控制状态的属性叫 ctl,它是一个 Atomiclnteger 类型的变量。

  • 线程池处在 TIDYING 状态时,执行完 terminated() 方法之后,就会由 TIDYING->TERMINATED,线程池被设置为 TERMINATED 状态.

线程池主要的任务处理流程

image-20221115202904762

四种常见的线程池

(1) newFixedThreadPool - 核心线程数量和总线程数量相等

public static ExecutorService newFixedThreadPool(int nThreads) {
    // 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

(2) newSingleThreadExecutor - 有且仅有一个核心线程

public static ExecutorService newSingleThreadExecutor() {
    // 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

(3) newCachedThreadPool

当需要执行很多短时间的任务时,CacheThreadPool 的线程复用率比较高,会显著的提高性能。而且线程 60s 后会回收,意味着即使没有任务进来,并不会占用很多资源

public static ExecutorService newCachedThreadPool() {
    // 使用没有容量的 SynchronousQueue 作为线程池的工作队列
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

(4) newScheduledThreadPool - 创建一个定长线程池,支持定时及周期性任务执行

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

阻塞队列(BlockingQueue)

阻塞队列提供了四组不同的方法用于插入、移除、检查元素:

处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() - -

注意事项:

  • 不能往阻塞队列中插入 null,会抛出空指针异常
  • 可以访问阻塞队列中的任意元素,调用 remove(o) 可以将队列之中的特定对象移除,但并不高效,尽量避免使用

生产者-消费者模式

public class Producer_Consumer {
    public static void main(String[] args) {
        int queueSize = 10;
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
        // Producer
        new Thread(() -> {
            while (true) {
                try {
                    queue.put(1);
                    System.out.println("向队列中插入一个元素,队列剩余空间:" + queue.remainingCapacity());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // Consumer
        new Thread(() -> {
            while (true) {
                try {
                    queue.take();
                    System.out.println("从队列取走一个元素,队列剩余 " + queue.size() + " 个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

Output:

从队列取走一个元素,队列剩余 0 个元素
从队列取走一个元素,队列剩余 0 个元素
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:8
向队列取中插入一个元素,队列剩余空间:7
向队列取中插入一个元素,队列剩余空间:6
向队列取中插入一个元素,队列剩余空间:5
向队列取中插入一个元素,队列剩余空间:4
向队列取中插入一个元素,队列剩余空间:3
向队列取中插入一个元素,队列剩余空间:2
向队列取中插入一个元素,队列剩余空间:1
向队列取中插入一个元素,队列剩余空间:0
从队列取走一个元素,队列剩余 1 个元素
从队列取走一个元素,队列剩余 9 个元素

锁的接口和类

synchronized 的不足之处:

  • 如果临界区是只读操作,其实可以多线程一起执行,但使用 synchronized 的话,同一时间只能有一个线程执行
  • synchronized 无法知道线程有没有成功获取到锁
  • 如果临界区因为 I/O 或者 sleep() 方法等原因阻塞了,而当前线程又没有释放锁,就会导致所有线程等待

Condition

Condition 接口提供了类似 Object 监视器的方法,通过与 Lock 配合来实现等待/通知模式。

方法名称 描述
await() 当前线程进入等待状态直到被通知(signal)或者中断;当前线程进入运行状态并从 await() 方法返回的场景包括:(1) 其他线程调用相同 Condition 对象的 signal/signalAll 方法,并且当前线程被唤醒;(2) 其他线程调用 interrupt 方法中断当前线程
awaitUninterruptibly() 当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程
awaitNanos(long) 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于 0,可以认定就是超时了
awaitUntil(Date) 当前线程进入等待状态,直到被通知、中断或者超时。如果没到指定时间被通知,则返回 true,否则返回 false
signal() 唤醒一个等待在 Condition 上的线程,被唤醒的线程在方法返回前必须获得与 Condition 对象关联的锁
signalAll() 唤醒所有等待在 Condition 上的线程,能够从 await() 等方法返回的线程必须先获得与 Condition 对象关联的锁

ReentrantLock

ReentrantLock 是一个非抽象类,它是 Lock 接口的 JDK 默认实现,实现了锁的基本功能。从名字上看,它是一个“可重入”锁,从源码上看,它内部有一个抽象类 Sync,是继承了 AQS、自己实现的一个同步器。同时,ReentrantLock 内部有两个非抽象类 NonfairSync 和 FairSync,它们都继承了 Sync。从名字上看得出分别是“非公平同步器”和“公平同步器”的意思。这意味着 ReentrantLock 可以支持“公平锁”和“非公平锁”。

通过两个同步器的源码可以发现,它们的实现都是“独占”的,都调用了 AOS 的 setExclusive0wnerThread 方法,所以 ReentrantLock 的锁的是“独占”的,也就是说,它的锁都是“排他锁”,不能共享。

在 ReentrantLock 的构造方法里,可以传入一个 boolean 类型的参数,来指定它是否是一个公平锁,默认情况下是非公平的。这个参数一旦实例化后就不能修改,只能通过 isFair() 方法来查看。

ReentrantReadWriteLock

ReentrantReadWriteLock 也是一个非抽象类,它是 ReadWriteLock 接口的 JDK 默认实现。它与 ReentrantLock 的功能类似,同样是可重入的,支持非公平锁和公平锁。不同的是,它还支持“读写锁”。

缺点:在“写”操作的时候,其它线程不能写也不能读(“写饥饿”)。

StampedLock

StampedLock 类是在 Java 8 才发布的,也是 Doug Lea 大神所写,有人号称它为锁的性能之王。它没有实现 Lock 接口和 ReadWriteLock 接口,但它其实是实现了“读写锁”的功能,并且性能比 ReentrantReadWriteLock 更高。StampedLock 还把读锁分为了“乐观读锁”和“悲观读锁”两种。

它的核心思想在于,在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。这种模式也就是典型的无锁编程思想,和 CAS 自旋的思想一样。这种操作方式决定了 StampedLock 在读线程非常多而写线程非常少的场景下非常适用,同时还避免了“写饥饿”情况的发生。

总的来说,StampedLock 的性能是非常优异的,基本上可以取代 ReentrantReadWriteLock 的作用。

CopyOnWrite 容器

CopyOnWrite 是计算机设计领域中的一种优化策略,也是一种在并发场景下常用的设计思想——写入时复制思想。

写入时复制思想是当有多个调用者同时去请求一个资源数据的时候,有一个调用者出于某些原因需要对当前的数据源进行修改,这个时候系统将会复制一个当前数据源的副本给调用者修改。

CopyOnWrite 容器即写时复制的容器,当我们往一个容器中添加元素的时候,不直接往容器中添加,而是将当前容器进行 copy,复制出来一个新的容器,然后向新容器中添加我们需要的元素,最后将原容器的引用指向新容器。

这样做的好处在于,我们可以在并发的场景下对容器进行“读操作”而不需要“加锁”,从而达到读写分离的目的。从 JDK 1.5 开始 Java 并发包里提供了两个使用 CopyOnWrite 机制实现的并发容器,分别是 CopyOnWriteArrayList 和 CopyOnWriteArraySet。

CopyOnWriteArrayList 的 add() 方法源码:

public boolean add(E e) {
    // ReentrantLock 加锁,保证线程安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 拷贝原容器,长度为原容器长度加一
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 在新副本上执行添加操作
        newElements[len] = e;
        // 将原容器引用指向新副本
        setArray(newElements);
        return true;
    } finally {
        // 解锁
        lock.unlock();
    }
}

利用这种思想实现 CopyOnWriteMap

public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
    private volatile Map<K, V> internalMap;
    public CopyOnWriteMap() {
        internalMap = new HashMap<K, V>();
    }
    public V put(K key, V value) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            V val = newMap.put(key, value);
            internalMap = newMap;
            return val;
        }
    }
    public V get(Object key) {
        return internalMap.get(key);
    }
    public void putAll(Map<? extends K, ? extends V> newData) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            newMap.putAll(newData);
            internalMap = newMap;
        }
    }
}

CopyOnWrite 容器有数据一致性的问题,它只能保证最终数据一致性。所以如果我们希望写入的数据马上能准确地读取,请不要使用 CopyOnWrite 容器。

通信工具类

作用
Semaphore 限制线程的数量
Exchanger 两个线程交换数据
CountDownLatch 线程等待直到计数器减为 0 时开始工作
CyclicBarrier 作用跟 CountDownLatch 类似,但是可以重复使用
Phaser 增强的 CyclicBarrier

Semaphore 案例

限制同时只能有 3 个线程在工作:

public class SemaphoreDemo {
    static class MyThread implements Runnable {
        private int value;
        private Semaphore semaphore;
        public MyThread(int value, Semaphore semaphore) {
            this.value = value;
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                semaphore.acquire(); // 获取 permit
                System.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待",
                                                 value, semaphore.availablePermits(), semaphore.getQueue()));
                // 睡眠随机时间,打乱释放顺序
                Random random = new Random();
                Thread.sleep(random.nextInt(1000));
                semaphore.release(); // 释放 permit
                System.out.println(String.format("线程%d释放了资源", value));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            new Thread(new MyThread(i, semaphore)).start();
        }
    }
}

Output:

当前线程是1,还剩2个资源,还有0个线程在等待
当前线程是0,还剩1个资源,还有0个线程在等待
当前线程是6,还剩0个资源,还有0个线程在等待
线程6释放了资源
当前线程是2,还剩0个资源,还有6个线程在等待
线程2释放了资源
当前线程是4,还剩0个资源,还有5个线程在等待
线程0释放了资源
当前线程是7,还剩0个资源,还有4个线程在等待
线程1释放了资源
当前线程是8,还剩0个资源,还有3个线程在等待
线程7释放了资源
当前线程是5,还剩0个资源,还有2个线程在等待
线程4释放了资源
当前线程是3,还剩0个资源,还有1个线程在等待
线程8释放了资源
当前线程是9,还剩0个资源,还有0个线程在等待
线程9释放了资源
线程5释放了资源
线程3释放了资源

Exchanger 案例

public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        new Thread(() -> {
            try {
                System.out.println("这是线程A,得到了另一个线程的数据:"
                                   + exchanger.exchange("这是来自线程A的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
        Thread.sleep(1000);
        
        new Thread(() -> {
            try {
                System.out.println("这是线程B,得到了另一个线程的数据:"
                                   + exchanger.exchange("这是来自线程B的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Output:

这个时候线程A是阻塞的,在等待线程B的数据
这是线程B,得到了另一个线程的数据: 这是来自线程A的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据

当一个线程调用 exchange() 方法后,它是处于阻塞状态的,只有当另一个线程也调用了 exchange() 方法,它才会继续向下执行。源码使用 park/unpark 来实现等待状态的切换的,但是在使用 park/unpark 方法之前,使用了 CAS 检查,估计是为了提高性能。

Exchanger 一般用于两个线程之间更方便地在内存中交换数据,因为其支持泛型所以我们可以传输任何的数据,比如 I/O 流或者 I/O 缓存。根据 JDK 注释的说法,可以总结为以下特性:

  • 此类提供对外的操作是同步的
  • 用于成对出现的线程之间交换数据
  • 可以视作双向的同步队列
  • 可应用于基因算法、流水线设计等场景

CountDownLatch

// 构造方法
public CountDownLatch(int count);

public void await(); // 等待
public boolean await(long timeout, TimeUnit unit); // 超时等待
public void countDown(); // count - 1
public long getCount(); // 获取当前还有多少 count

案例:

public class CountDownLatchDemo {
    // 定义前置任务线程
    static class PreTaskThread implements Runnable {
        private String task;
        private CountDownLatch countDownLatch;
        public PreTaskThread(String task, CountDownLatch countDownLatch) {
            this.task = task;
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                Random random = new Random();
                Thread.sleep(random.nextInt(1000));
                System.out.println(task + " - 任务完成");
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        // 假设有三个模块需要加载
        CountDownLatch countDownLatch = new CountDownLatch(3);
        // 主任务
        new Thread(() -> {
            try {
                System.out.println("等待数据加载...");
                System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
                countDownLatch.await();
                System.out.println("数据加载完成,正式开始游戏!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        // 前置任务
        new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
        new Thread(new PreTaskThread("加载任务模型", countDownLatch)).start();
        new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
    }
}

Output:

等待数据加载…
还有3个前置任务
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且 CountDownLatch 没有提供任何机制去重新设置这个计数值。

CyclicBarrier

CyclicBarrier 拥有 CountDownLatch 的所有功能,还可以使用 reset() 方法重置屏障。

如果在参与者(线程)在等待的过程中,Barrier 被破坏,就会抛出 BrokenBarrierException。可以用 isBroken() 方法检测 Barrier 是否被破坏。

  1. 如果有线程已经处于等待状态,调用 reset() 方法会导致已经在等待的线程出现 BrokenBarrierException 异常。并且由于出现了 BrokenBarrierException,将会导致线程始终无法等待
  2. 如果在等待的过程中,线程被中断,也会抛出 BrokenBarrierException 异常并且这个异常会传播到其他所有的线程
  3. 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出 BrokenBarrierException,屏障被损坏
  4. 如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出 BrokenBarrierException 异常

案例:如果玩一个游戏有多个“关卡”,使用 CountDownLatch 显然不太合适,需要为每个关卡都创建一个实例。我们可以使用 CyclicBarrier 来实现每个关卡的数据加载等待功能。

public class CyclicBarrierDemo {
    static class PreTaskThread implements Runnable {
        private String task;
        private CyclicBarrier cyclicBarrier;
        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
            this.task = task;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            // 假设总共三个关卡
            for (int i = 1; i < 4; i++) {
                try {
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("关卡%d的任务%s完成", i, task));
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                cyclicBarrier.reset(); // 重置屏障
            }
        }
    }
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("本关卡所有前置任务完成,开始游戏...");
        });
        new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
    }
}

Output:

关卡1的任务加载地图数据完成
关卡1的任务加载背景音乐完成
关卡1的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡2的任务加载地图数据完成
关卡2的任务加载背景音乐完成
关卡2的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡3的任务加载人物模型完成
关卡3的任务加载地图数据完成
关卡3的任务加载背景音乐完成
本关卡所有前置任务完成,开始游戏…

注意这里跟 CountDownLatch 的代码有一些不同。CyclicBarrier 没有分为 await() 和 countDown(),而是只有单独的一个 await() 方法。

CyclicBarrier 允许我们在达到屏障的时候可以执行一个任务,可以在构造方法传入一个 Runnable 类型的对象。上述案例就是在达到屏障时,输出“本关卡所有前置任务完成,开始游戏…”。

CyclicBarrier 虽说功能与 CountDownLatch 类似,但是实现原理却完全不同。CyclicBarrier 内部使用的是 Lock + Condition 实现的等待/通知模式。详情可以查看 dowait(boolean timed, long nanos) 方法的源码。

Phaser

CyclicBarrier 在构造方法里传入“任务总量” parties 之后,就不能修改这个值了,并且每次调用 await() 方法也只能消耗一个 parties 计数。但 Phaser 可以动态地调整任务总量。

名词解释:

  • party: 对应一个线程,数量可以通过 register 或者构造参数传入
  • arrive: 对应一个 party 的状态,初始时是 unarrived,当调用 arriveAndAwaitAdvance() 或者 arriveAndDeregister() 进入 arrive 状态,可以通过 getUnarrivedParties() 获取当前未到达的数量
  • register: 注册一个 party,每一阶段必须所有注册的 party 都到达才能进入下一阶段
  • deRegister: 减少一个 party
  • phase: 阶段,当所有注册的 party 都 arrive 之后,将会调用 Phaser 的 onAdvance() 方法来判断是否要进入下一阶段

Phaser 终止的两种途径,Phaser 维护的线程执行完毕或者 onAdvance() 返回 true。

此外 Phaser 还能维护一个树状的层级关系,构造的时候 new Phaser(parentPhaser),对于 Task 执行时间短的场景 (竞争激烈),也就是说有大量的 party,可以把每个 Phaser 的任务量设置较小,多个 Phaser 共同继承一个父 Phaser。


案例:假设游戏有三个关卡,但只有第一个关卡有新手教程,需要加载新手教程模块。后面的第二个关卡和第三个关卡都不需要。我们可以用 Phaser 来做这个需求。

public class PhaserDemo {
    static class PreTaskThread implements Runnable {
        private String task;
        private Phaser phaser;
        public PreTaskThread(String task, Phaser phaser) {
            this.task = task;
            this.phaser = phaser;
        }
        @Override
        public void run() {
            for (int i = 1; i < 4; i++) {
                try {
                    // 第二个关卡起不加载 NPC,跳过
                    if (i >= 2 && "加载新⼿教程".equals(task)) {
                        continue;
                    }
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("关卡%d,需要加载%d个模块,当前模块【%s】", 
                                                     i, phaser.getRegisteredParties(), task));
                    // 从第二个关卡起,不加载新手教程
                    if (i == 1 && "加载新手教程".equals(task)) {
                        System.out.println("下次关卡移除加载【新手教程】模块");
                        phaser.arriveAndDeregister(); // 移除一个模块
                    } else {
                        phaser.arriveAndAwaitAdvance();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {
        Phaser phaser = new Phaser(4) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println(String.format("第%d次关卡准备完成", phase + 1));
                return phase == 3 || registeredParties == 0;
            }
        };
        new Thread(new PreTaskThread("加载地图数据", phaser)).start();
        new Thread(new PreTaskThread("加载人物模型", phaser)).start();
        new Thread(new PreTaskThread("加载背景音乐", phaser)).start();
        new Thread(new PreTaskThread("加载新手教程", phaser)).start();
    }
}

Output:

关卡1,需要加载4个模块,当前模块【加载背景音乐】
关卡1,需要加载4个模块,当前模块【加载新手教程】
下次关卡移除加载【新手教程】模块
关卡1,需要加载3个模块,当前模块【加载地图数据】
关卡1,需要加载3个模块,当前模块【加载人物模型】
第1次关卡准备完成
关卡2,需要加载3个模块,当前模块【加载地图数据】
关卡2,需要加载3个模块,当前模块【加载背景音乐】
关卡2,需要加载3个模块,当前模块【加载人物模型】
第2次关卡准备完成
关卡3,需要加载3个模块,当前模块【加载人物模型】
关卡3,需要加载3个模块,当前模块【加载地图数据】
关卡3,需要加载3个模块,当前模块【加载背景音乐】
第3次关卡准备完成

这里要注意关卡1 的输出,在“加载新手教程”线程中调用了 arriveAndDeregister() 减少一个 party 之后,后面的线程使用 getRegisteredParties() 得到的是已经被修改后的 parties 了。但是当前这个阶段(phase),仍然是需要 4 个 parties 都 arrive 才触发屏障的。从下一个阶段开始,才需要 3 个 parties 都 arrive 就触发屏障。

Fork/Join 框架

Fork/Join 框架是一个实现了 ExecutorService 接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。Fork/Join 框架在执行任务时使用了工作窃取算法

工作窃取算法

工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。

值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。

另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。

ForkJoinTask

在 Fork/Join 框架里提供了抽象类 ForkJoinTask 来实现任务。ForkJoinTask 是一个类似普通线程的实体,但是比普通线程轻量得多。

通常情况下,在创建任务的时候我们一般不直接继承 ForkJoinTask,而是继承它的子类 RecursiveActionRecursiveTask,两个都是 ForkJoinTask 的子类,前者无返回值,后者有返回值。

ForkJoinPool 是用于执行 ForkJoinTask 任务的执行(线程)池。ForkJoinPool 与传统线程池最显著的区别就是它维护了一个工作队列数组volatile WorkQueue[] workQueues,ForkJoinPool 中的每个工作线程都维护着一个工作队列)。

案例:计算斐波那契数列第 n 项

斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:f(n) = f(n-1) + f(n-2)

public class FibonacciTest {
    class Fibonacci extends RecursiveTask<Integer> {
        int n;
        public Fibonacci(int n) {
            this.n = n;
        }
        // 主要的实现逻辑都在 compute() 里面
        @Override
        protected Integer compute() {
            // 这里先假设 n >= 0
            if (n <= 1) {
                return n;
            } else {
                // f(n-1)
                Fibonacci f1 = new Fibonacci(n - 1);
                f1.fork();
                // f(n-2)
                Fibonacci f2 = new Fibonacci(n - 2);
                f2.fork();
                // f(n) = f(n-1) + f(n-2)
                return f1.join() + f2.join();
            }
        }
    }
    @Test
    public void testFib() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
        long start = System.currentTimeMillis();
        Fibonacci fibonacci = new Fibonacci(40);
        Future<Integer> future = forkJoinPool.submit(fibonacci);
        System.out.println(future.get());
        long end = System.currentTimeMillis();
        System.out.println(String.format("耗时:%d millis", end - start));
    }
}

Output:

CPU核数:4
计算结果:102334155
耗时:9490 millis

使用普通递归的效率都要比使用 Fork/Join 框架要高很多。因为 Fork/Join 是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。

如果要计算的任务比较简单(比如案例中的斐波那契数列),那当然是直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下就可以充分利用多核 CPU 来提高计算速度。

计划任务

自 JDK 1.5 开始,JDK 提供了 ScheduledThreadPoolExecutor 类用于计划任务(又称定时任务),这个类有两个用途:

  • 在给定的延迟之后运行任务
  • 周期性重复执行任务

案例:指定时间给大家发送消息,用一个定时任务,每隔 1 秒检查数据库在当前时间有没有需要发送的消息。

public class ThreadPool {
    private static final ScheduledExecutorService executor = 
        new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
    
    private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    public static void main(String[] args){
        // 新建一个固定延迟时间的计划任务
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if (haveMsgAtCurrentTime()) {
                    System.out.println(df.format(new Date()));
                    System.out.println("大家注意了,我要发消息了");
                }
            }
        }, 1, 1, TimeUnit.SECONDS);
    }
    
    public static boolean haveMsgAtCurrentTime(){
        // 查询数据库,有没有当前时间需要发送的消息
        // 这里省略实现,直接返回 true
        return true;
    }
}

Output:

2022-11-17 07:38:09
大家注意了,我要发消息了
2022-11-17 07:38:10
大家注意了,我要发消息了
2022-11-17 07:38:11
大家注意了,我要发消息了
2022-11-17 07:38:12
大家注意了,我要发消息了
2022-11-17 07:38:13
大家注意了,我要发消息了


Java 多线程和并发
http://lpxz.work/posts/586fe421/
作者
LPxz
发布于
2023年5月1日
许可协议