etcd是CoreOS团队于2013年6月发起的开源项目 它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法 etcd基于Go语言实现。
etcd作为服务发现系统 有以下的特点
简单 安装配置简单 而且提供了HTTP API进行交互 使用也很简单etcd项目地址 https://github.com/coreos/etcd/
etcd典型应用场景-服务发现
etcd比较多的应用场景是用于服务发现 服务发现(Service Discovery)要解决的是分布式系统中最常见的问题之一 即在同一个分布式集群中的进程或服务如何才能找到对方并建立连接。
从本质上说 服务发现就是要了解集群中是否有进程在监听upd或者tcp端口 并且通过名字就可以进行查找和链接。
要解决服务发现的问题 需要下面三大支柱 缺一不可。
一个强一致性、高可用的服务存储目录。基于Ralf算法的etcd天生就是这样一个强一致性、高可用的服务存储目录。
一种注册服务和健康服务健康状况的机制。用户可以在etcd中注册服务 并且对注册的服务配置key TTL 定时保持服务的心跳以达到监控健康状态的效果。
一种查找和连接服务的机制。通过在etcd指定的主题下注册的服务业能在对应的主题下查找到。为了确保连接 我们可以在每个服务机器上都部署一个proxy模式的etcd 这样就可以确保访问etcd集群的服务都能够互相连接。
gRPC简介gRPC是谷歌开源的一款跨平台、高性能的RPC框架。gRPC是一个现代的开源高性能RPC框架 可以在任何环境下运行。在实际开发过程中 主要使用它来进行后端微服务的开发。
在gRPC中 客户端应用程序可以像本地对象那样直接调用另一台计算机上的服务器应用程序上的方法 从而更容易创建分布式应用程序和服务。与许多RPC系统一样 gRPC基于定义服务的思想 可以通过设置参数和返回类型来远程调用方法。在服务端 实现这个接口并运行gRPC服务器来处理客户端调用。客户端提供的方法 客户端与服务端的方法相同 。
如图所示 gRPC客户端和服务端可以在各种环境中运行和相互通信 并且可以用gRPC支持的任何语言编写。因此 可以用Go语言创建一个gRPC服务器 同时供PHP客户端和Android客户端等多个客户端调用 从而突破开发语言的限制。
服务注册 register.go
/etcdAddr separated by commas
func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error {
? cli, err : clientv3.New(clientv3.Config{
? ? ?Endpoints: strings.Split(etcdAddr, , ),
? })
? fmt.Println( RegisterEtcd )
? if err ! nil {
? ? ?// ? ?return fmt.Errorf( grpclb: create clientv3 client failed: %v , err)
? ? ?return fmt.Errorf( create etcd clientv3 client failed, errmsg:%v, etcd addr:%s , err, etcdAddr)
? }
? //lease
? ctx, cancel : context.WithCancel(context.Background())
? resp, err : cli.Grant(ctx, int64(ttl))
? if err ! nil {
? ? ?return fmt.Errorf( grant failed )
? }
? // ?schema:///serviceName/ip:port - ip:port
? serviceValue : net.JoinHostPort(myHost, strconv.Itoa(myPort))
? serviceKey : GetPrefix(schema, serviceName) serviceValue
? //set key- value
? if _, err : cli.Put(ctx, serviceKey, serviceValue, clientv3.WithLease(resp.ID)); err ! nil {
? ? ?return fmt.Errorf( put failed, errmsg:%v key:%s, value:%s , err, serviceKey, serviceValue)
? }
? //keepalive
? kresp, err : cli.KeepAlive(ctx, resp.ID)
? if err ! nil {
? ? ?return fmt.Errorf( keepalive faild, errmsg:%v, lease id:%d , err, resp.ID)
? }
? go func() {
? FLOOP:
? ? ?for {
? ? ? ? select {
? ? ? ? case _, ok : -kresp:
? ? ? ? ? ?if ok true {
? ? ? ? ? ?} else {
? ? ? ? ? ? ? break FLOOP
? ? ? ? ? ?}
? ? ? ? }
? ? ?}
? }()
? rEtcd RegEtcd{ctx: ctx,
? ? ?cli: ? ?cli,
? ? ?cancel: cancel,
? ? ?key: ? ?serviceKey}
? return nil
}
grpc模块在启动时调用RegisterEtcd注册 并定时lease
命名解析实现及服务发现 resolver.go
type Resolver struct {
? cc ? ? ? ? ? ? ? ? resolver.ClientConn
? serviceName ? ? ? ?string
? grpcClientConn ? ? *grpc.ClientConn
? cli ? ? ? ? ? ? ? ?*clientv3.Client
? schema ? ? ? ? ? ? string
? etcdAddr ? ? ? ? ? string
? watchStartRevision int64
}
var (
? nameResolver ? ? ? ? make(map[string]*Resolver)
? rwNameResolverMutex sync.RWMutex
)
func NewResolver(schema, etcdAddr, serviceName string) (*Resolver, error) {
? etcdCli, err : clientv3.New(clientv3.Config{
? ? ?Endpoints: strings.Split(etcdAddr, , ),
? })
? if err ! nil {
? ? ?return nil, err
? }
? var r Resolver
? r.serviceName serviceName
? r.cli etcdCli
? r.schema schema
? r.etcdAddr etcdAddr
? resolver.Register( r)
? conn, err : grpc.Dial(
? ? ?GetPrefix(schema, serviceName),
? ? ?grpc.WithDefaultServiceConfig(fmt.Sprintf( { LoadBalancingPolicy : %s } , roundrobin.Name)),
? ? ?grpc.WithInsecure(),
? ? ?grpc.WithTimeout(time.Duration(5)*time.Second),
? )
? if err nil {
? ? ?r.grpcClientConn conn
? }
? return r, err
}
func (r1 *Resolver) ResolveNow(rn resolver.ResolveNowOptions) {
}
func (r1 *Resolver) Close() {
}
func GetConn(schema, etcdaddr, serviceName string) *grpc.ClientConn {
? rwNameResolverMutex.RLock()
? r, ok : nameResolver[schema serviceName]
? rwNameResolverMutex.RUnlock()
? if ok {
? ? ?return r.grpcClientConn
? }
? rwNameResolverMutex.Lock()
? r, ok nameResolver[schema serviceName]
? if ok {
? ? ?rwNameResolverMutex.Unlock()
? ? ?return r.grpcClientConn
? }
? r, err : NewResolver(schema, etcdaddr, serviceName)
? if err ! nil {
? ? ?rwNameResolverMutex.Unlock()
? ? ?return nil
? }
? nameResolver[schema serviceName] r
? rwNameResolverMutex.Unlock()
? return r.grpcClientConn
}
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
? if r.cli nil {
? ? ?return nil, fmt.Errorf( etcd clientv3 client failed, etcd:%s , target)
? }
? r.cc cc
? ctx, _ : context.WithTimeout(context.Background(), 5*time.Second)
? // ? ? %s:///%s
? prefix : GetPrefix(r.schema, r.serviceName)
? // get key first
? resp, err : r.cli.Get(ctx, prefix, clientv3.WithPrefix())
? if err nil {
? ? ?var addrList []resolver.Address
? ? ?for i : range resp.Kvs {
? ? ? ? fmt.Println( init addr: , string(resp.Kvs[i].Value))
? ? ? ? addrList append(addrList, resolver.Address{Addr: string(resp.Kvs[i].Value)})
? ? ?}
? ? ?r.cc.UpdateState(resolver.State{Addresses: addrList})
? ? ?r.watchStartRevision resp.Header.Revision 1
? ? ?go r.watch(prefix, addrList)
? } else {
? ? ?return nil, fmt.Errorf( etcd get failed, prefix: %s , prefix)
? }
? return r, nil
}
func (r *Resolver) Scheme() string {
? return r.schema
}
func exists(addrList []resolver.Address, addr string) bool {
? for _, v : range addrList {
? ? ?if v.Addr addr {
? ? ? ? return true
? ? ?}
? }
? return false
}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
? for i : range s {
? ? ?if s[i].Addr addr {
? ? ? ? s[i] s[len(s)-1]
? ? ? ? return s[:len(s)-1], true
? ? ?}
? }
? return nil, false
}
func (r *Resolver) watch(prefix string, addrList []resolver.Address) {
? rch : r.cli.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrefix())
? for n : range rch {
? ? ?flag : 0
? ? ?for _, ev : range n.Events {
? ? ? ? switch ev.Type {
? ? ? ? case mvccpb.PUT:
? ? ? ? ? ?if !exists(addrList, string(ev.Kv.Value)) {
? ? ? ? ? ? ? flag 1
? ? ? ? ? ? ? addrList append(addrList, resolver.Address{Addr: string(ev.Kv.Value)})
? ? ? ? ? ? ? fmt.Println( after add, new list: , addrList)
? ? ? ? ? ?}
? ? ? ? case mvccpb.DELETE:
? ? ? ? ? ?fmt.Println( remove addr key: , string(ev.Kv.Key), value: , string(ev.Kv.Value))
? ? ? ? ? ?i : strings.LastIndexAny(string(ev.Kv.Key), / )
? ? ? ? ? ?if i 0 {
? ? ? ? ? ? ? return
? ? ? ? ? ?}
? ? ? ? ? ?t : string(ev.Kv.Key)[i 1:]
? ? ? ? ? ?fmt.Println( remove addr key: , string(ev.Kv.Key), value: , string(ev.Kv.Value), addr: , t)
? ? ? ? ? ?if s, ok : remove(addrList, t); ok {
? ? ? ? ? ? ? flag 1
? ? ? ? ? ? ? addrList s
? ? ? ? ? ? ? fmt.Println( after remove, new list: , addrList)
? ? ? ? ? ?}
? ? ? ? }
? ? ?}
? ? ?if flag 1 {
? ? ? ? r.cc.UpdateState(resolver.State{Addresses: addrList})
? ? ? ? fmt.Println( update: , addrList)
? ? ?}
? }
}
客户端先通过GetConn获取conn 然后再调用grpc服务 调用后不用关闭conn
服务端示例代码 server.go
getcdv3.RegisterEtcd ( sk , etcdAddr, 127.0.0.1 , port, myrpc1 , 10)
s : grpc.NewServer()
helloworld.RegisterHelloServer(s, server{})
s.Serve(listener)
客户端示例代码 client.go
p : getcdv3.GetConn( sk , etcdAddr, myrpc1 )
client : helloworld.NewHelloClient(p)
resp1, err : client.SayHello(context.Background(), helloworld.HelloReq{Req: world })
总结 OpenIM集成此插件 实现了轻量级的服务发现机制 打造了基于集群的IM服务 各模块很方便平行扩展 方便运维。在使用grpc、etcd过程中 特别注意版本兼容问题 具体可以参考OpenIM的go.mod文件
更多阅读
基于Tablestore Timeline的IM 即时通讯 消息系统架构 - 架构篇
【原创】开源OpenIM 高性能、可伸缩、易扩展的即时通讯架构
在TOP云(zuntop.com)科技租赁过服务器的站长都知道独立服务器在价格上比VPS主...
中国最?好的一朵云飘进了华瑞银行。阿里云将进一步助力华瑞银行All in Cloud。 -...
2020年对于云计算行业来说是突破性的一年,因为公共云供应商增加了收入,而疫情...
最近,DevOps的采用导致了企业计算的重大转变。除无服务器计算,动态配置和即付...
一、PostgreSQL行业位置 一 行业位置 首先我们看一看RDS PostgreSQL在整个行业当...
很长时间没有更新原创文章了,但是还一直在思考和沉淀当中,后面公众号会更频繁...
定义 this是函数运行时自动生成的内部对象,即调用函数的那个对象。(不一定很准...
9月17日,2020云栖大会上,阿里云正式发布工业大脑3.0。 阿里云智能资深产品专家...
查看表结构,sbtest1有主键、k_1二级索引、i_c二级索引 CREATE TABLE `sbtest1` ...
本文转载自网络,原文链接:https://mp.weixin.qq.com/s/vlOUg46B5bcmToX-fjavJQ...