这里是我们自己实现,调用redission的方法,获取锁,然后加锁。lock.lock(expireTime, timeUnit)
是关键,我们追进去。
/**
* 获取锁,如果没有主动调用unlock解锁,expireTime后会自动释放
* @param lockKey
* @param expireTime 如果没有调用unlock解锁,expireTime 后自动释放
* @param timeUnit 时间单位
* @return
*/
public RLock lock(String lockKey,Integer expireTime,TimeUnit timeUnit){
RLock lock = redisson.getLock(lockKey);
lock.lock(expireTime, timeUnit);
logger.info("【Redisson lock】success to acquire lock for [ "+lockKey+" ],expire time:"+expireTime+timeUnit);
return lock;
}
进入 lock.lock(expireTime, timeUnit);
/**
* Acquires the lock.
*
* <p>If the lock is not available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until the
* lock has been acquired.
*
* If the lock is acquired, it is held until <code>unlock</code> is invoked,
* or until leaseTime milliseconds have passed
* since the lock was granted - whichever comes first.
*
* @param leaseTime the maximum time to hold the lock after granting it,
* before automatically releasing it if it hasn't already been released by invoking <code>unlock</code>.
* If leaseTime is -1, hold the lock until explicitly unlocked.
* @param unit the time unit of the {@code leaseTime} argument
*
*/
void lock(long leaseTime, TimeUnit unit);
进入实现方法
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
进入 lockInterruptibly(leaseTime, unit);
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
进入Long ttl = tryAcquire(leaseTime, unit, threadId);
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
进入
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
进入核心tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)
这里是加锁的核心方法:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//时间转化为毫秒值
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
这里最终的是执行了一段具有原子性的lua脚本,由CommandAsynExecutor执行; 这个锁最终持久化到redis时,使用的是hash类型的key field value; 这里注意下最后一行几个参数的对应关系: getName()), internalLockLeaseTime, getLockName(threadId) 分别是key[1],ARGV[1],ARGV[2];
Lua脚本中的执行分为以下三步: