文章目录

  • 背景
  • webhook示例
  • 源码分析
  • 总结

背景

最近看到测试环境,coredns Pod挂掉了,但k8s APIServer调用webhook仍然正常,对此有点儿疑惑,难道APIServer调用webhook中的service不需要经过coredns域名解析?直接获取到了svc ClusterIP或者直接获取到了endpoint中的pod地址?带着这个问题,深入了解下apiServer请求webhook时的一些源码。

k8s版本:1.18.14
部署方式:kube-apiserver为二进制部署,systemd管理。

webhook示例

$ kubectl get mutatingwebhookconfigurations.admissionregistration.k8s.io  rocketmq-operator-mutating-webhook-configuration  -oyaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:annotations:cert-manager.io/inject-ca-from: rocketmq/rocketmq-operator-serving-certname: rocketmq-operator-mutating-webhook-configuration
webhooks:
- admissionReviewVersions:- v1- v1beta1clientConfig:caBundle: LS0tLSXXXservice:name: rocketmq-operator-webhook-servicenamespace: rocketmqpath: /mutate-rocketmq-apache-org-v1alpha1-rocketmqclusterport: 443failurePolicy: FailmatchPolicy: Equivalentname: mrocketmqcluster.kb.ionamespaceSelector: {}objectSelector: {}reinvocationPolicy: Neverrules:- apiGroups:- rocketmq.apache.orgapiVersions:- v1alpha1operations:- CREATE- UPDATEresources:- rocketmqclustersscope: '*'sideEffects: NonetimeoutSeconds: 10

源码分析

在源码的kubernetes/staging/src/k8s.io/apiserver/pkg/server/plugins.go的RegisterAllAdmissionPlugins函数中:

// RegisterAllAdmissionPlugins registers all admission plugins
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {lifecycle.Register(plugins)validatingwebhook.Register(plugins)mutatingwebhook.Register(plugins)
}

注册了三种准入控制插件:lifecycle、validatingwebhook、mutatingwebhook

这里只看mutatingwebhook,其他的后面文章会讲到。
kubernetes/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go

//NewMutatingWebhook 返回一个通用的准入 webhook 插件。
func NewMutatingWebhook(configFile io.Reader) (*Plugin, error) {handler := admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)p := &Plugin{}var err errorp.Webhook, err = generic.NewWebhook(handler, configFile, configuration.NewMutatingWebhookConfigurationManager, newMutatingDispatcher(p))if err != nil {return nil, err}return p, nil
}

这里的NewWebhook调用:kubernetes/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/generic/webhook.go

func NewWebhook(handler *admission.Handler, configFile io.Reader, sourceFactory sourceFactory, dispatcherFactory dispatcherFactory) (*Webhook, error) {kubeconfigFile, err := config.LoadConfig(configFile)//...省略...// Set defaults which may be overridden later.cm.SetServiceResolver(webhookutil.NewDefaultServiceResolver())return &Webhook{Handler:          handler,sourceFactory:    sourceFactory,clientManager:    &cm,namespaceMatcher: &namespace.Matcher{},objectMatcher:    &object.Matcher{},dispatcher:       dispatcherFactory(&cm),}, nil
}

设置为kubernetes/staging/src/k8s.io/apiserver/pkg/util/webhook/serviceresolver.go中的defaultServiceResolver:

//ResolveEndpoint 根据给定的命名空间和名称构造服务 URL,
//请注意名称、命名空间和端口是必需的,并且默认情况下所有创建的地址都使用 HTTPS 方案。
//例如:name=ross namespace=andromeda 解析为 https://ross.andromeda.svc:443
func (sr defaultServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {if len(name) == 0 || len(namespace) == 0 || port == 0 {return nil, errors.New("cannot resolve an empty service name or namespace or port")}return &url.URL{Scheme: "https", Host: fmt.Sprintf("%s.%s.svc:%d", name, namespace, port)}, nil
}

configuration.NewMutatingWebhookConfigurationManager中设置MutatingWebhookConfigurations对象变化时informer回调函数AddFunc、UpdateFunc、DeleteFunc,全部调用mutatingWebhookConfigurationManager的updateConfiguration方法,代码在kubernetes/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go
中。

进而调用mergeMutatingWebhookConfigurations函数,为每个webhook生产一个accessors
kubernetes/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/accessors.go

func mergeMutatingWebhookConfigurations(configurations []*v1.MutatingWebhookConfiguration) []webhook.WebhookAccessor {//每个配置的 webhook 的内部顺序由用户提供,但配置本身可以是任何顺序。//由于我们将连续运行这些 webhook,因此它们在此处进行排序以具有确定性的顺序。sort.SliceStable(configurations, MutatingWebhookConfigurationSorter(configurations).ByName)accessors := []webhook.WebhookAccessor{}for _, c := range configurations {//webhook 名称未验证唯一性,因此我们检查重复项并添加 int 后缀以区分它们names := map[string]int{}for i := range c.Webhooks {n := c.Webhooks[i].Nameuid := fmt.Sprintf("%s/%s/%d", c.Name, n, names[n])names[n]++accessors = append(accessors, webhook.NewMutatingWebhookAccessor(uid, c.Name, &c.Webhooks[i]))}}return accessors
}

这里返回的accessor集合会被Store到一个atomic.Value中。后面会被Load。

当发生mutate webhook调用时,会执行kubernetes/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go的

// Admit 根据请求属性做出准入决定。
func (a *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {return a.Webhook.Dispatch(ctx, attr, o)
}

这里面Dispatch方法中会Load到accessors集合,调用mutatingDispatcher的Dispatch方法。位于:
kubernetes/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go

func (a *mutatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error { //...省略...changed, err := a.callAttrMutatingHook(ctx, hook, invocation, versionedAttr, o, round, i)//...省略...return nil
}func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admissionregistrationv1.MutatingWebhook, invocation *generic.WebhookInvocation, attr *generic.VersionedAttributes, o admission.ObjectInterfaces, round, idx int) (bool, error) {configurationName := invocation.Webhook.GetConfigurationName()annotator := newWebhookAnnotator(attr, round, idx, h.Name, configurationName)changed := falsedefer func() { annotator.addMutationAnnotation(changed) }()if attr.Attributes.IsDryRun() {if h.SideEffects == nil {return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("Webhook SideEffects is nil")}}if !(*h.SideEffects == admissionregistrationv1.SideEffectClassNone || *h.SideEffects == admissionregistrationv1.SideEffectClassNoneOnDryRun) {return false, webhookerrors.NewDryRunUnsupportedErr(h.Name)}}uid, request, response, err := webhookrequest.CreateAdmissionObjects(attr, invocation)if err != nil {return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}}// Make the webhook requestclient, err := invocation.Webhook.GetRESTClient(a.cm)if err != nil {return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}}//...省略...if err := r.Do(ctx).Into(response); err != nil {return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}}trace.Step("Request completed")result, err := webhookrequest.VerifyAdmissionResponse(uid, true, response)if err != nil {return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}}//...省略...return changed, nil
}

重点在上面的client的构建,即调用invocation.Webhook.GetRESTClient(a.cm),当webhook为mutate时,这个accessor为mutatingWebhookAccessor,实现了WebhookAccessor接口。

func (m *mutatingWebhookAccessor) GetRESTClient(clientManager *webhookutil.ClientManager) (*rest.RESTClient, error) {m.initClient.Do(func() {m.client, m.clientErr = clientManager.HookClient(hookClientConfigForWebhook(m))})return m.client, m.clientErr
}

ClientManager的HookClient方法如下:

// HookClient 从缓存中获取一个 RESTClient,或者根据 webhook 配置构造一个。
func (cm *ClientManager) HookClient(cc ClientConfig) (*rest.RESTClient, error) {ccWithNoName := ccccWithNoName.Name = ""cacheKey, err := json.Marshal(ccWithNoName)if err != nil {return nil, err}if client, ok := cm.cache.Get(string(cacheKey)); ok {return client.(*rest.RESTClient), nil}complete := func(cfg *rest.Config) (*rest.RESTClient, error) {//避免与 webhook 后端通信的客户端速率限制。在决定服务多少请求时应该进行速率限制。cfg.QPS = -1//...省略...cfg.ContentConfig.NegotiatedSerializer = cm.negotiatedSerializercfg.ContentConfig.ContentType = runtime.ContentTypeJSONclient, err := rest.UnversionedRESTClientFor(cfg)if err == nil {cm.cache.Add(string(cacheKey), client)}return client, err}if cc.Service != nil {port := cc.Service.Portif port == 0 {// Default to port 443 if no service port is specifiedport = 443}restConfig, err := cm.authInfoResolver.ClientConfigForService(cc.Service.Name, cc.Service.Namespace, int(port))if err != nil {return nil, err}cfg := rest.CopyConfig(restConfig)serverName := cc.Service.Name + "." + cc.Service.Namespace + ".svc"host := net.JoinHostPort(serverName, strconv.Itoa(int(port)))cfg.Host = "https://" + hostcfg.APIPath = cc.Service.Path// Set the server name if not already setif len(cfg.TLSClientConfig.ServerName) == 0 {cfg.TLSClientConfig.ServerName = serverName}delegateDialer := cfg.Dialif delegateDialer == nil {var d net.DialerdelegateDialer = d.DialContext}cfg.Dial = func(ctx context.Context, network, addr string) (net.Conn, error) {if addr == host {u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port)if err != nil {return nil, err}addr = u.Host}return delegateDialer(ctx, network, addr)}return complete(cfg)}if cc.URL == "" {return nil, &ErrCallingWebhook{WebhookName: cc.Name, Reason: errors.New("webhook configuration must have either service or URL")}}u, err := url.Parse(cc.URL)if err != nil {return nil, &ErrCallingWebhook{WebhookName: cc.Name, Reason: fmt.Errorf("Unparsable URL: %v", err)}}hostPort := u.Hostif len(u.Port()) == 0 {// Default to port 443 if no port is specifiedhostPort = net.JoinHostPort(hostPort, "443")}restConfig, err := cm.authInfoResolver.ClientConfigFor(hostPort)if err != nil {return nil, err}cfg := rest.CopyConfig(restConfig)cfg.Host = u.Scheme + "://" + u.Hostcfg.APIPath = u.Pathreturn complete(cfg)
}

这个方法作用是生成webhook client。首先构建restConfig,其中包含了参数,cfg.Host、cfg.APIPath、cfg.Dial等,这些参数最终会调用complete函数中rest.UnversionedRESTClientFor(cfg)。

其中确定且需关注的点为:
url:https://rocketmq-operator-webhook-service.rocketmq.svc:443
ApiPath: /mutate-rocketmq-apache-org-v1alpha1-rocketmqcluster

Dial需重点关注,这里赋值为以下函数:

func(ctx context.Context, network, addr string) (net.Conn, error) {if addr == host {u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port)if err != nil {return nil, err}addr = u.Host}return delegateDialer(ctx, network, addr)
}

这里的cfg.Dial最终会赋值给http.Transport的DialContext,代码在kubernetes/staging/src/k8s.io/client-go/transport/cache.go中tlsTransportCache的get方法:

func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {key, canCache, err := tlsConfigKey(config)if err != nil {return nil, err}//...省略...// Get the TLS options for this client configtlsConfig, err := TLSConfigFor(config)if err != nil {return nil, err}//...省略...dial := config.Dialif dial == nil {dial = (&net.Dialer{Timeout:   30 * time.Second,KeepAlive: 30 * time.Second,}).DialContext}//...省略...transport := utilnet.SetTransportDefaults(&http.Transport{Proxy:               http.ProxyFromEnvironment,TLSHandshakeTimeout: 10 * time.Second,TLSClientConfig:     tlsConfig,MaxIdleConnsPerHost: idleConnsPerHost,DialContext:         dial,DisableCompression:  config.DisableCompression,})if canCache {// Cache a single transport for these optionsc.transports[key] = transport}return transport, nil
}

上面重点关注的cfg.Dial函数中的:

cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port)

serviceResolver实现为loopbackResolver,代码在kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/resolvers.go中:

type loopbackResolver struct {delegate ServiceResolverhost     *url.URL
}func (r *loopbackResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {if namespace == "default" && name == "kubernetes" && port == 443 {return r.host, nil}return r.delegate.ResolveEndpoint(namespace, name, port)
}

在设置了–enable-aggregator-routing=true时,delegate又去调用实现为aggregatorEndpointRouting:位于kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/resolvers.go中

type aggregatorEndpointRouting struct {services  listersv1.ServiceListerendpoints listersv1.EndpointsLister
}func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name, port)
}

proxy.ResolveEndpoint如下:

// ResourceLocation returns a URL to which one can send traffic for the specified service.
func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string, port int32) (*url.URL, error) {svc, err := services.Services(namespace).Get(id)if err != nil {return nil, err}svcPort, err := findServicePort(svc, port)if err != nil {return nil, err}switch {case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort:// these are finedefault:return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type)}eps, err := endpoints.Endpoints(namespace).Get(svc.Name)if err != nil {return nil, err}if len(eps.Subsets) == 0 {return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svc.Name))}// Pick a random Subset to start searching from.ssSeed := rand.Intn(len(eps.Subsets))// Find a Subset that has the port.for ssi := 0; ssi < len(eps.Subsets); ssi++ {ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]if len(ss.Addresses) == 0 {continue}for i := range ss.Ports {if ss.Ports[i].Name == svcPort.Name {// Pick a random address.ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IPport := int(ss.Ports[i].Port)return &url.URL{Scheme: "https",Host:   net.JoinHostPort(ip, strconv.Itoa(port)),}, nil}}}return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}

可以看到这里在lister缓存中获取到svc及对应的endpoint对象,返回https://podip:443。

总结

具体golang的http.Transport中的DialContext作用,这里不展开。

具体为什么实现为loopbackResolver->aggregatorEndpointRouting,会在后面webhook源码分析中专门介绍。

这里只定位APIServer webhook调用时,可以直接获取webhook podip地址,而不需要去coredns做解析,这在一定程度上做到解耦合(不依赖coredns)。

其实不止webhook,APIService(聚合API)也遵循上面的过程。

即使APIServer为静态Pod方式,由kubelet管理,创建出来的mirror Pod的spec.dnsPolicy依然为ClusterFirst,而Pod为hostNetwork网络,即Pod中的/etc/resolv.conf继承自主机,不经过coredns解析。

以上讨论的前提是webhook的clientConfig是service指定。如果是用url指定的,还需另当别论。

k8s APIServer调用webhook需要域名解析吗?相关推荐

  1. Tiller pods can‘t connect to k8s apiserver,dial tcp 10.254.0.1:443: no route to host

    1. 问题 使用helm list查看本地安装应用时,报错 # helm listError: Get https://10.254.0.1:443/api/v1/namespaces/kube-sy ...

  2. 在k8s中通过CoreDNS进行域名解析的其中三种方法

    1.CoreDNS概述 CoreDNS是一种新的DNS服务器,它开发的初衷主要是用于Linux和docker的配合使用,自kubernetes 1.11版本开始,CoreDNS取代原来的KubeDNS ...

  3. 云原生生态周报 Vol. 16 | CNCF 归档 rkt,容器运行时“上古”之战老兵凋零

    作者列表:木苏,临石,得为,等等 业界要闻 安全漏洞 CVE-2019-9512 CVE-2019-9514 http2 的 DOS 漏洞,一旦攻击成功会耗尽服务器的 cpu/mem,从而导致服务不可 ...

  4. 蚂蚁大规模 Kubernetes 集群无损升级实践指南【探索篇】

    文|王连平(花名:烨川 ) 蚂蚁集团高级开发工程师 负责蚂蚁 Kubernetes 集群容器交付 专注于集群交付能力.交付性能及交付 Trace 等相关领域 本文 12623 字 阅读 20 分钟 - ...

  5. k8s Webhook 准入控制应用实践

    k8s Webhook 准入控制应用实践 tags: 开发 文章目录 k8s Webhook 准入控制应用实践 1. 背景 2. 什么是准入 Webhook? 3. 先决条件 4. 编写一个准入 We ...

  6. Mac用户学Python——Day3调用飞书webhook接口

    Python+飞书自定义机器人 首先推荐一下飞书这款应用,之前使用过钉钉和企业微信,功能实在不够强大,而且界面很混乱,飞书绝对是工作组织的神器,强烈推荐. 一直希望在办公自动化里实现一些场景的提醒功能 ...

  7. Kubernetes(k8s)从入门到精通

    Kubernetes Kubernetes介绍 1.1 应用部署方式演变 在部署应用程序的方式上,主要经历了三个时代: 传统部署:互联网早期,会直接将应用程序部署在物理机上 优点:简单,不需要其它技术 ...

  8. @Kubernetes(k8s)

    Kubernetes 文章目录 Kubernetes 1. Kubernetes介绍 1.1 应用部署方式演变 1.2 kubernetes简介 1.3 kubernetes组件 1.4 kubern ...

  9. K8S使用教程(详细)

    Kubernetes详细教程 1. Kubernetes介绍 1.1 应用部署方式演变 在部署应用程序的方式上,主要经历了三个时代: 传统部署:互联网早期,会直接将应用程序部署在物理机上 优点:简单, ...

最新文章

  1. 超卖频发or商品滞销?压倒卖家的最后一根稻草竟是库存!
  2. 【原创译文】Jive Circle案例学习:以用户为中心的设计
  3. CISCO协议总结大全
  4. RxSwift之深入解析场景特征序列的使用和底层实现
  5. APF filter到底支持多复杂的条件
  6. CMFCColorDialog弹不出来或者CMFCColorButton的Other按钮无效
  7. 在数据采集器中用TensorFlow进行实时机器学习
  8. 网站登陆页面设计灵感,UI设计得有这个范儿
  9. Oracle 多表查询
  10. 查询相关股票十档行情的方法
  11. 简述改变计算机桌面背景的方法,怎么设置和更改桌面背景
  12. 易捷文件共享web服务器 v3.5,易捷文件共享Web服务器 官方版
  13. html5网页制作心得体会,网页设计课程学习心得总结
  14. 《分布式虚拟现实系统(DVR)》(Yanlz+Unity+SteamVR+分布式+DVR+人工智能+边缘计算+人机交互+云游戏+框架编程+立钻哥哥+)
  15. 【中等】和可被K整除的子数组
  16. 人生最曼妙的风景,竟是内心的淡定与从容——杨绛
  17. 申报国家高新技术企业认定,这八大错误认识不能有 。
  18. 以旧换新,iPhone5s免费拿
  19. 多页面分页打印功能实现
  20. 智慧零售时代,苏宁如何迎接双十一“大考”?

热门文章

  1. Excel-VBA常用对象(Application、Workbook、Worksheet、Range)
  2. espn配置路由_华为敏捷网络解决方案.ppt
  3. element-ui el-dialog侧边弹窗可横向拖拽改变宽度
  4. Socks代理上网工具 tsocks
  5. MFC - LNK2001 “无法解析的外部符号”的几种情况及解决办法
  6. Web应用开发技术笔记
  7. 学计算机的人常备哪些护眼的东西,常用电脑的人如何保护视力?
  8. Learning to Memorize Entailment and Discourse Relations for Persona-Consistent Dialogues论文学习
  9. 从数据类型 varchar 转换为 numeric 时出错
  10. 修改QQ默认下载目录