Introduction to k8s Concepts: kube-proxy (a 1.1 Source Reading)
Background
While continuing to read the k8s 0.4 source, I found that what the documentation describes really is an incomplete version, so I switched to 1.1, whose documentation states it is ready for production use. This article revisits kube-proxy against that version as a supplement to the earlier one.
kube-proxy
As before, kube-proxy is deployed on every node and provides communication support at the service and node level. Its core job is to establish network connectivity and routing through iptables.
- The proxy fetches the current endpoints and services directly from the API exposed by the master, polling it periodically for configuration changes.
- Using the services and endpoints obtained in the first step, it programs packet forwarding through iptables. Forwarding can happen either in userspace or purely in iptables; whenever the endpoints or services on the master change, the local iptables rules are updated accordingly.
These two points are the core of how the proxy works (a minimal sketch follows).
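To make those two points concrete, here is a minimal, hypothetical sketch of that control loop. The types and functions are stand-ins invented for illustration, not the real kube-proxy API:

package main

import (
    "fmt"
    "time"
)

// Service and Endpoint stand in for the api.Service / api.Endpoints
// objects the real proxy receives from the master.
type Service struct {
    Name, ClusterIP string
    Port            int
}

type Endpoint struct {
    Service, IP string
    Port        int
}

// fetch simulates polling the master's API for the desired state.
func fetch() ([]Service, []Endpoint) {
    return []Service{{Name: "web", ClusterIP: "10.0.0.10", Port: 80}},
        []Endpoint{{Service: "web", IP: "10.244.1.5", Port: 8080}}
}

// programRules stands in for rewriting iptables rules from that state.
func programRules(svcs []Service, eps []Endpoint) {
    for _, s := range svcs {
        for _, e := range eps {
            if e.Service == s.Name {
                fmt.Printf("%s:%d -> %s:%d\n", s.ClusterIP, s.Port, e.IP, e.Port)
            }
        }
    }
}

func main() {
    for {
        programRules(fetch())        // re-derive forwarding rules from the desired state
        time.Sleep(30 * time.Second) // kube-proxy's default resync period
    }
}

The real proxy replaces the sleep-based poll with a list/watch client and replaces programRules with iptables manipulation, as the rest of this article shows.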
Startup flow
func init() {
    healthz.DefaultHealthz() // set up the health-check server
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU()) // use as many OS threads as there are CPUs
    config := app.NewProxyConfig()       // build the default proxy configuration
    config.AddFlags(pflag.CommandLine)   // register command-line flags
    util.InitFlags()                     // parse the command-line flags
    util.InitLogs()
    defer util.FlushLogs()
    verflag.PrintAndExitIfRequested()
    s, err := app.NewProxyServerDefault(config) // create the proxy server
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
    if err = s.Run(pflag.CommandLine.Args()); err != nil { // run it
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
NewProxyServerDefault starts the goroutine that periodically queries the master's API, while Run starts the goroutine that synchronizes the iptables rules.
// NewProxyServerDefault creates a new ProxyServer object with default parameters.
func NewProxyServerDefault(config *ProxyServerConfig) (*ProxyServer, error) {
    protocol := utiliptables.ProtocolIpv4 // decide between IPv4 and IPv6
    if config.BindAddress.To4() == nil {
        protocol = utiliptables.ProtocolIpv6
    }

    // We omit creation of pretty much everything if we run in cleanup mode
    if config.CleanupAndExit {
        execer := exec.New()
        dbus := utildbus.New()
        IptInterface := utiliptables.New(execer, dbus, protocol)
        return &ProxyServer{
            Config:       config,
            IptInterface: IptInterface,
        }, nil
    }

    // TODO(vmarmol): Use container config for this.
    var oomAdjuster *oom.OomAdjuster
    if config.OOMScoreAdj != 0 {
        oomAdjuster := oom.NewOomAdjuster()
        if err := oomAdjuster.ApplyOomScoreAdj(0, config.OOMScoreAdj); err != nil {
            glog.V(2).Info(err)
        }
    }

    if config.ResourceContainer != "" {
        // Run in its own container.
        if err := util.RunInResourceContainer(config.ResourceContainer); err != nil {
            glog.Warningf("Failed to start in resource-only container %q: %v", config.ResourceContainer, err)
        } else {
            glog.V(2).Infof("Running in resource-only container %q", config.ResourceContainer)
        }
    }

    // Create a Kube Client
    // define api config source
    if config.Kubeconfig == "" && config.Master == "" {
        glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
    }
    // This creates a client, first loading any specified kubeconfig
    // file, and then overriding the Master flag, if non-empty.
    kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
        &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
        &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: config.Master}}).ClientConfig() // may also carry certificate or auth information
    if err != nil {
        return nil, err
    }
    client, err := kubeclient.New(kubeconfig) // build the client used to reach the apiserver
    if err != nil {
        glog.Fatalf("Invalid API configuration: %v", err)
    }

    // Create event recorder
    hostname := nodeutil.GetHostname(config.HostnameOverride)
    eventBroadcaster := record.NewBroadcaster()
    recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "kube-proxy", Host: hostname})
    eventBroadcaster.StartRecordingToSink(client.Events(""))

    // Create a iptables utils.
    execer := exec.New()
    dbus := utildbus.New()
    iptInterface := utiliptables.New(execer, dbus, protocol) // interface for manipulating iptables

    var proxier proxy.ProxyProvider
    var endpointsHandler proxyconfig.EndpointsConfigHandler

    useIptablesProxy := false // decide between userspace forwarding and pure-iptables forwarding
    if mayTryIptablesProxy(config.ProxyMode, client.Nodes(), hostname) {
        var err error
        // guaranteed false on error, error only necessary for debugging
        useIptablesProxy, err = iptables.ShouldUseIptablesProxier()
        if err != nil {
            glog.Errorf("Can't determine whether to use iptables proxy, using userspace proxier: %v", err)
        }
    }

    if useIptablesProxy {
        glog.V(2).Info("Using iptables Proxier.")
        execer := exec.New()
        proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.SyncPeriod, config.MasqueradeAll) // forward purely with iptables
        if err != nil {
            glog.Fatalf("Unable to create proxier: %v", err)
        }
        proxier = proxierIptables
        endpointsHandler = proxierIptables // the proxier itself handles endpoint updates
        // No turning back. Remove artifacts that might still exist from the userspace Proxier.
        glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.")
        userspace.CleanupLeftovers(iptInterface)
    } else {
        glog.V(2).Info("Using userspace Proxier.")
        // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
        // our config.EndpointsConfigHandler.
        loadBalancer := userspace.NewLoadBalancerRR() // round-robin balancer for userspace forwarding
        // set EndpointsConfigHandler to our loadBalancer
        endpointsHandler = loadBalancer // the load balancer handles endpoint updates
        proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.SyncPeriod, config.UDPIdleTimeout) // create the userspace proxier
        if err != nil {
            glog.Fatalf("Unable to create proxier: %v", err)
        }
        proxier = proxierUserspace
        // Remove artifacts from the pure-iptables Proxier.
        glog.V(2).Info("Tearing down pure-iptables proxy rules. Errors here are acceptable.")
        iptables.CleanupLeftovers(iptInterface)
    }
    iptInterface.AddReloadFunc(proxier.Sync)

    // Create configs (i.e. Watches for Services and Endpoints)
    // Note: RegisterHandler() calls need to happen before creation of Sources because sources
    // only notify on changes, and the initial update (on process start) may be lost if no handlers
    // are registered yet.
    serviceConfig := proxyconfig.NewServiceConfig() // watch services, with the proxier registered as handler
    serviceConfig.RegisterHandler(proxier)

    endpointsConfig := proxyconfig.NewEndpointsConfig() // watch endpoints, with the endpoints handler registered
    endpointsConfig.RegisterHandler(endpointsHandler)

    proxyconfig.NewSourceAPI(
        client,
        30*time.Second,
        serviceConfig.Channel("api"),
        endpointsConfig.Channel("api"),
    ) // start the periodic service/endpoints queries against the master

    config.nodeRef = &api.ObjectReference{
        Kind:      "Node",
        Name:      hostname,
        UID:       types.UID(hostname),
        Namespace: "",
    }

    return NewProxyServer(config, client, endpointsConfig, endpointsHandler, iptInterface, oomAdjuster, proxier, recorder, serviceConfig) // assemble the ProxyServer
}
Two things stand out from this flow:
- Whether forwarding happens in userspace or purely in iptables, traffic steering is always controlled through iptables. In userspace mode the proxy listens on a local IP and port and installs rules that redirect service traffic to that port, which is why a service is reachable on every node (a minimal sketch follows this list).
- The proxy exchanges two kinds of information with the master, services and endpoints, and it exposes a health-check port so its liveness can be probed. Whenever the master reports changes, the proxy maintains the corresponding local iptables state.
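To illustrate the userspace mode in the first point, here is a minimal, self-contained sketch of the idea behind the proxier's proxy socket. It is not the real implementation, and the backend address and listen port are assumptions: iptables redirects service traffic to a locally bound port, and goroutines copy bytes between the client and a chosen backend.

package main

import (
    "io"
    "log"
    "net"
)

// backend would normally be picked per-connection by the round-robin
// load balancer from the current endpoints list; here it is fixed.
const backend = "10.244.1.5:8080"

func main() {
    // The real proxier binds a local port like this, then installs
    // iptables rules that REDIRECT the service's ClusterIP:port to it.
    ln, err := net.Listen("tcp", "127.0.0.1:40001")
    if err != nil {
        log.Fatal(err)
    }
    for {
        in, err := ln.Accept()
        if err != nil {
            continue
        }
        go func(in net.Conn) {
            defer in.Close()
            out, err := net.Dial("tcp", backend) // connect to one endpoint
            if err != nil {
                return
            }
            defer out.Close()
            go io.Copy(out, in) // client -> backend
            io.Copy(in, out)    // backend -> client
        }(in)
    }
}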
Synchronizing service and endpoint information
type EndpointsConfig struct {
    mux     *config.Mux
    bcaster *config.Broadcaster
    store   *endpointsStore
} // the structure driving endpoints synchronization

// NewEndpointsConfig creates a new EndpointsConfig.
// It immediately runs the created EndpointsConfig.
func NewEndpointsConfig() *EndpointsConfig {
    updates := make(chan struct{}) // channel signalling that the store changed
    store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)} // per-source endpoints store
    mux := config.NewMux(store)        // the mux merges updates arriving from multiple sources
    bcaster := config.NewBroadcaster() // broadcaster that fans updates out to handlers
    go watchForUpdates(bcaster, store, updates) // watch for update signals
    return &EndpointsConfig{mux, bcaster, store}
}

func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
    c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
        glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
        handler.OnEndpointsUpdate(instance.([]api.Endpoints))
    })) // register a listener; every update invokes the handler's OnEndpointsUpdate
}

func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate {
    ch := c.mux.Channel(source)               // channel owned by the mux for this source
    endpointsCh := make(chan EndpointsUpdate) // channel the caller writes updates into
    go func() {
        for update := range endpointsCh { // forward every update into the mux
            ch <- update
        }
        close(ch) // close the mux channel when the caller is done
    }()
    return endpointsCh
}

func (c *EndpointsConfig) Config() []api.Endpoints {
    return c.store.MergedState().([]api.Endpoints) // merged view of endpoints across all sources
}

type endpointsStore struct {
    endpointLock sync.RWMutex
    endpoints    map[string]map[types.NamespacedName]api.Endpoints
    updates      chan<- struct{}
}

func (s *endpointsStore) Merge(source string, change interface{}) error {
    s.endpointLock.Lock()
    endpoints := s.endpoints[source]
    if endpoints == nil {
        endpoints = make(map[types.NamespacedName]api.Endpoints)
    }
    update := change.(EndpointsUpdate)
    switch update.Op { // add, remove or replace endpoints depending on the operation
    case ADD:
        glog.V(4).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints))
        for _, value := range update.Endpoints {
            name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
            endpoints[name] = value
        }
    case REMOVE:
        glog.V(4).Infof("Removing an endpoint %s", spew.Sdump(update))
        for _, value := range update.Endpoints {
            name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
            delete(endpoints, name)
        }
    case SET:
        glog.V(4).Infof("Setting endpoints %s", spew.Sdump(update))
        // Clear the old map entries by just creating a new map
        endpoints = make(map[types.NamespacedName]api.Endpoints)
        for _, value := range update.Endpoints {
            name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
            endpoints[name] = value
        }
    default:
        glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
    }
    s.endpoints[source] = endpoints // store the merged endpoints for this source
    s.endpointLock.Unlock()
    if s.updates != nil {
        s.updates <- struct{}{} // signal that the state changed
    }
    return nil
}

func (s *endpointsStore) MergedState() interface{} {
    s.endpointLock.RLock()
    defer s.endpointLock.RUnlock()
    endpoints := make([]api.Endpoints, 0)
    for _, sourceEndpoints := range s.endpoints {
        for _, value := range sourceEndpoints { // flatten endpoints from all sources into one slice
            endpoints = append(endpoints, value)
        }
    }
    return endpoints
}

// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
    mux     *config.Mux
    bcaster *config.Broadcaster
    store   *serviceStore
}

// NewServiceConfig creates a new ServiceConfig.
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
    updates := make(chan struct{})
    store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)} // per-source services store
    mux := config.NewMux(store)
    bcaster := config.NewBroadcaster() // broadcaster that fans updates out to handlers
    go watchForUpdates(bcaster, store, updates) // watch for update signals
    return &ServiceConfig{mux, bcaster, store}
}

func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
    c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
        glog.V(3).Infof("Calling handler.OnServiceUpdate()")
        handler.OnServiceUpdate(instance.([]api.Service))
    })) // register the callback invoked on every update
}

func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
    ch := c.mux.Channel(source)
    serviceCh := make(chan ServiceUpdate) // updates written here are forwarded into the mux's channel
    go func() {
        for update := range serviceCh {
            ch <- update
        }
        close(ch)
    }()
    return serviceCh
}

func (c *ServiceConfig) Config() []api.Service {
    return c.store.MergedState().([]api.Service)
}

type serviceStore struct {
    serviceLock sync.RWMutex
    services    map[string]map[types.NamespacedName]api.Service
    updates     chan<- struct{}
}

func (s *serviceStore) Merge(source string, change interface{}) error {
    s.serviceLock.Lock()
    services := s.services[source]
    if services == nil {
        services = make(map[types.NamespacedName]api.Service)
    }
    update := change.(ServiceUpdate)
    switch update.Op { // add, remove or replace services depending on the operation
    case ADD:
        glog.V(4).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
        for _, value := range update.Services {
            name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
            services[name] = value
        }
    case REMOVE:
        glog.V(4).Infof("Removing a service %s", spew.Sdump(update))
        for _, value := range update.Services {
            name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
            delete(services, name)
        }
    case SET:
        glog.V(4).Infof("Setting services %s", spew.Sdump(update))
        // Clear the old map entries by just creating a new map
        services = make(map[types.NamespacedName]api.Service)
        for _, value := range update.Services {
            name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
            services[name] = value
        }
    default:
        glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
    }
    s.services[source] = services // store the merged services for this source
    s.serviceLock.Unlock()
    if s.updates != nil {
        s.updates <- struct{}{}
    }
    return nil
}

func (s *serviceStore) MergedState() interface{} {
    s.serviceLock.RLock()
    defer s.serviceLock.RUnlock()
    services := make([]api.Service, 0)
    for _, sourceServices := range s.services {
        for _, value := range sourceServices {
            services = append(services, value)
        }
    }
    return services
}

// watchForUpdates invokes bcaster.Notify() with the latest version of an object
// when changes occur.
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
    for true {
        <-updates
        bcaster.Notify(accessor.MergedState()) // broadcast the merged state to all listeners
    }
}
This code is built around the publish-subscribe pattern: everything that wants notifications subscribes to the bcaster, and Notify pushes the merged state to every subscriber. Once all of this is wired up, the goroutine that periodically queries the master can be started.
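As a minimal sketch of that pattern, stripped of everything k8s-specific (these types are hypothetical simplifications, not the real config package):

package main

import (
    "fmt"
    "sync"
)

// Listener receives the full merged state on every change, mirroring
// how config.Broadcaster calls each registered ListenerFunc.
type Listener func(state interface{})

// Broadcaster is a stripped-down stand-in for config.Broadcaster:
// Add registers a subscriber, Notify fans the state out to all of them.
type Broadcaster struct {
    mu        sync.Mutex
    listeners []Listener
}

func (b *Broadcaster) Add(l Listener) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.listeners = append(b.listeners, l)
}

func (b *Broadcaster) Notify(state interface{}) {
    b.mu.Lock()
    defer b.mu.Unlock()
    for _, l := range b.listeners {
        l(state) // every subscriber sees every update
    }
}

func main() {
    b := &Broadcaster{}
    b.Add(func(s interface{}) { fmt.Println("proxier saw:", s) })
    b.Add(func(s interface{}) { fmt.Println("endpoints handler saw:", s) })

    updates := make(chan string, 1)
    updates <- "services changed"
    close(updates)
    for u := range updates { // stands in for watchForUpdates
        b.Notify(u)
    }
}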
proxyconfig.NewSourceAPI(
    client,
    30*time.Second,
    serviceConfig.Channel("api"),
    endpointsConfig.Channel("api"),
)

...

// NewSourceAPI creates config source that watches for changes to the services and endpoints.
func NewSourceAPI(c *client.Client, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
    servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())   // list/watch source for services
    endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything()) // list/watch source for endpoints
    newServicesSourceApiFromLW(servicesLW, period, servicesChan)
    newEndpointsSourceApiFromLW(endpointsLW, period, endpointsChan)
}

func newServicesSourceApiFromLW(servicesLW cache.ListerWatcher, period time.Duration, servicesChan chan<- ServiceUpdate) {
    servicesPush := func(objs []interface{}) {
        var services []api.Service
        for _, o := range objs {
            services = append(services, *(o.(*api.Service)))
        }
        servicesChan <- ServiceUpdate{Op: SET, Services: services} // push the full service list into servicesChan
    }
    serviceQueue := cache.NewUndeltaStore(servicesPush, cache.MetaNamespaceKeyFunc)
    cache.NewReflector(servicesLW, &api.Service{}, serviceQueue, period).Run()
}

func newEndpointsSourceApiFromLW(endpointsLW cache.ListerWatcher, period time.Duration, endpointsChan chan<- EndpointsUpdate) {
    endpointsPush := func(objs []interface{}) {
        var endpoints []api.Endpoints
        for _, o := range objs {
            endpoints = append(endpoints, *(o.(*api.Endpoints)))
        }
        endpointsChan <- EndpointsUpdate{Op: SET, Endpoints: endpoints} // push the full endpoints list into endpointsChan
    }
    endpointQueue := cache.NewUndeltaStore(endpointsPush, cache.MetaNamespaceKeyFunc) // store that pushes its entire state on every change
    cache.NewReflector(endpointsLW, &api.Endpoints{}, endpointQueue, period).Run()    // start the list-and-watch goroutine
}
The configuration shows that the proxy re-syncs against the master's API every thirty seconds to check whether any service or endpoint information has changed.
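Note that the updates above are always sent with Op: SET and carry the full list: cache.NewUndeltaStore hands its push function the complete current state on every change. A tiny, hypothetical sketch of that contract (not the real implementation):

package main

import "fmt"

// undeltaStore mimics the idea behind cache.NewUndeltaStore: whichever
// single object changes, the push function always receives the complete
// current state, which is why the channel updates use Op: SET.
type undeltaStore struct {
    items map[string]interface{}
    push  func(all []interface{})
}

func (u *undeltaStore) Add(key string, obj interface{}) {
    u.items[key] = obj
    u.pushAll()
}

func (u *undeltaStore) Delete(key string) {
    delete(u.items, key)
    u.pushAll()
}

func (u *undeltaStore) pushAll() {
    all := make([]interface{}, 0, len(u.items))
    for _, o := range u.items {
        all = append(all, o)
    }
    u.push(all) // the full state, not a delta
}

func main() {
    s := &undeltaStore{
        items: map[string]interface{}{},
        push:  func(all []interface{}) { fmt.Println("SET with", len(all), "items") },
    }
    s.Add("default/svc-a", "service a")
    s.Add("default/svc-b", "service b")
    s.Delete("default/svc-a")
}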
First, the list and watch callbacks are built from the client:
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
    listFunc := func() (runtime.Object, error) {
        return c.Get().
            Namespace(namespace).
            Resource(resource).
            FieldsSelectorParam(fieldSelector).
            Do().
            Get()
    } // the list callback
    watchFunc := func(resourceVersion string) (watch.Interface, error) {
        return c.Get().
            Prefix("watch").
            Namespace(namespace).
            Resource(resource).
            FieldsSelectorParam(fieldSelector).
            Param("resourceVersion", resourceVersion).
            Watch()
    } // the watch callback
    return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
With the callbacks in place, a Reflector is created to drive them:
// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store,
        expectedType:  reflect.TypeOf(expectedType),
        period:        time.Second,
        resyncPeriod:  resyncPeriod,
    }
    return r
}

// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"kubernetes/pkg/client/cache/", "kubernetes/pkg/controller/framework/"}

...

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately.
func (r *Reflector) Run() {
    go util.Until(func() { r.ListAndWatch(util.NeverStop) }, r.period, util.NeverStop)
}

// Returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    var resourceVersion string
    resyncCh, cleanup := r.resyncChan()
    defer cleanup()
    if r.verbose {
        glog.Infof("watch %v: listwatch cycle started.", r.name)
        defer glog.Infof("watch %v: listwatch cycle exited.", r.name)
    }
    list, err := r.listerWatcher.List() // full list of the resource
    if err != nil {
        return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
    }
    meta, err := meta.Accessor(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v", r.name, list)
    }
    resourceVersion = meta.ResourceVersion() // remember where the list left off
    items, err := runtime.ExtractList(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
    }
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    }
    r.setLastSyncResourceVersion(resourceVersion)

    for {
        w, err := r.listerWatcher.Watch(resourceVersion) // open a watch starting at resourceVersion
        if err != nil {
            switch err {
            case io.EOF:
                // watch closed normally
            case io.ErrUnexpectedEOF:
                glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
            default:
                util.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if urlError, ok := err.(*url.Error); ok {
                if opError, ok := urlError.Err.(*net.OpError); ok {
                    if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                        time.Sleep(time.Second)
                        continue
                    }
                }
            }
            return nil
        }
        if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil { // process the watch events
            if err != errorResyncRequested && err != errorStopRequested {
                glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
            }
            return nil
        }
    }
}

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
    start := time.Now()
    eventCount := 0
    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()
    if r.verbose {
        glog.Infof("watch %v: watch started.", r.name)
        defer glog.Infof("watch %v: watch exited.", r.name)
    }
loop:
    for {
        select {
        case <-stopCh:
            if r.verbose {
                glog.Infof("watch %v: stop requested.", r.name)
            }
            return errorStopRequested
        case <-resyncCh:
            if r.verbose {
                glog.Infof("watch %v: time to resync.", r.name)
            }
            return errorResyncRequested
        case event, ok := <-w.ResultChan(): // receive the next watch event
            if !ok {
                break loop
            }
            if r.verbose {
                glog.Infof("watch %v: about to process result %s %#v.", r.name, event.Type, event.Object)
            }
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                util.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            meta, err := meta.Accessor(event.Object) // read the object's metadata
            if err != nil {
                util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.ResourceVersion()
            switch event.Type {
            case watch.Added:
                r.store.Add(event.Object) // apply each event to the store
            case watch.Modified:
                r.store.Update(event.Object)
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                r.store.Delete(event.Object)
            default:
                util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
            if r.verbose {
                glog.Infof("watch %v: now at rv %v.", r.name, *resourceVersion)
            }
        }
    }
    watchDuration := time.Now().Sub(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
        return errors.New("very short watch")
    }
    glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
    return nil
}
The code shows that NewReflector starts a goroutine that repeatedly calls the registered ListAndWatch. Taking services as an example: it first lists all services, then watches for incremental events and applies each one to the store.
Those store operations ultimately land in the following cache implementation:
// cache responsibilities are limited to:
// 1. Computing keys for objects via keyFunc
// 2. Invoking methods of a ThreadSafeStorage interface
type cache struct {
    // cacheStorage bears the burden of thread safety for the cache
    cacheStorage ThreadSafeStore
    // keyFunc is used to make the key for objects stored in and retrieved from items, and
    // should be deterministic.
    keyFunc KeyFunc
}

// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
    key, err := c.keyFunc(obj) // compute the object's key
    if err != nil {
        return KeyError{obj, err}
    }
    c.cacheStorage.Add(key, obj) // store the object under that key
    return nil
}

// Update sets an item in the cache to its updated state.
func (c *cache) Update(obj interface{}) error {
    key, err := c.keyFunc(obj) // compute the object's key
    if err != nil {
        return KeyError{obj, err}
    }
    c.cacheStorage.Update(key, obj) // overwrite the stored object
    return nil
}

// Delete removes an item from the cache.
func (c *cache) Delete(obj interface{}) error {
    key, err := c.keyFunc(obj) // compute the object's key
    if err != nil {
        return KeyError{obj, err}
    }
    c.cacheStorage.Delete(key) // drop the stored object
    return nil
}

// List returns a list of all the items.
// List is completely threadsafe as long as you treat all items as immutable.
func (c *cache) List() []interface{} {
    return c.cacheStorage.List()
}

// ListKeys returns a list of all the keys of the objects currently
// in the cache.
func (c *cache) ListKeys() []string {
    return c.cacheStorage.ListKeys()
}

// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
    return c.cacheStorage.Index(indexName, obj)
}

// ListIndexFuncValues returns the list of generated values of an Index func
func (c *cache) ListIndexFuncValues(indexName string) []string {
    return c.cacheStorage.ListIndexFuncValues(indexName)
}

func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
    return c.cacheStorage.ByIndex(indexName, indexKey)
}

// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
    key, err := c.keyFunc(obj)
    if err != nil {
        return nil, false, KeyError{obj, err}
    }
    return c.GetByKey(key)
}

// GetByKey returns the request item, or exists=false.
// GetByKey is completely threadsafe as long as you treat all items as immutable.
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
    item, exists = c.cacheStorage.Get(key)
    return item, exists, nil
}

// Replace will delete the contents of 'c', using instead the given list.
// 'c' takes ownership of the list, you should not reference the list again
// after calling this function.
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
    items := map[string]interface{}{}
    for _, item := range list {
        key, err := c.keyFunc(item)
        if err != nil {
            return KeyError{item, err}
        }
        items[key] = item
    }
    c.cacheStorage.Replace(items, resourceVersion)
    return nil
}

func NewStore(keyFunc KeyFunc) Store {
    return &cache{
        cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
        keyFunc:      keyFunc, // deterministic key function, e.g. MetaNamespaceKeyFunc
    }
}
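A short usage sketch of this store follows. The import paths are the k8s 1.1 in-tree packages; treat this as an illustration rather than something to build against today. MetaNamespaceKeyFunc keys objects as namespace/name:

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/client/cache"
)

func main() {
    store := cache.NewStore(cache.MetaNamespaceKeyFunc)

    svc := &api.Service{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "web"}}
    _ = store.Add(svc) // keyed as "default/web" by MetaNamespaceKeyFunc

    item, exists, _ := store.Get(svc)
    fmt.Println(exists, item != nil) // true true

    _ = store.Delete(svc)
    fmt.Println(store.ListKeys()) // []
}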
At this point, the flow of fetching state from the master through the API is essentially complete. Next, look at the iptables-related operations that wait for and consume those changes.
iptables operation flow
// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
func (s *ProxyServer) Run(_ []string) error {
    // remove iptables rules and exit
    if s.Config.CleanupAndExit {
        encounteredError := userspace.CleanupLeftovers(s.IptInterface)
        encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
        if encounteredError {
            return errors.New("Encountered an error while tearing down rules.")
        }
        return nil
    }

    // Birth Cry after the birth is successful
    s.birthCry()

    // Start up Healthz service if requested
    if s.Config.HealthzPort > 0 {
        go util.Until(func() {
            err := http.ListenAndServe(s.Config.HealthzBindAddress.String()+":"+strconv.Itoa(s.Config.HealthzPort), nil)
            if err != nil {
                glog.Errorf("Starting health server failed: %v", err)
            }
        }, 5*time.Second, util.NeverStop) // serve the health-check endpoint
    }

    // Just loop forever for now...
    s.Proxier.SyncLoop() // run the rule-sync loop
    return nil
}
Next is SyncLoop; this article follows the userspace forwarding path by default.
func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
    if local, err := isLocalIP(portal.ip); err != nil {
        return fmt.Errorf("can't determine if IP is local, assuming not: %v", err)
    } else if local {
        err := proxier.claimNodePort(portal.ip, portal.port, protocol, name) // claim the port for this service
        if err != nil {
            return err
        }
    }

    // Handle traffic from containers.
    args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
    existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...) // install the rule if it is missing
    if err != nil {
        glog.Errorf("Failed to install iptables %s rule for service %q, args:%v", iptablesContainerPortalChain, name, args)
        return err
    }
    if !existed {
        glog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
    }

    if portal.isExternal {
        args := proxier.iptablesContainerPortalArgs(portal.ip, false, true, portal.port, protocol, proxyIP, proxyPort, name)
        existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...) // rule for the external IP
        if err != nil {
            glog.Errorf("Failed to install iptables %s rule that opens service %q for local traffic, args:%v", iptablesContainerPortalChain, name, args)
            return err
        }
        if !existed {
            glog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s:%d for local traffic", name, protocol, portal.ip, portal.port)
        }

        args = proxier.iptablesHostPortalArgs(portal.ip, true, portal.port, protocol, proxyIP, proxyPort, name)
        existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
        if err != nil {
            glog.Errorf("Failed to install iptables %s rule for service %q for dst-local traffic", iptablesHostPortalChain, name)
            return err
        }
        if !existed {
            glog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s:%d for dst-local traffic", name, protocol, portal.ip, portal.port)
        }
        return nil
    }

    // Handle traffic from the host.
    args = proxier.iptablesHostPortalArgs(portal.ip, false, portal.port, protocol, proxyIP, proxyPort, name)
    existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
    if err != nil {
        glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostPortalChain, name)
        return err
    }
    if !existed {
        glog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
    }
    return nil
}

// Marks a port as being owned by a particular service, or returns error if already claimed.
// Idempotent: reclaiming with the same owner is not an error
func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
    proxier.portMapMutex.Lock()
    defer proxier.portMapMutex.Unlock()

    // TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
    key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
    existing, found := proxier.portMap[key]
    if !found {
        // Hold the actual port open, even though we use iptables to redirect
        // it. This ensures that a) it's safe to take and b) that stays true.
        // NOTE: We should not need to have a real listen()ing socket - bind()
        // should be enough, but I can't figure out a way to e2e test without
        // it. Tools like 'ss' and 'netstat' do not show sockets that are
        // bind()ed but not listen()ed, and at least the default debian netcat
        // has no way to avoid about 10 seconds of retries.
        socket, err := newProxySocket(protocol, ip, port) // open a proxy socket that will forward traffic
        if err != nil {
            return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
        }
        proxier.portMap[key] = &portMapValue{owner: owner, socket: socket} // remember who owns this port
        glog.V(2).Infof("Claimed local port %s", key.String())
        return nil
    }
    if existing.owner == owner {
        // We are idempotent
        return nil
    }
    return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing)
}

func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error {
    err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) // rules for the cluster IP
    if err != nil {
        return err
    }
    for _, publicIP := range info.externalIPs {
        err = proxier.openOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service) // rules for each exposed external IP
        if err != nil {
            return err
        }
    }
    for _, ingress := range info.loadBalancerStatus.Ingress {
        if ingress.IP != "" {
            err = proxier.openOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service) // rules for load-balancer ingress IPs
            if err != nil {
                return err
            }
        }
    }
    if info.nodePort != 0 {
        err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service) // forwarding rules for the node port, if one is set
        if err != nil {
            return err
        }
    }
    return nil
}

// Ensure that portals exist for all services.
func (proxier *Proxier) ensurePortals() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()
    // NB: This does not remove rules that should not be present.
    for name, info := range proxier.serviceMap { // the latest state received from the API
        err := proxier.openPortal(name, info) // make sure the portal is open
        if err != nil {
            glog.Errorf("Failed to ensure portal for %q: %v", name, err)
        }
    }
}

// Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() {
    if err := iptablesInit(proxier.iptables); err != nil { // make sure the proxy chains exist
        glog.Errorf("Failed to ensure iptables: %v", err)
    }
    proxier.ensurePortals()              // make sure every service's rules are in place
    proxier.cleanupStaleStickySessions() // drop expired sticky sessions
}

// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
    t := time.NewTicker(proxier.syncPeriod) // periodic timer
    defer t.Stop()
    for {
        <-t.C
        glog.V(6).Infof("Periodic sync")
        proxier.Sync()
    }
}
This periodic sync keeps the installed iptables forwarding state correct: EnsureRule appends a rule only when it is missing, so rules that drifted (for example, edited by hand) are repaired on the next pass. A small sketch of that idempotent ensure pattern follows.
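This is a minimal, hypothetical sketch of the ensure idea, not the real utiliptables code; the map stands in for the kernel's rule table, and the rule string is only an illustration of a userspace-style REDIRECT rule:

package main

import "fmt"

// ensureRule sketches the idempotent contract of iptables.EnsureRule:
// append a rule only if it is absent, and report whether it existed.
// Repeated calls are harmless, which is what makes the resync safe.
func ensureRule(rules map[string]bool, rule string) (existed bool) {
    if rules[rule] {
        return true
    }
    rules[rule] = true // in reality: run `iptables -t nat -A <chain> <args...>`
    return false
}

func main() {
    rules := map[string]bool{}
    r := "-d 10.0.0.10/32 -p tcp --dport 80 -j REDIRECT --to-ports 40001"
    fmt.Println(ensureRule(rules, r)) // false: installed now
    fmt.Println(ensureRule(rules, r)) // true: the resync is a no-op
}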
When the service information changes, OnServiceUpdate is invoked:
// addServiceOnPort starts listening for a new service, returning the serviceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
    sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) // open the proxy socket
    if err != nil {
        return nil, err
    }
    _, portStr, err := net.SplitHostPort(sock.Addr().String()) // find out which port was bound
    if err != nil {
        sock.Close()
        return nil, err
    }
    portNum, err := strconv.Atoi(portStr)
    if err != nil {
        sock.Close()
        return nil, err
    }
    si := &serviceInfo{
        isAliveAtomic:       1,
        proxyPort:           portNum,
        protocol:            protocol,
        socket:              sock,
        timeout:             timeout,
        activeClients:       newClientCache(),
        sessionAffinityType: api.ServiceAffinityNone, // default
        stickyMaxAgeMinutes: 180,                     // TODO: parameterize this in the API.
    }
    proxier.setServiceInfo(service, si)

    glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
    go func(service proxy.ServicePortName, proxier *Proxier) {
        defer util.HandleCrash()
        atomic.AddInt32(&proxier.numProxyLoops, 1)
        sock.ProxyLoop(service, si, proxier) // copy traffic between clients and endpoints
        atomic.AddInt32(&proxier.numProxyLoops, -1)
    }(service, proxier)

    return si, nil
}

// OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
    glog.V(4).Infof("Received update notice: %+v", services)
    activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
    for i := range services {
        service := &services[i]

        // if ClusterIP is "None" or empty, skip proxying
        if !api.IsServiceIPSet(service) { // headless services get no proxy
            glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
            continue
        }

        for i := range service.Spec.Ports {
            servicePort := &service.Spec.Ports[i]
            serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
            activeServices[serviceName] = true
            serviceIP := net.ParseIP(service.Spec.ClusterIP)
            info, exists := proxier.getServiceInfo(serviceName) // current state for this service port
            // TODO: check health of the socket? What if ProxyLoop exited?
            if exists && sameConfig(info, service, servicePort) {
                // Nothing changed.
                continue
            }
            if exists {
                glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
                err := proxier.closePortal(serviceName, info) // tear down the old portal
                if err != nil {
                    glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
                }
                err = proxier.stopProxy(serviceName, info) // stop the old proxy socket
                if err != nil {
                    glog.Errorf("Failed to stop service %q: %v", serviceName, err)
                }
            }
            proxyPort, err := proxier.proxyPorts.AllocateNext() // allocate a free local port
            if err != nil {
                glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
                continue
            }
            glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
            info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) // start the userspace proxy for this port
            if err != nil {
                glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
                continue
            }
            info.portal.ip = serviceIP
            info.portal.port = servicePort.Port
            info.externalIPs = service.Spec.ExternalIPs
            // Deep-copy in case the service instance changes
            info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
            info.nodePort = servicePort.NodePort
            info.sessionAffinityType = service.Spec.SessionAffinity
            glog.V(4).Infof("info: %+v", info)

            err = proxier.openPortal(serviceName, info) // install the matching iptables rules
            if err != nil {
                glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
            }
            proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) // register with the load balancer
        }
    }

    proxier.mu.Lock()
    defer proxier.mu.Unlock()
    for name, info := range proxier.serviceMap { // clean up services that no longer exist
        if !activeServices[name] {
            glog.V(1).Infof("Stopping service %q", name)
            err := proxier.closePortal(name, info) // remove the iptables rules
            if err != nil {
                glog.Errorf("Failed to close portal for %q: %v", name, err)
            }
            err = proxier.stopProxyInternal(name, info) // stop the proxy socket
            if err != nil {
                glog.Errorf("Failed to stop service %q: %v", name, err)
            }
        }
    }
}
At this point, any change in the watched state triggers an immediate update of the service configuration. The processing boils down to two parts:
- A periodic resync (every 30 seconds by default) keeps the services and endpoints data used to build iptables rules accurate; it also gives the proxy a chance to repair iptables rules that were modified by hand.
- Listening for change signals from the service and endpoint channels, and rewriting the proxy state for services and endpoints as soon as a signal arrives (see the sketch after this list).
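Both mechanisms can be condensed into one hypothetical select loop. This is a sketch of the combined behavior, not the actual SyncLoop, which, as shown above, splits the event path and the timer path across goroutines:

package main

import (
    "fmt"
    "time"
)

func main() {
    updates := make(chan string, 1)
    ticker := time.NewTicker(30 * time.Second) // the default resync period
    defer ticker.Stop()

    sync := func(reason string) { fmt.Println("syncing rules:", reason) }

    go func() { updates <- "services changed" }() // e.g. sent by the broadcaster

    for {
        select {
        case u := <-updates: // event-driven path: apply changes immediately
            sync(u)
        case <-ticker.C: // periodic path: repair any drift
            sync("periodic resync")
        }
    }
}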
Working through the code, k8s's design and implementation are well worth studying and borrowing from.
Summary
Reading version 1.1 gives a solid picture of the principles and logic behind kube-proxy's proxying: it requests service and endpoint information from the master over the API, and on every change it drives the local iptables to configure the corresponding network rules, achieving transparent traffic proxying within the k8s cluster. The design and implementation reward a close reading. Since my knowledge is limited, corrections are welcome.