Analyzing kube-proxy in k8s

Kube-proxy is deployed on every node alongside the kubelet process. Its job is to enable communication inside the k8s cluster, and also to route traffic entering from outside the cluster to services inside it. Functionally, it provides a higher-level network abstraction that lets users ignore some of the network-layer details and focus on their business logic.

In early k8s implementations, traffic forwarding was done in the simplest and quickest way possible: data was received in user space and forwarded directly to the backend target address.

Let's start with the proxy's main function:

```go
var (
	etcdServerList util.StringList // list of etcd servers to watch
	etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers")
	bindAddress    = util.IP(net.ParseIP("0.0.0.0")) // the local address to bind
	clientConfig   = &client.Config{}                // client configuration
)

func init() {
	client.BindClientConfigFlags(flag.CommandLine, clientConfig)
	flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional). Mutually exclusive with -etcd_config")
	flag.Var(&bindAddress, "bind_address", "The address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)")
}

func main() {
	flag.Parse()
	util.InitLogs()
	defer util.FlushLogs()

	verflag.PrintAndExitIfRequested()

	serviceConfig := config.NewServiceConfig()     // create a ServiceConfig
	endpointsConfig := config.NewEndpointsConfig() // create an EndpointsConfig

	// define api config source
	if clientConfig.Host != "" {
		glog.Infof("Using api calls to get config %v", clientConfig.Host)
		client, err := client.New(clientConfig)
		if err != nil {
			glog.Fatalf("Invalid API configuration: %v", err)
		}
		config.NewSourceAPI(
			client,
			30*time.Second,
			serviceConfig.Channel("api"),
			endpointsConfig.Channel("api"),
		) // use the API server as the config source
	} else {
		var etcdClient *etcd.Client // the etcd client connection

		// Set up etcd client
		if len(etcdServerList) > 0 {
			// Set up logger for etcd client
			etcd.SetLogger(util.NewLogger("etcd "))
			etcdClient = etcd.NewClient(etcdServerList)
		} else if *etcdConfigFile != "" {
			// Set up logger for etcd client
			etcd.SetLogger(util.NewLogger("etcd "))
			var err error
			etcdClient, err = etcd.NewClientFromFile(*etcdConfigFile)
			if err != nil {
				glog.Fatalf("Error with etcd config file: %v", err)
			}
		}

		// Create a configuration source that handles configuration from etcd.
		if etcdClient != nil {
			glog.Infof("Using etcd servers %v", etcdClient.GetCluster())
			config.NewConfigSourceEtcd(etcdClient,
				serviceConfig.Channel("etcd"),
				endpointsConfig.Channel("etcd")) // read configuration from etcd
		}
	}

	loadBalancer := proxy.NewLoadBalancerRR()                      // round-robin load-balancing algorithm
	proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress)) // the proxier that listens and forwards

	// Wire proxier to handle changes to services
	serviceConfig.RegisterHandler(proxier)
	// And wire loadBalancer to handle changes to endpoints to services
	endpointsConfig.RegisterHandler(loadBalancer) // register the endpoints handler

	// Just loop forever for now...
	select {}
}
```

From this flow we can see how the service and endpoint concepts fit together: each service is bound to its endpoints, and the endpoints record all the pods currently backing that service. When configuration is watched from etcd, the add/update/delete events observed there drive the opening and closing of the proxy's ports.
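For orientation, the update messages flowing through those channels look roughly like this — a sketch reconstructed from the fields used in the code below (the SET/ADD/REMOVE constants and the Op/Services/Endpoints fields appear there; the exact 0.4 definitions and import path may differ slightly):

```go
package config

import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"

// Operation describes what an update does to the proxy's state.
type Operation int

const (
	SET    Operation = iota // replace the entire list
	ADD                     // add or update the listed items
	REMOVE                  // remove the listed items
)

// ServiceUpdate is the message config sources push onto the service channel.
type ServiceUpdate struct {
	Op       Operation
	Services []api.Service
}

// EndpointsUpdate is the message config sources push onto the endpoints channel.
type EndpointsUpdate struct {
	Op        Operation
	Endpoints []api.Endpoints
}
```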

Let's continue into config.NewConfigSourceEtcd to see how it actually runs:

```go
// NewConfigSourceEtcd creates a new ConfigSourceEtcd and immediately runs the created ConfigSourceEtcd in a goroutine.
func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd {
	config := ConfigSourceEtcd{
		client:           client,           // the etcd client instance passed in
		serviceChannel:   serviceChannel,   // channel for service updates
		endpointsChannel: endpointsChannel, // channel for endpoints updates
		interval:         2 * time.Second,
	}
	go config.Run() // start watching for changes
	return config
}

// Run begins watching for new services and their endpoints on etcd.
func (s ConfigSourceEtcd) Run() {
	// Initially, just wait for the etcd to come up before doing anything more complicated.
	var services []api.Service
	var endpoints []api.Endpoints
	var err error
	for {
		services, endpoints, err = s.GetServices() // fetch the current services
		if err == nil {
			break
		}
		glog.V(1).Infof("Failed to get any services: %v", err)
		time.Sleep(s.interval)
	}

	if len(services) > 0 {
		serviceUpdate := ServiceUpdate{Op: SET, Services: services} // if any services were found, push a SET update
		s.serviceChannel <- serviceUpdate
	}
	if len(endpoints) > 0 { // likewise for endpoints
		endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints}
		s.endpointsChannel <- endpointsUpdate
	}

	// Ok, so we got something back from etcd. Let's set up a watch for new services, and
	// their endpoints
	go util.Forever(s.WatchForChanges, 1*time.Second) // re-run the watch loop forever, pausing 1s between runs

	for {
		services, endpoints, err = s.GetServices() // periodically re-fetch everything and notify both channels
		if err != nil {
			glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err)
		} else {
			if len(services) > 0 {
				serviceUpdate := ServiceUpdate{Op: SET, Services: services}
				s.serviceChannel <- serviceUpdate
			}
			if len(endpoints) > 0 {
				endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints}
				s.endpointsChannel <- endpointsUpdate
			}
		}
		time.Sleep(30 * time.Second)
	}
}

func (s ConfigSourceEtcd) WatchForChanges() {
	glog.V(4).Info("Setting up a watch for new services")
	watchChannel := make(chan *etcd.Response)
	go s.client.Watch("/registry/services/", 0, true, watchChannel, nil) // watch for new service entries under /registry/services/
	for {
		watchResponse, ok := <-watchChannel // receive a watch event
		if !ok {
			break
		}
		s.ProcessChange(watchResponse) // process the change
	}
}

func (s ConfigSourceEtcd) ProcessChange(response *etcd.Response) {
	glog.V(4).Infof("Processing a change in service configuration... %s", *response)

	// If it's a new service being added (signified by a localport being added)
	// then process it as such
	if strings.Contains(response.Node.Key, "/endpoints/") {
		s.ProcessEndpointResponse(response) // keys under /endpoints/ are handled as endpoints changes
	} else if response.Action == "set" {
		service, err := etcdResponseToService(response) // a "set" action adds or updates a service
		if err != nil {
			glog.Errorf("Failed to parse %s Port: %s", response, err)
			return
		}
		glog.V(4).Infof("New service added/updated: %#v", service)
		serviceUpdate := ServiceUpdate{Op: ADD, Services: []api.Service{*service}}
		s.serviceChannel <- serviceUpdate
		return
	}
	if response.Action == "delete" { // a "delete" action emits a REMOVE update
		parts := strings.Split(response.Node.Key[1:], "/")
		if len(parts) == 4 {
			glog.V(4).Infof("Deleting service: %s", parts[3])
			serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{TypeMeta: api.TypeMeta{ID: parts[3]}}}}
			s.serviceChannel <- serviceUpdate
			return
		}
		glog.Warningf("Unknown service delete: %#v", parts)
	}
}

func (s ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) {
	glog.V(4).Infof("Processing a change in endpoint configuration... %s", *response)
	var endpoints api.Endpoints
	err := latest.Codec.DecodeInto([]byte(response.Node.Value), &endpoints) // decode the endpoints change
	if err != nil {
		glog.Errorf("Failed to parse service out of etcd key: %v : %+v", response.Node.Value, err)
		return
	}
	endpointsUpdate := EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{endpoints}}
	s.endpointsChannel <- endpointsUpdate
}
```

From the initialization flow we can see that both service changes and endpoint changes ultimately land on the service channel and the endpoints channel; the initialization and handling of those channels live in ServiceConfig and EndpointsConfig respectively.

```go
type ServiceConfig struct {
	mux     *config.Mux
	watcher *config.Watcher
	store   *serviceStore
}

// NewServiceConfig creates a new ServiceConfig.
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
	updates := make(chan struct{}) // the notification channel
	store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)} // the backing store
	mux := config.NewMux(store)
	watcher := config.NewWatcher()              // create a watcher
	go watchForUpdates(watcher, store, updates) // watch for updates
	return &ServiceConfig{mux, watcher, store}
}

func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
	c.watcher.Add(config.ListenerFunc(func(instance interface{}) {
		handler.OnUpdate(instance.([]api.Service))
	})) // register a callback handler on the watcher
}

func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
	ch := c.mux.Channel(source)
	serviceCh := make(chan ServiceUpdate) // create a typed channel and pump its updates into the mux channel
	go func() {
		for update := range serviceCh {
			ch <- update
		}
		close(ch)
	}()
	return serviceCh
}
```

Whenever a service update arrives, the proxier registered with serviceConfig is invoked; whenever endpoints update, the registered loadBalancer is invoked. The callbacks are implemented with a publish-subscribe pattern: once a message lands on the channel, every subscribed handler is notified.
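To make that pattern concrete, here is a minimal, self-contained sketch of the publish-subscribe wiring (the names Watcher, Listener, and Notify are simplified stand-ins, not the actual config package API):

```go
package main

import "fmt"

// Listener is anything that wants to be told about updates.
type Listener func(update interface{})

// Watcher fans one update out to every registered listener,
// mirroring how config.Watcher notifies proxier and loadBalancer.
type Watcher struct {
	listeners []Listener
}

func (w *Watcher) Add(l Listener) {
	w.listeners = append(w.listeners, l)
}

func (w *Watcher) Notify(update interface{}) {
	for _, l := range w.listeners {
		l(update) // every subscriber sees every message
	}
}

func main() {
	w := &Watcher{}
	w.Add(func(u interface{}) { fmt.Println("proxier got:", u) })
	w.Add(func(u interface{}) { fmt.Println("loadBalancer got:", u) })
	w.Notify("service list changed") // both handlers fire
}
```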

Now let's look at the proxier's flow:

```go
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
	loadBalancer LoadBalancer            // the load-balancing component
	mu           sync.Mutex              // protects serviceMap
	serviceMap   map[string]*serviceInfo // known services
	address      net.IP                  // the address to listen on
}

// NewProxier returns a new Proxier given a LoadBalancer and an
// address on which to listen
func NewProxier(loadBalancer LoadBalancer, address net.IP) *Proxier {
	return &Proxier{
		loadBalancer: loadBalancer,
		serviceMap:   make(map[string]*serviceInfo),
		address:      address,
	}
}

// This assumes proxier.mu is not locked.
func (proxier *Proxier) stopProxy(service string, info *serviceInfo) error {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	return proxier.stopProxyInternal(service, info)
}

// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error {
	if !info.setActive(false) {
		return nil
	}
	glog.V(3).Infof("Removing service: %s", service)
	delete(proxier.serviceMap, service)
	return info.socket.Close()
}

func (proxier *Proxier) getServiceInfo(service string) (*serviceInfo, bool) {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	info, ok := proxier.serviceMap[service]
	return info, ok
}

func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	proxier.serviceMap[service] = info
}

// addServiceOnUnusedPort starts listening for a new service, returning the
// port it's using.  For testing on a system with unknown ports used.  The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnUnusedPort(service string, protocol api.Protocol, timeout time.Duration) (string, error) {
	sock, err := newProxySocket(protocol, proxier.address, 0)
	if err != nil {
		return "", err
	}
	_, port, err := net.SplitHostPort(sock.Addr().String())
	if err != nil {
		return "", err
	}
	portNum, err := strconv.Atoi(port)
	if err != nil {
		return "", err
	}
	proxier.setServiceInfo(service, &serviceInfo{
		port:     portNum,
		protocol: protocol,
		active:   true,
		socket:   sock,
		timeout:  timeout,
	})
	proxier.startAccepting(service, sock)
	return port, nil
}

func (proxier *Proxier) startAccepting(service string, sock proxySocket) {
	glog.V(1).Infof("Listening for %s on %s:%s", service, sock.Addr().Network(), sock.Addr().String())
	go func(service string, proxier *Proxier) {
		defer util.HandleCrash()
		sock.ProxyLoop(service, proxier)
	}(service, proxier)
}

// How long we leave idle UDP connections open.
const udpIdleTimeout = 1 * time.Minute

// OnUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnUpdate(services []api.Service) {
	glog.V(4).Infof("Received update notice: %+v", services)
	activeServices := util.StringSet{}
	for _, service := range services { // walk the updated service list
		activeServices.Insert(service.ID)
		info, exists := proxier.getServiceInfo(service.ID) // check whether the service is already known
		// TODO: check health of the socket?  What if ProxyLoop exited?
		if exists && info.isActive() && info.port == service.Port { // unchanged service: skip it
			continue
		}
		if exists && info.port != service.Port { // same service on a different port: stop the old proxy
			err := proxier.stopProxy(service.ID, info)
			if err != nil {
				glog.Errorf("error stopping %s: %v", service.ID, err)
			}
		}
		glog.V(3).Infof("Adding a new service %s on %s port %d", service.ID, service.Protocol, service.Port)
		sock, err := newProxySocket(service.Protocol, proxier.address, service.Port) // open a proxy socket for the protocol
		if err != nil {
			glog.Errorf("Failed to get a socket for %s: %+v", service.ID, err)
			continue
		}
		proxier.setServiceInfo(service.ID, &serviceInfo{
			port:     service.Port,
			protocol: service.Protocol,
			active:   true,
			socket:   sock,
			timeout:  udpIdleTimeout,
		}) // record the new proxy
		proxier.startAccepting(service.ID, sock) // start accepting traffic for the service
	}
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	for name, info := range proxier.serviceMap { // stop proxies for services no longer in the update set
		if !activeServices.Has(name) {
			err := proxier.stopProxyInternal(name, info)
			if err != nil {
				glog.Errorf("error stopping %s: %v", name, err)
			}
		}
	}
}
```

In the newly created Proxier, newProxySocket creates a proxy socket for the given protocol; TCP and UDP are the two supported modes.

```go
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
	host := ip.String()
	switch strings.ToUpper(string(protocol)) {
	case "TCP":
		listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port))) // TCP proxy
		if err != nil {
			return nil, err
		}
		return &tcpProxySocket{listener}, nil
	case "UDP":
		addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port))) // UDP proxy
		if err != nil {
			return nil, err
		}
		conn, err := net.ListenUDP("udp", addr)
		if err != nil {
			return nil, err
		}
		return &udpProxySocket{conn}, nil
	}
	return nil, fmt.Errorf("Unknown protocol %q", protocol)
}
```
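The walkthrough below follows the UDP path. For contrast, the TCP side of a user-space proxy boils down to accepting a connection, picking a backend, and copying bytes in both directions. Here is a minimal standalone sketch of that idea (the helper proxyTCP and the hard-coded addresses are illustrative, not the actual tcpProxySocket code):

```go
package main

import (
	"io"
	"log"
	"net"
)

// proxyTCP relays one accepted client connection to a chosen backend,
// copying bytes in both directions until either side closes.
func proxyTCP(client net.Conn, backendAddr string) {
	backend, err := net.Dial("tcp", backendAddr)
	if err != nil {
		log.Printf("dial backend failed: %v", err)
		client.Close()
		return
	}
	go func() { io.Copy(backend, client); backend.Close() }() // client -> backend
	io.Copy(client, backend)                                  // backend -> client
	client.Close()
}

func main() {
	listener, err := net.Listen("tcp", "127.0.0.1:10080") // the proxy's listening port
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := listener.Accept()
		if err != nil {
			continue
		}
		// In kube-proxy the backend would come from loadBalancer.NextEndpoint.
		go proxyTCP(conn, "127.0.0.1:8080")
	}
}
```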

Taking UDP as the example, here is the flow:

```go
// udpProxySocket implements proxySocket.  Close() is implemented by net.UDPConn.  When Close() is called,
// no new connections are allowed and existing connections are broken.
// TODO: We could lame-duck this ourselves, if it becomes important.
type udpProxySocket struct {
	*net.UDPConn
}

func (udp *udpProxySocket) Addr() net.Addr {
	return udp.LocalAddr()
}

// Holds all the known UDP clients that have not timed out.
type clientCache struct {
	mu      sync.Mutex
	clients map[string]net.Conn // addr string -> connection
}

func newClientCache() *clientCache {
	return &clientCache{clients: map[string]net.Conn{}}
}

func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
	info, found := proxier.getServiceInfo(service) // first look up the service info
	if !found {
		glog.Errorf("Failed to find service: %s", service)
		return
	}
	activeClients := newClientCache() // cache of known clients
	var buffer [4096]byte             // 4KiB should be enough for most whole-packets
	for {
		if !info.isActive() {
			break
		}
		// Block until data arrives.
		// TODO: Accumulate a histogram of n or something, to fine tune the buffer size.
		n, cliAddr, err := udp.ReadFrom(buffer[0:]) // read a datagram
		if err != nil {
			if e, ok := err.(net.Error); ok {
				if e.Temporary() { // retry on temporary errors
					glog.V(1).Infof("ReadFrom had a temporary failure: %v", err)
					continue
				}
			}
			glog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err)
			break
		}
		// If this is a client we know already, reuse the connection and goroutine.
		svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, info.timeout) // get (or create) the backend connection
		if err != nil {
			continue
		}
		// TODO: It would be nice to let the goroutine handle this write, but we don't
		// really want to copy the buffer.  We could do a pool of buffers or something.
		_, err = svrConn.Write(buffer[0:n]) // relay the received data to the backend
		if err != nil {
			if !logTimeout(err) {
				glog.Errorf("Write failed: %v", err)
				// TODO: Maybe tear down the goroutine for this client/server pair?
			}
			continue
		}
		err = svrConn.SetDeadline(time.Now().Add(info.timeout)) // refresh the idle timeout
		if err != nil {
			glog.Errorf("SetDeadline failed: %v", err)
			continue
		}
	}
}

func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service string, timeout time.Duration) (net.Conn, error) {
	activeClients.mu.Lock()
	defer activeClients.mu.Unlock()
	svrConn, found := activeClients.clients[cliAddr.String()] // look up a cached connection
	if !found {
		// TODO: This could spin up a new goroutine to make the outbound connection,
		// and keep accepting inbound traffic.
		glog.V(2).Infof("New UDP connection from %s", cliAddr) // no cached connection, so create one
		endpoint, err := proxier.loadBalancer.NextEndpoint(service, cliAddr) // pick a backend via the load-balancing algorithm
		if err != nil {
			glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
			return nil, err
		}
		glog.V(4).Infof("Mapped service %s to endpoint %s", service, endpoint)
		svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout) // dial the backend
		if err != nil {
			// TODO: Try another endpoint?
			glog.Errorf("Dial failed: %v", err)
			return nil, err
		}
		activeClients.clients[cliAddr.String()] = svrConn // cache the connection
		go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
			defer util.HandleCrash()
			udp.proxyClient(cliAddr, svrConn, activeClients, timeout)
		}(cliAddr, svrConn, activeClients, timeout) // a goroutine copies replies from the backend to the client
	}
	return svrConn, nil
}

// This function is expected to be called as a goroutine.
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
	defer svrConn.Close()
	var buffer [4096]byte
	for {
		n, err := svrConn.Read(buffer[0:]) // read the backend's reply
		if err != nil {
			if !logTimeout(err) {
				glog.Errorf("Read failed: %v", err)
			}
			break
		}
		err = svrConn.SetDeadline(time.Now().Add(timeout)) // refresh the idle timeout
		if err != nil {
			glog.Errorf("SetDeadline failed: %v", err)
			break
		}
		n, err = udp.WriteTo(buffer[0:n], cliAddr) // send the reply back to the client
		if err != nil {
			if !logTimeout(err) {
				glog.Errorf("WriteTo failed: %v", err)
			}
			break
		}
	}
	activeClients.mu.Lock()
	delete(activeClients.clients, cliAddr.String()) // drop the connection from the cache on exit
	activeClients.mu.Unlock()
}
```

At this point the entire traffic-forwarding flow is essentially complete; this is the idea behind user-space traffic forwarding.
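One piece referenced throughout but never shown is the round-robin load balancer behind loadBalancer.NextEndpoint. Here is a minimal sketch of what round-robin endpoint selection looks like (a simplified model with a simplified signature, not the actual LoadBalancerRR implementation):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// rrBalancer picks backends for each service in round-robin order,
// a simplified model of kube-proxy's LoadBalancerRR.
type rrBalancer struct {
	mu        sync.Mutex
	endpoints map[string][]string // service -> backend "ip:port" list
	index     map[string]int      // service -> next index to hand out
}

func newRRBalancer() *rrBalancer {
	return &rrBalancer{endpoints: map[string][]string{}, index: map[string]int{}}
}

// OnUpdate replaces the endpoint list for a service, like the
// endpoints handler registered in main().
func (rr *rrBalancer) OnUpdate(service string, backends []string) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	rr.endpoints[service] = backends
	rr.index[service] = 0
}

// NextEndpoint returns the next backend for the service, wrapping around.
func (rr *rrBalancer) NextEndpoint(service string) (string, error) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	list := rr.endpoints[service]
	if len(list) == 0 {
		return "", errors.New("no endpoints for " + service)
	}
	i := rr.index[service]
	rr.index[service] = (i + 1) % len(list)
	return list[i], nil
}

func main() {
	rr := newRRBalancer()
	rr.OnUpdate("my-service", []string{"10.0.0.1:80", "10.0.0.2:80"})
	for i := 0; i < 4; i++ {
		ep, _ := rr.NextEndpoint("my-service")
		fmt.Println(ep) // alternates between the two backends
	}
}
```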

Summary

The early 0.4 version of k8s implemented only user-space traffic forwarding, supporting the two protocols TCP and UDP. The forwarding rules are derived from the information watched in etcd, so that each service's traffic is forwarded to the corresponding pods, completing the flow of traffic through the k8s cluster. My knowledge is limited, so corrections and criticism are welcome.
