将api序列化为字符串,作为method_type,调用Id(流水号),api的参数和返回结果作为args。作为整体再序列化为json,protobuf、cob等编码方式。通过tcp、http、传给服务端。
服务端进行解包,查找注册表,查找api并传参,结果通过网络返回给客户端。
server端:
package main
import (
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"time"
)
type Req struct {
Data []byte
}
type Reply struct {
Success string
}
type DiskQueue int
func (t *DiskQueue) Put(args *Req, Reply *Reply) error {
fmt.Println(time.Now(), string(args.Data))
Reply.Success = "yes"
return nil
}
func main() {
arith := new(DiskQueue)
rpc.Register(arith)
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
http.Serve(l, nil)
}
client端:
package main
import (
"errors"
"fmt"
"net/rpc"
"sync"
"time"
"github.com/gwaylib/log"
)
type Req struct {
Data []byte
}
type Reply struct {
Success string
}
// 同步调用rpc
func PostRpc(data []byte) error {
if rpcc == nil {
return errors.New("rpc client is nil")
}
req := Req{
Data: data,
}
var res Reply
err := rpcc.Call("DiskQueue.Put", req, &res)
if err != nil {
log.Error("rpc --------------------Err -------", err)
return err
}
log.Info("----------", res.Success)
return nil
}
var msgs = make(chan []byte, 1024)
var rpcc *rpc.Client
func init() {
conn, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
if err == nil {
rpcc = conn
}
}
func SendMsgLoop() {
wg.Add(1)
for {
select {
case msg := <-msgs:
err := PostRpc(msg)
if err != nil {
Waterlevel := 8
Threshold := 1 << 15
for {
if rpcc != nil {
rpcc.Close()
}
fmt.Println("reconnect rpc server ...>>>")
time.Sleep(time.Millisecond * time.Duration(Waterlevel))
conn, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
if err == nil {
rpcc = conn
break
} else {
fmt.Printf("Waterlevel=%+v, Threshold=%+v\n", Waterlevel, Threshold)
if Waterlevel < Threshold {
Waterlevel = Waterlevel << 1
} else if Waterlevel >= Threshold {
fmt.Println("超过最大次数,跳出本次rpc重连")
break
}
}
}
}
time.Sleep(time.Millisecond * 10)
}
}
defer wg.Done()
}
// 如果满,则丢弃一个旧的消息
func SendMsg(msg []byte) {
if len(msgs) > 1000 {
<-msgs
}
msgs <- msg
}
// 向消息队列发消息
func PutMsgLoop() {
wg.Add(1)
for i := 0; i < 2000000; i++ {
time.Sleep(time.Millisecond * 10) // 延时,以便观察断开server, 再开启server后,查看消息是否丢弃旧消息
SendMsg([]byte(fmt.Sprintf("hello, %d", i)))
}
defer wg.Done()
}
var wg sync.WaitGroup
func main() {
go PutMsgLoop() // 消息生产测试,20万条消息
go SendMsgLoop() //单独的go routine 执行rpc 客户端监听消息队列,有消息则发送,失败重连,超时则丢这条消息
time.Sleep(time.Second * 2)
wg.Wait()
}
运行效果:
【51CTO.com快译】 作为提高企业的运营效率和业务部门竞争力的必备工具,企业知...
最近几天更新了 vsCode 的版本,目前所用的版本号为:1.43。其实每次更新 vsCode...
复制代码 代码如下: html head meta http-equiv="Content-Type" content="text/h...
DIRECTORY_SEPARATOR在php是什么意思呢,在什么时候使用DIRECTORY_SEPARATOR最合...
本文实例为大家分享了Aspose.Cells实现导入导出的具体代码,供大家参考,具体内...
IT之家3月10日消息据微软博客发布,微软于 2021 年 3 月 9 日正式结束对传统经典...
话说几万年前,有一只猴子在大闹地府删库跑路,导致地府几百年没缓过劲儿来........
要将Web应用创建为便携式的Appimage包格式吗?Appnativefy就是能帮你实现这样功...
前2天群里发了张git历史图,如下: 根据提交历史,可以看出图中所有分支合并都采...
phpStudy启动失败,网上总结了基本就是下面的三种方法: 原因一是防火墙拦截,关...