在本文中,将对pilotDiscovery服务创建流程进行源码分析 具体代码注释请移至
https://gitee.com/meng_mengs_boys/istio_1.14.1_test


pilot-discovery是istio的注册发现中心,可以说它相当于k8s中的kube-apiserver与协调各个组件,相当于指挥部的存在.
那么它具体有什么功能那?让我们来罗列一下(内容可能不全,还请补充)

  1. 动态更新istiod的配置(使用CRD机制,创建文件监听器…,注意这里是istiod的配置不是istio资源的配置)
  2. 证书认证与管理
  3. istio资源监听,将istio资源转化为envoy能够识别的配置然后向envoy进行推送.
  4. 上游服务(POD)的服务注册
  5. 创建webhook服务器,主要提供自动注入功能

下面是对pilot-discovery的创建方法,这里可以大体有个印象接下来本文会详细讲解里面的方法对应的功能,比如40行 s.initKubeClient(args) 的作用就是创建k8s客户端,根据client.conf为istio中的每个资源创建informer(CRD).

// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {// 创建一个环境结构体.//该结构的作用包含了XDS服务器,对当前网格的管理.e := &model.Environment{PushContext:  model.NewPushContext(),DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,}// 创建注册中心控制器ac := aggregate.NewController(aggregate.Options{// 当前网格管理者MeshHolder: e,})e.ServiceDiscovery = acs := &Server{clusterID:               getClusterID(args),environment:             e,fileWatcher:             filewatcher.NewWatcher(),httpMux:                 http.NewServeMux(),monitoringMux:           http.NewServeMux(),readinessProbes:         make(map[string]readinessProbe),workloadTrustBundle:     tb.NewTrustBundle(nil),server:                  server.New(),shutdownDuration:        args.ShutdownDuration,internalStop:            make(chan struct{}),istiodCertBundleWatcher: keycertbundle.NewWatcher(),}// 这里为空for _, fn := range initFuncs {fn(s)}// 该属性是证书池,存储服务发现中的证书e.TrustBundle = s.workloadTrustBundle//创建XDS服务,并初始化它,比如为ConfigGenerator生成器初始化,初始化caches.XDSServer = xds.NewDiscoveryServer(e, args.PodName, args.RegistryOptions.KubeOptions.ClusterAliases)prometheus.EnableHandlingTimeHistogram()// 初始化kubeclient,并创建所有资源的informer连接。if err := s.initKubeClient(args); err != nil {return nil, fmt.Errorf("error initializing kube client: %v", err)}// 获取endpoint模式,是否为分片模式args.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)//初始化 网格网络配置信息、创建监听事件/*meshConfig ConfigMapInformer事件触发后迪奥哟用的handler1.initMeshHandlers() 向envoy 发送通知2.environment.Init().NewClusterLocalProvider().onMeshUpdated更新environment.clusterLocalServices.hosts 本地host值3.environment.InitNetworksManager(s.XDSServer).NewNetworkManager().NetworkManager.reloadAndPush()*/s.initMeshConfiguration(args, s.fileWatcher)//创建信用域, 是SPIRE需要的配置 ,1.14中的新特性,默认值为cluster.localspiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())//这里根据参数 networksConfig 重新创建NetworksWatchers.initMeshNetworks(args, s.fileWatcher)//在initMeshConfiguration为mesh配置创建了Infomer,这里就是添加事件函数s.initMeshHandlers()//设置只允许本集群访问的host//本地环境相关内容初始化,这里只做了将本地集群访问可以访问的host存储起来s.environment.Init()//初始化network管理器// 例子:/*networks:network1:endpoints:- fromRegistry: registry1 #must match kubeconfig name in Kubernetes secret- fromCidr: 192.168.100.0/22 #a VM network for examplegateways:- registryServiceName: istio-ingressgateway.istio-system.svc.cluster.localport: 15443locality: us-east-1a- address: 192.168.100.1port: 15443locality: us-east-1a*/if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {return nil, err}// Ca配置caOpts := &caOptions{TrustDomain:      s.environment.Mesh().TrustDomain,Namespace:        args.Namespace,ExternalCAType:   ra.CaExternalType(externalCaType),CertSignerDomain: features.CertSignerDomain,}if caOpts.ExternalCAType == ra.ExtCAK8s {// Older environment variable preserved for backward compatibilitycaOpts.ExternalCASigner = k8sSigner}// 创建CA证书管理器,用于证书签发、验证等功能if err := s.maybeCreateCA(caOpts); err != nil {return nil, err}// 重点//为路由规则、目的规则等CRD绑定增删改事件,原理采用informer机制。// 为服务发现注册一个控制器,实现服务发现功能。if err := s.initControllers(args); err != nil {return nil, err}//初始化Envoy配置生成插件//以map的形式存储插件//在发送给envoy时,会将原本的istio结构体转换生成为envoy能够识别的配置s.XDSServer.InitGenerators(e, args.Namespace)// Initialize workloadTrustBundle after CA has been initializedif err := s.initWorkloadTrustBundle(args); err != nil {return nil, err}// 获取当前服务hostistiodHost, _, err := e.GetDiscoveryAddress()if err != nil {return nil, err}// Create Istiod certs and setup watches.if err := s.initIstiodCerts(args, string(istiodHost)); err != nil {return nil, err}//  创建有TLS的Grpc服务if err := s.initSecureDiscoveryService(args); err != nil {return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)}var wh *inject.Webhook// common https server for webhooks (e.g. injection, validation)if s.kubeClient != nil {// 这里会创建安全的webhook服务器,以供K8sAdmission访问s.initSecureWebhookServer(args)// 添加/inject handler,在原有的pod基础上添加init container属性//在MutatingAdmissionWebhooks时调用wh, err = s.initSidecarInjector(args)if err != nil {return nil, fmt.Errorf("error initializing sidecar injector: %v", err)}//校验配置信息//在validateadmission时调用if err := s.initConfigValidation(args); err != nil {return nil, fmt.Errorf("error initializing config validator: %v", err)}}whc := func() map[string]string {if wh != nil {return wh.Config.RawTemplates}return map[string]string{}}// Used for readiness, monitoring and debug handlers.if err := s.initIstiodAdminServer(args, whc); err != nil {return nil, fmt.Errorf("error initializing debug server: %v", err)}// 为上面CRD资源注册事件触发回调函数//作用为,通知监听的服务,向Envoy发送规则配置更改指令s.initRegistryEventHandlers()//创建Grpc服务器,主要提供服务注册调用的接口。//其中的handler由proto直接生成,作用是接受请求,并将请求解析成WorkloadEntry资源,使用k8s客户端进行创建//创建后会触发上面服务注册控制器s.initDiscoveryService(args)s.initSDSServer()// 权限认证,先使用证书认证,如果成果就不会往下判断authenticators := []security.Authenticator{&authenticate.ClientCertAuthenticator{},}if args.JwtRule != "" {jwtAuthn, err := initOIDC(args, s.environment.Mesh().TrustDomain)if err != nil {return nil, fmt.Errorf("error initializing OIDC: %v", err)}if jwtAuthn == nil {return nil, fmt.Errorf("JWT authenticator is nil")}authenticators = append(authenticators, jwtAuthn)}// 这里会添加上jwt认证authenticators = append(authenticators,kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient, s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))if features.XDSAuth {s.XDSServer.Authenticators = authenticators}caOpts.Authenticators = authenticators// 启动CA认证服务器s.startCA(caOpts)// TODO: don't run this if galley is started, one ctlz is enoughif args.CtrlZOptions != nil {_, _ = ctrlz.Run(args.CtrlZOptions, nil)}// 启动所有资源informerif s.kubeClient != nil {s.addStartFunc(func(stop <-chan struct{}) error {s.kubeClient.RunAndWait(stop)return nil})}//添加就绪探针s.addReadinessProbe("discovery", func() (bool, error) {return s.XDSServer.IsServerReady(), nil})return s, nil
}

我们可以看到返回值是一个Server结构体,它包含了整个discovery运行过程中所需要的服务,配置信息,那么就让我们看一下它都有哪些属性!

Server配置

对于discovery中所有包含的服务、配置都存储在pilot\pkg\bootstrap\server.go:Server 结构体中,那么它究竟包含了哪些属性那?让我们一探究竟!

type Server struct {//创建XDS服务, XDS是与envoy通讯的服务器,istio通过它与envoy进行通讯XDSServer *xds.DiscoveryServer// 判断要使用的注册集群,比如使用的MCP模式或者使用k8sclusterID cluster.ID//记录了当前集群中的服务个数、规则配置environment *model.Environment//kubeClient客户端kubeClient kubelib.Client//多集群控制器multiclusterController *multicluster.Controller// 配置管理中心,有MCP、K8s管理中心(informer)、内存管理中心configController model.ConfigStoreController//配置管理中心数组ConfigStores []model.ConfigStoreController//外部服务控制器,用于对serviceEntry信息进行存储,将其转化为instanceserviceEntryController *serviceentry.ControllerhttpServer       *http.Server // debug, monitoring and readiness Server.httpsServer      *http.Server // webhooks HTTPS Server.httpsReadyClient *http.Client// XDSServer其实是使用grpcserver跟envoy进行通讯grpcServer        *grpc.ServergrpcAddress       stringsecureGrpcServer  *grpc.ServersecureGrpcAddress string// monitoringMux listens on monitoringAddr(:15014).// Currently runs prometheus monitoring and debug (if enabled).monitoringMux *http.ServeMux// httpMux listens on the httpAddr (8080).// If a Gateway is used in front and https is off it is also multiplexing// the rest of the features if their port is empty.// Currently runs readiness and debug (if enabled)httpMux *http.ServeMux// httpsMux listens on the httpsAddr(15017), handling webhooks// If the address os empty, the webhooks will be set on the default httpPort.httpsMux *http.ServeMux // webhooks// MultiplexGRPC will serve gRPC and HTTP (1 or 2) over the HTTPListener, if enabled.MultiplexGRPC bool// fileWatcher used to watch mesh config, networks and certificates.// 文件监听器,监听mesh配置与networks,certificates配置fileWatcher filewatcher.FileWatcher// certWatcher watches the certificates for changes and triggers a notification to Istiod.//证书监听器cacertsWatcher *fsnotify.WatcherdnsNames       []string//证书管理器certController *chiron.WebhookController//CA证书CA *ca.IstioCARA ra.RegistrationAuthority// TrustAnchors for workload to workload mTLSworkloadTrustBundle     *tb.TrustBundlecertMu                  sync.RWMutexistiodCert              *tls.CertificateistiodCertBundleWatcher *keycertbundle.Watcher// 对于server中的组件进行存储起来,在start时进行启动server server.InstancereadinessProbes map[string]readinessProbe// duration used for graceful shutdown.shutdownDuration time.Duration// internalStop is closed when the server is shutdown. This should be avoided as much as possible, in// favor of AddStartFunc. This is only required if we *must* start something outside of this process.// For example, everything depends on mesh config, so we use it there rather than trying to sequence everything// in AddStartFuncinternalStop chan struct{}//状态通知statusReporter *distribution.Reporter//状态管理statusManager *status.Manager// RWConfigStore is the configstore which allows updates, particularly for status.RWConfigStore model.ConfigStoreController
}

如果我们将Server中的属性按照功能分类的话与我们上面罗列的discovery的功能基本对应,那么本文将按照功能模块对NewServer(也就是pilot-discovery服务的创建)进行分析

Server的创建

既然是返回Server那么首先应该先创建它

 // 创建一个环境结构体.//该结构的作用包含了XDS服务器,对当前网格的管理.e := &model.Environment{PushContext:  model.NewPushContext(),DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,}e.SetLedger(buildLedger(args.RegistryOptions))// 创建注册中心控制器ac := aggregate.NewController(aggregate.Options{// 当前网格管理者MeshHolder: e,})e.ServiceDiscovery = acs := &Server{clusterID:               getClusterID(args),environment:             e,fileWatcher:             filewatcher.NewWatcher(),httpMux:                 http.NewServeMux(),monitoringMux:           http.NewServeMux(),readinessProbes:         make(map[string]readinessProbe),workloadTrustBundle:     tb.NewTrustBundle(nil),server:                  server.New(),shutdownDuration:        args.ShutdownDuration,internalStop:            make(chan struct{}),istiodCertBundleWatcher: keycertbundle.NewWatcher(),}// 这里为空,传值为nilfor _, fn := range initFuncs {fn(s)}// 该属性是证书池,存储服务发现中的证书,好像提供mTLS功能e.TrustBundle = s.workloadTrustBundle

Environment的创建

我们从3-6行可以看到创建了一个Environment结构体,为两个属性赋了值,这个时候疑问就出来了,Environment结构体有什么用,这里面的两个属性为什么要这么赋值?那么接下来让我们一一讲解!

type Environment struct {// Discovery interface for listing services and instances.// istio会将ServiceEntry和workloadEntry解析成ServiceInstance结构体然后进行存储到注册表中// 而IstioService管理器主要是管理注册表,比如获取当前集群中的所有服务等等// 默认为ServiceDiscovery// 管理istio资源配置,比如获取istio中的资源配置信息,默认使用的CRDInformer获取k8s中的资源ConfigStore// Watcher is the watcher for the mesh config (to be merged into the config store)// 这是对mesh配置的监听,mesh配置默认在cm中的istio配置中mesh.Watcher//对mesh配置中的Network进行监听NetworksWatcher mesh.NetworksWatcher//对mesh配置中的Network进行管理NetworkManager *NetworkManager//一个上下文信息,记录了当前集群中的所有配置信息,比如virtualService,destinationRule等//为什么会记录所有那?因为istio是一个全量推送(当然有一些优化但是这些基本的元素还是全量)//所以会记录所有的信息,在每个推送会话过程中会使用到PushContext *PushContext// istio服务的后缀,默认为cluster.localDomainSuffix stringledger ledger.Ledger// TrustBundle: List of Mesh TrustAnchors// 与mesh配置相关的TrustAnchorsTrustBundle *trustbundle.TrustBundle// 本地集群中的一些服务信息,做一些比如namespaces为kube-system的服务,不能动的操作等clusterLocalServices ClusterLocalProvider//网关api管理器GatewayAPIController GatewayController
}e.SetLedger(buildLedger(args.RegistryOptions))// 创建注册中心控制器ac := aggregate.NewController(aggregate.Options{// 当前网格管理者MeshHolder: e,})e.ServiceDiscovery = ac

对于注册中心控制器,我们不需要太多关注,它主要是将每个注册中心进行整合起来\

PushContext

这里重点讲解一下PushContext,因为当配置发生配置之后,istio会将其进行envoy配置的生成.而envoy配置的生成很有可能是全部生成,这时候就需要用到所有的资源,如果每操作一次都获取全部的资源是非常麻烦的,所以istio创建了一个PushContext,该上下文包含了所有资源的信息,当调用xdsservice.configUpdate()方法后(下篇start时会讲到)会根据全局的PushContext(也就是当前的这个)拷贝一个信息的PushContext进行接下来的操作.


Mesh创建与监听

Server创建完成后,继续往下走我们可以看到好几个对mesh,Networks的初始化等方法,他们所提供的功能是监听mesh配置,根据mesh配置的变动进行推送。

// 获取endpoint模式,是否为分片模式args.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)//初始化 网格网络配置信息、创建监听事件/*meshConfig ConfigMapInformer事件触发后迪奥哟用的handler1.initMeshHandlers() 向envoy 发送通知2.environment.Init().NewClusterLocalProvider().onMeshUpdated更新environment.clusterLocalServices.hosts 本地host值3.environment.InitNetworksManager(s.XDSServer).NewNetworkManager().NetworkManager.reloadAndPush()*/s.initMeshConfiguration(args, s.fileWatcher)//创建信用域, 是SPIRE需要的配置 ,1.14中的新特性,默认值为cluster.localspiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())//这里根据参数 networksConfig 重新创建NetworksWatchers.initMeshNetworks(args, s.fileWatcher)//在initMeshConfiguration为mesh配置创建了Infomer,这里就是添加事件函数s.initMeshHandlers()//设置只允许本集群访问的host//本地环境相关内容初始化,这里只做了将本地集群访问可以访问的host存储起来s.environment.Init()//初始化network管理器// 例子:/*networks:network1:endpoints:- fromRegistry: registry1 #must match kubeconfig name in Kubernetes secret- fromCidr: 192.168.100.0/22 #a VM network for examplegateways:- registryServiceName: istio-ingressgateway.istio-system.svc.cluster.localport: 15443locality: us-east-1a- address: 192.168.100.1port: 15443locality: us-east-1a*/if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {return nil, err}

可能大家对这块有点糊涂(反正我是这样)其实这块很好理解,它的基本流程就是,通过file或者Informer对mesh配置进行监听,如果有变动,那就推送给Envoy.

证书认证

对于证书这块代码有些长,所以请细心查看

 // 该属性是证书池,存储服务发现中的证书e.TrustBundle = s.workloadTrustBundle//创建信用域, 是SPIRE需要的配置 ,1.14中的新特性,默认值为cluster.localspiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())// Ca配置caOpts := &caOptions{TrustDomain:      s.environment.Mesh().TrustDomain,Namespace:        args.Namespace,ExternalCAType:   ra.CaExternalType(externalCaType),CertSignerDomain: features.CertSignerDomain,}if caOpts.ExternalCAType == ra.ExtCAK8s {// Older environment variable preserved for backward compatibilitycaOpts.ExternalCASigner = k8sSigner}// 创建CA证书管理器,用于证书签发、验证等功能if err := s.maybeCreateCA(caOpts); err != nil {return nil, err}// Initialize workloadTrustBundle after CA has been initializedif err := s.initWorkloadTrustBundle(args); err != nil {return nil, err}// Create Istiod certs and setup watches.
// 这里生成istiod的证书,主要用于webhook与安全Grpcif err := s.initIstiodCerts(args, string(istiodHost)); err != nil {return nil, err}//  创建有TLS的Grpc服务if err := s.initSecureDiscoveryService(args); err != nil {return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)}//Secret discovery service,envoy通过SDS向pilot-agent发送证书和密钥请求//在收到 SDS 请求后,istio-agent 创建私钥和 CSR,然后将 CSR 及其凭据发送到 istiod CA 进行签名。//istiod CA 验证 CSR 中携带的凭据,成功验证后签署 CSR 以生成证书。//Istio-agent 通过 Envoy SDS API 将私钥和从 Istio CA 收到的证书发送给 Envoy。//Istio-agent 会监工作负载证书的有效期。上述 CSR 过程会周期性地重复,以处理证书和密钥轮换。s.initSDSServer()authenticators := []security.Authenticator{&authenticate.ClientCertAuthenticator{},}if args.JwtRule != "" {jwtAuthn, err := initOIDC(args, s.environment.Mesh().TrustDomain)if err != nil {return nil, fmt.Errorf("error initializing OIDC: %v", err)}if jwtAuthn == nil {return nil, fmt.Errorf("JWT authenticator is nil")}authenticators = append(authenticators, jwtAuthn)}// 这里会添加上jwt认证authenticators = append(authenticators,kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient, s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))if features.XDSAuth {s.XDSServer.Authenticators = authenticators}caOpts.Authenticators = authenticators// 启动CA认证服务器,向grpc注册创建ca证书handlers.startCA(caOpts)// TODO: don't run this if galley is started, one ctlz is enoughif args.CtrlZOptions != nil {_, _ = ctrlz.Run(args.CtrlZOptions, nil)}

对于证书的签证与管理有两步

  1. 管理每个工作负载,使用插入CA证书,管理员可以在运行参数中静态提供CA证书的地址,如果没有提供的话默认istiod会创建一个自签名的根证书和密钥,并使用它们来签署工作负载证书。
  2. 管理DNS证书,istio使用 Chiron 配置和管理 DNS 证书,Chiron 是一个与 Istiod 相连的轻量型组件,使用 Kubernetes 的 CA API 签发证书.

DiscoveryServer

接下里就开始本文的重点DiscoveryServer的创建,该结构体定义了对istio中资源的操作反馈,可以说它完成了istio-Discovery最核心的功能
创建XDS服务, XDS是与envoy通讯的服务器,istio通过它与envoy进行通讯

    //创建XDS服务s.XDSServer = xds.NewDiscoveryServer(e, args.PodName, args.RegistryOptions.KubeOptions.ClusterAliases)// 重点//为路由规则、目的规则等CRD绑定增删改事件,原理采用informer机制。// 为服务发现注册一个控制器,实现服务发现功能。if err := s.initControllers(args); err != nil {return nil, err}//初始化Envoy配置生成插件//以map的形式存储插件//在发送给envoy时,会将原本的istio结构体转换生成为envoy能够识别的配置s.XDSServer.InitGenerators(e, args.Namespace)
// 为上面CRD资源注册事件触发回调函数//作用为,通知监听的服务,向Envoy发送规则配置更改指令s.initRegistryEventHandlers()//创建Grpc服务器,主要提供服务注册调用的接口。//其中的handler由proto直接生成,作用是接受请求//创建后会触发上面服务注册控制器s.initDiscoveryService(args)

结构

既然它这么重要,就让我们看看他的结构吧!

// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
type DiscoveryServer struct {// 与server中的env一致Env *model.Environment// MemRegistry is used for debug and load testing, allow adding services. Visible for testing.// 创建debug,测试使用的服务MemRegistry *memory.ServiceDiscovery// 配置生成器ConfigGenerator core.ConfigGenerator// 为每个Envoy配置添加对应的配置生成器,有的配置会使用到上面的配置生成器,而有的不会使用过Generators map[string]model.XdsResourceGenerator// ProxyNeedsPush is a function that determines whether a push can be completely skipped. Individual generators// may also choose to not send any updates.ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) bool// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.concurrentPushLimit chan struct{}// requestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering hurd of incoming requests.requestRateLimit *rate.Limiter// InboundUpdates describes the number of configuration updates the discovery server has receivedInboundUpdates *atomic.Int64// CommittedUpdates describes the number of configuration updates the discovery server has// received, process, and stored in the push context. If this number is less than InboundUpdates,// there are updates we have not yet processed.// Note: This does not mean that all proxies have received these configurations; it is strictly// the push context, which means that the next push to a proxy will receive this configuration.CommittedUpdates *atomic.Int64// IstioEndpoint分片存储器,根据服务名与命名空间存储EndpointIndex *model.EndpointIndex// pushChannel is the buffer used for debouncing.// after debouncing the pushRequest will be sent to pushQueuepushChannel chan *model.PushRequest// mutex used for protecting Environment.PushContextupdateMutex sync.RWMutex// pushQueue is the buffer that used after debounce and before the real xds push.pushQueue *PushQueue// debugHandlers is the list of all the supported debug handlers.debugHandlers map[string]string// adsClients reflect active gRPC channels, for both ADS and EDS.// 存储了istio与每个envoy创建的connadsClients      map[string]*ConnectionadsClientsMutex sync.RWMutexStatusReporter DistributionStatusCache// Authenticators for XDS requests. Should be same/subset of the CA authenticators.// 请求身份验证集合Authenticators []security.Authenticator// StatusGen is notified of connect/disconnect/nack on all connectionsStatusGen *StatusGen// WorkloadEntry控制器,作用是对每个envoy进行健康检测WorkloadEntryController *workloadentry.Controller// serverReady indicates caches have been synced up and server is ready to process requests.// 标识缓存已经同步完成serverReady atomic.Bool// 去抖策略debounceOptions debounceOptions// 当前istiod所对应的podNameinstanceID string// 缓存XDS资源Cache model.XdsCache// jwt解析器JwtKeyResolver *model.JwksResolver// ListRemoteClusters collects debug information about other clusters this istiod reads from.ListRemoteClusters func() []cluster.DebugInfo// ClusterAliases are aliase names for cluster. When a proxy connects with a cluster ID// and if it has a different alias we should use that a cluster ID for proxy.// 集群别名ClusterAliases map[cluster.ID]cluster.ID
}

上面有些属性没有进行翻译,因为都是些通俗易懂的配置,如果不了解请查阅istio启动配置参数进行了解.

initControllers

它的主要功能有两个

  1. 根据注册中心创建资源监听事件,比如使用k8s注册中心,那么为每个资源创建informer监听事件
  2. 为serviceEntry,workloadEntry创建控制器主要存储存储了服务注册时创建的serviceEntry,workloadEntry信息

在该方法中进行了注册中心的判断,如果我们使用默认的配置话使用的是k8s作为注册中心.

// initControllers initializes the controllers.
func (s *Server) initControllers(args *PilotArgs) error {log.Info("initializing controllers")// 集群原生资源管理s.initMulticluster(args)// Certificate controller is created before MCP controller in case MCP server pod// waits to mount a certificate to be provisioned by the certificate controller.// 将mesh配置中的Certificate,生成密钥对并获取由 K8s_CA 签名的公共证书// 选项用于签署 DNS 证书,然后创建k8s secret资源if err := s.initCertController(args); err != nil {return fmt.Errorf("error initializing certificate controller: %v", err)}//为istio中的CRD资源创建informer并绑定事件函数,比如虚拟路由规则、目标地址规则等if err := s.initConfigController(args); err != nil {return fmt.Errorf("error initializing config controller: %v", err)}//创建服务注册控制器,里面绑定了Serivceentry和workloadEntry资源的事件函数if err := s.initServiceControllers(args); err != nil {return fmt.Errorf("error initializing service controllers: %v", err)}return nil
}

Controller结构体

老规矩先看一下Controller结构体

// Controller communicates with ServiceEntry CRDs and monitors for changes.
type Controller struct {// 向XDS客户端,主要向envoy推送信息XdsUpdater model.XDSUpdater// 获取资源客户端,比如client-gostore model.ConfigStore//集群ID k8sclusterID cluster.ID// This lock is to make multi ops on the below stores. For example, in some case,// it requires delete all instances and then update new ones.// 资源锁mutex sync.RWMutex// 存储serviceInstances,由ServiceEntry转换serviceInstances serviceInstancesStore// NOTE: historically, one index for both WorkloadEntry(s) and Pod(s);//       beware of naming collisions// 存储workloadInstances由workloadEntry转换workloadInstances workloadinstances.Index//所有Serviceservices serviceStore// to make sure the eds update run in serial to prevent stale ones can override new ones// There are multiple threads calling edsUpdate.// If all share one lock, then all the threads can have an obvious performance downgrade.// 将所有更新操作串行化edsQueue queue.InstanceworkloadHandlers []func(*model.WorkloadInstance, model.Event)// 用于根据工作负载 ip 和标签获取网络 ID 的回调函数。networkIDCallback func(IP string, labels labels.Instance) network.IDprocessServiceEntry boolmodel.NetworkGatewaysHandler
}

initMulticluster

初始化多集群(英文直译),监听每个集群中的资源,pod,namespaces,endpoint,service,根据这些资源的事件来更新envoy中的endpoint
具体的更新流程为

  1. 创建一个POD,触发更新事件
  2. 根据POD的命名空间与名称查找endpoints中是否有信息
  3. 如果找到获取endpoints信息,将其转换为IstioEndpoints 存入缓存中并推送给envoy
  4. 如果没有找到则结束.

看到这里我们应该对它的功能有所熟悉了,它就是将原生的service也添加到内部网络中,这就是istio兼容k8s原生服务的原理
接下来让我们看一下它的创建过程

func (s *Server) initMulticluster(args *PilotArgs) {if s.kubeClient == nil {return}// 创建secert informer,主要监听每个集群的客户端证书s.multiclusterController = multicluster.NewController(s.kubeClient, args.Namespace, s.clusterID)//根据secert证书获取集群同步状态s.XDSServer.ListRemoteClusters = s.multiclusterController.ListRemoteClusters// 启动每个集群s.addStartFunc(func(stop <-chan struct{}) error {return s.multiclusterController.Run(stop)})
}
  1. 第6行,其实是创建了secert的informer监听,主要监听每个集群的客户端证书,然后会根据secert的添加(相当于集群的注册)调用方法,将该集群的原生资源解析(比如说Service资源那么就转换成istioService并推送给上游服务)
  2. 第8行主要判断当前集群组同步状态,当client.RunAndWait(client-go里的方法,判断当前informer等缓存同步)成功后完成集群同步状态
  3. 第10-12行是开启本地集群,并运行远程集群注册informer,本地集群的运行在下篇启动时讲解

initConfigController

对于initConfigController的详细介绍请移至https://blog.csdn.net/a1023934860/article/details/125787691?spm=1001.2014.3001.5502

initServiceControllers

它的功能是注册服务发现控制器,其主要功能是存储由serviceEntry,workloadEntry转换成的ServiceInstance.
其主要的功能为为serviceEntry,workloadEntry注册了监听触发事件

// NewController creates a new ServiceEntry discovery service.
func NewController(configController model.ConfigStoreController, store model.ConfigStore, xdsUpdater model.XDSUpdater,options ...Option) *Controller {//创建控制器,里面存储了pod注册时创建的serviceEntry,workloadEntry信息s := newController(store, xdsUpdater, options...)if configController != nil {//为serviceEntry   Informer添加handler方法configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)// 同理为workloadEntry informer添加handler方法configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)_ = configController.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(s.clusterID))}return s
}

这两个handler会在下一篇Start讲解


WebHook服务创建与自动注入

我们在使用的过程中一般都会自动让其注入代理,那么就让我们看看webhook的注入吧.

 var wh *inject.Webhook// common https server for webhooks (e.g. injection, validation)if s.kubeClient != nil {// 这里会创建安全的webhook服务器,以供K8sAdmission访问s.initSecureWebhookServer(args)// 添加/inject handler,在原有的pod基础上添加init container属性//在MutatingAdmissionWebhooks时调用wh, err = s.initSidecarInjector(args)if err != nil {return nil, fmt.Errorf("error initializing sidecar injector: %v", err)}//校验配置信息//在validateadmission时调用if err := s.initConfigValidation(args); err != nil {return nil, fmt.Errorf("error initializing config validator: %v", err)}}

首先第5行创建了一个基于istio证书认证的http2.0服务器.下面第8行才是重点让我们直接进入initSidecarInjector一探究竟

initSidecarInjector


func (s *Server) initSidecarInjector(args *PilotArgs) (*inject.Webhook, error) {// currently the constant: "./var/lib/istio/inject"\// 根据inject路径获取inject配置文件injectPath := args.InjectionOptions.InjectionDirectoryif injectPath == "" || !injectionEnabled.Get() {log.Infof("Skipping sidecar injector, injection path is missing or disabled.")return nil, nil}// If the injection config exists either locally or remotely, we will set up injection.// 这里会判断如果本地没有则会去k8s中查询Cmvar watcher inject.Watcherif _, err := os.Stat(filepath.Join(injectPath, "config")); !os.IsNotExist(err) {configFile := filepath.Join(injectPath, "config")valuesFile := filepath.Join(injectPath, "values")watcher, err = inject.NewFileWatcher(configFile, valuesFile)if err != nil {return nil, err}} else if s.kubeClient != nil {configMapName := getInjectorConfigMapName(args.Revision)cms := s.kubeClient.CoreV1().ConfigMaps(args.Namespace)if _, err := cms.Get(context.TODO(), configMapName, metav1.GetOptions{}); err != nil {if errors.IsNotFound(err) {log.Infof("Skipping sidecar injector, template not found")return nil, nil}return nil, err}watcher = inject.NewConfigMapWatcher(s.kubeClient, args.Namespace, configMapName, "config", "values")} else {log.Infof("Skipping sidecar injector, template not found")return nil, nil}log.Info("initializing sidecar injector")parameters := inject.WebhookParameters{Watcher:  watcher,Env:      s.environment,Mux:      s.httpsMux,Revision: args.Revision,}// 添加webhook   handler// 添加inject方法wh, err := inject.NewWebhook(parameters)
}//进入NewWebhook
// NewWebhook creates a new instance of a mutating webhook for automatic sidecar injection.
func NewWebhook(p WebhookParameters) (*Webhook, error) {if p.Mux == nil {return nil, errors.New("expected mux to be passed, but was not passed")}wh := &Webhook{watcher:    p.Watcher,meshConfig: p.Env.Mesh(),env:        p.Env,revision:   p.Revision,}// 添加自动注入方法,我们在创建资源后apiserver会根据webhook配置调用该handler对POD资源进行填充p.Mux.HandleFunc("/inject", wh.serveInject)p.Mux.HandleFunc("/inject/", wh.serveInject)p.Env.Watcher.AddMeshHandler(func() {wh.mu.Lock()wh.meshConfig = p.Env.Mesh()wh.mu.Unlock()})return wh, nil
}

对于serveInject方法,主要对POD添加了init,istio-proxy等信息.


还有一个方法没有讲到第171行s.initDiscoveryService(args),该方法是创建一个Grpc服务器,主要供上游注册时访问,该服务器注册了服务注册的方法,具体流程请移至下篇文章Istio-PilotDiscovery服务的启动

总结

下面是一些需要注意的地方

  1. 它里面有五个控制器ServiceDiscovery,configStroe,XDSService,serviceEntryController,WorkloadEntryController

这五个控制器基本上完成了大部分功能让我们看一下他们具体的功能
ServiceDiscovery,服务管理器目的是整合所有的注册控制器,向外提供统一的接口,比如获取上游服务,它会遍历所有的注册中心然后将其整合在一起返回
configStroe,配置管理中心,对于使用k8s注册中心来说,它就是一个client-go客户端,对每个资源创建informe并使用informer缓存所有的资源信息,供系统调用
XDSService,主要保存了每个上游服务的conn,身份验证集合,推送管道,接受管道,envoy配置生成器,可以理解成它保存了istio与envoy通讯之间的一系列操作.
serviceEntryController,k8s服务注册器,主要存储当前集群的上游服务,并根据上游服务的命名空间等过滤条件进行存储
WorkloadEntryController,上游服务管理器,主要对上游服务进行健康检查等操作.
在这几个控制器中,都有缓存的影子存在,但是每个缓存的作用不太一样
在XDSService中有EndpointIndex(根据serviceName与namespaces存储Istioendpoint),作用是判断当前更新的istioendpoint是否需要进行全量更新,如果不是则进行增量更新.
ServiceDiscovery主要存储了serviceInstance与workloadInstance结构,主要记录了当前注册中心中的服务.
configStroe缓存,如果默认使用的是k8s注册中心,则它的缓存就是informer(CRD)中的缓存机制

Handler

istio的代码有些绕,主要表现在handler,它会时不时add一下,所以最后都不知道handler具体调用了哪些.
接下来让我们罗列一下在修改资源时调用了哪些handler,这些handler都有哪些功能

Istio-PilotDiscovery服务的创建相关推荐

  1. Istio 自动注入 sidecar 不成功及k8s安装istio后pod无法创建解决方案

    环境 Kubernetes v1.15.6 源码安装 Istio v1.2.5 Helm 安装 Istio v1.2.5 Helm 安装 Istio Helm安装 问题 安装完后,做官方 bookin ...

  2. 借助 Istio 让服务更具弹性 | 周末送福利

    本文介绍如何借助 Istio 提供的功能,来让我们的服务更具弹性.这主要包括配置服务的负载均衡策略,配置服务的连接池,配置服务的健康检测机制,配置服务熔断,配置服务重试,配置服务限流.通过上述这些配置 ...

  3. Istio微服务架构时代

    在微服务架构盛行的今天,作为一名互联网技术从业人员,对于微服务的概念相信大家都已经耳熟能详了!而至于像Spring Cloud这样的微服务框架,因为大部分互联网公司都在此基础上构建过第一代微服务体系, ...

  4. netty 5 alph1源码分析(服务端创建过程)

    研究了netty的服务端创建过程.至于netty的优势,可以参照网络其他文章.<Netty系列之Netty 服务端创建>是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货.但抛开 ...

  5. Android服务一 创建启动服务

    若要学习创建绑定服务,请查看下篇Android服务二 创建绑定服务 启动服务 基于Service package service;import android.app.Service; import ...

  6. 在阿里云容器服务上创建一个使用Redis的Python应用

    使用容器服务可以方便快速的创建应用,下面的例子展示如何在容器服务上创建一个使用Redis的Python应用,只需要简单的几步. 第一步:准备代码 由于只是一个例子,所以我不可能使用太复杂的应用代码. ...

  7. WinRM 服务无法创建以下 SPN: /WSMAN/DC.contoso.com; WSMAN/DC

    当在 Windows Server 2008 R2 上安装 Active Directory 服务后,重新启动服务器会收到类似"WinRM 服务无法创建以下 SPN: /WSMAN/DC.c ...

  8. Istio:服务发现和Pilot的架构机制

    大纲 Istio架构&Pilot介绍 Istio服务发现 Istio服务配置 stio服务发现&规则管理与Kubernetes结合 ShowCase Istio架构&Pilot ...

  9. unicloud云开发---uniapp云开发(一)---服务空间创建以及部署一个云函数

    云开发系列 视频 https://www.bilibili.com/video/BV1eK4y1p7Qe 新系列视频 我们的视频教程(免费)链接为https://static-b5208986-2c0 ...

最新文章

  1. linux并发控制之自旋锁
  2. FJUT Home_W的拆分序列(DP)题解
  3. 如何在C/S下打印报表
  4. iphone11什么时候上市_hd3手表高仿哪里买 什么时候上市?
  5. 小程序api 分享scene_微信小程序 插件调用API的限制
  6. Spring使用ComponentScan扫描Maven多模块工程的其它模块
  7. 扩展BSGS-传送门
  8. 视频自动生成字幕(免费版)
  9. 海康威视Linux下SDK开发(Ubuntu16.04 QT5.10)
  10. 蓝牙 - 通信原理:电磁波 - 无线电波 - ISM - 蓝牙
  11. 计算机加入域后的用户名和密码,Windows自动改计算机名和加入域工具
  12. 支付宝当面付扫码支付支付后不回调_免费开通支付宝商家收款码 支持信用卡 花呗收款...
  13. Go语言-复合数据结构(map)
  14. matlab怎么求周期积分,Matlab中怎么求解积分
  15. Rocky Linux Yum源替换位上海交大镜像站点
  16. ar面部识别_【华为P20Pro评测】系统的进化:面部识别、AR该有的都有_华为 P20 Pro_手机评测-中关村在线...
  17. android4.4广播,Android4.4 framework分析——广播的注册(BroadcastReceiver)和发送(sendbroadcast)过程分析...
  18. 搜索计算机无法输入法,Windows10左下角搜索框无法输入字符的两种解决方法
  19. HDFS 本地编程报错java.net.ConnectException: Call From LAPTOP-LJF22VB3/192.168.56.1 to 192.168
  20. Typeof保姆式级教程

热门文章

  1. sublime build 系统必读
  2. 实例讲解单片机模拟量采集: 从硬件到程序, 从滤波到实际值转换
  3. matlab一些指令
  4. python实现日历功能_详解Python日历模块的使用
  5. linux系统安全优化策略
  6. 实验七 函数程序设计 张玉生《C语言程序设计实训教程》双色版 配套实验书答案 (纯手打, 仅供参考)
  7. Beagle填充之坑ERROR: REF field is not a sequence of A, C, T, G, or N characters at
  8. H国身份证号码【多组实例测试】
  9. 华为鲲鹏HCIA考试-练习04
  10. 编程规则 - 1 概述 -- 帮助你成长为优秀的程序员 杰出的软件工程师、设计师、分析师和架构师