Condition 源码分析

它是一个用来多线程的协调通信的工具类,当某个线程阻塞等待某个条件时,当满足条件才会被唤醒。


public interface Condition {

    void await() throws InterruptedException;
    void awaitUninterruptibly();
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    void signal();

    ....
}

两个重要方法,await()和signal()。

它的实现在 AbstractQueuedSynchronizer (也就是我们平时说的aqs)和 AbstractQueuedLongSynchronizer中

await

调用此方法会使得线程进入等待队列并释放锁,线程的状态变成等待状态

        public final void await() throws InterruptedException {
 //允许线程中断
            if (Thread.interrupted())
                throw new InterruptedException();
 //创建一个状态为condition的节点,采用链表的形式存放数据
            Node node = addConditionWaiter();
//释放当前的锁,得到锁的状态,释放等待队列中的一个线程
            int savedState = fullyRelease(node);
            int interruptMode = 0;
    //判断当前节点是或否在队列上
            while (!isOnSyncQueue(node)) {
 //挂起当前线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
//acquireQueued为false就拿到了锁
    //interruptMode != THROW_IE表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  //将这个变量设置成 REINTERRUPT
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
  //如果node节点的下一个等待者不为空,则开始进行清理,清理condition节点 
                unlinkCancelledWaiters();
            if (interruptMode != 0)
 //如果线程中断了,需要抛出异常  
                reportInterruptAfterWait(interruptMode);
        }
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    //如果lastWaiter不等于空并且waitStatus不为condition,把这个节点从链表中移除
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //创建一个状态为condition的单向列表
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //获得重入的次数
        int savedState = getState();
        //释放并唤醒同步队列中的线程
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
final boolean isOnSyncQueue(Node node) {
    //判断当前节点是否在队列中,false表示不在,true表示在
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    /从tail节点往前扫描AQS队列,如果发现AQS队列中的节点与当前节点相等,则说明节点一定存在与队列中
    return findNodeFromTail(node);
}

signal()

调用此方法,将会唤醒在AQS队列中的节点

public final void signal() {
    //判断当前线程是否获得了锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //AQS队列的第一个节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        //从condition队列中移除first节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    //更新节点状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //调用 enq,把当前节点添加到AQS队列。并且返回返回按当前节点的上一个节点,也就是原tail 节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果上一个节点被取消了,尝试设置上一节点状态为SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //唤醒节点上的线程
        LockSupport.unpark(node.thread);
    return true;
}
  • 阻塞:await()方法中,在线程释放锁资源之后,如果节点不在AQS等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁;
  • 释放:signal()后,节点会从condition队列移动到AQS等待队列,则进入正常锁的获取流程。