首先说说LockSupport吧,它的作用是提供一组直接block或unblock线程的方法,其底层实现利用了Unsafe(前面文章有讲过Unsafe)。LockSupport是一个非常底层的API,我们利用其可以做很多事情,本文将利用LockSupport实现互斥锁和共享锁。
在JDK中已经提供了很多种锁的实现,原生的synchronized(优先推荐使用),juc中的ReentrantLock等,本文不纠结synchronized和ReentrantLock的实现,本文只从Lock的语义出发实现两种锁。
juc中对于Lock接口的定义如下:
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;
void unlock();
Condition newCondition();
以上的关于锁的语义稍微复杂了点,特别是相应中断部分和newCondition部分,所以这次实现上简化了Lock的语义如下:
void lock();
void unLock();
boolean tryLock();
boolean tryLock(long maxWaitInMills);
基本功能和上面保持一致,但是都不响应中断
public class MutexLock implements Lock {
private volatile Thread threadOwnsTheLock;
private final AtomicInteger state = new AtomicInteger(0);
private final ConcurrentLinkedQueue<Thread> waitThreadsQueue = new ConcurrentLinkedQueue<Thread>();
//一直等待
public void lock() {
tryLock(-1L);
}
//invoke all的语义,也可以做invokeNext
public void unLock() {
tryRelease(-1);
threadOwnsTheLock = null;
if (!waitThreadsQueue.isEmpty()) {
for (Thread thread : waitThreadsQueue) {
LockSupport.unpark(thread);
}
}
}
public boolean tryLock() {
if (threadOwnsTheLock != null && (threadOwnsTheLock == Thread.currentThread())) {
return true;
}
if (tryAcquire(1)) {
threadOwnsTheLock = Thread.currentThread();
return true;
}
return false;
}
//没有实现interrupt的语义,不能打断
public boolean tryLock(long maxWaitInMills) {
Thread currentThread = Thread.currentThread();
try {
waitThreadsQueue.add(currentThread);
if (maxWaitInMills > 0) {
boolean acquired = false;
long left = maxWaitInMills * 1000L * 1000L;
long cost = 0;
while (true) {
//需要判断一次interrupt
if (tryAcquire(1)) {
threadOwnsTheLock = currentThread;
acquired = true;
break;
}
left = left - cost;
long mark = System.nanoTime();
if (left <= 0) {
break;
}
LockSupport.parkNanos(left);
cost = mark - System.nanoTime();
}
return acquired;
}else {
while (true) {
if (tryAcquire(1)) {
threadOwnsTheLock = currentThread;
break;
}
LockSupport.park();
}
return true;
}
} finally {
waitThreadsQueue.remove(currentThread);
}
}
protected boolean tryAcquire(int acquire) {
return state.compareAndSet(0, 1);
}
protected void tryRelease(int release) {
if (threadOwnsTheLock == null || (threadOwnsTheLock != Thread.currentThread())) {
System.out.println("Wrong state, this thread don't own this lock.");
}
while (true) {
if (state.compareAndSet(1, 0)) {
return;
}
}
}
}
以上互斥锁使用了一个AtomicInteger,利用了CAS来维持锁的状态
public class ShareLock implements Lock {
private volatile Set<Thread> threadsOwnsLock = Sets.newConcurrentHashSet();
private final AtomicInteger state;
private final ConcurrentLinkedQueue<Thread> waitThreadsQueue = new ConcurrentLinkedQueue<Thread>();
public ShareLock(int shareNum) {
this.state = new AtomicInteger(shareNum);
}
//一直等待
public void lock() {
tryLock(-1L);
}
public void unLock() {
tryRelease(-1);
threadsOwnsLock.remove(Thread.currentThread());
if (!waitThreadsQueue.isEmpty()) {
for (Thread thread : waitThreadsQueue) {
LockSupport.unpark(thread);
}
}
}
public boolean tryLock() {
if ( !(threadsOwnsLock.contains(Thread.currentThread()))) {
return true;
}
if (tryAcquire(1)) {
threadsOwnsLock.add(Thread.currentThread());
return true;
}
return false;
}
public boolean tryLock(long maxWaitInMills) {
Thread currentThread = Thread.currentThread();
try {
waitThreadsQueue.add(currentThread);
if (maxWaitInMills > 0) {
boolean acquired = false;
long left = TimeUnit.MILLISECONDS.toNanos(maxWaitInMills);
long cost = 0;
while (true) {
if (tryAcquire(1)) {
threadsOwnsLock.add(Thread.currentThread());
acquired = true;
break;
}
left = left - cost;
long mark = System.nanoTime();
if (left <= 0) {
break;
}
LockSupport.parkNanos(left);
cost = mark - System.nanoTime(); //有可能是被唤醒重新去获取锁,没获取到还得继续等待剩下的时间(并不精确)
}
return acquired;
}else {
while (true) {
if (tryAcquire(1)) {
threadsOwnsLock.add(Thread.currentThread());
break;
}
LockSupport.park();
}
return true;
}
} finally {
waitThreadsQueue.remove(currentThread);
}
}
protected boolean tryAcquire(int acquire) {
if (state.getAndDecrement() > 0) {
return true;
} else {
state.getAndIncrement();//恢复回来
return false;
}
}
protected void tryRelease(int release) {
if (!(threadsOwnsLock.contains(Thread.currentThread()))) {
System.out.println("Wrong state, this thread don't own this lock.");
}
state.getAndIncrement();
}
}
以上利用了LockSupport来实现了互斥锁和共享锁,但是实现中并没有完成中断响应。后面应该会有文章单独说明关于InterruptedException的注意点。下篇文章将讲述如何利用LockSupport实现Future语义