当前位置:主页 > 查看内容

golang rpc

发布时间:2021-06-05 00:00| 位朋友查看

简介:rpc的原理 将api序列化为字符串作为method_type调用Id(流水号)api的参数和返回结果作为args。作为整体再序列化为jsonprotobuf、cob等编码方式。通过tcp、http、传给服务端。 服务端进行解包查找注册表查找api并传参结果通过网络返回给客户端。 rpc各部分逻辑……

rpc的原理:

将api序列化为字符串,作为method_type,调用Id(流水号),api的参数和返回结果作为args。作为整体再序列化为json,protobuf、cob等编码方式。通过tcp、http、传给服务端。
服务端进行解包,查找注册表,查找api并传参,结果通过网络返回给客户端。

rpc各部分逻辑功能:

  • Application是方法注册、调用
  • client stub是注册表,也就是k-v,例如 “func add(a, b int) int”:object.add()
  • Client Runtime Library是session管理
  • Transport是数据编码(json、protobuf)、标准的网络传输层协议打包

在这里插入图片描述

代码

server端:

package main
  
import (
        "fmt"
        "log"
        "net"
        "net/http"
        "net/rpc"
        "time"
)

type Req struct {
        Data []byte
}

type Reply struct {
        Success string
}

type DiskQueue int

func (t *DiskQueue) Put(args *Req, Reply *Reply) error {
        fmt.Println(time.Now(), string(args.Data))
        Reply.Success = "yes"
        return nil
}

func main() {
        arith := new(DiskQueue)
        rpc.Register(arith)
        rpc.HandleHTTP()
        l, e := net.Listen("tcp", ":1234")
        if e != nil {
                log.Fatal("listen error:", e)
        }
        http.Serve(l, nil)
}

client端:

package main

import (
	"errors"
	"fmt"
	"net/rpc"
	"sync"
	"time"

	"github.com/gwaylib/log"
)

type Req struct {
	Data []byte
}
type Reply struct {
	Success string
}

// 同步调用rpc
func PostRpc(data []byte) error {
	if rpcc == nil {
		return errors.New("rpc client is nil")
	}
	req := Req{
		Data: data,
	}
	var res Reply
	err := rpcc.Call("DiskQueue.Put", req, &res)
	if err != nil {
		log.Error("rpc --------------------Err -------", err)
		return err
	}
	log.Info("----------", res.Success)

	return nil
}

var msgs = make(chan []byte, 1024)
var rpcc *rpc.Client

func init() {
	conn, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
	if err == nil {
		rpcc = conn
	}
}

func SendMsgLoop() {
	wg.Add(1)
	for {
		select {
		case msg := <-msgs:
			err := PostRpc(msg)
			if err != nil {
				Waterlevel := 8
				Threshold := 1 << 15
				for {
					if rpcc != nil {
						rpcc.Close()
					}
					fmt.Println("reconnect rpc server ...>>>")
					time.Sleep(time.Millisecond * time.Duration(Waterlevel))
					conn, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
					if err == nil {
						rpcc = conn
						break
					} else {
						fmt.Printf("Waterlevel=%+v, Threshold=%+v\n", Waterlevel, Threshold)
						if Waterlevel < Threshold {
							Waterlevel = Waterlevel << 1
						} else if Waterlevel >= Threshold {
							fmt.Println("超过最大次数,跳出本次rpc重连")
							break
						}
					}
				}
			}
			time.Sleep(time.Millisecond * 10)
		}
	}
	defer wg.Done()
}

// 如果满,则丢弃一个旧的消息
func SendMsg(msg []byte) {
	if len(msgs) > 1000 {
		<-msgs
	}
	msgs <- msg
}

// 向消息队列发消息
func PutMsgLoop() {
	wg.Add(1)
	for i := 0; i < 2000000; i++ {
		time.Sleep(time.Millisecond * 10) // 延时,以便观察断开server, 再开启server后,查看消息是否丢弃旧消息
		SendMsg([]byte(fmt.Sprintf("hello, %d", i)))
	}
	defer wg.Done()
}

var wg sync.WaitGroup

func main() {
	go PutMsgLoop()  // 消息生产测试,20万条消息
	go SendMsgLoop() //单独的go routine 执行rpc 客户端监听消息队列,有消息则发送,失败重连,超时则丢这条消息

	time.Sleep(time.Second * 2)
	wg.Wait()
}

运行效果:
在这里插入图片描述

;原文链接:https://blog.csdn.net/jacky128256/article/details/115548645
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!
上一篇:ES6 var let const 的区别 下一篇:没有了

推荐图文


随机推荐