Background

While continuing to read the k8s 0.4 code, it became clear that what the documentation describes is indeed an incomplete version, so I switched to 1.1, whose documentation states that it is ready for production use. On that basis this article revisits kube-proxy, mainly as a supplement to the earlier write-up.

kube-proxy

As before, kube-proxy is deployed on every node and provides communication support at the service and node level. Its core job is to wire up connectivity and routing through iptables.

  • The proxy fetches the endpoints and services directly from the API exposed by the master, polling it on a schedule for configuration information.
  • Using the services and endpoints obtained in the first step, it programs traffic forwarding through iptables; forwarding can happen either in userspace or purely in iptables. Whenever the endpoints and services on the master change, the local iptables rules are updated to match.

These two steps are the core of how the proxy works; a sketch of the kind of rule involved follows.
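To make the iptables side concrete, here is a rough, hand-written sketch of the kind of NAT rule the userspace mode keeps in place. The chain name KUBE-PORTALS-CONTAINER matches the 1.1 userspace proxier, but the addresses, ports, and the helper function are invented for illustration:

package main

import (
	"fmt"
	"strings"
)

// portalRuleArgs builds an argument list (illustrative only) for a rule that
// redirects traffic aimed at a service's cluster IP and port to the local
// port where kube-proxy's userspace proxy is listening.
func portalRuleArgs(clusterIP string, servicePort, proxyPort int) []string {
	return []string{
		"-t", "nat",
		"-A", "KUBE-PORTALS-CONTAINER", // chain used by the 1.1 userspace proxier
		"-p", "tcp",
		"-d", clusterIP + "/32",
		"--dport", fmt.Sprint(servicePort),
		"-j", "REDIRECT",
		"--to-ports", fmt.Sprint(proxyPort),
	}
}

func main() {
	// e.g. service 10.0.0.10:80 redirected to kube-proxy's local port 40123
	fmt.Println("iptables " + strings.Join(portalRuleArgs("10.0.0.10", 80, 40123), " "))
}

In the real code the proxier builds such argument lists in iptablesContainerPortalArgs and installs them through EnsureRule, as shown later in this article.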

Startup flow
func init() {
	healthz.DefaultHealthz() // register the default health-check handlers
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU()) // use as many OS threads as there are CPUs
	config := app.NewProxyConfig()       // build the default proxy configuration
	config.AddFlags(pflag.CommandLine)   // register kube-proxy's command-line flags
	util.InitFlags()                     // parse the command-line flags
	util.InitLogs()
	defer util.FlushLogs()

	verflag.PrintAndExitIfRequested()

	s, err := app.NewProxyServerDefault(config) // build the proxy server
	if err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}

	if err = s.Run(pflag.CommandLine.Args()); err != nil { // and run it
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

NewProxyServerDefault starts the goroutines that periodically pull service and endpoint information from the master, while the Run function starts the goroutine that synchronizes the iptables rules.

// NewProxyServerDefault creates a new ProxyServer object with default parameters.
func NewProxyServerDefault(config *ProxyServerConfig) (*ProxyServer, error) {
	protocol := utiliptables.ProtocolIpv4 // decide between IPv4 and IPv6
	if config.BindAddress.To4() == nil {
		protocol = utiliptables.ProtocolIpv6
	}

	// We omit creation of pretty much everything if we run in cleanup mode
	if config.CleanupAndExit {
		execer := exec.New()
		dbus := utildbus.New()
		IptInterface := utiliptables.New(execer, dbus, protocol)
		return &ProxyServer{
			Config:       config,
			IptInterface: IptInterface,
		}, nil
	}

	// TODO(vmarmol): Use container config for this.
	var oomAdjuster *oom.OomAdjuster
	if config.OOMScoreAdj != 0 {
		oomAdjuster := oom.NewOomAdjuster()
		if err := oomAdjuster.ApplyOomScoreAdj(0, config.OOMScoreAdj); err != nil {
			glog.V(2).Info(err)
		}
	}

	if config.ResourceContainer != "" {
		// Run in its own container.
		if err := util.RunInResourceContainer(config.ResourceContainer); err != nil {
			glog.Warningf("Failed to start in resource-only container %q: %v", config.ResourceContainer, err)
		} else {
			glog.V(2).Infof("Running in resource-only container %q", config.ResourceContainer)
		}
	}

	// Create a Kube Client
	// define api config source
	if config.Kubeconfig == "" && config.Master == "" {
		glog.Warningf("Neither --kubeconfig nor --master was specified.  Using default API client.  This might not work.")
	}
	// This creates a client, first loading any specified kubeconfig
	// file, and then overriding the Master flag, if non-empty.
	kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		&clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
		&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: config.Master}}).ClientConfig() // may carry certificate or auth information
	if err != nil {
		return nil, err
	}

	client, err := kubeclient.New(kubeconfig) // build the client used to talk to the apiserver
	if err != nil {
		glog.Fatalf("Invalid API configuration: %v", err)
	}

	// Create event recorder
	hostname := nodeutil.GetHostname(config.HostnameOverride)
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "kube-proxy", Host: hostname})
	eventBroadcaster.StartRecordingToSink(client.Events(""))

	// Create a iptables utils.
	execer := exec.New()
	dbus := utildbus.New()
	iptInterface := utiliptables.New(execer, dbus, protocol) // interface used to manipulate iptables

	var proxier proxy.ProxyProvider
	var endpointsHandler proxyconfig.EndpointsConfigHandler

	useIptablesProxy := false // decide between userspace forwarding and pure-iptables forwarding
	if mayTryIptablesProxy(config.ProxyMode, client.Nodes(), hostname) {
		var err error
		// guaranteed false on error, error only necessary for debugging
		useIptablesProxy, err = iptables.ShouldUseIptablesProxier()
		if err != nil {
			glog.Errorf("Can't determine whether to use iptables proxy, using userspace proxier: %v", err)
		}
	}

	if useIptablesProxy {
		glog.V(2).Info("Using iptables Proxier.")
		execer := exec.New()
		proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.SyncPeriod, config.MasqueradeAll) // forward purely with iptables rules
		if err != nil {
			glog.Fatalf("Unable to create proxier: %v", err)
		}
		proxier = proxierIptables
		endpointsHandler = proxierIptables // the iptables proxier also handles endpoints updates
		// No turning back. Remove artifacts that might still exist from the userspace Proxier.
		glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.")
		userspace.CleanupLeftovers(iptInterface)
	} else {
		glog.V(2).Info("Using userspace Proxier.")
		// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
		// our config.EndpointsConfigHandler.
		loadBalancer := userspace.NewLoadBalancerRR() // round-robin load balancer for userspace forwarding
		// set EndpointsConfigHandler to our loadBalancer
		endpointsHandler = loadBalancer
		proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.SyncPeriod, config.UDPIdleTimeout) // userspace proxier instance
		if err != nil {
			glog.Fatalf("Unable to create proxier: %v", err)
		}
		proxier = proxierUserspace
		// Remove artifacts from the pure-iptables Proxier.
		glog.V(2).Info("Tearing down pure-iptables proxy rules. Errors here are acceptable.")
		iptables.CleanupLeftovers(iptInterface)
	}
	iptInterface.AddReloadFunc(proxier.Sync)

	// Create configs (i.e. Watches for Services and Endpoints)
	// Note: RegisterHandler() calls need to happen before creation of Sources because sources
	// only notify on changes, and the initial update (on process start) may be lost if no handlers
	// are registered yet.
	serviceConfig := proxyconfig.NewServiceConfig() // watches services; the proxier is its handler
	serviceConfig.RegisterHandler(proxier)

	endpointsConfig := proxyconfig.NewEndpointsConfig() // watches endpoints
	endpointsConfig.RegisterHandler(endpointsHandler)

	proxyconfig.NewSourceAPI(
		client,
		30*time.Second,
		serviceConfig.Channel("api"),
		endpointsConfig.Channel("api"),
	) // starts the goroutines that periodically pull services and endpoints from the master

	config.nodeRef = &api.ObjectReference{
		Kind:      "Node",
		Name:      hostname,
		UID:       types.UID(hostname),
		Namespace: "",
	}

	return NewProxyServer(config, client, endpointsConfig, endpointsHandler, iptInterface, oomAdjuster, proxier, recorder, serviceConfig) // assemble the proxy server
}

Two points stand out from this flow:

  • Both userspace forwarding and pure-iptables forwarding use iptables to steer traffic. In userspace mode the proxy listens on a local IP and port, and iptables rules redirect service traffic to that port, which is why a service can be reached on every node.
  • The proxy exchanges two kinds of information with the master: services and endpoints. It also exposes a health-check port so that its own liveness can be checked, and it maintains the local iptables rules from whatever changes it fetches from the master. A minimal probe sketch follows.
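As a side note, the health-check endpoint from the second bullet can be probed directly on a node. A minimal sketch, assuming the default healthz port 10249 (configurable via --healthz-port) and a plain /healthz path:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get("http://127.0.0.1:10249/healthz") // assumed default port
	if err != nil {
		fmt.Println("kube-proxy not healthy:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Printf("status=%d body=%q\n", resp.StatusCode, string(body))
}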
Service and endpoint synchronization
// EndpointsConfig tracks endpoints updates coming in from multiple sources.
type EndpointsConfig struct {
	mux     *config.Mux
	bcaster *config.Broadcaster
	store   *endpointsStore
}

// NewEndpointsConfig creates a new EndpointsConfig.
// It immediately runs the created EndpointsConfig.
func NewEndpointsConfig() *EndpointsConfig {
	updates := make(chan struct{}) // channel that signals "something was merged"
	store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)} // per-source endpoints store
	mux := config.NewMux(store)        // the mux fans multiple source channels into the store
	bcaster := config.NewBroadcaster() // the broadcaster notifies registered handlers
	go watchForUpdates(bcaster, store, updates) // pump merged state to the broadcaster
	return &EndpointsConfig{mux, bcaster, store}
}

func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
	c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
		glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
		handler.OnEndpointsUpdate(instance.([]api.Endpoints))
	})) // registers a broadcast listener; on every update the handler's OnEndpointsUpdate is called
}

func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate {
	ch := c.mux.Channel(source)               // per-source channel managed by the mux
	endpointsCh := make(chan EndpointsUpdate) // channel handed back to the caller
	go func() {
		for update := range endpointsCh { // forward the caller's updates into the mux
			ch <- update
		}
		close(ch) // close the mux channel when the caller is done
	}()
	return endpointsCh
}

func (c *EndpointsConfig) Config() []api.Endpoints {
	return c.store.MergedState().([]api.Endpoints) // the current merged endpoints
}

type endpointsStore struct {
	endpointLock sync.RWMutex
	endpoints    map[string]map[types.NamespacedName]api.Endpoints
	updates      chan<- struct{}
}

func (s *endpointsStore) Merge(source string, change interface{}) error {
	s.endpointLock.Lock()
	endpoints := s.endpoints[source]
	if endpoints == nil {
		endpoints = make(map[types.NamespacedName]api.Endpoints)
	}
	update := change.(EndpointsUpdate)
	switch update.Op { // apply the add/remove/set operation to the endpoints
	case ADD:
		glog.V(4).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints))
		for _, value := range update.Endpoints {
			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
			endpoints[name] = value
		}
	case REMOVE:
		glog.V(4).Infof("Removing an endpoint %s", spew.Sdump(update))
		for _, value := range update.Endpoints {
			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
			delete(endpoints, name)
		}
	case SET:
		glog.V(4).Infof("Setting endpoints %s", spew.Sdump(update))
		// Clear the old map entries by just creating a new map
		endpoints = make(map[types.NamespacedName]api.Endpoints)
		for _, value := range update.Endpoints {
			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
			endpoints[name] = value
		}
	default:
		glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
	}
	s.endpoints[source] = endpoints // save this source's endpoints
	s.endpointLock.Unlock()
	if s.updates != nil {
		s.updates <- struct{}{} // signal that the merged state changed
	}
	return nil
}

func (s *endpointsStore) MergedState() interface{} {
	s.endpointLock.RLock()
	defer s.endpointLock.RUnlock()
	endpoints := make([]api.Endpoints, 0)
	for _, sourceEndpoints := range s.endpoints {
		for _, value := range sourceEndpoints { // flatten endpoints from every source
			endpoints = append(endpoints, value)
		}
	}
	return endpoints
}

// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
	mux     *config.Mux
	bcaster *config.Broadcaster
	store   *serviceStore
}

// NewServiceConfig creates a new ServiceConfig.
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
	updates := make(chan struct{})
	store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)} // per-source services store
	mux := config.NewMux(store)
	bcaster := config.NewBroadcaster() // notifies registered handlers
	go watchForUpdates(bcaster, store, updates) // pump merged state to the broadcaster
	return &ServiceConfig{mux, bcaster, store}
}

func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
	c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
		glog.V(3).Infof("Calling handler.OnServiceUpdate()")
		handler.OnServiceUpdate(instance.([]api.Service))
	})) // callback invoked after every merged update
}

func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
	ch := c.mux.Channel(source)
	serviceCh := make(chan ServiceUpdate) // updates written here are forwarded into the mux
	go func() {
		for update := range serviceCh {
			ch <- update
		}
		close(ch)
	}()
	return serviceCh
}

func (c *ServiceConfig) Config() []api.Service {
	return c.store.MergedState().([]api.Service)
}

type serviceStore struct {
	serviceLock sync.RWMutex
	services    map[string]map[types.NamespacedName]api.Service
	updates     chan<- struct{}
}

func (s *serviceStore) Merge(source string, change interface{}) error {
	s.serviceLock.Lock()
	services := s.services[source]
	if services == nil {
		services = make(map[types.NamespacedName]api.Service)
	}
	update := change.(ServiceUpdate)
	switch update.Op { // apply the add/remove/set operation to the services
	case ADD:
		glog.V(4).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
		for _, value := range update.Services {
			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
			services[name] = value
		}
	case REMOVE:
		glog.V(4).Infof("Removing a service %s", spew.Sdump(update))
		for _, value := range update.Services {
			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
			delete(services, name)
		}
	case SET:
		glog.V(4).Infof("Setting services %s", spew.Sdump(update))
		// Clear the old map entries by just creating a new map
		services = make(map[types.NamespacedName]api.Service)
		for _, value := range update.Services {
			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
			services[name] = value
		}
	default:
		glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
	}
	s.services[source] = services // save this source's services
	s.serviceLock.Unlock()
	if s.updates != nil {
		s.updates <- struct{}{}
	}
	return nil
}

func (s *serviceStore) MergedState() interface{} {
	s.serviceLock.RLock()
	defer s.serviceLock.RUnlock()
	services := make([]api.Service, 0)
	for _, sourceServices := range s.services {
		for _, value := range sourceServices {
			services = append(services, value)
		}
	}
	return services
}

// watchForUpdates invokes bcaster.Notify() with the latest version of an object
// when changes occur.
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
	for true {
		<-updates
		bcaster.Notify(accessor.MergedState()) // broadcast the merged state to all listeners
	}
}

This code is a classic publish-subscribe design: every subscriber is registered with the bcaster, and a call to Notify pushes the latest merged state to each subscriber in turn. A minimal sketch of the pattern is shown below.
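The sketch below is a stripped-down, hypothetical version of this pattern; the Broadcaster and Listener types here are stand-ins, not the real pkg/util/config types:

package main

import "fmt"

// Listener is a callback invoked with the latest merged state.
type Listener func(state interface{})

// Broadcaster fans each update out to every registered listener.
type Broadcaster struct {
	listeners []Listener
}

func (b *Broadcaster) Add(l Listener) {
	b.listeners = append(b.listeners, l)
}

func (b *Broadcaster) Notify(state interface{}) {
	for _, l := range b.listeners {
		l(state) // every subscriber sees every update
	}
}

func main() {
	b := &Broadcaster{}
	// Two subscribers, analogous to the proxier and the endpoints handler.
	b.Add(func(s interface{}) { fmt.Println("proxier sees:", s) })
	b.Add(func(s interface{}) { fmt.Println("balancer sees:", s) })

	updates := make(chan []string)
	done := make(chan struct{})
	go func() { // the pump goroutine, analogous to watchForUpdates
		for state := range updates {
			b.Notify(state)
		}
		close(done)
	}()

	updates <- []string{"serviceA", "serviceB"} // a store merge would signal like this
	close(updates)
	<-done
}

Once everything is wired up, the proxy starts the goroutine that polls the master on a schedule: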

proxyconfig.NewSourceAPI(
	client,
	30*time.Second,
	serviceConfig.Channel("api"),
	endpointsConfig.Channel("api"),
)

...

// NewSourceAPI creates a config source that watches for changes to the services and endpoints.
func NewSourceAPI(c *client.Client, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
	servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())   // list/watch source for services
	endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything()) // list/watch source for endpoints
	newServicesSourceApiFromLW(servicesLW, period, servicesChan)
	newEndpointsSourceApiFromLW(endpointsLW, period, endpointsChan)
}

func newServicesSourceApiFromLW(servicesLW cache.ListerWatcher, period time.Duration, servicesChan chan<- ServiceUpdate) {
	servicesPush := func(objs []interface{}) {
		var services []api.Service
		for _, o := range objs {
			services = append(services, *(o.(*api.Service)))
		}
		servicesChan <- ServiceUpdate{Op: SET, Services: services} // push the full service list into servicesChan
	}
	serviceQueue := cache.NewUndeltaStore(servicesPush, cache.MetaNamespaceKeyFunc)
	cache.NewReflector(servicesLW, &api.Service{}, serviceQueue, period).Run()
}

func newEndpointsSourceApiFromLW(endpointsLW cache.ListerWatcher, period time.Duration, endpointsChan chan<- EndpointsUpdate) {
	endpointsPush := func(objs []interface{}) {
		var endpoints []api.Endpoints
		for _, o := range objs {
			endpoints = append(endpoints, *(o.(*api.Endpoints)))
		}
		endpointsChan <- EndpointsUpdate{Op: SET, Endpoints: endpoints} // push the full endpoints list into endpointsChan
	}
	endpointQueue := cache.NewUndeltaStore(endpointsPush, cache.MetaNamespaceKeyFunc) // store that pushes its whole state on every change
	cache.NewReflector(endpointsLW, &api.Endpoints{}, endpointQueue, period).Run()    // start the list-watch goroutine
}

As configured, the proxy re-lists from the master every 30 seconds to check whether any service or endpoint information has changed.

First, the list and watch callbacks are created:

// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
	listFunc := func() (runtime.Object, error) { // callback that lists the full resource set
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			FieldsSelectorParam(fieldSelector).
			Do().
			Get()
	}
	watchFunc := func(resourceVersion string) (watch.Interface, error) { // callback that watches from a given resourceVersion
		return c.Get().
			Prefix("watch").
			Namespace(namespace).
			Resource(resource).
			FieldsSelectorParam(fieldSelector).
			Param("resourceVersion", resourceVersion).
			Watch()
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

With the callbacks in place, the Reflector that drives them is created and run:

// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	r := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		expectedType:  reflect.TypeOf(expectedType),
		period:        time.Second,
		resyncPeriod:  resyncPeriod,
	}
	return r
}

// internalPackages are packages that are ignored when creating a default reflector name.  These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"kubernetes/pkg/client/cache/", "kubernetes/pkg/controller/framework/"}

...

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately.
func (r *Reflector) Run() {
	go util.Until(func() { r.ListAndWatch(util.NeverStop) }, r.period, util.NeverStop)
}

// Returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	var resourceVersion string
	resyncCh, cleanup := r.resyncChan()
	defer cleanup()
	if r.verbose {
		glog.Infof("watch %v: listwatch cycle started.", r.name)
		defer glog.Infof("watch %v: listwatch cycle exited.", r.name)
	}

	list, err := r.listerWatcher.List() // full list of the resource
	if err != nil {
		return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
	}
	meta, err := meta.Accessor(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v", r.name, list)
	}
	resourceVersion = meta.ResourceVersion() // remember where the list left off
	items, err := runtime.ExtractList(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
	}
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
	}
	r.setLastSyncResourceVersion(resourceVersion)

	for {
		w, err := r.listerWatcher.Watch(resourceVersion) // open a watch from the last seen version
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
			default:
				util.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
			}
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case wait and resend watch request.
			if urlError, ok := err.(*url.Error); ok {
				if opError, ok := urlError.Err.(*net.OpError); ok {
					if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
						time.Sleep(time.Second)
						continue
					}
				}
			}
			return nil
		}
		if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil { // dispatch the watch events
			if err != errorResyncRequested && err != errorStopRequested {
				glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
			}
			return nil
		}
	}
}

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
	start := time.Now()
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

	if r.verbose {
		glog.Infof("watch %v: watch started.", r.name)
		defer glog.Infof("watch %v: watch exited.", r.name)
	}

loop:
	for {
		select {
		case <-stopCh:
			if r.verbose {
				glog.Infof("watch %v: stop requested.", r.name)
			}
			return errorStopRequested
		case <-resyncCh:
			if r.verbose {
				glog.Infof("watch %v: time to resync.", r.name)
			}
			return errorResyncRequested
		case event, ok := <-w.ResultChan(): // next watch event from the apiserver
			if !ok {
				break loop
			}
			if r.verbose {
				glog.Infof("watch %v: about to process result %s %#v.", r.name, event.Type, event.Object)
			}
			if event.Type == watch.Error {
				return apierrs.FromObject(event.Object)
			}
			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
				util.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
				continue
			}
			meta, err := meta.Accessor(event.Object) // object metadata carries the resourceVersion
			if err != nil {
				util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.ResourceVersion()
			switch event.Type {
			case watch.Added:
				r.store.Add(event.Object) // apply the add/update/delete to the store
			case watch.Modified:
				r.store.Update(event.Object)
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				r.store.Delete(event.Object)
			default:
				util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
			if r.verbose {
				glog.Infof("watch %v: now at rv %v.", r.name, *resourceVersion)
			}
		}
	}

	watchDuration := time.Now().Sub(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
		return errors.New("very short watch")
	}
	glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
	return nil
}

As the code shows, NewReflector starts a goroutine that repeatedly calls ListAndWatch. Taking services as the example: it lists the full service set once, then watches for incremental changes and applies each event to the store.

Each event is ultimately applied to the store through the following functions:

// cache responsibilities are limited to:
//  1. Computing keys for objects via keyFunc
//  2. Invoking methods of a ThreadSafeStorage interface
type cache struct {
	// cacheStorage bears the burden of thread safety for the cache
	cacheStorage ThreadSafeStore
	// keyFunc is used to make the key for objects stored in and retrieved from items, and
	// should be deterministic.
	keyFunc KeyFunc
}

// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
	key, err := c.keyFunc(obj) // compute the object's key
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Add(key, obj) // store the object under that key
	return nil
}

// Update sets an item in the cache to its updated state.
func (c *cache) Update(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Update(key, obj) // overwrite the stored object
	return nil
}

// Delete removes an item from the cache.
func (c *cache) Delete(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Delete(key) // drop the stored object
	return nil
}

// List returns a list of all the items.
// List is completely threadsafe as long as you treat all items as immutable.
func (c *cache) List() []interface{} {
	return c.cacheStorage.List()
}

// ListKeys returns a list of all the keys of the objects currently
// in the cache.
func (c *cache) ListKeys() []string {
	return c.cacheStorage.ListKeys()
}

// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
	return c.cacheStorage.Index(indexName, obj)
}

// ListIndexFuncValues returns the list of generated values of an Index func
func (c *cache) ListIndexFuncValues(indexName string) []string {
	return c.cacheStorage.ListIndexFuncValues(indexName)
}

func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
	return c.cacheStorage.ByIndex(indexName, indexKey)
}

// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
	key, err := c.keyFunc(obj)
	if err != nil {
		return nil, false, KeyError{obj, err}
	}
	return c.GetByKey(key)
}

// GetByKey returns the request item, or exists=false.
// GetByKey is completely threadsafe as long as you treat all items as immutable.
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
	item, exists = c.cacheStorage.Get(key)
	return item, exists, nil
}

// Replace will delete the contents of 'c', using instead the given list.
// 'c' takes ownership of the list, you should not reference the list again
// after calling this function.
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
	items := map[string]interface{}{}
	for _, item := range list {
		key, err := c.keyFunc(item)
		if err != nil {
			return KeyError{item, err}
		}
		items[key] = item
	}
	c.cacheStorage.Replace(items, resourceVersion)
	return nil
}

func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
		keyFunc:      keyFunc, // deterministic key function for stored objects
	}
}

At this point the flow of pulling state from the master through the API is complete. Next, look at the iptables-related operations that consume these updates.

iptables operation flow
// Run runs the specified ProxyServer.  This should never exit (unless CleanupAndExit is set).
func (s *ProxyServer) Run(_ []string) error {
	// remove iptables rules and exit
	if s.Config.CleanupAndExit {
		encounteredError := userspace.CleanupLeftovers(s.IptInterface)
		encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
		if encounteredError {
			return errors.New("Encountered an error while tearing down rules.")
		}
		return nil
	}

	// Birth Cry after the birth is successful
	s.birthCry()

	// Start up Healthz service if requested
	if s.Config.HealthzPort > 0 {
		go util.Until(func() {
			err := http.ListenAndServe(s.Config.HealthzBindAddress.String()+":"+strconv.Itoa(s.Config.HealthzPort), nil)
			if err != nil {
				glog.Errorf("Starting health server failed: %v", err)
			}
		}, 5*time.Second, util.NeverStop) // serve the health-check endpoint
	}

	// Just loop forever for now...
	s.Proxier.SyncLoop() // periodically re-sync proxy state
	return nil
}

Next comes SyncLoop; this article follows the userspace forwarding path.

func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
	if local, err := isLocalIP(portal.ip); err != nil {
		return fmt.Errorf("can't determine if IP is local, assuming not: %v", err)
	} else if local {
		err := proxier.claimNodePort(portal.ip, portal.port, protocol, name) // hold the port open on this node
		if err != nil {
			return err
		}
	}

	// Handle traffic from containers.
	args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
	existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...) // install the rule if missing
	if err != nil {
		glog.Errorf("Failed to install iptables %s rule for service %q, args:%v", iptablesContainerPortalChain, name, args)
		return err
	}
	if !existed {
		glog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
	}
	if portal.isExternal {
		args := proxier.iptablesContainerPortalArgs(portal.ip, false, true, portal.port, protocol, proxyIP, proxyPort, name)
		existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...) // rule for the external IP
		if err != nil {
			glog.Errorf("Failed to install iptables %s rule that opens service %q for local traffic, args:%v", iptablesContainerPortalChain, name, args)
			return err
		}
		if !existed {
			glog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s:%d for local traffic", name, protocol, portal.ip, portal.port)
		}

		args = proxier.iptablesHostPortalArgs(portal.ip, true, portal.port, protocol, proxyIP, proxyPort, name)
		existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
		if err != nil {
			glog.Errorf("Failed to install iptables %s rule for service %q for dst-local traffic", iptablesHostPortalChain, name)
			return err
		}
		if !existed {
			glog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s:%d for dst-local traffic", name, protocol, portal.ip, portal.port)
		}
		return nil
	}

	// Handle traffic from the host.
	args = proxier.iptablesHostPortalArgs(portal.ip, false, portal.port, protocol, proxyIP, proxyPort, name)
	existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
	if err != nil {
		glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostPortalChain, name)
		return err
	}
	if !existed {
		glog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
	}
	return nil
}

// Marks a port as being owned by a particular service, or returns error if already claimed.
// Idempotent: reclaiming with the same owner is not an error
func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
	proxier.portMapMutex.Lock()
	defer proxier.portMapMutex.Unlock()

	// TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
	key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
	existing, found := proxier.portMap[key]
	if !found {
		// Hold the actual port open, even though we use iptables to redirect
		// it.  This ensures that a) it's safe to take and b) that stays true.
		// NOTE: We should not need to have a real listen()ing socket - bind()
		// should be enough, but I can't figure out a way to e2e test without
		// it.  Tools like 'ss' and 'netstat' do not show sockets that are
		// bind()ed but not listen()ed, and at least the default debian netcat
		// has no way to avoid about 10 seconds of retries.
		socket, err := newProxySocket(protocol, ip, port) // the proxy socket that will carry the traffic
		if err != nil {
			return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
		}
		proxier.portMap[key] = &portMapValue{owner: owner, socket: socket} // record ownership
		glog.V(2).Infof("Claimed local port %s", key.String())
		return nil
	}
	if existing.owner == owner {
		// We are idempotent
		return nil
	}
	return fmt.Errorf("Port conflict detected on port %s.  %v vs %v", key.String(), owner, existing)
}

func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error {
	err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) // rules for the cluster IP
	if err != nil {
		return err
	}
	for _, publicIP := range info.externalIPs {
		err = proxier.openOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service) // rules for each external IP
		if err != nil {
			return err
		}
	}
	for _, ingress := range info.loadBalancerStatus.Ingress {
		if ingress.IP != "" {
			err = proxier.openOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service) // rules for load-balancer ingress IPs
			if err != nil {
				return err
			}
		}
	}
	if info.nodePort != 0 {
		err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service) // rules for the node port, if one is set
		if err != nil {
			return err
		}
	}
	return nil
}

// Ensure that portals exist for all services.
func (proxier *Proxier) ensurePortals() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	// NB: This does not remove rules that should not be present.
	for name, info := range proxier.serviceMap { // serviceMap holds the latest state fetched from the API
		err := proxier.openPortal(name, info) // make sure each service's portal is open
		if err != nil {
			glog.Errorf("Failed to ensure portal for %q: %v", name, err)
		}
	}
}

// Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() {
	if err := iptablesInit(proxier.iptables); err != nil { // make sure the base chains exist
		glog.Errorf("Failed to ensure iptables: %v", err)
	}
	proxier.ensurePortals()              // make sure every known service has its rules
	proxier.cleanupStaleStickySessions() // drop expired sticky sessions
}

// SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
func (proxier *Proxier) SyncLoop() {
	t := time.NewTicker(proxier.syncPeriod) // fire once per sync period
	defer t.Stop()
	for {
		<-t.C
		glog.V(6).Infof("Periodic sync")
		proxier.Sync()
	}
}

This periodic sync keeps the iptables forwarding rules consistent with the synchronized state; a quick way to inspect the resulting chains follows.
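To observe the result of a sync on a node, one can list the NAT chains owned by the userspace proxier. A minimal sketch, assuming iptables is installed and the process has sufficient privileges; the chain names are the ones used by the 1.1 userspace proxier:

package main

import (
	"fmt"
	"os/exec"
)

func main() {
	chains := []string{
		"KUBE-PORTALS-CONTAINER", // portal rules matched for traffic from containers
		"KUBE-PORTALS-HOST",      // portal rules matched for traffic from the host
	}
	for _, chain := range chains {
		out, err := exec.Command("iptables", "-t", "nat", "-S", chain).CombinedOutput()
		if err != nil {
			fmt.Printf("list %s: %v\n", chain, err)
			continue
		}
		fmt.Printf("--- %s ---\n%s", chain, out)
	}
}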

When service information changes, the registered OnServiceUpdate is invoked:

// addServiceOnPort starts listening for a new service, returning the serviceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
	sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) // the userspace proxy socket
	if err != nil {
		return nil, err
	}
	_, portStr, err := net.SplitHostPort(sock.Addr().String()) // the port that was actually bound
	if err != nil {
		sock.Close()
		return nil, err
	}
	portNum, err := strconv.Atoi(portStr)
	if err != nil {
		sock.Close()
		return nil, err
	}
	si := &serviceInfo{
		isAliveAtomic:       1,
		proxyPort:           portNum,
		protocol:            protocol,
		socket:              sock,
		timeout:             timeout,
		activeClients:       newClientCache(),
		sessionAffinityType: api.ServiceAffinityNone, // default
		stickyMaxAgeMinutes: 180,                     // TODO: parameterize this in the API.
	}
	proxier.setServiceInfo(service, si)

	glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
	go func(service proxy.ServicePortName, proxier *Proxier) {
		defer util.HandleCrash()
		atomic.AddInt32(&proxier.numProxyLoops, 1)
		sock.ProxyLoop(service, si, proxier) // copy traffic between clients and backends
		atomic.AddInt32(&proxier.numProxyLoops, -1)
	}(service, proxier)

	return si, nil
}

// OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
	glog.V(4).Infof("Received update notice: %+v", services)
	activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
	for i := range services {
		service := &services[i]

		// if ClusterIP is "None" or empty, skip proxying
		if !api.IsServiceIPSet(service) { // headless services are not proxied
			glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
			continue
		}

		for i := range service.Spec.Ports {
			servicePort := &service.Spec.Ports[i]
			serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
			activeServices[serviceName] = true
			serviceIP := net.ParseIP(service.Spec.ClusterIP)
			info, exists := proxier.getServiceInfo(serviceName) // existing proxy state for this port, if any
			// TODO: check health of the socket?  What if ProxyLoop exited?
			if exists && sameConfig(info, service, servicePort) {
				// Nothing changed.
				continue
			}
			if exists {
				glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
				err := proxier.closePortal(serviceName, info) // tear down the stale iptables rules
				if err != nil {
					glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
				}
				err = proxier.stopProxy(serviceName, info) // stop the stale proxy socket
				if err != nil {
					glog.Errorf("Failed to stop service %q: %v", serviceName, err)
				}
			}
			proxyPort, err := proxier.proxyPorts.AllocateNext() // pick a fresh local proxy port
			if err != nil {
				glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
				continue
			}
			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
			info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) // start the userspace proxy for this port
			if err != nil {
				glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
				continue
			}
			info.portal.ip = serviceIP
			info.portal.port = servicePort.Port
			info.externalIPs = service.Spec.ExternalIPs
			// Deep-copy in case the service instance changes
			info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
			info.nodePort = servicePort.NodePort
			info.sessionAffinityType = service.Spec.SessionAffinity
			glog.V(4).Infof("info: %+v", info)

			err = proxier.openPortal(serviceName, info) // install the iptables rules for the new port
			if err != nil {
				glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
			}
			proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) // register load-balancing state
		}
	}

	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	for name, info := range proxier.serviceMap { // remove services missing from the update
		if !activeServices[name] {
			glog.V(1).Infof("Stopping service %q", name)
			err := proxier.closePortal(name, info) // remove its iptables rules
			if err != nil {
				glog.Errorf("Failed to close portal for %q: %v", name, err)
			}
			err = proxier.stopProxyInternal(name, info) // stop its proxy socket
			if err != nil {
				glog.Errorf("Failed to stop service %q: %v", name, err)
			}
		}
	}
}

At this point, whenever service or endpoint information changes, the proxy reacts immediately. The processing boils down to two mechanisms:

  • Periodic synchronization (every 30 seconds by default) keeps the locally held services and endpoints information accurate for building iptables rules, which also gives kube-proxy a chance to repair iptables rules that were modified incorrectly by hand.
  • Change signals for services and endpoints drive immediate updates of the corresponding proxy state. A minimal sketch combining both mechanisms follows this list.
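Here is a minimal, self-contained sketch of those two mechanisms working together; the names and intervals are invented for the demo, not taken from the proxier:

package main

import (
	"fmt"
	"time"
)

func main() {
	updates := make(chan string)          // change notifications, like OnServiceUpdate
	ticker := time.NewTicker(time.Second) // periodic resync, like SyncLoop (30s in kube-proxy)
	defer ticker.Stop()

	go func() { // simulated change events
		updates <- "service foo changed"
		time.Sleep(1500 * time.Millisecond)
		updates <- "service bar changed"
	}()

	deadline := time.After(3 * time.Second) // stop the demo after a few cycles
	for {
		select {
		case u := <-updates:
			fmt.Println("apply immediately:", u) // event-driven path
		case <-ticker.C:
			fmt.Println("periodic sync: re-ensure all rules") // repair path
		case <-deadline:
			return
		}
	}
}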

Studying this code shows that the design and implementation of k8s are well worth learning from.

Summary

Reading the 1.1 code gives a good picture of the basic principles and logic behind kube-proxy: it requests service and endpoint information from the master through the API, and on every change drives the local iptables configuration that provides the corresponding network service, thereby implementing transparent traffic proxying inside the k8s cluster. The design and implementation of k8s are well worth deep study. Given the limits of my knowledge, corrections are welcome if I have made any mistakes.
