文章目录
- 1、Semaphore
- 1.1、acquire
- 1.2、release
- 2、CountDownLatch
- 2.1、await
- 2.2、countDown
1、Semaphore
1.1、acquire
Semaphore.acquire
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
AbstractQueuedSynchronizer.acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//尝试获取共享资源(默认非公平的实现) if (tryAcquireShared(arg) < 0)//若获取资源失败进入这个方法doAcquireSharedInterruptibly(arg);}
Semaphore.tryAcquireShared
//Semaphore非公平锁的实现protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}//Semaphore公平锁的实现protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
AbstractQueuedSynchronizer.doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//封装共享节点并入队final Node node = addWaiter(Node.SHARED);try {for (;;) {final Node p = node.predecessor();//如果前一个节点是头结点,再次尝试获锁if (p == head) {//获取共享资源int r = tryAcquireShared(arg);if (r >= 0) {//如果获取成功设置头节点并传播//其实就是成功获取资源后接着唤醒后面的节点,因为是共享资源,所以要尽可能唤醒更多的节点setHeadAndPropagate(node, r);p.next = null;return;}}//如果获锁失败或者前节点不是head的节点就根据前节点的状态来看是否需要阻塞//需要阻塞就调用 LockSupport.park() 阻塞线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} catch (Throwable t) {cancelAcquire(node);throw t;}}
被唤醒后…
AbstractQueuedSynchronizer.setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {Node h = head;//设置头结点setHead(node);//唤醒后继节点if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;//如果是共享节点调用doReleaseShared唤醒后继节点if (s == null || s.isShared())doReleaseShared();}}
AbstractQueuedSynchronizer.doReleaseShared
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;//cas(SIGNAL->0) + unpark if (ws == Node.SIGNAL) {if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))continue;unparkSuccessor(h);}//cas(0->PROPAGATE)else if (ws == 0 &&!h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue;}//如果head不变,说明已经没有需要唤醒的节点了,则breakif (h == head)break;}}
1.2、release
Semaphore.release
public void release() {sync.releaseShared(1);}
AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {//释放共享资源if (tryReleaseShared(arg)) {//成功释放则调用 doReleaseShared 唤醒节点doReleaseShared();return true;}return false;}
Semaphore.tryReleaseShared
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
可以发现独占锁只会唤醒一个节点,而共享锁会类似传播一样从第一个被唤醒的节点开始逐次唤醒后面的节点。
2、CountDownLatch
2.1、await
CountDownLatch.await
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
acquireSharedInterruptibly上面已经分析过了,这里直接看到CountDownLatch的获取共享资源的实现
CountDownLatch.tryAcquireShared
protected int tryAcquireShared(int acquires) {//如果state为0,则允许获取,反之不允许return (getState() == 0) ? 1 : -1;}
2.2、countDown
CountDownLatch.countDown
public void countDown() {sync.releaseShared(1);}
releaseShared上面也已经分析过了,这里同样直接看到CountDownLatch的释放共享资源的实现
CountDownLatch.tryReleaseShared
protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;// state - 1int nextc = c - 1;//cas替换stateif (compareAndSetState(c, nextc))//如果替换后state为0,则返回true,表示可以唤醒节点了return nextc == 0;}}