AQS(AbstractQueuedSynchronizer),抽象队列同步器是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石,内置FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量(state)表示持有锁的状态Reentrantlock、CountDownLatch、等juc锁和工具类.
阻塞就要排队,排队就要必然需要队列:
????AQS使用一个volatile的int类型的成员变量来表示同步状态,
????通过内置的FIFO队列来完成资源获取的工作将每条要去抢占的资源的线程封装成一个Node节点来实现锁的分配,
????通过CAS完成对State值的修改。
AQS??CLH队列双端队列+state,第一个节点是哨兵节点,真正有数据的是第二个节点
To enqueue into a CLH lock, you atomically splice it in as new
?????* tail. To dequeue, you just set the head field.
?????*
?????*??????+------+??prev +-----+???????+-----+
?????* head |??????| <---- |?????| <---- |?????|??tail
?????*??????+------+???????+-----+???????+-----+
?????*
主要源码参数:
Node节点:包含转载一个一个线程,还有前指针prev,后指针next,waitStatus等待状态
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized.??Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
Reentrantlock底层就是调用Sync
公平锁的tryacquire有个hasQueuedPredecessors,直接进入队列
以非公平锁为例子,如果有新的线程过来可以抢锁,抢到了会不会进入clh队列
lock()方法
有个内部类同步器Sync,Reentrantlock调用lock()(入口),实际调用的是同步器Sync的lock,Sync是继承
AbstractQueuedSynchronizer
public void lock() {
????sync.lock();
}
lock分为公平和非公平,默认是非公平,先利用CAS判断State状态,如果state是0,将其设置为1,设置当前占用的线程,抢锁,
如果抢锁失败就调用acquire,加入阻塞队列
final void lock() {
????if (compareAndSetState(0, 1))
????????setExclusiveOwnerThread(Thread.currentThread());
????else
????????acquire(1);
}
acquire方法也是AQS的
public final void acquire(int arg) {
????if (!tryAcquire(arg) &&
????????acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
????????selfInterrupt();
}
tryAcquire方法,尝试获取锁:
????AQS方法直接抛出异常,子类必须重写,
??????protected boolean tryAcquire(int arg) {
????????????throw new UnsupportedOperationException();
????????}
????Reentrantlcok重写方法(重写了两个,一个是公平锁一个是非公平锁):
?? ?protected final boolean tryAcquire(int acquires) {
????? ??return nonfairTryAcquire(acquires);
?? ?}
?? ?非公平锁调用nonfairTryAcquire
????????????final boolean nonfairTryAcquire(int acquires) {
????????????????final Thread current = Thread.currentThread();
????????????????int c = getState();
????????????????if (c == 0) { // aqs的state不是0,极端情况,上一个线程刚处理完,把state修改为0
????????????????????if (compareAndSetState(0, acquires)) {
????????????????????????setExclusiveOwnerThread(current);
????????????????????????return true;
????????????????????}
????????????????}
????????????????else if (current == getExclusiveOwnerThread()) {// 当前运行的线程也不是,上个线程办理完之后,又被该线程抢到锁
????????????????????int nextc = c + acquires;
????????????????????if (nextc < 0) // overflow
????????????????????????throw new Error("Maximum lock count exceeded");
????????????????????setState(nextc);
????????????????????return true;
????????????????}
????????????????return false;
????????????}
addWaiter方,入队,封装Node节点加入等待队列,第一次进来,队列没有元素,走 enq(node),初始化列表节点,
底层也是CAS+自旋锁
?????private Node addWaiter(Node mode) {
????????????Node node = new Node(Thread.currentThread(), mode);
????????????// Try the fast path of enq; backup to full enq on failure
????????????Node pred = tail;
????????????if (pred != null) {
????????????????node.prev = pred;
????????????????if (compareAndSetTail(pred, node)) {
????????????????????pred.next = node;
????????????????????return node;
????????????????}
????????????}
????????????enq(node);
????????????return node;
????????}
????????如果尾结点,是为null,初始化队列,是底层自己创建的一个伪节点,为了统一后续节点,这样就形成了一个头尾相连的队列
????????接下来才将第一个线程的Node,设置为尾节点
????????private Node enq(final Node node) {
????????????????for (;;) {
????????????????????Node t = tail;
????????????????????if (t == null) { // Must initialize
????????????????????????if (compareAndSetHead(new Node()))
????????????????????????????tail = head;
????????????????????} else {
????????????????????????node.prev = t;
????????????????????????if (compareAndSetTail(t, node)) {
????????????????????????????t.next = node;
????????????????????????????return t;
????????????????????????}
????????????????????}
????????????????}
????????????}
acquireQueued方法,主要是对队列的node线程阻塞,会再一次抢锁
????final boolean acquireQueued(final Node node, int arg) {
????????boolean failed = true;
????????try {
????????????boolean interrupted = false;
????????????for (;;) {
????????????????final Node p = node.predecessor();
????????????????if (p == head && tryAcquire(arg)) { // 再去抢一下锁
????????????????????setHead(node);
????????????????????p.next = null; // help GC
????????????????????failed = false;
????????????????????return interrupted;
????????????????}
????????????????if (shouldParkAfterFailedAcquire(p, node) &&
????????????????????parkAndCheckInterrupt()) // 上面再一次抢占失败之后park
????????????????????interrupted = true;
????????????}
????????} finally {
????????????if (failed) // 取消排队
????????????????cancelAcquire(node);
????????}
????}
????private final boolean parkAndCheckInterrupt() {
????????LockSupport.park(this);
????????return Thread.interrupted();
????}
??????parkAndCheckInterrupt())之后队列的阻塞一种等着,自旋锁循环尝试
=====================
unlock()方法
????public void unlock() {
????????sync.release(1);
????}
???public final boolean release(int arg) {
????????if (tryRelease(arg)) { // 返回true
????????????Node h = head;
????????????if (h != null && h.waitStatus != 0)
????????????????unparkSuccessor(h);
????????????return true;
????????}
????????return false;
????}
????底层还是调用aqs的release,但是子类需要自己实现,设置标志位state为0.当前线程设为空,变为空闲返回true
???protected final boolean tryRelease(int releases) {
????????????int c = getState() - releases;
????????????if (Thread.currentThread() != getExclusiveOwnerThread())
????????????????throw new IllegalMonitorStateException();
????????????boolean free = false;
????????????if (c == 0) {
????????????????free = true;
????????????????setExclusiveOwnerThread(null);
????????????}
????????????setState(c);
????????????return free;
????????}
????unparkSuccessor(h); 解锁,uppark,哨兵节点的下一个结点(真实结点)进行抢占
??????LockSupport.unpark(s.thread);
由于acquireQueued一直循环着,B 线程获取到锁,又会改变头部节点,将B节点设置为哨兵节点,队列后面节点也就乖乖排队。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。