前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊PowerJob的ServerDiscoveryService

聊聊PowerJob的ServerDiscoveryService

原创
作者头像
code4it
发布2023-12-23 20:16:20
1030
发布2023-12-23 20:16:20
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下PowerJob的ServerDiscoveryService

ServerDiscoveryService

tech/powerjob/worker/background/ServerDiscoveryService.java

代码语言:javascript
复制
@Slf4j
public class ServerDiscoveryService {

    private final Long appId;
    private final PowerJobWorkerConfig config;

    private String currentServerAddress;

    private final Map<String, String> ip2Address = Maps.newHashMap();

    /**
     *  服务发现地址
     */
    private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";
    /**
     * 失败次数
     */
    private static int FAILED_COUNT = 0;
    /**
     * 最大失败次数
     */
    private static final int MAX_FAILED_COUNT = 3;


    public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) {
        this.appId = appId;
        this.config = config;
    }

    //......
}    

ServerDiscoveryService定义了currentServerAddress、ip2Address、服务发现url模版,失败次数,最大失败次数

start

代码语言:javascript
复制
    public void start(ScheduledExecutorService timingPool) {
        this.currentServerAddress = discovery();
        if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
            throw new PowerJobException("can't find any available server, this worker has been quarantined.");
        }
        // 这里必须保证成功
        timingPool.scheduleAtFixedRate(() -> {
                    try {
                        this.currentServerAddress = discovery();
                    } catch (Exception e) {
                        log.error("[PowerDiscovery] fail to discovery server!", e);
                    }
                }
                , 10, 10, TimeUnit.SECONDS);
    }

其start方法先通过discovery方法获取currentServerAddress,然后注册定时任务每隔10s重新刷新一下currentServerAddress

discovery

代码语言:javascript
复制
    private String discovery() {

        if (ip2Address.isEmpty()) {
            config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
        }

        String result = null;

        // 先对当前机器发起请求
        String currentServer = currentServerAddress;
        if (!StringUtils.isEmpty(currentServer)) {
            String ip = currentServer.split(":")[0];
            // 直接请求当前Server的HTTP服务,可以少一次网络开销,减轻Server负担
            String firstServerAddress = ip2Address.get(ip);
            if (firstServerAddress != null) {
                result = acquire(firstServerAddress);
            }
        }

        for (String httpServerAddress : config.getServerAddress()) {
            if (StringUtils.isEmpty(result)) {
                result = acquire(httpServerAddress);
            }else {
                break;
            }
        }

        if (StringUtils.isEmpty(result)) {
            log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");

            // 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务
            if (FAILED_COUNT++ > MAX_FAILED_COUNT) {

                log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
                List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys();
                if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
                    frequentInstanceIds.forEach(instanceId -> {
                        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId);
                        taskTracker.destroy();
                        log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
                    });
                }

                FAILED_COUNT = 0;
            }
            return null;
        } else {
            // 重置失败次数
            FAILED_COUNT = 0;
            log.debug("[PowerDiscovery] current server is {}.", result);
            return result;
        }
    }

discovery方法从config.getServerAddress()解析地址放到ip2Address,若currentServerAddress有值则acquire,否则遍历config.getServerAddress()执行acquire;若还没有获取到则判断FAILED_COUNT是否超出MAX_FAILED_COUNT,超出则遍历HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys()挨个执行remove及destory

acquire

代码语言:javascript
复制
    private String acquire(String httpServerAddress) {
        String result = null;
        String url = buildServerDiscoveryUrl(httpServerAddress);
        try {
            result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
        }catch (Exception ignore) {
        }
        if (!StringUtils.isEmpty(result)) {
            try {
                ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
                if (resultDTO.isSuccess()) {
                    return resultDTO.getData().toString();
                }
            }catch (Exception ignore) {
            }
        }
        return null;
    }

    private String buildServerDiscoveryUrl(String address) {

        ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
                .setAppId(appId)
                .setCurrentServer(currentServerAddress)
                .setProtocol(config.getProtocol().name().toUpperCase());

        String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
        return String.format(DISCOVERY_URL, address, query);
    }    

acquire方法通过buildServerDiscoveryUrl构建url,然后执行HttpUtils.get(url)获取地址

ServerController

tech/powerjob/server/web/controller/ServerController.java

代码语言:javascript
复制
@RestController
@RequestMapping("/server")
@RequiredArgsConstructor
public class ServerController implements ServerInfoAware {

    private ServerInfo serverInfo;
    private final TransportService transportService;

    private final ServerElectionService serverElectionService;

    private final AppInfoRepository appInfoRepository;

    private final WorkerClusterQueryService workerClusterQueryService;

    //......

    @GetMapping("/acquire")
    public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
        return ResultDTO.success(serverElectionService.elect(request));
    }

    //......    
}    

acquireServer方法执行serverElectionService.elect(request)返回server地址

elect

tech/powerjob/server/remote/server/election/ServerElectionService.java

代码语言:javascript
复制
    public String elect(ServerDiscoveryRequest request) {
        if (!accurate()) {
            final String currentServer = request.getCurrentServer();
            // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功
            Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
            if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
                log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
                return currentServer;
            }
        }
        return getServer0(request);
    }

elect方法判断如果是本机就直接返回,否则执行getServer0

getServer0

代码语言:javascript
复制
    private String getServer0(ServerDiscoveryRequest discoveryRequest) {

        final Long appId = discoveryRequest.getAppId();
        final String protocol = discoveryRequest.getProtocol();
        Set<String> downServerCache = Sets.newHashSet();

        for (int i = 0; i < RETRY_TIMES; i++) {

            // 无锁获取当前数据库中的Server
            Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
            if (!appInfoOpt.isPresent()) {
                throw new PowerJobException(appId + " is not registered!");
            }
            String appName = appInfoOpt.get().getAppName();
            String originServer = appInfoOpt.get().getCurrentServer();
            String activeAddress = activeAddress(originServer, downServerCache, protocol);
            if (StringUtils.isNotEmpty(activeAddress)) {
                return activeAddress;
            }

            // 无可用Server,重新进行Server选举,需要加锁
            String lockName = String.format(SERVER_ELECT_LOCK, appId);
            boolean lockStatus = lockService.tryLock(lockName, 30000);
            if (!lockStatus) {
                try {
                    Thread.sleep(500);
                }catch (Exception ignore) {
                }
                continue;
            }
            try {

                // 可能上一台机器已经完成了Server选举,需要再次判断
                AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
                String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
                if (StringUtils.isNotEmpty(address)) {
                    return address;
                }

                // 篡位,如果本机存在协议,则作为Server调度该 worker
                final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
                if (targetProtocolInfo != null) {
                    // 注意,写入 AppInfoDO#currentServer 的永远是 default 的地址,仅在返回的时候特殊处理为协议地址
                    appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
                    appInfo.setGmtModified(new Date());

                    appInfoRepository.saveAndFlush(appInfo);
                    log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                    return targetProtocolInfo.getAddress();
                }
            }catch (Exception e) {
                log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
            } finally {
                lockService.unlock(lockName);
            }
        }
        throw new PowerJobException("server elect failed for app " + appId);
    }

getServer0方法先判断appInfoRepository中当前appId的originServer是否存活,是则直接返回,否则加锁将transportService.defaultProtocol().getAddress()写入到appInfo的currentServer

小结

PowerJob的ServerDiscoveryService定义了start方法,它先通过discovery方法获取currentServerAddress,然后注册定时任务每隔10s重新刷新一下currentServerAddress;discovery方法主要是遍历config.getServerAddress()执行acquire;acquire方法通过buildServerDiscoveryUrl构建url,然后执行HttpUtils.get(url)获取该appId的server地址。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ServerDiscoveryService
  • start
  • discovery
  • acquire
  • ServerController
  • elect
  • getServer0
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com