当前位置:主页 > 查看内容

java并发编程之Lock锁

发布时间:2021-09-25 00:00| 位朋友查看

简介:java并发编程之Lock锁原理 概述 锁的类型 使用场景 Lock锁原理详解 总结 事先声明本文为原创文章禁止直接转载和抄袭若要转载需经过本人同意 概述 java锁机制是一个老生常谈的问题了那么在接触到锁的时候有没有想过什么情况下需要使用锁锁的原理又是什么接下……


事先声明:本文为原创文章,禁止直接转载和抄袭,若要转载需经过本人同意

概述

java锁机制是一个老生常谈的问题了,那么在接触到锁的时候有没有想过什么情况下需要使用锁?锁的原理又是什么?接下来,笔者就这两个问题展开对锁的解述。

锁的类型

常用的锁分为以下几类:
1、数据库锁
2、java内置锁
3、分布式锁
本篇文章笔者将对java内置锁–Lock锁做深入分析,其他两个锁笔者会陆续做出分享。

使用场景

首先,需要加锁的资源一定是临界资源,所谓临界资源就是在多线程的情况下,各个线程会进行抢占的资源。下面以一段代码来说明:

import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LockTest {

    private int count = 500;

    @Test
    public void unLockTest() throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 500; i++){
            threadPool.submit(() -> {
                    count--;
            });
        }
        Thread.sleep(5000);
        System.out.println(count);
    }
}

结果:2

代码很简单,创建了个核心线程数为10的线程池(这里需要注意的是在真实开发中禁止使用这种方式创建线程池),然后创建了500个任务,把这五百个任务里丢到线程池去处理,每个任务会对count进行减一。理想情况下,count应该为0,但实际count最后为2。其原因就是因为在多线程环境下,各个线程对count发生了资源的争夺,导致了数据的不安全性。其中,count就是临界资源,多线程就是我们常说的并发环境。

下面对代码进行修改:

    private int count = 500;
    @Test
    public void lockTest() throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        Lock lock = new ReentrantLock();
        for (int i = 0; i < 500; i++){
            threadPool.submit(() -> {
                lock.lock();
                count--;
                lock.unlock();
            });
        }
        Thread.sleep(5000);
        System.out.println(count);
    }
    
结果:0

代码整体没有太大变化,只是在count–前后做了加锁和减锁操作,最后无论代码运行多少次,结果都是0(需要注意的是这里忽略了可见性),没有出现未加锁时的少减情况。这就是锁的使用场景,无论是数据库锁、java内置锁还是分布式锁,他们的使用场景都大同小异。使用锁的目的就是为了控制临界资源的安全性。

Lock锁原理详解

在这里插入图片描述
如图,Lock只是一个接口,它有很多实现类,本文着重讲ReentrantLock。

ReetrantLock
在使用Lock前,先要new一个Lock出来,先来分析new ReentrantLock()做了什么,代码如下:

    //初始化Lock
    Lock lock = new ReentrantLock();
   
    //部分ReentrantLock()源码
    public class ReentrantLock implements Lock, java.io.Serializable {
   		private static final long serialVersionUID = 7373984872572414699L;
   		private final Sync sync;
    	public ReentrantLock() {
        sync = new NonfairSync();
    	}
    }
                                           源码1-1

如上源码所示,在初始化Lock时,会给ReentrantLock的成员变量sync赋值NonfairSync的实例。先来看看Sync是什么,代码如下:

	//部分Sync源码
	abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        abstract void lock();
        }
                                          源码1-2

Sync是ReentrantLock的抽象内部类并且它实现了AQS抽象类,这里注意这个lock方法,该方法是一个抽象方法。Sync有两个实现类,分别是NonfairSync以及FairSync(其实这两个实现类就是Lock锁的公平锁与非公平锁机制)。看到这里就可以结合上面的源码串起来了,在使用无参的ReentrantLock创建Lock实例时,默认使用的是NonfairSync非公平锁机制。

lock方法
Lock锁加锁的方法是lock方法,下面来分析lock源码:

    //部分ReentrantLock()源码
    public class ReentrantLock implements Lock, java.io.Serializable {
   		private static final long serialVersionUID = 7373984872572414699L;
   		private final Sync sync;
    	public ReentrantLock() {
        sync = new NonfairSync();
    	}
    	public void lock() {
        	sync.lock();
    	}
    }
                                         源码2-1

如上源码所示,Lock类的lock方法实际调用的是其子类ReentrantLock的lock方法,而ReentrantLock中的lock方法则又调用了Sync的lock方法,而在“源码1-1”的时候我就已经知道sync的默认值为NonfairSync实例,所以这里的lock方法最终会调用到NonfairSync中的lock方法。

NonfairSync源码分析:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
                                           源码3-1

先来看lock方法,首先会调用compareAndSetState(0, 1)方法,该方法是一个CAS操作,其作用就是修改state值为1,修改成功则返回true,修改失败则返回false。若是修改成功,则会执行setExclusiveOwnerThread(Thread.currentThread()),该方法的作用是修改exclusiveOwnerThread为当前线程id,下面先说明status和exclusiveOwnerThread这两个变量是什么,看源码:

//部分AQS源码
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;
    
    protected AbstractQueuedSynchronizer() { }
    //CLH队列头结点
    private transient volatile Node head;
    //CLH队列尾结点
    private transient volatile Node tail;
    //锁标识state变量
    private volatile int state;
    }
                                           源码3-2
//AQS父类源码
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }
    
    //独占线程id变量
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
                                         源码3-3

在看完源码3-2和源码3-3后已经可以得知变量state和exclusiveOwnerThread的来源。那么在讲明这两个变量的作用,在Lock锁中,一个线程想要获取锁,那么需要满足一下两个条件之一:
条件一:该线程可以成功的将state值从0改为1
条件二:若state值不为1时,则检测持有锁的线程是否为当前线程,也就是exclusiveOwnerThread的值是否为当前线程id,若是则将state值加1,这就是重入锁。

看到这里对源码3-1的if逻辑也有了认知,那么接下来讲源码3-1的else逻辑,也就是acquire(1)方法,源码如下:

acquire方法

   //AQS部分源码
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
                                          源码4-1

该方法中主要调用了3个方法,分别是if判断中的tryAcquire以及acquireQueued方法和if体中的selfInterrupt方法。首先分析tryAcquire方法,源码如下:

tryAcquire源码

		//NonFairSync部分源码
    	static final class NonfairSync extends Sync {
        	private static final long serialVersionUID = 7316153563782823691L;
        	//tryAcquire方法
        	protected final boolean tryAcquire(int acquires) {
            	return nonfairTryAcquire(acquires);
        	}
        }
		//Sync部分源码
        abstract static class Sync extends AbstractQueuedSynchronizer {
        //nonfairTryAcquire方法
        final boolean nonfairTryAcquire(int acquires) {
        	//获取当前线程id
            final Thread current = Thread.currentThread();
            //获取锁标识
            int c = getState();
            //若锁标识为0,则当前线程尝试去获取锁
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    //当前线程获取锁成功,返回true
                    return true;
                }
            }
            //如果持有锁线程id和当前线程id相等,则进行重入锁
            else if (current == getExclusiveOwnerThread()) {
            	//锁标识加1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //设置锁标识state的值为nextc
                setState(nextc);
                //当前线程获取锁成功,返回true
                return true;
            }
            //当前线程获取锁失败。返回false
            return false;
        }
											源码4-2

如上源码所示,tryAcquire最终调用Sync中的nonfairTryAcquire方法,该方法主要逻辑是当前线程去获取锁,如果能够获取到则返回true,若不能获取到则返回false。具体细节看注释。接下来继续看acquireQueued方法源码,首先在源码4-1中可以看到该方法的参数是addWaiter(Node.EXCLUSIVE), arg),这里先看addWaiter(Node.EXCLUSIVE), arg)方法源码,如下:

addWaiter源码

//AQS部分源码
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;
    
    protected AbstractQueuedSynchronizer() { }
    //CLH队列头结点
    private transient volatile Node head;
    //CLH队列尾结点
    private transient volatile Node tail;
    //锁标识state变量
    private volatile int state;
    //addWaiter方法
 	private Node addWaiter(Node mode) {
 		//将当前线程包装成一个Node对象
        Node node = new Node(Thread.currentThread(), mode);
        //获取CLH队列的尾结点
        Node pred = tail;
        //如果尾结点不为空,则说明整个CLH队列不为空,那么就将队尾结点设置为当前结点
        if (pred != null) {
            node.prev = pred;
            //cas操作将队尾结点设置为当前node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //若调用到这里,则说明上述cas操作失败或CLH队列为空
        enq(node);
        return node;
    }
    
    //enq方法
 	private Node enq(final Node node) {
 		//cas自旋操作
        for (;;) {
        	//获取尾结点
            Node t = tail;
            //若尾结点为空,说明CLH队列为空,则初始化CLH队列
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            	//若尾结点不为空,则cas操作将队尾结点设置为当前node(注意这里使用了自旋,所以这个替换最终是会成功的)
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}

//Node源码
	static final class Node {
    
        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;
		//生命状态1,表明该结点需要移除
        static final int CANCELLED =  1;
		//生命状态-1,等待触发状态
        static final int SIGNAL    = -1;
    	//生命状态-2,表明该节点在条件队列
        static final int CONDITION = -2;
        //生命状态-3,表明是共享的,需要向后传播
        static final int PROPAGATE = -3;
        //生命状态
        volatile int waitStatus;
		//前驱结点
        volatile Node prev;
		//后继结点
        volatile Node next;
		//当前结点对应线程
        volatile Thread thread;
		
        Node nextWaiter;
 }

                                          源码4-3

以上就是addWaiter方法源码,其作用就是将没获取到锁的线程包装成一个node结点,然后将CLH队列的尾结点设置为该node结点,这里需注意Node类中的的变量,尤其是waitStatus,具体细节看注释。接下来继续看acquireQueued方法源码,如下:

acquireQueued源码

//AQS部分源码
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    //acquireQueued方法
	final boolean acquireQueued(final Node node, int arg) {
		//该变量在lock方法中的调用链中无实际作用
        boolean failed = true;
        try {
        	//设置线程中断信号为false
            boolean interrupted = false;
            //cas自旋
            for (;;) {
            	//获取当前node的前一个结点
                final Node p = node.predecessor();
                //若前一个结点是头结点,则去获取锁,若获取锁成功,则将当前node设置为头结点
                //(在setHead中会将当前node的前驱结点以及对应的线程置为null),原来的head指向空(指向空,在gc时就是回收原来的head)
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //返回中断信号
                    return interrupted;
                }
                //该if判断中主要是设置了当前node的前驱节点的生命状态(waitStatus)以及阻塞当前线程操作
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //返回中断信号
                    interrupted = true;
            }
        } finally {
        	//将当前node从CLH队列中剔除掉,在lock方法的调用链中无实际作用
            if (failed)
                cancelAcquire(node);
        }
    }
}
                                         源码4-4

以上就是acquireQueued方法源码,该方法的作用就是线程尝试获取锁,若获取不到则进行线程阻塞操作,其中在if判断中调用了两个方法:shouldParkAfterFailedAcquire和parkAndCheckInterrupt(该方法阻塞线程)。这里需要注意的是提到了结点的生命状态(waitStatus),接下来继续看shouldParkAfterFailedAcquire方法源码,如下:

shouldParkAfterFailedAcquireuy源码

//AQS部分源码
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    //shouldParkAfterFailedAcquire方法
	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
		//获取当前结点前驱结点的生命状态
        int ws = pred.waitStatus;
        //如果前驱节点的生命状态为SIGNAL,则返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //如果前驱节点的生命状态大于0,则为取消状态,进行结点移除 
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //将前驱节点的waitStaus设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
}
                                            源码4-5

以上就是shouldParkAfterFailedAcquire方法源码,该方法就是将当前节点的前驱结点waiteStatus设置为-1,那么为什么设置为-1呢?在CLH队列中,只有当前结点的waitStatus为-1,那么当前结点对应线程在释放锁时才会唤醒下一个结点。接下来继续看parkAndCheckInterrupt源码:

parkAndCheckInterrupt源码

//AQS部分源码
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
 	private final boolean parkAndCheckInterrupt() {
 		//阻塞线程
        LockSupport.park(this);
        //返回线程的中断状态
        return Thread.interrupted();
    }
}

以上就是parkAndCheckInterrupt方法源码,该方法主要方法是LockSupport.park(this),此方法会调用Unsafe类中的park方法继而调用到操作系统的函数,最终阻塞当前线程。

以上就是Lock锁的lock方法原理,接下来继续讲unLock方法。

unLock源码

	//部分ReentrantLock源码
	public class ReentrantLock implements Lock, java.io.Serializable {
   		private static final long serialVersionUID = 7373984872572414699L;
		//unLock方法
		public void unlock() {
        	sync.release(1);
    	}

	public abstract class AbstractQueuedSynchronizer
    	extends AbstractOwnableSynchronizer
    	implements java.io.Serializable {
    	private static final long serialVersionUID = 7373984972572414691L;
    	//release方法
  		public final boolean release(int arg) {
        	if (tryRelease(arg)) {
            	Node h = head;
            	if (h != null && h.waitStatus != 0)
            		//唤醒CLH队列第一个线程
                	unparkSuccessor(h);
            	return true;
        	}
        	return false;
    }
    //unparkSuccessor方法
	private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
        	//唤醒指定线程id的线程
            LockSupport.unpark(s.thread);
    }

以上就是unLock方法的调用链以及源码,其核心方法LockSupport.unpark(s.thread)会继续调用Unsafe类的unpark方法,该方法会调用到操作系统的函数继而去唤醒指定线程。

总结

到这里,Lock锁的地原理和机制已经全部梳理完了。不过还有一些细节,比如中断信号,cas原子操作没有详细讲,但这些不影整体的逻辑梳理。若是对文章中的一些点有疑惑,可在评论区留言或私信我。

;原文链接:https://blog.csdn.net/qq_42871327/article/details/115970315
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!
上一篇:BUUCTF_ Misc_1简单思路(前20题) 下一篇:没有了

推荐图文


随机推荐