前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >grpc-go之基本使用(一)

grpc-go之基本使用(一)

原创
作者头像
Johns
修改2022-09-28 17:34:29
1.3K0
修改2022-09-28 17:34:29
举报
文章被收录于专栏:代码工具代码工具

介绍

gRPC 是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于 HTTP/2 协议标准而设计,基于 ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。

与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个**服务**,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个**存根**能够像服务端一样的方法

image.png
image.png

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)。

grpc 优势

  • 高效的进程间通信 没有使用类似json和xml 的文本语言,而是采用二进制的Protocol Buffers
  • 双工流 尽管也是类似RESTful 的请求响应模式,但是却提供了steam 流式数据通信,相对于传统的restful api来说,速度更快,数据更小,接口要求更严谨。
  • 成熟,稳定,并且内置商业化特性,经过了大量大型开源项目的验证,如docker 、etcd 等等

grpc 缺点

  • grpc 生态相对于RESTful 还是比较小,因为浏览器和移动端对grpc支持依然在初级阶段
  • grpc 不太直接适合面向外部通信,强类型来说有更多约束,向外提供接口的解决方案是配合网关使用

环境搭建

protoc 安装

首先需要安装 protocol buffers compile, 内容还不比较多, 具体请参考: /developer/article/2118076

终端输入protoc –version能打印出版本信息, 就表示安装ok了

gRPC 安装

代码语言:shell
复制
go get -u google.golang.org/grpc

安装过程出现问题可参考解决办法:https://github.com/grpc/grpc-go#FAQ

gRPC plugins 安装

接着是下载 gRPC Plugins,用于生成 gRPC 相关源代码。

代码语言:shell
复制
go get google.golang.org/grpc/cmd/protoc-gen-go-grpc

终端输入protoc-gen-go-grpc --version能打印出版本信息, 就表示安装ok了

正常安装成功的话, $GOPATH/bin 目录下会有 protoc-gen-go、protoc-gen-go-grpc 这两个可执行文件。如果安装了但是说找不到指定文件, 将$GOPATH/bin加入环境变量便可

使用案例

下面以一个helloworld项目为例, 讲解grpc的基本使用

定义pb文件

代码语言:text
复制
//声明proto的版本 只有 proto3 才支持 gRPC
syntax = "proto3";
// 将编译后文件输出在 pb 目录
option go_package = "./pb";
// 指定当前proto文件属于helloworld包
package pb;

// 定义一个名叫 greeting 的服务
service Greeter {
  // 该服务包含一个 SayHello 方法 HelloRequest、HelloReply分别为该方法的输入与输出
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// 具体的参数定义
message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

当前目录结构为

代码语言:txt
复制
└── helloworld
    ├── client
    ├── pb
    │?? ├── hello_world.proto
    └── server

生成go代码

代码语言:shell
复制
$ cd helloworld
$ protoc --go_out=. --go-grpc_out=. ./pb/hello_world.proto
$ tree

└── helloworld
    ├── client
    ├── pb
    │?? ├── hello_world.proto
	│?? ├── hello_world.proto
    │?? └── hello_world_grpc.pb.go
    └── server

执行完命令行后会在pb目录里生成hello_world.pb.gohello_world_grpc.pb.go两个文件

编写 server 端

代码语言:go
复制
const (
	port = ":50051"
)

func main() {
	listen, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// 指定使用 TLS credentials。
	s := grpc.NewServer()

	// 将服务描述(server)及其具体实现(greeterServer)注册到 gRPC 中去.
	// 内部使用的是一个 map 结构存储,类似 HTTP server。
	pb.RegisterGreeterServer(s, &GreeterServer{})
	pb.RegisterEchoServer(s, &Echo{})
	log.Println("Serving gRPC on 0.0.0.0" + port)
	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

// GreeterServer 定义一个结构体用于实现 .proto文件中定义的方法
// 新版本 gRPC 要求必须嵌入 pb.UnimplementedGreeterServer 结构体
type GreeterServer struct {
	pb.UnimplementedGreeterServer
}

// SayHello 简单实现一下.proto文件中定义的 SayHello 方法
func (g *GreeterServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received Msg: %v", in.GetName())
	return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

编写 client 端

代码语言:go
复制
package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/examples/data"
	"grpc-demo/helloworld/pb"
	"log"
	"os"
	"time"
)

const (
	address     = "localhost:50051"
	defaultName = "world"
)

func main() {
	// 建立连接
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewGreeterClient(conn)

	// 通过命令行参数指定 name
	name := defaultName
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := client.SayHello(ctx, &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

测试验证

服务端

代码语言:txt
复制
xxx-MB1:server xxx$ go build
xxx-MB1:server xxx$ ./server 
2022/09/26 23:09:55 Serving gRPC on 0.0.0.0:50051
2022/09/26 23:10:36 Received Msg: world

客户端

代码语言:txt
复制
xxx-MB1:client guirong$ go build 
xxx-MB1:client guirong$ ./client 
2022/09/26 23:10:36 Greeting: Hello world

通信模式

gRPC有如下4种类型的通信模式:

Stream 顾名思义就是一种流,可以源源不断的推送数据,很适合大数据传输,或者服务端和客户端长时间数据交互的场景。Stream API 和 Unary API 相比,因为省掉了中间每次建立连接的花费,所以效率上会提升一些。

1)UnaryAPI:普通一元方法

前面介绍的hello world 属于简单一元rpc模式,类似http 协议一问一答的这种模式.

2)ServerStreaming:服务端推送流(Stream API)

这种流模式可以理解为,服务器向客户端源源不断的发送数据流,应用场景很多,比如游戏玩家购买道具后数据变化需要将数据推送给客户端。

一元rpc模式下,grpc服务器端和grpc客户端在通信时始终只有一个请求和一个响应。在服务器端流rpc 模式下,服务端接收到一个请求后发送多个响应组成的序列,在服务器发送所有响应消息完毕后,发送trailer元数据给客户端,标识流结束。

案例

游戏玩家购买道具后数据变化需要将数据推送给客户端。

prop_update.proto
代码语言:go
复制
syntax = "proto3";
option go_package = "./pb";
service PropService {
  rpc UserProp (UserPropRequest) returns (stream UserPropResponse);
}

//UserPropRequest 使用道具
message UserPropRequest {
  string Id = 1; //道具配置id
  int64 Count = 2; //道具数量
}
//UserPropResponse 使用道具响应
message UserPropResponse {
}
//PropChangePush 道具变化推送
message PropChangePush{
  string PropId = 1; //道具id
  int64 Count = 2; //道具总数量
}
//ResourcesPush 资源变化推送
message ResourcesPush{
  string ResId = 1; //道具id
  int64 Count = 2; //道具总数量
}
生成go代码
代码语言:shell
复制
$ cd helloworld
$ protoc --go_out=. --go-grpc_out=. ./pb/prop_change.proto
$ tree

└── helloworld
    ├── client
    ├── pb
    │?? ├── hello_world.proto
	│?? ├── prop_update.pb.go
	│?? ├── prop_update.proto
	│?? └── prop_update_grpc.pb.go
    └── server

最终的目录结构

代码语言:txt
复制
├── client
│?? ├── client.go
│?? └── main.go
├── pb
│?? ├── hello_world.pb.go
│?? ├── hello_world.proto
│?? ├── hello_world_grpc.pb.go
│?? ├── prop_update.pb.go
│?? ├── prop_update.proto
│?? └── prop_update_grpc.pb.go
└── server
    ├── main.go
    └── server.go
server/server.go
代码语言:go
复制
// PropServer 道具服务
type PropServer struct {
	pb.UnimplementedPropServiceServer
}

// UserProp 主要负责道具更新
func (h *PropServer) UserProp(req *pb.UserPropRequest, stream pb.PropService_UserPropServer) error {
	if req.Count <= 0 {
		req.Count = 1 //错误处理,防止作弊
	}
	fmt.Println(req)
	err := stream.Send(&pb.UserPropResponse{})
	if err != nil {
		panic(err)
	}
	//假设道具减少,资源增加了
	err = stream.SendMsg(&pb.PropChangePush{
		PropId: req.Id,
		Count:  0,
	})
	if err != nil {
		panic(err)
	}
	err = stream.SendMsg(&pb.ResourcesPush{
		ResId: "res_1",
		Count: 100,
	})
	if err != nil {
		panic(err)
	}
	return nil
}
server/main.go
代码语言:go
复制
package main

import (
	"google.golang.org/grpc"
	"grpc-demo/helloworld/pb"
	"log"
	"net"
)

const (
	port = ":50051"
)

func main() {
	listen, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()

	// 游戏发送多条资源变化信息给玩家
	pb.RegisterPropServiceServer(s, &PropServer{})

	log.Println("Serving gRPC on 0.0.0.0" + port)
	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
启动server, 查看控制台输出
代码语言:txt
复制
$cd server
$go build
$./server

2022/09/27 11:59:34 Serving gRPC on 0.0.0.0:50051
Id:"prop_1" Count:1 
client/client.go
代码语言:go
复制
func serverStreamProp(client pb.PropServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	stream, err := client.UserProp(ctx, &pb.UserPropRequest{
		Id:    "prop_1",
		Count: 1,
	})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	for {
		msg, err := stream.Recv()
		// 客户端接收流数据需要循环接收,直到出现io.EOF,代表服务器发送流数据已经完毕
		if err == io.EOF {
			break
		}
		log.Printf("msg: %s", msg)
	}
}
client/main.go
代码语言:go
复制
func main() {

	// 建立连接
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewPropServiceClient(conn)
	serverStreamProp(client)
}
启动client, 查看控制台输出
代码语言:txt
复制
$cd client
$go build
$./client
2022/09/27 12:02:04 msg: 
2022/09/27 12:02:04 msg: 1:"prop_1" 
2022/09/27 12:02:04 msg: 1:"res_1" 2:100 

3)ClientStreaming:客户端推送流(Stream API)

客户端可以将数据源源不断发送给服务器,跟服务端流相反,客户端会发送多条响应,服务器发送一条响应,但是服务器不必等到发送完所有消息才响应。可以发送一条或几条消息就开始响应。

案例

IoT硬件将本地的缓存信息上传到服务器.

data.proto
代码语言:text
复制
syntax = "proto3";
option go_package = "./pb";
service DataService {
  rpc DataUpload (stream DataUploadRequest) returns (DataUploadResponse);
}
enum DeviceStatus {
  DeviceStatusStop = 0;    //停止
  DeviceStatusRunning = 1; //运行
  DeviceStatusIdle = 2;    //怠机
}
message DataUploadRequest {
  int64 Id = 1;//终端id
  int64 Temperature = 2;//温度
  int64 Humidity = 3;//湿度
  DeviceStatus status = 4;//设备状态
  int64 Time = 5 ; //数据产生时间
}
message DataUploadResponse {

}
生成go代码
代码语言:shell
复制
$ cd helloworld
$ protoc --go_out=. --go-grpc_out=. ./pb/data.proto
$ tree

└── helloworld
    ├── client
    ├── pb
	│?? ├── data.proto
    │?? ├── hello_world.proto
	│?? ├── prop_update.pb.go
	│?? ├── prop_update.proto
	│?? └── prop_update_grpc.pb.go
    └── server

最终的目录结构

代码语言:shell
复制
├── client
│?? ├── client.go
│?? └── main.go
├── pb
│?? ├── data.pb.go
│?? ├── data.proto
│?? ├── data_grpc.pb.go
│?? ├── hello_world.pb.go
│?? ├── hello_world.proto
│?? ├── hello_world_grpc.pb.go
│?? ├── prop_update.pb.go
│?? ├── prop_update.proto
│?? └── prop_update_grpc.pb.go
└── server
    ├── main.go
    └── server.go
server/server.go

新增DataServer实现数据上传服务

代码语言:go
复制
type DataServer struct {
	pb.UnsafeDataServiceServer
}

// DataUpload 数据上传
func (h *DataServer) DataUpload(stream pb.DataService_DataUploadServer) error {
	for {
		data, err := stream.Recv()
		if err == io.EOF { //已经接收完毕
			return stream.SendAndClose(&pb.DataUploadResponse{})
		}
		h.doSave(data)
	}
}

//doSave 将数据落到时序数据库
func (h *DataServer) doSave(data *pb.DataUploadRequest) {
	fmt.Println(data)
}
server/main.go
代码语言:go
复制
package main

import (
	"google.golang.org/grpc"
	"grpc-demo/helloworld/pb"
	"log"
	"net"
)

const (
	port = ":50051"
)

func main() {
	listen, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()

	// 物联网硬件将本地的缓存信息上传到服务器
	pb.RegisterDataServiceServer(s, &DataServer{})

	log.Println("Serving gRPC on 0.0.0.0" + port)
	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
启动server, 查看控制台输出
代码语言:shell
复制
$cd server
$ go build
$ ./server 
2022/09/27 14:21:00 Serving gRPC on 0.0.0.0:50051
Id:1 Temperature:37 Humidity:20 Time:1664259475 
Id:2 Temperature:36 Humidity:20 Time:1664259475 
Id:3 Temperature:40 Humidity:21 Time:1664259475 
Id:4 Temperature:42 Humidity:22 Time:1664259475 
client/client.go
代码语言:go
复制
func clientStreamData(client pb.DataServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	stream, err := client.DataUpload(ctx)
	if err != nil {
		panic(err)
	}
	for _, v := range GetData() {
		err = stream.Send(v)
		if err != nil {
			panic(err)
		}
	}
	response, err := stream.CloseAndRecv()
	if err != nil && err != io.EOF {
		panic(err)
	}
	fmt.Println(response)
}

//GetData 模拟物联网设备传数据
func GetData() (res []*pb.DataUploadRequest) {
	res = append(res, &pb.DataUploadRequest{
		Id:          1,
		Temperature: 45,
		Humidity:    20,
		Time:        1637502637,
	})
	res = append(res, &pb.DataUploadRequest{
		Id:          2,
		Temperature: 46,
		Humidity:    20,
		Time:        1637502638,
	})
	res = append(res, &pb.DataUploadRequest{
		Id:          3,
		Temperature: 47,
		Humidity:    21,
		Time:        1637502640,
	})
	res = append(res, &pb.DataUploadRequest{
		Id:          4,
		Temperature: 48,
		Humidity:    22,
		Time:        1637502659,
	})
	return res
}
client/main.go
代码语言:go
复制
package main

import (
	"google.golang.org/grpc"
	"grpc-demo/helloworld/pb"
	"log"
)

const (
	address     = "localhost:50051"
)

func main() {
	// 建立连接
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client4 := pb.NewDataServiceClient(conn)
	clientStreamData(client4)
}
启动client, 查看控制台输出
代码语言:shell
复制
$cd client
$ go build
$ ./client 

4)BidirectionalStreaming:双向推送流(Stream API)

双方都可以将数据源源不断发给对方。简单来说就是上面客户端流和服务器流的一个整合。

案例说明

玩家连续进行了多次战斗请求,服务器将操作结果响应给玩家

定义pb
代码语言:txt
复制
syntax = "proto3";
option go_package = "./pb";

service BattleService {
  rpc Battle (stream BattleRequest) returns (stream BattleResponse);
}

message HeroInfo{
  string Id = 1; //英雄id
  int64 Life = 2;//英雄生命
}

//请求战斗
message BattleRequest {
  string HeroId = 1; //英雄id
  string SkillId = 2; //技能id
}
message BattleResponse {
  repeated HeroInfo hero = 1;
  repeated SkillInfo skill = 2;
}
message SkillInfo {
  string SkillId = 1; //技能id
  int64 CoolDown = 2;//技能冷却时间
}
生成go代码
代码语言:shell
复制
$ cd helloworld
$ protoc --go_out=. --go-grpc_out=. ./pb/battle.proto
$ tree

└── helloworld
    ├── client
    ├── pb
	│?? ├── battle.proto
	│?? ├── data.proto
    │?? ├── hello_world.proto
	│?? ├── prop_update.pb.go
	│?? ├── prop_update.proto
	│?? └── prop_update_grpc.pb.go
    └── server

最终的目录结构

代码语言:shell
复制
├── client
│?? ├── client.go
│?? └── main.go
├── pb
│?? ├── battle.pb.go
│?? ├── battle.proto
│?? ├── battle_grpc.pb.go
│?? ├── data.pb.go
│?? ├── data.proto
│?? ├── data_grpc.pb.go
│?? ├── hello_world.pb.go
│?? ├── hello_world.proto
│?? ├── hello_world_grpc.pb.go
│?? ├── prop_update.pb.go
│?? ├── prop_update.proto
│?? └── prop_update_grpc.pb.go
└── server
    ├── main.go
    └── server.go
server/server.go
代码语言:go
复制
type BattleServer struct {
	pb.UnimplementedBattleServiceServer
}

// Battle 战斗服务
func (h *BattleServer) Battle(steam pb.BattleService_BattleServer) error {
	for {
		req, err := steam.Recv()
		fmt.Println(req)
		if err == io.EOF { //发送最后一次结果给前端
			err = steam.Send(&pb.BattleResponse{})
			if err != nil {
				log.Println(err)
			}
			return nil
		}
		err = steam.Send(&pb.BattleResponse{
			Hero: []*pb.HeroInfo{
				{Id: "hero_1", Life: 999},
			},
			Skill: []*pb.SkillInfo{
				{SkillId: "skill_1", CoolDown: 1664249248},
				{SkillId: "skill_2", CoolDown: 1664249293},
			},
		})
		if err != nil {
			log.Println(err)
		}
	}
}
server/main.go
代码语言:txt
复制
package main

import (
	"google.golang.org/grpc"
	"grpc-demo/helloworld/pb"
	"log"
	"net"
)

const (
	port = ":50051"
)

func main() {
	listen, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	// 玩家连续进行了多次战斗请求,服务器将操作结果响应给玩家
	pb.RegisterBattleServiceServer(s, &BattleServer{})
	log.Println("Serving gRPC on 0.0.0.0" + port)
	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
启动server, 观察控制台返回
代码语言:shell
复制
$ cd server
$ go build
$ ./server 
2022/09/27 15:55:05 Serving gRPC on 0.0.0.0:50051
HeroId:"hero_1" SkillId:"Skill_1" 
HeroId:"hero_2" SkillId:"Skill_2" 
<nil>
client/client.go
代码语言:go
复制
func bidirectionalStreamBattle(client pb.BattleServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	stream, err := client.Battle(ctx)
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	err = stream.SendMsg(&pb.BattleRequest{
		HeroId:  "hero_1",
		SkillId: "Skill_1",
	})
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	err = stream.SendMsg(&pb.BattleRequest{
		HeroId:  "hero_2",
		SkillId: "Skill_2",
	})
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	ch := make(chan struct{})
	go asyncDoBattle(stream, ch)
	err = stream.CloseSend()
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	<-ch
}
func asyncDoBattle(stream pb.BattleService_BattleClient, c chan struct{}) {
	for {
		rsp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		fmt.Println(rsp)
	}
	c <- struct{}{}
}
client/main.go
代码语言:txt
复制
package main

import (
	"google.golang.org/grpc"
	"grpc-demo/helloworld/pb"
	"log"
)

const (
	address     = "localhost:50051"
	defaultName = "world"
)

func main() {

	// 建立连接
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewBattleServiceClient(conn)
	bidirectionalStreamBattle(client)
}
启动client, 观察控制台返回
代码语言:shell
复制
$cd client
$ go build
$ ./client 
hero:<Id:"hero_1" Life:999 > skill:<SkillId:"skill_1" CoolDown:1664249248 > skill:<SkillId:"skill_2" CoolDown:1664249293 > 
hero:<Id:"hero_1" Life:999 > skill:<SkillId:"skill_1" CoolDown:1664249248 > skill:<SkillId:"skill_2" CoolDown:1664249293 > 

SSL/TLS认证

gRPC 内置了以下 encryption 机制:

  • SSL / TLS:通过证书进行数据加密;
  • ALTS:Google开发的一种双向身份验证和传输加密系统。这种除非在用的是谷歌云上否则不推荐用.

gRPC 中的连接类型一共有以下3种:

  • insecure connection 不使用TLS加密, 这种情况客户端和服务器之间传输的所有数据都未加密。所以请不要在生产中使用它!
  • server-side TLS 仅服务端TLS加密, 这种情况下,所有数据都被加密,但只有服务器需要向客户端提供其 TLS 证书。如果服务器不关心哪个客户端正在调用其 API,则可以使用这种类型的连接。
  • mutual TLS:客户端、服务端都使用TLS加密 当服务器还需要验证谁在调用它的服务时,我们会使用它。所以在这种情况下,客户端和服务器都必须向对方提供他们的 TLS 证书。
image.png
image.png

案例

下面演示一下mutual TLS的情况

制作证书

代码语言:shell
复制
# CA证书制作
# 生成.key  私钥文件
openssl genrsa -out ca.key 2048

# 生成.csr 证书签名请求文件
openssl req -new -key ca.key -out ca.csr  -subj "/C=GB/L=China/O=lixd/CN=www.ggr.com"

# 自签名生成.crt 证书文件
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt  -subj "/C=GB/L=China/O=lixd/CN=www.ggr.com"

find / -name "openssl.cnf"

# 注意下面使用的/root/anaconda3/ssl/openssl.cnf就是上面find / -name "openssl.cnf" 到的结果

# 服务端证书制作
# 生成.key  私钥文件
openssl genrsa -out server.key 2048

# 生成.csr 证书签名请求文件
openssl req -new -key server.key -out server.csr -subj "/C=GB/L=China/O=lixd/CN=www.ggr.com" -reqexts SAN -config <(cat /root/anaconda3/ssl/openssl.cnf <(printf "\n[SAN]\nsubjectAltName=DNS:*.ggr.com,DNS:*.ggr.com"))

# 签名生成.crt 证书文件
openssl x509 -req -days 3650 -in server.csr -out server.crt -CA ca.crt -CAkey ca.key -CAcreateserial -extensions SAN -extfile <(cat /root/anaconda3/ssl/openssl.cnf <(printf "\n[SAN]\nsubjectAltName=DNS:*.ggr.com,DNS:*.ggr.com"))

# 客户端证书制作
# 生成.key  私钥文件
openssl genrsa -out client.key 2048

# 生成.csr 证书签名请求文件
openssl req -new -key client.key -out client.csr -subj "/C=GB/L=China/O=lixd/CN=www.ggr.com" -reqexts SAN -config <(cat /root/anaconda3/ssl/openssl.cnf <(printf "\n[SAN]\nsubjectAltName=DNS:*.ggr.com,DNS:*.ggr.com"))

# 签名生成.crt 证书文件
openssl x509 -req -days 3650 -in client.csr -out client.crt -CA ca.crt -CAkey ca.key -CAcreateserial -extensions SAN -extfile <(cat /root/anaconda3/ssl/openssl.cnf <(printf "\n[SAN]\nsubjectAltName=DNS:*.ggr.com,DNS:*.ggr.com"))

证书生成后的目录, 注意可以将server.csr, ca.csr, ca.srl, ca.key 删除掉, 操作后目录为:

代码语言:txt
复制
.
├── features
└── helloworld
    ├── client
    │?? ├── ca.crt
    │?? ├── client.crt
    │?? ├── client.go
    │?? ├── client.key
    │?? └── main.go
    ├── pb
    │?? ├── battle.pb.go
    │?? ├── battle.proto
    │?? ├── battle_grpc.pb.go
    │?? ├── data.pb.go
    │?? ├── data.proto
    │?? ├── data_grpc.pb.go
    │?? ├── hello_world.pb.go
    │?? ├── hello_world.proto
    │?? ├── hello_world_grpc.pb.go
    │?? ├── prop_update.pb.go
    │?? ├── prop_update.proto
    │?? └── prop_update_grpc.pb.go
    └── server
        ├── ca.crt
        ├── main.go
        ├── server.crt
        ├── server.go
        └── server.key

服务端调整

server/main.go

代码语言:txt
复制
package main

import (
	"crypto/tls"
	"crypto/x509"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/examples/data"
	"grpc-demo/helloworld/pb"
	"io/ioutil"
	"log"
	"net"
)

const (
	port = ":50051"
)

func main() {
	// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
	certificate, err := tls.LoadX509KeyPair(data.Path("/Users/ggr
	/go/src/grpc-demo/helloworld/server/server.crt"),
		data.Path("/Users/ggr/go/src/grpc-demo/helloworld/server/server.key"))
	if err != nil {
		log.Fatal(err)
	}
	// 创建CertPool,后续就用池里的证书来校验客户端证书有效性
	// 所以如果有多个客户端 可以给每个客户端使用不同的 CA 证书,来实现分别校验的目的
	certPool := x509.NewCertPool()
	ca, err := ioutil.ReadFile(data.Path("/Users/ggr/go/src/grpc-demo/helloworld/server/ca.crt"))
	if err != nil {
		log.Fatal(err)
	}
	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatal("failed to append certs")
	}

	// 构建基于 TLS 的 TransportCredentials
	cred := credentials.NewTLS(&tls.Config{
		// 设置证书链,允许包含一个或多个
		Certificates: []tls.Certificate{certificate},
		// 要求必须校验客户端的证书 可以根据实际情况选用其他参数
		ClientAuth: tls.RequireAndVerifyClientCert, // NOTE: this is optional!
		// 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式
		ClientCAs: certPool,
	})

	s := grpc.NewServer(grpc.Creds(cred))
	// 玩家连续进行了多次战斗请求,服务器将操作结果响应给玩家
	pb.RegisterBattleServiceServer(s, &BattleServer{})

	listen, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Println("Serving gRPC on 0.0.0.0" + port)
	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

客户端调整

client/client.go

代码语言:txt
复制
package main

import (
	"crypto/tls"
	"crypto/x509"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/examples/data"
	"grpc-demo/helloworld/pb"
	"io/ioutil"
	"log"
)

const (
	address = "localhost:50051"
)

func main() {
	// 加载客户端证书
	certificate, err := tls.LoadX509KeyPair(data.Path("/Users/ggr/go/src/grpc-demo/helloworld/client/client.crt"),
		data.Path("/Users/ggr/go/src/grpc-demo/helloworld/client/client.key"))
	if err != nil {
		log.Fatal(err)
	}
	// 构建CertPool以校验服务端证书有效性
	certPool := x509.NewCertPool()
	ca, err := ioutil.ReadFile(data.Path("/Users/ggr/go/src/grpc-demo/helloworld/client/ca.crt"))
	if err != nil {
		log.Fatal(err)
	}
	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatal("failed to append ca certs")
	}

	cred := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{certificate},
		ServerName:   "www.ggr.com", // NOTE: this is required!
		RootCAs:      certPool,
	})
	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(cred))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewBattleServiceClient(conn)
	bidirectionalStreamBattle(client)
}

测试

服务端

代码语言:shell
复制
/usr/local/go/bin/go build -o /private/var/folders/kk/5llx7c2j0r90sp2hhlptlc1m0000gn/T/___go_build_grpc_demo_helloworld_server grpc-demo/helloworld/server #gosetup
/private/var/folders/kk/5llx7c2j0r90sp2hhlptlc1m0000gn/T/___go_build_grpc_demo_helloworld_server
2022/09/27 18:04:32 Serving gRPC on 0.0.0.0:50051
HeroId:"hero_1" SkillId:"Skill_1" 
HeroId:"hero_2" SkillId:"Skill_2" 
<nil>

客户端

代码语言:shell
复制
/usr/local/go/bin/go build -o /private/var/folders/kk/5llx7c2j0r90sp2hhlptlc1m0000gn/T/___go_build_grpc_demo_helloworld_client grpc-demo/helloworld/client #gosetup
/private/var/folders/kk/5llx7c2j0r90sp2hhlptlc1m0000gn/T/___go_build_grpc_demo_helloworld_client
hero:<Id:"hero_1" Life:999 > skill:<SkillId:"skill_1" CoolDown:1664249248 > skill:<SkillId:"skill_2" CoolDown:1664249293 > 
hero:<Id:"hero_1" Life:999 > skill:<SkillId:"skill_1" CoolDown:1664249248 > skill:<SkillId:"skill_2" CoolDown:1664249293 > 

拦截器Interceptor

gRPC 提供了 Interceptor 功能,包括客户端拦截器和服务端拦截器。可以在接收到请求或者发起请求之前优先对请求中的数据做一些处理后再转交给指定的服务处理并响应,很适合在这里处理验证、日志等流程。

拦截器分为一元拦截器和流拦截器,服务端拦截器和客户端拦截器,所以一共有以下4种类型:

  • grpc.UnaryServerInterceptor
  • grpc.UnaryClientInterceptor
  • grpc.StreamServerInterceptor
  • grpc.StreamClientInterceptor

一元拦截器

一元拦截器可以分为3个阶段:

1)预处理(pre-processing)

2)调用RPC方法(invoking RPC method)

3)后处理(post-processing)

流拦截器

流拦截器过程和一元拦截器有所不同,同样可以分为3个阶段:

  • 1)预处理(pre-processing)
  • 2)调用RPC方法(invoking RPC method)
  • 3)后处理(post-processing)

预处理阶段和一元拦截器类似,但是调用RPC方法和后处理这两个阶段则完全不同。

StreamAPI 的请求和响应都是通过 Stream 进行传递的,更确切说是通过 Streamer 调用 SendMsg 和 RecvMsg 这两个方法获取的, 然后 Streamer 又是调用RPC方法来获取的,所以在流拦截器中我们可以对 Streamer 进行包装,然后实现 SendMsg 和 RecvMsg 这两个方法

案例

server/server.go

代码语言:go
复制
package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	pb "grpc-demo/helloworld/pb"
	"io"
	"log"
	"sync"
	"time"
)


type Echo struct {
	pb.UnimplementedEchoServer
}

// UnaryEcho 一个普通的UnaryAPI
func (e *Echo) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	log.Printf("Recved: %v", req.GetMessage())
	resp := &pb.EchoResponse{Message: req.GetMessage()}
	return resp, nil
}

//  ServerStreamingEcho 客户端发送一个请求 服务端以流的形式循环发送多个响应
/*
1. 获取客户端请求参数
2. 处理完成后返回过个响应
3. 最后返回nil表示已经完成响应
*/
func (e *Echo) ServerStreamingEcho(req *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
	log.Printf("Recved %v", req.GetMessage())
	// 具体返回多少个response根据业务逻辑调整
	for i := 0; i < 2; i++ {
		// 通过 send 方法不断推送数据
		err := stream.Send(&pb.EchoResponse{Message: req.GetMessage()})
		if err != nil {
			log.Fatalf("Send error:%v", err)
			return err
		}
	}
	// 返回nil表示已经完成响应
	return nil
}

// ClientStreamingEcho 客户端流
/*
1. for循环中通过stream.Recv()不断接收client传来的数据
2. err == io.EOF表示客户端已经发送完毕关闭连接了,此时在等待服务端处理完并返回消息
3. stream.SendAndClose() 发送消息并关闭连接(虽然在客户端流里服务器这边并不需要关闭 但是方法还是叫的这个名字,内部也只会调用Send())
*/
func (e *Echo) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {
	// 1.for循环接收客户端发送的消息
	for {
		// 2. 通过 Recv() 不断获取客户端 send()推送的消息
		req, err := stream.Recv() // Recv内部也是调用RecvMsg
		// 3. err == io.EOF表示已经获取全部数据
		if err == io.EOF {
			log.Println("client closed")
			// 4.SendAndClose 返回并关闭连接
			// 在客户端发送完毕后服务端即可返回响应
			return stream.SendAndClose(&pb.EchoResponse{Message: "ok"})
		}
		if err != nil {
			return err
		}
		log.Printf("Recved %v", req.GetMessage())
	}
}

// BidirectionalStreamingEcho 双向流服务端
/*
// 1. 建立连接 获取client
// 2. 通过client调用方法获取stream
// 3. 开两个goroutine(使用 chan 传递数据) 分别用于Recv()和Send()
// 3.1 一直Recv()到err==io.EOF(即客户端关闭stream)
// 3.2 Send()则自己控制什么时候Close 服务端stream没有close 只要跳出循环就算close了。 具体见https://github.com/grpc/grpc-go/issues/444
*/
func (e *Echo) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
	var (
		waitGroup sync.WaitGroup
		msgCh     = make(chan string)
	)
	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()

		for v := range msgCh {
			err := stream.Send(&pb.EchoResponse{Message: v})
			if err != nil {
				fmt.Println("Send error:", err)
				continue
			}
		}
	}()

	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()
		for {
			req, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Fatalf("recv error:%v", err)
			}
			fmt.Printf("Recved :%v \n", req.GetMessage())
			msgCh <- req.GetMessage()
		}
		close(msgCh)
	}()
	waitGroup.Wait()

	// 返回nil表示已经完成响应
	return nil
}

// PropServer 道具服务
type PropServer struct {
	pb.UnimplementedPropServiceServer
}

// UserProp 主要负责道具更新
func (h *PropServer) UserProp(req *pb.UserPropRequest, stream pb.PropService_UserPropServer) error {
	if req.Count <= 0 {
		req.Count = 1 //错误处理,防止作弊
	}
	fmt.Println("收到道具使用的请求:", req)
	err := stream.Send(&pb.UserPropResponse{})
	if err != nil {
		panic(err)
	}
	//假设道具减少,资源增加了
	err = stream.SendMsg(&pb.PropChangePush{
		PropId: req.Id,
		Count:  0,
	})
	if err != nil {
		panic(err)
	}
	err = stream.SendMsg(&pb.ResourcesPush{
		ResId: "res_1",
		Count: 100,
	})
	if err != nil {
		panic(err)
	}
	return nil
}

type DataServer struct {
	pb.UnsafeDataServiceServer
}

// DataUpload 数据上传
func (h *DataServer) DataUpload(stream pb.DataService_DataUploadServer) error {
	for {
		data, err := stream.Recv()
		if err == io.EOF { //已经接收完毕
			return stream.SendAndClose(&pb.DataUploadResponse{})
		}
		h.doSave(data)
	}
}

//doSave 将数据落到时序数据库
func (h *DataServer) doSave(data *pb.DataUploadRequest) {
	fmt.Println(data)
}

type BattleServer struct {
	pb.UnimplementedBattleServiceServer
}

func (h *BattleServer) Battle(steam pb.BattleService_BattleServer) error {
	for {
		req, err := steam.Recv()
		fmt.Println(req)
		if err == io.EOF { //发送最后一次结果给前端
			err = steam.Send(&pb.BattleResponse{})
			if err != nil {
				log.Println(err)
			}
			return nil
		}
		err = steam.Send(&pb.BattleResponse{
			Hero: []*pb.HeroInfo{
				{Id: "hero_1", Life: 999},
			},
			Skill: []*pb.SkillInfo{
				{SkillId: "skill_1", CoolDown: 1664249248},
				{SkillId: "skill_2", CoolDown: 1664249293},
			},
		})
		if err != nil {
			log.Println(err)
		}
	}
}

// GreeterServer 定义一个结构体用于实现 .proto文件中定义的方法
// 新版本 gRPC 要求必须嵌入 pb.UnimplementedGreeterServer 结构体
type GreeterServer struct {
	pb.UnimplementedGreeterServer
}

// SayHello 简单实现一下.proto文件中定义的 SayHello 方法
func (g *GreeterServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received Msg: %v", in.GetName())
	return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{},
	error) {
	start := time.Now()
	m, err := handler(ctx, req)
	end := time.Now()
	// 记录请求参数 耗时 错误信息等数据
	log.Printf("RPC: %s,req:%v start time: %s, end time: %s, err: %v", info.FullMethod, req, start.Format(time.RFC3339),
		end.Format(time.RFC3339), err)
	return m, err
}

type wrappedStream struct {
	grpc.ServerStream
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
	return &wrappedStream{s}
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Printf("Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Printf("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.SendMsg(m)
}

func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// 包装 grpc.ServerStream 以替换 RecvMsg SendMsg这两个方法。
	err := handler(srv, newWrappedStream(ss))
	if err != nil {
		log.Printf("RPC failed with error %v", err)
	}
	return err
}

server/main.go

使用的时候需要显示设置拦截器选项

代码语言:go
复制
package main

import (
	"crypto/tls"
	"crypto/x509"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/examples/data"
	"grpc-demo/helloworld/pb"
	"io/ioutil"
	"log"
	"net"
)

const (
	port = ":50051"
)

func main() {
	// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
	certificate, err := tls.LoadX509KeyPair(data.Path("/Users/guirong/go/src/grpc-demo/helloworld/server/server.crt"),
		data.Path("/Users/guirong/go/src/grpc-demo/helloworld/server/server.key"))
	if err != nil {
		log.Fatal(err)
	}
	// 创建CertPool,后续就用池里的证书来校验客户端证书有效性
	// 所以如果有多个客户端 可以给每个客户端使用不同的 CA 证书,来实现分别校验的目的
	certPool := x509.NewCertPool()
	ca, err := ioutil.ReadFile(data.Path("/Users/guirong/go/src/grpc-demo/helloworld/server/ca.crt"))
	if err != nil {
		log.Fatal(err)
	}
	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatal("failed to append certs")
	}

	// 构建基于 TLS 的 TransportCredentials
	cred := credentials.NewTLS(&tls.Config{
		// 设置证书链,允许包含一个或多个
		Certificates: []tls.Certificate{certificate},
		// 要求必须校验客户端的证书 可以根据实际情况选用其他参数
		ClientAuth: tls.RequireAndVerifyClientCert, // NOTE: this is optional!
		// 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式
		ClientCAs: certPool,
	})

	s := grpc.NewServer(grpc.Creds(cred), grpc.UnaryInterceptor(unaryInterceptor), grpc.StreamInterceptor(streamInterceptor))

	pb.RegisterGreeterServer(s, &GreeterServer{})
	// 玩家连续进行了多次战斗请求,服务器将操作结果响应给玩家
	pb.RegisterBattleServiceServer(s, &BattleServer{})

	listen, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Println("Serving gRPC on 0.0.0.0" + port)
	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

client/client.go

代码语言:go
复制
package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"grpc-demo/helloworld/pb"
	"io"
	"log"
	"os"
	"sync"
	"time"
)

func unary(client pb.EchoClient) {
	resp, err := client.UnaryEcho(context.Background(), &pb.EchoRequest{Message: "hello world"})
	if err != nil {
		log.Printf("send error:%v\n", err)
	}
	fmt.Printf("Recved:%v \n", resp.GetMessage())
}

/*
1. 建立连接 获取client
2. 通过 client 获取stream
3. for循环中通过stream.Recv()依次获取服务端推送的消息
4. err==io.EOF则表示服务端关闭stream了
*/
func serverStream(client pb.EchoClient) {
	// 2.调用获取stream
	stream, err := client.ServerStreamingEcho(context.Background(), &pb.EchoRequest{Message: "Hello World"})
	if err != nil {
		log.Fatalf("could not echo: %v", err)
	}

	// 3. for循环获取服务端推送的消息
	for {
		// 通过 Recv() 不断获取服务端send()推送的消息
		resp, err := stream.Recv()
		// 4. err==io.EOF则表示服务端关闭stream了 退出
		if err == io.EOF {
			log.Println("server closed")
			break
		}
		if err != nil {
			log.Printf("Recv error:%v", err)
			continue
		}
		log.Printf("Recv data:%v", resp.GetMessage())
	}
}

func serverStreamProp(client pb.PropServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	stream, err := client.UserProp(ctx, &pb.UserPropRequest{
		Id:    "prop_1",
		Count: 1,
	})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	for {
		msg, err := stream.Recv()
		// 客户端接收流数据需要循环接收,直到出现io.EOF,代表服务器发送流数据已经完毕
		if err == io.EOF {
			break
		}
		log.Printf("msg: %s", msg)
	}
}

// clientStream 客户端流
/*
1. 建立连接并获取client
2. 获取 stream 并通过 Send 方法不断推送数据到服务端
3. 发送完成后通过stream.CloseAndRecv() 关闭steam并接收服务端返回结果
*/
func clientStream(client pb.EchoClient) {
	// 2.获取 stream 并通过 Send 方法不断推送数据到服务端
	stream, err := client.ClientStreamingEcho(context.Background())
	if err != nil {
		log.Fatalf("Sum() error: %v", err)
	}
	for i := int64(0); i < 2; i++ {
		err := stream.Send(&pb.EchoRequest{Message: "hello world"})
		if err != nil {
			log.Printf("send error: %v", err)
			continue
		}
	}

	// 3. 发送完成后通过stream.CloseAndRecv() 关闭steam并接收服务端返回结果
	// (服务端则根据err==io.EOF来判断client是否关闭stream)
	resp, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("CloseAndRecv() error: %v", err)
	}
	log.Printf("sum: %v", resp.GetMessage())
}

func clientStreamData(client pb.DataServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	stream, err := client.DataUpload(ctx)
	if err != nil {
		panic(err)
	}
	for _, v := range GetData() {
		err = stream.Send(v)
		if err != nil {
			panic(err)
		}
	}
	response, err := stream.CloseAndRecv()
	if err != nil && err != io.EOF {
		panic(err)
	}
	fmt.Println(response)
}

//GetData 模拟物联网设备传数据
func GetData() (res []*pb.DataUploadRequest) {
	res = append(res, &pb.DataUploadRequest{
		Id:          1,
		Temperature: 37,
		Humidity:    20,
		Time:        1664259475,
	})
	res = append(res, &pb.DataUploadRequest{
		Id:          2,
		Temperature: 36,
		Humidity:    20,
		Time:        1664259475,
	})
	res = append(res, &pb.DataUploadRequest{
		Id:          3,
		Temperature: 40,
		Humidity:    21,
		Time:        1664259475,
	})
	res = append(res, &pb.DataUploadRequest{
		Id:          4,
		Temperature: 42,
		Humidity:    22,
		Time:        1664259475,
	})
	return res
}

// bidirectionalStream 双向流
/*
1. 建立连接 获取client
2. 通过client获取stream
3. 开两个goroutine 分别用于Recv()和Send()
	3.1 一直Recv()到err==io.EOF(即服务端关闭stream)
	3.2 Send()则由自己控制
4. 发送完毕调用 stream.CloseSend()关闭stream 必须调用关闭 否则Server会一直尝试接收数据 一直报错...
*/
func bidirectionalStream(client pb.EchoClient) {
	var wg sync.WaitGroup
	// 2. 调用方法获取stream
	stream, err := client.BidirectionalStreamingEcho(context.Background())
	if err != nil {
		panic(err)
	}
	// 3.开两个goroutine 分别用于Recv()和Send()
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			req, err := stream.Recv()
			if err == io.EOF {
				fmt.Println("Server Closed")
				break
			}
			if err != nil {
				continue
			}
			fmt.Printf("Recv Data:%v \n", req.GetMessage())
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		for i := 0; i < 2; i++ {
			err := stream.Send(&pb.EchoRequest{Message: "hello world"})
			if err != nil {
				log.Printf("send error:%v\n", err)
			}
			time.Sleep(time.Second)
		}
		// 4. 发送完毕关闭stream
		err := stream.CloseSend()
		if err != nil {
			log.Printf("Send error:%v\n", err)
			return
		}
	}()
	wg.Wait()
}

func bidirectionalStreamBattle(client pb.BattleServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	stream, err := client.Battle(ctx)
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	err = stream.SendMsg(&pb.BattleRequest{
		HeroId:  "hero_1",
		SkillId: "Skill_1",
	})
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	err = stream.SendMsg(&pb.BattleRequest{
		HeroId:  "hero_2",
		SkillId: "Skill_2",
	})
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	ch := make(chan struct{})
	go asyncDoBattle(stream, ch)
	err = stream.CloseSend()
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	<-ch
}
func asyncDoBattle(stream pb.BattleService_BattleClient, c chan struct{}) {
	for {
		rsp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		fmt.Println(rsp)
	}
	c <- struct{}{}
}

func sayHello(client pb.GreeterClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// 通过命令行参数指定 name
	name := "world"
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	r, err := client.SayHello(ctx, &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

// unaryInterceptor 一个简单的 unary interceptor 示例。
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// pre-processing
	start := time.Now()
	err := invoker(ctx, method, req, reply, cc, opts...) // invoking RPC method
	// post-processing
	end := time.Now()
	log.Printf("RPC: %s, req:%v start time: %s, end time: %s, err: %v", method, req, start.Format(time.RFC3339),
		end.Format(time.RFC3339), err)
	return err
}

// wrappedStream  用于包装 grpc.ClientStream 结构体并拦截其对应的方法。
type wrappedStream struct {
	grpc.ClientStream
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
	return &wrappedStream{s}
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Printf("Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Printf("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.SendMsg(m)
}

// streamInterceptor 一个简单的 stream interceptor 示例。
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	s, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return nil, err
	}
	return newWrappedStream(s), nil
}

client/main.go

使用的时候需要显示设置拦截器选项

代码语言:go
复制
package main

import (
	"crypto/tls"
	"crypto/x509"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/examples/data"
	"grpc-demo/helloworld/pb"
	"io/ioutil"
	"log"
)

const (
	address = "localhost:50051"
)

func main() {
	// 加载客户端证书
	certificate, err := tls.LoadX509KeyPair(data.Path("/Users/guirong/go/src/grpc-demo/helloworld/client/client.crt"),
		data.Path("/Users/guirong/go/src/grpc-demo/helloworld/client/client.key"))
	if err != nil {
		log.Fatal(err)
	}
	// 构建CertPool以校验服务端证书有效性
	certPool := x509.NewCertPool()
	ca, err := ioutil.ReadFile(data.Path("/Users/guirong/go/src/grpc-demo/helloworld/client/ca.crt"))
	if err != nil {
		log.Fatal(err)
	}
	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatal("failed to append ca certs")
	}

	cred := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{certificate},
		ServerName:   "www.ggr.com", // NOTE: this is required!
		RootCAs:      certPool,
	})
	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(cred), grpc.WithUnaryInterceptor(unaryInterceptor), grpc.WithStreamInterceptor(streamInterceptor))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewBattleServiceClient(conn)
	bidirectionalStreamBattle(client)

	client2 := pb.NewGreeterClient(conn)
	sayHello(client2)
}

结果输出

服务端运行结果

代码语言:shell
复制
2022/09/27 19:21:32 Serving gRPC on 0.0.0.0:50051
2022/09/27 19:21:41 Receive a message (Type: *pb.BattleRequest) at 2022-09-27T19:21:41+08:00
HeroId:"hero_1" SkillId:"Skill_1" 
2022/09/27 19:21:41 Send a message (Type: *pb.BattleResponse) at 2022-09-27T19:21:41+08:00
2022/09/27 19:21:41 Receive a message (Type: *pb.BattleRequest) at 2022-09-27T19:21:41+08:00
HeroId:"hero_2" SkillId:"Skill_2" 
2022/09/27 19:21:41 Send a message (Type: *pb.BattleResponse) at 2022-09-27T19:21:41+08:00
2022/09/27 19:21:41 Receive a message (Type: *pb.BattleRequest) at 2022-09-27T19:21:41+08:00
<nil>
2022/09/27 19:21:41 Send a message (Type: *pb.BattleResponse) at 2022-09-27T19:21:41+08:00
2022/09/27 19:21:41 Received Msg: world
2022/09/27 19:21:41 RPC: /pb.Greeter/SayHello,req:name:"world" start time: 2022-09-27T19:21:41+08:00, end time: 2022-09-27T19:21:41+08:00, err: <nil>

客户端运行结果

代码语言:shell
复制
2022/09/27 19:21:41 Send a message (Type: *pb.BattleRequest) at 2022-09-27T19:21:41+08:00
2022/09/27 19:21:41 Send a message (Type: *pb.BattleRequest) at 2022-09-27T19:21:41+08:00
2022/09/27 19:21:41 Receive a message (Type: *pb.BattleResponse) at 2022-09-27T19:21:41+08:00
hero:<Id:"hero_1" Life:999 > skill:<SkillId:"skill_1" CoolDown:1664249248 > skill:<SkillId:"skill_2" CoolDown:1664249293 > 
2022/09/27 19:21:41 Receive a message (Type: *pb.BattleResponse) at 2022-09-27T19:21:41+08:00
hero:<Id:"hero_1" Life:999 > skill:<SkillId:"skill_1" CoolDown:1664249248 > skill:<SkillId:"skill_2" CoolDown:1664249293 > 
2022/09/27 19:21:41 Receive a message (Type: *pb.BattleResponse) at 2022-09-27T19:21:41+08:00

2022/09/27 19:21:41 Receive a message (Type: *pb.BattleResponse) at 2022-09-27T19:21:41+08:00
2022/09/27 19:21:41 RPC: /pb.Greeter/SayHello, req:name:"world" start time: 2022-09-27T19:21:41+08:00, end time: 2022-09-27T19:21:41+08:00, err: <nil>
2022/09/27 19:21:41 Greeting: Hello world

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
    • grpc 优势
      • grpc 缺点
      • 环境搭建
        • protoc 安装
          • gRPC 安装
            • gRPC plugins 安装
            • 使用案例
              • 定义pb文件
                • 生成go代码
                  • 编写 server 端
                    • 编写 client 端
                      • 测试验证
                      • 通信模式
                        • 1)UnaryAPI:普通一元方法
                          • 2)ServerStreaming:服务端推送流(Stream API)
                            • 案例
                          • 3)ClientStreaming:客户端推送流(Stream API)
                            • 案例
                          • 4)BidirectionalStreaming:双向推送流(Stream API)
                            • 案例说明
                        • SSL/TLS认证
                          • 案例
                            • 制作证书
                            • 服务端调整
                            • 客户端调整
                            • 测试
                        • 拦截器Interceptor
                          • 一元拦截器
                            • 流拦截器
                              • 案例
                                • server/server.go
                                • server/main.go
                                • client/client.go
                                • client/main.go
                                • 结果输出
                            相关产品与服务
                            文件存储
                            文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
                            http://www.vxiaotou.com