kube-apiserver源码-动态准入控制 admission webhook
动态配置admission webhook举例(详情见官方文档:https://kubernetes.io/zh/docs/reference/access-authn-authz/extensible-admission-controllers/):
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:name: "pod-policy.example.com"
webhooks:
- name: "pod-policy.example.com"rules:- apiGroups: [""]apiVersions: ["v1"]operations: ["CREATE"]resources: ["pods"]scope: "Namespaced"clientConfig:service:namespace: "example-namespace"name: "example-service"caBundle: "Ci0tLS0tQk...<base64-encoded PEM bundle containing the CA that signed the webhook's serving certificate>...tLS0K"admissionReviewVersions: ["v1", "v1beta1"]sideEffects: NonetimeoutSeconds: 5
一、初始化
kube-apiserver在调用NewServerRunOptions函数初始化options的时候,调用了NewAdmissionOptions去初始化了AdmissionOptions,并注册了内置的admission插件和webhook admission插件。
// NewServerRunOptions creates a new ServerRunOptions object with default parameters
func NewServerRunOptions() *ServerRunOptions {s := ServerRunOptions{// 省略...// 初始化AdmissionOptionsAdmission: kubeoptions.NewAdmissionOptions(), Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),Authorization: kubeoptions.NewBuiltInAuthorizationOptions(),// 省略...}// ...return &s
}
NewAdmissionOptions里面先是调用genericoptions.NewAdmissionOptions创建一个AdmissionOptions,NewAdmissionOptions同时也注册了lifecycle、validatingwebhook、mutatingwebhook这三个插件。然后再调用RegisterAllAdmissionPlugins注册内置的其他admission。
// pkg/kubeapiserver/options/admission.go
func NewAdmissionOptions() *AdmissionOptions {// 1. 创建AdmissionOptions,并在里面注册了webhook的validating、mutating插件。options := genericoptions.NewAdmissionOptions()// register all admission plugins 2. 注册所有的内置的admission pluginsRegisterAllAdmissionPlugins(options.Plugins)// set RecommendedPluginOrder 3.设置 admission plugin顺序options.RecommendedPluginOrder = AllOrderedPlugins// set DefaultOffPlugins 4.默认关闭的pluginoptions.DefaultOffPlugins = DefaultOffAdmissionPlugins()return &AdmissionOptions{GenericAdmission: options,}
}
webhook的validating、mutating插件注册时在genericoptions.NewAdmissionOptions中,server.RegisterAllAdmissionPlugins注册了lifecycle、validatingwebhook、mutatingwebhook这三个插件。
// staging/src/k8s.io/apiserver/pkg/server/options/admission.go
func NewAdmissionOptions() *AdmissionOptions {options := &AdmissionOptions{Plugins: admission.NewPlugins(),Decorators: admission.Decorators{admission.DecoratorFunc(admissionmetrics.WithControllerMetrics)},// This list is mix of mutating admission plugins and validating// admission plugins. The apiserver always runs the validating ones// after all the mutating ones, so their relative order in this list// doesn't matter.RecommendedPluginOrder: []string{lifecycle.PluginName, mutatingwebhook.PluginName, validatingwebhook.PluginName},DefaultOffPlugins: sets.NewString(),}// 注册了lifecycle、validatingwebhook、mutatingwebhookserver.RegisterAllAdmissionPlugins(options.Plugins)return options
}// staging/src/k8s.io/apiserver/pkg/server/plugins.go
// RegisterAllAdmissionPlugins registers all admission plugins
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {lifecycle.Register(plugins) // namespace lifecyclevalidatingwebhook.Register(plugins) // validatingwebhook插件mutatingwebhook.Register(plugins) // mutatingwebhook插件
}
二、Admission Plugins在kube-apiserver请求处理链中的位置
kube-apiserver在cmd/kube-apiserver/app/server.go.buildGenericConfig()中根据ServerOptions生成GenericConfig。
前面已经分析AdmissionPlugin注册到ServerRunOptions的过程, buildGenericConfig中会调用ServerRunOptions.Admission.ApplyTo生成admission chain设置到GenericConfig里面。把所有的admission plugin生成chainAdmissionHandler对象,其实就是plugin数组,这个类的Admit、Validate等方法会遍历调用每个plugin的Admit、Validate方法
GenericConfig.AdmissionControl 又会赋值给GenericAPIServer.admissionControl
func (a *AdmissionOptions) ApplyTo(c *server.Config,informers informers.SharedInformerFactory,kubeAPIServerClientConfig *rest.Config,features featuregate.FeatureGate,pluginInitializers ...admission.PluginInitializer,
) error {// 省略 ...// 找到所有启用的pluginpluginNames := a.enabledPluginNames()pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme)if err != nil {return fmt.Errorf("failed to read plugin config: %v", err)}clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)if err != nil {return err}genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features)initializersChain := admission.PluginInitializers{}pluginInitializers = append(pluginInitializers, genericInitializer)initializersChain = append(initializersChain, pluginInitializers...)// 把所有的admission plugin生成admissionChain,实际是个plugin数组admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)if err != nil {return err}// 把admissionChain设置给GenericConfig.AdmissionControl c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)return nil
}
Admission Plugin是在kube-apiserver处理完前面的handler之后,在调用RESTStorage的Get、Create、Update、Delete等函数前会调用Admission Plugin。
《Kube-apiserver源码-Handler链》分析过,kube-apiserver有很多的handler组成了handler链,这写handler链的最内层,是使用gorestful框架注册的WebService。每个WebService都对应一种资源的RESTStorage,比如NodeStorage(pkg/registry/core/node/storage/storage.go ),installAPIResources初始化WebService时,会把RESTStorage的Get、Create、Update等函数分别封装成Get、POST、PUT等http方法的handler注册到WebService中。
比如把Update函数封装成http handler 作为PUT方法的handler,而在这个hanlder调用Update函数之前,会先调用Admission Plugin的Admit、Validate等函数。下面看个PUT方法的例子。
a.group.Admit是从GenericAPIServer.admissionControl取的值,就是前面ApplyTo函数生成的admissionChain。admit、updater作为参数调用restfulUpdateResource函数生成的handler
// staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {admit := a.group.Admit// 省略 ...updater, isUpdater := storage.(rest.Updater)// 省略 ...switch action.Verb {case "GET": ...case "PUT": // Update a resource.doc := "replace the specified " + kindif isSubresource {doc = "replace " + subresource + " of the specified " + kind}// admit、updater作为参数调用restfulUpdateResource函数生成的handlerhandler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulUpdateResource(updater, reqScope, admit))route := ws.PUT(action.Path).To(handler).Doc(doc).Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).Operation("replace"+namespaced+kind+strings.Title(subresource)+operationSuffix).Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).Returns(http.StatusOK, "OK", producedObject).// TODO: in some cases, the API may return a v1.Status instead of the versioned object// but currently go-restful can't handle multiple different objects being returned.Returns(http.StatusCreated, "Created", producedObject).Reads(defaultVersionedObject).Writes(producedObject)if err := AddObjectParams(ws, route, versionedUpdateOptions); err != nil {return nil, err}addParams(route, action.Params)routes = append(routes, route) case "PARTCH": ... // 省略 ....}
}
看restfulUpdateResource的实现没啥详细内容,就是调用了 handlers.UpdateResource。
func restfulUpdateResource(r rest.Updater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {return func(req *restful.Request, res *restful.Response) {handlers.UpdateResource(r, &scope, admit)(res.ResponseWriter, req.Request)}
}
看handlers.UpdateResource的代码实现,会先判断如果传入的admission.Interface参数是MutationInterface类型,就调用Admit,也就是调用admissionChain的Admit,最终会遍历调用每个Admission Plugin的Admit方法。而Webhook Admission是众多admission中的一个。
执行完Admission,后面的requestFunc 才会调用RESTStorage的Update函数。每个资源的RESTStorage最终都是要调用ETCD3Storage的Get、Update等函数。
// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go
func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interface) http.HandlerFunc {return func(w http.ResponseWriter, req *http.Request) {// 省略 ...ae := request.AuditEventFrom(ctx)audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)admit = admission.WithAudit(admit, ae)// 如果admit是MutationInterface类型的,就调用其Admit函数,也就是admissionChain的Admitif mutatingAdmission, ok := admit.(admission.MutationInterface); ok {transformers = append(transformers, func(ctx context.Context, newObj, oldObj runtime.Object) (runtime.Object, error) {isNotZeroObject, err := hasUID(oldObj)if err != nil {return nil, fmt.Errorf("unexpected error when extracting UID from oldObj: %v", err.Error())} else if !isNotZeroObject {if mutatingAdmission.Handles(admission.Create) {return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, updateToCreateOptions(options), dryrun.IsDryRun(options.DryRun), userInfo), scope)}} else {if mutatingAdmission.Handles(admission.Update) {return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, oldObj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope)}}return newObj, nil})}// 省略 ...// 执行完admission,这里才调用RESTStorage的Update函数 requestFunc := func() (runtime.Object, error) {obj, created, err := r.Update(ctx,name,rest.DefaultUpdatedObjectInfo(obj, transformers...),// createValidation会调用ValidationInterface的Validate方法withAuthorization(rest.AdmissionToValidateObjectFunc(admit,admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, updateToCreateOptions(options), dryrun.IsDryRun(options.DryRun), userInfo), scope),scope.Authorizer, createAuthorizerAttributes),// updateValidation会调用ValidationInterface的Validate方法rest.AdmissionToValidateObjectUpdateFunc(admit,admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope),false,options,)wasCreated = createdreturn obj, err}result, err := finishRequest(timeout, func() (runtime.Object, error) {result, err := requestFunc()// 省略 ...return result, err})// ...transformResponseObject(ctx, scope, trace, req, w, status, outputMediaType, result)}
}
以上是PUT方法的例子,里面调用了MutationInterface和ValidationInterface。其他的方法比如POST、DELETE等也是类似。但是GET方法不会调用Admission Plugin。
三、Webhook Admission 调用
validatingwebhook和mutatingwebhook分别位于staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin.go,staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go两个文件中。
3.1 ValidatingAdmissionWebhook调用
1. ValidatingAdmissionWebhook的Validate()函数实现了ValidationInterface接口,有请求到来时kube-apiserver会调用所有admission 的Validate()方法。ValidatingAdmissionWebhook持有了一个Webhook对象,Validate()会调用Webhook.Dispatch()。
2.Webhook.Dispatch()又调用了其持有的dispatcher的Dispatch()方法。dispatcher时通过dispatcherFactory创建的,dispatcherFactory是ValidatingAdmissionWebhook创建generic.Webhook时候传入的newValidatingDispatcher函数。调用dispatcherFactory函数创建的实际上是validatingDispatcher对象,也就是Webhook.Dispatch()调用的是validatingDispatcher.Dispatch()。
3.validatingDispatcher.Dispatch()会逐个远程调用注册的webhook plugin
NewValidatingAdmissionWebhook初始化了ValidatingAdmissionWebhook对象,内部持有了一个generic.Webhook对象,generic.Webhook是一个Validate和mutate公用的框架,创建generic.Webhook时需要一个dispatcherFactory函数,用这个函数生成dispatcher对象
// staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin.go
// NewValidatingAdmissionWebhook returns a generic admission webhook plugin.
func NewValidatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) {handler := admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)p := &Plugin{}var err errorp.Webhook, err = generic.NewWebhook(handler, configFile, configuration.NewValidatingWebhookConfigurationManager, newValidatingDispatcher(p))if err != nil {return nil, err}return p, nil
}// Validate makes an admission decision based on the request attributes.
func (a *Plugin) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {return a.Webhook.Dispatch(ctx, attr, o)
}
调用generic.Webhook.Dispatch()时会调用dispatcher对象的Dispatch。
// Dispatch is called by the downstream Validate or Admit methods.
func (a *Webhook) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {if rules.IsWebhookConfigurationResource(attr) {return nil}if !a.WaitForReady() {return admission.NewForbidden(attr, fmt.Errorf("not yet ready to handle request"))}hooks := a.hookSource.Webhooks()return a.dispatcher.Dispatch(ctx, attr, o, hooks)
}
validatingDispatcher.Dispatch遍历所有的hooks ,找到相关的webhooks,然后执行callHooks调用外部注册进来的
func (d *validatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error {var relevantHooks []*generic.WebhookInvocation// Construct all the versions we need to call our webhooksversionedAttrs := map[schema.GroupVersionKind]*generic.VersionedAttributes{}for _, hook := range hooks {// 遍历所有的webhooks,根据ValidatingWebhookConfiguration中的rules是否匹配找到所有相关的hooksinvocation, statusError := d.plugin.ShouldCallHook(hook, attr, o)if statusError != nil {return statusError}if invocation == nil {continue}relevantHooks = append(relevantHooks, invocation)// If we already have this version, continueif _, ok := versionedAttrs[invocation.Kind]; ok {continue}versionedAttr, err := generic.NewVersionedAttributes(attr, invocation.Kind, o)if err != nil {return apierrors.NewInternalError(err)}versionedAttrs[invocation.Kind] = versionedAttr}if len(relevantHooks) == 0 {// no matching hooksreturn nil}// Check if the request has already timed out before spawning remote callsselect {case <-ctx.Done():// parent context is canceled or timed out, no point in continuingreturn apierrors.NewTimeoutError("request did not complete within requested timeout", 0)default:}wg := sync.WaitGroup{}errCh := make(chan error, len(relevantHooks))wg.Add(len(relevantHooks))for i := range relevantHooks {go func(invocation *generic.WebhookInvocation) {defer wg.Done()hook, ok := invocation.Webhook.GetValidatingWebhook()if !ok {utilruntime.HandleError(fmt.Errorf("validating webhook dispatch requires v1.ValidatingWebhook, but got %T", hook))return}versionedAttr := versionedAttrs[invocation.Kind]t := time.Now()// 启动多个go routine 并行调用注册进来的webhook pluginerr := d.callHook(ctx, hook, invocation, versionedAttr)ignoreClientCallFailures := hook.FailurePolicy != nil && *hook.FailurePolicy == v1.Ignorerejected := falseif err != nil {switch err := err.(type) {case *webhookutil.ErrCallingWebhook:if !ignoreClientCallFailures {rejected = trueadmissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionCallingWebhookError, 0)}case *webhookutil.ErrWebhookRejection:rejected = trueadmissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionNoError, int(err.Status.ErrStatus.Code))default:rejected = trueadmissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionAPIServerInternalError, 0)}}admissionmetrics.Metrics.ObserveWebhook(time.Since(t), rejected, versionedAttr.Attributes, "validating", hook.Name)if err == nil {return}if callErr, ok := err.(*webhookutil.ErrCallingWebhook); ok {if ignoreClientCallFailures {klog.Warningf("Failed calling webhook, failing open %v: %v", hook.Name, callErr)utilruntime.HandleError(callErr)return}klog.Warningf("Failed calling webhook, failing closed %v: %v", hook.Name, err)errCh <- apierrors.NewInternalError(err)return}if rejectionErr, ok := err.(*webhookutil.ErrWebhookRejection); ok {err = rejectionErr.Status}klog.Warningf("rejected by webhook %q: %#v", hook.Name, err)errCh <- err}(relevantHooks[i])}// 等待多个goroutine 执行完成wg.Wait()close(errCh)var errs []errorfor e := range errCh {errs = append(errs, e)}if len(errs) == 0 {return nil}if len(errs) > 1 {for i := 1; i < len(errs); i++ {// TODO: merge status errors; until then, just return the first one.utilruntime.HandleError(errs[i])}}return errs[0]
}
3.2 MutatingAdmissionWebhook调用
看MutatingWebhook的构造函数就可以看到,MutatingWebhook和ValidatingWebhook的代码架构是一样的,只不过在创建generic.Webhook的时候传入的dispatcherFactory函数是newMutatingDispatcher,所以Webhook.Dispatch()最终调用的就是mutatingDispatcher.Dispatch(),这个和validatingDispatcher.Dispatch的实现逻辑基本是一样的,也是根据WebhookConfiguration中的rules是否匹配找到相关的webhooks,然后逐个调用。
// staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go
// NewMutatingWebhook returns a generic admission webhook plugin.
func NewMutatingWebhook(configFile io.Reader) (*Plugin, error) {handler := admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)p := &Plugin{}var err errorp.Webhook, err = generic.NewWebhook(handler, configFile, configuration.NewMutatingWebhookConfigurationManager, newMutatingDispatcher(p))if err != nil {return nil, err}return p, nil
}// ValidateInitialization implements the InitializationValidator interface.
func (a *Plugin) ValidateInitialization() error {if err := a.Webhook.ValidateInitialization(); err != nil {return err}return nil
}// Admit makes an admission decision based on the request attributes.
func (a *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {return a.Webhook.Dispatch(ctx, attr, o)
}
kube-apiserver源码-动态准入控制 admission webhook相关推荐
- 准入控制_Kubernetes动态准入控制示例
准入控制 A walk-through of creating a webhook for Kubernetes dynamic admission control. 创建用于Kubernetes动态 ...
- 全新仿DS网发卡源码 动态特效源码
介绍: 全新仿ds网发卡源码 动态特效 全新模板 全新后台系统 修复订单支付失败 去除授权后门 查单页面更换 用户体验感非常棒 增加前台显示数据 网盘下载地址: https://zijiewangpa ...
- kubernetes apiserver源码分析二之路由
apiserver的man函数在 k8s.io/kubernetes/cmd/kube-apiserver 目录. 但是大部分源码却在 k8s.io/apiserver 这个库里面. cmd 目录下的 ...
- 简述控制反转ioc_阅读Spring源码:IOC控制反转前的处理
温馨提示:要怀着 这个世界很美好 的心态去看~ 技术经验交流:点击入群 ClassPathXmlApplicationContext的注册方式 源码分析基于Spring4.3 从ClassPathXm ...
- 好看的二次元个人主页导航源码 动态背景+背景音乐
简介: 这是一款动态的个人导航页面 忘词了 你们凑合着看吧 源码上传解压就可以使用,文字链接修改 index.html 文件 音乐替换 0.mp3 文件 网盘下载地址: http://kekewl.c ...
- jq 评论源码+动态评论回复
评论回复源码下载 <!DOCTYPE html> <html><head><meta charset="utf-8"><met ...
- Android:剖析源码,随心所欲控制Toast显示
本文转载于:http://www.cnblogs.com/net168/p/4058193.html 前言 Toast相信大家都不会陌生吧,如 ...
- 以canvas为动态网页背景,添加的标签内容不显示(附源码-----动态背景星空)
问题描述: 我在网上下载了一个动态网页背景--星空,然后本地能运行,但是当我添加其他组件的时候,发现组件不显示,然后只能显示动态背景.仔细看了一下代码,发现这个动态背景是依靠canvas标签来实现的. ...
- Spring源码——动态AOP实现流程
前言 最近回顾了一下Spring源码,准备用思维导图的方式简单的将整个源码内容的流程展示出来,思维导图.图片等文件更新在https://github.com/MrSorrow/spring-frame ...
最新文章
- python02-条件语句到循环语句
- Android N新特性
- ES6新特性之字符串扩展
- 【Espruino】NO.12 加速度传感器演示
- angular2子组件的事件传递(任意组件事件传递)
- 301 302区别_如何正确理解301,302和canonial标签
- curl 增加header_libcurl增加HTTP header 和 POST之后获取返回数据
- 稀疏大模型简述:从MoE、Sparse Attention到GLaM
- php ssh tab补全,bash的按TAB键自动补全(自动完成)的原理与扩展
- azure blob_如何使用Power BI从Azure Blob存储访问数据
- 设计模式系列——三个工厂模式(简单工厂模式,工厂方法模式,抽象工厂模式)...
- mysql innodb启动失败_关于mysql innodb启动失败无法重启的处理方法讲解
- springMVC常见问题
- 【收藏】计算机视觉领域全球顶级高校研究所团队总结
- hudi系列-文件压缩(compaction)
- Excel 生成均匀分布、正态分布随机数
- python绘制中国_如何用Python画一个中国地图?
- 关于yolov3.weights文件下载地址的分享
- 点心云pcdn跑量越来越少问题解决方案
- gyp info it worked if it ends with ok npm ERR 解决办法