前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊PowerJob的OmsLogHandler

聊聊PowerJob的OmsLogHandler

原创
作者头像
code4it
发布2023-12-24 21:34:33
1540
发布2023-12-24 21:34:33
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下PowerJob的OmsLogHandler

OmsLogHandler

tech/powerjob/worker/background/OmsLogHandler.java

代码语言:javascript
复制
@Slf4j
public class OmsLogHandler {

    private final String workerAddress;
    private final Transporter transporter;
    private final ServerDiscoveryService serverDiscoveryService;

    // 处理线程,需要通过线程池启动
    public final Runnable logSubmitter = new LogSubmitter();
    // 上报锁,只需要一个线程上报即可
    private final Lock reportLock = new ReentrantLock();
    // 生产者消费者模式,异步上传日志
    private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);

    // 每次上报携带的数据条数
    private static final int BATCH_SIZE = 20;
    // 本地囤积阈值
    private static final int REPORT_SIZE = 1024;

    public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
        this.workerAddress = workerAddress;
        this.transporter = transporter;
        this.serverDiscoveryService = serverDiscoveryService;
    }

    /**
     * 提交日志
     * @param instanceId 任务实例ID
     * @param logContent 日志内容
     */
    public void submitLog(long instanceId, LogLevel logLevel, String logContent) {

        if (logQueue.size() > REPORT_SIZE) {
            // 线程的生命周期是个不可循环的过程,一个线程对象结束了不能再次start,只能一直创建和销毁
            new Thread(logSubmitter).start();
        }

        InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
        boolean offerRet = logQueue.offer(tuple);
        if (!offerRet) {
            log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
        }
    }

    //......
}    

OmsLogHandler提供了submitLog方法,它先判断logQueue大小是否超过REPORT_SIZE(1024),超过则通过异步线程执行logSubmitter;接着将内容包装为InstanceLogContent,然后放入到logQueue

LogSubmitter

代码语言:javascript
复制
    private class LogSubmitter implements Runnable {

        @Override
        public void run() {

            boolean lockResult = reportLock.tryLock();
            if (!lockResult) {
                return;
            }

            try {

                final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
                // 当前无可用 Server
                if (StringUtils.isEmpty(currentServerAddress)) {
                    if (!logQueue.isEmpty()) {
                        logQueue.clear();
                        log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
                    }
                    return;
                }

                List<InstanceLogContent> logs = Lists.newLinkedList();

                while (!logQueue.isEmpty()) {
                    try {
                        InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
                        logs.add(logContent);

                        if (logs.size() >= BATCH_SIZE) {
                            WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
                            // 不可靠请求,WEB日志不追求极致
                            TransportUtils.reportLogs(req, currentServerAddress, transporter);
                            logs.clear();
                        }

                    }catch (Exception ignore) {
                        break;
                    }
                }

                if (!logs.isEmpty()) {
                    WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
                    TransportUtils.reportLogs(req, currentServerAddress, transporter);
                }

            }finally {
                reportLock.unlock();
            }
        }
    }

LogSubmitter实现了Runnable接口,其run方法先通过reportLock加锁,成功才继续,它通过serverDiscoveryService.getCurrentServerAddress()获取当前server的地址,若获取不到则清空logQueue;否则while循环,每次从logQueue拉取InstanceLogContent,放到linkedList,超过BATCH_SIZE(20)则创建WorkerLogReportReq,通过TransportUtils.reportLogs(req, currentServerAddress, transporter)上报,然后清空linkedList,跳出循环之后再上报剩下的日志,最后释放锁

reportLogs

tech/powerjob/worker/common/utils/TransportUtils.java

代码语言:javascript
复制
    public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
        final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
        transporter.tell(url, req);
    }

    public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
        HandlerLocation handlerLocation = new HandlerLocation()
                .setRootPath(rootPath)
                .setMethodPath(handlerPath);
        return new URL()
                .setServerType(serverType)
                .setAddress(Address.fromIpv4(address))
                .setLocation(handlerLocation);
    }    

reportLogs先通过easyBuildUrl构建URL,再通过transporter.tell(url, req)发送请求,rootPath为server,handlerPath为reportLog

tell

AkkaTransporter

tech/powerjob/remote/akka/AkkaTransporter.java

代码语言:javascript
复制
    public void tell(URL url, PowerSerializable request) {
        ActorSelection actorSelection = fetchActorSelection(url);
        actorSelection.tell(request, null);
    }

AkkaTransporter直接使用actorSelection发送请求

VertxTransporter

tech/powerjob/remote/http/vertx/VertxTransporter.java

代码语言:javascript
复制
    public void tell(URL url, PowerSerializable request) {
        post(url, request, null);
    }

    private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
        final String host = url.getAddress().getHost();
        final int port = url.getAddress().getPort();
        final String path = url.getLocation().toPath();
        RequestOptions requestOptions = new RequestOptions()
                .setMethod(HttpMethod.POST)
                .setHost(host)
                .setPort(port)
                .setURI(path);
        // 获取远程服务器的HTTP连接
        Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);
        // 转换 -> 发送请求获取响应
        Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->
            httpClientRequest
                .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
                .send(JsonObject.mapFrom(request).toBuffer())
        );
        return responseFuture.compose(httpClientResponse -> {
            // throw exception
            final int statusCode = httpClientResponse.statusCode();
            if (statusCode != HttpResponseStatus.OK.code()) {
                // CompletableFuture.get() 时会传递抛出该异常
                throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
                       host, port, path, statusCode, httpClientResponse.statusMessage()
                        ));
            }

            return httpClientResponse.body().compose(x -> {

                if (clz == null) {
                    return Future.succeededFuture(null);
                }

                if (clz.equals(String.class)) {
                    return Future.succeededFuture((T) x.toString());
                }

                return Future.succeededFuture(x.toJsonObject().mapTo(clz));
            });
        }).toCompletionStage();
    }    

VertxTransporter则使用post方法通过vertx的httpClient发送请求

processWorkerLogReport

tech/powerjob/server/core/handler/AbWorkerRequestHandler.java

代码语言:javascript
复制
    @Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
    public void processWorkerLogReport(WorkerLogReportReq req) {

        WorkerLogReportEvent event = new WorkerLogReportEvent()
                .setWorkerAddress(req.getWorkerAddress())
                .setLogNum(req.getInstanceLogContents().size());
        try {
            processWorkerLogReport0(req, event);
            event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
        } catch (RejectedExecutionException re) {
            event.setStatus(WorkerLogReportEvent.Status.REJECTED);
        } catch (Throwable t) {
            event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
            log.warn("[WorkerRequestHandler] process worker report failed!", t);
        } finally {
            monitorService.monitor(event);
        }
    }

processWorkerLogReport通过processWorkerLogReport0进行处理,最后通过monitorService.monitor(event)上报监控

processWorkerLogReport0

tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java

代码语言:javascript
复制
    @Override
    protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
        // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧...
        instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
    }

processWorkerLogReport0通过instanceLogService.submitLogs进行上报

submitLogs

tech/powerjob/server/core/instance/InstanceLogService.java

代码语言:javascript
复制
    /**
     * 提交日志记录,持久化到本地数据库中
     * @param workerAddress 上报机器地址
     * @param logs 任务实例运行时日志
     */
    @Async(value = PJThreadPool.LOCAL_DB_POOL)
    public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {

        List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
            instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());

            LocalInstanceLogDO y = new LocalInstanceLogDO();
            BeanUtils.copyProperties(x, y);
            y.setWorkerAddress(workerAddress);
            return y;
        }).collect(Collectors.toList());

        try {
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
        }catch (Exception e) {
            log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
        }
    }

InstanceLogService通过PJThreadPool.LOCAL_DB_POOL线程池进行异步,它通过localInstanceLogRepository.saveAll(logList)保存到本地数据库

monitor

tech/powerjob/server/monitor/PowerJobMonitorService.java

代码语言:javascript
复制
    public void monitor(Event event) {
        monitors.forEach(m -> m.record(event));
    }

monitor方法遍历monitors,挨个执行record

LogMonitor

tech/powerjob/server/monitor/monitors/LogMonitor.java

代码语言:javascript
复制
    public void record(Event event) {
        MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId()));
        LoggerFactory.getLogger(event.type()).info(event.message());
    }

LogMonitor的record方法通过日志打印event信息

小结

PowerJob的OmsLogHandler提供了submitLog方法,它先判断logQueue大小是否超过REPORT_SIZE(1024),超过则通过异步线程执行logSubmitter;接着将内容包装为InstanceLogContent,然后放入到logQueue;logSubmitter主要是执行reportLogs,它先通过easyBuildUrl构建URL,再通过transporter.tell(url, req)发送请求,rootPath为server,handlerPath为reportLog;服务端的processWorkerLogReport通过processWorkerLogReport0进行处理(通过localInstanceLogRepository.saveAll(logList)保存到本地数据库),最后通过monitorService.monitor(event)上报监控。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • OmsLogHandler
  • LogSubmitter
  • reportLogs
  • tell
    • AkkaTransporter
      • VertxTransporter
      • processWorkerLogReport
        • processWorkerLogReport0
          • submitLogs
            • monitor
              • LogMonitor
              • 小结
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
              http://www.vxiaotou.com