1、概述

Mixer是Istio的核心组件,提供了遥测数据收集的功能,能够实时采集服务的请求状态等信息,以达到监控服务状态目的。

1.1 核心功能

•前置检查(Check):某服务接收并响应外部请求前,先通过Envoy向Mixer(Policy组件)发送Check请求,做一些access检查,同时确认adaptor所需cache字段,供之后Report接口使用;

•配额管理(Quota):通过配额管理机制,处理多请求时发生的资源竞争;

•遥测数据上报(Report):该服务请求处理结束后,将请求相关的日志,监控等数据,通过Envoy上报给Mixer(telemetry)

1.2 示例图

2、代码分析

2.1 Report代码分析

本节主要介绍Report的详细流程(基于Istio release1.0.0版本,commit id为3a136c90)。Report是mixer server的一个接口,供Envoy通过grpc调用。首先,我们从mixer server的启动入口main函数看起:

func main() {rootCmd := cmd.GetRootCmd(os.Args[1:], supportedTemplates(), supportedAdapters(), shared.Printf, shared.Fatalf)if err := rootCmd.Execute(); err != nil {os.Exit(-1)}
}

在rootCmd中,mixs通过server命令启动了mixer server,从而触发了runserver函数,在runserver中初始化(New)了一个server,我们一起看下newServer的函数,在这个函数中,与我们本节相关的内容就是Mixer初始化了一个grpc服务器NewGRPCServer。

rootCmd.AddCommand(serverCmd(info, adapters, printf, fatalf))
func serverCmd(info map[string]template.Info, adapters []adapter.InfoFn, printf, fatalf shared.FormatFn) *cobra.Command {sa := server.DefaultArgs()sa.Templates = infosa.Adapters = adaptersserverCmd := &cobra.Command{Use:   "server",Short: "Starts Mixer as a server",Run: func(cmd *cobra.Command, args []string) {runServer(sa, printf, fatalf)},}… …
}
func newServer(a *Args, p *patchTable) (*Server, error) {grpc.EnableTracing = a.EnableGRPCTracings.server = grpc.NewServer(grpcOptions...)mixerpb.RegisterMixerServer(s.server, api.NewGRPCServer(s.dispatcher, s.gp, s.checkCache))
}

在这个grpc的服务端中,定义了一个Report接口,这就是我们这节课主要关注的内容(可以看到Check接口也在此定义,我们下节再讲)

func (s *grpcServer) Report(ctx context.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {lg.Debugf("Report (Count: %d)", len(req.Attributes))// 校验attribute是否为空,空则直接returnif len(req.Attributes) == 0 {return reportResp, nil}// 若属性word为空,赋为默认值for i := 0; i < len(req.Attributes); i++ {iflen(req.Attributes[i].Words) == 0 {req.Attributes[i].Words = req.DefaultWords}}// 根据第一条attribute,生成proto包,proto包能跟踪一组attributesprotoBag := attribute.NewProtoBag(&req.Attributes[0], s.globalDict, s.globalWordList)// 初始化,开始跟踪attributes各个条目中属性accumBag := attribute.GetMutableBag(protoBag)// 保存accumBag的增量状态reportBag := attribute.GetMutableBag(accumBag)reportSpan, reportCtx := opentracing.StartSpanFromContext(ctx, "Report")reporter := s.dispatcher.GetReporter(reportCtx)var errors *multierror.Errorfor i := 0; i < len(req.Attributes); i++ {span, newctx := opentracing.StartSpanFromContext(reportCtx, fmt.Sprintf("attribute bag %d", i))// 第一个属性已经在创建proto包时创建,在此追踪所有attributesif i > 0 {if err := accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList); err != nil {err = fmt.Errorf("request could not be processed due to invalid attributes: %v", err)span.LogFields(otlog.String("error", err.Error()))span.Finish()errors = multierror.Append(errors, err)break}}lg.Debug("Dispatching Preprocess")// 真正开始分发,预处理阶段if err := s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil {err = fmt.Errorf("preprocessing attributes failed: %v", err)span.LogFields(otlog.String("error", err.Error()))span.Finish()errors = multierror.Append(errors, err)continue}lg.Debug("Dispatching to main adapters after running preprocessors")lg.Debuga("Attribute Bag: \n", reportBag)lg.Debugf("Dispatching Report %d out of %d", i+1, len(req.Attributes))// 真正开始分发,将数据逐步加入到缓存中if err := reporter.Report(reportBag); err != nil {span.LogFields(otlog.String("error", err.Error()))span.Finish()errors = multierror.Append(errors, err)continue}span.Finish()// purge the effect of the Preprocess call so that the next time through everything is cleanreportBag.Reset()}reportBag.Done()accumBag.Done()protoBag.Done()// 真正的发送函数,从缓存中取出并发送到adaptorif err := reporter.Flush(); err != nil {errors = multierror.Append(errors, err)}reporter.Done()if errors != nil {reportSpan.LogFields(otlog.String("error", errors.Error()))}reportSpan.Finish()if errors != nil {lg.Errora("Report failed:", errors.Error())return nil, grpc.Errorf(codes.Unknown, errors.Error())}// 过程结束return reportResp, nil
}

通过上述代码解读,我们了解了Report接口的工作流程,但此时我们还并不知道一个请求的状态是如何报给adaptor的,下面我们通过简要的函数串接,把这部分流程串起来:

上述的预处理阶段Preprocess与上报阶段Report,最终都会调用到dispatch函数,仅通过不同的type来区分要做的事情;

func (d *Impl) Preprocess(ctx context.Context, bag attribute.Bag, responseBag *attribute.MutableBag) error {s := d.getSession(ctx, tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR, bag)s.responseBag = responseBagerr := s.dispatch()if err == nil {err = s.err}… …
}
func (r *reporter) Report(bag attribute.Bag) error {s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, bag)s.reportStates = r.stateserr := s.dispatch()if err == nil {err = s.err}… …
}

而dispatch函数中做了真正的分发动作,包括:

1.遍历所有adaptor,调用adaptor中的函数,针对不同的adaptor生成不同的instance,并将instance缓存放入reportstates

var instance interface{}
if instance, err = input.Builder(s.bag); err != nil {log.Errorf("error creating instance: destination='%v', error='%v'", destination.FriendlyName, err)s.err = multierror.Append(s.err, err)continue
}
type NamedBuilder struct {InstanceShortName stringBuilder           template.InstanceBuilderFn
}
InstanceBuilderFn func(attrs attribute.Bag) (interface{}, error)
CreateInstanceBuilder: func(instanceName string, param proto.Message, expb *compiled.ExpressionBuilder) (template.InstanceBuilderFn, error)
builder.build(attr)
// For report templates, accumulate instances as much as possible before commencing dispatch.
if s.variety == tpb.TEMPLATE_VARIETY_REPORT {state.instances = append(state.instances, instance)continue
}

2.将instance分发到所有adaptor,最终调用并分发到adaptor的HandleMetric函数中

func (r *reporter) Flush() error {s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, nil)s.reportStates = r.statess.dispatchBufferedReports()err := s.err… …
}
func (s *session) dispatchBufferedReports() {// Ensure that we can run dispatches to all destinations in parallel.s.ensureParallelism(len(s.reportStates))// dispatch the buffered dispatchStates we've gotfor k, v := range s.reportStates {s.dispatchToHandler(v)delete(s.reportStates, k)}s.waitForDispatched()
}
func (s *session) dispatchToHandler(ds *dispatchState) {s.activeDispatches++ds.session = ss.impl.gp.ScheduleWork(ds.invokeHandler, nil)
}
case tpb.TEMPLATE_VARIETY_REPORT:ds.err = ds.destination.Template.DispatchReport(ctx, ds.destination.Handler, ds.instances)
type TemplateInfo struct {Name             stringVariety          tpb.TemplateVarietyDispatchReport   template.DispatchReportFnDispatchCheck    template.DispatchCheckFnDispatchQuota    template.DispatchQuotaFnDispatchGenAttrs template.DispatchGenerateAttributesFn
}
DispatchReport: func(ctx context.Context, handler adapter.Handler, inst []interface{}) error {// Convert the instances from the generic []interface{}, to their specialized type.instances := make([]*metric.Instance, len(inst))for i, instance := range inst {instances[i] = instance.(*metric.Instance)}// Invoke the handler.if err := handler.(metric.Handler).HandleMetric(ctx, instances); err != nil {return fmt.Errorf("failed to report all values: %v", err)}return nil
}

2.2 相关结构体定义

Report接口请求体定义

// Used to report telemetry after performing one or more actions.
type ReportRequest struct {// 代表一个请求中的属性// 每个attribute代表一个请求动作,多个动作可汇总在一条message中以提高效率//虽然每个“属性”消息在语义上被视为与消息中的其他属性无关的独立独立实体,但此消息格式利用属性消息之间的增量编码,以便大幅减少请求大小并改进端到端 效率。 每组单独的属性用于修改前一组。 这消除了在单个请求中多次冗余地发送相同属性的需要。// 如果客户端上报时不想使用增量编码,可全量的发送所有属性.Attributes []CompressedAttributes `protobuf:"bytes,1,rep,name=attributes" json:"attributes"`// 所有属性的默认消息级字典.// 这使得可以为该请求中的所有属性共享相同的字典,这可以大大减少整体请求大小DefaultWords []string `protobuf:"bytes,2,rep,name=default_words,json=defaultWords" json:"default_words,omitempty"`// 全局字典的词条数,可检测客户端与服务端之间的全局字典是否同步GlobalWordCount uint32 `protobuf:"varint,3,opt,name=global_word_count,json=globalWordCount,proto3" json:"global_word_count,omitempty"`
}

3、总结

Mixer中涉及很多缓存命中等用于优化性能的设计,本文仅介绍了Mixer中Report接口发送到adaptor的过程,一些性能优化设计,如protobag,dispatch缓存等内容,将会在后续文章中解析。

相关服务请访问https://support.huaweicloud.com/cce/index.html?cce_helpcenter_2019

转载于:https://blog.51cto.com/14051317/2353294

idou老师教你学Istio 27:解读Mixer Report流程相关推荐

  1. idou老师教你学Istio 07: 如何用istio实现请求超时管理

    在前面的文章中,大家都已经熟悉了Istio的故障注入和流量迁移.这两个方面的功能都是Istio流量治理的一部分.今天将继续带大家了解Istio的另一项功能,关于请求超时的管理. 首先我们可以通过一个简 ...

  2. idou老师教你学istio:监控能力介绍

    经过了一年多的开发和测试,Istio于北京时间7月31日发布了1.0版本,并且宣布1.0版本已经可以成熟的应用于生产环境.对于istio的各项主要功能,之前的文章已经介绍的非常详细,并且还会有更多的文 ...

  3. idou老师教你学Istio 04:Istio性能及扩展性介绍

    Istio的性能问题一直是国内外相关厂商关注的重点,Istio对于数据面应用请求时延的影响更是备受关注,而以现在Istio官方与相关厂商的性能测试结果来看,四位数的qps显然远远不能满足应用于生产的要 ...

  4. idou老师教你学Istio: 如何用Istio实现K8S Egress流量管理

    本文主要介绍在使用Istio时如何访问集群外服务,即对出口流量的管理. 默认安装的Istio是不能直接对集群外部服务进行访问的,如果需要将外部服务暴露给 Istio 集群中的客户端,目前有两种方案: ...

  5. idou老师教你学Istio06: 如何用istio实现流量迁移

    流量迁移是流量管理的一个重要功能.istio提供的流量管理功能将流量从基础设施扩展中解耦,支持动态请求路由,故障注入.超时重试.熔断和流量迁移等.流量迁移的主要目的是将流量从微服务的某一版本的逐步迁移 ...

  6. httos双向认证配置_idou老师教你学Istio 15:Istio实现双向TLS的迁移

    本文由华为云容器Istio团队撰稿,未经允许谢绝转载. 众所周知,HTTPS是用来解决 HTTP 明文协议的缺陷,在 HTTP 的基础上加入 SSL/TLS 协议,依靠 SSL 证书来验证服务器的身份 ...

  7. dbnetlib不存在或拒绝访问_idou老师教你学Istio 16:如何用 Istio 实现微服务间的访问控制...

    本文由华为云容器Istio团队撰稿,未经允许谢绝转载. 摘要 使用 Istio 可以很方便地实现微服务间的访问控制.本文演示了使用 Denier 适配器和黑白名单两种方法. 使用场景 有时需要对微服务 ...

  8. tree老师:每天五分钟教你学命令第1期

    本文由xiao_dou友情整理 点击标题下「蓝色微信名」可快速关注 在公司工作,我们第一个应该学会的命令,就是帮助命令,在遇到一些不懂的命令的时候可以使用man命令去查看man-pages文档. ma ...

  9. 雪珊教你学计算机,我日了数学老师

    我的语文是数学老师教的 (河南)李彩霞 没错,我的语文是数学老师教的,数学是语文老师教的.听起来像绕口令.现在流行嘲讽他人某一学科水平低下,经常用到这句话,也或者说是师娘教的!我不是师娘教的.我从小学 ...

最新文章

  1. C++重载(overload)和重写(覆盖)的区别?
  2. Centos7找不到ifconfig和netstat命令
  3. Windows上开发测试工具集合
  4. 苹果对其语音助手Siri进行显著改进:今秋将有7大新功能
  5. matlab2c使用c++实现matlab函数系列教程-conv函数
  6. work-conserving scheduling 是什么
  7. 阿里云云计算 7 ECS的产品优势
  8. Qt编程之mapx组件编程
  9. STM32使用MCUISP下载程序教程
  10. 2022年数学建模国赛(A题/B题/C题)评阅要点
  11. 时间管理---重要紧急四象限法
  12. 从零读懂CAN总线(上)
  13. Hdmi 和vga 接口有什么区别?
  14. video 满屏显示_HTML5 video播放器全屏(fullScreen)方法实例
  15. 联想G40-30进入PE鼠标键盘失灵解决方法
  16. String 翻转字符串
  17. 【Coding】LeetCode刷题记录
  18. 笔记本电脑购买指南与建议-知识点介绍
  19. 如何通过几何画板绘制双曲线
  20. 思科路由器常用接口说明

热门文章

  1. Python 字符串 - Python零基础入门教程
  2. BugkuCTF-PWN题pwn1-瑞士军刀
  3. php开发视频播放顺序,请问关于php代码运行顺序问题
  4. 湖南hp服务器虚拟化解决方案,HP刀片服务器虚拟化整合解决方案-20210729062411.docx-原创力文档...
  5. android jdbc 连接mysql数据库,android怎么用JDBC方法连接mysql数据库
  6. linux重新安装xrog文件,Linux下重新生成xorg.conf
  7. mysql抖动可能的原因,12 | 为什么我的MySQL会“抖”一下?
  8. 宏观经济学gdp计算方法_宏观经济学考研的重要考点
  9. android h5 有广告,那些H5在Android上显示的丧心病狂的坑
  10. 收藏功能_微软Edge获得了新的收藏夹菜单、PDF功能等