前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊elasticsearch的RoutingService

聊聊elasticsearch的RoutingService

原创
作者头像
code4it
修改2019-05-15 09:51:47
5050
修改2019-05-15 09:51:47
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下elasticsearch的RoutingService

RoutingService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

代码语言:javascript
复制
public class RoutingService extends AbstractLifecycleComponent {
    private static final Logger logger = LogManager.getLogger(RoutingService.class);
?
    private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
?
    private final ClusterService clusterService;
    private final AllocationService allocationService;
?
    private AtomicBoolean rerouting = new AtomicBoolean();
?
    @Inject
    public RoutingService(ClusterService clusterService, AllocationService allocationService) {
        this.clusterService = clusterService;
        this.allocationService = allocationService;
    }
?
    @Override
    protected void doStart() {
    }
?
    @Override
    protected void doStop() {
    }
?
    @Override
    protected void doClose() {
    }
?
    /**
     * Initiates a reroute.
     */
    public final void reroute(String reason) {
        try {
            if (lifecycle.stopped()) {
                return;
            }
            if (rerouting.compareAndSet(false, true) == false) {
                logger.trace("already has pending reroute, ignoring {}", reason);
                return;
            }
            logger.trace("rerouting {}", reason);
            clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
                new ClusterStateUpdateTask(Priority.HIGH) {
                    @Override
                    public ClusterState execute(ClusterState currentState) {
                        rerouting.set(false);
                        return allocationService.reroute(currentState, reason);
                    }
?
                    @Override
                    public void onNoLongerMaster(String source) {
                        rerouting.set(false);
                        // no biggie
                    }
?
                    @Override
                    public void onFailure(String source, Exception e) {
                        rerouting.set(false);
                        ClusterState state = clusterService.state();
                        if (logger.isTraceEnabled()) {
                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",
                                source, state), e);
                        } else {
                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
                                source, state.version()), e);
                        }
                    }
                });
        } catch (Exception e) {
            rerouting.set(false);
            ClusterState state = clusterService.state();
            logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
        }
    }
}
  • RoutingService的构造器要求输入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托给allocationService.reroute

AllocationService.reroute

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

代码语言:javascript
复制
public class AllocationService {
    //......
?
    /**
     * Reroutes the routing table based on the live nodes.
     * <p>
     * If the same instance of ClusterState is returned, then no change has been made.
     */
    public ClusterState reroute(ClusterState clusterState, String reason) {
        return reroute(clusterState, reason, false);
    }
?
    /**
     * Reroutes the routing table based on the live nodes.
     * <p>
     * If the same instance of ClusterState is returned, then no change has been made.
     */
    protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) {
        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
?
        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
        // shuffle the unassigned nodes, just so we won't have things like poison failed shards
        routingNodes.unassigned().shuffle();
        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
            clusterInfoService.getClusterInfo(), currentNanoTime());
        allocation.debugDecision(debug);
        reroute(allocation);
        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
            return clusterState;
        }
        return buildResultAndLogHealthChange(clusterState, allocation, reason);
    }
?
    private void reroute(RoutingAllocation allocation) {
        assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
            "auto-expand replicas out of sync with number of nodes in the cluster";
?
        // now allocate all the unassigned to available nodes
        if (allocation.routingNodes().unassigned().size() > 0) {
            removeDelayMarkers(allocation);
            gatewayAllocator.allocateUnassigned(allocation);
        }
?
        shardsAllocator.allocate(allocation);
        assert RoutingNodes.assertShardStats(allocation.routingNodes());
    }
?
    //......
}
  • AllocationService的reroute方法主要是构建RoutingAllocation,然后在进行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)

BalancedShardsAllocator.allocate

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

代码语言:javascript
复制
public class BalancedShardsAllocator implements ShardsAllocator {
    //......
?
    public void allocate(RoutingAllocation allocation) {
        if (allocation.routingNodes().size() == 0) {
            /* with no nodes this is pointless */
            return;
        }
        final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
        balancer.allocateUnassigned();
        balancer.moveShards();
        balancer.balance();
    }
?
    //......
}
  • BalancedShardsAllocator的allocate方法,则创建Balancer,然后执行Balancer的allocateUnassigned()、moveShards()、balance()方法

小结

  • RoutingService的构造器要求输入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托给allocationService.reroute
  • AllocationService的reroute方法主要是构建RoutingAllocation,然后在进行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)
  • BalancedShardsAllocator的allocate方法,则创建Balancer,然后执行Balancer的allocateUnassigned()、moveShards()、balance()方法

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RoutingService
  • AllocationService.reroute
  • BalancedShardsAllocator.allocate
  • 小结
  • doc
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com