Reflect 概述
Reflector 从 kube-apiserver 中 list&watch 资源对象,用于监听指定资源的 Kubernetes 。当资源对象发生变化时(如:添加和删除等事件),Reflector 会将其这些资源对象的变化包装成Delta并将其丢到DeltaFIFO中。其实就是将 Etcd 的对象及其变化反射到DeltaFIFO中,实时更新本地缓存,确保本地数据和 ETCD 数据一致。
源码位置:k8s.io/client-go/tools/cache/reflector.go
(1)Reflector 它的数据结构如下:
type Reflector struct {name stringexpectedTypeName stringexpectedType reflect.Type // 放到Store中(即DeltaFIFO中)的对象类型expectedGVK *schema.GroupVersionKindstore Store // 与 Watch 源同步的⽬标,会赋值为 DeltaFIFOlisterWatcher ListerWatcher // ListerWatcher是个interface(含list和watch)backoffManager wait.BackoffManagerinitConnBackoffManager wait.BackoffManagerMaxInternalErrorRetryDuration time.DurationresyncPeriod time.Duration // 重新同步周期ShouldResync func() boolclock clock.ClockpaginatedResult boollastSyncResourceVersion stringisLastSyncResourceVersionUnavailable boollastSyncResourceVersionMutex sync.RWMutexWatchListPageSize int64watchErrorHandler WatchErrorHandler
}
(2)Reflector 初始化
通过 NewReflector 实例化 Reflector 对象,在实例中需要传入的 ListerWatcher 数据接口对象,这个包含核心 List 和 Watch 方法,主要是负责 List 和 Watch 指定的 Kubernetes APIServer 资源。
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { realClock := &clock.RealClock{} r := &Reflector{ name: name, listerWatcher: lw, store: store, initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), resyncPeriod: resyncPeriod, clock: realClock, watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), } r.setExpectedType(expectedType) return r
}
(3)ListerWatcher interface
type Lister interface { List(options metav1.ListOptions) (runtime.Object, error)
} type Watcher interface {
Watch(options metav1.ListOptions) (watch.Interface, error)
} type ListerWatcher interface { Lister Watcher
}
(4)ListWatch struct
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)type ListWatch struct { ListFunc ListFunc WatchFunc WatchFunc DisableChunking bool
}
(5)Reflector 启动
创建 Reflector 对象后, Run 方法启动监听并处理事件,通过 wait.BackoffUntil 不断调用 ListAndWatch 方法,如果该方法 return 了,那么就会发生re-list,watch过程则被嵌套在for循环中。 Run() 中最核心的就是 List-Watch 方法。
func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.BackoffUntil(func() { if err := r.ListAndWatch(stopCh); err != nil { r.watchErrorHandler(r, err) } }, r.backoffManager, true, stopCh) klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
ListAndWatch 核心代码:
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // ...// list 获取资源下的所有对象的数据 err := r.list(stopCh) if err != nil { return err } // go 部分go func() { // 返回重新同步的定时通道 resyncCh, cleanup := r.resyncChan() // ...for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } // 判断是否需要执行Resync操作,即重新同步 if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof("%s: forcing resync", r.name) // Resync 机制会将本地存储(LocalStore)的资源对象同步到 DeltaFIFO 中 if err := r.store.Resync(); err != nil { resyncerrc <- err return } } cleanup() // 重新启⽤定时器定时触发 resyncCh, cleanup = r.resyncChan() } }() // for 部分 for { // 1、stopCh处理,判断是否需要退出循环 select { case <-stopCh: return nil default: } // 2、将resourceVersion为最新的resourceVersion,即从list回来的最新resourceVersion开始执行watch操作timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options := metav1.ListOptions{ ResourceVersion: r.LastSyncResourceVersion(),TimeoutSeconds: &timeoutSeconds,AllowWatchBookmarks: true} // 3、 开始监听start := r.clock.Now() w, err := r.listerWatcher.Watch(options)// 4、Reflctor 组件的功能: 事件处理函数 // 事件处理函数,当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新 err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) // ... }
}
r.watchHandler() 函数:
watchHandler()
函数是 reflector
组件的一个重要函数,它负责监听Kubernetes API server中的对象变更事件。如当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新。
主要逻辑:
(1)从watch操作返回来的结果中获取event事件;
(2)接收到每个事件后,watchHandler()
函数会判断该事件是否为错误事件及根据事件类型作出处理;
(3)获得当前watch到资源的ResourceVersion;
(4)判断不同类型的event事件作出相应处理;
(5)调用r.setLastSyncResourceVersion,更新Reflector对象中存储的最新的资源版本号。
循环操作,直至event事件处理完毕。
func watchHandler(start time.Time, w watch.Interface, store Store, expectedType reflect.Type, expectedGVK *schema.GroupVersionKind, name string, expectedTypeName string, setLastSyncResourceVersion func(string), clock clock.Clock, errc chan error, stopCh <-chan struct{},
) error { eventCount := 0 // Stopping the watcher should be idempotent and if we return from this function there's no way // we're coming back in with the same watch interface. defer w.Stop() loop: for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): if !ok { // 错误事件,可能与客户端的连接已断开,则重试机制下尝试重新连接break loop } if event.Type == watch.Error { return apierrors.FromObject(event.Object) } if expectedType != nil { if e, a := expectedType, reflect.TypeOf(event.Object); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a)) continue } } if expectedGVK != nil { if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a)) continue } } meta, err := meta.Accessor(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) continue } // 获得当前watch到资源的ResourceVersionresourceVersion := meta.GetResourceVersion() switch event.Type { // 不同类型的event事件,调用不同函数处理。如事件为Added则调用 store.Add 处理case watch.Added: err := store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err)) } case watch.Modified: err := store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err)) } case watch.Deleted: // TODO: Will any consumers need access to the "last known // state", which is passed in event.Object? If so, may need // to change this. err := store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err)) } case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) } // 记录Reflector对象已经处理过的最新的资源版本号,以便在下次请求资源数据时能够从该版本号开始监听资源变更。setLastSyncResourceVersion(resourceVersion) if rvu, ok := store.(ResourceVersionUpdater); ok { rvu.UpdateResourceVersion(resourceVersion) } eventCount++ } } watchDuration := clock.Since(start) if watchDuration < 1*time.Second && eventCount == 0 { return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) } klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount) return nil
}
到这里 Reflector 组件的功能基本就结束了,接下来分析 DeltaFIFO 组件。