前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊claudb的MasterReplication

聊聊claudb的MasterReplication

原创
作者头像
code4it
修改2020-08-24 10:03:44
1340
修改2020-08-24 10:03:44
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下claudb的MasterReplication

MasterReplication

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/MasterReplication.java

代码语言:javascript
复制
public class MasterReplication implements Runnable {
?
  private static final Logger LOGGER = LoggerFactory.getLogger(MasterReplication.class);
?
  private static final String SELECT_COMMAND = "SELECT";
  private static final String PING_COMMAND = "PING";
  private static final int TASK_DELAY = 2;
?
  private final DBServerContext server;
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
?
  public MasterReplication(DBServerContext server) {
    this.server = server;
  }
?
  public void start() {
    executor.scheduleWithFixedDelay(this, TASK_DELAY, TASK_DELAY, TimeUnit.SECONDS);
  }
?
  public void stop() {
    executor.shutdown();
  }
?
  public void addSlave(String id) {
    getServerState().addSlave(id);
    LOGGER.info("new slave: {}", id);
  }
?
  public void removeSlave(String id) {
    getServerState().removeSlave(id);
    LOGGER.info("slave revomed: {}", id);
  }
?
  @Override
  public void run() {
    List<RedisToken> commands = createCommands();
?
    for (SafeString slave : getServerState().getSlaves()) {
      for (RedisToken command : commands) {
        server.publish(slave.toString(), command);
      }
    }
  }
?
  private List<RedisToken> createCommands() {
    List<RedisToken> commands = new LinkedList<>();
    commands.add(pingCommand());
    commands.addAll(commandsToReplicate());
    return commands;
  }
?
  private List<RedisToken> commandsToReplicate() {
    List<RedisToken> commands = new LinkedList<>();
?
    for (RedisToken command : server.getCommandsToReplicate()) {
      command.accept(new AbstractRedisTokenVisitor<Void>() {
        @Override
        public Void array(ArrayRedisToken token) {
          commands.add(selectCommand(token));
          commands.add(command(token));
          return null;
        }
      });
    }
    return commands;
  }
?
  private RedisToken selectCommand(ArrayRedisToken token) {
    return array(string(SELECT_COMMAND),
        token.getValue().stream().findFirst().orElse(string("0")));
  }
?
  private RedisToken pingCommand() {
    return array(string(PING_COMMAND));
  }
?
  private RedisToken command(ArrayRedisToken token) {
    return array(token.getValue().stream().skip(1).collect(toList()));
  }
?
  private DBServerState getServerState() {
    return serverState().getOrElseThrow(() -> new IllegalStateException("missing server state"));
  }
?
  private Option<DBServerState> serverState() {
    return server.getValue("state");
  }
}
  • MasterReplication实现了Runnable接口,其start方法调度执行自身的runnable,每隔2秒执行一次;其run方法先执行createCommands方法,然后遍历slaves,然后遍历commands,执行server.publish(slave.toString(), command);createCommands先添加ping命令,然后再添加commandsToReplicate;commandsToReplicate方法遍历server.getCommandsToReplicate(),遇到array方法时先添加select命令,再添加command命令,最后返回commands

getCommandsToReplicate

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java

代码语言:javascript
复制
public class ClauDB extends RespServerContext implements DBServerContext {
?
    //......
?
  @Override
  public ImmutableList<RedisToken> getCommandsToReplicate() {
    return executeOn(Observable.<ImmutableList<RedisToken>>create(observable -> {
      observable.onNext(getState().getCommandsToReplicate());
      observable.onComplete();
    })).blockingFirst();
  }
?
    //......
?
}
  • getCommandsToReplicate方法执行的是getState().getCommandsToReplicate()

getCommandsToReplicate

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java

代码语言:javascript
复制
public class DBServerState {
?
  private static final int RDB_VERSION = 6;
?
  private static final SafeString SLAVES = safeString("slaves");
  private static final DatabaseKey SLAVES_KEY = safeKey("slaves");
  private static final DatabaseKey SCRIPTS_KEY = safeKey("scripts");
?
  private boolean master = true;
?
  private final List<Database> databases = new ArrayList<>();
  private final Database admin;
  private final DatabaseFactory factory;
?
  private final Queue<RedisToken> queue = new LinkedList<>();
?
  public void append(RedisToken command) {
    queue.offer(command);
  }
?
    //......
?
  public ImmutableList<RedisToken> getCommandsToReplicate() {
    ImmutableList<RedisToken> list = ImmutableList.from(queue);
    queue.clear();
    return list;
  }
?
    //......
?
}
  • getCommandsToReplicate方法会根据queue创建ImmutableList,然后清空queue;而append方法会添加command到queue

executeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java

代码语言:javascript
复制
public class ClauDB extends RespServerContext implements DBServerContext {
?
    //......
?
  protected RedisToken executeCommand(RespCommand command, Request request) {
    if (!isReadOnly(request.getCommand())) {
      try {
        RedisToken response = command.execute(request);
        replication(request);
        notification(request);
        return response;
      } catch (RuntimeException e) {
        LOGGER.error("error executing command: " + request, e);
        return error("error executing command: " + request);
      }
    } else {
      return error("READONLY You can't write against a read only slave");
    }
  }
?
  private void replication(Request request) {
    if (!isReadOnlyCommand(request.getCommand())) {
      RedisToken array = requestToArray(request);
      if (hasSlaves()) {
        getState().append(array);
      }
      persistence.ifPresent(manager -> manager.append(array));
    }
  }
?
  @Override
  public void publish(String sourceKey, RedisToken message) {
    Session session = getSession(sourceKey);
    if (session != null) {
      session.publish(message);
    }
  }
?
    //......
?
}
  • executeCommand方法除了执行command.execute,还会执行replication方法,它会在有slaves的条件将非readOnlyCommand追加到state;publish方法执行的是session.publish(message)传输给slave

小结

MasterReplication实现了Runnable接口,其start方法调度执行自身的runnable,每隔2秒执行一次;其run方法先执行createCommands方法,然后遍历slaves,然后遍历commands,执行server.publish(slave.toString(), command);createCommands先添加ping命令,然后再添加commandsToReplicate;commandsToReplicate方法遍历server.getCommandsToReplicate(),遇到array方法时先添加select命令,再添加command命令,最后返回commands

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MasterReplication
  • getCommandsToReplicate
  • getCommandsToReplicate
  • executeCommand
  • 小结
  • doc
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com