并发编程的艺术-Lock锁原理与分析


并发编程的艺术-Lock锁原理与分析

简介

Java中的java.util.concurrent(J.U.C)包中提供了很多并发编程相关的辅助工具,比如阻塞队列、线程池、锁、并发集合等,其中为了保证线程的安全性,给出的解决方案就是Lock顶层接口,比较经典的实现就是ReentrantLock以及后来的StampedLock 它是Reentrant读写锁的升级版。注: 本博文存在CAS等知识点,建议大家先阅读博主的另一篇synchronized的博文

使用

ReentrantLock是一把互斥锁,排它锁,但同时,他又是一个可重入锁,功能类似于synchronized,互斥排它指同一时间呢有且只允许一个线程获取锁资源,而可重入的意思就是,如果当这个线程再次尝试获取锁资源时,无需再进行加锁操作,只需计入重入次数即可。示例如下:

  • 首先我们创建一个需要加锁的方法以及操作
public class LockDemo {
    private static ReentrantLock lock = new ReentrantLock();
    private static int count = 0;
    public void increment(){
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}
  • 创建两个线程去执行,判断最后结果是否不存在并发
public static void main(String[] args) throws InterruptedException {
    LockDemo lockDemo = new LockDemo();
    Thread[] threads = new Thread[2];
    for (int i = 0; i < threads.length; i++) {
        threads[i] = new Thread(() -> {
            for (int y = 0; y < 10000; y++) {
                lockDemo.increment();
            }
        });
        threads[i].start();
    }
    threads[0].join();
    threads[1].join();
    System.out.println(LockDemo.count);
}
  • 最后我们发现,最终结果无论执行多少次,结果均为20000,与我们预期一样,故加锁成功。

锁的设计理念与ReentrantLock源码分析

原理分析

根据Lock的特性,我们可以进行总结下:

  • 如果要满足排他,互斥特性,势必需要一个共享变量。
  • 没有竞争到锁的线程,势必阻塞
  • 被阻塞的线程,势必需要一个容器或者队列来管理

根据特性的总结,去看源码,发现,ReentrantLock中依赖了AbstractQueuedSynchronizer来实现线程的同步

ReentrantLock关系图

根据图例与源码,发现在ReentrantLock中定义了一个Sync的同步类,这个类有两个实现,一个FireSync(公平同步),一个是NonfairSync(非公平同步),也就是公平锁与非公平锁,它在我们new ReentrantLock对象的时候来决定

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

从源码中可以发现,当构造时默认为非公平锁,如果自行传入boolean,则会根据boolean的结果决定,true为公平锁,false则为非公平。

sync则继承了AbstractQueuedSynchronizer抽象类,说明它的具体逻辑是在AbstractQueuedSynchronizer也就是AQS中实现的。

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer(AQS)是ReentrantLock实现锁同步的核心类,但是实际上在J.U.C中大部分组件都依赖于AbstractQueuedSynchronizer

AQS中提供了如下两种锁的实现。

  • 共享锁
  • 独占锁

独占锁,表示同一时刻只允许一个线程获取锁,共享锁,同一时刻允许多个线程同时获得锁. 这就有疑问了,同一时刻允许多个线程获得锁,跟不加锁有什么区别,现在将ReentrantLock与AQS结合起来分析

我们先预览下加锁解锁的流程

锁抢占流程

大致流程如下:

  1. 线程去尝试访问Lock锁加锁的共享资源
  2. 如果线程获取到锁,则将互斥变量state修改为1,并将exclusiveOwnerThread赋值为当前线程
  3. 如果线程为获取到锁,将加入阻塞队列当中

AQS是一个抽象类,留给子类需要重写的方法有下面4个,如果以后想拓展,可以实现这四个方法来生成自己的锁,例如,分布式锁。

  • tryAcquire():独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease():独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared():共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余可用资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点则返回true,否则返回false。

state是一个互斥变量,当state=0时,表示当前共享资源处于空闲状态,当大于0时,表示有线程已经抢占到锁还没释放,但是在重入情况下,它可能是一个大于1的值,也就是线程的重入次数,相应的重入多少次,最后释放锁的时候就要释放同样的次数。state是一个volatile修饰的变量,并通过CAS的方式修改,保证在多线程竞争的情况下,变量的原子性。

综上所述,ReentrantLock在加锁时,实际上是调用的tryAcquire()方法通过CAS的方式将state从0修改为1,其他线程在调用tryAcquire()方法则会失败,但是当以获得锁的线程是个类似递归操作,再次通过tryAcquire()获取锁资源时,则会将state进行++操作,这就是重入操作。

流程操作中的唤醒,由LockSupport.park与LockSupport.unpark来实现。

Lock

我们在原理分析中,了解了公平锁与非公平锁,并且知道了创建的过程,现在我们具体看看公平锁与非公平锁的加锁过程。

FairSync(公平锁), lock()加锁具体实现如下:

final void lock() {
    acquire(1);
}

会发现直接进行了acquire()方法,acquire()方法则是具体的锁竞争逻辑,后续会分析

NofairSync(非公平锁), lock()加锁具体实现如下:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

非公平锁则是直接去尝试通过CAS的方式去修改state的值,如果这时候恰好state为0,则不用执行锁竞争逻辑,直接获得锁。如果CAS失败,则执行锁竞争逻辑。

现在我们去分析下锁竞争具体方法acquire()

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

一共做了两件事

第一件事,通过tryAcquire()方法尝试获取锁,如果成功返回true,否则为false,它是抽象父类AbstractQueuedSynchronizer提供的方法,虽然不是抽象方法,但是它会直接抛出UnsupportedOperationException异常,源码如下:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

所以具体逻辑还是需要子类去实现。在Reentrantlock中,公平锁的尝试独占方式如下:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

具体操作流程为:

  • 获取state得的当前锁的状态,如果当前状态为0,则尝试通过CAS的方式修改state状态,成功则抢占成功获得锁,否则返回false抢占失败
  • 如果当前state锁的状态不为0,则判断当前线程是否为同一线程,如果是则增加重入次数,返回true,否则返回false抢占失败

第二件事,如果锁抢占失败,则调用acquireQueued()方法将线程加入阻塞队列,进入阻塞队列的线程需要包装为Node节点,这个过程则通过addWaiter(Node.EXCLUSIVE)来完成,addWaiter的源码如下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
  • 首先将当前线程封装为Node,获取当前队列中最末尾节点,也就是处于tail位置的Node
  • 如果末尾存在节点Node,则通过CAS的方式将当前线程的prev指向指向tail,作为末尾节点引入,并将原末尾节点的next指向当前node
  • 这期间如果存在线程竞争,或者队列中没有Node,则调用enq()方法处理,方法源码如下:
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

方法采用自旋锁的操作来完成,如果末尾节点为空,则不断尝试变更Node为头节点,否则不断尝试变更Node为末尾节点。

在Node返回后,通过acquireQueued()方法获取锁,源码如下

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • 首先,获取当前Node的prev节点,如果prev指向的是head,则说明当前线程有权利去获取锁
  • 如果当前Node有权利,并且之前线程已经释放锁,且当前线程成功获取锁,则将当前Node设置到head当中,并将原head节点的next指向设计为空,根据GC回收机制,如果当前对象无任何引用则会被自动回收,也就说,原head节点会从链表中清除并被GC回收
  • 如果当前Node有权利,但当前线程获取锁失败,则去通过shouldParkAfterFailedAcquire()方法判断原节点与现节点状态是否正常,具体源码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

首先获取原节点为waitStatus状态,Node的等待状态一共由5种状态,分别为:

  • default: 0 默认状态
  • CANCELLED : 1 表示线程已取消, 通常是线程等待超时,或者被强制中断,此状态下的Node waitStatus不会在发生改变
  • SIGNAL: -1 此状态下的Node需要等待以获得锁的线程解除所占用
  • CONDITION: -2 此Node有条件的等待,常常线程被await()等待
  • PROPAGATE: -3 共享模式下,Node可以运行操作

这里的源码中,如果waitStatus等于SIGNAL状态则返回true ,如果不是,则尝试把非正常状态节点移除,也就说CANCELLED状态,如果为其他状态,则尝试修改waitStatus为SIGNAL状态。

这里如果shouldParkAfterFailedAcquire()方法返回true,则执行parkAndCheckInterrupt(),此方法通过LockSupport.park()方法挂起当前线程,源码如下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

至此,整个加锁流程结束。

unlock

当线程需要释放锁的时候,需要调用unlock()方法进行解锁,源码如下:

public void unlock() {
    sync.release(1);
}

在跟进看一下release()方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release方法具体做了两件事,第一个就是调用tryRelease()方法尝试去释放锁,第二个就是在释放锁成功后调用unparkSuccessor()方法去唤醒阻塞队列中的线程。

我们先看一下tryRelease()方法,源码如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

首先,他调用该getState()方法获取当前线程重入次数,然后根据重入次数减1。如果减1后等于0,则表示当前线程已经全部执行完毕,释放锁,否则返回false。

如果释放锁成功,则第二步调用unparkSuccessor()方法尝试唤醒阻塞队列的线程,源码如下:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

首先获取头节点的waitStatus状态, 如果小于0,即通过CAS的方式尝试将waitstatus状态修改为0,即默认状态,然后获取当前头节点的下一个节点,如果下个节点为空,或者下个节点的waitStatus为待消亡状态,则从线程尾部开始遍历,在离头节点最近的一个Node的waitStatus状态小于0的的节点。

最后 如果获取的节点s不为空,则尝试唤醒该线程。

至于为什么线程不从头部开始扫描,而是从尾部开始扫描,是因为在添加阻塞队列的过程中调用enq()的方法有关,它的逻辑是将新节点的prev指向之前tail的节点,然后通过CAS操作将新节点设置为tail节点,最后将原tail节点的node的next指向新节点,如果原tail节点还未将node的next指向新节点,势必在扫描到这里是出现问题,所以通过尾节点就不会出现这类问题。

至此,ReentrantLock加锁与解锁流程就结束了。


文章作者: TimeRoar
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 TimeRoar !
评论
  目录