前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【并发编程】源码分析角度来看看ConditionObject

【并发编程】源码分析角度来看看ConditionObject

作者头像
程序员波特
发布2024-04-22 09:01:52
740
发布2024-04-22 09:01:52
举报
文章被收录于专栏:魔法书魔法书

synchronized提供了waitnotify的方法实现线程在持有锁时,可以实现挂起,已经唤醒的操作。

ReentrantLock也拥有这个功能。

ReentrantLock提供了awaitsignal方法去实现类似wait和notify的功能。

想执行await或者是signal就必须先持有lock锁的资源。

ConditionObject的介绍&应用

先look一下Condition的应用

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException, IOException {
 	ReentrantLock lock = new ReentrantLock();
 	Condition condition = lock.newCondition();
 	new Thread(() -> {
  		lock.lock();
 		System.out.println("子线程获取锁资源并await挂起线程");
 		try {
 			Thread.sleep(5000);
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		}
 		
 		try {
 			condition.await();
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		}
 
 		System.out.println("子线程挂起后被唤醒!持有锁资源");
 	}).start();
 
 	Thread.sleep(100);
 	// =================main======================
 	lock.lock();
 	System.out.println("主线程等待5s拿到锁资源,子线程执行了await方法");
 	condition.signal();
 	System.out.println("主线程唤醒了await挂起的子线程");
 	lock.unlock();
}
Condition的构建方式&核心属性

发现在通过lock锁对象执行newCondition方法时,本质就是直接new的AQS提供的 ConditionObject对象

代码语言:javascript
复制
final ConditionObject newCondition() {
 	return new ConditionObject();
}

其实lock锁中可以有多个Condition对象。

在对Condition1进行操作时,不会影响到Condition2的单向链表。

其次可以发现ConditionObject中,只有两个核心属性:

代码语言:javascript
复制
/** First node of condition queue. */
private transient Node firstWaiter;

/** Last node of condition queue. */
private transient Node lastWaiter;

虽然Node对象有prev和next,但是在ConditionObject中是不会使用这两个属性的,只要在Condition队列中,这两个属性都是null。在ConditionObject中只会使用nextWaiter的属性实现单向链表的效果。

Condition的await方法分析(前置分析)

持有锁的线程在执行await方法后会做几个操作:

  • 判断线程是否中断,如果中断了,什么都不做。
  • 没有中断,就讲当前线程封装为Node添加到Condition的单向链表中一次性释放掉锁资源。
  • 如果当前线程没有在AQS队列,就正常执行LockSupport.park(this)挂起线程。
代码语言:javascript
复制
// await方法的前置分析,只分析到线程挂起
public final void await() throws InterruptedException {
 	// 先判断线程的中断标记位是否是true
 	if (Thread.interrupted())
 		// 如果是true,就没必要执行后续操作挂起了。
 		throw new InterruptedException();
 	
 	// 在线程挂起之前,先将当前线程封装为Node,并且添加到Condition队列中
 	Node node = addConditionWaiter();
 	// fullyRelease在释放锁资源,一次性将锁资源全部释放,并且保留重入的次数
 	int savedState = fullyRelease(node);
 	// 省略一行代码……
 	// 当前Node是否在AQS队列中?
 	// 执行fullyRelease方法后,线程就释放锁资源了,如果线程刚刚释放锁资源,其他线程就立即执行了signal方法,
 	// 此时当前线程就被放到了AQS的队列中,这样一来线程就不需要执行LockSupport.park(this);去挂起线程了
 	while (!isOnSyncQueue(node)) {
 		// 如果没有在AQS队列中,正常在Condition单向链表里,正常挂起线程。
 		LockSupport.park(this);
 		// 省略部分代码……
 	}
 	// 省略部分代码……
}

// 线程挂起先,添加到Condition单向链表的业务~~
private Node addConditionWaiter() {
 	// 拿到尾节点。
 	Node t = lastWaiter;
 	// 如果尾节点有值,并且尾节点的状态不正常,不是-2,尾节点可能要拜拜了~
 	if (t != null && t.waitStatus != Node.CONDITION) {
 		// 如果尾节点已经取消了,需要干掉取消的尾节点~
 		unlinkCancelledWaiters();
 		// 重新获取lastWaiter
 		t = lastWaiter;
 	}
 	// 构建当前线程的Node,并且状态设置为-2
	Node node = new Node(Thread.currentThread(), Node.CONDITION);
 	// 如果last节点为null。直接将当前节点设置为firstWaiter
 	if (t == null)
 		firstWaiter = node;
 	else
 		// 如果last节点不为null,说明有值,就排在lastWaiter的后面
 		t.nextWaiter = node;
 		
 	// 把当前节点设置为最后一个节点
 	lastWaiter = node;
 	// 返回当前节点
 	return node;
}

// 干掉取消的尾节点。
private void unlinkCancelledWaiters() {
 	// 拿到头节点
 	Node t = firstWaiter;
 	// 声明一个节点,爱啥啥~~~
 	Node trail = null;
 	// 如果t不为null,就正常执行~~
 	while (t != null) {
 		// 拿到t的next节点
 		Node next = t.nextWaiter;
 		// 如果t的状态不为-2,说明有问题
 		if (t.waitStatus != Node.CONDITION) {
 			// t节点的next为null
 			t.nextWaiter = null;
 			// 如果trail为null,代表头结点状态就是1,
 			if (trail == null)
 				// 将头结点指向next节点
 				firstWaiter = next;
 			else
 				// 如果trail有值,说明不是头结点位置
 				trail.nextWaiter = next;
 
 			// 如果next为null,说明单向链表遍历到最后了,直接结束
 			if (next == null)
 				lastWaiter = trail;
 		}
 		// 如果t的状态是-2,一切正常
 		else {
 			// 临时存储t
			trail = t;
 		}
 		// t指向之前的next
 		t = next;
 	}
}

// 一次性释放锁资源
final int fullyRelease(Node node) {
 	// 标记位,释放锁资源默认失败!
 	boolean failed = true;
 	try {
 		// 拿到现在state的值
 		int savedState = getState();
 		// 一次性释放干净全部锁资源
 		if (release(savedState)) {
 			// 释放锁资源失败了么? 没有!
 			failed = false;
 			// 返回对应的锁资源信息
 			return savedState;
 		} else {
 			throw new IllegalMonitorStateException();
 		}
 	} finally {
 		if (failed)
 			// 如果释放锁资源失败,将节点状态设置为取消
 			node.waitStatus = Node.CANCELLED;
 	}
}
Condition的signal方法分析

分为了几个部分:

  • 确保执行signal方法的是持有锁的线程
  • 脱离Condition的队列
  • 将Node状态从-2改为0
  • 将Node添加到AQS队列
  • 为了避免当前Node无法在AQS队列正常唤醒做了一些判断和操作
代码语言:javascript
复制
// 线程挂起后,可以基于signal唤醒~
public final void signal() {
 	// 在ReentrantLock中,如果执行signal的线程没有持有锁资源,直接扔异常
 	if (!isHeldExclusively())
 		throw new IllegalMonitorStateException();
 
 	// 拿到排在Condition首位的Node
 	Node first = firstWaiter;
 	// 有Node在排队,才需要唤醒,如果没有,直接告辞~~
 	if (first != null) doSignal(first);
}

// 开始唤醒Condition中的Node中的线程
private void doSignal(Node first) {
 	// 先一波do-while走你~~~
 	do {
 		// 获取到第二个节点,并且将第二个节点设置为firstWaiter
 		if ( (firstWaiter = first.nextWaiter) == null)
 		// 说明就一个节点在Condition队列中,那么直接将firstWaiter和lastWaiter置位null
 		lastWaiter = null;
 	// 如果还有nextWaiter节点,因为当前节点要被唤醒了,脱离整个Condition队列。将nextWaiter置位null
 	first.nextWaiter = null;
 	// 如果transferForSignal返回true,一切正常,退出while循环
 } while (!transferForSignal(first) &&
 // 如果后续节点还有,往后面继续唤醒,如果没有,退出while循环
 (first = firstWaiter) != null);
}

// 准备开始唤醒在Condition中排队的Node
final boolean transferForSignal(Node node) {
 	// 将在Condition队列中的Node的状态从-2,改为0,代表要扔到AQS队列了。
 	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 	// 如果失败了,说明在signal之前应当是线程被中断了,从而被唤醒了。
 		return false;
 	// 如果正常的将Node的状态从-2改为0,这是就要将Condition中的这个Node扔到AQS的队列。
 	// 将当前Node扔到AQS队列,返回的p是当前Node的prev
	 Node p = enq(node);
 	// 获取上一个Node的状态
 	int ws = p.waitStatus;
 	// 如果ws > 0 ,说明这个Node已经被取消了。
 	// 如果ws状态不是取消,将prev节点的状态改为-1,。
 	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 // 如果prev节点已经取消了,可能会导致当前节点永远无法被唤醒。立即唤醒当前节点,基于acquireQueued方法,
 // 让当前节点找到一个正常的prev节点,并挂起线程
 // 如果prev节点正常,但是CAS修改prev节点失败了。证明prev节点因为并发原因导致状态改变。还是为了避免当前
 // 节点无法被正常唤醒,提前唤醒当前线程,基于acquireQueued方法,让当前节点找到一个正常的prev节点,并挂起线程
 	LockSupport.unpark(node.thread);
 // 返回true
 return true;
}
Conditiond的await方法分析(后置分析)

分为了几个部分:

  • 唤醒之后,要先确认是中断唤醒还是signal唤醒,还是signal唤醒后被中断
  • 确保当前线程的Node已经在AQS队列中
  • 执行acquireQueued方法,等待锁资源。
  • 在获取锁资源后,要确认是否在获取锁资源的阶段被中断过,如果被中断过,并且不是THROW_IE,那就确保interruptMode是REINTERRUPT
  • 确认当前Node已经不在Condition队列中了
  • 最终根据interruptMode来决定具体做的事情
    • 0:嘛也不做。
    • THROW_IE:抛出异常
    • REINTERRUPT:执行线程的interrupt方法
代码语言:javascript
复制
// 现在分析await方法的后半部分
public final void await() throws InterruptedException {
   if (Thread.interrupted())
        throw new InterruptedException();

    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // 中断模式~
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 如果线程执行到这,说明现在被唤醒了。
        // 线程可以被signal唤醒。(如果是signal唤醒,可以确认线程已经在AQS队列中)
        // 线程可以被interrupt唤醒,线程被唤醒后,没有在AQS队列中。
        // 如果线程先被signal唤醒,然后线程中断了。。。。(做一些额外处理)
        // checkInterruptWhileWaiting可以确认当前中如何唤醒的。
        // 返回的值,有三种
        // 0:正常signal唤醒,没别的事(不知道Node是否在AQS队列)
        // THROW_IE(-1):中断唤醒,并且可以确保在AQS队列
        // REINTERRUPT(1):signal唤醒,但是线程被中断了,并且可以确保在AQS队列
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    // Node一定在AQS队列
    // 执行acquireQueued,尝试在ReentrantLock中获取锁资源。
    // acquireQueued方法返回true:代表线程在AQS队列中挂起时,被中断过
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        // 如果线程在AQS队列排队时,被中断了,并且不是THROW_IE状态,确保线程的interruptMode是REINTERRUPT
        // REINTERRUPT:await不是中断唤醒,但是后续被中断过!!!
        interruptMode = REINTERRUPT;

    // 如果当前Node还在condition的单向链表中,脱离Condition的单向链表
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();

    // 如果interruptMode是0,说明线程在signal后以及持有锁的过程中,没被中断过,什么事都不做!
    if (interruptMode != 0)
    // 如果不是0~
        reportInterruptAfterWait(interruptMode);
}

// 判断当前线程被唤醒的模式,确认interruptMode的值。
private int checkInterruptWhileWaiting(Node node) {
    // 判断线程是否中断了。
return Thread.interrupted() ?
// THROW_IE:代表线程是被interrupt唤醒的,需要向上排除异常
// REINTERRUPT:代表线程是signal唤醒的,但是在唤醒之后,被中断了。
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
// 线程是正常的被signal唤醒,并且线程没有中断过。
0;
}

// 判断线程到底是中断唤醒的,还是signal唤醒的!
final boolean transferAfterCancelledWait(Node node) {
    // 基于CAS将Node的状态从-2改为0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    // 说明是中断唤醒的线程。因为CAS成功了。
    // 将Node添加到AQS队列中~(如果是中断唤醒的,当前线程同时存在Condition的单向链表以及AQS的队列中)
    enq(node);
    // 返回true
    return true;
   }

    // 判断当前的Node是否在AQS队列(signal唤醒的,但是可能线程还没放到AQS队列)
    // 等到signal方法将线程的Node扔到AQS队列后,再做后续操作
    while (!isOnSyncQueue(node))
        // 如果没在AQS队列上,那就线程让步,稍等一会,Node放到AQS队列再处理(看CPU)
        Thread.yield();
        // signal唤醒的,返回false
        return false;
   }

   // 确认Node是否在AQS队列上
   final boolean isOnSyncQueue(Node node) {
    // 如果线程状态为-2,肯定没在AQS队列
    // 如果prev节点的值为null,肯定没在AQS队列
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        // 返回false
        return false;

    // 如果节点的next不为null。说明已经在AQS队列上。、
    if (node.next != null)
        // 确定AQS队列上有!
        return true;

    // 如果上述判断都没有确认节点在AQS队列上,在AQS队列中寻找一波
    return findNodeFromTail(node);
}

// 在AQS队列中找当前节点
private boolean findNodeFromTail(Node node) {
    // 拿到尾节点
    Node t = tail;
    for (;;) {
        // tail是否是当前节点,如果是,说明在AQS队列
        if (t == node)
            // 可以跳出while循环
            return true;

        // 如果节点为null,AQS队列中没有当前节点
        if (t == null)
            // 进入while,让步一手
            return false;
    // t向前引用
    t = t.prev;
  }
}

private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    // 如果是中断唤醒的await,直接抛出异常!
    if (interruptMode == THROW_IE)
        throw new InterruptedException();

    // 如果是REINTERRUPT,signal后被中断过
    else if (interruptMode == REINTERRUPT)
        // 确认线程的中断标记位是true
        // Thread.currentThread().interrupt();

    selfInterrupt();
}
Condition的awaitNanos&signalAll方法分析

awaitNanos:仅仅是在await方法的基础上,做了一内内的改变,整体的逻辑思想都是一样的。

挂起线程时,传入要阻塞的时间,时间到了,自动唤醒,走添加到AQS队列的逻辑

代码语言:javascript
复制
// await指定时间,多了个时间到了自动醒。

public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();

    Node node = addConditionWaiter(); int savedState = fullyRelease(node);
    // deadline:当前线程最多挂起到什么时间点
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // nanosTimeout的时间小于等于0,直接告辞!!
        if (nanosTimeout <= 0L) {
        // 正常扔到AQS队列
        transferAfterCancelledWait(node);
        break;
    }

    // nanosTimeout的时间大于1000纳秒时,才可以挂起线程
    if (nanosTimeout >= spinForTimeoutThreshold)
        // 如果大于,正常挂起
        LockSupport.parkNanos(this, nanosTimeout);

        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
            // 计算剩余的挂起时间,可能需要重新的走while循环,再次挂起线程
        nanosTimeout = deadline - System.nanoTime();
    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;

    if (node.nextWaiter != null)
        unlinkCancelledWaiters();

    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

    // 剩余的挂起时间
    return deadline - System.nanoTime();

}

signalAll方法。这个方法一看就懂,之前signal是唤醒1个,这个是全部唤醒

代码语言:javascript
复制
// 以do-while的形式,将Condition单向链表中的所有Node,全部唤醒并扔到AQS队列
private void doSignalAll(Node first) {
    // 将头尾都置位null~
    lastWaiter = firstWaiter = null;
    do {
        // 拿到next节点的引用
        Node next = first.nextWaiter;
        // 断开当前Node的nextWaiter
        first.nextWaiter = null;
        // 修改Node状态,扔AQS队列,是否唤醒!
        transferForSignal(first);
        // 指向下一个节点
        first = next; 
    } while (first != null);
}
本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-04-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ConditionObject的介绍&应用
  • Condition的构建方式&核心属性
  • Condition的await方法分析(前置分析)
  • Condition的signal方法分析
  • Conditiond的await方法分析(后置分析)
  • Condition的awaitNanos&signalAll方法分析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com