k8s-Informer之Reflector的解析

Reflect 概述

Reflector 从 kube-apiserver 中 list&watch 资源对象,用于监听指定资源的 Kubernetes 。当资源对象发生变化时(如:添加和删除等事件),Reflector 会将其这些资源对象的变化包装成Delta并将其丢到DeltaFIFO中。其实就是将 Etcd 的对象及其变化反射到DeltaFIFO中,实时更新本地缓存,确保本地数据和 ETCD 数据一致。

源码位置:k8s.io/client-go/tools/cache/reflector.go
(1)Reflector 它的数据结构如下:

 type Reflector struct {name stringexpectedTypeName stringexpectedType reflect.Type // 放到Store中(即DeltaFIFO中)的对象类型expectedGVK *schema.GroupVersionKindstore Store // 与 Watch 源同步的⽬标,会赋值为 DeltaFIFOlisterWatcher ListerWatcher // ListerWatcher是个interface(含list和watch)backoffManager wait.BackoffManagerinitConnBackoffManager wait.BackoffManagerMaxInternalErrorRetryDuration time.DurationresyncPeriod time.Duration // 重新同步周期ShouldResync func() boolclock clock.ClockpaginatedResult boollastSyncResourceVersion stringisLastSyncResourceVersionUnavailable boollastSyncResourceVersionMutex sync.RWMutexWatchListPageSize int64watchErrorHandler WatchErrorHandler
}

(2)Reflector 初始化

通过 NewReflector 实例化 Reflector 对象,在实例中需要传入的 ListerWatcher 数据接口对象,这个包含核心 List 和 Watch 方法,主要是负责 List 和 Watch 指定的 Kubernetes APIServer 资源。


func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {  return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)  
}// NewNamedReflector same as NewReflector, but with a specified name for logging  
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {  realClock := &clock.RealClock{}  r := &Reflector{  name:          name,  listerWatcher: lw,  store:         store,  initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),  resyncPeriod:           resyncPeriod,  clock:                  realClock,  watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),  }  r.setExpectedType(expectedType)  return r  
}

(3)ListerWatcher interface

type Lister interface {  List(options metav1.ListOptions) (runtime.Object, error)  
}  type Watcher interface {  
Watch(options metav1.ListOptions) (watch.Interface, error)  
}  type ListerWatcher interface {  Lister  Watcher
}

(4)ListWatch struct

type ListFunc func(options metav1.ListOptions) (runtime.Object, error)type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)type ListWatch struct {  ListFunc  ListFunc  WatchFunc WatchFunc  DisableChunking bool  
}

(5)Reflector 启动

创建 Reflector 对象后, Run 方法启动监听并处理事件,通过 wait.BackoffUntil 不断调用 ListAndWatch 方法,如果该方法 return 了,那么就会发生re-list,watch过程则被嵌套在for循环中。 Run() 中最核心的就是 List-Watch 方法。

func (r *Reflector) Run(stopCh <-chan struct{}) {  klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)  wait.BackoffUntil(func() {  if err := r.ListAndWatch(stopCh); err != nil {  r.watchErrorHandler(r, err)  }  }, r.backoffManager, true, stopCh)  klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)  
}

ListAndWatch 核心代码:

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {  // ...// list 获取资源下的所有对象的数据  err := r.list(stopCh)  if err != nil {  return err  }  // go 部分go func() {  // 返回重新同步的定时通道  resyncCh, cleanup := r.resyncChan()  // ...for {  select {  case <-resyncCh:  case <-stopCh:  return  case <-cancelCh:  return  }  // 判断是否需要执行Resync操作,即重新同步  if r.ShouldResync == nil || r.ShouldResync() {  klog.V(4).Infof("%s: forcing resync", r.name)  // Resync 机制会将本地存储(LocalStore)的资源对象同步到 DeltaFIFO 中  if err := r.store.Resync(); err != nil {  resyncerrc <- err  return  }  }  cleanup()  // 重新启⽤定时器定时触发  resyncCh, cleanup = r.resyncChan()  }  }()  // for 部分 for {  // 1、stopCh处理,判断是否需要退出循环 select {  case <-stopCh:  return nil  default:  }  // 2、将resourceVersion为最新的resourceVersion,即从list回来的最新resourceVersion开始执行watch操作timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))  options := metav1.ListOptions{  ResourceVersion: r.LastSyncResourceVersion(),TimeoutSeconds: &timeoutSeconds,AllowWatchBookmarks: true}   // 3、 开始监听start := r.clock.Now()  w, err := r.listerWatcher.Watch(options)// 4、Reflctor 组件的功能: 事件处理函数  // 事件处理函数,当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新  err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)  // ... }  
}

r.watchHandler() 函数:

watchHandler()函数是 reflector组件的一个重要函数,它负责监听Kubernetes API server中的对象变更事件。如当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新。

主要逻辑:
(1)从watch操作返回来的结果中获取event事件;
(2)接收到每个事件后,watchHandler()函数会判断该事件是否为错误事件及根据事件类型作出处理;
(3)获得当前watch到资源的ResourceVersion;
(4)判断不同类型的event事件作出相应处理;
(5)调用r.setLastSyncResourceVersion,更新Reflector对象中存储的最新的资源版本号。

循环操作,直至event事件处理完毕。


func watchHandler(start time.Time,  w watch.Interface,  store Store,  expectedType reflect.Type,  expectedGVK *schema.GroupVersionKind,  name string,  expectedTypeName string,  setLastSyncResourceVersion func(string),  clock clock.Clock,  errc chan error,  stopCh <-chan struct{},  
) error {  eventCount := 0  // Stopping the watcher should be idempotent and if we return from this function there's no way  // we're coming back in with the same watch interface.   defer w.Stop()  loop:  for {  select {  case <-stopCh:  return errorStopRequested  case err := <-errc:  return err  case event, ok := <-w.ResultChan():  if !ok {  // 错误事件,可能与客户端的连接已断开,则重试机制下尝试重新连接break loop  }  if event.Type == watch.Error {  return apierrors.FromObject(event.Object)  }  if expectedType != nil {  if e, a := expectedType, reflect.TypeOf(event.Object); e != a {  utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))  continue  }  }  if expectedGVK != nil {  if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {  utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))  continue  }  }  meta, err := meta.Accessor(event.Object)  if err != nil {  utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))  continue  }  // 获得当前watch到资源的ResourceVersionresourceVersion := meta.GetResourceVersion()  switch event.Type {  // 不同类型的event事件,调用不同函数处理。如事件为Added则调用 store.Add 处理case watch.Added:  err := store.Add(event.Object)  if err != nil {  utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))  }  case watch.Modified:  err := store.Update(event.Object)  if err != nil {  utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))  }  case watch.Deleted:  // TODO: Will any consumers need access to the "last known  // state", which is passed in event.Object? If so, may need  // to change this.            err := store.Delete(event.Object)  if err != nil {  utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))  }  case watch.Bookmark:  // A `Bookmark` means watch has synced here, just update the resourceVersion  default:  utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))  }  // 记录Reflector对象已经处理过的最新的资源版本号,以便在下次请求资源数据时能够从该版本号开始监听资源变更。setLastSyncResourceVersion(resourceVersion)  if rvu, ok := store.(ResourceVersionUpdater); ok {  rvu.UpdateResourceVersion(resourceVersion)  }  eventCount++  }  }  watchDuration := clock.Since(start)  if watchDuration < 1*time.Second && eventCount == 0 {  return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)  }  klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)  return nil  
}

到这里 Reflector 组件的功能基本就结束了,接下来分析 DeltaFIFO 组件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/36256.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Android仿美团左右联动购物列表

Android仿美团左右联动购物列表 左右联动购物列表&#xff0c;不难。 一、思路&#xff1a; 两个RecycleView 二、效果图&#xff1a; 三、关键代码&#xff1a; public class MainActivity extends AppCompatActivity {private RecyclerView rl_left;private RecyclerVie…

Mitel MiCollab 企业协作平台 任意文件读取漏洞复现(CVE-2024-41713)

0x01 产品简介 Mitel MiCollab是加拿大Mitel(敏迪)公司推出的一款企业级协作平台,旨在为企业提供统一、高效、安全的通信与协作解决方案。通过该平台,员工可以在任何时间、任何地点,使用任何设备,实现即时通信、语音通话、视频会议、文件共享等功能,从而提升工作效率和…

深度学习camp-第J3-1周:DenseNet算法 实现乳腺癌识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 我的环境 语言环境&#xff1a;Python 3.12编译器&#xff1a;Jupyter Lab深度学习环境&#xff1a;Pytorch 2.4.1 Torchvision 0.19.1数据集&#xff1a;乳腺…

Elasticsearch 单节点安全配置与用户认证

Elasticsearch 单节点安全配置与用户认证 安全扫描时发现了一个高危漏洞&#xff1a;Elasticsearch 未授权访问 。在使用 Elasticsearch 构建搜索引擎或处理大规模数据时&#xff0c;需要启用基本的安全功能来防止未经授权的访问。本文将通过简单的配置步骤&#xff0c;为单节…

Vulhub:Shiro[漏洞复现]

目录 CVE-2010-3863(Shiro未授权) 使用浏览器访问靶场主页面 使用Yakit进行抓包 使用ffuf对靶机8080端口进行根路径FUZZ CVE-2016-4437(Shiro-550) 使用浏览器访问靶场主页面 使用Yakit进行抓包 使用Yakit反连中自带的Yso-Java Hack进行漏洞利用 首先运行脚本生成一个…

数学拯救世界(一)———寻“数”记

一、 很久很久以前&#xff0c;在一个只认识整数和小数的国度&#xff0c;有一个很残暴的国王提了一个要求&#xff1a;要是不能表示出把一段1米的绳子三等分后的大小&#xff0c;就要把所有的大臣杀掉。 1➗3 0.333&#xff0c;怎么办呀&#xff1f;怎么办呀&#xff1f; 袁q…

Codeforces Round 991 (Div. 3)题解

先随随便便写一点东西吧&#xff0c;毕竟只是一场div3 A. Line Breaks 思路&#xff1a;一道很简单的模拟题吧&#xff0c;就是遍历一遍&#xff0c;当大于x的时候就break&#xff0c;然后前面那个就是找到的前x个字的总长度不超过m #include<bits/stdc.h> using names…

掌握谈判技巧,达成双赢协议

在当今竞争激烈且合作频繁的社会环境中&#xff0c;谈判成为了我们解决分歧、谋求共同发展的重要手段。无论是商业合作、职场交流&#xff0c;还是国际事务协商&#xff0c;掌握谈判技巧以达成双赢协议都具有极其关键的意义。它不仅能够让各方在利益分配上找到平衡点&#xff0…

基于Matlab特征提取与浅层神经网络的数字图像处理乳腺癌检测系统(GUI界面+训练代码+数据集)

本研究提出了一种结合数字图像处理技术、特征提取与浅层神经网络的创新癌症检测系统&#xff0c;旨在为医学图像的分析和早期癌症检测提供有效支持。系统主要处理癌症与正常组织的医学图像&#xff0c;通过灰度共生矩阵&#xff08;GLCM&#xff09;等方法&#xff0c;从图像中…

Backblaze 2024 Q3硬盘故障质量报告解读

作为一家在2021年在美国纳斯达克上市的云端备份公司&#xff0c;Backblaze一直保持着对外定期发布HDD和SSD的故障率稳定性质量报告&#xff0c;给大家提供了一份真实应用场景下的稳定性分析参考数据&#xff1a; 以往报告解读系列参考&#xff1a; Backblaze发布2024 Q2硬盘故障…

河工oj第七周补题题解2024

A.GO LecturesⅠ—— Victory GO LecturesⅠ—— Victory - 问题 - 软件学院OJ 代码 统计 #include<bits/stdc.h> using namespace std;double b, w;int main() {for(int i 1; i < 19; i ) {for(int j 1; j < 19; j ) {char ch; cin >> ch;if(ch B) b …

[ABC234A] Weird Function

解题思路 这是一道模拟题…… 设置一个函数 &#xff0c;返回值为 。 最后答案就是 。 代码 记得开 long long ! #include<bits/stdc.h> using namespace std;long long t; long long f(long long x) {return x*xx*23; }int main() {cin>>t;cout<<f(f(f…

蓝牙键鼠无法被电脑识别

起因是我的键鼠是三模的&#xff0c;但是我蓝牙模式我只用过几次&#xff0c;基本一直使用的是有线模式&#xff0c;最近突然要用无线连接&#xff0c;如果使用收发器就显得过于繁琐&#xff0c;还占用usb口&#xff0c;因此想用蓝牙连&#xff0c;但是由于 win10更新了英特尔…

【C#设计模式(18)——中介者模式(Mediator Pattern)】

前言 中介者模式&#xff1a;是两者之间通过第三者来帮助传话。 代码 //抽象接收者public abstract class Receiver{protected Mediator mediator;protected Receiver(Mediator mediator){this.mediator mediator;}public abstract void SendMessage(string message);public a…

动态计算加载图片

学习啦 别名路径&#xff1a;①npm install path --save-dev②配置 // vite.config,js import { defineConfig } from vite import vue from vitejs/plugin-vueimport { viteStaticCopy } from vite-plugin-static-copy import path from path export default defineConfig({re…

Java HashMap用法详解

文章目录 一、定义二、核心方法三、实例演示3.1、方法示例3.2、get()方法注意点&#xff01; 一、定义 Java 的 HashMap 是 Java 集合框架中的一个非常重要的类&#xff0c;它实现了 Map 接口。HashMap基于哈希表的数据结构&#xff0c;允许使用键-值对存储数据。这种存储方式使…

淘宝直播间智能化升级:基于LLM的学习与分析

自营直播应用技术团队负责的业务中&#xff0c;淘宝买菜的直播业务起步较晚&#xff0c;业务发展压力较大&#xff0c;业务上也就有了期望能够对一些二方的标杆直播间进行学习&#xff0c;并将其优点应用到自己直播间的需求。 最初 - 人海战术&#xff0c;学习PK 业务侧最直接的…

有的开发者用Apache-2.0开源协议,但是不允许商用?合理吗

Apache 2.0开源协议是设计用来允许商业使用的。该协议明确授予了使用者在遵守许可条款的情况下&#xff0c;对软件进行复制、修改、分发以及商业使用的权利。这包括但不限于&#xff1a; 1. 永久、全球性的版权许可&#xff1a;允许复制、准备衍生作品、公开展示、公开演出、从…

java学习 -----项目(1)

项目 写在前面的话&#xff1a;耳机没电&#xff0c;先来写写今早的感受。说实话&#xff0c;我并不喜欢我们的职业规划老师&#xff0c;满嘴荒唐言&#xff0c;被社会那所大缸浸染了一身社会气。课快结束时&#xff0c;老师问还有谁的视频没做&#xff0c;我把手举了起来。&a…

某j vue3 ts 随笔

因为ts组件封装的缘故&#xff0c;使用某个组件就必须按照这个组件的规则使用&#xff0c;老是忘记&#xff0c;这里就记一下吧 1.ApiSelect 组件 {label: 角色,field: selectedroles,component: ApiSelect,componentProps: {mode: multiple,api: getAllRolesListNoByTenant,la…