wiki 常用 JUC 工具介绍
1. CountDownLatch
使一组线程等待其他线程完成操作后才执行。其接收一个 int
参数,在其扣减为零时,等待的线程会被唤醒
1. 方法 常用方法有
CountDownLatch(n)
: 初始化计数器 latch
await()
: 等待方法,使当前线程进入同步队列进行等待,直到 latch 的值被减到0或者当前线程被中断,当前线程就会被唤醒
await(long timeout, TimeUnit unit)
: 带超时时间的 await()
countDown()
: 每执行一次,计数器 latch 减一,代表一个线程完成了任务
getCount()
: 获取当前 latch 值
2. 实战 代码的使用逻辑为
主线程初始化 CountDownLatch
实例,并设置计数器,然后直接 await
阻塞
子线程开始完成自己任务,结束后,调用 countDown
减少计数器
当计数器变成 0 时,唤醒正在 await
的主线程,继续执行
2个主线程通过 CountDownLatch 去等待3个工作线程完成操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class CountDownLatchTest { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (3 ); Master master1 = new Master ("master-1" , countDownLatch); Master master2 = new Master ("master-2" , countDownLatch); Worker worker1 = new Worker ("worker-1" , countDownLatch); Worker worker2 = new Worker ("worker-2" , countDownLatch); Worker worker3 = new Worker ("worker-3" , countDownLatch); master1.start(); master2.start(); Thread.sleep(1000 ); worker1.start(); worker2.start(); worker3.start(); } }
Master 线程声明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class Master extends Thread { private String name; private CountDownLatch cd; public Master (String name, CountDownLatch cd) { this .name = name; this .cd = cd; } @Override public void run () { try { System.out.println(this .name + " master wait." ); cd.await(); System.out.println(this .name + " master running." ); } catch (InterruptedException e) { e.printStackTrace(); } } }
Worker 线程声明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class Worker extends Thread { private String name; private CountDownLatch cd; public Worker (String name, CountDownLatch cd) { this .name = name; this .cd = cd; } @Override public void run () { System.out.println(this .name + " begin." ); try { Thread.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this .name + " end." ); cd.countDown(); } }
输出
1 2 3 4 5 6 7 8 9 10 master-1 master wait. master-2 master wait. worker-1 begin. worker-3 begin. worker-2 begin. worker-1 end. worker-3 end. worker-2 end. master-1 master running. master-2 master running.
2. CyclicBarrier
实现线程间的计数等待,当一组线程到达某个屏障时被阻塞,直到组内的最后一个线程到达,然后屏障开放,线程执行。 其与 CountDownLatch
有一些区别,前者计数器数量一般都是大于等于线程,后者则数量必须一致。
1. 方法 常用方法有
CyclicBarrier(n)
: 要拦截的线程数 n
CyclicBarrier(int parties, Runnable barrierAction)
: 第二个参数是当屏障结束时要执行的任务线程
await()
: 线程调用时表示自己已经到达栅栏
await(long timeout, TimeUnit unit)
: 带超时时间的 await()
getNumberWaiting()
: 返回目前正在等待障碍的线程数量
getParties()
: 返回执行这个障碍所需的 parties 数量
isBroken()
: 查询这个障碍是否处于 broken
状态
reset()
: 将屏障重置为初始状态
2. 实战 代码的使用逻辑为
主线程初始化实例,并设置要拦截的线程数
然后子线程调用 await
方法,之后阻塞
当所有线程都达到时,屏障开放,线程继续执行,同时 barrierAction
线程也开始执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class CyclicBarrierTest { public static void main (String[] args) throws InterruptedException { public static void main (String[] args) { CyclicBarrier cb = new CyclicBarrier (5 , new Collecter ()); for (int i = 0 ; i <= 4 ; i++) { Worker worker = new Worker ("worker-thead-" + i, cb); worker.start(); } } } }
Works 线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class Worker extends Thread { private String name; private CyclicBarrier cb; public Worker (String name, CyclicBarrier cb) { this .name = name; this .cb = cb; } @Override public void run () { System.out.println(this .name + " come." ); try { cb.await(); Thread.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(this .name + " go." ); } }
barrierAction
线程
1 2 3 4 5 6 7 8 public class Collecter implements Runnable { @Override public void run () { System.out.println("All come." ); } }
输出
1 2 3 4 5 6 7 8 9 10 11 worker-thead-0 come. worker-thead-4 come. worker-thead-1 come. worker-thead-2 come. worker-thead-3 come. All come. worker-thead-1 go. worker-thead-2 go. worker-thead-4 go. worker-thead-3 go. worker-thead-0 go.
3. Semaphore
信号量,用于控制同时访问某个特定资源的线程数量,当线程访问资源时,需要先获取许可,释放许可后其他线程才能获取该许可
1. 方法 常用方法有
Semaphore(n)
: 初始化许可数量 permits,就是资源的最大允许的访问的线程数
Semaphore(int permits, boolean fair)
: 第二个参数是是否公平竞争
acquire()
: 尝试从信号量中获取许可,一直阻塞直到获取许可或者线程中断
release
: 释放许可到信号量中
2. 实战 1 2 3 4 5 6 7 8 9 10 11 12 public class SemaphoreTest { public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 5 ; i++) { Worker worker = new Worker ("worker-thead-" + i, semaphore); worker.start(); } } }
Works 线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class Worker extends Thread { private String name; private Semaphore s; public Worker (String name, Semaphore s) { this .name = name; this .s = s; } @Override public void run () { try { System.out.println(this .name + " try to get acquired." ); s.acquire(); System.out.println(this .name + " get acquired." ); Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { s.release(); System.out.println(this .name + " go." ); } } }
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 worker-thead-0 try to get acquired. worker-thead-4 try to get acquired. worker-thead-0 get acquired. worker-thead-3 try to get acquired. worker-thead-1 try to get acquired. worker-thead-2 try to get acquired. worker-thead-3 get acquired. worker-thead-4 get acquired. worker-thead-4 go. worker-thead-2 get acquired. worker-thead-1 get acquired. worker-thead-0 go. worker-thead-3 go. worker-thead-2 go. worker-thead-1 go.
4. Exchanger
用于两个线程之间交换信息,两者都到达交换点时,就会交换两个线程的信息
1. 方法 常用方法有
Exchanger()
: 初始化许可数量 permits,就是资源的最大允许的访问的线程数
exchange(V x)
: 等到其他线程到达时,交换数据
V exchange(V x, long timeout, TimeUnit unit)
: 增加超时时间
2. 实战 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ExchangerTest { public static void main (String[] args) { Exchanger<String> exchanger = new Exchanger <>(); Long beginTime = System.currentTimeMillis(); System.out.println("Begin in " + beginTime); Worker worker1 = new Worker ("worker-thead-1" , "hello - 1" , exchanger); try { Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } Worker worker2 = new Worker ("worker-thead-2" , "world - 2" , exchanger); worker1.start(); worker2.start(); Long endTime = System.currentTimeMillis(); System.out.println("End in " + endTime); System.out.println(String.format("Totle use %s ms" , (endTime - beginTime))); } }
Works 线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Worker extends Thread { private String name; private String context; private Exchanger<String> e; public Worker (String name, String context, Exchanger<String> e) { this .name = name; this .context = context; this .e = e; } @Override public void run () { try { System.out.println(this .name + " exchanger to " + e.exchange(context)); } catch (InterruptedException e) { e.printStackTrace(); } } }
输出
1 2 3 4 5 Begin in 1651585100258 End in 1651585103264 worker-thead-1 exchanger to world - 2 worker-thead-2 exchanger to hello - 1 Totle use 3006 ms