前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊kingbus的starRaft

聊聊kingbus的starRaft

原创
作者头像
code4it
修改2020-06-15 10:46:30
4180
修改2020-06-15 10:46:30
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下kingbus的starRaft

starRaft

kingbus/server/server.go

代码语言:go
复制
func (s *KingbusServer) starRaft(cfg config.RaftNodeConfig) error {
    var (
        etcdRaftNode etcdraft.Node
        id           types.ID
        cl           *membership.RaftCluster
        remotes      []*membership.Member
        appliedIndex uint64
    )
?
    prt, err := rafthttp.NewRoundTripper(transport.TLSInfo{}, DialTimeout)
    if err != nil {
        return err
    }
?
    store, err := storage.NewDiskStorage(cfg.DataDir, cfg.ReserveDataSize)
    if err != nil {
        log.Log.Fatalf("NewKingbusServer:NewDiskStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
    }
?
    //store, err := storage.NewMemoryStorage(cfg.DataDir)
    //if err != nil {
    //  log.Log.Fatalf("NewKingbusServer:NewMemoryStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
    //}
?
    defer func() {
        //close storage when occur error
        if err != nil {
            store.Close()
        }
    }()
?
    logExist := utils.ExistLog(cfg.DataDir)
    switch {
    case !logExist && !cfg.NewCluster:
        if err = cfg.VerifyJoinExisting(); err != nil {
            return err
        }
        cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
        if err != nil {
            return err
        }
        remotePeerURLs := membership.GetRemotePeerURLs(cl, cfg.Name)
        existingCluster, gerr := membership.GetClusterFromRemotePeers(remotePeerURLs, prt)
        if gerr != nil {
            return fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
        }
        if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
            return fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
        }
?
        remotes = existingCluster.Members()
        cl.SetID(existingCluster.GetID())
        cl.SetStore(store)
        id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, nil)
    case !logExist && cfg.NewCluster:
        if err = cfg.VerifyBootstrap(); err != nil {
            return err
        }
        cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
        if err != nil {
            return err
        }
        m := cl.MemberByName(cfg.Name)
        if membership.IsMemberBootstrapped(cl, cfg.Name, prt, DialTimeout) {
            return fmt.Errorf("member %s has already been bootstrapped", m.ID)
        }
?
        cl.SetStore(store)
        id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, cl.MemberIDs())
    case logExist:
        if err = utils.IsDirWriteable(cfg.DataDir); err != nil {
            return fmt.Errorf("cannot write to member directory: %v", err)
        }
        //node restart, read states from storage
        //get applied index
        appliedIndex = raft.MustGetAppliedIndex(store)
        cfg.AppliedIndex = appliedIndex
        id, etcdRaftNode, cl = restartEtcdNode(cfg, store)
        cl.SetStore(store)
    default:
        return fmt.Errorf("unsupported bootstrap config")
    }
?
    s.raftNode = raft.NewNode(
        raft.NodeConfig{
            IsIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
            Node:        etcdRaftNode,
            Heartbeat:   cfg.HeartbeatMs,
            Storage:     store,
        },
    )
    //committedIndex,term will update by fsm(UpdateCommittedIndex,SetTerm)
    //set appliedIndex when applyEntries will check the entry continuity
    s.raftNode.SetAppliedIndex(appliedIndex)
?
    s.id = id
    s.wait = wait.New()
    s.reqIDGen = idutil.NewGenerator(uint16(id), time.Now())
    s.stopping = make(chan struct{})
    s.errorc = make(chan error)
    s.applyBroadcast = utils.NewBroadcast()
    s.stats = stats.NewServerStats(cfg.Name, id.String())
    s.lstats = stats.NewLeaderStats(id.String())
    s.store = store
?
    tr := &rafthttp.Transport{
        TLSInfo:     transport.TLSInfo{},
        DialTimeout: DialTimeout,
        ID:          id,
        URLs:        cfg.PeerURLs,
        ClusterID:   cl.GetID(),
        Raft:        s,
        ServerStats: s.stats,
        LeaderStats: s.lstats,
        ErrorC:      s.errorc,
    }
    if err = tr.Start(); err != nil {
        return err
    }
    // add all remotes into transport
    //Add remotes to rafthttp, who help newly joined members catch up the
    //progress of the cluster. It supports basic message sending to remote, and
    //has no stream connection for simplicity. remotes will not be used
    //after the latest peers have been added into rafthttp.
    for _, m := range remotes {
        if m.ID != id {
            tr.AddRemote(m.ID, m.PeerURLs)
        }
    }
    for _, m := range cl.Members() {
        if m.ID != id {
            tr.AddPeer(m.ID, m.PeerURLs)
        }
    }
    s.raftNode.Transport = tr
    s.cluster = cl
?
    return nil
}
  • starRaft方法先通过rafthttp.NewRoundTripper创建http.RoundTripper,之后通过storage.NewDiskStorage创建DiskStorage,之后根据logExist及cfg.NewCluster做不同处理;若logExist与cfg.NewCluster都为false(加入已有集群)则更新membership.RaftCluster的id为存在的cluster的id,然后以nil作为ids执行startEtcdRaftNode;若logExist为false且cfg.NewCluster为true则使用cl.MemberIDs()来执行startEtcdRaftNode;若logExist为true则先读取appliedIndex再执行restartEtcdNode;最后创建rafthttp.Transport,执行tr.Start()、tr.AddRemote、tr.AddPeer

startEtcdRaftNode

kingbus/server/server.go

代码语言:go
复制
func startEtcdRaftNode(cfg config.RaftNodeConfig, store storage.Storage, cl *membership.RaftCluster, ids []types.ID) (
    id types.ID, n etcdraft.Node) {
    member := cl.MemberByName(cfg.Name)
    peers := make([]etcdraft.Peer, len(ids))
?
    for i, id := range ids {
        ctx, err := json.Marshal((*cl).Member(id))
        if err != nil {
            log.Log.Panicf("marshal member should never fail: %v", err)
        }
        peers[i] = etcdraft.Peer{ID: uint64(id), Context: ctx}
    }
    id = member.ID
    log.Log.Infof("starting member %s in cluster %s", id, cl.GetID())
?
    c := &etcdraft.Config{
        ID:                        uint64(id),
        ElectionTick:              int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
        HeartbeatTick:             1,
        Storage:                   store,
        MaxSizePerMsg:             cfg.MaxRequestBytes,
        MaxInflightMsgs:           maxInflightMsgs,
        CheckQuorum:               true,
        PreVote:                   cfg.PreVote,
        DisableProposalForwarding: true,
        Logger:                    log.Log,
    }
?
    n = etcdraft.StartNode(c, peers)
    raft.AdvanceTicks(n, c.ElectionTick)
    return id, n
}
  • startEtcdRaftNode方法通过指定的ids创建peers,之后执行etcdraft.StartNode及raft.AdvanceTicks

restartEtcdNode

kingbus/server/server.go

代码语言:go
复制
func restartEtcdNode(cfg config.RaftNodeConfig, store storage.Storage) (
    types.ID, etcdraft.Node, *membership.RaftCluster) {
    cl, err := membership.GetRaftClusterFromStorage(store)
    if err != nil {
        if err != nil {
            log.Log.Panic("GetRaftClusterFromStorage error:%s", err.Error())
        }
    }
?
    log.Log.Debugf("restartEtcdNode:get raft cluster from storage,cluster:%v", cl.String())
?
    //get id from raftCluster
    member := cl.MemberByName(cfg.Name)
    if member == nil {
        log.Log.Fatalf("restartEtcdNode:member not in raft cluster,cluster:%v,memberName:%s",
            cl.String(), cfg.Name)
    }
    c := &etcdraft.Config{
        ID:                        uint64(member.ID),
        ElectionTick:              int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
        HeartbeatTick:             1,
        Applied:                   cfg.AppliedIndex, //set appliedIndex
        Storage:                   store,
        MaxSizePerMsg:             cfg.MaxRequestBytes,
        MaxInflightMsgs:           maxInflightMsgs,
        CheckQuorum:               true,
        PreVote:                   cfg.PreVote,
        DisableProposalForwarding: true,
        Logger:                    log.Log,
    }
?
    n := etcdraft.RestartNode(c)
    return member.ID, n, cl
}
  • restartEtcdNode方法通过membership.GetRaftClusterFromStorage(store)获取RaftCluster,之后通过cl.MemberByName(cfg.Name)获取Member,然后使用member.ID构造etcdraft.Config,最后根据etcdraft.Config执行etcdraft.RestartNode

小结

starRaft方法先通过rafthttp.NewRoundTripper创建http.RoundTripper,之后通过storage.NewDiskStorage创建DiskStorage,之后根据logExist及cfg.NewCluster做不同处理;若logExist与cfg.NewCluster都为false(加入已有集群)则更新membership.RaftCluster的id为存在的cluster的id,然后以nil作为ids执行startEtcdRaftNode;若logExist为false且cfg.NewCluster为true则使用cl.MemberIDs()来执行startEtcdRaftNode;若logExist为true则先读取appliedIndex再执行restartEtcdNode;最后创建rafthttp.Transport,执行tr.Start()、tr.AddRemote、tr.AddPeer

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • starRaft
  • startEtcdRaftNode
  • restartEtcdNode
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com