Skip to content

java并发工具类

一、CountDownLatch

CountDownLatch类似于一个计数器,可以用来判断多线程是否已经完成。

主要方法:

构造方法会定义一直初始计数器的值

  • await():进入等待的状态(cas),当计数器里的值为0以后,被唤醒操作
  • countDown():计数器的值减一,当线程执行完毕以后需要执行的操作

主要代码如下:

public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(8);
        new Thread(()->{
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("countDownLatch执行结束");
        }).start();

        for (int i = 0; i < 8; i++) {
            int j = i;
            new Thread(()->{
                try {
                    Thread.sleep(j * 1000L);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
    }

二、CyclicBarrier

CyclicBarrier类似于一个栅栏,主要是让所有线程在同一时刻一起执行。

与countDownLatch的区别:

  • CountDownLatch一般用于某个线程等待若干个其他线程执行完任务之后,它才执行;不可重复使用
  • CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;可重用的

在执行的时候调await方法,当判断线程等待数达到了计数器的值,唤醒所有的线程执行任务

主要代码如下:

 public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(8);

        for (int i = 0; i < 8; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    Thread.sleep(finalI * 1000L);
                    System.out.println(Thread.currentThread().getName() + "准备就绪");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

                System.out.println("开始比赛");
            }).start();
        }
    }

三、Semaphore

Semaphore信号量,可以控制并发数量,即线程执行的个数

使用场景:接口限流

代码如下:

public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);

        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "开始执行");
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }).start();
        }
    }

四、Exchanger

Exchanger是用于交换数据,它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger的重点是成对的线程使用exchange()方法,当有一对线程达到了同步点,就会进行交换数据。因此该工具类的线程对象是【成对】的。

代码如下:

public static void main(String[] args) {
        Exchanger<String> stringExchanger = new Exchanger<>();

        String str1 = "xdclass";
        String str2 = "wiggin";

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "初始值==========>" + str1);
            try {
                String exchange = stringExchanger.exchange(str1);
                System.out.println(Thread.currentThread().getName() + "交換后的数据==========>" + exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程1").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "初始值==========>" + str2);
            try {
                String exchange = stringExchanger.exchange(str2);
                System.out.println(Thread.currentThread().getName() + "交換后的数据==========>" + exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程2").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "初始值==========>" + str2);
            try {
                String exchange = stringExchanger.exchange(str2);
                System.out.println(Thread.currentThread().getName() + "交換后的数据==========>" + exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程3").start();
    }