最近遇到了多线程并发同步问题,找到了java.util.concurrent包下的CountDownLatch、CyclicBarrier、Semaphore这三个类。
CountDownLatch可以实现类似计数器的功能,例如线程A需要等待B、C、D三个线程执行完成之后才可以执行。
CyclicBarrier可以实现让一组(多个)线程等待至某个状态之后再全部同时执行,当所有线程都被释放以后,CyclicBarrier可以被重用。
Semaphore可以控制同时访问的线程个数,通过acquire()获取一个许可,如果没有就等待,而release()释放一个许可。
CountDownLatch
CountDownLatch类只有一个构造方法:
| 12
 3
 4
 
 | public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
 this.sync = new Sync(count);
 }
 
 | 
这里的count是一个计数值,表示要等待多少任务,每次调用该对象示例的countDown()方法,该值都会减1,当count为0时表示没有需要等待的任务。常用的方法如下:
| 12
 3
 
 | public void await() throws InterruptedException { };   public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
 public void countDown() { };
 
 | 
示例如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 
 | public static void testCountDownLatch() {final CountDownLatch latch = new CountDownLatch(2);
 new Thread("one") {
 public void run() {
 try {
 System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
 Thread.sleep(3000);
 System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
 latch.countDown();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 };
 }.start();
 new Thread("two") {
 public void run() {
 try {
 System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
 Thread.sleep(3000);
 System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
 latch.countDown();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 };
 }.start();
 
 try {
 System.out.println("等待2个线程执行完成");
 latch.await();
 System.out.println("子线程已经执行完毕");
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 
 | 
CyclicBarrier
该类有两个构造方法
| 12
 
 | public CyclicBarrier(int parties, Runnable barrierAction)public CyclicBarrier(int parties)
 
 | 
参数parties是指让多少个线程或者任务等待至barrier状态,参数barrierAction是当这些线程都到达barrier状态后会执行的内容。
该类里面有两个比较重要的方法:
| 12
 
 | public int await() throws InterruptedException, BrokenBarrierException { };public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };
 
 | 
无参的方法比较常用,用来挂起当前线程,直到所有线程都达到barrier状态再同时执行后续任务。
有参的方法是让线程等待一定时间,如果线程还没有达到barrier状态,就让到达barrier状态的线程执行后续任务。
示例如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 
 | static void testCyclicBarrier() {
 
 CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() {
 
 @Override
 public void run() {
 System.out.println("所有线程执行完毕,随机挑选一个线程来执行打印");
 System.out.println("挑选的线程为" + Thread.currentThread().getName());
 
 }
 });
 for (int i = 0; i < 5; i++) {
 new Writer(barrier, "thread:" + i).start();
 }
 
 }
 
 static class Writer extends Thread {
 private CyclicBarrier cyclicBarrier;
 
 public Writer(CyclicBarrier cyclicBarrier, String threadName) {
 this.cyclicBarrier = cyclicBarrier;
 if (threadName != null) {
 this.setName(threadName);
 }
 
 }
 
 @Override
 public void run() {
 
 try {
 System.out.println("线程" + Thread.currentThread().getName() + "正在作业中");
 Thread.sleep(5000);
 System.out.println("线程" + Thread.currentThread().getName() + "作业完成");
 cyclicBarrier.await();
 System.out.println("所有线程作业完毕,线程" + Thread.currentThread().getName() + "继续理其他任务");
 
 } catch (InterruptedException e) {
 e.printStackTrace();
 } catch (BrokenBarrierException e) {
 
 e.printStackTrace();
 }
 
 }
 
 }
 
 | 
值得注意的是,CyclicBarrier是可以重用的。
Semaphore
该类提供了两个构造器:
| 12
 3
 4
 5
 6
 
 | public Semaphore(int permits) {          sync = new NonfairSync(permits);
 }
 public Semaphore(int permits, boolean fair) {
 sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
 }
 
 | 
下面是该类中比较重要的几个方法,首先是acquire()、release():
| 12
 3
 4
 
 | public void acquire() throws InterruptedException {  }     public void acquire(int permits) throws InterruptedException { }
 public void release() { }
 public void release(int permits) { }
 
 | 
acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
release()用来释放许可。注意,在释放许可之前,必须先获获得许可。
这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:
| 12
 3
 4
 
 | public boolean tryAcquire() { };    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };
 public boolean tryAcquire(int permits) { };
 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { };
 
 | 
另外还可以通过availablePermits()方法得到可用的许可数目。
假如5个线程要使用3个资源,示例如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 
 | static void testSemaphore() {int N = 5;
 Semaphore semaphore = new Semaphore(3);
 for(int i=0;i<N;i++)
 new Worker("线程" +i,semaphore).start();
 }
 
 static class Worker extends Thread {
 
 private Semaphore semaphore;
 
 public Worker(String name, Semaphore semaphore) {
 super();
 this.setName(name);
 this.semaphore = semaphore;
 }
 
 @Override
 public void run() {
 try {
 semaphore.acquire();
 System.out.println("线程:" + Thread.currentThread().getName() + "占用一个资源");
 Thread.sleep(3000);
 System.out.println("线程:" + Thread.currentThread().getName() + "释放一个资源");
 semaphore.release();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 
 }
 
 | 
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
CountDownLatch一般用于某个线程等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。
以上