Opentracing基本模型

image.png

如图,在跟踪链中有以下几个比较重要的数据结构和概念:

span:标识一次分布式调用,其自身包含了id,parentId(指向上级Span的id), traceIds,服务名称等重要属性,其应尽量保持精简;

trace:标识整个请求链,即一些列Span的组合。其自身的ID将贯穿整个调用链,其中的每个Span都必须携带这个traceId,因此traceId将在整个调用链中传递;

cs:客户端发起请求,标志Span的开始;

sr:服务端接收到请求,并开始处理内部事务,其中sr - cs则为网络延迟和时钟抖动;

ss:服务端处理完请求,返回响应内容,其中ss - sr则为服务端处理请求耗时;

cr:客户端接收到服务端响应内容,标志着Span的结束,其中cr - ss则为网络延迟和时钟抖动。

客户端调用时间=cr-cs

服务端处理时间=sr-ss

分布式系统调用跟踪的基本架构要求

低侵入性,高性能,高可用容错,低丢失率等。

基于GRPC的分布式系统调用跟踪实践

创建TraceContext

TraceContext通过Threadlocal对span进行保存,并且将traceid和spanid向底层服务传递,zebra对线程上下文传递进行了封装,具体参照GRPC如何实现公共参数与业务参数分离传递下面是TraceContext具体代码

public class TraceContext{

private static final String SPAN_LIST_KEY = "spanList";

public static final String TRACE_ID_KEY = "traceId";

public static final String SPAN_ID_KEY = "spanId";

public static final String ANNO_CS = "cs";

public static final String ANNO_CR = "cr";

public static final String ANNO_SR = "sr";

public static final String ANNO_SS = "ss";

private TraceContext(){}

public static void setTraceId(String traceId) {

RpcContext.getContext().set(TRACE_ID_KEY, traceId);

}

public static String getTraceId() {

return (String) RpcContext.getContext().get(TRACE_ID_KEY);

}

public static String getSpanId() {

return (String) RpcContext.getContext().get(SPAN_ID_KEY);

}

public static void setSpanId(String spanId) {

RpcContext.getContext().set(SPAN_ID_KEY, spanId);

}

@SuppressWarnings("unchecked")

public static void addSpan(Span span){

((List)RpcContext.getContext().get(SPAN_LIST_KEY)).add(span);

}

@SuppressWarnings("unchecked")

public static List getSpans(){

return (List) RpcContext.getContext().get(SPAN_LIST_KEY);

}

public static void clear(){

RpcContext.getContext().remove(TRACE_ID_KEY);

RpcContext.getContext().remove(SPAN_ID_KEY);

RpcContext.getContext().remove(SPAN_LIST_KEY);

}

public static void start(){

clear();

RpcContext.getContext().set(SPAN_LIST_KEY, new ArrayList());

}

}

创建TraceAgent

TraceAgent将span信息上传至kafka,代码如下:

public class TraceAgent {

private GrpcProperties grpcProperties;

private KafkaSender sender;

private AsyncReporter report;

public TraceAgent() {

grpcProperties = SpringContextUtils.getBean(GrpcProperties.class);

sender = KafkaSender.newBuilder().bootstrapServers(grpcProperties.getCallChainUpdAddr()).topic("zipkin").encoding(Encoding.JSON).build();

report = AsyncReporter.builder(sender).build();

}

public void send(final List spans){

spans.forEach(item ->{

report.report(item);

});

}

}

创建ZebraClientTracing

ZebraClientTracing用于记录调用端的span信息,具体代码如下:

@Component

public class ZebraClientTracing {

public Span startTrace(String method) {

String id = IdUtils.get() + "";

String traceId = null;

if (null == TraceContext.getTraceId()) {

TraceContext.start();

traceId = id;

} else {

traceId = TraceContext.getTraceId();

}

long timestamp = System.currentTimeMillis() * 1000;

// 注册本地信息

Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)

.port(50003).build();

// 初始化span

Span consumerSpan = Span.newBuilder().localEndpoint(endpoint).id(id).traceId(traceId)

.parentId(TraceContext.getSpanId() + "").name(EtcdRegistry.serviceName).timestamp(timestamp)

.addAnnotation(timestamp, TraceContext.ANNO_CS).putTag("method", method)

.putTag("pkgId", RpcContext.getContext().getAttachment("pkg")).build();

// 将tracing id和spanid放到上下文

RpcContext.getContext().get().put(TraceContext.TRACE_ID_KEY, consumerSpan.traceId());

RpcContext.getContext().get().put(TraceContext.SPAN_ID_KEY, String.valueOf(consumerSpan.id()));

return consumerSpan;

}

public void endTrace(Span span, Stopwatch watch,int code) {

span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_CR)

.duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();

TraceAgent traceAgent = new TraceAgent();

traceAgent.send(TraceContext.getSpans());

}

}

创建ZebraServerTracing

ZebraServerTracing用于记录服务端的span信息,具体代码如下:

@Component

public class ZebraServerTracing {

public Span startTrace(String method) {

String traceId = (String) RpcContext.getContext().get(TraceContext.TRACE_ID_KEY);

String parentSpanId = (String) RpcContext.getContext().get(TraceContext.SPAN_ID_KEY);

String id = IdUtils.get() + "";

TraceContext.start();

TraceContext.setTraceId(traceId);

TraceContext.setSpanId(parentSpanId);

long timestamp = System.currentTimeMillis() * 1000;

Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)

.port(50003).build();

Span providerSpan = Span.newBuilder().id(id).parentId(parentSpanId).traceId(traceId)

.name(EtcdRegistry.serviceName).timestamp(timestamp).localEndpoint(endpoint)

.addAnnotation(timestamp, TraceContext.ANNO_SR).putTag("method", method)

.putTag("pkgId", RpcContext.getContext().getAttachment("pkg"))

.build();

TraceContext.addSpan(providerSpan);

return providerSpan;

}

public void endTrace(Span span, Stopwatch watch,int code) {

span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_SS)

.duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();

TraceAgent traceAgent = new TraceAgent();

traceAgent.send(TraceContext.getSpans());

}

}

创建grpc client拦截器

public class HeaderClientInterceptor implements ClientInterceptor {

private static final Logger log = LogManager.getLogger(HeaderClientInterceptor.class);

private final ZebraClientTracing clientTracing;

public static ClientInterceptor instance() {

return new HeaderClientInterceptor();

}

private HeaderClientInterceptor() {

clientTracing = SpringContextUtils.getBean(ZebraClientTracing.class);

}

@Override

public ClientCall interceptCall(MethodDescriptor method,

CallOptions callOptions, Channel next) {

return new SimpleForwardingClientCall(next.newCall(method, callOptions)) {

//判断API网关是否要打开调用链

boolean isGatewayTracing = "1".equals(RpcContext.getContext().getAttachment(ZebraConstants.ZEBRA_OPEN_TRACING))?true:false;

boolean isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY)!=null?true:false;

Stopwatch watch =null;

Span span =null;

@Override

public void start(Listener responseListener, Metadata headers) {

if(isSubTracing||isGatewayTracing){

span =clientTracing.startTrace(method.getFullMethodName());

watch = Stopwatch.createStarted();

}

copyThreadLocalToMetadata(headers);

super.start(new SimpleForwardingClientCallListener(responseListener) {

@Override

public void onHeaders(Metadata headers) {

super.onHeaders(headers);

}

@Override

public void onClose(Status status, Metadata trailers) {

super.onClose(status, trailers);

if(isSubTracing||isGatewayTracing)

clientTracing.endTrace(span, watch,status.getCode().value());

}

}, headers);

}

};

}

private void copyThreadLocalToMetadata(Metadata headers) {

Map attachments = RpcContext.getContext().getAttachments();

Map values = RpcContext.getContext().get();

try {

if (!attachments.isEmpty()) {

headers.put(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS, SerializerUtil.toJson(attachments));

}

if (!values.isEmpty()) {

headers.put(GrpcUtil.GRPC_CONTEXT_VALUES, SerializerUtil.toJson(values));

}

} catch (Throwable e) {

log.error(e.getMessage(), e);

}

}

}

创建grpc server拦截器

public class HeaderServerInterceptor implements ServerInterceptor {

private static final Logger log = LogManager.getLogger(HeaderServerInterceptor.class);

private final ZebraServerTracing serverTracing;

public static ServerInterceptor instance() {

return new HeaderServerInterceptor();

}

private HeaderServerInterceptor() {

serverTracing = SpringContextUtils.getBean(ZebraServerTracing.class);

}

@Override

public Listener interceptCall(ServerCall call, final Metadata headers,

ServerCallHandler next) {

return next.startCall(new SimpleForwardingServerCall(call) {

boolean isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY) != null ? true : false;

Stopwatch watch = null;

Span span = null;

@Override

public void request(int numMessages) {

if (isSubTracing) {

span = serverTracing.startTrace(call.getMethodDescriptor().getFullMethodName());

watch = Stopwatch.createStarted();

}

InetSocketAddress remoteAddress = (InetSocketAddress) call.getAttributes()

.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);

RpcContext.getContext().setAttachment(ZebraConstants.REMOTE_ADDRESS, remoteAddress.getHostString());

copyMetadataToThreadLocal(headers);

log.debug("FullMethodName:{},RemoteAddress={},attachments={},context={}",

call.getMethodDescriptor().getFullMethodName(), remoteAddress.getHostString(),

headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS), headers.get(GrpcUtil.GRPC_CONTEXT_VALUES));

super.request(numMessages);

}

@Override

public void close(Status status, Metadata trailers) {

delegate().close(status, trailers);

if(isSubTracing)

serverTracing.endTrace(span, watch,status.getCode().value());

}

}, headers);

}

private void copyMetadataToThreadLocal(Metadata headers) {

String attachments = headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS);

String values = headers.get(GrpcUtil.GRPC_CONTEXT_VALUES);

try {

if (attachments != null) {

Map attachmentsMap = SerializerUtil.fromJson(attachments,

new TypeToken>() {

}.getType());

RpcContext.getContext().setAttachments(attachmentsMap);

}

if (values != null) {

Map valuesMap = SerializerUtil.fromJson(values, new TypeToken>() {

}.getType());

for (Map.Entry entry : valuesMap.entrySet()) {

RpcContext.getContext().set(entry.getKey(), entry.getValue());

}

}

} catch (Throwable e) {

log.error(e.getMessage(), e);

}

}

}

java高可用grpc_GRPC java 分布式调用链跟踪实践相关推荐

  1. 使用 docker 构建分布式调用链跟踪框架skywalking

    一旦你的程序docker化之后,你会遇到各种问题,比如原来采用的本地记日志的方式就不再方便了,虽然你可以挂载到宿主机,但你使用 --scale 的话,会导致记录日志异常,所以最好的方式还是要做日志中心 ...

  2. 快狗打车CTO沈剑:低成本搞定分布式调用链追踪系统

    本文根据沈剑老师在[2020 Gdevops全球敏捷运维峰会]现场演讲内容整理而成. 讲师介绍 沈剑,到家集团技术VP&技术委员会主席,快狗打车CTO,互联网架构技术专家,"架构师之 ...

  3. 小程聊微服务-自己动手扩展分布式调用链

    一.说在前面 微服务是当下最火的词语,现在很多公司都在推广微服务,当服务越来越多的时候,我们是否会纠结以下几个问题: 面对一笔超时的订单,究竟是哪一步处理时间超长呢? 数据由于并发莫名篡改,到底都谁有 ...

  4. 程超:手把手教你动手扩展分布式调用链

    一.说在前面 微服务是当下最火的词语,现在很多公司都在推广微服务,当服务越来越多的时候,我们是否会纠结以下几个问题: 面对一笔超时的订单,究竟是哪一步处理时间超长呢? 数据由于并发莫名篡改,到底都谁有 ...

  5. java分布式+高可用_[Java复习] 分布式高可用-Hystrix

    什么是Hystrix? Hystrix 可以让我们在分布式系统中对服务间的调用进行控制,加入一些调用延迟或者依赖故障的容错机制. Hystrix 的设计原则 对依赖服务调用时出现的调用延迟和调用失败进 ...

  6. 高级架构师,精通JAVA/高并发/微服务/分布式/中间件

    JAVA高级架构师专栏 大纲介绍 Java基础复习 单点系统 Springboot框架应用 页面开发模版引擎 Java核心基础 JVM调优 高并发优化 Spring.Mybatis.Springboo ...

  7. 如何保证高可用?java测试工程师测试的方法

    一面 正式批(别看了都是正式批,提前批就没让我面!)一面.面试时间 08-18,19:53 - 21:08,全程1个小时15分钟.涉及内容:项目.网络.数据库.算法题 1. 自我介绍 2. 项目中的有 ...

  8. 现阶段Java高可用集群架构与微服务架构的简单分析

    一.如何选择 1.高可用集群 适用于中小型创业公司项目架构,小型技术团队快速迭代版本发布部署需求,前期低成本运行,爆发时可通过投入适量成本横向扩容服务器抗压. 特点: 前期技术开发成本低 一定的服务器 ...

  9. VDL:唯品会强一致、高可用、高性能分布式日志存储介绍(产品篇)

    "You can't fully understand databases, NoSQL stores, key value stores, replication, paxos, hado ...

最新文章

  1. 素数环问题---深度搜索遍历
  2. 被“钱”困住的开源开发者们!
  3. mysql索引之间的区别
  4. 《Oracle SQL疑难解析》——1.6 批量地从一个表中复制数据到另一个表
  5. Excel 下来公式 内容却一样
  6. 笔记本8G+256G固态免费送,吃鸡不吃力,包邮!
  7. 【lora无线数传通信模块】亿佰特E22串口模块用于物联网地震预警传感通信方案
  8. 专访微软Greg Yang,构建神经网络的底层语言,探索AI的“万物理论”
  9. C++模板函数只声明一种类型但输入两种类型参数
  10. 面向对象编程其实很简单——Python 面向对象(初级篇)
  11. 【kafka】kafka Producer Metadata概述及源码分析
  12. 微服务主见传递ID还是json_Feign实现微服务间文件传递
  13. 基于avr atmega16单片机控制2路9g舵机进行0-180°旋转,1602显示运行状态,adc采样控制舵机转速。然后适合于初学avr单片机的朋友
  14. Proxomitron之天涯只看规则
  15. js连接蓝牙打印机打印一维码和二维码
  16. Conflux TokenGazer AMA活动内容回顾
  17. Error from chokidar (C:\): Error: EBUSY: resource busy or locked, lstat ‘D:\DumpStack.log.tmp
  18. Python 优雅地利用两点经纬度计算地理空间距离
  19. Linux计划任务要怎么弄?
  20. python面试题(六)——数据库mysql、redis篇

热门文章

  1. 访问nfs_通过NFS访问编年引擎
  2. java转换为c#_C#vs Java哪一个更快? 将25k C#转换为Java(2)
  3. Java中的装饰器设计模式
  4. gradle 构建应用流程_使用Gradle构建和应用AST转换
  5. lambda 高并发_玩Java 8 – Lambda和并发
  6. gradle ant_区分基于Ant目标的Gradle任务
  7. Java 10迁移建议
  8. 将Swagger与Spring Boot REST API集成
  9. Java命令行界面(第5部分):JewelCli
  10. 适用于Java开发人员的Elasticsearch:简介