关键字:python sanic 微服务 异步 zipkin sanic-plugin 插件 Sanic-Plugins-Framewor Pypi发布

所需环境:python3.7, Docker, Linux or WSL

image.png

Sanic插件(Plugin/Extension) - sanic-zipkin已经ready,你可以轻松用pip安装啦:

喜欢的话,github点个赞吧:https://github.com/kevinqqnj/sanic-zipkin

pip install sanic-zipkin

# app.py

from sanic_zipkin import SanicZipkin, logger, sz_rpc

sz = SanicZipkin(app)

上一篇已经学会了如何在Sanic app里引入aiozipkin,来做分布式系统追踪。本篇,来讨论下,如何创建一个完整的Sanic插件,以方便自己或者分享给他人。

先来看看插件是怎么用的:

功能

adding "Request span" by default

if Request is from another micro-service endpoint, span will be attached (Inject/Extract) to that endpoint

use "logger" decorator to create span for "methods" calls

use "sz_rpc" method to create sub-span for RPC calls, attaching to parent span

run examples/servic_a.py and examples/service_b.py

use Docker to run zipkin or jaeger:

docker run -d -p9411:9411 openzipkin/zipkin:latest

or

docker run -d -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 -p5775:5775/udp -p6831:6831/udp -p6832:6832/udp -p5778:5778 -p16686:16686 -p14268:14268 -p9411:9411 jaegertracing/all-in-one

access the endpoint:

最简应用:

curl localhost:8000/ to see plugin's basic usage

from sanic_zipkin import SanicZipkin, logger, sz_rpc

app = Sanic(__name__)

# initilize plugin, default parameters:

# zipkin_address = 'http://127.0.0.1:9411/api/v2/spans'

# service = __name__

# host = '127.0.0.1'

# port = 8000

sz = SanicZipkin(app, service='service-a')

@app.route("/")

async def index(request):

return response.json({"hello": "from index"})

This "/" endpoint will add trace span to zipkin automatically

使用装饰器装饰方法,以及链式trace

curl localhost:8000/2 to see how to decorate methods and chain-calls

@logger()

async def db_access(context, data):

await asyncio.sleep(0.1)

print(f'db_access done. data: {data}')

return

@sz.route("/2")

async def method_call(request, context):

await db_access(context, 'this is method_call data')

return response.json({"hello": 'method_call'})

Use "@logger" decorator to generate span for methods.

Note: in this case, you need to use "@sz.route" decorator, and pass contextparameter to method calls.

微服务之间通过PRC访问:

curl localhost:8000/3 to see how RPC calls working, both GET/POST is supported

@logger()

async def decorate_demo(context, data):

await db_access(context, data)

data = {'payload': 'rpc call data of decorate_demo'}

rsp = await sz_rpc(context, backend_service2, data, method='GET')

print(rsp.status, await rsp.text())

return

@sz.route("/3")

async def rpc_call(request, context):

await decorate_demo(context, 'this is index4 data')

data = {'payload': 'rpc call data of rpc_call'}

rsp = await sz_rpc(context, backend_service1, data) # default method='POST'

print(rsp.status, await rsp.text())

return response.json({"hello": 'rpc_call'})

method sz_rpc just wrapper span injection to RPC POST/GET calls. In peer server, span-context will be automatically extracted and generate a chain-view in zipkin.

在Zipkin/Jaeger UI里查看Trace:

image.png

Sanic插件开发过程

使用Sanic-Plugins-Framework开发,省时省力,而且充分利用Sanic异步框架的威力。

1. 插件初始化

用户可自定义的初始化变量

继承Sanic-Plugins-Framework(SPF) 的Contextualize类型,在on_before_registered方法引用时,加载用户自定义的初始变量。

目前支持:

zipkin server地址

微服务名称service

微服务IP, port

from spf.plugins.contextualize import Contextualize

class SanicZipkin(Contextualize):

def __init__(self, *args, **kwargs):

super(SanicZipkin, self).__init__(*args, **kwargs)

self.zipkin_address = None

self.service = None

self.host = None

self.port = None

def on_before_registered(self, context, *args, **kwargs):

self.zipkin_address = kwargs.get('zipkin_address', 'http://127.0.0.1:9411/api/v2/spans')

self.service = kwargs.get('service', __name__)

self.host = kwargs.get('host', '127.0.0.1')

self.port = kwargs.get('port', 8000)

_logger.info(f'SanicZipkin: before registered: service={self.service}')

创建aiozipkin服务

实例化sanic_zipkin,然后调用Sanic 'before_server_start'方法,初始化context.tracer、context.aio_session。

context是SPF全局可以访问的上下文变量,可以存储任何你想要共享的数据。

sanic_zipkin = instance = SanicZipkin()

@sanic_zipkin.listener('before_server_start')

async def setup_zipkin(app, loop, context):

endpoint = az.create_endpoint(sanic_zipkin.service, ipv4=sanic_zipkin.host,

port=sanic_zipkin.port)

context.tracer = await az.create(sanic_zipkin.zipkin_address, endpoint,

sample_rate=1.0)

trace_config = az.make_trace_config(context.tracer)

context.aio_session = aiohttp.ClientSession(trace_configs=[trace_config])

context.span = []

context.zipkin_headers = []

这里context.span设计成数组,模拟堆栈FILO(先进后出),是因为考虑到链式调用时,tracer需要以parent span为基础,创建child span。同理Inject/Extract用到的context.zipkin_headers也设成数组。

2. 创建middleware,给Request GET/POST自动添加span

Requst: 先用middleware装饰器来监听request消息,然后调用自定义方法request_span(request, context)来创建span。

@sanic_zipkin.middleware(priority=2, with_context=True)

def mw1(request, context):

context.log(DEBUG, f'mw-request: add span and headers before request')

span = request_span(request, context)

context.span.append(span)

context.zipkin_headers.append(span.context.make_headers())

这里,要考虑到,当前span是new span,还是child span。

span.append()压入堆栈。

自定义方法request_span(),通过读取request里是否有zipkin_headers信息,来判断当前是其它微服务的RPC call,还是用户发起的http访问。

def request_span(request, context):

context.log(DEBUG, f'REQUEST json: {request.json}, args: {request.args}')

headers = request.parsed_json.get('zipkin_headers', None) if request.json else \

request.args.get('zipkin_headers', None)

Response: 在一次http访问结束,返回response时,此次访问的context.span和context.zipkin_headers弹出堆栈,以免污染到其它http访问的trace:span.pop()

@sanic_zipkin.middleware(priority=8, attach_to='response', relative='post',

with_context=True)

def mw2(request, response, context):

context.span.pop()

context.zipkin_headers.pop()

context.log(DEBUG, 'mw-response: clear span/zipkin_headers after Response')

3. 创建zipkin_headers,用于RPC Inject/Extract

如果当前堆栈里有zipkin_headers,则方法request_span()创建上下文:

def request_span(request, context):

context.log(DEBUG, f'REQUEST json: {request.json}, args: {request.args}')

headers = request.parsed_json.get('zipkin_headers', None) if request.json else \

request.args.get('zipkin_headers', None)

if headers:

span_context = az.make_context(headers)

with context.tracer.new_child(span_context) as span:

...

如果无,则创建新的span:

with context.tracer.new_trace() as span:

span.name(f'{request.method} {request.path}')

...

4. methods函数,添加@logger装饰器

对于非http request的方法函数,因为没有middleware可以监听,则需要创建新的装饰器了。

使用@logger时,默认context作为第一个参数:

def logger(type=None, category=None, detail=None, description=None,

tracing=True, level=logging.INFO, *args, **kwargs):

def decorator(fn=None):

@functools.wraps(fn)

async def _decorator(*args, **kwargs):

# print('_decorator args: ', args, kwargs)

context = args[0] if len(args) > 0 and isinstance(args[0], ContextDict) else None

...

然后创建新的new span 或 child span。

同时,添加(Inject)新的上下文zipkin_headers:

span = gen_span(fn.__name__, context)

context.zipkin_headers.append(span.context.make_headers())

执行装饰器所装饰的函数fn()。

之后,必须清除(弹出最上面)本次装饰器新增的临时span和zipkin_headers。因为当前函数外部的其它引用函数(如果有)所需要的数据,还在堆栈下面。

try:

exce = False

res = await fn(*args, **kwargs)

return res

except Exception as e:

exce = True

raise e

finally:

...

# clean up tmp vars for this wrapper

context.span.pop()

context.zipkin_headers.pop()

5. 创建帮助函数sz_rpc(),简化RPC 访问其它微服务时的Inject操作

这个很简单的helper,负责把zipkin_headers,inject到POST/GET访问的data里,这样,对方收到http request时,就可以顺利的Extract,得到span上下文了。

async def sz_rpc(context, url, data, method='POST'):

data.update({'zipkin_headers': json.dumps(context.zipkin_headers[-1])})

if method.upper() == 'POST':

return await context.aio_session.post(url, json=data)

else:

return await context.aio_session.get(url, params=data)

6. 发布插件到Pypi

插件写好了,下面就是发布了。

修改当前目录的结构,以及添加一些必要的标注文件:

kevinqq@CN-00009841:/c/Users/xxx/git/sanic-zipkin$ tree -L 2

├── CHANGES.txt # 版本信息,必须

├── LICENSE # 必须

├── MANIFEST.in

├── README.md

├── dist # 发布时自动打的包

│ └── sanic-zipkin-0.1.2.tar.gz

├── examples

│ ├── requirements.txt

│ ├── service_a.py

│ └── service_b.py

├── requirements-dev.txt

├── sanic_zipkin # 包的目录

│ ├── __init__.py # 必须。含版本信息和可引用的对象

│ └── sanic_zipkin.py # 主文件

├── sanic_zipkin.egg-info # 发布时自动生成

└── setup.py # 发布用的程序

发布前检查:

python3 setup.py check

发布到Pypi:

python3 setup.py sdist upload

此时,会让你输入Pypi的密码。

如果收到200,则上传成功。

检查是否已经可用:

pip install sanic-zipkin

python sanic加速_python微服务sanic 使用异步zipkin(2) - 一步步创建Sanic插件: sanic-zipin...相关推荐

  1. python 微服务框架_Python微服务框架NameKo 性能体验

    Nameko是Python下的一个微服务框架,小巧简洁,通过RabbitMq消息组件来实现RPC服务 Github:NameKo 一.准备工作 1.RabbitMq 使用docker安装 docker ...

  2. python 微服务框架_Python微服务架构chili_chicken

    ### chili_chicken是什么 现在微服务架构大火,企业项目纷纷向微服务转变.Python目前处于稳步发展的状态,用于多领域,比如人工智能.爬虫.运维.web等,我们此贴只讨论web方向.现 ...

  3. python 微服务架构_Python微服务架构chili_chicken

    ### chili_chicken是什么 现在微服务架构大火,企业项目纷纷向微服务转变.Python目前处于稳步发展的状态,用于多领域,比如人工智能.爬虫.运维.web等,我们此贴只讨论web方向.现 ...

  4. 微服务中的异步消息通讯

    前言 在上一篇文章中,我们说到了异步消息通讯,下面这篇文章呢,大部分内容是翻译来自于这篇微软的文章,所以其内容还是具有一定的理论指导意义的. 当我们跨多个微服务进行内部通讯的时候,异步消息和事件驱动至 ...

  5. 异步服务_微服务全链路异步化实践

    1. 背景 随着公司业务的发展,核心服务流量越来越大,使用到的资源也越来越多.在微服务架构体系中,大部分的业务是基于Java 语言实现的,受限于Java 的线程实现,一个Java 线程映射到一个ker ...

  6. 微服务链路追踪之zipkin搭建

    前言 微服务治理方案中,链路追踪是必修课,SpringCloud的组件其实使用很简单,生产环境中真正令人头疼的往往是软件维护,接口在微服务间的调用究竟哪个环节出现了问题,哪个环节耗时较长,这都是项目上 ...

  7. 「Java分享客栈」随时用随时翻:微服务链路追踪之zipkin搭建

    前言 微服务治理方案中,链路追踪是必修课,SpringCloud的组件其实使用很简单,生产环境中真正令人头疼的往往是软件维护,接口在微服务间的调用究竟哪个环节出现了问题,哪个环节耗时较长,这都是项目上 ...

  8. 二. 应用加速(微服务架构设计的cdn访问加速)

    这里写目录标题 1. CDN的产生及作用 1. 2. CDN(Content Delivery Network),即内容分发网络 2.访问源站的过程(DNS) 2.1DNS的记录类型(以阿里云为例) ...

  9. python音频加速_python将音频进行变速的操作方法

    有的时候需要手里的音频文件变速听,或可能变慢或可能变快 这里使用的python进行操作,我的目标是将文件转成2倍速 首先需要安装插件 pip install ffmpeg 然后再执行: from ff ...

最新文章

  1. LeetCode 202. Happy Number--Python解法
  2. Linux系统资源管理 之 硬件信息
  3. 用JS实现根据当前时间随机生成流水号或者订单号
  4. Qt 判断文件或文件夹是否存在及创建文件夹
  5. animate inater插件_基于animate.css动画库的全屏滚动小插件,适用于vue.js(移动端、pc)项目...
  6. 最简单的视频编码器:基于libx264(编码YUV为H.264)
  7. C#:使用dsoframer.ocx控件实现内嵌office效果(详解)
  8. android 支付接口
  9. uni-app引入阿里图标【单色】
  10. 在线免费服务器,免费web服务器Tomcat
  11. 关于 COPY 导入数据的问题
  12. Insecure Randomness引发对随机数生成器抵挡加密攻击的方法
  13. 想要更高效地使用云计算,推荐学习云计算部署的五大策略
  14. Windows打印机API封装
  15. 7-78 求e的近似值 (15 分)
  16. 用计算机牙模,人类恒牙的计算机三维建模
  17. pip查看包的历史版本
  18. python单位根检验平稳性怎么看是否平稳_Python ADF 单位根检验 如何查看结果的实现...
  19. jmockit 创建异常
  20. ping服务器ip地址ping不通

热门文章

  1. Python字典中setdefault和update用法区别
  2. 让计算机时间和网络时间同步,如何让电脑时间与北京时间同步?
  3. 指令发送没反应_如何判断网络故障的原因?7个指令,教你逐步排查!
  4. ubuntu 交叉编译 armv7_32 ffmpeg x264(已完成)
  5. 视频压缩编码 gop(Group of Pictures)(I帧间隔)的概念、IDR、I帧(关键帧,intra picture)、P帧、B帧、帧内压缩、帧间压缩、pts(显示时间)、dts(解码时间)
  6. C语言 enum和typedef enum的区别
  7. qt无法找到动态链接库文件怎么办?
  8. python PyQt5 QVBoxLayout 垂直布局管理
  9. python 解决conda的environment未被激活解决方案
  10. 文件分类tkinter UI小程序,界面点击,后台自动归档到某个文件夹