前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊PowerJob的UseCacheLock

聊聊PowerJob的UseCacheLock

原创
作者头像
code4it
发布2024-01-13 21:29:34
1100
发布2024-01-13 21:29:34
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下PowerJob的UseCacheLock

UseCacheLock

tech/powerjob/server/core/lock/UseCacheLock.java

代码语言:javascript
复制
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseCacheLock {

    String type();

    String key();

    int concurrencyLevel();
}

UseCacheLock注解定义了type、key、concurrencyLevel属性

UseCacheLockAspect

tech/powerjob/server/core/lock/UseCacheLockAspect.java

代码语言:javascript
复制
@Slf4j
@Aspect
@Component
@Order(1)
@RequiredArgsConstructor
public class UseCacheLockAspect {

    private final MonitorService monitorService;

    private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();

    private static final long SLOW_THRESHOLD = 100;

    @Around(value = "@annotation(useCacheLock))")
    public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {
        Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {
            int concurrencyLevel = useCacheLock.concurrencyLevel();
            log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);
            return CacheBuilder.newBuilder()
                    .initialCapacity(300000)
                    .maximumSize(500000)
                    .concurrencyLevel(concurrencyLevel)
                    .expireAfterWrite(30, TimeUnit.MINUTES)
                    .build();
        });
        final Method method = AOPUtils.parseMethod(point);
        Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);
        final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
        long start = System.currentTimeMillis();
        reentrantLock.lockInterruptibly();
        try {
            long timeCost = System.currentTimeMillis() - start;
            if (timeCost > SLOW_THRESHOLD) {

                final SlowLockEvent slowLockEvent = new SlowLockEvent()
                        .setType(SlowLockEvent.Type.LOCAL)
                        .setLockType(useCacheLock.type())
                        .setLockKey(String.valueOf(key))
                        .setCallerService(method.getDeclaringClass().getSimpleName())
                        .setCallerMethod(method.getName())
                        .setCost(timeCost);

                monitorService.monitor(slowLockEvent);

                log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,
                        key,
                        JSON.toJSONString(point.getArgs()));
            }
            return point.proceed();
        } finally {
            reentrantLock.unlock();
        }
    }
}

UseCacheLockAspect拦截@UseCacheLock注解,它定义了lockContainer维护了useCacheLock.type()与Cache的关系;Cache采用的guava的Cache,其initialCapacity为300000,maximumSize为500000,expireAfterWrite为30分钟;Cache的key为lock key,value为ReentrantLock;其execute方法主要是先执行reentrantLock.lockInterruptibly(),然后执行point.proceed(),最后reentrantLock.unlock();执行point.proceed()之前还判断了一下加锁耗时,若超过SLOW_THRESHOLD(100ms)则通过monitorService.monitor上报SlowLockEvent

示例

代码语言:javascript
复制
    @UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024)
    public void redispatchAsync(Long instanceId, int originStatus) {
        // 将状态重置为等待派发
        instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
    }

key支持SpEl

小结

PowerJob的UseCacheLock注解定义了type、key、concurrencyLevel属性;UseCacheLockAspect拦截@UseCacheLock注解,它定义了lockContainer维护了useCacheLock.type()与Cache的关系;而Cache的key为lock key,value为ReentrantLock,最后是通过reentrantLock.lockInterruptibly()加锁。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • UseCacheLock
  • UseCacheLockAspect
  • 示例
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com