温馨提示:Akka 中文指南的 GitHub 地址为「akka-guide」,欢迎大家StarFork,纠错。

文章目录

  • 集群指标扩展
    • 依赖
    • 简介
    • 指标收集器
    • 指标事件
    • Hyperic Sigar 配置
    • 自适应负载平衡
    • 订阅指标事件
    • 自定义指标收集器
    • 配置

集群指标扩展

依赖

为了使用集群指标扩展(Cluster Metrics Extension),你需要将以下依赖添加到你的项目中:

<!-- Maven -->
<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-cluster-metrics_2.12</artifactId><version>2.5.22</version>
</dependency><!-- Gradle -->
dependencies {compile group: 'com.typesafe.akka', name: 'akka-cluster-metrics_2.12', version: '2.5.22'
}<!-- sbt -->
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-metrics" % "2.5.22"

并将以下配置添加到application.conf中:

akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]

简介

集群的成员节点可以收集系统健康指标,并在集群指标扩展的帮助下将其发布到其他集群节点和系统事件总线上注册的订阅者。

集群指标信息主要用于负载均衡路由器(load-balancing routers),也可用于实现基于指标的高级节点生命周期,例如当 CPU 窃取时间过多时“节点让它崩溃”。

如果启用了该功能,状态为「WeaklyUp」的集群成员将参与集群指标收集和分发。

指标收集器

指标集合委托给akka.cluster.metrics.MetricsCollector.的实现。

不同的收集器(collector)实现提供发布到集群的不同指标子集。当未设置Sigar时,某些消息路由和让其崩溃功能可能无法工作。

集群指标扩展附带两个内置收集器实现:

  1. akka.cluster.metrics.SigarMetricsCollector,它要求提供Sigar,并且更丰富/更精确
  2. akka.cluster.metrics.JmxMetricsCollector,用作回退,不太丰富/精确

你也可以插入(plug-in)自己的指标收集器实现。

默认情况下,指标扩展将使用收集器提供程序回滚,并尝试按以下顺序加载它们:

  1. 配置的用户提供的收集器
  2. 内置的akka.cluster.metrics.SigarMetricsCollector
  3. 最后是akka.cluster.metrics.JmxMetricsCollector

指标事件

指标扩展定期地将集群指标的当前快照发布到节点系统事件总线。

发布间隔由akka.cluster.metrics.collector.sample-interval设置控制。

akka.cluster.metrics.ClusterMetricsChanged事件的有效负载将包含节点的最新指标,以及在收集器采样间隔期间接收到的其他群集成员节点指标流言。

你可以通过指标侦听器 Actor 订阅这些事件,以实现自定义节点生命周期:

ClusterMetricsExtension.get(system).subscribe(metricsListenerActor);

Hyperic Sigar 配置

与可以从普通 JMX MBean 中检索到的指标相比,用户提供的指标收集器和内置的指标收集器都可以选择使用Hyperic Sigar来获取更广泛、更准确的指标范围。

Sigar使用的是本机 O/S 库,需要提供库,即在运行时将 O/S 本机库部署、提取和加载到 JVM 中。

用户可以通过以下方式之一提供Sigar类和本机库:

  1. 使用「Kamon sigar-loader」加载器用作用户项目的项目依赖项。指标扩展将根据需要在Kamon sigar provisioner的帮助下提取和加载Sigar库。
  2. 使用「Kamon sigar-loader」加载器作为 Java 代理:java -javaagent:/path/to/sigar-loader.jarKamon sigar loader代理将在 JVM 启动期间提取和加载Sigar库。
  3. sigar.jar放在classpath上,将 O/S 的Sigar本机库放在java.library.path上。用户需要手动管理项目依赖项和库部署。
  • 警告:当使用Kamon sigar loader并在同一主机上运行同一应用程序的多个实例时,必须确保将Sigar库提取到一个唯一的每个实例目录中。你可以使用akka.cluster.metrics.native-library-extract-folder配置设置控制提取目录。

为了使用Sigar的功能,需要在用户项目中添加以下依赖项:

<!-- Maven -->
<dependency><groupId>io.kamon</groupId><artifactId>sigar-loader</artifactId><version>1.6.6-rev002</version>
</dependency><!-- Gradle -->
dependencies {compile group: 'io.kamon', name: 'sigar-loader', version: '1.6.6-rev002'
}<!-- sbt -->
libraryDependencies += "io.kamon" % "sigar-loader" % "1.6.6-rev002"

你可以从「Maven Central」中下载Kamon sigar loader的依赖包。

自适应负载平衡

AdaptiveLoadBalancingPool / AdaptiveLoadBalancingGroup根据集群指标数据对集群节点的消息执行负载平衡。它使用随机选择的路由,概率来自于相应节点的剩余容量。它可以配置为使用特定的MetricsSelector来产生概率,即a.k.a.权重:

  • heap / HeapMetricsSelector - 已用和最大 JVM 堆内存。基于剩余堆容量的权重;(max - used) / max
  • load / SystemLoadAverageMetricsSelector - 过去 1 分钟的系统平均负载,在 Linux 系统顶部可以找到相应的值。如果系统平均负载接近cpus/cores的数量,则系统可能接近瓶颈。基于剩余负载能力的权重;1 - (load / processors)
  • cpu / CpuMetricsSelector - 以百分比表示的 CPU 利用率,User + Sys + Nice + Wait之和。基于剩余 CPU 容量的权重;1 - utilization
  • mix / MixMetricsSelector - 组合堆、CPU 和负载。基于组合选择器剩余容量平均值的权重。
  • akka.cluster.metrics.MetricsSelector的任何自定义实现

使用「指数加权移动平均值」平滑收集的指标值。在「集群配置」中,你可以调整过去的数据相对于新数据的衰减速度。

让我们来看看这台正在运行的路由器。还有什么比计算阶乘(factorial)更苛刻的呢?

执行阶乘计算的后端工作程序:

public class FactorialBackend extends AbstractActor {@Overridepublic Receive createReceive() {return receiveBuilder().match(Integer.class,n -> {CompletableFuture<FactorialResult> result =CompletableFuture.supplyAsync(() -> factorial(n)).thenApply((factorial) -> new FactorialResult(n, factorial));pipe(result, getContext().dispatcher()).to(getSender());}).build();}BigInteger factorial(int n) {BigInteger acc = BigInteger.ONE;for (int i = 1; i <= n; ++i) {acc = acc.multiply(BigInteger.valueOf(i));}return acc;}
}

接收用户作业并通过路由器委派到后端的前端:

public class FactorialFrontend extends AbstractActor {final int upToN;final boolean repeat;LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);ActorRef backend =getContext().actorOf(FromConfig.getInstance().props(), "factorialBackendRouter");public FactorialFrontend(int upToN, boolean repeat) {this.upToN = upToN;this.repeat = repeat;}@Overridepublic void preStart() {sendJobs();getContext().setReceiveTimeout(Duration.ofSeconds(10));}@Overridepublic Receive createReceive() {return receiveBuilder().match(FactorialResult.class,result -> {if (result.n == upToN) {log.debug("{}! = {}", result.n, result.factorial);if (repeat) sendJobs();else getContext().stop(getSelf());}}).match(ReceiveTimeout.class,x -> {log.info("Timeout");sendJobs();}).build();}void sendJobs() {log.info("Starting batch of factorials up to [{}]", upToN);for (int n = 1; n <= upToN; n++) {backend.tell(n, getSelf());}}
}

如你所见,路由器的定义方式与其他路由器相同,在这种情况下,配置如下:

akka.actor.deployment {/factorialFrontend/factorialBackendRouter = {# Router type provided by metrics extension.router = cluster-metrics-adaptive-group# Router parameter specific for metrics extension.# metrics-selector = heap# metrics-selector = load# metrics-selector = cpumetrics-selector = mix#routees.paths = ["/user/factorialBackend"]cluster {enabled = onuse-roles = ["backend"]allow-local-routees = off}}
}

只有router类型和metrics-selector参数特定于此路由器,其他事物的工作方式与其他路由器相同。

同样类型的路由器也可以在代码中定义:

int totalInstances = 100;
Iterable<String> routeesPaths = Arrays.asList("/user/factorialBackend", "");
boolean allowLocalRoutees = true;
Set<String> useRoles = new HashSet<>(Arrays.asList("backend"));
ActorRef backend =getContext().actorOf(new ClusterRouterGroup(new AdaptiveLoadBalancingGroup(HeapMetricsSelector.getInstance(), Collections.<String>emptyList()),new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles)).props(),"factorialBackendRouter2");int totalInstances = 100;
int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false;
Set<String> useRoles = new HashSet<>(Arrays.asList("backend"));
ActorRef backend =getContext().actorOf(new ClusterRouterPool(new AdaptiveLoadBalancingPool(SystemLoadAverageMetricsSelector.getInstance(), 0),new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles)).props(Props.create(FactorialBackend.class)),"factorialBackendRouter3");

运行自适应负载平衡示例最简单的方法下载「Akka Cluster Sample with Java」中的代码和教程。它包含有关如何运行自适应负载平衡示例的说明,此示例的源代码也可以在「 Akka Samples Repository」中找到。

订阅指标事件

可以直接订阅指标事件来实现其他功能。

import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.metrics.ClusterMetricsChanged;
import akka.cluster.metrics.NodeMetrics;
import akka.cluster.metrics.StandardMetrics;
import akka.cluster.metrics.StandardMetrics.HeapMemory;
import akka.cluster.metrics.StandardMetrics.Cpu;
import akka.cluster.metrics.ClusterMetricsExtension;
import akka.event.Logging;
import akka.event.LoggingAdapter;public class MetricsListener extends AbstractActor {LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);Cluster cluster = Cluster.get(getContext().getSystem());ClusterMetricsExtension extension = ClusterMetricsExtension.get(getContext().getSystem());// Subscribe unto ClusterMetricsEvent events.@Overridepublic void preStart() {extension.subscribe(getSelf());}// Unsubscribe from ClusterMetricsEvent events.@Overridepublic void postStop() {extension.unsubscribe(getSelf());}@Overridepublic Receive createReceive() {return receiveBuilder().match(ClusterMetricsChanged.class,clusterMetrics -> {for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {if (nodeMetrics.address().equals(cluster.selfAddress())) {logHeap(nodeMetrics);logCpu(nodeMetrics);}}}).match(CurrentClusterState.class,message -> {// Ignore.}).build();}void logHeap(NodeMetrics nodeMetrics) {HeapMemory heap = StandardMetrics.extractHeapMemory(nodeMetrics);if (heap != null) {log.info("Used heap: {} MB", ((double) heap.used()) / 1024 / 1024);}}void logCpu(NodeMetrics nodeMetrics) {Cpu cpu = StandardMetrics.extractCpu(nodeMetrics);if (cpu != null && cpu.systemLoadAverage().isDefined()) {log.info("Load: {} ({} processors)", cpu.systemLoadAverage().get(), cpu.processors());}}
}

自定义指标收集器

指标集合委托给akka.cluster.metrics.MetricsCollector的实现

你也可以插入自己的指标收集器,而不是内置的akka.cluster.metrics.SigarMetricsCollectorakka.cluster.metrics.JmxMetricsCollector

看看这两个实现的灵感。

自定义指标收集器实现类必须在akka.cluster.metrics.collector.provider配置属性中指定。

配置

可以使用以下属性配置群集指标扩展:

##############################################
# Akka Cluster Metrics Reference Config File #
############################################### This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.# Sigar provisioning:
#
#  User can provision sigar classes and native library in one of the following ways:
#
#  1) Use https://github.com/kamon-io/sigar-loader Kamon sigar-loader as a project dependency for the user project.
#  Metrics extension will extract and load sigar library on demand with help of Kamon sigar provisioner.
#
#  2) Use https://github.com/kamon-io/sigar-loader Kamon sigar-loader as java agent: `java -javaagent:/path/to/sigar-loader.jar`
#  Kamon sigar loader agent will extract and load sigar library during JVM start.
#
#  3) Place `sigar.jar` on the `classpath` and sigar native library for the o/s on the `java.library.path`
#  User is required to manage both project dependency and library deployment manually.# Cluster metrics extension.
# Provides periodic statistics collection and publication throughout the cluster.
akka.cluster.metrics {# Full path of dispatcher configuration key.# Use "" for default key `akka.actor.default-dispatcher`.dispatcher = ""# How long should any actor wait before starting the periodic tasks.periodic-tasks-initial-delay = 1s# Sigar native library extract location.# Use per-application-instance scoped location, such as program working directory.native-library-extract-folder = ${user.dir}"/native"# Metrics supervisor actor.supervisor {# Actor name. Example name space: /system/cluster-metricsname = "cluster-metrics"# Supervision strategy.strategy {## FQCN of class providing `akka.actor.SupervisorStrategy`.# Must have a constructor with signature `<init>(com.typesafe.config.Config)`.# Default metrics strategy provider is a configurable extension of `OneForOneStrategy`.provider = "akka.cluster.metrics.ClusterMetricsStrategy"## Configuration of the default strategy provider.# Replace with custom settings when overriding the provider.configuration = {# Log restart attempts.loggingEnabled = true# Child actor restart-on-failure window.withinTimeRange = 3s# Maximum number of restart attempts before child actor is stopped.maxNrOfRetries = 3}}}# Metrics collector actor.collector {# Enable or disable metrics collector for load-balancing nodes.# Metrics collection can also be controlled at runtime by sending control messages# to /system/cluster-metrics actor: `akka.cluster.metrics.{CollectionStartMessage,CollectionStopMessage}`enabled = on# FQCN of the metrics collector implementation.# It must implement `akka.cluster.metrics.MetricsCollector` and# have public constructor with akka.actor.ActorSystem parameter.# Will try to load in the following order of priority:# 1) configured custom collector 2) internal `SigarMetricsCollector` 3) internal `JmxMetricsCollector`provider = ""# Try all 3 available collector providers, or else fail on the configured custom collector provider.fallback = true# How often metrics are sampled on a node.# Shorter interval will collect the metrics more often.# Also controls frequency of the metrics publication to the node system event bus.sample-interval = 3s# How often a node publishes metrics information to the other nodes in the cluster.# Shorter interval will publish the metrics gossip more often.gossip-interval = 3s# How quickly the exponential weighting of past data is decayed compared to# new data. Set lower to increase the bias toward newer values.# The relevance of each data sample is halved for every passing half-life# duration, i.e. after 4 times the half-life, a data sample’s relevance is# reduced to 6% of its original relevance. The initial relevance of a data# sample is given by 1 – 0.5 ^ (collect-interval / half-life).moving-average-half-life = 12s}
}# Cluster metrics extension serializers and routers.
akka.actor {# Protobuf serializer for remote cluster metrics messages.serializers {akka-cluster-metrics = "akka.cluster.metrics.protobuf.MessageSerializer"}# Interface binding for remote cluster metrics messages.serialization-bindings {"akka.cluster.metrics.ClusterMetricsMessage" = akka-cluster-metrics"akka.cluster.metrics.AdaptiveLoadBalancingPool" = akka-cluster-metrics"akka.cluster.metrics.MixMetricsSelector" = akka-cluster-metrics"akka.cluster.metrics.CpuMetricsSelector$" = akka-cluster-metrics"akka.cluster.metrics.HeapMetricsSelector$" = akka-cluster-metrics"akka.cluster.metrics.SystemLoadAverageMetricsSelector$" = akka-cluster-metrics}# Globally unique metrics extension serializer identifier.serialization-identifiers {"akka.cluster.metrics.protobuf.MessageSerializer" = 10}#  Provide routing of messages based on cluster metrics.router.type-mapping {cluster-metrics-adaptive-pool  = "akka.cluster.metrics.AdaptiveLoadBalancingPool"cluster-metrics-adaptive-group = "akka.cluster.metrics.AdaptiveLoadBalancingGroup"}
}

英文原文链接:Cluster Metrics Extension.


———— ☆☆☆ —— 返回 -> Akka 中文指南 <- 目录 —— ☆☆☆ ————

Akka 指南 之「集群指标扩展」相关推荐

  1. Akka 指南 之「集群客户端」

    温馨提示:Akka 中文指南的 GitHub 地址为「akka-guide」,欢迎大家Star.Fork,纠错. 文章目录 集群客户端 依赖 简介 一个示例 ClusterClientReceptio ...

  2. 【Redis集群专题】「集群技术三部曲」介绍一下常用的Redis集群机制方案的原理和指南(入门篇)

    集群化的方案 Redis的Sentinel解决了主从复制故障不能自动迁移的问题,但是主节点的写性能和存储能力依然是受到了Redis单机容量有限的限制,所以使用Redis集群去解决这个问题,将Redis ...

  3. Elasticsearch 6.6 官方文档 之「集群」

    文章目录 集群 集群等级分片分配 分片分配设置 分片重新平衡设置 分片平衡探索 基于磁盘的分片分配 分片分配意识 强制感知 分片分配过滤 其他群集设置 元数据 集群分片限制 用户定义的群集元数据 索引 ...

  4. Akka 指南 之「持久化」

    温馨提示:Akka 中文指南的 GitHub 地址为「akka-guide」,欢迎大家Star.Fork,纠错. 文章目录 持久化 依赖 示例项目 简介 体系结构 事件源 标识符 恢复 恢复自定义 恢 ...

  5. Airbnb 如何实现 Kubernetes 集群动态扩展

    运营 Airbnb 基础设施的一项重要工作是,确保我们的云开支随着需求自动增长和下降.我们的流量每天波动很大,为此,我们的云资源占用应该能够动态扩展. 为了实现这种扩展,Airbnb 利用了 Kube ...

  6. C#下实现的K-Means优化[1]-「离群点检测」

    转自:http://www.cnblogs.com/lzxwalex/p/7745915.html #本文PDF版下载 C#下实现的K-Means优化[1]-「离群点检测」 前言 在上一篇博文中,我和 ...

  7. Akka 指南 之「跨多个数据中心集群」

    温馨提示:Akka 中文指南的 GitHub 地址为「akka-guide」,欢迎大家Star.Fork,纠错. 文章目录 跨多个数据中心集群 动机 定义数据中心 成员关系 故障检测 集群单例 集群分 ...

  8. Akka 指南 之「消息传递可靠性」

    温馨提示:Akka 中文指南的 GitHub 地址为「akka-guide」,欢迎大家Star.Fork,纠错. 文章目录 消息传递可靠性 一般规则 讨论:"至多一次"是什么意思? ...

  9. Akka 指南 之「Akka 简介」

    温馨提示:Akka 中文指南的 GitHub 地址为「akka-guide」,欢迎大家Star.Fork,纠错. Akka 简介 欢迎来到 Akka,它是一组用于设计跨越处理器和网络的可扩展.弹性系统 ...

最新文章

  1. python线上编辑问题_大型线上项目中动态语言诸多问题之二 -- 以 Python 语言为例...
  2. linux服务器后台运行程序
  3. 用nice等命令设定的进程优先级有什么确切的含义?
  4. 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...
  5. 特征向量按照特征值大小进行排序
  6. get_metrology_object_result 获取计量模型的测量结果
  7. eclipse中将一个项目作为library导入另一个项目中
  8. 基于ASA防火墙的SSL ×××配置
  9. Java得到请求的IP地址
  10. 1.10 编程基础之简单排序 06 整数奇偶排序 python
  11. 大动作!华为海思注册资本从6亿增加到20亿
  12. php与web页面交互
  13. RecyclerView之自定义LayoutManager和SnapHelper
  14. sql server2016导入excel错误
  15. 最新影视双端直播盒子源码,自定义资源采集
  16. InnoSetup 制作安装包进阶版
  17. 我的Latex中文报告模板
  18. html导航栏位置垂直居中,导航栏水平+垂直居中
  19. safari windows版本_iPhone和Windows同步文件,这7个技巧让你的效率成倍增加
  20. 嵌入式开发语言-C语言编程

热门文章

  1. 译——冠状病毒:这是全球离岸测试模型的终结吗?
  2. matlab工具箱计算最小生成树_matlab_bgl 图论基本函数库, 相当丰富.包含工具箱、 、Dijkstra最短路径、 254万源代码下载- www.pudn.com...
  3. ACM_总论_杂文_测试你的智力有多高
  4. 【训练题】航线设计 | 使用最长上升子序列(LIS)长度的O(nlogn)算法优化
  5. java定时14点30分_单机定时任务的四种基本实现方式
  6. 多项式时间 (Polynomial time)
  7. 命令行查看office激活期限
  8. ORACLE VARCHAR2最大长度是4000?还是32767?
  9. ap计算机科学a 5分,新鲜出炉2019年AP考试5分率分析,尝鲜版来啦~~
  10. Geogebra中对图片和文本进行旋转