前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:raft(11)

golang源码分析:raft(11)

作者头像
golangLeetcode
发布2023-09-07 09:05:45
1660
发布2023-09-07 09:05:45
举报

前面提到transport将远程对象分为两类:remote和peer,分别代表新建立的连接和已经加入集群的节点,下面简单分析下它们的核心逻辑:

代码语言:javascript
复制
type remote struct {
  lg       *zap.Logger
  localID  types.ID
  id       types.ID
  status   *peerStatus
  pipeline *pipeline
}

初始化remote的时候会启动一个pipline对象,里面会通过channel将消息分发给handler协程,然后走http协议发送出去:

代码语言:javascript
复制
func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
   pipeline := &pipeline{
    peerID: id,
    tr:     tr,
    picker: picker,
    status: status,
    raft:   tr.Raft,
    errorc: tr.ErrorC,
  }
  pipeline.start()
      return &remote{
    lg:       tr.Logger,
    localID:  tr.ID,
    id:       id,
    status:   status,
    pipeline: pipeline,
  }
代码语言:javascript
复制
  func (g *remote) send(m raftpb.Message) {
    select {
  case g.pipeline.msgc <- m:
  default:
    if g.status.isActive() {

stop方法用来关闭pipeline。

代码语言:javascript
复制
func (g *remote) stop() {
  g.pipeline.stop()
}
代码语言:javascript
复制
func (g *remote) Pause() {
  g.stop()
}
代码语言:javascript
复制
func (g *remote) Resume() {
  g.pipeline.start()
}

Peer是一个接口,定义了集群内部节点之间通信的核心方法,peer实现了这个接口:

代码语言:javascript
复制
type Peer interface {
  // send sends the message to the remote peer. The function is non-blocking
  // and has no promise that the message will be received by the remote.
  // When it fails to send message out, it will report the status to underlying
  // raft.
  send(m raftpb.Message)


  // sendSnap sends the merged snapshot message to the remote peer. Its behavior
  // is similar to send.
  sendSnap(m snap.Message)


  // update updates the urls of remote peer.
  update(urls types.URLs)


  // attachOutgoingConn attaches the outgoing connection to the peer for
  // stream usage. After the call, the ownership of the outgoing
  // connection hands over to the peer. The peer will close the connection
  // when it is no longer used.
  attachOutgoingConn(conn *outgoingConn)
  // activeSince returns the time that the connection with the
  // peer becomes active.
  activeSince() time.Time
  // stop performs any necessary finalization and terminates the peer
  // elegantly.
  stop()
}

peer的属性包括raft对象和前面提到的pipeline,以及stream

代码语言:javascript
复制
type peer struct {
  lg *zap.Logger


  localID types.ID
  // id of the remote raft peer node
  id types.ID


  r Raft


  status *peerStatus


  picker *urlPicker


  msgAppV2Writer *streamWriter
  writer         *streamWriter
  pipeline       *pipeline
  snapSender     *snapshotSender // snapshot sender to send v3 snapshot messages
  msgAppV2Reader *streamReader
  msgAppReader   *streamReader


  recvc chan raftpb.Message
  propc chan raftpb.Message


  mu     sync.Mutex
  paused bool


  cancel context.CancelFunc // cancel pending works in go routine created by peer.
  stopc  chan struct{}
}

peer 代表了一个远程的raft节点,有两种方式把消息发送出去,stream和pipeline。stream是通过 long-polling 连接的。除了一般的流还有一个优化的流用来发送msgApp,发送大消息的分片。只有raft的leader会使用这个优化流来把消息发送给follower。

pipeline是一系列的http客户端,用来发送请求到remote,只是用在stream还没有被建立阶段。

代码语言:javascript
复制
func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
   pipeline := &pipeline{
    peerID:        peerID,
    tr:            t,
    picker:        picker,
    status:        status,
    followerStats: fs,
    raft:          r,
    errorc:        errorc,
  }
  pipeline.start()
      p := &peer{
    lg:             t.Logger,
    localID:        t.ID,
    id:             peerID,
    r:              r,
    status:         status,
    picker:         picker,
    msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
    writer:         startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
    pipeline:       pipeline,
    snapSender:     newSnapshotSender(t, picker, peerID, status),
    recvc:          make(chan raftpb.Message, recvBufSize),
    propc:          make(chan raftpb.Message, maxPendingProposals),
    stopc:          make(chan struct{}),
  }
    msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
    writer:         startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
   go func() {
    for {
      select {
      case mm := <-p.recvc:
        if err := r.Process(ctx, mm); err != nil {
          if t.Logger != nil {
            t.Logger.Warn("failed to process Raft message", zap.Error(err))
          }
        }
      case <-p.stopc:
        return
      }
    }
  }()
   go func() {
    for {
      select {
      case mm := <-p.propc:
        if err := r.Process(ctx, mm); err != nil {
          if t.Logger != nil {
            t.Logger.Warn("failed to process Raft message", zap.Error(err))
          }
        }
      case <-p.stopc:
        return
      }
    }
  }()
    p.msgAppV2Reader = &streamReader{
    p.msgAppReader = &streamReader{
    p.msgAppV2Reader.start()
    p.msgAppReader.start()

然后是send方法,负责选取节点,把消息写入对应的channel里面:

代码语言:javascript
复制
func (p *peer) send(m raftpb.Message) {
  writec, name := p.pick(m)
  select {
  case writec <- m:

发送快照类似

代码语言:javascript
复制
func (p *peer) sendSnap(m snap.Message) {
  go p.snapSender.send(m)
}

有节点变化的时候,会更新

代码语言:javascript
复制
func (p *peer) update(urls types.URLs) {
  p.picker.update(urls)
}
代码语言:javascript
复制
func (p *peer) attachOutgoingConn(conn *outgoingConn) {
   switch conn.t {
  case streamTypeMsgAppV2:
    ok = p.msgAppV2Writer.attach(conn)
  case streamTypeMessage:
    ok = p.writer.attach(conn)

接受其他节点消息的http服务接口定义如下:

代码语言:javascript
复制
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    conn := &outgoingConn{
    t:       t,
    Writer:  w,
    Flusher: w.(http.Flusher),
    Closer:  c,
    localID: h.tr.ID,
    peerID:  from,
  }
 // 初始化 stream writer
p.attachOutgoingConn(conn)
代码语言:javascript
复制
func (p *peer) activeSince() time.Time { return p.status.activeSince() }

暂停、关闭方法也是类似的

代码语言:javascript
复制
func (p *peer) Pause() {
  p.msgAppReader.pause()
  p.msgAppV2Reader.pause()
代码语言:javascript
复制
func (p *peer) Resume() {
  p.msgAppReader.resume()
  p.msgAppV2Reader.resume()
代码语言:javascript
复制
func (p *peer) stop() {
  p.msgAppV2Writer.stop()
  p.writer.stop()
  p.pipeline.stop()
  p.snapSender.stop()
  p.msgAppV2Reader.stop()
  p.msgAppReader.stop()

pick会根据消息类型选择不同的消息前缀

代码语言:javascript
复制
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
   if isMsgSnap(m) {
    return p.pipeline.msgc, pipelineMsg
  } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
    return writec, streamAppV2
  } else if writec, ok = p.writer.writec(); ok {
    return writec, streamMsg
  }
代码语言:javascript
复制
func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }
代码语言:javascript
复制
func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }

至此消息发送方法定义完毕。

本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-09-05,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com