动态配置admission webhook举例(详情见官方文档:https://kubernetes.io/zh/docs/reference/access-authn-authz/extensible-admission-controllers/):

apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:name: "pod-policy.example.com"
webhooks:
- name: "pod-policy.example.com"rules:- apiGroups:   [""]apiVersions: ["v1"]operations:  ["CREATE"]resources:   ["pods"]scope:       "Namespaced"clientConfig:service:namespace: "example-namespace"name: "example-service"caBundle: "Ci0tLS0tQk...<base64-encoded PEM bundle containing the CA that signed the webhook's serving certificate>...tLS0K"admissionReviewVersions: ["v1", "v1beta1"]sideEffects: NonetimeoutSeconds: 5

一、初始化

kube-apiserver在调用NewServerRunOptions函数初始化options的时候,调用了NewAdmissionOptions去初始化了AdmissionOptions,并注册了内置的admission插件和webhook admission插件。

// NewServerRunOptions creates a new ServerRunOptions object with default parameters
func NewServerRunOptions() *ServerRunOptions {s := ServerRunOptions{// 省略...// 初始化AdmissionOptionsAdmission:               kubeoptions.NewAdmissionOptions(), Authentication:          kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),Authorization:           kubeoptions.NewBuiltInAuthorizationOptions(),// 省略...}// ...return &s
}

NewAdmissionOptions里面先是调用genericoptions.NewAdmissionOptions创建一个AdmissionOptions,NewAdmissionOptions同时也注册了lifecycle、validatingwebhook、mutatingwebhook这三个插件。然后再调用RegisterAllAdmissionPlugins注册内置的其他admission。

// pkg/kubeapiserver/options/admission.go
func NewAdmissionOptions() *AdmissionOptions {// 1. 创建AdmissionOptions,并在里面注册了webhook的validating、mutating插件。options := genericoptions.NewAdmissionOptions()// register all admission plugins  2. 注册所有的内置的admission pluginsRegisterAllAdmissionPlugins(options.Plugins)// set RecommendedPluginOrder  3.设置 admission plugin顺序options.RecommendedPluginOrder = AllOrderedPlugins// set DefaultOffPlugins      4.默认关闭的pluginoptions.DefaultOffPlugins = DefaultOffAdmissionPlugins()return &AdmissionOptions{GenericAdmission: options,}
}

webhook的validating、mutating插件注册时在genericoptions.NewAdmissionOptions中,server.RegisterAllAdmissionPlugins注册了lifecycle、validatingwebhook、mutatingwebhook这三个插件。

// staging/src/k8s.io/apiserver/pkg/server/options/admission.go
func NewAdmissionOptions() *AdmissionOptions {options := &AdmissionOptions{Plugins:    admission.NewPlugins(),Decorators: admission.Decorators{admission.DecoratorFunc(admissionmetrics.WithControllerMetrics)},// This list is mix of mutating admission plugins and validating// admission plugins. The apiserver always runs the validating ones// after all the mutating ones, so their relative order in this list// doesn't matter.RecommendedPluginOrder: []string{lifecycle.PluginName, mutatingwebhook.PluginName, validatingwebhook.PluginName},DefaultOffPlugins:      sets.NewString(),}// 注册了lifecycle、validatingwebhook、mutatingwebhookserver.RegisterAllAdmissionPlugins(options.Plugins)return options
}// staging/src/k8s.io/apiserver/pkg/server/plugins.go
// RegisterAllAdmissionPlugins registers all admission plugins
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {lifecycle.Register(plugins)  // namespace lifecyclevalidatingwebhook.Register(plugins)  // validatingwebhook插件mutatingwebhook.Register(plugins) // mutatingwebhook插件
}

二、Admission Plugins在kube-apiserver请求处理链中的位置

kube-apiserver在cmd/kube-apiserver/app/server.go.buildGenericConfig()中根据ServerOptions生成GenericConfig。

前面已经分析AdmissionPlugin注册到ServerRunOptions的过程, buildGenericConfig中会调用ServerRunOptions.Admission.ApplyTo生成admission chain设置到GenericConfig里面。把所有的admission plugin生成chainAdmissionHandler对象,其实就是plugin数组,这个类的Admit、Validate等方法会遍历调用每个plugin的Admit、Validate方法

GenericConfig.AdmissionControl 又会赋值给GenericAPIServer.admissionControl

 
func (a *AdmissionOptions) ApplyTo(c *server.Config,informers informers.SharedInformerFactory,kubeAPIServerClientConfig *rest.Config,features featuregate.FeatureGate,pluginInitializers ...admission.PluginInitializer,
) error {// 省略 ...// 找到所有启用的pluginpluginNames := a.enabledPluginNames()pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme)if err != nil {return fmt.Errorf("failed to read plugin config: %v", err)}clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)if err != nil {return err}genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features)initializersChain := admission.PluginInitializers{}pluginInitializers = append(pluginInitializers, genericInitializer)initializersChain = append(initializersChain, pluginInitializers...)// 把所有的admission plugin生成admissionChain,实际是个plugin数组admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)if err != nil {return err}// 把admissionChain设置给GenericConfig.AdmissionControl c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)return nil
}

Admission Plugin是在kube-apiserver处理完前面的handler之后,在调用RESTStorage的Get、Create、Update、Delete等函数前会调用Admission Plugin。

《Kube-apiserver源码-Handler链》分析过,kube-apiserver有很多的handler组成了handler链,这写handler链的最内层,是使用gorestful框架注册的WebService。每个WebService都对应一种资源的RESTStorage,比如NodeStorage(pkg/registry/core/node/storage/storage.go ),installAPIResources初始化WebService时,会把RESTStorage的Get、Create、Update等函数分别封装成Get、POST、PUT等http方法的handler注册到WebService中。

比如把Update函数封装成http handler 作为PUT方法的handler,而在这个hanlder调用Update函数之前,会先调用Admission Plugin的Admit、Validate等函数。下面看个PUT方法的例子。

a.group.Admit是从GenericAPIServer.admissionControl取的值,就是前面ApplyTo函数生成的admissionChain。admit、updater作为参数调用restfulUpdateResource函数生成的handler

// staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {admit := a.group.Admit// 省略 ...updater, isUpdater := storage.(rest.Updater)// 省略 ...switch action.Verb {case "GET": ...case "PUT": // Update a resource.doc := "replace the specified " + kindif isSubresource {doc = "replace " + subresource + " of the specified " + kind}// admit、updater作为参数调用restfulUpdateResource函数生成的handlerhandler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulUpdateResource(updater, reqScope, admit))route := ws.PUT(action.Path).To(handler).Doc(doc).Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).Operation("replace"+namespaced+kind+strings.Title(subresource)+operationSuffix).Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).Returns(http.StatusOK, "OK", producedObject).// TODO: in some cases, the API may return a v1.Status instead of the versioned object// but currently go-restful can't handle multiple different objects being returned.Returns(http.StatusCreated, "Created", producedObject).Reads(defaultVersionedObject).Writes(producedObject)if err := AddObjectParams(ws, route, versionedUpdateOptions); err != nil {return nil, err}addParams(route, action.Params)routes = append(routes, route)     case "PARTCH": ...  // 省略 ....}
}

看restfulUpdateResource的实现没啥详细内容,就是调用了 handlers.UpdateResource。

func restfulUpdateResource(r rest.Updater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {return func(req *restful.Request, res *restful.Response) {handlers.UpdateResource(r, &scope, admit)(res.ResponseWriter, req.Request)}
}

看handlers.UpdateResource的代码实现,会先判断如果传入的admission.Interface参数是MutationInterface类型,就调用Admit,也就是调用admissionChain的Admit,最终会遍历调用每个Admission Plugin的Admit方法。而Webhook Admission是众多admission中的一个。

执行完Admission,后面的requestFunc 才会调用RESTStorage的Update函数。每个资源的RESTStorage最终都是要调用ETCD3Storage的Get、Update等函数。

 
// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go
func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interface) http.HandlerFunc {return func(w http.ResponseWriter, req *http.Request) {// 省略 ...ae := request.AuditEventFrom(ctx)audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)admit = admission.WithAudit(admit, ae)// 如果admit是MutationInterface类型的,就调用其Admit函数,也就是admissionChain的Admitif mutatingAdmission, ok := admit.(admission.MutationInterface); ok {transformers = append(transformers, func(ctx context.Context, newObj, oldObj runtime.Object) (runtime.Object, error) {isNotZeroObject, err := hasUID(oldObj)if err != nil {return nil, fmt.Errorf("unexpected error when extracting UID from oldObj: %v", err.Error())} else if !isNotZeroObject {if mutatingAdmission.Handles(admission.Create) {return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, updateToCreateOptions(options), dryrun.IsDryRun(options.DryRun), userInfo), scope)}} else {if mutatingAdmission.Handles(admission.Update) {return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, oldObj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope)}}return newObj, nil})}// 省略 ...// 执行完admission,这里才调用RESTStorage的Update函数 requestFunc := func() (runtime.Object, error) {obj, created, err := r.Update(ctx,name,rest.DefaultUpdatedObjectInfo(obj, transformers...),// createValidation会调用ValidationInterface的Validate方法withAuthorization(rest.AdmissionToValidateObjectFunc(admit,admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, updateToCreateOptions(options), dryrun.IsDryRun(options.DryRun), userInfo), scope),scope.Authorizer, createAuthorizerAttributes),// updateValidation会调用ValidationInterface的Validate方法rest.AdmissionToValidateObjectUpdateFunc(admit,admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope),false,options,)wasCreated = createdreturn obj, err}result, err := finishRequest(timeout, func() (runtime.Object, error) {result, err := requestFunc()// 省略 ...return result, err})// ...transformResponseObject(ctx, scope, trace, req, w, status, outputMediaType, result)}
}

以上是PUT方法的例子,里面调用了MutationInterface和ValidationInterface。其他的方法比如POST、DELETE等也是类似。但是GET方法不会调用Admission Plugin。

三、Webhook Admission 调用

validatingwebhook和mutatingwebhook分别位于staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin.go,staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go两个文件中。

3.1 ValidatingAdmissionWebhook调用

1. ValidatingAdmissionWebhook的Validate()函数实现了ValidationInterface接口,有请求到来时kube-apiserver会调用所有admission 的Validate()方法。ValidatingAdmissionWebhook持有了一个Webhook对象,Validate()会调用Webhook.Dispatch()。

2.Webhook.Dispatch()又调用了其持有的dispatcher的Dispatch()方法。dispatcher时通过dispatcherFactory创建的,dispatcherFactory是ValidatingAdmissionWebhook创建generic.Webhook时候传入的newValidatingDispatcher函数。调用dispatcherFactory函数创建的实际上是validatingDispatcher对象,也就是Webhook.Dispatch()调用的是validatingDispatcher.Dispatch()。

3.validatingDispatcher.Dispatch()会逐个远程调用注册的webhook plugin

NewValidatingAdmissionWebhook初始化了ValidatingAdmissionWebhook对象,内部持有了一个generic.Webhook对象,generic.Webhook是一个Validate和mutate公用的框架,创建generic.Webhook时需要一个dispatcherFactory函数,用这个函数生成dispatcher对象

// staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin.go
// NewValidatingAdmissionWebhook returns a generic admission webhook plugin.
func NewValidatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) {handler := admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)p := &Plugin{}var err errorp.Webhook, err = generic.NewWebhook(handler, configFile, configuration.NewValidatingWebhookConfigurationManager, newValidatingDispatcher(p))if err != nil {return nil, err}return p, nil
}// Validate makes an admission decision based on the request attributes.
func (a *Plugin) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {return a.Webhook.Dispatch(ctx, attr, o)
}

调用generic.Webhook.Dispatch()时会调用dispatcher对象的Dispatch。

 
// Dispatch is called by the downstream Validate or Admit methods.
func (a *Webhook) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {if rules.IsWebhookConfigurationResource(attr) {return nil}if !a.WaitForReady() {return admission.NewForbidden(attr, fmt.Errorf("not yet ready to handle request"))}hooks := a.hookSource.Webhooks()return a.dispatcher.Dispatch(ctx, attr, o, hooks)
}

validatingDispatcher.Dispatch遍历所有的hooks ,找到相关的webhooks,然后执行callHooks调用外部注册进来的

func (d *validatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error {var relevantHooks []*generic.WebhookInvocation// Construct all the versions we need to call our webhooksversionedAttrs := map[schema.GroupVersionKind]*generic.VersionedAttributes{}for _, hook := range hooks {// 遍历所有的webhooks,根据ValidatingWebhookConfiguration中的rules是否匹配找到所有相关的hooksinvocation, statusError := d.plugin.ShouldCallHook(hook, attr, o)if statusError != nil {return statusError}if invocation == nil {continue}relevantHooks = append(relevantHooks, invocation)// If we already have this version, continueif _, ok := versionedAttrs[invocation.Kind]; ok {continue}versionedAttr, err := generic.NewVersionedAttributes(attr, invocation.Kind, o)if err != nil {return apierrors.NewInternalError(err)}versionedAttrs[invocation.Kind] = versionedAttr}if len(relevantHooks) == 0 {// no matching hooksreturn nil}// Check if the request has already timed out before spawning remote callsselect {case <-ctx.Done():// parent context is canceled or timed out, no point in continuingreturn apierrors.NewTimeoutError("request did not complete within requested timeout", 0)default:}wg := sync.WaitGroup{}errCh := make(chan error, len(relevantHooks))wg.Add(len(relevantHooks))for i := range relevantHooks {go func(invocation *generic.WebhookInvocation) {defer wg.Done()hook, ok := invocation.Webhook.GetValidatingWebhook()if !ok {utilruntime.HandleError(fmt.Errorf("validating webhook dispatch requires v1.ValidatingWebhook, but got %T", hook))return}versionedAttr := versionedAttrs[invocation.Kind]t := time.Now()// 启动多个go routine 并行调用注册进来的webhook pluginerr := d.callHook(ctx, hook, invocation, versionedAttr)ignoreClientCallFailures := hook.FailurePolicy != nil && *hook.FailurePolicy == v1.Ignorerejected := falseif err != nil {switch err := err.(type) {case *webhookutil.ErrCallingWebhook:if !ignoreClientCallFailures {rejected = trueadmissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionCallingWebhookError, 0)}case *webhookutil.ErrWebhookRejection:rejected = trueadmissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionNoError, int(err.Status.ErrStatus.Code))default:rejected = trueadmissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionAPIServerInternalError, 0)}}admissionmetrics.Metrics.ObserveWebhook(time.Since(t), rejected, versionedAttr.Attributes, "validating", hook.Name)if err == nil {return}if callErr, ok := err.(*webhookutil.ErrCallingWebhook); ok {if ignoreClientCallFailures {klog.Warningf("Failed calling webhook, failing open %v: %v", hook.Name, callErr)utilruntime.HandleError(callErr)return}klog.Warningf("Failed calling webhook, failing closed %v: %v", hook.Name, err)errCh <- apierrors.NewInternalError(err)return}if rejectionErr, ok := err.(*webhookutil.ErrWebhookRejection); ok {err = rejectionErr.Status}klog.Warningf("rejected by webhook %q: %#v", hook.Name, err)errCh <- err}(relevantHooks[i])}// 等待多个goroutine 执行完成wg.Wait()close(errCh)var errs []errorfor e := range errCh {errs = append(errs, e)}if len(errs) == 0 {return nil}if len(errs) > 1 {for i := 1; i < len(errs); i++ {// TODO: merge status errors; until then, just return the first one.utilruntime.HandleError(errs[i])}}return errs[0]
}

3.2 MutatingAdmissionWebhook调用

看MutatingWebhook的构造函数就可以看到,MutatingWebhook和ValidatingWebhook的代码架构是一样的,只不过在创建generic.Webhook的时候传入的dispatcherFactory函数是newMutatingDispatcher,所以Webhook.Dispatch()最终调用的就是mutatingDispatcher.Dispatch(),这个和validatingDispatcher.Dispatch的实现逻辑基本是一样的,也是根据WebhookConfiguration中的rules是否匹配找到相关的webhooks,然后逐个调用。

 
// staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go
// NewMutatingWebhook returns a generic admission webhook plugin.
func NewMutatingWebhook(configFile io.Reader) (*Plugin, error) {handler := admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)p := &Plugin{}var err errorp.Webhook, err = generic.NewWebhook(handler, configFile, configuration.NewMutatingWebhookConfigurationManager, newMutatingDispatcher(p))if err != nil {return nil, err}return p, nil
}// ValidateInitialization implements the InitializationValidator interface.
func (a *Plugin) ValidateInitialization() error {if err := a.Webhook.ValidateInitialization(); err != nil {return err}return nil
}// Admit makes an admission decision based on the request attributes.
func (a *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {return a.Webhook.Dispatch(ctx, attr, o)
}

kube-apiserver源码-动态准入控制 admission webhook相关推荐

  1. 准入控制_Kubernetes动态准入控制示例

    准入控制 A walk-through of creating a webhook for Kubernetes dynamic admission control. 创建用于Kubernetes动态 ...

  2. 全新仿DS网发卡源码 动态特效源码

    介绍: 全新仿ds网发卡源码 动态特效 全新模板 全新后台系统 修复订单支付失败 去除授权后门 查单页面更换 用户体验感非常棒 增加前台显示数据 网盘下载地址: https://zijiewangpa ...

  3. kubernetes apiserver源码分析二之路由

    apiserver的man函数在 k8s.io/kubernetes/cmd/kube-apiserver 目录. 但是大部分源码却在 k8s.io/apiserver 这个库里面. cmd 目录下的 ...

  4. 简述控制反转ioc_阅读Spring源码:IOC控制反转前的处理

    温馨提示:要怀着 这个世界很美好 的心态去看~ 技术经验交流:点击入群 ClassPathXmlApplicationContext的注册方式 源码分析基于Spring4.3 从ClassPathXm ...

  5. 好看的二次元个人主页导航源码 动态背景+背景音乐

    简介: 这是一款动态的个人导航页面 忘词了 你们凑合着看吧 源码上传解压就可以使用,文字链接修改 index.html 文件 音乐替换 0.mp3 文件 网盘下载地址: http://kekewl.c ...

  6. jq 评论源码+动态评论回复

    评论回复源码下载 <!DOCTYPE html> <html><head><meta charset="utf-8"><met ...

  7. Android:剖析源码,随心所欲控制Toast显示

                  本文转载于:http://www.cnblogs.com/net168/p/4058193.html                前言 Toast相信大家都不会陌生吧,如 ...

  8. 以canvas为动态网页背景,添加的标签内容不显示(附源码-----动态背景星空)

    问题描述: 我在网上下载了一个动态网页背景--星空,然后本地能运行,但是当我添加其他组件的时候,发现组件不显示,然后只能显示动态背景.仔细看了一下代码,发现这个动态背景是依靠canvas标签来实现的. ...

  9. Spring源码——动态AOP实现流程

    前言 最近回顾了一下Spring源码,准备用思维导图的方式简单的将整个源码内容的流程展示出来,思维导图.图片等文件更新在https://github.com/MrSorrow/spring-frame ...

最新文章

  1. python02-条件语句到循环语句
  2. Android N新特性
  3. ES6新特性之字符串扩展
  4. 【Espruino】NO.12 加速度传感器演示
  5. angular2子组件的事件传递(任意组件事件传递)
  6. 301 302区别_如何正确理解301,302和canonial标签
  7. curl 增加header_libcurl增加HTTP header 和 POST之后获取返回数据
  8. 稀疏大模型简述:从MoE、Sparse Attention到GLaM
  9. php ssh tab补全,bash的按TAB键自动补全(自动完成)的原理与扩展
  10. azure blob_如何使用Power BI从Azure Blob存储访问数据
  11. 设计模式系列——三个工厂模式(简单工厂模式,工厂方法模式,抽象工厂模式)...
  12. mysql innodb启动失败_关于mysql innodb启动失败无法重启的处理方法讲解
  13. springMVC常见问题
  14. 【收藏】计算机视觉领域全球顶级高校研究所团队总结
  15. hudi系列-文件压缩(compaction)
  16. Excel 生成均匀分布、正态分布随机数
  17. python绘制中国_如何用Python画一个中国地图?
  18. 关于yolov3.weights文件下载地址的分享
  19. 点心云pcdn跑量越来越少问题解决方案
  20. gyp info it worked if it ends with ok npm ERR 解决办法

热门文章

  1. 康姿百德 “神奇”床垫包治百病是谣传
  2. 大连三家软件企业入围全国软件百强排行榜
  3. 优麒麟系统Ubuntu Kylin的网络配置
  4. python 创建虚拟环境报错
  5. Python Pyinstaller安装与使用
  6. c++中的typename与class
  7. Java代码审计——WebGoat CSRF (上)
  8. 云计算的认识和看法_对云计算的看法. 我对云计算的认识
  9. 如何修改Linux服务器时间
  10. SecureCRT 如何快速执行常用命令