同步工具
同步工具
java.util.concurrent包(JUC)中提供了几个常用的帮助管理大量线程集的同步工具, 在合适的情形下推荐使用这些现成的类库工具而不使用繁琐的Lock和Condition,而且也不推荐使用wait和notify,也是因为困难且繁琐,我们应该学会这些更高级的同步器来完成工作.
Semaphore
学过操作系统的朋友都知道这是Dijkstra发明的一种同步原语,依靠信号量可以帮助我们解决常见的线程同步问题,主要方法就是两个,请求资源和释放资源的方法。
acquire(int n),请求一定数目的资源,不能被满足时挂起当前线程,直到有线程在该信号量上调用了release。acquire():请求一个资源,不能被满足时挂起当前线程,同上。release(int n),释放一定数量的资源,然后从被挂起的线程中选择一个能满足要求的线程并恢复之。release():释放一个资源,同上。
注意:调用
release之前并不要求该线程通过了require调用,这是什么意思呢,假如说有一个捣乱线程它不调用acquire,就只调用release就会导致可用资源数反而一直增加(这里称为资源可能不贴切,JDK中将其称之为许可证,就是说构造信号量时使用了n个许可证表示最大并发量就是n了,结果有的线程一直调用release导致许可证总数变化了,就可能会破坏并发环境),这就导致破坏了信号量的作用了。因此我们在使用信号量的时候,需要我们自己维护这个约定。fun main() { val semaphore = Semaphore(2) repeat(4) { //开启4个线程 thread { println("${Thread.currentThread().name}请求资源...") semaphore.acquire() println("${Thread.currentThread().name}获得了资源,开始工作...") Thread.sleep(500L) semaphore.release() println("${Thread.currentThread().name}释放资源...") } } }结果:
Thread-0请求资源... Thread-2请求资源... Thread-1请求资源... Thread-3请求资源... Thread-1获得了资源,开始工作... Thread-3获得了资源,开始工作... Thread-3释放资源... Thread-2获得了资源,开始工作... Thread-1释放资源... Thread-0获得了资源,开始工作... Thread-2释放资源... Thread-0释放资源...从结果中可以看到
Thread-0,Thread-1,Thread-2,Thread-3同时请求资源,但是只有Thread-1和Thread-3获得了资源能继续执行,其他的线程被挂起,Thread-3释放了资源后才从新激活Thread-2。因此它适合资源有限时,又希望线程集可以交替工作的情况。CountDownLatch
倒计时门栓内维护了一个计数器,只有计数器为0时在这个latch上等待的线程才能继续运行,具体来说就是线程需要等待一个具体事件发生N次,线程才能运行,该类有一下主要方法:
CountDownLatch(int count),count是在线程可以通过await之前必须调用countDown的次数。countDown(),一个线程完成他的工作后,调用这个方法来使计数器减1。await(),调用该方法的线程挂起,直到计数器变为0,该方法返回 。await(long timeout,TimeUnit unit),超时版本他有boolean型返回值,而上一个返回void,当在规定时间内计数器变为0了就返回true,否则false。
由于计数器这个特性,就导致了他只能是一次性的,不能重复利用。
fun main() { val count = 5; val latch = CountDownLatch(count) val l = measureTimeMillis { repeat(count) { //开启5个线程 thread { Thread.sleep(1000L) //每个线程都sleep 1秒 latch.countDown() } } println("${Thread.currentThread().name}我想退出..") latch.await()//main必须等待上面的5个线程完成工作 println("${Thread.currentThread().name}我可以退出啦") } println("time:$l ms") }main我想退出.. main我可以退出啦 time:1008 ms如果把上面的5个线程看成时工作线程集,它们必须完成各自的工作后,另一个线程才能继续推进(这里是主线程),因此它适合某一个线程需要等待特定事件发生N此的情况。
CyclicBarrier
栅栏,顾名思义它可以把处于不同运行进度的线程拦住,让这些线程又重新处于同一起跑线,主要方法就是一个
await,让这些线程等啊等,等到他们都处于同一起跑线了,又一起出发,当然还有其他方法,详细参考文档CyclicBarrier(int count,Runnable action),构造一个要拦截count个线程的栅栏,先到达的线程会执行action这一动作await(),等到这些需要同步的线程集都调用了这个方法时,这个方法才返回,当然,最后一个线程会立即返回,然后其他因为先调用await而等待的线程也会恢复,栅栏撤销,线程们又开始各自的运行await(long timeout,TimeUnit unit),await()的超时版,调用这个方法的线程表示说它不讲武德,只能最多等其它比它慢的线程timeout的时间,超过这个时间它就受不了了,超时的情况下,其他的还在await乖乖等待线程就会抛出BrokenBarrierException,终止await调用。
fun main() { val nthreads = 5 val barrier1 = CyclicBarrier(nthreads) repeat(nthreads) { thread { val start = System.currentTimeMillis() Thread.sleep(it * 1000L) println("${Thread.currentThread().name}完成了工作..") barrier1.await() val end = System.currentTimeMillis() println("${Thread.currentThread().name}但是还是花费了${end-start}") } } }Thread-0完成了工作.. Thread-1完成了工作.. Thread-2完成了工作.. Thread-3完成了工作.. Thread-4完成了工作.. Thread-4但是还是花费了4013 Thread-1但是还是花费了4013 Thread-0但是还是花费了4013 Thread-3但是还是花费了4013 Thread-2但是还是花费了4013CountDownLatch不能重复利用,你可以使用它来完成。或者直接参考
Phaser。Phaser
于Java SE7中引入,它能完成
CyclicBarrier和CountDownLatch的工作,灵活性更强,主要方法如下:Phaser(int parties),构造一个有给定数目的注册方的相位器,它表明了相位器进入下一阶段注册方需要声明到达并取消注册的次数.类似CountDownLatch.register(),添加一个未到达相位器的新参与方bulkRegister(int parties),添加给定数目的未到达相位器的参与方arriveAndAwaitAdvance(),表明到达此相位器并等待其他人,类似于CyclicBarrier.await()arriveAndDeregister(),注册方声明到达并注销,只有注册方都调用了这个方法,参与方才能通过相位器的屏障进入下一阶段.类似CountDownLatch.countDown()方法
相当于这个相位器一个顶俩,而且可复用。
fun main() { val nthread = 5 val startGate = Phaser(nthread) //nthread个注册方 startGate.bulkRegister(nthread) //批量注册nthread个线程为参与方 repeat(nthread) { //启动nthread个线程工作线程<-->参与方 thread { Thread.sleep(it * 1000L) println("${Thread.currentThread().name}表示准备就绪...${Date()}") startGate.arriveAndAwaitAdvance() //同时在这里等待,类似CyclicBarrier.await方法 //这些线程又同时从这里出发 println(Thread.currentThread().name + Date()) } } //启动nthread个发布启动命令的线程<-->注册方 repeat(nthread) { thread { //doSomething first TimeUnit.SECONDS.sleep(1L) startGate.arriveAndDeregister() //需要注册方都表示到达且注销,参与方才能进行屏障后的动作 println("${Thread.currentThread().name}表示可以开始...${Date()}") } } }Thread-0表示准备就绪...Sun Apr 25 17:31:02 CST 2021 Thread-8表示可以开始...Sun Apr 25 17:31:03 CST 2021 Thread-7表示可以开始...Sun Apr 25 17:31:03 CST 2021 Thread-1表示准备就绪...Sun Apr 25 17:31:03 CST 2021 Thread-5表示可以开始...Sun Apr 25 17:31:03 CST 2021 Thread-6表示可以开始...Sun Apr 25 17:31:03 CST 2021 Thread-9表示可以开始...Sun Apr 25 17:31:03 CST 2021 Thread-2表示准备就绪...Sun Apr 25 17:31:04 CST 2021 Thread-3表示准备就绪...Sun Apr 25 17:31:05 CST 2021 Thread-4表示准备就绪...Sun Apr 25 17:31:06 CST 2021 Thread-4Sun Apr 25 17:31:06 CST 2021 Thread-2Sun Apr 25 17:31:06 CST 2021 Thread-0Sun Apr 25 17:31:06 CST 2021 Thread-1Sun Apr 25 17:31:06 CST 2021 Thread-3Sun Apr 25 17:31:06 CST 2021它应该是功能最多的同步工具,基本大多数情况都能用它。
SynchronousQueue
同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用SynchronousQueue的put 方法时, 它会阻塞直到另一个线程调用take 方法为止, 反之亦然。与Exchanger 的情况不同, 数据仅仅沿一个方向传递,从生产者到消费者。即使SynchronousQueue 类实现了BlockingQueue 接口, 概念上讲, 它依然不是一个队列。它没有包含任何元素,它的size 方法总是返回0.它有以下主要方法:
SynchronousQueue(boolean fair),是否启用公平策略,启用的话等待的线程会按照谁等得久谁先走put(E e),向队列添加一个元素,阻塞直到有线程调用take()。take(),从队列取出一个元素,阻塞直到有线程调用put。
fun main() { val queue = SynchronousQueue<String>() val nthreads = 2 repeat(nthreads) { //启动两个生产者线程 thread { Thread.sleep(it*1000L) queue.put(it.toString()) println("${Thread.currentThread().name}放进去了$it--------${Date()}") } } repeat(nthreads) { //启动两个消费线程 thread { val take = queue.take() println("${Thread.currentThread().name}取出了$take--------${Date()}") } } }Thread-0放进去了0--------Wed Apr 28 17:43:17 CST 2021 Thread-2取出了0--------Wed Apr 28 17:43:17 CST 2021 Thread-3取出了1--------Wed Apr 28 17:43:18 CST 2021 Thread-1放进去了1--------Wed Apr 28 17:43:18 CST 2021可以看到线程集基本上是消费者生产者两两配合着向前推进的。
Exchanger
交换器,不同于
SynchronousQueue数据的单向流动(生产者-->消费者),Exchanger数据是双向流动的,用于两个线程同步交换数据. 就只有一个交换方法:exchange(V v),开始准备数据交换,如果另一个线程已经在等待该线程,那么立即进行交换,否则该线程等待。exchange(V v,long timeout,TimeUnit unit),上一个的超时版,超时时抛出TimeoutException。
fun main() { val exchanger = Exchanger<String>() thread { var greeting = "Hello" println("${Thread.currentThread().name}交换前:${greeting}") greeting = exchanger.exchange(greeting) println("${Thread.currentThread().name}交换后:${greeting}") } thread { var greeting = "Hi" println("${Thread.currentThread().name}交换前:${greeting}") greeting = exchanger.exchange(greeting) println("${Thread.currentThread().name}交换后:${greeting}") } }Thread-0交换前:Hello Thread-1交换前:Hi Thread-1交换后:Hello Thread-0交换后:Hi这个使用场景可能不多,但是确实很实用,使用方法也不复杂。
引用Effective Java 第3版的关于并发工具的总结:
简而言之,直接使用
wait()和notify()方法就像用"并发汇编语言"进行编程一样,而java.util.concurrent则提供了更加高级的编程语言.***没有理由在新代码中使用wait和notify方法,即使有,也是极少的.***如果你在维护使用wait和notify的代码,务必确保始终是利用标准的模式从while内部循环调用的wait方法.一般情况下,应该使用notifyAll方法而不是notify方法.如果使用notify,请一定要小心确保程序的活性.