OpenTracing 是针对应用程序和 OSS(Open-Source Software)软件包的开放分布式追踪标准。许多追踪后端服务都支持 OpenTracing API,例如 Jaeger、Zipkin 和 SkyWalking。

本文详细介绍如何使用 Jaeger 通过 OpenTracing API 追踪 Pulsar 消息。

准备工作

在开始前,需要安装好 JDK 8、Maven 3 和 Pulsar(集群模式或单机模式)。如果还没有安装 Pulsar,点击这里按照提示进行安装。

第 1 步:启动 Jaeger 后端

  1. 在 Docker 中启动 Jaeger 后端。

    docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest
    

    成功启动 Jaeger 后,就可以打开 Jaeger UI 网站。

    提示

    如何你没有 Jaeger Docker 环境,可以下载二进制文件或通过源代码构建。

  2. 访问 http://localhost:16686,无需填写用户名或密码就可以打开 Jeager UI 网站。

第 2 步:添加 maven dependencies

本示例使用 Open Tracing Pulsar Client,它是 Pulsar Client 与 OpenTracing API(基于 Pulsar Client Interceptors)的集成,用于追踪 Pulsar 消息。OpenTracing Pulsar Client 由 StreamNative 研发,是 StreamNatvie Hub 中的监控工具。

添加 Jaeger client dependency 以连接到 Jaeger 后端。

<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.5.1</version>
</dependency><dependency><groupId>io.streamnative</groupId><artifactId>opentracing-pulsar-client</artifactId><version>0.1.0</version>
</dependency><dependency><groupId>io.jaegertracing</groupId><artifactId>jaeger-client</artifactId><version>1.2.0</version>
</dependency>

第 3 步:使用 OpenTracing Pulsar Client

为便于理解,本示例假设有 2 个 Job 和 2 个 topic。Job-1 向 topic-A 发送消息,Job-2 从 topc-A 消费消息。当 Job 2 收到 topic-A 的消息后,Job 2 会向 topic-B 发送消息,然后 Job-3 从 topic-B 消费消息。因此,在这种情况下有 2 个 topic、2 个 producer 和 2 个 consumer。

要完成上述工作场景中的任务,需要启动三个应用程序。

  • Job-1:发布消息到 topic-A
  • Job-2:消费 topic-A 中的消息,并发布消息到 topic-B
  • Job-3:消费 topic-B 中的消息

Job-1

以下示例为发布消息至 topic-A。

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-1").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Producer<String> producerA = client.newProducer(Schema.STRING).topic("topic-A").intercept(new TracingProducerInterceptor()).create();for (int i = 0; i < 10; i++) {producerA.newMessage().value(String.format("[%d] Hello", i)).send();
}

Job-2

以下示例为从 topic-A 消费消息,并将消息发布到 topic-B。

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-2").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Consumer<String> consumer = client.newConsumer(Schema.STRING).topic("topic-A").subscriptionName("open-tracing").subscriptionType(SubscriptionType.Shared).intercept(new TracingConsumerInterceptor<>()).subscribe();Producer<String> producerB = client.newProducer(Schema.STRING).topic("topic-B").intercept(new TracingProducerInterceptor()).create();while (true) {Message<String> received = consumer.receive();SpanContext context = TracingPulsarUtils.extractSpanContext(received, tracer);TypedMessageBuilder<String> messageBuilder = producerB.newMessage();messageBuilder.value(received.getValue() + " Pulsar and OpenTracing!");// Inject parent span contexttracer.inject(context, Format.Builtin.TEXT_MAP, new TypeMessageBuilderInjectAdapter(messageBuilder));messageBuilder.send();consumer.acknowledge(received);
}

Job-3

以下示例为从 topic-B 消费消息。

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-3").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Consumer<String> consumer = client.newConsumer(Schema.STRING).topic("topic-B").subscriptionName("open-tracing").subscriptionType(SubscriptionType.Shared).intercept(new TracingConsumerInterceptor<>()).subscribe();while (true) {Message<String> received = consumer.receive();System.out.println(received.getValue());consumer.acknowledge(received);
}

现在,可以分别运行 Job-3、Job-2 和 Job-1。控制台中会出现 Job-3 接收的日志,如下:

[0] Hello Pulsar and OpenTracing!
[1] Hello Pulsar and OpenTracing!
...
[9] Hello Pulsar and OpenTracing!

现在,你可以再次打开 Jaeger UI,页面中会出现十条消息追踪链路。

点击任务名称即可查看消息追踪链路的详细信息。

可以从 span 名称轻松辨别是 producer 还是 consumer 发布了此条消息,span 名称格式为 To__<topic-name>From__<topic-name>__<subscription_name>

总结

OpenTracing Pulsar Client 集成了 Pulsar 客户端和 OpenTracing,可以实现轻松地追踪消息。如果你在应用程序中使用了 Pulsar 和 OpenTracing,赶快试一试吧!

如何使用 OpenTracing 和 Jaeger 追踪 Pulsar 消息相关推荐

  1. pulsar 容量_[Pulsar系列] 10分钟学会Pulsar消息系统概念

    Apache Pulsar Pulsar是一个支持多租户的.高性能的服务与服务之间消息通讯的解决方案,最初由雅虎开发,现在由Apache软件基金会管理. Pulsar在Yahoo的生产环境运行了三年多 ...

  2. 大厂实践|Apache Pulsar 消息队列在拉卡拉的应用

    关于 Apache Pulsar Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息.存储.轻量化函数式计算为一体,采用计算与存储分离架构设计,支 ...

  3. [资源分享] 【Springboot】实例讲解Springboot整合OpenTracing分布式链路追踪系统(Jaeger和Zipkin)

    1 分布式追踪系统 随着大量公司把单体应用重构为微服务,对于运维人员的责任就更加重大了.架构更复杂.应用更多,要从中快速诊断出问题.找到性能瓶颈,并不是一件容易的事.因此,也随着诞生了一系列面向Dev ...

  4. Redis、Kafka 和 Pulsar 消息队列对比

    点击关注公众号,Java干货及时送达 导语 | 市面上有非常多的消息中间件,rabbitMQ.kafka.rocketMQ.pulsar. redis等等,多得令人眼花缭乱.它们到底有什么异同,你应该 ...

  5. Redis、Kafka 和 Pulsar 消息队列对比,写得太好了!

    市面上有非常多的消息中间件,rabbitMQ.kafka.rocketMQ.pulsar. redis等等,多得令人眼花缭乱.它们到底有什么异同,你应该选哪个? 本文尝试通过技术演进的方式,以redi ...

  6. 面试官:请你从架构演进的角度讲讲redis、kafka和 pulsar消息队列

    导语 | 市面上有非常多的消息中间件,rabbitMQ.kafka.rocketMQ.pulsar. redis等等,多得令人眼花缭乱.它们到底有什么异同,你应该选哪个?本文尝试通过技术演进的方式,以 ...

  7. CentOS7 搭建Pulsar 消息队列环境,CentOS(Linux)部署Pulsar,亲测成功,以及Python操作Pulsar实例驱动

    在 最佳开源数据库与数据分析平台奖 中,之前曾连续两年入选的 Kafka 意外滑铁卢落选,取而代之的是新兴项目 Pulsar,Bossie Awards中对 Pulsar 点评如下:"Pul ...

  8. 第一条Pulsar消息发送

    什么是Pulsar pulsar是一个多租户.高性能server-to-srever消息解决方案,最初由雅虎开发,现在由apache维护. Pulsar的核心特性: 多集群云原生支持 低延迟 良好的伸 ...

  9. OpenTracing开放式分布式追踪规范说明介绍整理

    一.OpenTracing:开放式分布式追踪规范 两个组成部分 Trace:调用链,追踪链路,由span组成. Span:跨度点,一次跨方法调用 span属性 Operation name:操作名词 ...

最新文章

  1. VTA:深度学习加速器堆栈
  2. nginx php unix负载,使用nginx配置多个php fastcgi负载均衡
  3. ux和ui_从UI切换到UX设计
  4. 揭秘全球开发最新趋势!JS开发者达1380万,C#超越PHP,Rust增长最快
  5. 云原生安全厂商小佑科技获达泰资本千万级投资
  6. Ubuntu16.04 Swap的开闭
  7. 如果把Python代码写成这样子就太难看了
  8. varnish运行机制及管理优化
  9. 拓端tecdat|R语言关联挖掘实例(购物篮分析)
  10. 远程580解锁bl工具_一年让你多挣4万的工具丨又增VIVO新款保资料解锁!
  11. CGAL 计算几何库
  12. Not showing null elements
  13. Jmeter_基本操作-取样器
  14. CS5213芯片|HDMI to VGA转换头芯片资料分享
  15. 平面设计证书怎么考,平面设计证书有用吗:夏雨老师
  16. python的实验报告大一心理_Python程序设计实验报告: 实验六
  17. CTSCAPIO 2017游记
  18. 在html5页面中添加canvas,HTML页面中添加Canvas标签示例
  19. php球鞋,最炫中国风!中国元素球鞋盘点!
  20. FFA-Net: Feature Fusion Attention Network for Single Image Dehazing

热门文章

  1. Python入门三:安装Pyqt5
  2. 详解,N沟道MOS管和P沟道MOS管
  3. C# 模拟鼠标移动和点击(转载)
  4. socket是什么?有什么作用?
  5. MATLAB-工具箱-如何导出拟合系数?
  6. Exchange 2019数据库切换
  7. 【iOS】监听耳机状态
  8. KaTeX 数学符号列表
  9. [实践篇]13.8 如何解析gcore?
  10. 单片机C语言数据存储原理,介绍单片机中C语言的数据存储与程序编写