这篇文章将为大家详细讲解有关Object提供的阻塞和唤醒API有什么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
java.lang.Object
作为所有非基本类型的基类,也就是说所有java.lang.Object
的子类都具备阻塞和唤醒的功能。下面详细分析Object
提供的阻塞和唤醒API
,它们有一共共同特点:必须在synchronized
所修饰的代码块或者方法中使用。
等待-wait()
方法提供了阻塞的功能,分超时和永久阻塞的版本,实际上,底层只提供了一个JNI方法:
// 这个是底层提供的JNI方法,带超时的阻塞等待,响应中断,其他两个只是变体public final native void wait(long timeoutMillis) throws InterruptedException;// 变体方法1,永久阻塞,响应中断public final void wait() throws InterruptedException { wait(0L);}// 变体方法2,带超时的阻塞,超时时间分两段:毫秒和纳秒,实际上纳秒大于0直接毫秒加1(这么暴力...),响应中断public final void wait(long timeoutMillis, int nanos) throws InterruptedException { if (timeoutMillis < 0) { throw new IllegalArgumentException("timeoutMillis value is negative"); } if (nanos < 0 || nanos > 999999) { throw new IllegalArgumentException("nanosecond timeout value out of range"); } if (nanos > 0) { timeoutMillis++; } wait(timeoutMillis);}
也就是只有一个wait(long timeoutMillis)
方法是JNI接口,其他两个方法相当于:
wait()
等价于
wait(0L)
。wait(long timeoutMillis, int nanos)
在参数合法的情况下等价于
wait(timeoutMillis + 1L)
。由于wait(long timeoutMillis, int nanos)
是参数最完整的方法,它的API注释特别长,这里直接翻译和摘取它注释中的核心要素:
notify(All)
被调用、线程被中断或者在指定了超时阻塞的情况下超过了指定的阻塞时间。wait()
方法之后,当前线程会把自身放到当前对象的等待集合(
wait-set
),然后释放所有在此对象上的同步声明(then to relinquish any nd all synchronization claims on this object),谨记只有当前对象上的同步声明会被释放,当前线程在其他对象上的同步锁只有在调用其
wait()
方法之后才会释放。notify()
或者中断)就会从等待集合(
wait-set
)中移除并且重新允许被线程调度器调度。通常情况下,这个被唤醒的线程会与其他线程竞争对象上的同步权(锁),一旦线程重新
「控制了对象(regained control of the object)」,它对对象的所有同步声明都恢复到以前的状态,即恢复到调用
wait()
方法时(笔者认为,其实准确来说,是调用
wait()
方法前)的状态。wait()
之前,或者调用过
wait()
方法之后处于阻塞等待状态,一旦线程调用了
Thread#interrupt()
,线程就会中断并且抛出
InterruptedException
异常,线程的中断状态会被清除。
InterruptedException
异常会延迟到在第4点提到"它对对象的所有同步声明都恢复到以前的状态"的时候抛出。值得注意的还有:
「一个线程必须成为此对象的监视器锁的拥有者才能正常调用wait()
系列方法,也就是wait()
系列方法必须在同步代码块(synchronized
代码块)中调用,否则会抛出IllegalMonitorStateException
异常」,这一点是初学者或者不了解wait()
的机制的开发者经常会犯的问题。
上面的五点描述可以写个简单的同步代码块伪代码时序总结一下:
final Object lock = new Object();synchronized(lock){ 1、线程进入同步代码块,意味着获取对象监视器锁成功 while(!condition){ lock.wait(); 2.线程调用wait()进行阻塞等待 break; } 3.线程从wait()的阻塞等待中被唤醒,恢复到第1步之后的同步状态 4.继续执行后面的代码,直到离开同步代码块}
notify()
方法的方法签名如下:
@HotSpotIntrinsicCandidatepublic final native void notify();
下面按照惯例翻译一下其API注释:
wait()
方法才能阻塞在对象监视器上。notify()
方法的线程)释放对象上的锁。被唤醒的线程会与其他线程竞争在对象上进行同步(换言之只有获得对象的同步控制权才能继续执行),在成为下一个锁定此对象的线程时,被唤醒的线程没有可靠的特权或劣势。the owner
)的时候才能调用,具体就是:同步方法中、同步代码块中或者静态同步方法中。否则,会抛出
IllegalMonitorStateException
异常。notifyAll()
方法的方法签名如下:
@HotSpotIntrinsicCandidatepublic final native void notifyAll();
wait()
方法才能阻塞在对象监视器上。其他注释的描述和notify()
方法类似。
我们经常看到的资料中提到synchronized
关键字的用法:
Class
对象。对于同步代码块而言,synchronized
关键字抽象到字节码层面就是同步代码块中的字节码执行在monitorenter
和monitorexit
指令之间:
synchronized(xxxx){ ...coding block}↓↓↓↓↓↓↓↓↓↓monitorenter;...coding block - bytecodemonitorexit;
JVM
需要保证每一个monitorenter
都有一个monitorexit
与之相对应。任何对象都有一个monitor
(实际上是ObjectMonitor
)与之相关联,当且仅当一个monitor
被持有之后,它将处于锁定状态。线程执行到monitorenter
指令时,将会尝试获取对象所对应的monitor
所有权,即尝试获取对象的锁。
对于同步(静态)方法而言,synchronized
方法则会被翻译成普通的方法调用和返回指令,如:invokevirtual
等等,在JVM
字节码层面并没有任何特别的指令来实现被synchronized
修饰的方法,而是在Class
文件的方法表中将该方法的access_flags
字段中的synchronized
标志位置1,表示该方法是同步方法并使用调用该方法的对象或该方法所属的Class
在JVM
的内部对象表示Klass
作为锁对象。
其实从开发者角度简单理解,「这两种方式只是在获取锁的时机有所不同」。
下面重复阐述「几个第一眼看起来不合理却是事实的问题」(其实前文已经提及过):
synchronized
方法或者代码块,相当于获取监视器锁成功,如果此时成功调用
wait()
系列方法,那么它会立即释放监视器锁,并且添加到等待集合(
Wait Set
)中进行阻塞等待。synchronized
方法或者代码块之后,它可以调用
notify(All)
方法唤醒等待集合中正在阻塞的线程,但是这个唤醒操作并不是调用
notify(All)
方法后立即生效,而是在该线程
「退出synchronized
方法或者代码块之后才生效」。wait()
方法阻塞过程中被唤醒的线程会竞争监视器目标对象的控制权,一旦重新控制了对象,那么线程的同步状态就会恢复到步入
synchronized
方法或者代码块时候的状态(也就是成功获取到对象监视器锁时候的状态),这个时候线程才能够继续执行。为了验证这三点,可以写个简单的Demo
:
public class Lock {}public class WaitMain { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); public static void main(String[] args) throws Exception { // 这里换成Object其实也没有问题 final Lock lock = new Lock(); new Thread(new WaitRunnable(lock), "WaitThread-1").start(); new Thread(new WaitRunnable(lock), "WaitThread-2").start(); Thread.sleep(50); new Thread(new NotifyRunnable(lock), "NotifyThread").start(); Thread.sleep(Integer.MAX_VALUE); } @RequiredArgsConstructor private static class WaitRunnable implements Runnable { private final Lock lock; @Override public void run() { synchronized (lock) { System.out.println(String.format("[%s]-线程[%s]获取锁成功,准备执行wait方法", F.format(LocalDateTime.now()), Thread.currentThread().getName())); while (true) { try { lock.wait(); } catch (InterruptedException e) { //ignore } System.out.println(String.format("[%s]-线程[%s]从wait中唤醒,准备exit", F.format(LocalDateTime.now()), Thread.currentThread().getName())); try { Thread.sleep(500); } catch (InterruptedException e) { //ignore } break; } } } } @RequiredArgsConstructor private static class NotifyRunnable implements Runnable { private final Lock lock; @Override public void run() { synchronized (lock) { System.out.println(String.format("[%s]-线程[%s]获取锁成功,准备执行notifyAll方法", F.format(LocalDateTime.now()), Thread.currentThread().getName())); lock.notifyAll(); System.out.println(String.format("[%s]-线程[%s]先休眠3000ms", F.format(LocalDateTime.now()), Thread.currentThread().getName())); try { Thread.sleep(3000); } catch (InterruptedException e) { //ignore } System.out.println(String.format("[%s]-线程[%s]准备exit", F.format(LocalDateTime.now()), Thread.currentThread().getName())); } } }}
某个时刻的执行结果如下:
[2019-04-27 23:28:17.617]-线程[WaitThread-1]获取锁成功,准备执行wait方法[2019-04-27 23:28:17.631]-线程[WaitThread-2]获取锁成功,准备执行wait方法[2019-04-27 23:28:17.657]-线程[NotifyThread]获取锁成功,准备执行notifyAll方法 <-------- 这一步执行完说明WaitThread已经释放了锁[2019-04-27 23:28:17.657]-线程[NotifyThread]先休眠3000ms[2019-04-27 23:28:20.658]-线程[NotifyThread]准备exit <------- 这一步后NotifyThread离开同步代码块[2019-04-27 23:28:20.658]-线程[WaitThread-1]从wait中唤醒,准备exit <------- 这一步WaitThread-1解除阻塞[2019-04-27 23:28:21.160]-线程[WaitThread-2]从wait中唤醒,准备exit <------- 这一步WaitThread-2解除阻塞,注意发生时间在WaitThread-1解除阻塞500ms之后,符合我们前面提到的第3点
如果结合wait()
和notify()
可以简单总结出一个同步代码块的伪代码如下:
final Object lock = new Object();// 等待synchronized(lock){ 1、线程进入同步代码块,意味着获取对象监视器锁成功 while(!condition){ lock.wait(); 2.线程调用wait()进行阻塞等待 break; } 3.线程从wait()的阻塞等待中被唤醒,尝试恢复第1步之后的同步状态,并不会马上生效,直到notify被调用并且调用notify方法的线程已经释放锁,同时当前线程需要竞争成功 4.继续执行后面的代码,直到离开同步代码块}// 唤醒synchronized(lock){ 1、线程进入同步代码块,意味着获取对象监视器锁成功 lock.notify(); 2.唤醒其中一个在对象监视器上等待的线程 3.准备推出同步代码块释放锁,只有释放锁之后第2步才会生效}
结合前面分析过的知识点以及参考资料中的文章,重新画一个图理解一下对象监视器以及相应阻塞和唤醒API
的工作示意过程:
Entry Set
(实际上是
ObjectMonitor
中的
_EntryList
属性):存放等待锁并且处于阻塞状态的线程。Wait Set
(实际上是
ObjectMonitor
中的
_WaitSet
属性):存放处于等待阻塞状态的线程。The Owner
(实际上是
ObjectMonitor
中的
_owner
属性):指向获得对象监视器的线程,在同一个时刻只能有一个线程被
The Owner
持有,通俗来看,它就是监视器的控制权。通过Object
提供的阻塞和唤醒机制举几个简单的使用例子。
假设有以下场景:厕所的只有一个卡位,厕所维修工修厕所的时候,任何人不能上厕所。当厕所维修工修完厕所的时候,上厕所的人需要"得到厕所的控制权"才能上厕所。
// 厕所类public class Toilet { // 厕所的锁 private final Object lock = new Object(); private boolean available; public Object getLock() { return lock; } public void setAvailable(boolean available) { this.available = available; } public boolean getAvailable() { return available; }}// 厕所维修工@RequiredArgsConstructorpublic class ToiletRepairer implements Runnable { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); private final Toilet toilet; @Override public void run() { synchronized (toilet.getLock()) { System.out.println(String.format("[%s]-厕所维修员得到了厕所的锁,维修厕所要用5000ms...", LocalDateTime.now().format(F))); try { Thread.sleep(5000); } catch (Exception e) { // ignore } toilet.setAvailable(true); toilet.getLock().notifyAll(); System.out.println(String.format("[%s]-厕所维修员维修完毕...", LocalDateTime.now().format(F))); } }}//上厕所的任务@RequiredArgsConstructorpublic class ToiletTask implements Runnable { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); private final Toilet toilet; private final String name; private final Random random; @Override public void run() { synchronized (toilet.getLock()) { System.out.println(String.format("[%s]-%s得到了厕所的锁...", LocalDateTime.now().format(F), name)); while (!toilet.getAvailable()) { try { toilet.getLock().wait(); } catch (InterruptedException e) { //ignore } int time = random.nextInt(3) + 1; try { // 模拟上厕所用时 TimeUnit.SECONDS.sleep(time); } catch (InterruptedException e) { //ignore } System.out.println(String.format("[%s]-%s上厕所用了%s秒...", LocalDateTime.now().format(F), name, time)); } } }}// 场景入口public class Main { public static void main(String[] args) throws Exception { Toilet toilet = new Toilet(); Random random = new Random(); Thread toiletRepairer = new Thread(new ToiletRepairer(toilet), "ToiletRepairer"); Thread thread1 = new Thread(new ToiletTask(toilet, "张三", random), "thread-1"); Thread thread2 = new Thread(new ToiletTask(toilet, "李四", random), "thread-2"); Thread thread3 = new Thread(new ToiletTask(toilet, "王五", random), "thread-3"); thread1.start(); thread2.start(); thread3.start(); Thread.sleep(50); toiletRepairer.start(); Thread.sleep(Integer.MAX_VALUE); }}
某次执行的结果如下:
[2019-04-29 01:07:25.914]-张三得到了厕所的锁...[2019-04-29 01:07:25.931]-李四得到了厕所的锁...[2019-04-29 01:07:25.931]-王五得到了厕所的锁...[2019-04-29 01:07:25.951]-厕所维修员得到了厕所的锁,维修厕所要用5000ms...[2019-04-29 01:07:30.951]-厕所维修员维修完毕...[2019-04-29 01:07:32.952]-张三上厕所用了2秒...[2019-04-29 01:07:35.952]-王五上厕所用了3秒...[2019-04-29 01:07:37.953]-李四上厕所用了2秒...
实现一个简单固定容量的阻塞队列,接口如下:
public interface BlockingQueue<T> { void put(T value) throws InterruptedException; T take() throws InterruptedException;}
其中put(T value)
会阻塞直到队列中有可用的容量,而take()
方法会阻塞直到有元素投放到队列中。实现如下:
public class DefaultBlockingQueue<T> implements BlockingQueue<T> { private Object[] elements; private final Object notEmpty = new Object(); private final Object notFull = new Object(); private int count; private int takeIndex; private int putIndex; public DefaultBlockingQueue(int capacity) { this.elements = new Object[capacity]; } @Override public void put(T value) throws InterruptedException { synchronized (notFull) { while (count == elements.length) { notFull.wait(); } } final Object[] items = this.elements; items[putIndex] = value; if (++putIndex == items.length) { putIndex = 0; } count++; synchronized (notEmpty) { notEmpty.notify(); } } @SuppressWarnings("unchecked") @Override public T take() throws InterruptedException { synchronized (notEmpty) { while (count == 0) { notEmpty.wait(); } } final Object[] items = this.elements; T value = (T) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) { takeIndex = 0; } count--; synchronized (notFull) { notFull.notify(); } return value; }}
场景入口类:
public class Main { public static void main(String[] args) throws Exception { BlockingQueue<String> queue = new DefaultBlockingQueue<>(5); Runnable r = () -> { while (true) { try { String take = queue.take(); System.out.println(String.format("线程%s消费消息-%s", Thread.currentThread().getName(), take)); } catch (Exception e) { e.printStackTrace(); } } }; new Thread(r, "thread-1").start(); new Thread(r, "thread-2").start(); IntStream.range(0, 10).forEach(i -> { try { queue.put(String.valueOf(i)); } catch (InterruptedException e) { //ignore } }); Thread.sleep(Integer.MAX_VALUE); }}
某次执行结果如下:
线程thread-1消费消息-0线程thread-2消费消息-1线程thread-1消费消息-2线程thread-2消费消息-3线程thread-1消费消息-4线程thread-2消费消息-5线程thread-1消费消息-6线程thread-2消费消息-7线程thread-1消费消息-8线程thread-2消费消息-9
上面这个例子就是简单的单生产者-多消费者的模型。
这里实现一个极度简陋的固定容量的线程池,功能是:初始化固定数量的活跃线程,阻塞直到有可用的线程用于提交任务。它只有一个接口方法,接口定义如下:
public interface ThreadPool { void execute(Runnable runnable);}
具体实现如下:
public class DefaultThreadPool implements ThreadPool { private final int capacity; private List<Worker> initWorkers; private Deque<Worker> availableWorkers; private Deque<Worker> busyWorkers; private final Object nextLock = new Object(); public DefaultThreadPool(int capacity) { this.capacity = capacity; init(capacity); } private void init(int capacity) { initWorkers = new ArrayList<>(capacity); availableWorkers = new LinkedList<>(); busyWorkers = new LinkedList<>(); for (int i = 0; i < capacity; i++) { Worker worker = new Worker(); worker.setName("Worker-" + (i + 1)); worker.setDaemon(true); initWorkers.add(worker); } for (Worker w : initWorkers) { w.start(); availableWorkers.add(w); } } @Override public void execute(Runnable runnable) { if (null == runnable) { return; } synchronized (nextLock) { while (availableWorkers.size() < 1) { try { nextLock.wait(500); } catch (InterruptedException e) { //ignore } } Worker worker = availableWorkers.removeFirst(); busyWorkers.add(worker); worker.run(runnable); nextLock.notifyAll(); } } private void makeAvailable(Worker worker) { synchronized (nextLock) { availableWorkers.add(worker); busyWorkers.remove(worker); nextLock.notifyAll(); } } private class Worker extends Thread { private final Object lock = new Object(); private Runnable runnable; private AtomicBoolean run = new AtomicBoolean(true); private void run(Runnable runnable) { synchronized (lock) { if (null != this.runnable) { throw new IllegalStateException("Already running a Runnable!"); } this.runnable = runnable; lock.notifyAll(); } } @Override public void run() { boolean ran = false; while (run.get()) { try { synchronized (lock) { while (runnable == null && run.get()) { lock.wait(500); } if (runnable != null) { ran = true; runnable.run(); } } } catch (Exception e) { e.printStackTrace(); } finally { synchronized (lock) { runnable = null; } if (ran) { ran = false; makeAvailable(this); } } } } }}
编写一个场景类:
public class MainClient { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); public static void main(String[] args) throws Exception{ ThreadPool threadPool = new DefaultThreadPool(2); threadPool.execute(() -> { try { System.out.println(String.format("[%s]-任务一开始执行持续3秒...", LocalDateTime.now().format(F))); Thread.sleep(3000); System.out.println(String.format("[%s]-任务一执行结束...", LocalDateTime.now().format(F))); }catch (Exception e){ //ignore } }); threadPool.execute(() -> { try { System.out.println(String.format("[%s]-任务二开始执行持续4秒...", LocalDateTime.now().format(F))); Thread.sleep(4000); System.out.println(String.format("[%s]-任务二执行结束...", LocalDateTime.now().format(F))); }catch (Exception e){ //ignore } }); threadPool.execute(() -> { try { System.out.println(String.format("[%s]-任务三开始执行持续5秒...", LocalDateTime.now().format(F))); Thread.sleep(5000); System.out.println(String.format("[%s]-任务三执行结束...", LocalDateTime.now().format(F))); }catch (Exception e){ //ignore } }); Thread.sleep(Integer.MAX_VALUE); }}
某次执行结果如下:
[2019-04-29 02:07:25.465]-任务二开始执行持续4秒...[2019-04-29 02:07:25.465]-任务一开始执行持续3秒...[2019-04-29 02:07:28.486]-任务一执行结束...[2019-04-29 02:07:28.486]-任务三开始执行持续5秒...[2019-04-29 02:07:29.486]-任务二执行结束...[2019-04-29 02:07:33.487]-任务三执行结束...
鉴于笔者C
语言学得不好,这里就无法深入分析JVM
源码的实现,只能结合一些现有的资料和自己的理解重新梳理一下Object
提供的阻塞和唤醒机制这些知识点。结合之前看过JUC
同步器的源码,一时醒悟过来,JUC
同步器只是在数据结构和算法层面使用Java
语言对原来JVM
中C
语言的阻塞和唤醒机制即Object
提供的那几个JNI
方法进行了一次实现而已。
最后,Object
提供的阻塞等待唤醒机制是JVM
实现的(如果特别熟悉C
语言可以通过JVM
源码研究其实现,对于大部分开发者来说这部分的知识其实是暗箱),除非是特别熟练或者是JDK
版本太低尚未引入JUC
包(JUC
包是JDK1.5
或者之后才加入到JDK
中)。一般情况下「不应该优先选择Object
」,一方面因为Object
提供的API
是Native
方法,其功能有可能受到JVM
版本的影响(有可能带来性能提升这样的正面影响,也有可能是负面影响),另一方面Object
提供的API
其实并不灵活。综合来看,实际开发中更建议使用专门为并发设计的JUC
包中的锁相关类库,例如可重入锁ReentrantLock
。
❝话虽然这样说,但是笔者发现了在JDK1.5之后的一些版本中,有部分原来使用了
❞ReentrantLock
的类库被重新换回去Object
提供的API
以及synchronized
,原因是:更喜欢使用synchronized和Object Monitor。具体是哪些类一时想不起来,是很久一段时间之前看源码的注释看到的。看来JDK的开发者也是有脾气的.....
直到JDK11
为止,还有大量的JDK
类库使用了Object
提供的API
以及synchronized
关键字实现的阻塞和唤醒功能,此所谓存在即合理。
关于“Object提供的阻塞和唤醒API有什么用”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/throwable/blog/4409362