
A Look at BinaryLogClient's EventListener

Original article
Author: code4it
Last modified: 2020-04-28 10:06:39
Included in the column: 码匠的流水账

This article takes a look at the EventListener of BinaryLogClient.

EventListener

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

    public interface EventListener {

        void onEvent(Event event);
    }
  • The EventListener interface declares a single method, onEvent, which is handed each Event
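Because the interface has exactly one abstract method, a listener can be written as a lambda. The following is a minimal sketch of that idea, not code from the library: `ListenerSketch`, its nested `Event` and `EventListener`, and `collectTypes` are hypothetical stand-ins that only mirror the shape of the interface quoted above, so that the example compiles without the library on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in types: they mirror the shape of the library's Event and
// BinaryLogClient.EventListener, but are NOT the library classes themselves.
final class ListenerSketch {

    /** Simplified stand-in for an event; it carries only a type name. */
    static final class Event {
        private final String type;

        Event(String type) {
            this.type = type;
        }

        String getType() {
            return type;
        }
    }

    /** Same shape as the interface above: a single onEvent method. */
    interface EventListener {
        void onEvent(Event event);
    }

    /** Feeds every event to a lambda-based listener and returns the types it saw. */
    static List<String> collectTypes(List<Event> events) {
        List<String> seen = new ArrayList<>();
        // A single-method interface can be implemented with a lambda.
        EventListener listener = event -> seen.add(event.getType());
        for (Event event : events) {
            listener.onEvent(event);
        }
        return seen;
    }
}
```

With the real library, the same lambda shape would presumably be passed to `binaryLogClient.registerEventListener(...)`, the method that BinaryLogClientStatistics calls in its constructor.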

BinaryLogClientStatistics

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java

public class BinaryLogClientStatistics implements BinaryLogClientStatisticsMXBean,
        BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener {

    private AtomicReference<EventHeader> lastEventHeader = new AtomicReference<EventHeader>();
    private AtomicLong timestampOfLastEvent = new AtomicLong();
    private AtomicLong totalNumberOfEventsSeen = new AtomicLong();
    private AtomicLong totalBytesReceived = new AtomicLong();
    private AtomicLong numberOfSkippedEvents = new AtomicLong();
    private AtomicLong numberOfDisconnects = new AtomicLong();

    public BinaryLogClientStatistics() {
    }

    public BinaryLogClientStatistics(BinaryLogClient binaryLogClient) {
        binaryLogClient.registerEventListener(this);
        binaryLogClient.registerLifecycleListener(this);
    }

    @Override
    public String getLastEvent() {
        EventHeader eventHeader = lastEventHeader.get();
        return eventHeader == null ? null : eventHeader.getEventType() + "/" + eventHeader.getTimestamp() +
                " from server " + eventHeader.getServerId();
    }

    @Override
    public long getSecondsSinceLastEvent() {
        long timestamp = timestampOfLastEvent.get();
        return timestamp == 0 ? 0 : (getCurrentTimeMillis() - timestamp) / 1000;
    }

    @Override
    public long getSecondsBehindMaster() {
        // because lastEventHeader and timestampOfLastEvent are not guarded by the common lock
        // we may get some "distorted" results, though shouldn't be a problem given the nature of the final value
        long timestamp = timestampOfLastEvent.get();
        EventHeader eventHeader = lastEventHeader.get();
        if (timestamp == 0 || eventHeader == null) {
            return -1;
        }
        return (timestamp - eventHeader.getTimestamp()) / 1000;
    }

    @Override
    public long getTotalNumberOfEventsSeen() {
        return totalNumberOfEventsSeen.get();
    }

    @Override
    public long getTotalBytesReceived() {
        return totalBytesReceived.get();
    }

    @Override
    public long getNumberOfSkippedEvents() {
        return numberOfSkippedEvents.get();
    }

    @Override
    public long getNumberOfDisconnects() {
        return numberOfDisconnects.get();
    }

    @Override
    public void reset() {
        lastEventHeader.set(null);
        timestampOfLastEvent.set(0);
        totalNumberOfEventsSeen.set(0);
        totalBytesReceived.set(0);
        numberOfSkippedEvents.set(0);
        numberOfDisconnects.set(0);
    }

    @Override
    public void onEvent(Event event) {
        EventHeader header = event.getHeader();
        lastEventHeader.set(header);
        timestampOfLastEvent.set(getCurrentTimeMillis());
        totalNumberOfEventsSeen.getAndIncrement();
        totalBytesReceived.getAndAdd(header.getHeaderLength() + header.getDataLength());
    }

    @Override
    public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
        numberOfSkippedEvents.getAndIncrement();
        lastEventHeader.set(null);
        timestampOfLastEvent.set(getCurrentTimeMillis());
        totalNumberOfEventsSeen.getAndIncrement();
    }

    @Override
    public void onDisconnect(BinaryLogClient client) {
        numberOfDisconnects.getAndIncrement();
    }

    @Override
    public void onConnect(BinaryLogClient client) {
    }

    @Override
    public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
    }

    protected long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

}
  • BinaryLogClientStatistics implements the BinaryLogClient.EventListener interface (alongside BinaryLogClient.LifecycleListener); its onEvent method updates lastEventHeader, timestampOfLastEvent, totalNumberOfEventsSeen, and totalBytesReceived
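The two time-based getters in the class above are plain arithmetic over the values that onEvent records. The sketch below reproduces that arithmetic in isolation. `LagSketch` is a hypothetical stand-in, not the library class: it takes timestamps as explicit parameters instead of reading the clock, it uses -1 in place of a null header, and it assumes the header timestamp is in milliseconds, which the division by 1000 in the quoted source suggests.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the time bookkeeping in BinaryLogClientStatistics.
final class LagSketch {

    // Wall-clock time (ms) at which the last event arrived; 0 means "no event yet".
    private final AtomicLong timestampOfLastEvent = new AtomicLong();
    // Timestamp (ms) taken from the last event's header; -1 stands in for a null header.
    private final AtomicLong lastHeaderTimestamp = new AtomicLong(-1);

    /** Records one event: the header's own timestamp plus the local receive time. */
    void onEvent(long headerTimestampMillis, long receivedAtMillis) {
        lastHeaderTimestamp.set(headerTimestampMillis);
        timestampOfLastEvent.set(receivedAtMillis);
    }

    /** Whole seconds between the last event's arrival and nowMillis; 0 if none arrived. */
    long getSecondsSinceLastEvent(long nowMillis) {
        long timestamp = timestampOfLastEvent.get();
        return timestamp == 0 ? 0 : (nowMillis - timestamp) / 1000;
    }

    /** Whole seconds between header timestamp and local arrival; -1 if unknown. */
    long getSecondsBehindMaster() {
        // The two fields are read independently, as in the original, so a concurrent
        // onEvent call may pair values that belong to different events.
        long received = timestampOfLastEvent.get();
        long header = lastHeaderTimestamp.get();
        if (received == 0 || header < 0) {
            return -1;
        }
        return (received - header) / 1000;
    }
}
```

For example, an event whose header says 4000 ms and which arrives at 10000 ms yields a lag of 6 seconds. Passing the clock value in as a parameter plays the same role as the overridable getCurrentTimeMillis() in the original: it keeps the arithmetic testable without waiting on real time.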

listenForEventPackets

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

public class BinaryLogClient implements BinaryLogClientMXBean {

    //......

    private void listenForEventPackets() throws IOException {
        ByteArrayInputStream inputStream = channel.getInputStream();
        boolean completeShutdown = false;
        try {
            while (inputStream.peek() != -1) {
                int packetLength = inputStream.readInteger(3);
                inputStream.skip(1); // 1 byte for sequence
                int marker = inputStream.read();
                if (marker == 0xFF) {
                    ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
                    throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
                        errorPacket.getSqlState());
                }
                if (marker == 0xFE && !blocking) {
                    completeShutdown = true;
                    break;
                }
                Event event;
                try {
                    event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
                        new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
                        inputStream);
                    if (event == null) {
                        throw new EOFException();
                    }
                } catch (Exception e) {
                    Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
                    if (cause instanceof EOFException || cause instanceof SocketException) {
                        throw e;
                    }
                    if (isConnected()) {
                        for (LifecycleListener lifecycleListener : lifecycleListeners) {
                            lifecycleListener.onEventDeserializationFailure(this, e);
                        }
                    }
                    continue;
                }
                if (isConnected()) {
                    eventLastSeen = System.currentTimeMillis();
                    updateGtidSet(event);
                    notifyEventListeners(event);
                    updateClientBinlogFilenameAndPosition(event);
                }
            }
        } catch (Exception e) {
            if (isConnected()) {
                for (LifecycleListener lifecycleListener : lifecycleListeners) {
                    lifecycleListener.onCommunicationFailure(this, e);
                }
            }
        } finally {
            if (isConnected()) {
                if (completeShutdown) {
                    disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
                } else {
                    disconnectChannel();
                }
            }
        }
    }

    private void notifyEventListeners(Event event) {
        if (event.getData() instanceof EventDataWrapper) {
            event = new Event(event.getHeader(), ((EventDataWrapper) event.getData()).getExternal());
        }
        for (EventListener eventListener : eventListeners) {
            try {
                eventListener.onEvent(event);
            } catch (Exception e) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, eventListener + " choked on " + event, e);
                }
            }
        }
    }

    //......

}
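  • listenForEventPackets reads packets from channel.getInputStream() and deserializes each one into an event via eventDeserializer.nextEvent; while the client is connected it then calls updateGtidSet, notifyEventListeners, and updateClientBinlogFilenameAndPosition. notifyEventListeners iterates over eventListeners and calls onEvent on each in turn, catching any exception a listener throws and logging it at WARNING level, so one failing listener does not stop the rest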
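The per-listener try/catch in notifyEventListeners is what keeps one misbehaving listener from starving the others. Here is a minimal, self-contained sketch of that dispatch pattern. `DispatchSketch` and its nested `EventListener` are hypothetical stand-ins, events are reduced to plain strings, and the use of CopyOnWriteArrayList is a choice made for this sketch rather than something shown in the excerpt above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the dispatch part of BinaryLogClient.
final class DispatchSketch {

    /** Same shape as BinaryLogClient.EventListener, with a String as the event. */
    interface EventListener {
        void onEvent(String event);
    }

    private static final Logger logger = Logger.getLogger(DispatchSketch.class.getName());

    // Sketch assumption: a copy-on-write list, so listeners can be registered
    // while another thread is iterating over them.
    private final List<EventListener> eventListeners = new CopyOnWriteArrayList<>();

    void registerEventListener(EventListener eventListener) {
        eventListeners.add(eventListener);
    }

    /** Delivers the event to every listener; a throwing listener is logged and skipped. */
    void notifyEventListeners(String event) {
        for (EventListener eventListener : eventListeners) {
            try {
                eventListener.onEvent(event);
            } catch (Exception e) {
                // Swallow and log, so the remaining listeners still receive the event.
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, eventListener + " choked on " + event, e);
                }
            }
        }
    }
}
```

One consequence of this design is that a listener cannot signal failure back to the client by throwing: the exception only reaches the log. A listener that needs to stop processing has to arrange that itself.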

Summary

The EventListener interface declares the onEvent method. BinaryLogClientStatistics implements BinaryLogClient.EventListener, and its onEvent method updates lastEventHeader, timestampOfLastEvent, totalNumberOfEventsSeen, and totalBytesReceived.

doc

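Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to request removal.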
