并发编程的艺术-Lock锁原理与分析
简介
Java中的java.util.concurrent
(J.U.C)包中提供了很多并发编程相关的辅助工具,比如阻塞队列、线程池、锁、并发集合等,其中为了保证线程的安全性,给出的解决方案就是Lock顶层接口,比较经典的实现就是ReentrantLock
以及后来的StampedLock
它是Reentrant读写锁的升级版。注: 本博文存在CAS等知识点,建议大家先阅读博主的另一篇synchronized
的博文
使用
ReentrantLock
是一把互斥锁,排它锁,但同时,他又是一个可重入锁,功能类似于synchronized
,互斥排它指同一时间呢有且只允许一个线程获取锁资源,而可重入的意思就是,如果当这个线程再次尝试获取锁资源时,无需再进行加锁操作,只需计入重入次数即可。示例如下:
- 首先我们创建一个需要加锁的方法以及操作
public class LockDemo {
private static ReentrantLock lock = new ReentrantLock();
private static int count = 0;
public void increment(){
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}
- 创建两个线程去执行,判断最后结果是否不存在并发
public static void main(String[] args) throws InterruptedException {
LockDemo lockDemo = new LockDemo();
Thread[] threads = new Thread[2];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int y = 0; y < 10000; y++) {
lockDemo.increment();
}
});
threads[i].start();
}
threads[0].join();
threads[1].join();
System.out.println(LockDemo.count);
}
- 最后我们发现,最终结果无论执行多少次,结果均为20000,与我们预期一样,故加锁成功。
锁的设计理念与ReentrantLock源码分析
原理分析
根据Lock的特性,我们可以进行总结下:
- 如果要满足排他,互斥特性,势必需要一个共享变量。
- 没有竞争到锁的线程,势必阻塞
- 被阻塞的线程,势必需要一个容器或者队列来管理
根据特性的总结,去看源码,发现,ReentrantLock
中依赖了AbstractQueuedSynchronizer
来实现线程的同步
根据图例与源码,发现在ReentrantLock
中定义了一个Sync的同步类,这个类有两个实现,一个FireSync(公平同步),一个是NonfairSync(非公平同步),也就是公平锁与非公平锁,它在我们new ReentrantLock对象的时候来决定
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从源码中可以发现,当构造时默认为非公平锁,如果自行传入boolean,则会根据boolean的结果决定,true为公平锁,false则为非公平。
sync则继承了AbstractQueuedSynchronizer
抽象类,说明它的具体逻辑是在AbstractQueuedSynchronizer
也就是AQS中实现的。
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer
(AQS)是ReentrantLock实现锁同步的核心类,但是实际上在J.U.C中大部分组件都依赖于AbstractQueuedSynchronizer
。
AQS中提供了如下两种锁的实现。
- 共享锁
- 独占锁
独占锁,表示同一时刻只允许一个线程获取锁,共享锁,同一时刻允许多个线程同时获得锁. 这就有疑问了,同一时刻允许多个线程获得锁,跟不加锁有什么区别,现在将ReentrantLock与AQS结合起来分析
我们先预览下加锁解锁的流程
大致流程如下:
- 线程去尝试访问Lock锁加锁的共享资源
- 如果线程获取到锁,则将互斥变量
state
修改为1,并将exclusiveOwnerThread赋值为当前线程 - 如果线程为获取到锁,将加入阻塞队列当中
AQS是一个抽象类,留给子类需要重写的方法有下面4个,如果以后想拓展,可以实现这四个方法来生成自己的锁,例如,分布式锁。
- tryAcquire():独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease():独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared():共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余可用资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点则返回true,否则返回false。
state是一个互斥变量,当state=0时,表示当前共享资源处于空闲状态,当大于0时,表示有线程已经抢占到锁还没释放,但是在重入情况下,它可能是一个大于1的值,也就是线程的重入次数,相应的重入多少次,最后释放锁的时候就要释放同样的次数。state是一个volatile修饰的变量,并通过CAS的方式修改,保证在多线程竞争的情况下,变量的原子性。
综上所述,ReentrantLock
在加锁时,实际上是调用的tryAcquire()
方法通过CAS的方式将state从0修改为1,其他线程在调用tryAcquire()
方法则会失败,但是当以获得锁的线程是个类似递归操作,再次通过tryAcquire()
获取锁资源时,则会将state进行++操作,这就是重入操作。
流程操作中的唤醒,由LockSupport.park与LockSupport.unpark来实现。
Lock
我们在原理分析中,了解了公平锁与非公平锁,并且知道了创建的过程,现在我们具体看看公平锁与非公平锁的加锁过程。
FairSync(公平锁), lock()加锁具体实现如下:
final void lock() {
acquire(1);
}
会发现直接进行了acquire()
方法,acquire()
方法则是具体的锁竞争逻辑,后续会分析
NofairSync(非公平锁), lock()加锁具体实现如下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
非公平锁则是直接去尝试通过CAS的方式去修改state的值,如果这时候恰好state为0,则不用执行锁竞争逻辑,直接获得锁。如果CAS失败,则执行锁竞争逻辑。
现在我们去分析下锁竞争具体方法acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
一共做了两件事
第一件事,通过tryAcquire()
方法尝试获取锁,如果成功返回true,否则为false,它是抽象父类AbstractQueuedSynchronizer
提供的方法,虽然不是抽象方法,但是它会直接抛出UnsupportedOperationException
异常,源码如下:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
所以具体逻辑还是需要子类去实现。在Reentrantlock中,公平锁的尝试独占方式如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
具体操作流程为:
- 获取state得的当前锁的状态,如果当前状态为0,则尝试通过CAS的方式修改state状态,成功则抢占成功获得锁,否则返回false抢占失败
- 如果当前state锁的状态不为0,则判断当前线程是否为同一线程,如果是则增加重入次数,返回true,否则返回false抢占失败
第二件事,如果锁抢占失败,则调用acquireQueued()
方法将线程加入阻塞队列,进入阻塞队列的线程需要包装为Node节点,这个过程则通过addWaiter(Node.EXCLUSIVE)
来完成,addWaiter
的源码如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- 首先将当前线程封装为Node,获取当前队列中最末尾节点,也就是处于tail位置的Node
- 如果末尾存在节点Node,则通过CAS的方式将当前线程的prev指向指向tail,作为末尾节点引入,并将原末尾节点的next指向当前node
- 这期间如果存在线程竞争,或者队列中没有Node,则调用
enq()
方法处理,方法源码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
方法采用自旋锁的操作来完成,如果末尾节点为空,则不断尝试变更Node为头节点,否则不断尝试变更Node为末尾节点。
在Node返回后,通过acquireQueued()
方法获取锁,源码如下
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 首先,获取当前Node的prev节点,如果prev指向的是head,则说明当前线程有权利去获取锁
- 如果当前Node有权利,并且之前线程已经释放锁,且当前线程成功获取锁,则将当前Node设置到head当中,并将原head节点的next指向设计为空,根据GC回收机制,如果当前对象无任何引用则会被自动回收,也就说,原head节点会从链表中清除并被GC回收
- 如果当前Node有权利,但当前线程获取锁失败,则去通过shouldParkAfterFailedAcquire()方法判断原节点与现节点状态是否正常,具体源码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
首先获取原节点为waitStatus
状态,Node的等待状态一共由5种状态,分别为:
- default: 0 默认状态
- CANCELLED : 1 表示线程已取消, 通常是线程等待超时,或者被强制中断,此状态下的Node waitStatus不会在发生改变
- SIGNAL: -1 此状态下的Node需要等待以获得锁的线程解除所占用
- CONDITION: -2 此Node有条件的等待,常常线程被await()等待
- PROPAGATE: -3 共享模式下,Node可以运行操作
这里的源码中,如果waitStatus等于SIGNAL状态则返回true ,如果不是,则尝试把非正常状态节点移除,也就说CANCELLED状态,如果为其他状态,则尝试修改waitStatus为SIGNAL状态。
这里如果shouldParkAfterFailedAcquire()
方法返回true,则执行parkAndCheckInterrupt()
,此方法通过LockSupport.park()
方法挂起当前线程,源码如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
至此,整个加锁流程结束。
unlock
当线程需要释放锁的时候,需要调用unlock()
方法进行解锁,源码如下:
public void unlock() {
sync.release(1);
}
在跟进看一下release()
方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release方法具体做了两件事,第一个就是调用tryRelease()
方法尝试去释放锁,第二个就是在释放锁成功后调用unparkSuccessor()
方法去唤醒阻塞队列中的线程。
我们先看一下tryRelease()
方法,源码如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
首先,他调用该getState()
方法获取当前线程重入次数,然后根据重入次数减1。如果减1后等于0,则表示当前线程已经全部执行完毕,释放锁,否则返回false。
如果释放锁成功,则第二步调用unparkSuccessor()
方法尝试唤醒阻塞队列的线程,源码如下:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
首先获取头节点的waitStatus状态, 如果小于0,即通过CAS的方式尝试将waitstatus状态修改为0,即默认状态,然后获取当前头节点的下一个节点,如果下个节点为空,或者下个节点的waitStatus为待消亡状态,则从线程尾部开始遍历,在离头节点最近的一个Node的waitStatus状态小于0的的节点。
最后 如果获取的节点s不为空,则尝试唤醒该线程。
至于为什么线程不从头部开始扫描,而是从尾部开始扫描,是因为在添加阻塞队列的过程中调用enq()
的方法有关,它的逻辑是将新节点的prev指向之前tail的节点,然后通过CAS操作将新节点设置为tail节点,最后将原tail节点的node的next指向新节点,如果原tail节点还未将node的next指向新节点,势必在扫描到这里是出现问题,所以通过尾节点就不会出现这类问题。
至此,ReentrantLock加锁与解锁流程就结束了。