原创

ReentrantLock核心源码分析

ReentrantLock核心源码分析:

ReentrantLock核心原理采用AQS原理实现。

ReentrantLock实现可重入锁核心原理:

file
file

公平锁与非公平锁的区别

  • 所谓公平锁,即新加入的线程会按照尝试获取锁的先后顺序依次拿到锁
  • 所谓非公平锁,即新加入的线程会进行lock方法加锁进行CAS尝试,加锁失败之后tryAcquire方法state为0时候还会进行加锁尝试,而公平锁lock加锁不会尝试,tryAcquire方法state为0时候并且前面没有线程队列排队的时候才会进行加锁尝试。

ReentrantLock实现公平锁与非公平锁核心原理:

  • 非公平锁的线程加锁会先进行CAS操作尝试获取锁,而公平锁直接进行后续操作。实现即重写抽象类Sync的lock方法的不同。
  • 非公平锁如果state状态为0就尝试加锁,而公平锁如果state为0并且前面没有排队的线程队列才尝试加锁。实现即重写抽象类AQS的tryAcquire方法的不同。

公平锁:

file

非公平锁:

file

file

lock加锁过程

1、调用ReentrantLock的lock方法。

public class ReentrantLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.lock();
    }
}

2、调用抽象类Sync的实现FairSync类与NonfairSync类实现公平锁还是非公平锁

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {

        // 加锁分为公平锁与非公平锁,分别由FairSync类与NonfairSync类实现。
        abstract void lock();
    }
}

3、加锁

非公平锁的线程加锁会先进行CAS操作尝试获取锁,而公平锁直接进行后续操作。实现即重写抽象类Sync的lock方法的不同。

3.1、非公平锁加锁

调用非公平锁NonfairSync的lock方法

public class ReentrantLock implements Lock, java.io.Serializable {

    static final class NonfairSync extends Sync {
        // 实现Sync抽象类的lock方法。
        final void lock() {
            // CAS尝试加锁
            if (compareAndSetState(0, 1))
                // 成功后将当前线程设置为持有锁的线程,为以后锁重入使用
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 尝试
                acquire(1);
        }
    }
}

3.2、公平锁加锁

调用公平锁的lock方法

public class ReentrantLock implements Lock, java.io.Serializable {

    static final class FairSync extends Sync {

        final void lock() {
            acquire(1);
        }
}

4、调用AQS的acquire方法尝试加锁

首先会尝试调用公平锁与非公平锁各自的tryAcquire获取锁,如果获取到了,返回true,就会退出if语句,如果没有获取到,那么则将当前线程添加到队列中,并循环获取锁,直到获取到为止。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    // 尝试获取锁
    public final void acquire(int arg) {
        // 尝试获取锁
        if (!tryAcquire(arg) &&
            // 失败添加队列中等待
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 中断自身线程
            selfInterrupt();
            // static void selfInterrupt() {
            //     Thread.currentThread().interrupt();
            // }
    }

4.1、尝试加锁

非公平锁如果state状态为0就尝试加锁,而公平锁如果state为0并且前面没有排队的线程队列才尝试加锁。实现即重写抽象类AQS的tryAcquire方法的不同。

4.1.1、非公平锁尝试加锁方式

调用非公平锁NonfairSync的tryAcquire方法尝试加锁

public class ReentrantLock implements Lock, java.io.Serializable {

    static final class NonfairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            // 调用抽象类Sync的nonfairTryAcquire方法加锁。
            return nonfairTryAcquire(acquires);
        }
    }
}

调用抽象类Sync的nonfairTryAcquire方法加锁

public class ReentrantLock implements Lock, java.io.Serializable {

    abstract static class Sync extends AbstractQueuedSynchronizer {

        /**
         * tryLock方法也会调用这个方法直接尝试,即使设置的是公平锁。
         */
        final boolean nonfairTryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 得到AQS的状态
            int c = getState();
            // 如果未加锁
            if (c == 0) {
                // CAS尝试加锁
                if (compareAndSetState(0, acquires)) {
                    // 设置当前线程持有锁
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果获取锁是当前线程
            else if (current == getExclusiveOwnerThread()) {
                // acquires为1,所以加锁每次AQS的状态加1。
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 设置状态
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

4.1..2、公平锁尝试加锁方式

调用公平锁FairSync的tryAcquire方法尝试加锁

public class ReentrantLock implements Lock, java.io.Serializable {

    static final class FairSync extends Sync {

        protected final boolean tryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 获取AQS状态
            int c = getState();
            // 如果未加锁
            if (c == 0) {
                // 线程前面是否有排队的线程,没有就尝试加锁,有排队线程就继续去排队
                if (!hasQueuedPredecessors() &&
                    // CAS尝试加锁
                    compareAndSetState(0, acquires)) {
                    // 设置当前线程持有锁
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果获取锁是当前线程
            else if (current == getExclusiveOwnerThread()) {
                // acquires为1,所以加锁每次AQS的状态加1。
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 更新AQS的状态
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

调用hasQueuedPredecessors方法查看当前线程前排是否还有排队的线程

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    //只有当没有任何节点获取锁或者本节点为head后第一个节点
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

4.2、加入队列中等待

方法为AQS原理

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    // 尝试获取锁
    public final void acquire(int arg) {
        // 尝试获取锁
        if (!tryAcquire(arg) &&

            // 等待队列进行加锁尝试
            acquireQueued(
                // 失败添加队列中等待
                addWaiter(Node.EXCLUSIVE), arg)
            )
            // 中断自身线程
            selfInterrupt();
            // static void selfInterrupt() {
            //     Thread.currentThread().interrupt();
            // }
    }

4.2.1 构建线程等待队列
(1)为当前线程和给定模式创建并排队节点。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    private Node addWaiter(Node mode) {
        //使用当前线程创建一个Node节点,mode分为共享和排他
        Node node = new Node(Thread.currentThread(), mode);

        Node pred = tail;
        if (pred != null) {
            //如果队列中已经存在节点了,那么直接将该节点CAS添加到后面
            node.prev = pred;
            // 尝试添加节点,如果CAS操作失败,走enq方法进行CAS循环添加尝试。
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 将节点插入队列,必要时进行初始化。
        enq(node);
        return node;
    }
}

(2)调用AQS的enq方法进行节点队列构建,必要时进行初始化。
设置失败的时候继续进行循环尝试

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    // 将节点插入队列,必要时进行初始化。
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //如果尾节点还是空的,那么构建一个空节点做为头节点
            if (t == null) {
                //然后在下一次循环的时候进入到else
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //和上面一样,将当前线程构建的节点添加到队列的尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}

4.2.2、线程队列进行锁资源争抢

非公平锁,即新加入的线程节点会进行加锁尝试,而其他的节点会进行排队等待唤醒

调用acquireQueued方法进行等待线程队列加锁尝试。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    // 参数Node为当前想要获取线程锁的线程节点
    final boolean acquireQueued(final Node node, int arg) {

        // 标识获取锁的过程中是否出现了异常
        boolean failed = true;
        try {
            //标识线程在等待唤醒的时候是否被打断(interrupt)
            boolean interrupted = false;
            // 这里会循环获取锁,知道获取到或者出现异常
            for (;;) {
                // 获得当前想要获取线程锁的上一个线程节点(即:p节点)
                final Node p = node.predecessor();
                // 如果他是头节点,进行尝试加锁操作。(AQS思想认为头节点不参与排队,及排队买票第一个人不是在排队,而是已经在办理买票业务,如果你的前一个节点是头节点,那么当前在获取锁的时候,头节点线程可能已经执行完毕,但是还没来得及通知你,所以当前线程要进行加锁尝试,避免当前线程刚刚进入就被唤醒带来的开销)
                // 如果p不是头节点,则直接跳过加锁尝试。(所谓非公平锁,非公平是作用于新入的节点,而已经调用过addWaiter方法的节点,则需要排队)
                if (p == head && tryAcquire(arg)) {
                    // 如果加锁成功,当前线程则为头节点
                    setHead(node);
                    // 帮助GC回收执行完的上一个线程节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 判断当前线程获取锁失败后是否需要进入到park状态。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 如果需要,则进入park,并进入等待中断状态。当unpark时,还会进行循环尝试获取锁。
                    parkAndCheckInterrupt())
                    // 设置状态为中断状态
                    interrupted = true;
            }
        } finally {
            // 如果操作失败抛出异常
            if (failed)
                // 取消尝试加锁
                cancelAcquire(node);
        }
    }
}

shouldParkAfterFailedAcquire方法为尝试加锁失败后是否应该阻塞

  • 如果前继节点等待状态waitStatus是SIGNAL,则需要park。
  • 如果前继节点等待状态waitStatus状态为已取消,删除无效节点。
  • 如果前继节点等待状态waitStatus状态为其他状态,说明还在执行,应该将其状态设置为SIGNAL状态,阻塞后面线程让其等待。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //如果前节点是SIGNAL状态,则代表需要park
            return true;
        if (ws > 0) { 
            //如果waitStatus的值大于0,代表已取消,需要将无效的节点删除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //否则将前节点设置为SIGNAL状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
}

调用parkAndCheckInterrupt方法当前线程进入park状态,并中断当前线程

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    private final boolean parkAndCheckInterrupt() {
        //通过LockSupport类来park该线程
        LockSupport.park(this);
        //将park的线程唤醒可能是调用unpark方法,也可能是被打断了
        return Thread.interrupted();
    }

}

tryLock尝试加锁过程

tryLock()是一个特例,即使你是公平锁,当你调用tryLock的时候,即使设置的是公平锁,前面有线程排队,也会立即尝试,不遵守公平锁规则

1、直接调用非公平锁(抽象类Sync)中的nonfairTryAcquire方法尝试。

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

2、调用抽象类Sync的nonfairTryAcquire方法加锁

public class ReentrantLock implements Lock, java.io.Serializable {

    abstract static class Sync extends AbstractQueuedSynchronizer {

        /**
         * tryLock方法也会调用这个方法直接尝试,即使设置的是公平锁。
         */
        final boolean nonfairTryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 得到AQS的状态
            int c = getState();
            // 如果未加锁
            if (c == 0) {
                // CAS尝试加锁
                if (compareAndSetState(0, acquires)) {
                    // 设置当前线程持有锁
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果获取锁是当前线程
            else if (current == getExclusiveOwnerThread()) {
                // acquires为1,所以加锁每次AQS的状态加1。
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 设置状态
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

unock解锁过程

1、调用ReentrantLock的unlock方法。

public class ReentrantLock implements Lock, java.io.Serializable {

    // 调用Sync抽象类的release方法尝试解锁
    public void unlock() {
        sync.release(1);
    }
}

2、调用AQS的release方法尝试解锁

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    public final boolean release(int arg) {
        // 调用tryRelease尝试解锁
        if (tryRelease(arg)) {
            // 取出头节点
            Node h = head;
            // 如果头节点为null,并且不为初始状态
            if (h != null && h.waitStatus != 0)
                // 需要unpark下一个线程队列任务进行执行。
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
}

2.1、调用AQS的抽象方法tryRelease尝试解锁,调用Sync的tryRelease方法进行解锁

public class ReentrantLock implements Lock, java.io.Serializable {

    abstract static class Sync extends AbstractQueuedSynchronizer {

        protected final boolean tryRelease(int releases) {
            // releases在unlock的时候为1(下面有源码追踪),所以解锁每次AQS的状态减1。
            int c = getState() - releases;
            // 如果不是当前线程持有锁,解锁抛出IllegalMonitorStateException异常。
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 如果AQS的状态为0才解锁
            if (c == 0) {
                free = true;
                // 解锁,持有锁线程置为null
                setExclusiveOwnerThread(null);
            }
            // 更新AQS的状态
            setState(c);
            // 返回锁状态
            return free;
        }
    }
}

2.2、需要unparkSuccessor方法unpark下一个线程队列任务进行执行。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    // 参数node传入的为头节点
    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        // 如果头节点线程不是初始状态,则设置为初始状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 取出下一个节点
        Node s = node.next;
        // 如果头节点的下一个节点不存在或者是取消状态(大于0只能为CANCELLED状态)
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 如果尾节点不是null,并且尾部节点不是头节点(head等于tail,即表示只有一个正在执行的线程)。则从队列的尾部向前找,并且一直找到node头节点,找到最前面的状态可用的节点线程,进行下面的unpark操作。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果头节点的下一个节点存在,则unpark这个节点的线程执行任务
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}

2.3、调用acquireQueued方法的循环中park处将其unpark,并继续进行等待线程队列加锁尝试。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    // 参数Node为当前想要获取线程锁的线程节点
    final boolean acquireQueued(final Node node, int arg) {

        // 标识获取锁的过程中是否出现了异常
        boolean failed = true;
        try {
            //标识线程在等待唤醒的时候是否被打断(interrupt)
            boolean interrupted = false;
            // 这里会循环获取锁,知道获取到或者出现异常
            for (;;) {
                // 获得当前想要获取线程锁的上一个线程节点(即:p节点)
                final Node p = node.predecessor();
                // 如果他是头节点,进行尝试加锁操作。(AQS思想认为头节点不参与排队,及排队买票第一个人不是在排队,而是已经在办理买票业务,如果你的前一个节点是头节点,那么当前在获取锁的时候,头节点线程可能已经执行完毕,但是还没来得及通知你,所以当前线程要进行加锁尝试,避免当前线程刚刚进入就被唤醒带来的开销)
                // 如果p不是头节点,则直接跳过加锁尝试。(所谓非公平锁,非公平是作用于新入的节点,而已经调用过addWaiter方法的节点,则需要排队)
                if (p == head && tryAcquire(arg)) {
                    // 如果加锁成功,当前线程则为头节点
                    setHead(node);
                    // 帮助GC回收执行完的上一个线程节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 判断当前线程获取锁失败后是否需要进入到park状态。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 如果需要,则进入park,并进入等待中断状态
                    parkAndCheckInterrupt())
                    // 设置状态为中断状态
                    interrupted = true;
            }
        } finally {
            // 如果操作失败抛出异常
            if (failed)
                // 取消尝试加锁
                cancelAcquire(node);
        }
    }
}
正文到此结束
本文目录