最近遇到了多线程并发同步问题,找到了java.util.concurrent
包下的CountDownLatch
、CyclicBarrier
、Semaphore
这三个类。
CountDownLatch
可以实现类似计数器的功能,例如线程A需要等待B、C、D三个线程执行完成之后才可以执行。
CyclicBarrier
可以实现让一组(多个)线程等待至某个状态之后再全部同时执行,当所有线程都被释放以后,CyclicBarrier可以被重用。
Semaphore
可以控制同时访问的线程个数,通过acquire()
获取一个许可,如果没有就等待,而release()
释放一个许可。
CountDownLatch
CountDownLatch类只有一个构造方法:
1 2 3 4
| public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
|
这里的count是一个计数值,表示要等待多少任务,每次调用该对象示例的countDown()
方法,该值都会减1,当count为0时表示没有需要等待的任务。常用的方法如下:
1 2 3
| public void await() throws InterruptedException { }; public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; public void countDown() { };
|
示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public static void testCountDownLatch() { final CountDownLatch latch = new CountDownLatch(2); new Thread("one") { public void run() { try { System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(3000); System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); new Thread("two") { public void run() { try { System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(3000); System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start();
try { System.out.println("等待2个线程执行完成"); latch.await(); System.out.println("子线程已经执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); } }
|
CyclicBarrier
该类有两个构造方法
1 2
| public CyclicBarrier(int parties, Runnable barrierAction) public CyclicBarrier(int parties)
|
参数parties是指让多少个线程或者任务等待至barrier状态,参数barrierAction是当这些线程都到达barrier状态后会执行的内容。
该类里面有两个比较重要的方法:
1 2
| public int await() throws InterruptedException, BrokenBarrierException { }; public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };
|
无参的方法比较常用,用来挂起当前线程,直到所有线程都达到barrier状态再同时执行后续任务。
有参的方法是让线程等待一定时间,如果线程还没有达到barrier状态,就让到达barrier状态的线程执行后续任务。
示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| static void testCyclicBarrier() {
CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() { @Override public void run() { System.out.println("所有线程执行完毕,随机挑选一个线程来执行打印"); System.out.println("挑选的线程为" + Thread.currentThread().getName()); } }); for (int i = 0; i < 5; i++) { new Writer(barrier, "thread:" + i).start(); }
}
static class Writer extends Thread { private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier, String threadName) { this.cyclicBarrier = cyclicBarrier; if (threadName != null) { this.setName(threadName); }
}
@Override public void run() {
try { System.out.println("线程" + Thread.currentThread().getName() + "正在作业中"); Thread.sleep(5000); System.out.println("线程" + Thread.currentThread().getName() + "作业完成"); cyclicBarrier.await(); System.out.println("所有线程作业完毕,线程" + Thread.currentThread().getName() + "继续理其他任务");
} catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); }
}
}
|
值得注意的是,CyclicBarrier是可以重用的。
Semaphore
该类提供了两个构造器:
1 2 3 4 5 6
| public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = (fair)? new FairSync(permits) : new NonfairSync(permits); }
|
下面是该类中比较重要的几个方法,首先是acquire()、release():
1 2 3 4
| public void acquire() throws InterruptedException { } public void acquire(int permits) throws InterruptedException { } public void release() { } public void release(int permits) { }
|
acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
release()用来释放许可。注意,在释放许可之前,必须先获获得许可。
这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:
1 2 3 4
| public boolean tryAcquire() { }; public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; public boolean tryAcquire(int permits) { }; public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { };
|
另外还可以通过availablePermits()方法得到可用的许可数目。
假如5个线程要使用3个资源,示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| static void testSemaphore() { int N = 5; Semaphore semaphore = new Semaphore(3); for(int i=0;i<N;i++) new Worker("线程" +i,semaphore).start(); } static class Worker extends Thread {
private Semaphore semaphore; public Worker(String name, Semaphore semaphore) { super(); this.setName(name); this.semaphore = semaphore; }
@Override public void run() { try { semaphore.acquire(); System.out.println("线程:" + Thread.currentThread().getName() + "占用一个资源"); Thread.sleep(3000); System.out.println("线程:" + Thread.currentThread().getName() + "释放一个资源"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }
}
|
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
CountDownLatch一般用于某个线程等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。
以上