女主宣言

kube-proxy当前支持三种方式实现负载均衡,分别是: userspace, iptables, IPVS. 但前两者随着Service的数量增长,存在性能的瓶颈,在生产环境是不能接受的。所以本篇文章主要对IPVS模式进行源码分析。

PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!

kube-proxy 整体逻辑结构

这张时序图描述了kube-proxy的整体逻辑结构,由于kub-proxy组件和其它的kube-* 组件一样都是使用pflag和cobra库去构建命令行应用程序。所以先简单介绍下该包的基本使用方式:

func main() { command := &cobra.Command{ Use:   "echo [string to echo]",   Short: "Echo anything to the screen", Long: `echo is for echoing anything back.Echo works a lot like print, except it has a child command.`,    Args: cobra.MinimumNArgs(1),    Run: func(cmd *cobra.Command, args []string) {  fmt.Println("Print: " + strings.Join(args, " "))   },  }   command.Execute()
}

上面这段代码就是使用cobra包的一个最简单的例子,首先初始化Command结构,其中该结构中的Run就是最终要执行的真正逻辑。当初始化完成Command之后,通过commnad.Execute去启动应用程序。

在Command.Run中主要做了如下几件事,看下面的代码:

// Run runs the specified ProxyServer.
func (o *Options) Run() error { defer close(o.errCh)    //....  proxyServer, err := NewProxyServer(o)  if err != nil {    return err  }   if o.CleanupAndExit {   return proxyServer.CleanupAndExit() }   o.proxyServer = proxyServer    return o.runLoop()
}

1.对ProxyServer实例进行初始化。

首先先来看看ProxyServer的结构定义:

type ProxyServer struct {   Client                 clientset.Interface  EventClient            v1core.EventsGetter  IptInterface           utiliptables.Interface   IpvsInterface          utilipvs.Interface   IpsetInterface         utilipset.Interface  execer                 exec.Interface   Proxier                proxy.ProxyProvider  Broadcaster            record.EventBroadcaster  Recorder               record.EventRecorder ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration  Conntracker            Conntracker // if nil, ignored   ProxyMode              string   NodeRef                *v1.ObjectReference  CleanupIPVS            bool MetricsBindAddress     string   EnableProfiling        bool OOMScoreAdj            *int32   ConfigSyncPeriod       time.Duration    HealthzServer          *healthcheck.HealthzServer
}

在ProxyServer结构中包含了与kube-apiserver通信的Client、操作Iptables的IptInterface、操作IPVS的IpvsInterface、操作IpSet的IpsetInterface,以及通过ProxyMode参数获取基于userspace, iptables, ipvs三种方式中的哪种使用的Proxier。

接下来重点介绍基于ipvs模式实现的Proxier, 在ipvs模式下Proxier结构的定义:

type Proxier struct {    endpointsChanges *proxy.EndpointChangeTracker   serviceChanges   *proxy.ServiceChangeTracker    //...   serviceMap   proxy.ServiceMap   endpointsMap proxy.EndpointsMap portsMap     map[utilproxy.LocalPort]utilproxy.Closeable    //...   syncRunner      *async.BoundedFrequencyRunner // governs calls to syncProxyRules    //...   iptables       utiliptables.Interface   ipvs           utilipvs.Interface   ipset          utilipset.Interface  exec           utilexec.Interface   //...   ipvsScheduler  string
}

在Proxier结构中,先介绍下async.BoundedFrequencyRunner,其它的在介绍ProxyServer.Run的时候介绍。

BoundedFrequencyRunner的定义结构如下:

type BoundedFrequencyRunner struct { name        string        // the name of this instance  minInterval time.Duration // the min time between runs, modulo bursts   maxInterval time.Duration // the max time between runs  run chan struct{} // try an async run   mu      sync.Mutex  // guards runs of fn and all mutations  fn      func()      // function to run  lastRun time.Time   // time of last run timer   timer       // timer for deferred runs  limiter rateLimiter // rate limiter for on-demand runs
}

BoundedFrequencyRunner结构中的run会异步的去定期的执行任务fn,比如定期的执行proxier.syncProxyRules去创建或者更新VirtuaServer和RealServer并将VirtualServer的VIP绑定到dummy interface(kube-ipvs0)。

下面是在NewProxier方法中初始化BoundedFrequencyRunner对象的示例:

proxier.syncRunner = async.NewBoundedFrequencyRunner(    "sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

其中:

minSyncPeriod: 规则最小的更新时间

syncPeriod: 规则最大更新时间

proxier.syncProxyRules: 同步规则的实现函数(也是kube-proxy基于ipvs同步规则的核心实现)

ProxyServer启动流程

这部分介绍下ProxyServer.Run的逻辑实现,ProxyServer启动流程如下图所示:

在启动过程中,主要做了下面这几件事情:

1.启动健康检查服务HealthzServer.

接下来详细的介绍下[4-7]这几步的流程。

ServiceConfig的结构定义如下:

type ServiceConfig struct { listerSynced  cache.InformerSynced  eventHandlers []ServiceHandler
}

ServiceHandler的结构定义如下:

type ServiceHandler interface {    // OnServiceAdd is called whenever creation of new service object   // is observed. OnServiceAdd(service *v1.Service)   // OnServiceUpdate is called whenever modification of an existing   // service object is observed.  OnServiceUpdate(oldService, service *v1.Service)    // OnServiceDelete is called whenever deletion of an existing service   // object is observed.  OnServiceDelete(service *v1.Service)    // OnServiceSynced is called once all the initial even handlers were    // called and the state is fully propagated to local cache. OnServiceSynced()
}

创建ServiceConfig实例对象的具体实现如下:

func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig { result := &ServiceConfig{  listerSynced: serviceInformer.Informer().HasSynced, }   serviceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{    AddFunc:    result.handleAddService,    UpdateFunc: result.handleUpdateService, DeleteFunc: result.handleDeleteService, },  resyncPeriod,   )   return result
}

  • 首先通过执行serviceInformer.Informer().HasSynced来将kubernetes下的所有Service资源同步到缓存listerSynced中。

  • 其次为AddEventHandlerWithResyncPeriod添加针对Service对象,添加,更新,删除的事件触发函数。当Service有相应的触发动作,就会调用相应的函数:handleAddService、handleUpdateService和handleDeleteService。

我们看看handleAddService触发函数的实现逻辑,具体代码如下:

func (c *ServiceConfig) handleAddService(obj interface{}) {   service, ok := obj.(*v1.Service)   if !ok {    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))    return  }   for i := range c.eventHandlers {   klog.V(4).Info("Calling handler.OnServiceAdd")    c.eventHandlers[i].OnServiceAdd(service)    }
}

当watch到kubernetes集群中有新的Service被创建之后,会触发handleAddService函数,并在该函数中遍历eventHandlers分别去调用OnServiceAdd来对proxier结构中的serviceChanages进行更新并去同步相应的规则。

OnServiceAdd的具体实现逻辑如下:

// OnServiceAdd is called whenever creation of new service object is observed.
func (proxier *Proxier) OnServiceAdd(service *v1.Service) { proxier.OnServiceUpdate(nil, service)
}   // OnServiceUpdate is called whenever modification of an existing service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {  if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {  proxier.syncRunner.Run()    }
}

ServiceChangeTracker的结构定义如下:

// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
// Services, keyed by their namespace and name.
type ServiceChangeTracker struct {  // lock protects items. lock sync.Mutex // items maps a service to its serviceChange.   items map[types.NamespacedName]*serviceChange   // makeServiceInfo allows proxier to inject customized information when processing service. makeServiceInfo makeServicePortFunc // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.    isIPv6Mode *bool    recorder   record.EventRecorder
}

serviceChanage的结构定义如下:

// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct { previous ServiceMap current  ServiceMap
}

到这里在回过头来看上面的基于IPVS实现的Proxier的整体流程就完全通了,ProxyServer.Run函数在启动时,通过kubernetes LIST/WATCH机制去实时的感知kubernetes集群Service资源的变化,然后不断的在更新Proxier结构中的ServiceChanges,然后将变化的Service保存在ServiceChanges结构中的ServiceMap中,给后续的async.BoundedFrequencyRunner去执行同步规则函数syncProxyRules来使用。

8.endpointConfig的实现机制和serviceConfig的机制完全一样,这里就不在详细的介绍了。

func (s *ProxyServer) birthCry() {  s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")
}

11.最终通过SyncLoop启动kube-proxy服务,并立刻执行syncProxyRules先来一遍同步再说.之后便会通过异步的方式定期的去同步IPVS, Iptables, Ipset的规则。

而syncProxyRules函数是kube-proxy实现的核心。主体逻辑是遍历ServiceMap并遍历ServiceMap下的endpointsMap及创建的Service类型(如: CLusterIP, Loadbalancer, NodePort)去分别创建相应的IPVS规则。

syncProxyRules的函数实现定义如下:

func (proxier *Proxier) syncProxyRules() {    //..... // Build IPVS rules for each service.   for svcName, svc := range proxier.serviceMap { //......    // Handle traffic that loops back to the originator with SNAT.  for _, e := range proxier.endpointsMap[svcName] {  //....  }   // Capture the clusterIP.   // ipset call   entry := &utilipset.Entry{ IP:       svcInfo.ClusterIP().String(), Port:     svcInfo.Port(),   Protocol: protocol, SetType:  utilipset.HashIPPort, }   // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.  // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())    if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))  continue    }   proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())    // ipvs call    serv := &utilipvs.VirtualServer{   Address:   svcInfo.ClusterIP(), Port:      uint16(svcInfo.Port()),  Protocol:  string(svcInfo.Protocol()),  Scheduler: proxier.ipvsScheduler,   }   // Set session affinity flag and timeout for IPVS service   if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {  serv.Flags |= utilipvs.FlagPersistent  serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())   }   // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcNameString, serv, true); err == nil {   activeIPVSServices[serv.String()] = true   activeBindAddrs[serv.Address.String()] = true  // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. if err := proxier.syncEndpoint(svcName, false, serv); err != nil {    klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)    }   } else {    klog.Errorf("Failed to sync service: %v, err: %v", serv, err) }   // Capture externalIPs. for _, externalIP := range svcInfo.ExternalIPStrings() {   //....  }   // Capture load-balancer ingress.   for _, ingress := range svcInfo.LoadBalancerIPStrings() {  //..... }   if svcInfo.NodePort() != 0 {   //....  }   }   // sync ipset entries   for _, set := range proxier.ipsetList {    set.syncIPSetEntries()  }   // Tail call iptables rules for ipset, make sure only call iptables once    // in a single loop per ip set. proxier.writeIptablesRules()    // Sync iptables rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.   proxier.iptablesData.Reset()    proxier.iptablesData.Write(proxier.natChains.Bytes())   proxier.iptablesData.Write(proxier.natRules.Bytes())    proxier.iptablesData.Write(proxier.filterChains.Bytes())    proxier.iptablesData.Write(proxier.filterRules.Bytes()) }

总结

kube-proxy的代码逻辑还是比较简洁的,整体的思想就是kube-proxy服务去watch kubernetes集群的Service和Endpoint对象,当这两个资源对象有状态变化时,会把它们保存在ServiceMap和EndPonintMap中,然后会通过async.BoundedFrequencyRunner去异步的执行syncProxyRules去下发规则。

360云计算

由360云平台团队打造的技术分享公众号,内容涉及数据库、大数据、微服务、容器、AIOps、IoT等众多技术领域,通过夯实的技术积累和丰富的一线实战经验,为你带来最有料的技术分享

Kube-Proxy IPVS模式源码分析相关推荐

  1. Redis集群模式源码分析

    目录 1 主从复制模式 2 Sentinel(哨兵)模式 3 Cluster模式 4.参考文档 1 主从复制模式 主库负责读写操作,从库负责数据同步,接受来自主库的同步命令.通过分析Redis的客户端 ...

  2. Android Doze模式源码分析

    科技的仿生学无处不在,给予我们启发.为了延长电池是使用寿命,google从蛇的冬眠中得到体会,那就是在某种情况下也让手机进入类冬眠的情况,从而引入了今天的主题,Doze模式,Doze中文是打盹儿,打盹 ...

  3. android doze模式源码分析,Android Doze模式启用和恢复详解

    从Android 6.0(API level 23)开始,Android提出了两个延长电池使用时间的省电特性给用户.用户管理可以在没有充电的情况下管理app的行为.当用户一段时间没有使用手机的时候,D ...

  4. Java设计模式学习以及底层源码分析

    源码在分支master 工厂模式 把具体创建产品的细节封装起来,你要什么产品,我给你什么产品即可. 简单工厂模式 工厂方法模式 缓存层:抽象类 抽象工厂模式 缓存层是:接口 原型模式 问题: 原型模式 ...

  5. 【转】ABP源码分析一:整体项目结构及目录

    ABP是一套非常优秀的web应用程序架构,适合用来搭建集中式架构的web应用程序. 整个Abp的Infrastructure是以Abp这个package为核心模块(core)+15个模块(module ...

  6. 深入理解Spark 2.1 Core (六):Standalone模式运行的原理与源码分析

    我们讲到了如何启动Master和Worker,还讲到了如何回收资源.但是,我们没有将AppClient是如何启动的,其实它们的启动也涉及到了资源是如何调度的.这篇博文,我们就来讲一下AppClient ...

  7. Java的三种代理模式完整源码分析

    Java的三种代理模式&完整源码分析 Java的三种代理模式&完整源码分析 参考资料: 博客园-Java的三种代理模式 简书-JDK动态代理-超详细源码分析 [博客园-WeakCach ...

  8. Java的三种代理模式【附源码分析】

    Java的三种代理模式&完整源码分析 代理模式分为两种,静态代理和动态代理,动态代理包括JDK动态代理和Cglib动态代理. 静态代理 静态代理在使用时,需要定义接口或者父类,被代理对象与代理 ...

  9. 阿里开源一站式分布式事务框架seata源码分析(AT模式下TM与RM分析)

    序言: 对于阿里开源分布式事务框架seata的详细了解可以参考官网,这里不会详细介绍.本章只会介绍seata中AT模式的源码分析(对阿seata有一定了解或者成功完成过demo). seata中一个事 ...

最新文章

  1. Spark RDD与DataFrame
  2. vue-cli的初步使用
  3. LCD也可以模拟?这款模拟器别错过了!
  4. java数据结构系列——排列(2):有序阵列
  5. 关闭IOS更新功能(ios4/5/6)
  6. java判断总共天数_java 判断两个时间相差的天数
  7. 提高Eclipse的速度,去掉对于工程的 Validatioan
  8. torchtext 中文语料加载
  9. C语言程序设计教材九斗验证,C语言实验报告参考答案(原)
  10. Django数据库的增删改查学习笔记
  11. 2018年江西省电子现场赛赛题
  12. 泰拉瑞亚 服务器linux,泰拉瑞亚Linux主机打造指南
  13. 关于Python、R、VBA、SAS的生成批量变量名与动态变量引用的问题
  14. arcpy投影(三)——定义投影、地理变换关系自定义和投影变换Project_managemen(含基准面/椭球体转换参数使用方法,arcpro/arcmap)
  15. SOPHON BM1684芯片解码性能以及支持的文件格式
  16. 开源社区的由来(转载)
  17. C#中枚举和结构解释(少儿版)
  18. 【李开复】给中国学生的第五封信——你有选择的权利(五)
  19. 我试过销声匿迹最后却无人问津(人生感悟)
  20. 2022-2027年中国壁挂炉行业市场全景评估及发展战略规划报告

热门文章

  1. 关于python中self
  2. HTML中利用堆栈方式对Table进行行排序
  3. 删除顺序表中指定范围的元素
  4. java List 常见坑
  5. Ranger-Sqoop2插件实现详解
  6. Spark GraphX算法 - Aggregate Messages (aggregateMessages)算法
  7. sqlyog如何设置.时提示字段名_Spring boot 中使用 Tomcat时 用户名 密码如何设置呢?...
  8. 久谦咨询python笔试题目_python笔试含答案
  9. e盾服务端源码_原罪西游源码发布!!!
  10. Javascipt超详细版思维导图+基础语法导航