前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:raft(12)

golang源码分析:raft(12)

作者头像
golangLeetcode
发布2023-09-07 09:06:08
1270
发布2023-09-07 09:06:08
举报

下面我们来到更底一层,分别分析下pipeline和stream的实现,pipeline通过固定数量的goroutine 来分发处理消息:

代码语言:javascript
复制
type pipeline struct {
  peerID types.ID


  tr     *Transport
  picker *urlPicker
  status *peerStatus
  raft   Raft
  errorc chan error
  // deprecate when we depercate v2 API
  followerStats *stats.FollowerStats


  msgc chan raftpb.Message
  // wait for the handling routines
  wg    sync.WaitGroup
  stopc chan struct{}
}

核心逻辑位于start和handle方法中

代码语言:javascript
复制
func (p *pipeline) start() {
      for i := 0; i < connPerPipeline; i++ {
    go p.handle()
  }
代码语言:javascript
复制
func (p *pipeline) stop() {
    close(p.stopc)
    p.wg.Wait()

当channel 里面有消息可以消费的时候,调用http 的post方法通过protobuf协议将消息发送出去:

代码语言:javascript
复制
func (p *pipeline) handle() {
    defer p.wg.Done()
    for {
    select {
    case m := <-p.msgc:
      start := time.Now()
      err := p.post(pbutil.MustMarshal(&m))
代码语言:javascript
复制
func (p *pipeline) post(data []byte) (err error) {
   req := createPostRequest(p.tr.Logger, u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
   resp, err := p.tr.pipelineRt.RoundTrip(req)
  done <- struct{}{}

stream分为writer和reader两类:

代码语言:javascript
复制
type streamWriter struct {
  lg *zap.Logger


  localID types.ID
  peerID  types.ID


  status *peerStatus
  fs     *stats.FollowerStats
  r      Raft


  mu      sync.Mutex // guard field working and closer
  closer  io.Closer
  working bool


  msgc  chan raftpb.Message
  connc chan *outgoingConn
  stopc chan struct{}
  done  chan struct{}
}

初始化的时候会起一个协程

代码语言:javascript
复制
func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
      w := &streamWriter{
      go w.run()

在协程内部根据消息类型进行分发处理:

代码语言:javascript
复制
func (cw *streamWriter) run() {
      flusher    http.Flusher
      for {
    select {
    case <-heartbeatc:
              if err == nil {
        flusher.Flush()
      case m := <-msgc:
      err := enc.encode(&m)
                if len(msgc) == 0 || batched > streamBufSize/2 {
          flusher.Flush()
          case conn := <-cw.connc:
      cw.mu.Lock()
      closed := cw.closeUnlocked()
      t = conn.t
      switch conn.t {
        flusher = conn.Flusher
        case <-cw.stopc:
      if cw.close() {
代码语言:javascript
复制
func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
代码语言:javascript
复制
func (cw *streamWriter) close() bool {
代码语言:javascript
复制
func (cw *streamWriter) closeUnlocked() bool {

当新连接建立的时候会通过attach方法将连接建立消息传递进来,这是消息循环的入口

代码语言:javascript
复制
func (cw *streamWriter) attach(conn *outgoingConn) bool {
  select {
  case cw.connc <- conn:
代码语言:javascript
复制
func (cw *streamWriter) stop() {

reader的实现类似

代码语言:javascript
复制
type streamReader struct {
  lg *zap.Logger


  peerID types.ID
  typ    streamType


  tr     *Transport
  picker *urlPicker
  status *peerStatus
  recvc  chan<- raftpb.Message
  propc  chan<- raftpb.Message


  rl *rate.Limiter // alters the frequency of dial retrial attempts


  errorc chan<- error


  mu     sync.Mutex
  paused bool
  closer io.Closer


  ctx    context.Context
  cancel context.CancelFunc
  done   chan struct{}
}

也是在start的时候启动goroutine

代码语言:javascript
复制
func (cr *streamReader) start() {
      go cr.run()
代码语言:javascript
复制
func (cr *streamReader) run() {
        for {
    rc, err := cr.dial(t)
      cr.status.activate()
      err = cr.decodeLoop(rc, t)
      err = cr.rl.Wait(cr.ctx)
      close(cr.done)

不同的是,它是根据不同的消息类型进行不同的处理

代码语言:javascript
复制
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
  switch t {
  case streamTypeMsgAppV2:
    dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
  case streamTypeMessage:
    dec = &messageDecoder{r: rc}
        for {
    m, err := dec.decode()
      receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
      select {
    case recvc <- m:
    default:
代码语言:javascript
复制
    func (cr *streamReader) stop() {

dial方法会发起http请求

代码语言:javascript
复制
    func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
      uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())
      req, err := http.NewRequest(http.MethodGet, uu.String(), nil)
      req = req.WithContext(cr.ctx)
        switch resp.StatusCode {
  case http.StatusGone:
代码语言:javascript
复制
func (cr *streamReader) close() {
代码语言:javascript
复制
func (cr *streamReader) pause() {
代码语言:javascript
复制
func (cr *streamReader) resume() {
代码语言:javascript
复制
func checkStreamSupport(v *semver.Version, t streamType) bool {
本文参与?腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-06,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体同步曝光计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com