Java 代码之 JUC 包

wiki

常用 JUC 工具介绍

1. CountDownLatch

使一组线程等待其他线程完成操作后才执行。其接收一个 int 参数,在其扣减为零时,等待的线程会被唤醒

1. 方法

常用方法有

  • CountDownLatch(n) : 初始化计数器 latch
  • await() : 等待方法,使当前线程进入同步队列进行等待,直到 latch 的值被减到0或者当前线程被中断,当前线程就会被唤醒
  • await(long timeout, TimeUnit unit) : 带超时时间的 await()
  • countDown() : 每执行一次,计数器 latch 减一,代表一个线程完成了任务
  • getCount() : 获取当前 latch 值

2. 实战

代码的使用逻辑为

  1. 主线程初始化 CountDownLatch 实例,并设置计数器,然后直接 await 阻塞
  2. 子线程开始完成自己任务,结束后,调用 countDown 减少计数器
  3. 当计数器变成 0 时,唤醒正在 await 的主线程,继续执行

2个主线程通过 CountDownLatch 去等待3个工作线程完成操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
// 初始化
CountDownLatch countDownLatch = new CountDownLatch(3);

// 2 个主线程
Master master1 = new Master("master-1", countDownLatch);
Master master2 = new Master("master-2", countDownLatch);

// 3 个工作线程
Worker worker1 = new Worker("worker-1", countDownLatch);
Worker worker2 = new Worker("worker-2", countDownLatch);
Worker worker3 = new Worker("worker-3", countDownLatch);

// 启动
master1.start();
master2.start();
Thread.sleep(1000);
worker1.start();
worker2.start();
worker3.start();
}
}

Master 线程声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Master extends Thread {

private String name;
private CountDownLatch cd;

public Master(String name, CountDownLatch cd) {
this.name = name;
this.cd = cd;
}

@Override
public void run() {
try {
System.out.println(this.name + " master wait.");
cd.await();
System.out.println(this.name + " master running.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Worker 线程声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Worker extends Thread {

private String name;
private CountDownLatch cd;

public Worker(String name, CountDownLatch cd) {
this.name = name;
this.cd = cd;
}

@Override
public void run() {
System.out.println(this.name + " begin.");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.name + " end.");
cd.countDown();
}
}

输出

1
2
3
4
5
6
7
8
9
10
master-1 master wait.
master-2 master wait.
worker-1 begin.
worker-3 begin.
worker-2 begin.
worker-1 end.
worker-3 end.
worker-2 end.
master-1 master running.
master-2 master running.

2. CyclicBarrier

实现线程间的计数等待,当一组线程到达某个屏障时被阻塞,直到组内的最后一个线程到达,然后屏障开放,线程执行。
其与 CountDownLatch 有一些区别,前者计数器数量一般都是大于等于线程,后者则数量必须一致。

1. 方法

常用方法有

  • CyclicBarrier(n) : 要拦截的线程数 n
  • CyclicBarrier(int parties, Runnable barrierAction) : 第二个参数是当屏障结束时要执行的任务线程
  • await() : 线程调用时表示自己已经到达栅栏
  • await(long timeout, TimeUnit unit) : 带超时时间的 await()
  • getNumberWaiting() : 返回目前正在等待障碍的线程数量
  • getParties() : 返回执行这个障碍所需的 parties 数量
  • isBroken() : 查询这个障碍是否处于 broken 状态
  • reset() : 将屏障重置为初始状态

2. 实战

代码的使用逻辑为

  1. 主线程初始化实例,并设置要拦截的线程数
  2. 然后子线程调用 await 方法,之后阻塞
  3. 当所有线程都达到时,屏障开放,线程继续执行,同时 barrierAction 线程也开始执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) {
// 初始化
CyclicBarrier cb = new CyclicBarrier(5, new Collecter());

// 启动
for (int i = 0; i <= 4; i++) {
Worker worker = new Worker("worker-thead-" + i, cb);
worker.start();
}
}
}
}

Works 线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Worker extends Thread {

private String name;
private CyclicBarrier cb;

public Worker(String name, CyclicBarrier cb) {
this.name = name;
this.cb = cb;
}

@Override
public void run() {
System.out.println(this.name + " come.");
try {
cb.await();
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(this.name + " go.");
}

}

barrierAction 线程

1
2
3
4
5
6
7
8
public class Collecter implements Runnable {

@Override
public void run() {
System.out.println("All come.");
}

}

输出

1
2
3
4
5
6
7
8
9
10
11
worker-thead-0 come.
worker-thead-4 come.
worker-thead-1 come.
worker-thead-2 come.
worker-thead-3 come.
All come.
worker-thead-1 go.
worker-thead-2 go.
worker-thead-4 go.
worker-thead-3 go.
worker-thead-0 go.

3. Semaphore

信号量,用于控制同时访问某个特定资源的线程数量,当线程访问资源时,需要先获取许可,释放许可后其他线程才能获取该许可

1. 方法

常用方法有

  • Semaphore(n) : 初始化许可数量 permits,就是资源的最大允许的访问的线程数
  • Semaphore(int permits, boolean fair) : 第二个参数是是否公平竞争
  • acquire() : 尝试从信号量中获取许可,一直阻塞直到获取许可或者线程中断
  • release : 释放许可到信号量中

2. 实战

1
2
3
4
5
6
7
8
9
10
11
12
public class SemaphoreTest {

public static void main(String[] args) {
// 初始化
Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < 5; i++) {
Worker worker = new Worker("worker-thead-" + i, semaphore);
worker.start();
}
}
}

Works 线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Worker extends Thread {

private String name;
private Semaphore s;

public Worker(String name, Semaphore s) {
this.name = name;
this.s = s;
}

@Override
public void run() {
try {
System.out.println(this.name + " try to get acquired.");
// 获取许可
s.acquire();
System.out.println(this.name + " get acquired.");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可
s.release();
System.out.println(this.name + " go.");
}
}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
worker-thead-0 try to get acquired.
worker-thead-4 try to get acquired.
worker-thead-0 get acquired.
worker-thead-3 try to get acquired.
worker-thead-1 try to get acquired.
worker-thead-2 try to get acquired.
worker-thead-3 get acquired.
worker-thead-4 get acquired.
worker-thead-4 go.
worker-thead-2 get acquired.
worker-thead-1 get acquired.
worker-thead-0 go.
worker-thead-3 go.
worker-thead-2 go.
worker-thead-1 go.

4. Exchanger

用于两个线程之间交换信息,两者都到达交换点时,就会交换两个线程的信息

1. 方法

常用方法有

  • Exchanger() : 初始化许可数量 permits,就是资源的最大允许的访问的线程数
  • exchange(V x) : 等到其他线程到达时,交换数据
  • V exchange(V x, long timeout, TimeUnit unit) : 增加超时时间

2. 实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ExchangerTest {

public static void main(String[] args) {
// 初始化
Exchanger<String> exchanger = new Exchanger<>();

Long beginTime = System.currentTimeMillis();
System.out.println("Begin in " + beginTime);
Worker worker1 = new Worker("worker-thead-1", "hello - 1", exchanger);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Worker worker2 = new Worker("worker-thead-2", "world - 2", exchanger);

// 启动
worker1.start();
worker2.start();

Long endTime = System.currentTimeMillis();
System.out.println("End in " + endTime);
System.out.println(String.format("Totle use %s ms", (endTime - beginTime)));
}
}

Works 线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Worker extends Thread {

private String name;
private String context;
private Exchanger<String> e;

public Worker(String name, String context, Exchanger<String> e) {
this.name = name;
this.context = context;
this.e = e;
}

@Override
public void run() {
try {
System.out.println(this.name + " exchanger to " + e.exchange(context));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

输出

1
2
3
4
5
Begin in 1651585100258
End in 1651585103264
worker-thead-1 exchanger to world - 2
worker-thead-2 exchanger to hello - 1
Totle use 3006 ms