当前位置:主页 > 查看内容

Java并发编程(十):ReentrantLock-NonfairSync源码逐行深度分析

发布时间:2021-05-18 00:00| 位朋友查看

简介:??我们在前文 ReentrantLock-NonfairSync源码逐行深度分析上 中分析了NonfairSync获取锁和阻塞线程的入队逻辑也就是定义在AQS中的acquire方法 public final void acquire ( int arg ) { if ( ! tryAcquire ( arg ) acquireQueued ( addWaiter ( Node . EXCLU……

??我们在前文ReentrantLock-NonfairSync源码逐行深度分析(上)中分析了NonfairSync获取锁和阻塞线程的入队逻辑,也就是定义在AQS中的acquire方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

??从前文中了解到,addWaiter方法会将Thread包装成一个Node对象,然后通过死循环(自旋)+CAS的方式保证node一定能够入队成功,入队成功之后会返回对应的node,然后当做入参传入acquireQueued方法中,该方法同样定义在AQS中:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
        	//标识是否被中断
            boolean interrupted = false;
            //自旋
            for (;;) {
            	//获取node节点的前驱节点
                final Node p = node.predecessor();
                //如果前驱节点是head节点,那么这里在阻塞之前再次尝试去获取锁
                if (p == head && tryAcquire(arg)) {
                	//如果获取锁成功,那么将这个node节点设置为新的头节点
                	//setHead方法会清空node的thread和prev属性,并将node赋值到head
                    setHead(node);
                    //把旧头结点的后驱指针设置为空
                    p.next = null; // help GC
                    //到这里相当于将旧的那个头结点孤立了,没有指针指向它,等待被GC回收
                    failed = false;
                    //返回是否被中断唤醒,明显这里是false
                    return interrupted;
                }
                //下面是阻塞和为唤醒做准备的操作
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

??这里同样是自旋逻辑,首先会判断当前node的前驱节点是否是head节点,如果是,那么不会立即阻塞当前node代表的线程,而是再次尝试去获取锁。这个逻辑是什么意思呢?其实很好理解,如果这个条件成立,那就代表此时这个node是CLH的第一个真实排队节点,队列中的第二个节点(head头结点是一个空的Node),此时可能锁已经被持锁线程释放了,那么这个排在首位的线程其实不需要被阻塞,直接获取锁就行了。但是也有可能会获取锁失败,比如现在是非公平锁的场景,即使node是第一个排队的节点,但此时又有一个新的线程来竞争锁,由于是非公平的,那么这个锁就可能被这个新来的线程获取,node还是需要被阻塞。如果获取锁成功,那么成功节点就需要”出队“,但是这里的出队并不是真正的出队,其执行的操作是:将节点的thread和prev属性置空,把head节点设置为当前节点,将旧head节点的next指针断掉。假如原本是这样的状态(t0已经被阻塞,head节点的waitStatus为SIGNAL):
在这里插入图片描述
??现在t0被唤醒获取锁成功,对应的node从CLH"出队"之后的状态:
在这里插入图片描述
??可以看到,node节点并没有真正出队,而是通过一系列字段置空和断开指针操作,node节点变成了新的head节点,而旧的head节点由于没有引用,会被GC清理掉。当然,如果获取锁失败,那么还是需要进入阻塞状态。
??接下来看看阻塞的逻辑,这里涉及到了两个方法的调用:shouldParkAfterFailedAcquire和parkAndCheckInterrupt。如果这两个方法都返回true,那么会将interrupted字段设置为true,然后继续下一次循环。首先来看shouldParkAfterFailedAcquire方法,注意该方法调用传入的pred参数是node节点的前驱节点。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	//pred为node节点的前驱节点
    	//获取前驱结点pred的waitStatus值
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
        	//ws == -1
            return true;
        if (ws > 0) {
            do {
            	//如果waitStatus为1(CANCELLED
            	//那么需要移除pred节点,如果pred的前驱节点还是为CANCELLED
            	//那么继续移除,直到节点为非CANCELLED状态
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        	//将pred的waitStatus字段设置为-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

??该方法主要是针对waitStatus的一系列操作,关于Node对象的waitStatus字段,前面提到过,其定义了4个值:

-1 :SIGNAL:可被唤醒
1:CANCELLED:需要删除
-2:CONDITION:条件等待
-3:PROPAGATE:无条件传播

??当然还有一个默认的0值,关于1、-2、-3几个值,在本文中不用去关心,后续在总结其它AQS相关实现的工具时遇到再谈,这里只需要关注默认值0和-1。初始状态ws的值为默认值0,那么将会走到CAS修改waitStatus的逻辑,将前驱结点的ws的值修改为-1,然后这个方法返回false(关于ws大于0,当前只定义了一个大于0的值,就是CANCELLED,本文不用理会,这里做的逻辑处理在代码中也给出了注释,就不再多言~)。
??shouldParkAfterFailedAcquire方法返回false之后,紧接着的parkAndCheckInterrupt方法也就不会执行,那么在acquireQueued方法中会进入下一次循环,这里我们就先不考虑node是首个排队节点的情况,会再次进入shouldParkAfterFailedAcquire方法的逻辑,只是此时前驱节点的ws的值成了-1,ws == Node.SIGNAL返回true,那么这个方法此时就会返回true。那么就会接着执行parkAndCheckInterrupt方法,该方法很简单,定义如下:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

??通过LockSupport.park方法实现了线程的阻塞。方法体中关于返回当前线程中断状态的逻辑我们暂时不管,现在先来重新思考shouldParkAfterFailedAcquire和parkAndCheckInterrupt两个方法的逻辑。经过前面的分析,shouldParkAfterFailedAcquire会执行两次(不考虑pread==head并且获取锁成功的情况,这时节点不会再阻塞),第一次将node的前驱节点的waitStatus设置为SIGNAL(-1),方法返回false,第二次判断到waitStatus等于-1,方法返回true,进而进入parkAndCheckInterrupt方法通过park进行阻塞。所以是node节点的前驱节点的waitStatus的取值影响node节点是否会进入park阻塞,这样做的意义暂时不谈,先知道这么个事儿,等我们看了释放锁的逻辑之后再回过头来看。
??那现在就来看ReentrantLock的unlock方法:

public void unlock() {
        sync.release(1);
}

??调用的是AQS的release方法:

public final boolean release(int arg) {
		//尝试释放锁
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

??该方法的布尔类型返回值表示释放锁是否成功,先不管失败的情况,直接看看tryReleaes方法是如何实现的。该方法在AQS中也是一个空方法,具体实现在子类中,这里我们看看ReentrantLock类中的实现(Sync):

protected final boolean tryRelease(int releases) {
			//releases传入的是1
			//将当前state的值减去1
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
            	//只能是持锁线程才能释放锁,否则抛出异常
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
            	//如果state减为0才表示释放锁成功
            	//对于锁重入的场景,必须有对应重入次数的释放动作
                free = true;
                //锁释放成功,将exclusiveOwnerThread设置为空
                setExclusiveOwnerThread(null);
            }
            //更新state属性
            setState(c);
            return free;
        }

??锁释放的逻辑很简单,看代码中的注释就行,这里不再赘述。假设现在锁释放成功,返回的free为true,那么就需要执行阻塞线程的唤醒动作:

if (tryRelease(arg)) {
	//获取头结点
	Node h = head;
	if (h != null && h.waitStatus != 0)
		//这里要求头结点的waitStatus!=0才会去尝试唤醒阻塞线程
		unparkSuccessor(h);
	//返回true,表示锁释放成功,如果head为null,或head.waitStatus==0,那么不会去尝试唤醒阻塞线程
	return true;
}

??释放锁和唤醒阻塞线程虽然都由持锁线程处理,但是这两个是独立的动作,按照代码逻辑来看,它们允许不同时发生,判断依据就是waitStatus字段。对应到前面提到的,线程在阻塞之前,会有两次循环,第一次会先把前驱节点的waitStatus修改为SIGNAL(-1),然后第二次才会进入parkAndCheckInterrupt方法进行阻塞。注意这里唤醒线程是从CLH的头结点开始唤醒,所以需要判断head的waitStatus字段是否不为默认值0,以下状态表示t0线程可以尝试被唤醒:

??此时head节点的waitStatus==-1,那么可以进入unparkSuccessor方法尝试唤醒阻塞的线程,该方法同样定义在AQS中:

private void unparkSuccessor(Node node) {
		//此时传入的node是head节点
        int ws = node.waitStatus;
        if (ws < 0)
        	//如果waitStatus小于0,则需要重置其状态
            compareAndSetWaitStatus(node, ws, 0);
           
        //需要唤醒的是node的后驱节点 
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
        	//如果node没有后驱节点,或者后驱节点的waitStatus属性大于0 
        	//目前大于0的值就只定义了1,表示CANCELLED,需要被移除
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
            	//从后往前找,找到最靠近队头的一个waitStatus<=0的节点
                if (t.waitStatus <= 0)
                	//CANCELLED状态的节点不能被唤醒
                    s = t;
        }
        if (s != null)
        	//找到了需要唤醒的节点,调用unpark唤醒节点对应的线程
            LockSupport.unpark(s.thread);
    }

??前面介绍了在shouldParkAfterFailedAcquire方法中有对应CANCELLED节点的移除逻辑,这里也暂时不用考虑太多,只需要知道我们找到了一个需要被唤醒的线程,然后通过unpark方法将其唤醒,而这里被唤醒的节点就是head节点的后驱节点。
??那么当对应阻塞的线程被唤醒之后呢?我们回到线程阻塞的代码段:

private final boolean parkAndCheckInterrupt() {
		//线程阻塞在这里
        LockSupport.park(this);
        //返回的是线程是否被中断,该方法会重置中断标志
        return Thread.interrupted();
    }
    
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//阻塞线程被唤醒后,会继续循环,其前驱节点是head节点,才会尝试继续获取锁
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    //线程成功获取到锁,failed参数设置为false
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //如果线程是被中断唤醒,那么将中断表示interrupted设置为true
                    interrupted = true;
            }
        } finally {
            if (failed)
            	//在这里线程如果重新获取到锁,failed不会是true
                cancelAcquire(node);
        }
    }

??这里有几个非常重要的点:
??第一个是阻塞线程被唤醒之后,不是立马就把锁给它,而是会继续执行循环体,再执行一次:判断前驱结点是否为head,然后尝试通过CAS去竞争锁。对于非公平锁的情况下,如果持锁线程将锁释放了,并不能保证被唤醒线程一定能拿到锁,因为此时可能有一个新的线程来竞争锁,那么这个锁就可能被这个新线程获取到,被唤醒的线程还需要再次陷入阻塞,关于这点在前面分析acquireQueued方法的时候已经有过说明。在前面unparkSuccessor方法唤醒阻塞线程的逻辑中,将head的waitStatus的值又重置为了初始值0,那么这里如果被唤醒线程竞争失败,需要重新陷入阻塞,会又重复两次循环,先将前驱节点(这里是head)的waitStatus设置为SIGNAL(-1),然后再陷入阻塞,这样不断循环~
??第二个是线程中断的处理。线程在方法parkAndCheckInterrupt被阻塞,然后唤醒后,返回了当前线程是否被中断的标志。关于线程中断,可以理解为是一个信号,以前终结一个线程需要调用thread.stop方法,该方法会调用native方法stop0,这个方法很暴力,线程会被直接kill,如果线程正在执行某些操作,也就会被暴力打断。所以后面提供了一个优雅一点的方式:interrupt。线程被中断后,开发人员可以获取到该标识:Thread.interrupted(),然后优雅地退出线程。需要注意的是,调用该方法会重置线程的中断状态

/**
     * Tests if some Thread has been interrupted.  The interrupted state
     * is reset or not based on the value of ClearInterrupted that is
     * passed.
     */
    private native boolean isInterrupted(boolean ClearInterrupted);

??所以被park阻塞的线程,既可以被upark唤醒,也可以被interrupt唤醒,在此处的实现逻辑中,可以看到不论interrupt是true还是false都没有影响,仍然会不断循环重试,直到成功获取到锁,最终interrupted作为acquireQueued方法的返回值返回给调用者,在前文已经分析过了,其调用者是AQS的acquire方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //acquireQueued方法返回的是线程是否被中断过
            //如果被中断过,则需要在这里进行一次自我中断
            selfInterrupt();
    }
    
static void selfInterrupt() {
		//线程自己中断自己
        Thread.currentThread().interrupt();
    }

??为什么需要自己中断自己?原因就是前面提到的,Thread.interrupted()方法会重置线程的interrput状态,这里需要把中断状态向外传递,使得外层调用者能正确感知到线程被中断过,以便做出相应的处理,所以需要自己中断自己重新设置中断状态。
??第三是acquireQueued方法中断finally 代码块:

finally {
	if (failed)
		cancelAcquire(node);
	}

??在此处的场景中,这个failed好像除了未知的异常发生之外,永远不会为true,因为即使线程被中断唤醒了,也会继续竞争锁,竞争失败会继续陷入阻塞,竞争成功,那么failed会被设置为false。事实上,ReentrantLock除了lock()方法之外,还有一个lockInterruptibly方法,看方法名可能就知道了,这个方法是支持中断的,看看其在AQS中的实现:

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
        	//如果线程被中断,那么直接抛出中断异常
            throw new InterruptedException();
        if (!tryAcquire(arg))
        	//如果首次尝试获取锁失败,那么调用doAcquireInterruptibly方法阻塞
            doAcquireInterruptibly(arg);
    }

??如果线程被中断,那么直接抛出中断异常,否则就尝试调用tryAcquire方法获取锁,如果获取锁失败,那么调用方法doAcquireInterruptibly完成线程的阻塞工作(前面介绍的lock逻辑是通过acquireQueued方法实现的),来看看这个方法:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        //和前面分析的acquireQueued方法一样,自旋CAS保证入队成功
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
            	//同样的自旋获取锁
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                //同样的修改waitState和阻塞逻辑
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //但是这里是抛出异常,前面是设置interrupted = true
                    //抛出异常后,failed 还是为初始值true,那么就会执行finally代码块中的cancelAcquire方法
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
            	//如果前面抛出InterruptedException就会执行这个方法
                cancelAcquire(node);
        }
    }

??可以看到doAcquireInterruptibly方法和acquireQueued方法"唯一"的区别就是,doAcquireInterruptibly没有返回值,在这里面如果阻塞线程被中断唤醒,那么不会继续参与锁的竞争,而是直接抛出InterruptedException,然后会在finally代码块中执行cancelAcquire方法;而acquireQueued方法即使线程被中断唤醒,那么也只是将中断状态保存下来,然后会继续参与锁的竞争,最终获取到锁之后,会把中断标识返回给方法调用者,最终通过selfInterrupt线程自我中断重新打上中断标记,以便更外层的调用者能够获取到该状态。
??那么我们来看看cancelAcquire方法:

private void cancelAcquire(Node node) {
        if (node == null)
        	//判空
            return;
		//置空当前节点的thread,因为被中断后就不需要了,也不需要被唤醒了
        node.thread = null;
        //获取node的前驱节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
        	//跳过node所有waitStatus>0(也就是CANCELLED)的前驱节点
        	//将node的前驱指针指向最近的一个非CANCELLED节点pred
            node.prev = pred = pred.prev;
		//获取pred的后驱节点
        Node predNext = pred.next;
        //将当前node节点的waitStatus设置为CANCELLED
        node.waitStatus = Node.CANCELLED;
        if (node == tail && compareAndSetTail(node, pred)) {
        	//情况1:node是尾结点并且成功通过CAS将pred设置为尾结点(可能有其它新线程节点入队,导致node不再是尾结点)
        	//将pred的后驱指针置空:相当于移除了当前node和node的所有CANCELLED前驱节点
            compareAndSetNext(pred, predNext, null);
        } else {
       		//node不是尾结点,或者本来是尾结点,但是CAS的时候被其它线程更新了CLH(上面compareAndSetTail失败),node变成了非尾节点
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                //情况2:pred不是head节点,并且pred是有效的节点(waitStatus不是CANCELLED,并且是SIGNAL或被成功修改为SIGNAL,thread不为空)
                //如果pred的waitStatus不等于SIGNAL,会尝试将其修改为SIGNAL,表示下个节点可被唤醒
                //由于节点的waitStatus可能被其它线程修改,所以使用了CAS操作
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                	//当node==tail,但是CAS替换tail的时候失败了,逻辑也可能走到这里,所以next可能为null,需要判空
                    compareAndSetNext(pred, predNext, next);
            } else {
            	//情况三:pred是head节点或者无效的节点
                unparkSuccessor(node);
            }
			//将node的next指向自己
            node.next = node; // help GC
        }
    }

??这个方法代码比较复杂,但是实现的逻辑很简单,就是根据节点的不同位置采取不同的逻辑,将该节点从CLH队列中移除,需要的时候还会尝试唤醒后驱节点,代码中给出了相应的注释,这里也通过画图再详细解释一下其逻辑。
??首先是注释中提到的,跳过所有waitStatus>0(也就是CANCELLED)的前驱节点,重新设置node的前驱指针。假设初始状态是这样(注意这里不关注waitState<=0的具体状态,所以图中节点只画了1和0两个状态):在这里插入图片描述
??跳过所有CANCELLED前驱节点,重新设置前驱指针,置空thread,接着获取pred的后驱节点,然后将node的waitStatus设置为CANCELLED:
在这里插入图片描述
??这些逻辑都是做的准备工作,接下来对于三个情况有不同的处理逻辑。

  • 情况1:当前node节点是尾结点(tail)。那么就直接移除node节点,还顺带把node的所有CANCELLED前驱节点一并移除,移除的操作是,先将pred设置为tail,然后将pred的next置空,对应到上图就成了这个状态:
    在这里插入图片描述
    ??最后两个waitStatus为1的节点(包括node)被逻辑移除,没有了GC Root引用链引用,可以被GC清理。当然,CAS更新尾结点可能会失败,如果失败,那么就会走入下面的逻辑。
  • 情况2:pred不是head节点,并且pred是有效的节点。那么先获取node的next节点,如果该节点是有效节点,那么尝试将pred的next指向这个node的next,最终将node的next指向自己:
    在这里插入图片描述
    ??可以看到,中间那个无效节点的指针已经断了,但是node的后驱节点的prev指针仍然指向node,不过没有影响,因为在其后面入队的节点准备阻塞或者中断后会从后往前遍历节点,同时"移除"无效节点。
  • 情况3:pred是head节点或者无效的节点。此时直接尝试唤醒node的后驱节点,最后将node的next指向自己。

这里需要注意,情况3中调用unparkSuccessor方法的目的是什么?我们假设这里unparkSuccessor方法调用之前是以下状态:
在这里插入图片描述
??现在唤醒t1线程,t1线程会在acquireQueued方法中继续循环,显然此时t1的prev节点不是head节点,所以会进入shouldParkAfterFailedAcquire方法修改前驱节点的waitStatus,但是发现前驱节点(也就是node)的watiStatus大于0 ,所以会跳过该节点,前面已经分析过shouldParkAfterFailedAcquire方法,这里就不详细说明了,只贴一下此处关心的代码:

if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        }
return false;

??经过这个操作之后,状态变成了:
在这里插入图片描述
??此时shouldParkAfterFailedAcquire方法返回的false,接着又进行一次循环,这次循环发现t1节点的prev节点是head,那么t1就会尝试去竞争锁,如果竞争失败,则再次陷入阻塞状态。那么为什么不直接修改指针,要通过唤醒t1线程去修改指针呢?试想一下,如果持锁线程unlock释放了锁,那么需要从head唤醒一个阻塞的线程,由于node是head的后驱节点,所以node会被唤醒,而node对应的线程却在方法返回之前突然又出现异常(比如tryAcquire方法中获取锁时抛出了异常),也就相当于唤醒失败了,没有成功获取到锁,从而进入了cancelAcquire方法,那么这个node会在cancelAcquire中被移除。但是这里要注意,node移除之后,本次unlock就相当于没有唤醒任何线程(因为node由于异常持锁失败),如果不来新的线程竞争锁,就可能出现没有线程持有锁,但是却有很多线程被阻塞排队等待锁被释放唤醒它们的情况(纯属个人猜测~)。
??另外,关于unparkSuccessor方法前文已经介绍过了,这里不再赘述,就是前文逻辑中入参是head,此处是node。但是为什么寻找有效节点是从后往前找,不是从前往后找呢?结合到无效节点出队的逻辑,可以发现一些无效节点的指针一开始并没有全部断开,而是保留了部分后驱节点对其的前置指针,从后往前遍历可以根据这个前置指针找到这些无效节点,完成真正的出队,也就相当于出队动作被延迟了。

总结

??Doug Lea将众多逻辑都凝练到AQS中,单独看某个函数的时候,可能会觉得有些逻辑明明可以有更简单的实现,有些逻辑又是莫名其妙,因为这些函数逻辑考虑了AQS的众多应用场景,所以需要对这块儿有个整体上的把握才能更好的理解其设计。正是由于这个原因,本文在分析函数的时候大多都只考虑当前分析的场景,不然可能会很懵逼。

;原文链接:https://blog.csdn.net/huangzhilin2015/article/details/115436267
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文


随机推荐