Original article: https://blog.csdn.net/u012986012/article/details/120271091

The conventional development workflow

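Without any Operator scaffolding tool, how would we implement an Operator? Roughly, it takes the following steps:

  • Define the CRD
  • Develop the controller and write its logic
  • Test and deploy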

API definition

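First, define the resource's fields as Go types, then use the k8s.io/code-generator project to generate the API-related code (clientset, informers, listers and deepcopy functions).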
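Among other things, code-generator's deepcopy-gen emits DeepCopy/DeepCopyInto methods. Informer caches depend on them, because an object read from a shared cache must be copied before it is modified. The stdlib-only sketch below hand-writes roughly what such generated methods do for a hypothetical, trimmed-down Foo type modeled on sample-controller (the real type would also embed metav1.TypeMeta and metav1.ObjectMeta). It is an illustration, not actual generated output.

```go
package main

import "fmt"

// FooSpec, FooStatus and Foo are hypothetical, trimmed-down types modeled on
// sample-controller's Foo; real API types also embed metav1.TypeMeta and
// metav1.ObjectMeta.
type FooSpec struct {
	DeploymentName string
	Replicas       *int32 // pointer so "unset" can be told apart from 0
}

type FooStatus struct {
	AvailableReplicas int32
}

type Foo struct {
	Name   string
	Spec   FooSpec
	Status FooStatus
}

// DeepCopyInto copies in into out, allocating fresh memory for pointer
// fields, in the style of deepcopy-gen output.
func (in *Foo) DeepCopyInto(out *Foo) {
	*out = *in // shallow copy first; value fields are now independent
	if in.Spec.Replicas != nil {
		out.Spec.Replicas = new(int32)
		*out.Spec.Replicas = *in.Spec.Replicas
	}
}

// DeepCopy returns a new, independent Foo and is nil-safe.
func (in *Foo) DeepCopy() *Foo {
	if in == nil {
		return nil
	}
	out := new(Foo)
	in.DeepCopyInto(out)
	return out
}

func main() {
	n := int32(1)
	orig := &Foo{Name: "example-foo", Spec: FooSpec{DeploymentName: "example-foo", Replicas: &n}}
	cp := orig.DeepCopy()
	*cp.Spec.Replicas = 3 // mutating the copy must not touch the original
	fmt.Println(*orig.Spec.Replicas, *cp.Spec.Replicas) // 1 3
}
```

Without the pointer re-allocation, both objects would share one Replicas value. That is exactly the kind of bug that corrupts a shared informer cache.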

Controller implementation

Taking the official sample-controller as an example, the interaction between client-go and a custom controller is shown below:
[Figure: client-go / custom controller interaction, https://github.com/kubernetes/sample-controller/raw/master/docs/images/client-go-controller-interaction.jpeg]

The main steps are:

Initialize the client configuration

    // Build the client config from the master URL / kubeconfig
    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
    if err != nil {
        klog.Fatalf("Error building kubeconfig: %s", err.Error())
    }

    // Kubernetes clientset
    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }

    // Clientset for the CRD
    exampleClient, err := clientset.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building example clientset: %s", err.Error())
    }

Initialize and start the informers

    // Shared informer factory for built-in Kubernetes resources
    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    // Shared informer factory for the CRD
    exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

    // Create the controller with the informers; this registers the Deployment and Foo informers
    controller := NewController(kubeClient, exampleClient,
        kubeInformerFactory.Apps().V1().Deployments(),
        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

    // Start the informers
    kubeInformerFactory.Start(stopCh)
    exampleInformerFactory.Start(stopCh)

Finally, start the controller:

    if err = controller.Run(2, stopCh); err != nil {
        klog.Fatalf("Error running controller: %s", err.Error())
    }

Inside the controller, NewController performs the initialization:

    func NewController(
        kubeclientset kubernetes.Interface,
        sampleclientset clientset.Interface,
        deploymentInformer appsinformers.DeploymentInformer,
        fooInformer informers.FooInformer) *Controller {

        // Create event broadcaster
        utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
        klog.V(4).Info("Creating event broadcaster")
        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartStructuredLogging(0)
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

        controller := &Controller{
            kubeclientset:     kubeclientset,
            sampleclientset:   sampleclientset,
            deploymentsLister: deploymentInformer.Lister(), // read-only view backed by the informer cache
            // Calling Informer() registers the informer with the shared informer factory
            deploymentsSynced: deploymentInformer.Informer().HasSynced,
            foosLister:        fooInformer.Lister(),
            foosSynced:        fooInformer.Informer().HasSynced,
            // Initialize the rate-limited work queue
            workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
            recorder:  recorder,
        }

        klog.Info("Setting up event handlers")
        // Register event handlers
        fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: controller.enqueueFoo,
            UpdateFunc: func(old, new interface{}) {
                controller.enqueueFoo(new)
            },
        })
        deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: controller.handleObject,
            UpdateFunc: func(old, new interface{}) {
                newDepl := new.(*appsv1.Deployment)
                oldDepl := old.(*appsv1.Deployment)
                if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                    // Periodic resync will send update events for all known Deployments.
                    // Two different versions of the same Deployment will always have different RVs.
                    return
                }
                controller.handleObject(new)
            },
            DeleteFunc: controller.handleObject,
        })

        return controller
    }
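In NewController, calling Informer() is what registers an informer with the shared factory. That is why NewController runs before the factories' Start calls: Start only starts informers that have already been requested. Below is a toy, stdlib-only model of that lazy registration. The sharedFactory and informer types are hypothetical and only track whether an informer has been started.

```go
package main

import (
	"fmt"
	"sync"
)

// informer is a hypothetical stand-in for a client-go SharedIndexInformer;
// it only records whether it has been started.
type informer struct {
	resource string
	started  bool
}

// sharedFactory is a toy model of a SharedInformerFactory: one informer per
// resource type, created lazily and shared by every caller.
type sharedFactory struct {
	mu        sync.Mutex
	informers map[string]*informer
}

func newSharedFactory() *sharedFactory {
	return &sharedFactory{informers: map[string]*informer{}}
}

// InformerFor returns the shared informer for resource, registering it on first use.
func (f *sharedFactory) InformerFor(resource string) *informer {
	f.mu.Lock()
	defer f.mu.Unlock()
	if inf, ok := f.informers[resource]; ok {
		return inf
	}
	inf := &informer{resource: resource}
	f.informers[resource] = inf
	return inf
}

// Start starts every informer registered so far; informers requested later
// stay stopped until Start is called again.
func (f *sharedFactory) Start() {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, inf := range f.informers {
		inf.started = true
	}
}

func main() {
	f := newSharedFactory()
	a := f.InformerFor("deployments")
	b := f.InformerFor("deployments") // same informer, not a second watch
	f.Start()
	late := f.InformerFor("foos") // registered after Start
	fmt.Println(a == b, a.started, late.started) // true true false
}
```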

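Starting the controller follows the typical Kubernetes pattern: a control loop keeps taking objects from the work queue and processing them, driving each one to its desired state: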

    func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
        defer utilruntime.HandleCrash()
        defer c.workqueue.ShutDown()

        // Wait for the caches to sync
        klog.Info("Waiting for informer caches to sync")
        if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
            return fmt.Errorf("failed to wait for caches to sync")
        }

        // Start the workers, one goroutine each
        for i := 0; i < workers; i++ {
            go wait.Until(c.runWorker, time.Second, stopCh)
        }

        // Block until the stop signal arrives
        <-stopCh
        return nil
    }

    // A worker is just a loop that keeps calling processNextWorkItem
    func (c *Controller) runWorker() {
        for c.processNextWorkItem() {
        }
    }

    func (c *Controller) processNextWorkItem() bool {
        // Take the next object from the work queue
        obj, shutdown := c.workqueue.Get()
        if shutdown {
            return false
        }

        // We wrap this block in a func so we can defer c.workqueue.Done.
        err := func(obj interface{}) error {
            defer c.workqueue.Done(obj)
            var key string
            var ok bool
            if key, ok = obj.(string); !ok {
                c.workqueue.Forget(obj)
                utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
                return nil
            }
            // The core logic lives in syncHandler
            if err := c.syncHandler(key); err != nil {
                // On failure, put the key back on the queue with rate limiting
                c.workqueue.AddRateLimited(key)
                return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
            }
            // On success, Forget clears the key's rate-limit history; it is not requeued
            c.workqueue.Forget(obj)
            klog.Infof("Successfully synced '%s'", key)
            return nil
        }(obj)

        if err != nil {
            utilruntime.HandleError(err)
            return true
        }

        return true
    }
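Calls such as Done and AddRateLimited rely on the work queue's de-duplication rules. A key that is already waiting is not queued twice. A key re-added while a worker holds it is parked until Done is called, so two workers never process the same key at the same time. Below is a single-goroutine toy model of those rules, written from scratch for illustration. client-go's real queue is concurrent and its Get blocks, whereas here Get simply reports an empty queue.

```go
package main

import "fmt"

// queue is a single-goroutine toy model of the client-go workqueue's
// de-duplication rules.
type queue struct {
	items      []string
	dirty      map[string]bool // needs processing (queued or re-added)
	processing map[string]bool // handed out by Get, Done not called yet
}

func newQueue() *queue {
	return &queue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// Add enqueues key unless it is already waiting. If key is being processed,
// it is only marked dirty and gets re-queued when Done is called.
func (q *queue) Add(key string) {
	if q.dirty[key] {
		return
	}
	q.dirty[key] = true
	if q.processing[key] {
		return
	}
	q.items = append(q.items, key)
}

// Get pops the next key in FIFO order; ok is false when the queue is empty.
func (q *queue) Get() (key string, ok bool) {
	if len(q.items) == 0 {
		return "", false
	}
	key, q.items = q.items[0], q.items[1:]
	q.processing[key] = true
	delete(q.dirty, key)
	return key, true
}

// Done marks key as processed and re-queues it if it was re-added meanwhile.
func (q *queue) Done(key string) {
	delete(q.processing, key)
	if q.dirty[key] {
		q.items = append(q.items, key)
	}
}

func (q *queue) Len() int { return len(q.items) }

func main() {
	q := newQueue()
	q.Add("default/foo")
	q.Add("default/foo") // collapsed into the waiting item
	fmt.Println(q.Len()) // 1
	k, _ := q.Get()
	q.Add(k)             // arrives while k is being processed: parked
	fmt.Println(q.Len()) // 0
	q.Done(k)
	fmt.Println(q.Len()) // 1
}
```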

The Operator pattern

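In the Operator pattern, the user only has to implement Reconcile, the counterpart of syncHandler in sample-controller; kubebuilder already implements every other step. Let's dig in and see, step by step, how kubebuilder ends up triggering the Reconcile logic.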
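Reconcile is level-triggered. It receives only a namespace/name key, reads the current state, and moves it toward the desired state, so calling it repeatedly must be safe. Below is a stdlib-only sketch built on that assumption. Request and Result loosely mirror controller-runtime's reconcile.Request and reconcile.Result, though the real Reconcile also takes a context. gameReconciler and its two maps are hypothetical stand-ins for the Game spec and the Deployment it manages.

```go
package main

import (
	"fmt"
	"time"
)

// Request and Result are simplified stand-ins for reconcile.Request and
// reconcile.Result (hypothetical, stdlib-only).
type Request struct{ Namespace, Name string }

type Result struct {
	Requeue      bool
	RequeueAfter time.Duration
}

// gameReconciler is hypothetical: desired holds the replica count each Game
// asks for, actual the count its fake "Deployment" currently has.
type gameReconciler struct {
	desired map[Request]int32
	actual  map[Request]int32
}

// Reconcile compares current state with desired state and converges it; it
// never looks at which event triggered it, so repeated calls are harmless.
func (r *gameReconciler) Reconcile(req Request) (Result, error) {
	want, ok := r.desired[req]
	if !ok {
		delete(r.actual, req) // the Game is gone: drop what it owned
		return Result{}, nil
	}
	if r.actual[req] != want {
		r.actual[req] = want // "create or update the Deployment"
	}
	return Result{}, nil
}

func main() {
	req := Request{Namespace: "default", Name: "game-sample"}
	r := &gameReconciler{desired: map[Request]int32{req: 3}, actual: map[Request]int32{}}
	r.Reconcile(req)
	r.Reconcile(req) // idempotent: same outcome
	fmt.Println(r.actual[req]) // 3
}
```

In a real cluster, objects carrying an owner reference to the Game are normally removed by the garbage collector. The explicit delete here only keeps the toy model self-contained.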

Taking mygame as an example, the main file generated by kubebuilder typically looks like this:

    var (
        // scheme is used to decode Kubernetes objects
        scheme   = runtime.NewScheme()
        setupLog = ctrl.Log.WithName("setup")
    )

    func init() {
        utilruntime.Must(clientgoscheme.AddToScheme(scheme))
        // Add the custom types to the scheme
        utilruntime.Must(myappv1.AddToScheme(scheme))
        //+kubebuilder:scaffold:scheme
    }

    func main() {
        // ...
        ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

        // Create the controller manager
        mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
            Scheme:                 scheme,
            MetricsBindAddress:     metricsAddr,
            Port:                   9443,
            HealthProbeBindAddress: probeAddr,
            LeaderElection:         enableLeaderElection,
            LeaderElectionID:       "7bc453ad.qingwave.github.io",
        })
        if err != nil {
            setupLog.Error(err, "unable to start manager")
            os.Exit(1)
        }

        // Set up the reconciler
        if err = (&controllers.GameReconciler{
            Client: mgr.GetClient(),
            Scheme: mgr.GetScheme(),
        }).SetupWithManager(mgr); err != nil {
            setupLog.Error(err, "unable to create controller", "controller", "Game")
            os.Exit(1)
        }

        // Set up the webhook
        if enableWebhook {
            if err = (&myappv1.Game{}).SetupWebhookWithManager(mgr); err != nil {
                setupLog.Error(err, "unable to create webhook", "webhook", "Game")
                os.Exit(1)
            }
        }
        //+kubebuilder:scaffold:builder

        // Start the manager
        if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
            setupLog.Error(err, "problem running manager")
            os.Exit(1)
        }
    }

kubebuilder wraps controller-runtime. The main file mainly initializes the controller manager, registers the Reconciler and the Webhook that we fill in, and finally starts the manager.

Let's look at each step in turn.

Manager initialization

The code is as follows:

    func New(config *rest.Config, options Options) (Manager, error) {
        // Apply default options
        options = setOptionsDefaults(options)

        // Initialize the cluster
        cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
            clusterOptions.Scheme = options.Scheme
            clusterOptions.MapperProvider = options.MapperProvider
            clusterOptions.Logger = options.Logger
            clusterOptions.SyncPeriod = options.SyncPeriod
            clusterOptions.Namespace = options.Namespace
            clusterOptions.NewCache = options.NewCache
            clusterOptions.ClientBuilder = options.ClientBuilder
            clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
            clusterOptions.DryRunClient = options.DryRunClient
            clusterOptions.EventBroadcaster = options.EventBroadcaster
        })
        if err != nil {
            return nil, err
        }

        // Initialize the event recorder
        recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
        if err != nil {
            return nil, err
        }

        // Resource lock configuration for leader election
        leaderConfig := options.LeaderElectionConfig
        if leaderConfig == nil {
            leaderConfig = rest.CopyConfig(config)
        }
        resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
            LeaderElection:             options.LeaderElection,
            LeaderElectionResourceLock: options.LeaderElectionResourceLock,
            LeaderElectionID:           options.LeaderElectionID,
            LeaderElectionNamespace:    options.LeaderElectionNamespace,
        })
        if err != nil {
            return nil, err
        }

        // ...

        return &controllerManager{
            cluster:                 cluster,
            recorderProvider:        recorderProvider,
            resourceLock:            resourceLock,
            metricsListener:         metricsListener,
            metricsExtraHandlers:    metricsExtraHandlers,
            logger:                  options.Logger,
            elected:                 make(chan struct{}),
            port:                    options.Port,
            host:                    options.Host,
            certDir:                 options.CertDir,
            leaseDuration:           *options.LeaseDuration,
            renewDeadline:           *options.RenewDeadline,
            retryPeriod:             *options.RetryPeriod,
            healthProbeListener:     healthProbeListener,
            readinessEndpointName:   options.ReadinessEndpointName,
            livenessEndpointName:    options.LivenessEndpointName,
            gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
            internalProceduresStop:  make(chan struct{}),
            leaderElectionStopped:   make(chan struct{}),
        }, nil
    }

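New mainly sets up the various configuration items: ports, leader-election settings and the event recorder. Most importantly, it initializes the Cluster, which is used to access Kubernetes. The Cluster is initialized as follows: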

    // New constructs a brand new cluster
    func New(config *rest.Config, opts ...Option) (Cluster, error) {
        if config == nil {
            return nil, errors.New("must specify Config")
        }

        options := Options{}
        for _, opt := range opts {
            opt(&options)
        }
        options = setOptionsDefaults(options)

        // Create the mapper provider
        mapper, err := options.MapperProvider(config)
        if err != nil {
            options.Logger.Error(err, "Failed to get API Group-Resources")
            return nil, err
        }

        // Create the cache for the cached read client and registering informers
        cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
        if err != nil {
            return nil, err
        }

        clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}

        apiReader, err := client.New(config, clientOptions)
        if err != nil {
            return nil, err
        }

        writeObj, err := options.ClientBuilder.
            WithUncached(options.ClientDisableCacheFor...).
            Build(cache, config, clientOptions)
        if err != nil {
            return nil, err
        }

        if options.DryRunClient {
            writeObj = client.NewDryRunClient(writeObj)
        }

        recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
        if err != nil {
            return nil, err
        }

        return &cluster{
            config:           config,
            scheme:           options.Scheme,
            cache:            cache,
            fieldIndexes:     cache,
            client:           writeObj,
            apiReader:        apiReader,
            recorderProvider: recorderProvider,
            mapper:           mapper,
            logger:           options.Logger,
        }, nil
    }

This mainly creates the cache and the read/write clients.

Cache initialization

The code that creates the cache:

    // New initializes and returns a new Cache.
    func New(config *rest.Config, opts Options) (Cache, error) {
        opts, err := defaultOpts(config, opts)
        if err != nil {
            return nil, err
        }
        im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
        return &informerCache{InformersMap: im}, nil
    }

New calls NewInformersMap to create the informer map, which is split into three parts: structured, unstructured and metadata.

    func NewInformersMap(config *rest.Config,
        scheme *runtime.Scheme,
        mapper meta.RESTMapper,
        resync time.Duration,
        namespace string) *InformersMap {

        return &InformersMap{
            structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace),
            unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
            metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace),

            Scheme: scheme,
        }
    }

All three ultimately call newSpecificInformersMap:

    // newStructuredInformersMap creates a new InformersMap for structured objects.
    func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
        return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
    }

    func newSpecificInformersMap(config *rest.Config,
        scheme *runtime.Scheme,
        mapper meta.RESTMapper,
        resync time.Duration,
        namespace string,
        createListWatcher createListWatcherFunc) *specificInformersMap {
        ip := &specificInformersMap{
            config:            config,
            Scheme:            scheme,
            mapper:            mapper,
            informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
            codecs:            serializer.NewCodecFactory(scheme),
            paramCodec:        runtime.NewParameterCodec(scheme),
            resync:            resync,
            startWait:         make(chan struct{}),
            createListWatcher: createListWatcher,
            namespace:         namespace,
        }
        return ip
    }

    func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
        // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
        // groupVersionKind to the Resource API we will use.
        mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
        if err != nil {
            return nil, err
        }

        client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
        if err != nil {
            return nil, err
        }
        listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
        listObj, err := ip.Scheme.New(listGVK)
        if err != nil {
            return nil, err
        }

        // TODO: the functions that make use of this ListWatch should be adapted to
        //  pass in their own contexts instead of relying on this fixed one here.
        ctx := context.TODO()
        // Create a new ListWatch for the obj
        return &cache.ListWatch{
            ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
                res := listObj.DeepCopyObject()
                isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
                err := client.Get().
                    NamespaceIfScoped(ip.namespace, isNamespaceScoped).
                    Resource(mapping.Resource.Resource).
                    VersionedParams(&opts, ip.paramCodec).
                    Do(ctx).
                    Into(res)
                return res, err
            },
            // Setup the watch function
            WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
                // Watch needs to be set to true separately
                opts.Watch = true
                isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
                return client.Get().
                    NamespaceIfScoped(ip.namespace, isNamespaceScoped).
                    Resource(mapping.Resource.Resource).
                    VersionedParams(&opts, ip.paramCodec).
                    Watch(ctx)
            },
        }, nil
    }

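In newSpecificInformersMap, informersByGVK records which informer belongs to each GVK (GroupVersionKind) in the scheme. When an object is needed, its informer is looked up by GVK and then used for List/Get.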

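The createListWatcher function passed to newSpecificInformersMap (createStructuredListWatch for structured objects) builds the ListWatch object that the informer uses to list and watch from the apiserver.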
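Informers in this map are created lazily. The first lookup for a GVK builds the entry, and later lookups reuse it. controller-runtime guards informersByGVK with a read/write mutex and re-checks the map after taking the write lock, because two goroutines may miss on the read path at the same moment. Below is a minimal stdlib model of that double-checked pattern. The GVK and entry types are hypothetical stand-ins for schema.GroupVersionKind and MapEntry.

```go
package main

import (
	"fmt"
	"sync"
)

// GVK is a hypothetical stand-in for schema.GroupVersionKind.
type GVK struct{ Group, Version, Kind string }

// entry stands in for the MapEntry (informer + reader) kept per GVK.
type entry struct{ gvk GVK }

// informersMap is a toy version of specificInformersMap.
type informersMap struct {
	mu             sync.RWMutex
	informersByGVK map[GVK]*entry
	created        int // number of entries built, for illustration
}

func (m *informersMap) Get(gvk GVK) *entry {
	// Fast path: a read lock is enough when the entry already exists.
	m.mu.RLock()
	e, ok := m.informersByGVK[gvk]
	m.mu.RUnlock()
	if ok {
		return e
	}
	// Slow path: take the write lock and re-check, since another goroutine
	// may have created the entry between releasing and taking the locks.
	m.mu.Lock()
	defer m.mu.Unlock()
	if existing, ok := m.informersByGVK[gvk]; ok {
		return existing
	}
	e = &entry{gvk: gvk}
	m.informersByGVK[gvk] = e
	m.created++
	return e
}

func main() {
	m := &informersMap{informersByGVK: map[GVK]*entry{}}
	gvk := GVK{"apps", "v1", "Deployment"}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); m.Get(gvk) }()
	}
	wg.Wait()
	fmt.Println(m.created) // 1
}
```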

Client initialization

There are several kinds of client here. apiReader always reads objects directly from the apiserver, while writeObj can read either from the apiserver or from the cache.

    apiReader, err := client.New(config, clientOptions)
    if err != nil {
        return nil, err
    }

    func New(config *rest.Config, options Options) (Client, error) {
        if config == nil {
            return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
        }

        // Init a scheme if none provided
        if options.Scheme == nil {
            options.Scheme = scheme.Scheme
        }

        // Init a Mapper if none provided
        if options.Mapper == nil {
            var err error
            options.Mapper, err = apiutil.NewDynamicRESTMapper(config)
            if err != nil {
                return nil, err
            }
        }

        // clientCache caches per-GVK REST client metadata (resourceMeta);
        // it is not the informer object cache
        clientcache := &clientCache{
            config: config,
            scheme: options.Scheme,
            mapper: options.Mapper,
            codecs: serializer.NewCodecFactory(options.Scheme),

            structuredResourceByType:   make(map[schema.GroupVersionKind]*resourceMeta),
            unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
        }

        rawMetaClient, err := metadata.NewForConfig(config)
        if err != nil {
            return nil, fmt.Errorf("unable to construct metadata-only client for use as part of client: %w", err)
        }

        c := &client{
            typedClient: typedClient{
                cache:      clientcache,
                paramCodec: runtime.NewParameterCodec(options.Scheme),
            },
            unstructuredClient: unstructuredClient{
                cache:      clientcache,
                paramCodec: noConversionParamCodec{},
            },
            metadataClient: metadataClient{
                client:     rawMetaClient,
                restMapper: options.Mapper,
            },
            scheme: options.Scheme,
            mapper: options.Mapper,
        }

        return c, nil
    }

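writeObj is a client with read/write splitting. Writes always go straight to the apiserver. Reads are served from the cache when the object's type is cached, and otherwise go to the apiserver through the underlying client.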

    writeObj, err := options.ClientBuilder.
        WithUncached(options.ClientDisableCacheFor...).
        Build(cache, config, clientOptions)
    if err != nil {
        return nil, err
    }

    func (n *newClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
        // Create the Client for Write operations.
        c, err := client.New(config, options)
        if err != nil {
            return nil, err
        }

        return client.NewDelegatingClient(client.NewDelegatingClientInput{
            CacheReader:     cache,
            Client:          c,
            UncachedObjects: n.uncached,
        })
    }

    // Client with read/write splitting
    func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) {
        uncachedGVKs := map[schema.GroupVersionKind]struct{}{}
        for _, obj := range in.UncachedObjects {
            gvk, err := apiutil.GVKForObject(obj, in.Client.Scheme())
            if err != nil {
                return nil, err
            }
            uncachedGVKs[gvk] = struct{}{}
        }

        return &delegatingClient{
            scheme: in.Client.Scheme(),
            mapper: in.Client.RESTMapper(),
            Reader: &delegatingReader{
                CacheReader:       in.CacheReader,
                ClientReader:      in.Client,
                scheme:            in.Client.Scheme(),
                uncachedGVKs:      uncachedGVKs,
                cacheUnstructured: in.CacheUnstructured,
            },
            Writer:       in.Client,
            StatusClient: in.Client,
        }, nil
    }

    // Get retrieves an obj for a given object key from the Kubernetes Cluster.
    func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object) error {
        // Pick the reader depending on whether the type is cached
        if isUncached, err := d.shouldBypassCache(obj); err != nil {
            return err
        } else if isUncached {
            return d.ClientReader.Get(ctx, key, obj)
        }
        return d.CacheReader.Get(ctx, key, obj)
    }
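The routing rule in delegatingReader.Get can be modeled without any Kubernetes dependency. In this sketch, Reader, mapReader and the kind/key addressing are hypothetical simplifications, and the uncached set plays the role of the uncachedGVKs map built from ClientDisableCacheFor.

```go
package main

import (
	"errors"
	"fmt"
)

// Reader is a simplified, hypothetical stand-in for client.Reader: objects
// are addressed by kind plus a "namespace/name" key.
type Reader interface {
	Get(kind, key string) (string, error)
}

var errNotFound = errors.New("not found")

// mapReader backs both the fake informer cache and the fake apiserver; name
// is prefixed to results so the caller can see which one answered.
type mapReader struct {
	name string
	data map[string]string
}

func (r mapReader) Get(kind, key string) (string, error) {
	v, ok := r.data[kind+"/"+key]
	if !ok {
		return "", errNotFound
	}
	return r.name + ":" + v, nil
}

// delegatingReader sends kinds listed in uncached straight to the apiserver
// and serves everything else from the cache.
type delegatingReader struct {
	cache, api Reader
	uncached   map[string]bool
}

func (d delegatingReader) Get(kind, key string) (string, error) {
	if d.uncached[kind] {
		return d.api.Get(kind, key)
	}
	return d.cache.Get(kind, key)
}

func main() {
	objs := map[string]string{"Game/default/g1": "v1", "Secret/default/s1": "token"}
	d := delegatingReader{
		cache:    mapReader{name: "cache", data: objs},
		api:      mapReader{name: "api", data: objs},
		uncached: map[string]bool{"Secret": true},
	}
	g, _ := d.Get("Game", "default/g1")
	s, _ := d.Get("Secret", "default/s1")
	fmt.Println(g, s) // cache:v1 api:token
}
```

Reads served from the cache can lag behind the apiserver. When a type must always be read fresh, or should not be cached at all, it can be excluded through ClientDisableCacheFor or read through the apiReader shown earlier.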

Controller initialization

The controller is initialized as follows:

    func (r *GameReconciler) SetupWithManager(mgr ctrl.Manager) error {
        // Return the error from Complete instead of discarding it
        return ctrl.NewControllerManagedBy(mgr).
            WithOptions(controller.Options{
                MaxConcurrentReconciles: 3,
            }).
            For(&myappv1.Game{}).       // the resource to reconcile
            Owns(&appsv1.Deployment{}). // also watch Deployments owned by a Game
            Complete(r)
    }

    // Complete builds the Application ControllerManagedBy.
    func (blder *Builder) Complete(r reconcile.Reconciler) error {
        _, err := blder.Build(r)
        return err
    }

    // Build builds the Application ControllerManagedBy and returns the Controller it created.
    func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
        if r == nil {
            return nil, fmt.Errorf("must provide a non-nil Reconciler")
        }
        if blder.mgr == nil {
            return nil, fmt.Errorf("must provide a non-nil Manager")
        }
        if blder.forInput.err != nil {
            return nil, blder.forInput.err
        }
        // Checking the reconcile type exist or not
        if blder.forInput.object == nil {
            return nil, fmt.Errorf("must provide an object for reconciliation")
        }

        // Set the Config
        blder.loadRestConfig()

        // Set the ControllerManagedBy
        if err := blder.doController(r); err != nil {
            return nil, err
        }

        // Set the Watch
        if err := blder.doWatch(); err != nil {
            return nil, err
        }

        return blder.ctrl, nil
    }

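To initialize the controller, ctrl.NewControllerManagedBy creates a Builder, the configuration is filled in, and the Build method completes the initialization. Build does three main things:

  1. Loads the configuration
  2. Creates the controller via doController
  3. Sets up the resources to watch via doWatch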

First, controller creation:

    func (blder *Builder) doController(r reconcile.Reconciler) error {
        ctrlOptions := blder.ctrlOptions
        if ctrlOptions.Reconciler == nil {
            ctrlOptions.Reconciler = r
        }

        gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
        if err != nil {
            return err
        }

        // Setup the logger.
        if ctrlOptions.Log == nil {
            ctrlOptions.Log = blder.mgr.GetLogger()
        }
        ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

        // Build the controller and return.
        blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
        return err
    }

    func New(name string, mgr manager.Manager, options Options) (Controller, error) {
        c, err := NewUnmanaged(name, mgr, options)
        if err != nil {
            return nil, err
        }

        // Add the controller as a Manager components
        return c, mgr.Add(c)
    }

    func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
        if options.Reconciler == nil {
            return nil, fmt.Errorf("must specify Reconciler")
        }

        if len(name) == 0 {
            return nil, fmt.Errorf("must specify Name for Controller")
        }

        if options.Log == nil {
            options.Log = mgr.GetLogger()
        }

        if options.MaxConcurrentReconciles <= 0 {
            options.MaxConcurrentReconciles = 1
        }

        if options.CacheSyncTimeout == 0 {
            options.CacheSyncTimeout = 2 * time.Minute
        }

        if options.RateLimiter == nil {
            options.RateLimiter = workqueue.DefaultControllerRateLimiter()
        }

        // Inject dependencies into Reconciler
        if err := mgr.SetFields(options.Reconciler); err != nil {
            return nil, err
        }

        // Create controller with dependencies set
        return &controller.Controller{
            Do: options.Reconciler,
            MakeQueue: func() workqueue.RateLimitingInterface {
                return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
            },
            MaxConcurrentReconciles: options.MaxConcurrentReconciles,
            CacheSyncTimeout:        options.CacheSyncTimeout,
            SetFields:               mgr.SetFields,
            Name:                    name,
            Log:                     options.Log.WithName("controller").WithName(name),
        }, nil
    }

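doController calls controller.New to create the controller and add it to the manager. NewUnmanaged contains settings familiar from sample-controller: it also sets up a rate-limited work queue, the maximum number of concurrent workers, and so on.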
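When no RateLimiter is supplied, NewUnmanaged falls back to workqueue.DefaultControllerRateLimiter(). In client-go this combines a per-item exponential backoff (5ms base, capped at 1000s) with an overall token bucket (10 qps, burst 100). The stdlib-only model below covers only the per-item part, using a hypothetical expBackoff type: every failure doubles the item's delay, and Forget resets it.

```go
package main

import (
	"fmt"
	"time"
)

// expBackoff is a toy model of a per-item exponential failure rate
// limiter: delay = base * 2^failures, capped at max.
type expBackoff struct {
	base, max time.Duration
	failures  map[string]int
}

func newExpBackoff(base, max time.Duration) *expBackoff {
	return &expBackoff{base: base, max: max, failures: map[string]int{}}
}

// When records one more failure for item and returns how long to wait
// before it may be retried.
func (r *expBackoff) When(item string) time.Duration {
	exp := r.failures[item]
	r.failures[item]++
	d := r.base
	for i := 0; i < exp && d < r.max; i++ {
		d *= 2 // stops doubling once the cap is reached, so no overflow
	}
	if d > r.max {
		d = r.max
	}
	return d
}

// Forget clears the item's failure history, like workqueue.Forget after a
// successful sync.
func (r *expBackoff) Forget(item string) { delete(r.failures, item) }

func main() {
	rl := newExpBackoff(5*time.Millisecond, 1000*time.Second)
	var ds []time.Duration
	for i := 0; i < 4; i++ {
		ds = append(ds, rl.When("default/foo"))
	}
	fmt.Println(ds) // [5ms 10ms 20ms 40ms]
	rl.Forget("default/foo")
	fmt.Println(rl.When("default/foo")) // 5ms
}
```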

doWatch looks like this:

    func (blder *Builder) doWatch() error {
        // Reconcile type
        typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
        if err != nil {
            return err
        }
        src := &source.Kind{Type: typeForSrc}
        hdler := &handler.EnqueueRequestForObject{}
        allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
        if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
            return err
        }

        // Watches the managed types
        for _, own := range blder.ownsInput {
            typeForSrc, err := blder.project(own.object, own.objectProjection)
            if err != nil {
                return err
            }
            src := &source.Kind{Type: typeForSrc}
            hdler := &handler.EnqueueRequestForOwner{
                OwnerType:    blder.forInput.object,
                IsController: true,
            }
            allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
            allPredicates = append(allPredicates, own.predicates...)
            if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
                return err
            }
        }

        // Do the watch requests
        for _, w := range blder.watchesInput {
            allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
            allPredicates = append(allPredicates, w.predicates...)

            // If the source of this watch is of type *source.Kind, project it.
            if srckind, ok := w.src.(*source.Kind); ok {
                typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
                if err != nil {
                    return err
                }
                srckind.Type = typeForSrc
            }

            if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
                return err
            }
        }
        return nil
    }

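doWatch watches, in turn, the resource being reconciled, the ownsInput resources (objects whose owner is the current resource), and any watchesInput passed in through the builder, registering each of them with ctrl.Watch. The eventhandler argument decides what gets enqueued. For the resource itself it is handler.EnqueueRequestForObject, which enqueues the object's own key; handler.EnqueueRequestForOwner instead enqueues the key of the object's owner.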

    type EnqueueRequestForObject struct{}

    // Create implements EventHandler
    func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
        if evt.Object == nil {
            enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
            return
        }
        // Add a request for the object itself to the work queue
        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
            Name:      evt.Object.GetName(),
            Namespace: evt.Object.GetNamespace(),
        }})
    }
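The two handlers differ only in which key they enqueue. Below is a simplified, stdlib-only sketch. Object, OwnerRef and Request are hypothetical trimmed-down stand-ins (real owner matching also compares the API group), and the owner is assumed to be namespaced, so it lives in the child's namespace.

```go
package main

import "fmt"

// OwnerRef and Object are hypothetical stand-ins for metav1.OwnerReference
// and client.Object.
type OwnerRef struct {
	Kind, Name string
	Controller bool
}

type Object struct {
	Kind, Namespace, Name string
	Owners                []OwnerRef
}

type Request struct{ Namespace, Name string }

// enqueueForObject maps an event to a request for the object itself.
func enqueueForObject(o Object) []Request {
	return []Request{{Namespace: o.Namespace, Name: o.Name}}
}

// enqueueForOwner maps an event on a child (e.g. a Deployment) to a request
// for its controlling owner of ownerKind, mirroring IsController: true.
func enqueueForOwner(o Object, ownerKind string) []Request {
	var reqs []Request
	for _, ref := range o.Owners {
		if ref.Kind == ownerKind && ref.Controller {
			// Assumes a namespaced owner, which shares the child's namespace.
			reqs = append(reqs, Request{Namespace: o.Namespace, Name: ref.Name})
		}
	}
	return reqs
}

func main() {
	dep := Object{Kind: "Deployment", Namespace: "default", Name: "game-sample-nginx",
		Owners: []OwnerRef{{Kind: "Game", Name: "game-sample", Controller: true}}}
	fmt.Println(enqueueForObject(dep))        // [{default game-sample-nginx}]
	fmt.Println(enqueueForOwner(dep, "Game")) // [{default game-sample}]
}
```

This is why a change to a Deployment created by a Game triggers Reconcile for that Game, not for the Deployment.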

Watch is implemented as follows:

    func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
        c.mu.Lock()
        defer c.mu.Unlock()

        // Inject Cache into arguments
        if err := c.SetFields(src); err != nil {
            return err
        }
        if err := c.SetFields(evthdler); err != nil {
            return err
        }
        for _, pr := range prct {
            if err := c.SetFields(pr); err != nil {
                return err
            }
        }

        if !c.Started {
            c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
            return nil
        }

        c.Log.Info("Starting EventSource", "source", src)
        return src.Start(c.ctx, evthdler, c.Queue, prct...)
    }

    func (ks *Kind) InjectCache(c cache.Cache) error {
        if ks.cache == nil {
            ks.cache = c
        }
        return nil
    }

    func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
        prct ...predicate.Predicate) error {
        ...
        i, err := ks.cache.GetInformer(ctx, ks.Type)
        if err != nil {
            if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
                log.Error(err, "if kind is a CRD, it should be installed before calling Start",
                    "kind", kindMatchErr.GroupKind)
            }
            return err
        }
        i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
        return nil
    }

    // Informer lookup
    func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
        switch obj.(type) {
        case *unstructured.Unstructured:
            return m.unstructured.Get(ctx, gvk, obj)
        case *unstructured.UnstructuredList:
            return m.unstructured.Get(ctx, gvk, obj)
        case *metav1.PartialObjectMetadata:
            return m.metadata.Get(ctx, gvk, obj)
        case *metav1.PartialObjectMetadataList:
            return m.metadata.Get(ctx, gvk, obj)
        default:
            return m.structured.Get(ctx, gvk, obj)
        }
    }

    // If no informer exists for the GVK yet, create one and add it to the informer map
    func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
        // Return the informer if it is found
        i, started, ok := func() (*MapEntry, bool, bool) {
            ip.mu.RLock()
            defer ip.mu.RUnlock()
            i, ok := ip.informersByGVK[gvk]
            return i, ip.started, ok
        }()

        if !ok {
            var err error
            if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
                return started, nil, err
            }
        }
        ...
        return started, i, nil
    }

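Watch injects the cache into the source via SetFields. If the controller has not started yet, the watch is only appended to its startWatches list. If it has already started, Start is called immediately: it fetches (or lazily creates) the informer from the cache and registers the EventHandler callbacks on it.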
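The Started/startWatches bookkeeping can be modeled in isolation. Watches registered during SetupWithManager are only recorded, and their event handlers are attached once the controller starts, after the manager has started its cache. Below is a toy model with hypothetical types; it omits the mutex and the real source/handler interfaces.

```go
package main

import "fmt"

// watch is a hypothetical stand-in for watchDescription (source + handler).
type watch struct{ source string }

// controller models only the Started/startWatches bookkeeping.
type controller struct {
	started      bool
	startWatches []watch
	running      []string // sources whose event handlers are attached
}

// Watch defers registration until Start if the controller is not running.
func (c *controller) Watch(w watch) {
	if !c.started {
		c.startWatches = append(c.startWatches, w)
		return
	}
	c.running = append(c.running, w.source)
}

// Start attaches every deferred watch, then marks the controller started.
func (c *controller) Start() {
	for _, w := range c.startWatches {
		c.running = append(c.running, w.source)
	}
	c.startWatches = nil
	c.started = true
}

func main() {
	c := &controller{}
	c.Watch(watch{"Game"})
	c.Watch(watch{"Deployment"})
	fmt.Println(len(c.running)) // 0
	c.Start()
	c.Watch(watch{"Service"}) // after Start: attached immediately
	fmt.Println(c.running)    // [Game Deployment Service]
}
```

Deferring the watches keeps SetupWithManager cheap and non-blocking: nothing talks to the apiserver until mgr.Start.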

Manager startup

Finally, let's look at how the Manager starts:

    func (cm *controllerManager) Start(ctx context.Context) (err error) {
        if err := cm.Add(cm.cluster); err != nil {
            return fmt.Errorf("failed to add cluster to runnables: %w", err)
        }
        cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

        stopComplete := make(chan struct{})
        defer close(stopComplete)
        defer func() {
            stopErr := cm.engageStopProcedure(stopComplete)
            // ... (handling of stopErr omitted in this excerpt)
        }()

        cm.errChan = make(chan error)

        // Serve metrics
        if cm.metricsListener != nil {
            go cm.serveMetrics()
        }

        // Serve health probes
        if cm.healthProbeListener != nil {
            go cm.serveHealthProbes()
        }

        go cm.startNonLeaderElectionRunnables()

        go func() {
            if cm.resourceLock != nil {
                err := cm.startLeaderElection()
                if err != nil {
                    cm.errChan <- err
                }
            } else {
                // Treat not having leader election enabled the same as being elected.
                cm.startLeaderElectionRunnables()
                close(cm.elected)
            }
        }()

        select {
        case <-ctx.Done():
            // We are done
            return nil
        case err := <-cm.errChan:
            // Error starting or running a runnable
            return err
        }
    }

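The main steps are:

  1. Start the metrics server
  2. Start the health-probe server
  3. Start the runnables that do not need leader election
  4. Run leader election and start the runnables that require it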

For the runnables that do not need leader election, the code is:

    func (cm *controllerManager) startNonLeaderElectionRunnables() {
        cm.mu.Lock()
        defer cm.mu.Unlock()

        cm.waitForCache(cm.internalCtx)

        // Start the non-leaderelection Runnables after the cache has synced
        for _, c := range cm.nonLeaderElectionRunnables {
            cm.startRunnable(c)
        }
    }

    func (cm *controllerManager) waitForCache(ctx context.Context) {
        if cm.started {
            return
        }

        for _, cache := range cm.caches {
            cm.startRunnable(cache)
        }

        for _, cache := range cm.caches {
            cache.GetCache().WaitForCacheSync(ctx)
        }
        cm.started = true
    }
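The start order can be summarized as follows. Caches come first, then the runnables that do not need leader election, then (only on the elected replica) the ones that do. With leader election disabled, the manager treats itself as elected. The sequential model below records that order. Runnable names are illustrative, and the real manager runs these steps in separate goroutines.

```go
package main

import "fmt"

// runnable is a hypothetical stand-in for manager.Runnable together with
// its answer to "do I need leader election?".
type runnable struct {
	name               string
	needLeaderElection bool
}

// manager models only the start ordering of controllerManager.Start.
type manager struct {
	caches    []string
	runnables []runnable
	log       []string // order in which things were started
}

// start runs the sequence; leaderElection says whether it is enabled and
// elected whether this replica currently holds the lease.
func (m *manager) start(leaderElection, elected bool) {
	// waitForCache: caches are started and synced before any runnable.
	for _, c := range m.caches {
		m.log = append(m.log, "start+sync "+c)
	}
	// Runnables that do not need leader election start on every replica.
	for _, r := range m.runnables {
		if !r.needLeaderElection {
			m.log = append(m.log, "start "+r.name)
		}
	}
	// The rest (controllers by default) wait for the lease; with leader
	// election disabled the manager behaves as if it had been elected.
	if !leaderElection || elected {
		for _, r := range m.runnables {
			if r.needLeaderElection {
				m.log = append(m.log, "start "+r.name)
			}
		}
	}
}

func main() {
	m := &manager{
		caches:    []string{"informer-cache"},
		runnables: []runnable{{"game-controller", true}, {"webhook-server", false}},
	}
	m.start(true, false) // leader election on, lease not acquired yet
	for _, line := range m.log {
		fmt.Println(line)
	}
	// prints "start+sync informer-cache" then "start webhook-server";
	// game-controller keeps waiting for leadership
}
```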

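The caches are started and synced first, then the other runnables. Runnables that require leader election are handled the same way. Because a controller is added to the leader-election group when it is created, the controller is what finally gets started: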

    func (c *Controller) Start(ctx context.Context) error {
        ...
        c.Queue = c.MakeQueue()
        defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed

        err := func() error {
            defer c.mu.Unlock()
            defer utilruntime.HandleCrash()

            for _, watch := range c.startWatches {
                c.Log.Info("Starting EventSource", "source", watch.src)
                if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
                    return err
                }
            }

            for _, watch := range c.startWatches {
                syncingSource, ok := watch.src.(source.SyncingSource)
                if !ok {
                    continue
                }
                if err := func() error {
                    // use a context with timeout for launching sources and syncing caches.
                    sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
                    defer cancel()

                    if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
                        err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
                        c.Log.Error(err, "Could not wait for Cache to sync")
                        return err
                    }
                    return nil
                }(); err != nil {
                    return err
                }
            }
            ...
            for i := 0; i < c.MaxConcurrentReconciles; i++ {
                go wait.UntilWithContext(ctx, func(ctx context.Context) {
                    for c.processNextWorkItem(ctx) {
                    }
                }, c.JitterPeriod)
            }

            c.Started = true
            return nil
        }()
        if err != nil {
            return err
        }

        <-ctx.Done()
        c.Log.Info("Stopping workers")
        return nil
    }

    func (c *Controller) processNextWorkItem(ctx context.Context) bool {
        obj, shutdown := c.Queue.Get()
        ...
        c.reconcileHandler(ctx, obj)
        return true
    }

    func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
        // Make sure that the object is a valid request.
        req, ok := obj.(reconcile.Request)
        ...
        if result, err := c.Do.Reconcile(ctx, req); err != nil {
            ...
        }
    }

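Starting the controller mainly involves:

  1. Waiting for the caches to sync
  2. Launching MaxConcurrentReconciles workers, each looping over processNextWorkItem
  3. Having each worker call c.Do.Reconcile to process its item

This is the same workflow as in sample-controller: keep taking items from the work queue and call Reconcile to bring them to the desired state.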

Summary of the flow

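At this point the main logic of kubebuilder-generated code is clear. The overall flow is similar to sample-controller's. The difference is that kubebuilder, through controller-runtime, already does much of the work for us, such as initializing the client and cache and providing the controller's run loop, so we only need to care about the Reconcile logic.

  1. Initialize the manager, which creates the client and the cache
  2. Create the controller; for each watched resource an informer is created and event handlers are registered on it
  3. Start the manager, which starts the cache and the controllers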

Conclusion

kubebuilder greatly simplifies Operator development. Understanding the principles behind it helps when tuning an Operator and running it reliably in production.

References

[1] https://github.com/kubernetes/sample-controller
[2] https://book.kubebuilder.io/architecture.html
[3] https://developer.aliyun.com/article/719215
