本文主要研究一下debezium的Heartbeat
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java
public interface Heartbeat {
?
public static final String HEARTBEAT_INTERVAL_PROPERTY_NAME = "heartbeat.interval.ms";
?
@FunctionalInterface
public static interface OffsetProducer {
Map<String, ?> offset();
}
?
void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;
?
void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;
?
void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;
?
boolean isEnabled();
?
//......
?
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.java
class HeartbeatImpl implements Heartbeat {
?
private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);
private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
?
/**
* Default length of interval in which connector generates periodically
* heartbeat messages. A size of 0 disables heartbeat.
*/
static final int DEFAULT_HEARTBEAT_INTERVAL = 0;
?
/**
* Default prefix for names of heartbeat topics
*/
static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";
?
private static final String SERVER_NAME_KEY = "serverName";
?
private static Schema KEY_SCHEMA = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey"))
.field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.build();
private static Schema VALUE_SCHEMA = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.common.Heartbeat"))
.field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.build();
?
private final String topicName;
private final Duration heartbeatInterval;
private final String key;
?
private volatile Timer heartbeatTimeout;
?
HeartbeatImpl(Configuration configuration, String topicName, String key) {
this.topicName = topicName;
this.key = key;
?
heartbeatInterval = configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
heartbeatTimeout = resetHeartbeat();
}
?
@Override
public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
if (heartbeatTimeout.expired()) {
forcedBeat(partition, offset, consumer);
heartbeatTimeout = resetHeartbeat();
}
}
?
@Override
public void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
if (heartbeatTimeout.expired()) {
forcedBeat(partition, offsetProducer.offset(), consumer);
heartbeatTimeout = resetHeartbeat();
}
}
?
@Override
public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer)
throws InterruptedException {
LOGGER.debug("Generating heartbeat event");
if (offset == null || offset.isEmpty()) {
// Do not send heartbeat message if no offset is available yet
return;
}
consumer.accept(heartbeatRecord(partition, offset));
}
?
@Override
public boolean isEnabled() {
return true;
}
?
private SourceRecord heartbeatRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
final Integer partition = 0;
?
return new SourceRecord(sourcePartition, sourceOffset,
topicName, partition, KEY_SCHEMA, serverNameKey(key), VALUE_SCHEMA, messageValue());
}
?
private Timer resetHeartbeat() {
return Threads.timer(Clock.SYSTEM, heartbeatInterval);
}
?
//......
?
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/DatabaseHeartbeatImpl.java
public class DatabaseHeartbeatImpl extends HeartbeatImpl {
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHeartbeatImpl.class);
?
public static final String HEARTBEAT_ACTION_QUERY_PROPERTY_NAME = "heartbeat.action.query";
?
public static final Field HEARTBEAT_ACTION_QUERY = Field.create(HEARTBEAT_ACTION_QUERY_PROPERTY_NAME)
.withDisplayName("The query to execute with every heartbeat")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("The query executed with every heartbeat. Defaults to an empty string.");
?
private final String heartBeatActionQuery;
private final JdbcConnection jdbcConnection;
?
DatabaseHeartbeatImpl(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection, String heartBeatActionQuery) {
super(configuration, topicName, key);
?
this.heartBeatActionQuery = heartBeatActionQuery;
this.jdbcConnection = jdbcConnection;
}
?
@Override
public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
try {
jdbcConnection.execute(heartBeatActionQuery);
}
catch (Exception e) {
LOGGER.error("Could not execute heartbeat action", e);
}
LOGGER.debug("Executed heartbeat action query");
?
super.forcedBeat(partition, offset, consumer);
}
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java
public interface Heartbeat {
?
//......
?
public static Heartbeat create(Configuration configuration, String topicName, String key) {
return configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero() ? NULL : new HeartbeatImpl(configuration, topicName, key);
}
?
public static Heartbeat create(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection) {
if (configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero()) {
return NULL;
}
?
String heartBeatActionQuery = configuration.getString(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY);
?
if (heartBeatActionQuery != null) {
return new DatabaseHeartbeatImpl(configuration, topicName, key, jdbcConnection, heartBeatActionQuery);
}
?
return new HeartbeatImpl(configuration, topicName, key);
}
?
//......
?
}
HeartbeatImpl实现了Heartbeat接口,其heartbeat方法在heartbeatTimeout.expired()时执行forcedBeat,然后执行resetHeartbeat;其forcedBeat方法执行consumer.accept(heartbeatRecord(partition, offset));其isEnabled返回true;heartbeatRecord方法创建SourceRecord并返回;resetHeartbeat方法返回Threads.timer(Clock.SYSTEM, heartbeatInterval);DatabaseHeartbeatImpl继承了HeartbeatImpl,其forcedBeat方法执行jdbcConnection.execute(heartBeatActionQuery),然后再执行super.forcedBeat(partition, offset, consumer)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。