A Look at Debezium's Heartbeat

Original article by code4it, last modified 2020-05-15 14:29:46.
Collected in the column 码匠的流水账.

This article takes a look at Debezium's Heartbeat.

Heartbeat

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java

public interface Heartbeat {

    public static final String HEARTBEAT_INTERVAL_PROPERTY_NAME = "heartbeat.interval.ms";

    @FunctionalInterface
    public static interface OffsetProducer {
        Map<String, ?> offset();
    }

    void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;

    void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;

    void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;

    boolean isEnabled();

    //......

}
  • Heartbeat declares a nested OffsetProducer functional interface with a single offset() method, plus the heartbeat (two overloads), forcedBeat, and isEnabled methods
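
The two heartbeat overloads differ only in how the offset is supplied: as a ready-made map or through an OffsetProducer. The source does not say why both exist. One plausible reading is that the producer lets the caller defer building the offset until a beat is actually due. The sketch below illustrates that idea with plain JDK types. LazyOffsetSketch, OffsetProducerSketch, beatEager, and beatLazy are hypothetical names and not part of Debezium.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class LazyOffsetSketch {

    // Hypothetical stand-in that mirrors only the shape of Heartbeat.OffsetProducer.
    @FunctionalInterface
    interface OffsetProducerSketch {
        Map<String, ?> offset();
    }

    // Counts how often the (pretend-expensive) offset map is built.
    static final AtomicInteger OFFSET_BUILDS = new AtomicInteger();

    static Map<String, ?> buildOffset() {
        OFFSET_BUILDS.incrementAndGet();
        return Collections.singletonMap("pos", 42L);
    }

    // Eager variant: the caller has already built the map, whether or not a beat is due.
    static Map<String, ?> beatEager(boolean due, Map<String, ?> offset) {
        return due ? offset : null;
    }

    // Lazy variant: the producer is invoked only when a beat is due.
    static Map<String, ?> beatLazy(boolean due, OffsetProducerSketch producer) {
        return due ? producer.offset() : null;
    }

    public static void main(String[] args) {
        beatEager(false, buildOffset());                 // builds the map although no beat is due
        beatLazy(false, LazyOffsetSketch::buildOffset);  // builds nothing
        beatLazy(true, LazyOffsetSketch::buildOffset);   // builds the map once
        System.out.println("offset maps built: " + OFFSET_BUILDS.get());
    }
}
```

Because OffsetProducer is annotated with @FunctionalInterface, a lambda or method reference can be passed wherever the interface is expected, as the sketch does with its stand-in.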

HeartbeatImpl

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.java

class HeartbeatImpl implements Heartbeat {

    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);
    private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);

    /**
     * Default length of interval in which connector generates periodically
     * heartbeat messages. A size of 0 disables heartbeat.
     */
    static final int DEFAULT_HEARTBEAT_INTERVAL = 0;

    /**
     * Default prefix for names of heartbeat topics
     */
    static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";

    private static final String SERVER_NAME_KEY = "serverName";

    private static Schema KEY_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey"))
            .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
            .build();
    private static Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.Heartbeat"))
            .field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
            .build();

    private final String topicName;
    private final Duration heartbeatInterval;
    private final String key;

    private volatile Timer heartbeatTimeout;

    HeartbeatImpl(Configuration configuration, String topicName, String key) {
        this.topicName = topicName;
        this.key = key;

        heartbeatInterval = configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
        heartbeatTimeout = resetHeartbeat();
    }

    @Override
    public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        if (heartbeatTimeout.expired()) {
            forcedBeat(partition, offset, consumer);
            heartbeatTimeout = resetHeartbeat();
        }
    }

    @Override
    public void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        if (heartbeatTimeout.expired()) {
            forcedBeat(partition, offsetProducer.offset(), consumer);
            heartbeatTimeout = resetHeartbeat();
        }
    }

    @Override
    public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException {
        LOGGER.debug("Generating heartbeat event");
        if (offset == null || offset.isEmpty()) {
            // Do not send heartbeat message if no offset is available yet
            return;
        }
        consumer.accept(heartbeatRecord(partition, offset));
    }

    @Override
    public boolean isEnabled() {
        return true;
    }

    private SourceRecord heartbeatRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        final Integer partition = 0;

        return new SourceRecord(sourcePartition, sourceOffset,
                topicName, partition, KEY_SCHEMA, serverNameKey(key), VALUE_SCHEMA, messageValue());
    }

    private Timer resetHeartbeat() {
        return Threads.timer(Clock.SYSTEM, heartbeatInterval);
    }

    //......

}
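  • HeartbeatImpl implements Heartbeat. Both heartbeat overloads call forcedBeat once heartbeatTimeout.expired() returns true and then re-arm the timer through resetHeartbeat. forcedBeat returns without emitting anything when the offset is null or empty; otherwise it calls consumer.accept(heartbeatRecord(partition, offset)). isEnabled returns true. heartbeatRecord builds a SourceRecord for partition 0 of the heartbeat topic, and resetHeartbeat returns Threads.timer(Clock.SYSTEM, heartbeatInterval)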
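
The interval gating described above can be tried without Debezium or Kafka on the classpath. The following is a minimal sketch, assuming a simple deadline comparison in place of Threads.timer and a list in place of the BlockingConsumer. IntervalHeartbeatSketch and its injectable clock are hypothetical and exist only to make the run deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

class IntervalHeartbeatSketch {

    private final long intervalMs;
    private final LongSupplier clockMs;   // hypothetical injectable clock (milliseconds)
    private long deadlineMs;              // assumed stand-in for the Timer in the excerpt
    final List<Map<String, ?>> emitted = new ArrayList<>();

    IntervalHeartbeatSketch(long intervalMs, LongSupplier clockMs) {
        this.intervalMs = intervalMs;
        this.clockMs = clockMs;
        this.deadlineMs = clockMs.getAsLong() + intervalMs;
    }

    // Mirrors heartbeat(...): beat only once the interval has elapsed, then re-arm.
    void heartbeat(Map<String, ?> offset) {
        if (clockMs.getAsLong() >= deadlineMs) {
            forcedBeat(offset);
            deadlineMs = clockMs.getAsLong() + intervalMs;
        }
    }

    // Mirrors forcedBeat(...): skip silently while no offset is available.
    void forcedBeat(Map<String, ?> offset) {
        if (offset == null || offset.isEmpty()) {
            return;
        }
        emitted.add(offset);
    }

    public static void main(String[] args) {
        long[] now = { 0L };
        IntervalHeartbeatSketch hb = new IntervalHeartbeatSketch(1000L, () -> now[0]);
        Map<String, ?> offset = Collections.singletonMap("pos", 1L);

        hb.heartbeat(offset);                                      // t=0: not due
        now[0] = 1000L;
        hb.heartbeat(Collections.<String, Object>emptyMap());      // due, empty offset: skipped, timer re-armed
        now[0] = 1500L;
        hb.heartbeat(offset);                                      // 500 ms after re-arm: not due
        now[0] = 2000L;
        hb.heartbeat(offset);                                      // due again: emitted
        System.out.println("heartbeats emitted: " + hb.emitted.size());
    }
}
```

As in the excerpt, the timer is re-armed even when forcedBeat skips an empty offset, so a skipped beat is not retried until the next interval has passed.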

DatabaseHeartbeatImpl

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/DatabaseHeartbeatImpl.java

public class DatabaseHeartbeatImpl extends HeartbeatImpl {
    private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHeartbeatImpl.class);

    public static final String HEARTBEAT_ACTION_QUERY_PROPERTY_NAME = "heartbeat.action.query";

    public static final Field HEARTBEAT_ACTION_QUERY = Field.create(HEARTBEAT_ACTION_QUERY_PROPERTY_NAME)
            .withDisplayName("The query to execute with every heartbeat")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("The query executed with every heartbeat. Defaults to an empty string.");

    private final String heartBeatActionQuery;
    private final JdbcConnection jdbcConnection;

    DatabaseHeartbeatImpl(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection, String heartBeatActionQuery) {
        super(configuration, topicName, key);

        this.heartBeatActionQuery = heartBeatActionQuery;
        this.jdbcConnection = jdbcConnection;
    }

    @Override
    public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        try {
            jdbcConnection.execute(heartBeatActionQuery);
        }
        catch (Exception e) {
            LOGGER.error("Could not execute heartbeat action", e);
        }
        LOGGER.debug("Executed heartbeat action query");

        super.forcedBeat(partition, offset, consumer);
    }
}
  • DatabaseHeartbeatImpl extends HeartbeatImpl. Its forcedBeat first runs jdbcConnection.execute(heartBeatActionQuery), logging rather than rethrowing any exception, and then calls super.forcedBeat(partition, offset, consumer)
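
One consequence of the try/catch in the excerpt is that a failing action query does not suppress the heartbeat itself. The sketch below reproduces that "run an action, tolerate failure, then delegate to super" shape with JDK types only. ActionHeartbeatSketch, BaseBeat, ActionBeat, and Action are hypothetical stand-ins: the Action lambda takes the place of the JDBC call and a counter takes the place of the error log.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class ActionHeartbeatSketch {

    // Hypothetical stand-in for the base implementation: records emitted offsets.
    static class BaseBeat {
        final List<Map<String, ?>> emitted = new ArrayList<>();

        void forcedBeat(Map<String, ?> offset) {
            if (offset == null || offset.isEmpty()) {
                return;
            }
            emitted.add(offset);
        }
    }

    // Hypothetical stand-in for a database call that may throw.
    interface Action {
        void run() throws Exception;
    }

    static class ActionBeat extends BaseBeat {
        private final Action action;
        int failures = 0;

        ActionBeat(Action action) {
            this.action = action;
        }

        @Override
        void forcedBeat(Map<String, ?> offset) {
            try {
                action.run();
            }
            catch (Exception e) {
                failures++; // the excerpt logs the error at this point and carries on
            }
            super.forcedBeat(offset);
        }
    }

    public static void main(String[] args) {
        Map<String, ?> offset = Collections.singletonMap("pos", 1L);

        ActionBeat ok = new ActionBeat(() -> { });
        ok.forcedBeat(offset);

        ActionBeat failing = new ActionBeat(() -> { throw new IllegalStateException("db down"); });
        failing.forcedBeat(offset);

        System.out.println("ok: emitted=" + ok.emitted.size() + ", failures=" + ok.failures);
        System.out.println("failing: emitted=" + failing.emitted.size() + ", failures=" + failing.failures);
    }
}
```

In both cases one heartbeat is emitted; only the failure counter differs.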

Heartbeat.create

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java

public interface Heartbeat {

    //......

    public static Heartbeat create(Configuration configuration, String topicName, String key) {
        return configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero() ? NULL : new HeartbeatImpl(configuration, topicName, key);
    }

    public static Heartbeat create(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection) {
        if (configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero()) {
            return NULL;
        }

        String heartBeatActionQuery = configuration.getString(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY);

        if (heartBeatActionQuery != null) {
            return new DatabaseHeartbeatImpl(configuration, topicName, key, jdbcConnection, heartBeatActionQuery);
        }

        return new HeartbeatImpl(configuration, topicName, key);
    }

    //......

}
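  • Heartbeat offers two static create methods. Both return NULL when the heartbeat interval is zero. Otherwise the first creates a HeartbeatImpl, while the second creates a DatabaseHeartbeatImpl when heartBeatActionQuery is not null and falls back to HeartbeatImpl when it is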
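
The selection logic of the two create methods comes down to two settings, heartbeat.interval.ms and heartbeat.action.query. Here is a minimal sketch of that decision using java.util.Properties instead of Debezium's Configuration. HeartbeatFactorySketch, Kind, and select are hypothetical names, and treating a missing interval as 0 is an assumption based on DEFAULT_HEARTBEAT_INTERVAL in the HeartbeatImpl excerpt.

```java
import java.util.Properties;

class HeartbeatFactorySketch {

    // Hypothetical labels for the three outcomes of Heartbeat.create.
    enum Kind { NULL, PLAIN, DATABASE }

    static Kind select(Properties config, boolean hasJdbcConnection) {
        // Assumption: a missing interval behaves like 0, which disables heartbeats.
        long intervalMs = Long.parseLong(config.getProperty("heartbeat.interval.ms", "0"));
        if (intervalMs == 0) {
            return Kind.NULL;
        }
        String actionQuery = config.getProperty("heartbeat.action.query");
        // Only the create overload that receives a JdbcConnection looks at the action query.
        if (hasJdbcConnection && actionQuery != null) {
            return Kind.DATABASE;
        }
        return Kind.PLAIN;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(select(config, true));   // interval unset

        config.setProperty("heartbeat.interval.ms", "5000");
        System.out.println(select(config, true));   // interval set, no action query

        // The SQL statement and table name are illustrative only.
        config.setProperty("heartbeat.action.query", "INSERT INTO heartbeat_demo (ts) VALUES (now())");
        System.out.println(select(config, true));   // interval and action query set
        System.out.println(select(config, false));  // same settings, overload without a connection
    }
}
```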

Summary

HeartbeatImpl implements the Heartbeat interface. Its heartbeat methods call forcedBeat when heartbeatTimeout.expired() is true and then call resetHeartbeat. forcedBeat calls consumer.accept(heartbeatRecord(partition, offset)) unless the offset is null or empty; isEnabled returns true; heartbeatRecord creates and returns a SourceRecord; resetHeartbeat returns Threads.timer(Clock.SYSTEM, heartbeatInterval). DatabaseHeartbeatImpl extends HeartbeatImpl; its forcedBeat runs jdbcConnection.execute(heartBeatActionQuery) and then calls super.forcedBeat(partition, offset, consumer).

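Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.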
