最近要对一些业务流程进行端到端的监控,这些业务是由几个微服务构成,微服务都是Java Spring编写的,我们需要了解整个业务涉及的各个模块的流量统计,性能状况,例如总共有多少次业务请求调用,多少次成功或失败的回复,每个步骤的耗时是多少等等。因此我也研究了一下如何在Java Spring应用中输出统计指标,通过Prometheus来统一收集指标,并在Grafana中通过不同的报表来呈现这些信息。

首先我们先定义一个简单的业务流程,假设我们有两个Spring的应用,一个是提供业务请求接口的HTTP调用,在收到业务请求后,把里面携带的信息发送到Kafka。另一个应用是订阅Kafka的消息,获取应用一发出的业务数据,并进行处理。

应用一

在start.spring.io网站里面新建一个应用,artifact的名字为kafka-sender-example,Dependancies里面选择Apache kafka for spring, Actuator, Spring Web。打开生成的项目文件,添加一个名为RemoteCommandController的类,实现一个http接口,代码如下:

package cn.roygao.kafkasenderexample;import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;import com.alibaba.fastjson.JSONObject;@RestController
public class RemoteCommandController {@Autowiredprivate KafkaTemplate<Integer, String> template;private final static Logger LOGGER = Logger.getLogger(RemoteCommandController.class.getName());@PostMapping("/sendcommand")public ResponseEntity<Map<String, Object>> sendCommand(@RequestBody JSONObject commandMsg) {String requestId = UUID.randomUUID().toString();String vin = commandMsg.getString("vin");String command = commandMsg.getString("command");LOGGER.info("Send command to vehicle:" + vin + ", command:" + command);Map<String, Object> requestIdObj = Collections.singletonMap("requestId", requestId);ProducerRecord<Integer, String> record = new ProducerRecord<>("remotecommand", 1, command);try {System.out.println(System.currentTimeMillis());template.send(record).get(10, TimeUnit.SECONDS);}catch (ExecutionException e) {LOGGER.info("Error");LOGGER.info(e.getMessage());}catch (TimeoutException | InterruptedException e) {LOGGER.info("Timeout");LOGGER.info(e.getMessage());}return ResponseEntity.accepted().body(requestIdObj);}
}

这个代码很简单,提供了一个POST的/sendcommand的接口,用户调用这个接口,提供车辆的VIN号和要发送的指令信息,收到请求之后,将把这些业务请求信息转发到Kafka的消息主题。这里用到了KafkaTemplate来进行消息的发送。为此,定义一个名为KafkaSender的配置类,代码如下:

package cn.roygao.kafkasenderexample;import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;@Configuration
public class KafkaSender {@Beanpublic NewTopic topic() {return TopicBuilder.name("remotecommand").build();}@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// See https://kafka.apache.org/documentation/#producerconfigs for more propertiesreturn props;}@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate() {return new KafkaTemplate<Integer, String>(producerFactory());}
}

代码里面定义了Kafka服务器的地址,消息主题等配置。

运行./mvnw clean package进行编译打包。

应用二

在start.spring.io网站里面新建一个应用,artifact的名字为kafka-sender-example,Dependancies里面选择Apache kafka for spring, Actuator。打开生成的项目文件,新建一个名为RemoteCommandHandler的类,实现接收Kafka信息的功能,代码如下:

package cn.roygao.kafkareceiverexample;import java.util.concurrent.TimeUnit;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.stereotype.Component;import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;@Component
public class RemoteCommandHandler {private Timer timer;public RemoteCommandHandler(MeterRegistry registry) {this.timer = Timer.builder("kafka.process.latency").publishPercentiles(0.15, 0.5, 0.95).publishPercentileHistogram().register(registry);}@KafkaListener(id = "myId", topics = "remotecommand")public void listen(String in, ConsumerRecordMetadata meta) {long latency = System.currentTimeMillis()-meta.timestamp();timer.record(latency, TimeUnit.MILLISECONDS);}
}

这里类的构造函数需要传入一个MeterRetistry的对象,然后新建一个Timer对象,这是Micrometer提供的四种Metric之一,可以用来记录时长的信息。把这个Timer注册到MeterRegistry。

在listen方法中,定义了从Kafka的消息主题订阅消息,获取消息的metadata中的生成时间的时间戳,并与当前的时间进行比较,计算出从消息生成到消息消费的耗时,然后用timer来进行计算。Timer会按照之前的定义进行不同百分位区间的分布统计。

同样我们也需要定义一个Kafka的配置类,代码如下:

package cn.roygao.kafkareceiverexample;import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;@Configuration
@EnableKafka
public class KafkaConfig {@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}
}

在application.properties文件中添加以下配置:

spring.kafka.consumer.auto-offset-reset=earliest
server.port=7777
management.endpoints.web.exposure.include=health,info,prometheus
management.endpoints.enabled-by-default=true
management.endpoint.health.show-details: always

然后运行./mvnw clean package进行编译打包。

启动Kafka

这里我采用Docker的方式来启动Kafka,compose文件的内容如下:

---
version: '2'
services:zookeeper:image: confluentinc/cp-zookeeper:6.1.0hostname: zookeepercontainer_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000broker:image: confluentinc/cp-server:6.1.0hostname: brokercontainer_name: brokerdepends_on:- zookeeperports:- "9092:9092"- "9101:9101"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporterKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1KAFKA_JMX_PORT: 9101KAFKA_JMX_HOSTNAME: localhostKAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1CONFLUENT_METRICS_ENABLE: 'true'CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'schema-registry:image: confluentinc/cp-schema-registry:6.1.0hostname: schema-registrycontainer_name: schema-registrydepends_on:- brokerports:- "8081:8081"environment:SCHEMA_REGISTRY_HOST_NAME: schema-registrySCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081connect:image: cnfldemos/cp-server-connect-datagen:0.4.0-6.1.0hostname: connectcontainer_name: connectdepends_on:- broker- schema-registryports:- "8083:8083"environment:CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'CONNECT_REST_ADVERTISED_HOST_NAME: connectCONNECT_REST_PORT: 8083CONNECT_GROUP_ID: compose-connect-groupCONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configsCONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsetsCONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1CONNECT_STATUS_STORAGE_TOPIC: docker-connect-statusCONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverterCONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverterCONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081# CLASSPATH required due to CC-2422CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.1.0.jarCONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERRORcontrol-center:image: confluentinc/cp-enterprise-control-center:6.1.0hostname: control-centercontainer_name: control-centerdepends_on:- broker- schema-registry- connect- ksqldb-serverports:- "9021:9021"environment:CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"CONTROL_CENTER_REPLICATION_FACTOR: 1CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1CONFLUENT_METRICS_TOPIC_REPLICATION: 1PORT: 9021ksqldb-server:image: confluentinc/cp-ksqldb-server:6.1.0hostname: ksqldb-servercontainer_name: ksqldb-serverdepends_on:- broker- connectports:- "8088:8088"environment:KSQL_CONFIG_DIR: "/etc/ksql"KSQL_BOOTSTRAP_SERVERS: "broker:29092"KSQL_HOST_NAME: ksqldb-serverKSQL_LISTENERS: "http://0.0.0.0:8088"KSQL_CACHE_MAX_BYTES_BUFFERING: 0KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"KSQL_KSQL_CONNECT_URL: "http://connect:8083"KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'ksqldb-cli:image: confluentinc/cp-ksqldb-cli:6.1.0container_name: ksqldb-clidepends_on:- broker- connect- ksqldb-serverentrypoint: /bin/shtty: trueksql-datagen:image: confluentinc/ksqldb-examples:6.1.0hostname: ksql-datagencontainer_name: ksql-datagendepends_on:- ksqldb-server- broker- schema-registry- connectcommand: "bash -c 'echo Waiting for Kafka to be ready... && \cub kafka-ready -b broker:29092 1 40 && \echo Waiting for Confluent Schema Registry to be ready... && \cub sr-ready schema-registry 8081 40 && \echo Waiting a few seconds for topic creation to finish... && \sleep 11 && \tail -f /dev/null'"environment:KSQL_CONFIG_DIR: "/etc/ksql"STREAMS_BOOTSTRAP_SERVERS: broker:29092STREAMS_SCHEMA_REGISTRY_HOST: schema-registrySTREAMS_SCHEMA_REGISTRY_PORT: 8081rest-proxy:image: confluentinc/cp-kafka-rest:6.1.0depends_on:- broker- schema-registryports:- 8082:8082hostname: rest-proxycontainer_name: rest-proxyenvironment:KAFKA_REST_HOST_NAME: rest-proxyKAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

运行nohup docker compose up > ./kafka.log 2>&1 &即可启动。在浏览器输入localhost:9021,可以在控制台界面观看Kafka的相关信息。

分别运行应用一和应用二,然后调用POST http://localhost:8080/remotecommand接口发送业务请求,例如以下的命令:

curl --location --request POST 'http://localhost:8080/sendcommand' \
--header 'Content-Type: application/json' \
--data-raw '{"vin": "ABC123","command": "engine-start"
}'

在Kafka的控制台可以看到有一个remotecommand的消息主题,并且有一条信息发送和被消费。

启动Prometheus和Grafana

同样采用docker compose的方式来启动,compose文件内容如下:

services:prometheus:image: prom/prometheus-linux-amd64#network_mode: hostcontainer_name: prometheusrestart: unless-stoppedvolumes:- ./config:/etc/prometheus/command:- '--config.file=/etc/prometheus/prometheus.yaml'ports:- 9090:9090grafana:image: grafana/grafanauser: '472'#network_mode: hostcontainer_name: grafanarestart: unless-stoppedlinks:- prometheus:prometheusvolumes:- ./data/grafana:/var/lib/grafanaenvironment:- GF_SECURITY_ADMIN_PASSWORD=adminports:- 3000:3000depends_on:- prometheus

在这个compose文件的目录下新建一个config目录,里面存放prometheus的配置文件,内容如下:

scrape_configs:- job_name: 'Spring Boot Application input'metrics_path: '/actuator/prometheus'scrape_interval: 2sstatic_configs:- targets: ['172.17.0.1:7777']labels:application: 'My Spring Boot Application'

这里面的targets配置的是应用二暴露的地址,metrics_path是采集指标的路径。

在compose文件的目录下新建一个data/grafana目录,挂载给Grafana的文件目录,注意这里需要用chmod 777来修改目录权限,不然Grafana会报权限错误。

运行nohup docker compose up > ./prometheus.log 2>&1 &运行即可。

打开localhost:9090可以访问prometheus的页面,然后我们可以输入kafka进行搜索,可以看到应用二上报的kafka_process_latency的指标数据,按照我们的定义进行了0.15,0.5, 0.95这三个百分位区间的统计。

打开localhost:3000可以访问Grafana的页面,配置datasource,选择Prometheus这个容器的地址,然后save&test。之后可以新建一个dashboard,然后可以在报表里面显示kafka_process_latency的指标图形。

【未完待续】,还要增加对Http接口调用的Counter metric,以及在Grafana定义更多的报表,包括其他服务指标等等。

用Prometheus和Grafana监控Java Spring应用相关推荐

  1. Prometheus+Node_exporter+Grafana监控(附送保姆级别linux安装攻略)

    Prometheus+Node_exporter+Grafana监控(附送保姆级别linux安装攻略) 前言:监控系统技术选型 从开发语言上看,为了应对高并发和快速迭代的需求,监控系统的开发语言已经慢 ...

  2. 开源OceanBase如何与Prometheus与Grafana监控结合

    一.OceanBase 数据库简介 OceanBase 数据库是一个原生的分布式关系数据库,它是完全由阿里巴巴和蚂蚁集团自主研发的项目,近期成立单独的商业公司北京奥星贝斯进行运营,并于2021年6月1 ...

  3. 完整版SpringBoot集成Prometheus配置Grafana监控指标包括响应时间分位数TP90,TP80(图+文)

    1 缘起 监控作为线上服务管理的最重要一环, 每当新服务上线后,都需要监控上线服务运行情况,包括QPS.时延.成功响应率.内存和CPU使用情况等, 通过监控信息,清晰且及时地掌握当前服务的健康程度,以 ...

  4. Docker 环境下 Prometheus 和 Grafana 监控 Mysql

    一.安装 Mysql 和 mysqld-exporter 1.1.安装Mysql Docker 下 Mysql 安装:传送门 Linux 下 Mysql 安装:传送门 Win10 下 Mysql 安装 ...

  5. linux一键部署prometheus、grafana监控系统

    基于Linux系统部署.非docker容器部署方式. 所需要的包已经全部放 123云盘 提取码000 永久生效https://www.123pan.com/s/wkyA-bN7cv%E6%8F%90% ...

  6. 云栖社区特邀专家徐雷——Java Spring Boot开发实战系列课程【往期直播回顾】...

    徐雷,花名:徐雷frank:资深架构师,MongoDB中文社区联席主席,吉林大学计算机学士,上海交通大学硕士.从事了 10年+开发工作,专注于分布式架构,Java Spring Boot.Spring ...

  7. 监控工具—Prometheus—监控Java程序

    原文作者:青蛙小白 原文地址:Prometheus监控实践:使用Prometheus监控Java应用 目录 1.Prometheus JVM Client 2.Prometheus的服务发现 3.Gr ...

  8. Docker下Prometheus和Grafana三部曲之三:自定义监控项开发和配置

    本文是<Docker下Prometheus和Grafana三部曲>的终篇,前面的文章中,我们体验了快速搭建监控环境,也揭示了如何编排Docker容器来简化环境搭建过程,在监控系统中有个业务 ...

  9. prometheus监控java项目(jvm等):k8s外、k8s内

    前言 虽然可以使用jvisualvm之类的工具监控java项目,但是集群环境下,还是捉襟见肘,下面介绍如何用主流的prometheus来监控java项目. java项目配置 在pom.xml中添加依赖 ...

最新文章

  1. 终端打不开(右键和快捷键)?因为phthon?
  2. 为什么要打jar_生活在西北的兰州人过春节为什么要打太平鼓?
  3. 软件项目管理0706:工匠精神
  4. Andorid之提示java.lang.RuntimeException: Unable to start service net.gotev.uploadservice.UploadService@
  5. JavaScript Unicode字符操作
  6. 使用Upida/Jeneva.Net验证传入的JSON
  7. 简便 or 缺陷?Python 内置函数大揭秘!| 技术头条
  8. 牛客网 斐波那契数列
  9. 「管理数学基础」1.4 矩阵理论:相似矩阵
  10. DLL文件用加密工具加密不了怎么办
  11. [转载]Matlab定积分近似计算
  12. underscore.js 964 --- 1103行
  13. Cocos2d+protobuf仿JJ斗地主源码,win32和Android编译通过
  14. SpringBoot 入门
  15. 开机后我的计算机打不开,电脑开机后图标打不开怎么办
  16. 华为手机主界面的返回键怎么调出来_主按钮怎么变回来 华为手机的返回键怎么设置?...
  17. php小程序中的页面如何换行,解决微信小程序scroll-view换行问题
  18. 递归算法:爬楼梯问题
  19. 如何添加Android返回键的退出功能
  20. * web H5 网页 浏览器 蓝牙 Bluetooth

热门文章

  1. 分众传媒CEO江南春:没钱是这个社会进步最大的动力
  2. 阿里北京裁员,首批员工3.28之前告别大文娱
  3. 如何查看自己的ubuntu系统版本
  4. FYD-Focus Your Distribution-关注你的分布:异常检测和定位的从粗到细的非对比性学习-FYD
  5. 80x86 微处理器
  6. 每日一句_《客中行》
  7. 基础(待续)-弹群协同作战
  8. 文科生如何就量子物理和相对论谈笑风生? 我读过的10本有料有趣科普书
  9. 灵敏度(sensitivity)和特异性(specificity)的计算
  10. Mac Grapher(图形软件)