目录
什么是ConditionObject:
ConditionObject的应用:
源码分析await()方法:(前置)
进入await()方法:
进入addConditionWaiter()方法:
unlinkCancelledWaiters()方法
fullyRelease(node)方法:
源码分析signal()方法:
主体了解一下唤醒的流程:
signal():方法
doSignal(first);方法
transferForSignal(first)方法:
源码分析await()方法:(后置)
方法transferAfterCancelledWait(node):
isOnSyncQueue()方法
acquireQueued(node, savedState)方法
reportInterruptAfterWait(int interruptMode)方法:
源码分析awaitNanos&signalAll方法
进入方法awaitNanos():
宏观理解方法实现
signalAll方法()
什么是ConditionObject:
在synchronized里面我们可以使用wait()方法挂起线程,notify()方法唤醒线程,ReentrantLock也提供和了自己专属的方式达到这个要求。那就是await()方法和signal()方法。
注意在使用await()方法和signal()方法的线程必须拥有锁。
ConditionObject的应用:
我们的流程的分析:
首先我们的线程1开始时获取到锁资源吗,之后调用await()方法,将当前获得到锁的线程1挂起,这时我们的主线程开始获取锁,之后让主线程将线程1唤醒,之后线程1结束锁资源。大家可以直接运行一下代码看一下效果。
public class Show_Contidion {private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {Condition condition = lock.newCondition();Thread t1 = new Thread(() -> {lock.lock();//开始获取到System.out.println("线程1获取到锁资源");try {//我是先睡眠,所以现在主线程要等到可以执行await()时才可以获取资源Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}try {condition.await();//将线程挂起} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("线程一再次获取到锁");});t1.start();try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}
lock.lock();System.out.println("主线程获取到锁");condition.signal();lock.unlock();}
}
源码分析await()方法:(前置)
首先我们在前置分析中,这是分析到线程挂起。后续的在后置中讲解,接着读。
看源码时发现对于单项链表,我们设置一个哨兵节点是很有必要的,这对于我们在写算法与数据结构时也有帮助。
进入await()方法:
在进入后我们发现这个方法是抽象的方法,接着直接去看他的实现类:
void await() throws InterruptedException;
在前置方法里面我们先只是看到线程挂起的过程,在线程挂起之后的那部分我们在后续的后置部分讲解。
public final void await() throws InterruptedException {//判断是否出现中断,如果是中断直接抛出异常if (Thread.interrupted())throw new InterruptedException();//讲节点添加进condition单向链表Node node = addConditionWaiter();//讲持有的锁全部释放int savedState = fullyRelease(node);//设置中断的modeint interruptMode = 0;//如果不再AQS队列是,开始挂起线程 //因为是先释放的锁资源,如果在释放之后,其他的线程直接讲当前线程唤醒,那么他就直接进入AQS队列直接排队,准备抢占线程,那样的话,无需再次挂起线程。while (!isOnSyncQueue(node)) {LockSupport.park(this);//...后置代码在接下来讲解} }
进入addConditionWaiter()方法:
private Node addConditionWaiter() {//获取队列队列中的最后的节点Node t = lastWaiter;// 如果最后的节点不是null并且节点的状态不是-2 证明这个节点不是有效的节点if (t != null && t.waitStatus != Node.CONDITION) {//进入方法,直接将节点取消unlinkCancelledWaiters();//讲t设置为新的尾节点t = lastWaiter;}//设置节点信息,将状态设置为-2Node node = new Node(Thread.currentThread(), Node.CONDITION);//如果t==null 证明condition单向链表里面没有任何的元素,直接设置当前节点为first节点if (t == null)firstWaiter = node;else//将节点设置再尾节点后t.nextWaiter = node;//重新设置尾节点lastWaiter = node;//返回封装好的节点,便于接下来的操作。return node;}
unlinkCancelledWaiters()方法
private void unlinkCancelledWaiters() {//获取队列的开头节点Node t = firstWaiter;Node trail = null;//如果节点不是nullwhile (t != null) {//节点next为t的下一个节点Node next = t.nextWaiter;//t的线程也是不正常的if (t.waitStatus != Node.CONDITION) {//设置他直接指向null,方便回收t.nextWaiter = null;//如果头节点就是nullif (trail == null)//讲头节点设置为他的下一个节点firstWaiter = next;else//将他的指针指向他下一个的下一个,讲失效的节点直接清除。trail.nextWaiter = next;if (next == null)lastWaiter = trail;}else//讲rail设置为ttrail = t;//设置t为直接的next;t = next;} }
fullyRelease(node)方法:
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();//核心就是这个代码,还是释放锁的代码,但是这里面释放的值就是state的值。直接设置为0;if (release(savedState)) {failed = false;//记录了未释放之前的state的值(注意!!!!!!)return savedState;} else {//失败就是使用这个方式必须持有锁。没持有的化,不需要挂起throw new IllegalMonitorStateException();}} finally {if (failed)//如果释放锁失败,将节点设置为取消。还是你没有锁资源,就是用方法await().创建的节点只能是取消node.waitStatus = Node.CANCELLED;}}
源码分析signal()方法:
将等待时间最长的线程(如果存在)从此条件的等待队列移动到拥有锁的等待队列这是源码里的话,证明我们的唤醒顺序是从头开始的。
主体了解一下唤醒的流程:
-
首先检验是否是当前线程,如果是的话才能进行接下来的操作
-
唤醒时从第一个节点开始唤醒 如果第一个节点就是null证明当前condition单向链表里面没有任何的节点,无需唤醒。
-
开始唤醒之后,检查要唤醒的节点状态是否正常,将当前唤醒的节点的nextWaiter这是为null,便于再唤醒后回收这个节点
-
唤醒时将节点放置到AQS队列,再放置到AQS队列时需要进行健壮性分析,如果他的前置接点出现问题,将现在这个新的节点直接唤醒。
signal():方法
public final void signal() {//查看是否是当前的线程,如果不是直接抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();//获取第一个节点,开始唤醒Node first = firstWaiter;if (first != null) //但是被唤醒的节点不能是nulldoSignal(first);//开始唤醒}
doSignal(first);方法
private void doSignal(Node first) {do {//如果当前节点的下一个节点是null,也就是当前节点就是最后一个节点了if ( (firstWaiter = first.nextWaiter) == null)//设置最后节点为nulllastWaiter = null;,//如果不是,后面还是有节点的 将这个节点指针设置为null 便于垃圾回收first.nextWaiter = null;} while (//!transferForSignal(first) //如果返回时false 继续判断,不然直接结束循环&&// (first = firstWaiter) != null);//如果下一个节点不时null 继续循环}
transferForSignal(first)方法:
final boolean transferForSignal(Node node) {//开始将节点的状态从-2改为0 如果不能改的话就是节点已经取消 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//表示将讲节点放到AQS队列失败return false;//到这里就是修改节点状态成功 p是AQS队列,就是将当前节点插入到p节点的后面Node p = enq(node);//获取p节点的状态int ws = p.waitStatus;//如果状态是1 证明节点已经取消 或者是再将p节点的ws设置为-1失败时if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//将线程唤醒,放置因为前置节点状态问题,当前节点无法唤醒的健壮性分析LockSupport.unpark(node.thread);return true;}
源码分析await()方法:(后置)
使用线程唤醒之后,首先我们需要知道他是如何背唤醒的,是中断唤醒还是signal()唤醒,还是先被signal()唤醒之后紧接着被中断。
只要是唤醒之后我们就是要保证我们的节点在AQS队列,并且在condition单向链表中将节点取消。
如果是REINTERRUPT需要重新设置中断标记位。
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);//经过上面的过程将线程挂起//状态是0 是signal唤醒 //状态是-1 中断唤醒, 那这个节点是不是再AQS队列里面呢:探讨//状态时1 背signal唤醒之后,背中断//checkInterruptWhileWaiting(node))之后节点一定在AQS中if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//如果不是直接最简单的signal()唤醒,直接跳出循环//如果中断的话break;}// 如果返回时true 证明在AQS排队时,线程再次中断,但是中断的状态不是THROW_IE,那就是证明是REINTERRUPTif (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) //如果节点的nextWaiter不是null,证明还是在condition单向链表里面unlinkCancelledWaiters();//将节点取消if (interruptMode != 0)//如果不是0,reportInterruptAfterWait(interruptMode);}private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?//THROW_IE:被中断唤醒 -1//REINTERRUPT:背signal唤醒之后,被中断 1(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;//如果是零的话就是没有中断 }
方法transferAfterCancelledWait(node):
//如果返回时true 那就是直接被中断唤醒,如果是false那就是signal()唤醒的,之后中断,如果是singal()唤醒的,那就是一定在AQS里面,如果是中断唤醒的那就不在AQS中。 final boolean transferAfterCancelledWait(Node node) {//CAS操作将节点的状态由-2转化为0成功,那就是我的节点还是再condition单向链表中,那就是中断唤醒的if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//将节点放置到AQS队列enq(node);//注意现在我们的节点即在AQS里,也在condition单向链表里面return true;}如果节点没有在AQS队列中,那就是证明现在还没有放置到AQS队列中,那就等一会,一直到成功放入while (!isOnSyncQueue(node))Thread.yield();//返回的是false 就是signal()唤醒的,之后中断return false;}
isOnSyncQueue()方法
final boolean isOnSyncQueue(Node node) {//判断节点是否在AQS队列中if (node.waitStatus == Node.CONDITION || node.prev == null)//如果节点的状态是-2证明节点在condition单向链表,或者是当前节点的前置接点是null,AQS队列里面由伪节点,不会出现前置节点为null,所以也是没在AQS队列return false;if (node.next != null) // 如果你的后置接点不为空,直接返回truereturn true;//最后就是从后往前遍历AQS队列,查看是否存在节点return findNodeFromTail(node); }
acquireQueued(node, savedState)方法
//对于lock()方法里面的中断操作我们先忽略。 final boolean acquireQueued(final Node node, int arg) {//设置状态,当前未获取到锁资源boolean failed = true;try {boolean interrupted = false;//死循环,必须完成一件事 for (;;) {//查找当前节点的前驱节点final Node p = node.predecessor();//开始判断,如果我的前置节点就是伪节点(注意是伪节点)直接在词使用tryAcquire()尝试获取锁资源if (p == head && tryAcquire(arg)) {//如果成功,就设置一些头节点setHead(node);p.next = null; // help GC//设置已经获取锁资源failed = false;//返回的值是false.return interrupted;}//如果前置节点不是伪节点的话,if (shouldParkAfterFailedAcquire(p, node) &&//挂起线程parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
reportInterruptAfterWait(int interruptMode)方法:
private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {//如果是中断唤醒,直接抛出异常if (interruptMode == THROW_IE)throw new InterruptedException();//如果是signal()唤醒之后在中断else if (interruptMode == REINTERRUPT)//再次设置中断标记selfInterrupt(); }
源码分析awaitNanos&signalAll方法
其实awaitNancos()就是带有时间的await方式,其实核心的思想还是和上面的思路几乎一致。
现在开始源码的分析:
long awaitNanos(long nanosTimeout) throws InterruptedException;
进入方法awaitNanos():
小编建议我们在学习源码时,还是要看一下源码的注释的,因为原作者的注释能够帮助我们学习他的思路和想法,小编在这里把注释放到文章中,帮助大家学习
implements timed condition wait. If current thread is interrupted, throw InterruptedException. Save lock state returned by getState. Invoke release with saved state as argument, throwing IllegalMonitorStateException if it fails. Block until signalled, interrupted, or timed out. Reacquire by invoking specialized version of acquire with saved state as argument. If interrupted while blocked in step 4, throw InterruptedException. //--------------------------------------------------------------------------------------------- 下面是直接翻译的版本 实现定时条件 wait。 如果当前线程被中断,则抛出 InterruptedException。 保存返回的 getState锁定状态。 以保存的状态作为参数调用 Invoke release ,如果失败,则抛出 IllegalMonitorStateException。 阻塞,直到发出信号、中断或超时。 通过调用具有保存状态作为参数的专用版本 来 acquire 重新获取。 如果在步骤 4 中被阻塞时被中断,则引发 InterruptedException
宏观理解方法实现
-
首先是看一下是否中断
-
之后看一下时间是否足够,时间长的话继续挂起,时间短或者<0开始唤醒
public final long awaitNanos(long nanosTimeout)throws InterruptedException {//如果中断,抛出异常if (Thread.interrupted())throw new InterruptedException();//获取节点信息,添加到condition单向链表Node node = addConditionWaiter();//获取节点的状态。int savedState = fullyRelease(node);//计算结束时间final long deadline = System.nanoTime() + nanosTimeout;//初始化参数int interruptMode = 0;//查看时候再AQS队列中while (!isOnSyncQueue(node)) {//如果时间结束,if (nanosTimeout <= 0L) {//将节点放置到AQS队列transferAfterCancelledWait(node);//直接跳出循环。break;}//如果还是有时间的,查看时间时候充足,时间还是充足的话继续将节点挂起if (nanosTimeout >= spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);//如果状态不是0,那就是被唤醒,直接跳出循环,因为我们在刚才讲解方法checkInterruptWhileWaiting()//时题到,只要时进入到checkInterruptWhileWaiting()节点一定进入到AQS队列if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;//在次计算时间nanosTimeout = deadline - System.nanoTime();}//如果前面返回的是true,就是在方放入队列是发生中断,但是中断还不是THROW_IE,那就是REINTERRUPTif (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;//将condition()里的无效节点取消if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);//返回剩余时间return deadline - System.nanoTime(); }
signalAll方法()
// 以do-while的形式,将Condition单向链表中的所有Node,全部唤醒并扔到AQS队列 private void doSignalAll(Node first) {// 将头尾都置位null~lastWaiter = firstWaiter = null;do {// 拿到next节点的引用Node next = first.nextWaiter;// 断开当前Node的nextWaiterfirst.nextWaiter = null;// 修改Node状态,扔AQS队列,是否唤醒!transferForSignal(first);// 指向下一个节点first = next;} while (first != null); }