A controller keeps an in-memory cache. The cache is normally used together with a lister/indexer and is exposed through the Indexer interface.
type fooCache struct {
lister listers.FooLister
indexer cache.Indexer
}
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
}
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}
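The layering is therefore: the externally used fooCache -> cache.Indexer -> cache -> KeyFunc + ThreadSafeStore -> map + indexers + indices (see thread_safe_store.go).
On a write, the object is stored in the map under the key computed by keyFunc, and all indices are updated at the same time.
Indexer is an interface that adds indexing capability to the cache.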
// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
Store
// Retrieve list of objects that match on the named indexing function
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the set of keys that match on the named indexing function.
IndexKeys(indexName, indexKey string) ([]string, error)
// ListIndexFuncValues returns the list of generated values of an Index func
ListIndexFuncValues(indexName string) []string
// ByIndex lists object that match on the named indexing function with the exact key
ByIndex(indexName, indexKey string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}
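A store such as the one returned by NewThreadSafeStore is an in-memory map, so it can already look objects up by key. Why add the more complex Indexer on top of it?
The Indexer exists to make it easier to query the in-memory data along other dimensions. For example, with IndexByPodNodeName you can look up all pods by node name.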
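To make the relationship between the map, the indexers and the indices concrete, here is a minimal sketch using only the standard library. It is a simplified model, not client-go's implementation: `miniStore`, `reindex`, the trimmed-down `Pod` struct and the index name `"nodeName"` are all hypothetical names chosen for illustration, and the real threadSafeMap also takes a lock, which is omitted here.

```go
package main

import (
	"fmt"
	"sort"
)

// Pod is a hypothetical stand-in for *v1.Pod with only the fields this sketch needs.
type Pod struct {
	Namespace, Name, NodeName string
}

// IndexFunc maps an object to the index values it should be filed under.
type IndexFunc func(obj interface{}) ([]string, error)

// miniStore is a simplified, non-thread-safe model of an indexed store.
type miniStore struct {
	items    map[string]interface{}                // key -> object
	indexers map[string]IndexFunc                  // index name -> index function
	indices  map[string]map[string]map[string]bool // index name -> index value -> set of keys
}

func newMiniStore(indexers map[string]IndexFunc) *miniStore {
	return &miniStore{
		items:    map[string]interface{}{},
		indexers: indexers,
		indices:  map[string]map[string]map[string]bool{},
	}
}

// Add stores obj under key and refreshes every index, dropping stale entries first.
func (s *miniStore) Add(key string, obj interface{}) error {
	if old, ok := s.items[key]; ok {
		if err := s.reindex(old, key, false); err != nil {
			return err
		}
	}
	s.items[key] = obj
	return s.reindex(obj, key, true)
}

// reindex adds (or removes) key under every index value computed for obj.
func (s *miniStore) reindex(obj interface{}, key string, add bool) error {
	for name, fn := range s.indexers {
		values, err := fn(obj)
		if err != nil {
			return err
		}
		if s.indices[name] == nil {
			s.indices[name] = map[string]map[string]bool{}
		}
		for _, v := range values {
			if s.indices[name][v] == nil {
				s.indices[name][v] = map[string]bool{}
			}
			if add {
				s.indices[name][v][key] = true
			} else {
				delete(s.indices[name][v], key)
			}
		}
	}
	return nil
}

// IndexKeys returns the keys filed under indexValue in the named index, sorted for stable output.
func (s *miniStore) IndexKeys(indexName, indexValue string) []string {
	keys := []string{}
	for k := range s.indices[indexName][indexValue] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// indexByPodNodeName files a pod under the node it is scheduled on.
func indexByPodNodeName(obj interface{}) ([]string, error) {
	pod, ok := obj.(*Pod)
	if !ok {
		return nil, fmt.Errorf("expected *Pod, got %T", obj)
	}
	if pod.NodeName == "" {
		return nil, nil
	}
	return []string{pod.NodeName}, nil
}

func main() {
	s := newMiniStore(map[string]IndexFunc{"nodeName": indexByPodNodeName})
	for _, p := range []*Pod{
		{"default", "a", "node-1"},
		{"default", "b", "node-2"},
		{"kube-system", "c", "node-1"},
	} {
		if err := s.Add(p.Namespace+"/"+p.Name, p); err != nil {
			panic(err)
		}
	}
	fmt.Println(s.IndexKeys("nodeName", "node-1")) // prints [default/a kube-system/c]
}
```

The point of the sketch is that a single write touches both the primary map and every secondary index, so a later query by node name is a map lookup rather than a scan over all pods.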
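There are many concepts involved, but understanding a few of them is enough to follow the rest:
The IndexFuncs mainly used in Kubernetes are (in practice only a few are used; the design was made generic, which added a fair amount of complexity)
Note the difference between the FIFO and the WorkQueue: in Kubernetes controller semantics they sit in different places. The FIFO belongs to the cache layer and connects the reflector to the local cache, while the work queue hands changes to the workers in the controller's upper layer. The FIFO is functionally simple, and DeltaFIFO is the one normally used; work queues come in several kinds, such as the generic queue, the rate-limited queue and the delaying queue, to meet the needs of the application on top.
DeltaFIFO is a kind of FIFO; a FIFO implements Store plus Pop/AddIfNotPresent.
DeltaFIFO stores its data in a map[string]Deltas. A Delta is an object together with a DeltaType, so every change to the object behind one key is collected in the Deltas under that same key.
DeltaFIFO's knownObjects (a KeyListerGetter) is the Indexer, that is, the local cache content visible to the user. DeltaFIFO acts as a pipe into the local cache, and it compares against the state of the objects in the local cache in order to simplify and merge some changes.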
type DeltaType string
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
Sync DeltaType = "Sync"
)
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
Type DeltaType
Object interface{}
}
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
One benefit of DeltaFIFO is that some deltas can be merged, for example two consecutive deletions.
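The per-key grouping and the merging of consecutive deletions can be sketched with the standard library alone. This is a simplified model under stated assumptions, not the real DeltaFIFO: `miniDeltaFIFO` and `enqueue` are hypothetical names, keys are passed in explicitly instead of being computed by a KeyFunc, there is no locking, and `Pop` returns immediately when the queue is empty, whereas the real one blocks. When two deletions collide, this sketch simply keeps the one that is already pending.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// miniDeltaFIFO is a simplified, non-thread-safe model of DeltaFIFO.
type miniDeltaFIFO struct {
	items map[string]Deltas // key -> pending deltas, oldest first
	queue []string          // keys in arrival order; each key appears at most once
}

func newMiniDeltaFIFO() *miniDeltaFIFO {
	return &miniDeltaFIFO{items: map[string]Deltas{}}
}

// enqueue appends a delta for key and collapses two consecutive deletions into one.
func (f *miniDeltaFIFO) enqueue(t DeltaType, key string, obj interface{}) {
	deltas, queued := f.items[key]
	if n := len(deltas); n > 0 && t == Deleted && deltas[n-1].Type == Deleted {
		return // duplicate deletion: keep the delta that is already pending
	}
	if !queued {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(deltas, Delta{Type: t, Object: obj})
}

// Pop removes the oldest key and returns all deltas accumulated for it.
// The boolean is false when the queue is empty.
func (f *miniDeltaFIFO) Pop() (string, Deltas, bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	deltas := f.items[key]
	delete(f.items, key)
	return key, deltas, true
}

func main() {
	f := newMiniDeltaFIFO()
	f.enqueue(Added, "default/a", "a-v1")
	f.enqueue(Deleted, "default/b", "b-v3")
	f.enqueue(Updated, "default/a", "a-v2")
	f.enqueue(Deleted, "default/b", "b-v3") // merged with the pending deletion
	for {
		key, deltas, ok := f.Pop()
		if !ok {
			break
		}
		fmt.Println(key, deltas)
	}
}
```

Because the queue holds keys rather than individual events, a consumer that pops "default/a" receives its whole history in one batch, oldest first.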
See https://blog.csdn.net/weixin_42663840/article/details/81482553
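// Code from client-go/tools/cache/reflector.go
type Reflector struct {
name string // name
metrics *reflectorMetrics // metrics
expectedType reflect.Type // the reflected type, i.e. the object type being watched, e.g. Pod
store Store // storage -> DeltaFIFO
listerWatcher ListerWatcher // fetches resources from the apiserver through the REST API
period time.Duration // List and Watch in principle run in an endless loop and only exit on error;
// this is how long to wait after an error before running List and Watch again (default: 1 second)
resyncPeriod time.Duration // resync period; many people assume this means re-syncing from the apiserver,
// but it actually means that users of shared_informer periodically need the full set of objects replayed
ShouldResync func() bool // whether a resync is needed
clock clock.Clock // clock
lastSyncResourceVersion string // resource version seen at the last sync
lastSyncResourceVersionMutex sync.RWMutex // lock guarding lastSyncResourceVersion
}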
Core code
// Code from client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
var resourceVersion string
// List all objects
options := metav1.ListOptions{ResourceVersion: "0"}
list, err := r.listerWatcher.List(options)
// ... omitted
items, err := meta.ExtractList(list)
// ... omitted
// Sync the result into the DeltaFIFO
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
// ... omitted
// Watch logic
for {
// ... omitted
w, err := r.listerWatcher.Watch(options)
// Watch returns a stream on which the apiserver sends changed resources; client-go ultimately exposes it as a chan.
// watchHandler() therefore has to keep reading from that chan, which is why resyncerrc and stopCh are passed in:
// they asynchronously signal an exit request or an error from the background resync goroutine.
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
// ... omitted
}
}
}
// syncWith is called after List
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
// ... omitted
// Calls DeltaFIFO's Replace() directly; this is the method used to sync the full set of objects
return r.store.Replace(found, resourceVersion)
}
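// Keeps reading changed resources from the chan returned by watch and turns them into the corresponding DeltaFIFO calls
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
// Loop forever reading resource changes (i.e. incremental changes) from the chan while also watching the various signals
loop:
for {
select {
// ... omitted
case event, ok := <-w.ResultChan():
// If not ok, the chan was closed and a new watch has to be established. From this we can infer that the chan
// may be recreated at runtime; otherwise the code would exit here instead of continuing to loop
if !ok {
break loop
}
// An event can itself carry an error instead of the chan being closed, which allows error details to be passed along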
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
// ...略
// Perform the DeltaFIFO operation that matches the event type
switch event.Type {
// Add an Added delta to the DeltaFIFO
case watch.Added:
err := r.store.Add(event.Object)
// Object updated: add an Updated delta to the DeltaFIFO
case watch.Modified:
err := r.store.Update(event.Object)
// Object deleted: add a Deleted delta to the DeltaFIFO
case watch.Deleted:
err := r.store.Delete(event.Object)
}
// Record the latest resource version
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
// A watch that returns very quickly without having handled any event is abnormal, because the watch is set up with a timeout
watchDuration := r.clock.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
r.metrics.numberOfShortWatches.Inc()
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
return nil
}
Here Controller and IndexerInformer are the same thing: NewIndexerInformer/NewInformer return a Controller (together with the store it keeps up to date).
The Controller's job is simple: it connects the Reflector to the Handler. While doing so, IndexerInformer/Informer also keeps clientState in sync; clientState is the local cache => clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
Core code
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return New(cfg)
}
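The effect of the Process callback on a batch of deltas can be traced with a small standalone model. This is a sketch under assumptions, not client-go code: `recorder` is a hypothetical handler that only logs which callback fired, the local cache is a plain `map[string]string`, and each delta carries its key explicitly so that no KeyFunc is needed.

```go
package main

import "fmt"

type DeltaType string

const (
	Sync    DeltaType = "Sync"
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

// Delta carries the key explicitly so the sketch does not need a KeyFunc.
type Delta struct {
	Type   DeltaType
	Key    string
	Object string
}

// recorder is a hypothetical handler that logs which callback fired.
type recorder struct{ calls []string }

func (r *recorder) OnAdd(obj string)         { r.calls = append(r.calls, "add "+obj) }
func (r *recorder) OnUpdate(old, cur string) { r.calls = append(r.calls, "update "+old+"->"+cur) }
func (r *recorder) OnDelete(obj string)      { r.calls = append(r.calls, "delete "+obj) }

// process applies deltas oldest-first: the local cache is updated first,
// then the matching handler callback is invoked.
func process(state map[string]string, h *recorder, deltas []Delta) {
	for _, d := range deltas {
		switch d.Type {
		case Sync, Added, Updated:
			if old, exists := state[d.Key]; exists {
				state[d.Key] = d.Object
				h.OnUpdate(old, d.Object)
			} else {
				state[d.Key] = d.Object
				h.OnAdd(d.Object)
			}
		case Deleted:
			delete(state, d.Key)
			h.OnDelete(d.Object)
		}
	}
}

func main() {
	state := map[string]string{}
	h := &recorder{}
	process(state, h, []Delta{
		{Added, "default/a", "v1"},
		{Sync, "default/a", "v1"},
		{Updated, "default/a", "v2"},
		{Deleted, "default/a", "v2"},
	})
	fmt.Println(h.calls, len(state))
}
```

Two things are worth noticing in the trace: whether the handler sees OnAdd or OnUpdate depends on what the local cache already holds rather than on the delta type, and a Sync delta for an object already in the cache surfaces as an OnUpdate with identical old and new objects.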
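An Informer is already sufficient, but if a client creates many Informers, the client state (the local cache) ends up holding a lot of duplicate data, which is wasteful. SharedInformer solves this by sharing the indexed store and dispatching each event to multiple listeners. (It is more efficient for a client to use SharedInformer directly.)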
// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer. When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions. If there was a create, followed by a delete, the cache may NOT have your item. This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us keep duplicate caches for each
// watch.
type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
AddEventHandler(handler ResourceEventHandler)
// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
// specified resync period. Events to a single handler are delivered sequentially, but there is
// no coordination between different handlers.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// GetStore returns the Store.
GetStore() Store
// GetController gives back a synthetic interface that "votes" to start the informer
GetController() Controller
// Run starts the shared informer, which will be stopped when stopCh is closed.
Run(stopCh <-chan struct{})
// HasSynced returns true if the shared informer's store has synced.
HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string
}
Core code
// Code from client-go/tools/cache/shared_informer.go
// Distribution: one incoming event is fanned out to every listener (write amplification)
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
// ... omitted
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
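// This function is started by sharedProcessor via wait.Group
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
// nextCh is closed here, right before the function returns
defer close(p.nextCh)
// Temporary variables used below
var nextCh chan<- interface{}
var notification interface{}
// Enter the endless loop
for {
select {
// Two cases: while nextCh is still nil this case blocks, as I explained in 《深入浅出golang之chan》;
// later nextCh is set to p.nextCh, and because p.nextCh is also an unbuffered chan, the send blocks until it succeeds
case nextCh <- notification:
// The send succeeded, so take the next event out of the buffer
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok {
// No events left: set nextCh back to nil so that this case blocks again
nextCh = nil
}
// Read one event from p.addCh; this is where p.addCh is consumed
case notificationToAdd, ok := <-p.addCh:
// p.addCh was closed, so the only option is to return
if !ok {
return
}
// notification == nil means no event is currently waiting to be delivered to the handler
if notification == nil {
// so deliver the event just received to the handler through p.nextCh
notification = notificationToAdd
nextCh = p.nextCh
} else {
// The previous event has not been delivered yet, so park this one in the buffer.
// pendingNotifications can be thought of as a slice: a dynamically growing buffer
p.pendingNotifications.WriteOne(notificationToAdd)
}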
}
}
}
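The logic here is a bit convoluted. In short: reflector -> DeltaFIFO -> processor.distribute (the indexer is updated as well) -> listeners.add -> processorListener.addCh -> processorListener.nextCh -> p.handler.
The two channels addCh and nextCh, with the buffer between them, exist so that sends on addCh do not block on a slow handler.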
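The two-channel pattern can be tried out in isolation. The following is a minimal sketch using only the standard library, with `int` values instead of notifications and a plain slice instead of pendingNotifications; `relay` and `demo` are hypothetical names. It relies on the Go rule that a send on a nil channel blocks forever, which is what disables the send case while nothing is pending.

```go
package main

import "fmt"

// relay forwards values from addCh to nextCh. Values the consumer has not
// taken yet are parked in a slice, so the producer never waits on the consumer.
func relay(addCh <-chan int, nextCh chan<- int) {
	defer close(nextCh)
	var pending []int  // values waiting behind head
	var out chan<- int // nil while there is nothing to send, which disables the send case
	var head int       // value currently offered to the consumer
	for {
		select {
		case out <- head:
			// The consumer took head; offer the next parked value, if any.
			if len(pending) > 0 {
				head, pending = pending[0], pending[1:]
			} else {
				out = nil
			}
		case v, ok := <-addCh:
			if !ok {
				return // producer closed addCh
			}
			if out == nil {
				head, out = v, nextCh // nothing in flight: offer v directly
			} else {
				pending = append(pending, v) // something in flight: park v
			}
		}
	}
}

// demo sends n values before the consumer reads anything, then drains them.
func demo(n int) []int {
	addCh := make(chan int)
	nextCh := make(chan int)
	go relay(addCh, nextCh)
	for i := 1; i <= n; i++ {
		addCh <- i // returns promptly even though nobody reads nextCh yet
	}
	got := make([]int, 0, n)
	for i := 0; i < n; i++ {
		got = append(got, <-nextCh)
	}
	close(addCh)
	<-nextCh // wait until relay has closed nextCh
	return got
}

func main() {
	fmt.Println(demo(5)) // prints [1 2 3 4 5]
}
```

With a single unbuffered channel the loop that sends five values in `demo` would deadlock, because no receiver is running yet. The price of the pattern is that the buffer is unbounded: a handler that never catches up makes the pending slice grow without limit.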
Summary
SharedInformerFactory is the factory for SharedInformers and simplifies creating the various kinds of SharedInformer.
The concrete SharedIndexInformer constructors are usually generated code.
// Code from client-go/informers/internalinterfaces/factory_interfaces.go
type SharedInformerFactory interface {
// The core entry point, similar to the Run() function of many other types
Start(stopCh <-chan struct{})
// This one is key: given an object type it returns the SharedIndexInformer that manages objects of that type.
// NewInformerFunc is used to create the Informer when the SharedInformerFactory does not have one for the type yet
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
// The function type that creates an Informer; it needs an apiserver client and the resync period that comes up repeatedly with SharedInformers
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
// Code from client-go/informers/factory.go
// sharedInformerFactory's Start() simply starts the Informer of every concrete type.
// Every per-type Informer is a SharedIndexInformer, so each SharedIndexInformer has to be started
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
// Take the lock
f.lock.Lock()
defer f.lock.Unlock()
// Iterate over the informers map
for informerType, informer := range f.informers {
// Check whether this Informer has already been started
if !f.startedInformers[informerType] {
// If not, start a goroutine that runs the SharedIndexInformer's Run() function. From the analysis of SharedIndexInformer
// we know that Run() is the entry point of the whole Informer. Readers of 《深入浅出kubernetes之client-go的SharedInformer》
// may have wondered who calls Run(); this is the answer.
go informer.Run(stopCh)
// Mark the Informer as started
f.startedInformers[informerType] = true
}
}
}
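The two guarantees the factory provides, one informer per type and one Run per informer, can be checked with a small model. This is a sketch under assumptions rather than the client-go factory: `miniFactory`, `runner`, `fakeInformer` and `pod` are hypothetical names, the constructor takes no client or resync period, and the `sync.WaitGroup` is an addition made here only so that the demo can wait for the goroutines deterministically.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// runner is the only method of an informer this sketch cares about.
type runner interface {
	Run(stopCh <-chan struct{})
}

// fakeInformer is a hypothetical informer that only counts calls to Run.
type fakeInformer struct {
	mu   sync.Mutex
	runs int
}

func (i *fakeInformer) Run(stopCh <-chan struct{}) {
	i.mu.Lock()
	i.runs++
	i.mu.Unlock()
	<-stopCh
}

func (i *fakeInformer) Runs() int {
	i.mu.Lock()
	defer i.mu.Unlock()
	return i.runs
}

type miniFactory struct {
	lock      sync.Mutex
	informers map[reflect.Type]runner
	started   map[reflect.Type]bool
	wg        sync.WaitGroup // sketch-only: lets the demo wait for Run goroutines to exit
}

func newMiniFactory() *miniFactory {
	return &miniFactory{
		informers: map[reflect.Type]runner{},
		started:   map[reflect.Type]bool{},
	}
}

// InformerFor returns the informer registered for obj's type, creating it on first use.
func (f *miniFactory) InformerFor(obj interface{}, newFunc func() runner) runner {
	f.lock.Lock()
	defer f.lock.Unlock()
	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	return inf
}

// Start launches every informer that has not been started yet.
func (f *miniFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for t, inf := range f.informers {
		if !f.started[t] {
			f.wg.Add(1)
			go func(inf runner) {
				defer f.wg.Done()
				inf.Run(stopCh)
			}(inf)
			f.started[t] = true
		}
	}
}

type pod struct{} // hypothetical resource type used as the lookup key

// demo requests the same informer twice and calls Start twice.
func demo() (created, runs int, same bool) {
	f := newMiniFactory()
	var inf *fakeInformer
	newFunc := func() runner {
		created++
		inf = &fakeInformer{}
		return inf
	}
	a := f.InformerFor(&pod{}, newFunc)
	b := f.InformerFor(&pod{}, newFunc)
	stop := make(chan struct{})
	f.Start(stop)
	f.Start(stop) // no-op for informers that are already running
	close(stop)
	f.wg.Wait()
	return created, inf.Runs(), a == b
}

func main() {
	fmt.Println(demo()) // prints 1 1 true
}
```

Keying the map by `reflect.Type` is what lets several controllers in one process ask for "the Pod informer" independently and still end up sharing a single instance.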
SharedInformerFactory has two kinds of users.
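Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, contact cloudcommunity@tencent.com for removal.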