理解sidecar的功能特性

在前文中分享了有关ServiceMesh的大概的一个演进过程,该过程都是大家对于服务管理与规划的解决方案。本文主要是通过简单的代码的编写来更加体会sidecar相关的功能与优缺点。

sidecar的功能点

sidecar实现的过程中,主要包含了sidecar与sidecar的通信,sidecar对应用流量的管控,对于不同的应用的限流、熔断等策略的实现。主要架构图如下;

大致功能流程如上所示,将以往在应用程序A或B中实现的服务发现、应用限流等功能都集成在了sidecar中,后续的升级管理都可以通过完成sidecar相关的管理就可以集中管理并平滑升级,并且当前所有的sidecar的管理都被接入到了管理功能中从而完成对所有的sidecar的管理与配置功能。

sidecar的示例代码

sidecar代码

sidecar示例代码

import random
import time
import sys
from uuid import uuid4from flask import Flask, jsonify, request
import etcd3
from threading import Lock, Thread
from etcd3.events import PutEvent, DeleteEvent
import requestsclass ServiceDisover(object):def __init__(self, host="127.0.0.1", port=2379, prefix="service"):self.host = hostself.port = portself.prefix = prefixself.sidecar_prefix = "/sidecar"self.etcd_client = etcd3.client(self.host, self.port)self.connects = {}self.sidecar_connects = {}self.lock = Lock()self.start_watch()self.start_watch_sidecar()def start_watch(self):t = Thread(target=self.watch_service)t.start()def start_watch_sidecar(self):t = Thread(target=self.watch_sidecar, args=(self.sidecar_prefix, ))t.start()def register_sidecar(self, sidecar_name, connect_msg):if not sidecar_name or not connect_msg:raise Exception(" args error")path = self.sidecar_prefix + "/" + sidecar_name + "/" + connect_msgself.etcd_client.put(path, "value")def unregister_sidecar(self, sidecar_name, service_name, connect_msg):if not service_name or not connect_msg:raise Exception(" args error")path = self.sidecar_prefix + "/" + sidecar_name + "/" + connect_msgself.etcd_client.delete_prefix(path)def register(self, sidecar_name, service_name, connect_msg):if not service_name or not connect_msg:raise Exception(" args error")path = self.prefix + "/" + sidecar_name + "/" + service_name + "/" + connect_msgself.etcd_client.put(path, "value")def unregister(self, sidecar_name, service_name, connect_msg):if not service_name or not connect_msg:raise Exception(" args error")path = self.prefix + "/" + sidecar_name + "/" + service_name + "/" + connect_msgself.etcd_client.delete_prefix(path)def get_services(self, service_name, selector):with self.lock:if service_name not in self.connects:returnreturn selector.choose(self.connects[service_name])def decode_service_key(self, event):print("event key  ", event.key, event.value)service = event.key.decode("utf-8")print("event key  ", service)services = service.split("/")if len(services) != 5:print("error  event  {0}".format(services))returnreturn services[2], services[3], services[4]def watch_service(self, prefix=None):if prefix is None:# prefix = self.prefix + "/" + SidecarNameprefix = self.prefixprint("start watch ", prefix)values = self.etcd_client.get_prefix(prefix)for v in values:key = v[1].key.decode("utf-8")res = key.split("/")if len(res) != 5:continuesidecar_name = res[2]service_name = res[3]service_connect_msg = res[4]if service_name not in self.connects:self.connects[service_name] = [[sidecar_name, service_connect_msg]]else:if [sidecar_name, service_connect_msg] not in self.connects[service_name]:self.connects[service_name].append([sidecar_name, service_connect_msg])print(" service  msg  ", self.connects)vents_iterator, cancel = self.etcd_client.watch_prefix(prefix)  # 监听etcd中aaa键 是否发生改变,for event in vents_iterator:if isinstance(event, PutEvent):res = self.decode_service_key(event)print("put event  ", res)if res is None:continuewith self.lock:sidecar_name = res[0]service_name = res[1]service_connect_msg = res[2]if service_name not in self.connects:self.connects[service_name] = [[sidecar_name, service_connect_msg]]else:if [sidecar_name, service_connect_msg] not in self.connects[service_name]:self.connects[service_name].append([sidecar_name, service_connect_msg])elif isinstance(event, DeleteEvent):res = self.decode_service_key(event)print("delete event  ", res)if res is None:continuewith self.lock:sidecar_name = res[0]service_name = res[1]service_connect_msg = res[2]if service_name not in self.connects:print("not found service_name : {0} in connects ".format(service_name))continueif [sidecar_name, service_connect_msg] in self.connects[service_name]:self.connects[service_name].remove([sidecar_name, service_connect_msg])else:print('not exists event  {0}'.format(event))print("finial  ", self.connects)def decode_sidecar_key(self, event):print("sidecar event key  ", event.key, event.value)service = event.key.decode("utf-8")print("sidecar event key  ", service)services = service.split("/")if len(services) != 4:print("error  event  {0}".format(services))returnreturn services[2], services[3]def watch_sidecar(self, prefix=None):if prefix is None:prefix = self.sidecar_prefixprint("watch sidecar  ", prefix)values = self.etcd_client.get_prefix(prefix)for v in values:key = v[1].key.decode("utf-8")res = key.split("/")if len(res) != 4:continueself.sidecar_connects[res[2]] = res[3]print(" sidecar  msg  ", self.sidecar_connects)vents_iterator, cancel = self.etcd_client.watch_prefix(prefix)  # 监听etcd中aaa键 是否发生改变,for event in vents_iterator:if isinstance(event, PutEvent):res = self.decode_sidecar_key(event)print("put event  ", res)if res is None:continuewith self.lock:sidecar_name = res[0]sidecar_connect_msg = res[1]self.sidecar_connects[sidecar_name] = sidecar_connect_msgelif isinstance(event, DeleteEvent):res = self.decode_sidecar_key(event)print("delete event  ", res)if res is None:continuewith self.lock:sidecar_name = res[0]sidecar_connect_msg = res[1]if sidecar_name not in self.sidecar_connects:print("not found sidecar_name : {0} in connects ".format(sidecar_name))continueself.sidecar_connects.pop(sidecar_name)else:print('not exists sidecar event  {0}'.format(event))print("finial  ", self.sidecar_connects)class RandomLoadBalance(object):"""负载均衡 简单的随机选择"""def choose(self, values):if len(values) == 0:returnreturn random.choice(values)class RateBucketLimit(object):"""熔断限流"""def __init__(self, limit_time=1, rate=1):self.historys = []# 安装秒级限流,如一秒钟一个令牌self.rate = rate# 限制时间 即多长的时间内产生总共多个的bucket 秒self.limit_time = limit_time# 桶总共多大self.bucket = int(self.limit_time*self.rate)self.lock = Lock()def allow(self, n=1):# 获取毫秒时间戳millis = int(round(time.time() * 1000))with self.lock:if len(self.historys) == 0:if self.bucket >= n:for i in range(n):self.historys.append(millis)return Truereturn Falsefor i, v in enumerate(self.historys):if millis - v <= self.limit_time*1000:breakcan_allow_n = int(int((millis - v)/1000)*self.rate)if can_allow_n >= n:for i in range(n):self.historys.append(millis)if len(self.historys) > self.bucket:self.historys = self.historys[-self.bucket:]return Truereturn Falseclass Service(object):def __init__(self, sidecar_name, connect_msg, host="127.0.0.1", port=2379, prefix="/service"):self.discover = ServiceDisover(host, port, prefix)self.limit = RateBucketLimit(limit_time=5)self.selector = RandomLoadBalance()self.sidecar_name = sidecar_nameself.connect_msg = connect_msgself.retry = 5def get_service(self, service_name, uri):if self.limit.allow():select_msg = self.discover.get_services(service_name, self.selector)if select_msg is None or len(select_msg) != 2:print("not found select_msg or len error", select_msg)returnsidecar_name = select_msg[0]host = select_msg[1]if sidecar_name != self.sidecar_name:if sidecar_name not in self.discover.sidecar_connects:print("not found sidecar_name : {0}  connect {1}".format(sidecar_name, self.discover.sidecar_connects))returnhost = self.discover.sidecar_connects[sidecar_name]uri = "/service?uri={0}&service_name={1}".format(uri, service_name)retry_count = 0while retry_count < self.retry:try:resp = requests.get("http://" + host + "/" + uri)if resp.status_code == 200:return resp.json()print(" response status_code error ", resp.status_code)retry_count += 1except Exception as e:print("requests error  ", e)retry_count += 1else:print(" api limit  ")if len(sys.argv) > 1:SidecarName = sys.argv[1]SideCarConnect = sys.argv[2]
else:SidecarName = str(uuid4())SideCarConnect = "127.0.0.1:3000"print("sidecarname  ", SidecarName, SideCarConnect)
service = Service(SidecarName, SideCarConnect)
service.discover.register_sidecar(SidecarName, SideCarConnect)
print("over")
app = Flask(__name__)details = SideCarConnect.split(":")
host = details[0]
port = details[1]@app.route('/service')
def service_api():uri = request.args.get("uri")service_name = request.args.get("service_name")res = service.get_service(service_name, uri)if res is None:res = {}return jsonify(res)@app.route("/register")
def register():service_name = request.args.get("service_name")connect_msg = request.args.get("connect_msg")service.discover.register(SidecarName, service_name, connect_msg)return jsonify({"detail": "ok"})if __name__ == '__main__':app.run(host=host, port=port)

sidecar的主要工作就是先注册监听所有的sidecar对应的地址列表,这样就可以知道不同的sidecar的地址在哪里,然后sidecar再监听自己对应的服务列表,这样就可以指定自己sidecar对应的服务的列表,这样当一个sidecar接受到一个请求的过程中就知道将该服务发送到哪个sidecar中去,进而等到返回数据。

应用程序A
import sys
import requestsfrom flask import Flask, jsonifyapp = Flask(__name__)SideCarName = "sidecarA"
SideCarConnect = "127.0.0.1:3000"
ServiceName = "serviceA"if len(sys.argv) > 1:connect_msg = sys.argv[1]
else:connect_msg = "127.0.0.1:5000"details = connect_msg.split(":")
host = details[0]
port = details[1]def register():url = "http://" + SideCarConnect + "/register?service_name={0}&connect_msg={1}".format(ServiceName, connect_msg)resp = requests.get(url)print("register  ", resp.status_code, resp.content)register()def get_sidecar(service_name, uri):url = "http://" + SideCarConnect + "/service?service_name={0}&uri={1}".format(service_name, uri)resp = requests.get(url)if resp is None:return {service_name: "get sidecar error"}if resp.status_code == 200:return {service_name: resp.json()}return {service_name: "get sidecar badrequest"}def get_service_b():res = {}print(" call service_b ")result = get_sidecar("serviceB", "service_b")if result:res.update(result)return res@app.route('/service_a')
def service_a():res = {"service_a": "servic_a_msg"}res.update(get_service_b())return jsonify(res)if __name__ == '__main__':app.run(host, port)

部署在了sidecarA处的服务,并且其中有个api调用了service_b的接口服务,该接口就通过sidecarA来调用sidecarB,然后sidecarB再调用service_b的服务来提供服务。

应用程序B
import sys
import requests
from flask import Flask, jsonifySideCarName = "sidecarB"
SideCarConnect = "127.0.0.1:3001"
ServiceName = "serviceB"app = Flask(__name__)if len(sys.argv) > 1:connect_msg = sys.argv[1]
else:connect_msg = "127.0.0.1:5001"details = connect_msg.split(":")
host = details[0]
port = details[1]def register():url = "http://" + SideCarConnect + "/register?service_name={0}&connect_msg={1}".format(ServiceName, connect_msg)resp = requests.get(url)print("register  ", resp.status_code, resp.content)register()@app.route('/service_b')
def service_b():res = {"service_b": "service_b_msg"}return jsonify(res)if __name__ == '__main__':app.run(host, port)

部署位于sidecarB处的应用程序B,该服务对外提供服务,并且在运行的时候就注册到sidecar中的信息,然后通过各个sidecar来提供服务。

运行示例

首先运行sidecarA的示例代码;

(venv) 192:servicemesh_t wuzi$ python sidecar.py sidecarA 127.0.0.1:3000
sidecarname   sidecarA 127.0.0.1:3000
start watch  /service
watch sidecar   /sidecarsidecar  msg   {'sidecarA': '127.0.0.1:3000', 'sidecarB': '127.0.0.1:3001'}
overservice  msg   {'testService': [['sidecar1', '127.0.0.0:1111']], 'serviceA': [['sidecarA', '127.0.0.1:5000']], 'serviceB': [['sidecarB', '127.0.0.1:5001'], ['sidecarB', '127.0.0.1:5002']]}* Serving Flask app "sidecar" (lazy loading)* Environment: productionWARNING: This is a development server. Do not use it in a production deployment.Use a production WSGI server instead.* Debug mode: off* Running on http://127.0.0.1:3000/ (Press CTRL+C to quit)

打开另外一个终端如下运行

(venv) 192:servicemesh_t wuzi$ python sidecar.py sidecarB 127.0.0.1:3001
sidecarname   sidecarB 127.0.0.1:3001
start watch  /service
watch sidecar   /sidecarsidecar  msg   {'sidecarA': '127.0.0.1:3000', 'sidecarB': '127.0.0.1:3001'}service  msg   {'testService': [['sidecar1', '127.0.0.0:1111']], 'serviceA': [['sidecarA', '127.0.0.1:5000']], 'serviceB': [['sidecarB', '127.0.0.1:5001'], ['sidecarB', '127.0.0.1:5002']]}
over* Serving Flask app "sidecar" (lazy loading)* Environment: productionWARNING: This is a development server. Do not use it in a production deployment.Use a production WSGI server instead.* Debug mode: off* Running on http://127.0.0.1:3001/ (Press CTRL+C to quit)

此时打开另外一个终端运行应用程序A

(venv) 192:servicemesh_t wuzi$ python service_a_sidecar.py
register   200 b'{"detail":"ok"}\n'* Serving Flask app "service_a_sidecar" (lazy loading)* Environment: productionWARNING: This is a development server. Do not use it in a production deployment.Use a production WSGI server instead.* Debug mode: off* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

此时打开另外一个终端运行应用程序B1

(venv) 192:servicemesh_t wuzi$ python service_b_sidecar.py
register   200 b'{"detail":"ok"}\n'* Serving Flask app "service_b_sidecar" (lazy loading)* Environment: productionWARNING: This is a development server. Do not use it in a production deployment.Use a production WSGI server instead.* Debug mode: off* Running on http://127.0.0.1:5001/ (Press CTRL+C to quit)

此时打开另外一个终端运行应用程序B2

(venv) 192:servicemesh_t wuzi$ python service_b_sidecar.py 127.0.0.1:5002
register   200 b'{"detail":"ok"}\n'* Serving Flask app "service_b_sidecar" (lazy loading)* Environment: productionWARNING: This is a development server. Do not use it in a production deployment.Use a production WSGI server instead.* Debug mode: off* Running on http://127.0.0.1:5002/ (Press CTRL+C to quit)

此时再通过浏览器访问应用程序A,http://127.0.0.1:5000/service_a此时返回的就是如下

{
"serviceB": {
"service_b": "service_b_msg"
},
"service_a": "servic_a_msg"
}

此时查看终端运行程序就可知,首先访问到了sidecarA然后再sidecarA访问了sidecarB,sidecarB通过随机算法访问了应用程序B1或者应用程序B2,至此基础的逻辑运行完成。

总结

本文只是简单的通过功能实现的角度,实现了一个最简单的有关ServiceMesh架构实现的一个流程,最要是通过功能性的实现来理解整个ServiceMesh的实现的流程。由于本人才疏学浅,如有错误请批评指正。

ServiceMesh有关sidecar理解相关推荐

  1. 云原生和ServiceMesh主要组件--理解K8s/Istio/Envoy

    1. K8s vs Istio 1.1 k8s简介 2014 年,Google 开源了 Kubernetes,随后几年得到迅猛发展,在 2017 年奠定了容器编排调度标准的地位. Kubernetes ...

  2. ServiceMesh、SideCar和Istio

    Service Mesh简介 Service Mesh直译过来就是服务网格,而他的架构就是一个个微服务组成的网络. Sidecar简介 Service Mesh中的节点就是Sidecar节点. sid ...

  3. ServiceMesh和SideCar

    转载自 http://www.importnew.com/28798.html 有部分改动 Service Mesh 直译过来是 服务网格,目的是解决系统架构微服务化后的服务间通信和治理问题.服务网格 ...

  4. 从一到万的运维之路,说一说VM/Docker/Kubernetes/ServiceMesh

    摘要:本文从单机真机运营的历史讲起,逐步介绍虚拟化.容器化.Docker.Kubernetes.ServiceMesh的发展历程.并重点介绍了容器化阶段之后,各项重点技术的安装.使用.运维知识.可以说 ...

  5. 古典、SOA、传统、K8S、ServiceMesh

    古典.SOA.传统.K8S.ServiceMesh 原文:古典.SOA.传统.K8S.ServiceMesh 古典.SOA.传统.K8S.ServiceMesh 十几年前就有一些公司开始践行服务拆分以 ...

  6. Service Mesh 初体验

    前言 计算机软件技术发展到现在,软件架构的演进无不朝着让开发者能够更加轻松快捷地构建大型复杂应用的方向发展.容器技术最初是为了解决运行环境的不一致问题而产生的,随着不断地发展,围绕容器技术衍生出来越来 ...

  7. 避免从单体到分布式单体

    回顾:从单体到微服务到Function 在过去几年间,微服务架构成为业界主流,很多公司开始采用微服务,并迁移原有的单体应用迁移到微服务架构.从架构上,微服务和单体最大的变化在于微服务架构下应用的粒度被 ...

  8. Service Mesh服务网格架构

    Service Mesh服务网格架构 01 架构的发展历史 服务网格 云原生 02 Istio基本介绍 2.1 什么是Istio 2.2 Istio特征 2.2.1 连接 2.2.2 安全 2.2.3 ...

  9. K8s Ingress Provider 为什么选择 MSE 云原生网关?

    作者:如葑 K8s Ingress 简介 K8s 集群内的网络与外部是隔离的,即在 K8s 集群外部无法直接访问集群内部的服务,如何让将 K8s 集群内部的服务提供给外部用户呢?K8s 社区有三种方案 ...

最新文章

  1. 栈C/C++实现(数据结构严蔚敏版)
  2. Mybatis源码解析之Mybatis初始化过程
  3. 运用Edraw为WPF应用程序嵌入Office文档的方法总结
  4. HihoCoder - 1873 Frog and Portal(构造+进制拆分)
  5. CSS3选择非第一个子元素
  6. 初学者入门知识图谱必看的能力:推理
  7. JAVA模板模式,简历模板(例子)
  8. c#图解教程和c#高级编程电子书链接
  9. 简述对CAN协议栈的理解
  10. python操作word文档,合并
  11. 手机app推广渠道的安装来源追踪与归因
  12. 批量webp格式转换成jpg操作方法
  13. 【jzoj2220】【二分】愤怒的奶牛2(angry)
  14. Java 细品 重写equals方法 和 hashcode 方法
  15. robot_marm catkin_make报错
  16. Dubbo 配置http协议
  17. 准备编译环境,安装gcc,工具make
  18. Gitlab Failed to squash
  19. UNCTF2019新星赛长安十二时辰write up
  20. 搭建简单的struts2框架

热门文章

  1. 英特尔2022年投资者大会:公布技术路线图及重要节点
  2. 3000 字推荐一个可视化神器,50 行 Python 代码制作数据大屏
  3. 认知推理下的常识知识库资源、常识测试评估与中文实践项目索引
  4. 针对《评人工智能如何走向新阶段》一文,继续发布国内外的跟贴留言457-465条如下:
  5. 基于深度学习和传统算法的人体姿态估计,技术细节都讲清楚了
  6. 22张精炼图笔记,深度学习专项学习必备
  7. 从词袋到Transfomer,NLP十年突破史
  8. 手把手教你使用Flask轻松部署机器学习模型(附代码链接) | CSDN博文精选
  9. 国际顶级学界业界大咖云集,9 场技术论坛布道,2019 嵌入式智能国际大会强势来袭!...
  10. 哈工大、清华、CSDN、嵌入式视觉联盟合办的 AIoT 盛会,你怎么舍得错过?!