前言

在《dubbo之网络通讯》中完成 Protocol 服务化。监控,统计将成首要任务。Dubbo引入ExporterListener,Filter两大概念。

ExporterListener

dubbo通过《dubbo之SPI》wraper的方式进行注入ProtocolListenerWrapper

相当于 new ProtocolListenerWrapper(new DubboProtocol())

// 主要对export,refer两方法进行代理,然后通知监听者
public class ProtocolListenerWrapper implements Protocol {@Overridepublic <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {if (UrlUtils.isRegistry(invoker.getUrl())) {return protocol.export(invoker);}// 在spi容器中查找Activate注解符合当前url条件的ExporterListener的实现类return new ListenerExporterWrapper<T>(protocol.export(invoker),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));}@Overridepublic <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {if (UrlUtils.isRegistry(url)) {return protocol.refer(type, url);}return new ListenerInvokerWrapper<T>(protocol.refer(type, url),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url, INVOKER_LISTENER_KEY)));}
}

关于Activate注解

  • group: 所属组,String[],例如消费端、服务端。
  • value String[],如果指定该值,只有当消费者或服务提供者URL中包含属性名为value的键值对,该过滤器才处于激活状态。
  • before:String[],用于指定执行顺序,before指定的过滤器在该过滤器之前执行(弃用)。
  • after:string[],用于指定执行顺序,after指定的过滤器在该过滤器之后执行(弃用)。
  • order:用户指定顺序,值越小,越先执行。

Filter

同理注入ProtocolFilterWrapper

// 主要通过代理Invoker完成
public class ProtocolFilterWrapper implements Protocol {private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {Invoker<T> last = invoker;// 在 spi窗口中获取Activate注解符合当前url条件FilterList<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);if (!filters.isEmpty()) {for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);final Invoker<T> next = last;last = new Invoker<T>() {// ...@Overridepublic Result invoke(Invocation invocation) throws RpcException {Result asyncResult;try {// 过滤调用asyncResult = filter.invoke(next, invocation);} catch (Exception e) {// 异常结果监听if (filter instanceof ListenableFilter) {ListenableFilter listenableFilter = ((ListenableFilter) filter);try {Filter.Listener listener = listenableFilter.listener(invocation);if (listener != null) {listener.onError(e, invoker, invocation);}} finally {listenableFilter.removeListener(invocation);}} else if (filter instanceof Filter.Listener) {Filter.Listener listener = (Filter.Listener) filter;listener.onError(e, invoker, invocation);}throw e;} finally {}return asyncResult.whenCompleteWithContext((r, t) -> {// 成功监听if (filter instanceof ListenableFilter) {ListenableFilter listenableFilter = ((ListenableFilter) filter);Filter.Listener listener = listenableFilter.listener(invocation);try {if (listener != null) {if (t == null) {listener.onResponse(r, invoker, invocation);} else {listener.onError(t, invoker, invocation);}}} finally {listenableFilter.removeListener(invocation);}} else if (filter instanceof Filter.Listener) {Filter.Listener listener = (Filter.Listener) filter;if (t == null) {listener.onResponse(r, invoker, invocation);} else {listener.onError(t, invoker, invocation);}}});}@Overridepublic void destroy() {invoker.destroy();}@Overridepublic String toString() {return invoker.toString();}};}}return last;}
}

系统默认的Filter在/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter文件中定义

cache=org.apache.dubbo.cache.filter.CacheFilter
validation=org.apache.dubbo.validation.filter.ValidationFilter
echo=org.apache.dubbo.rpc.filter.EchoFilter
generic=org.apache.dubbo.rpc.filter.GenericFilter
genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
token=org.apache.dubbo.rpc.filter.TokenFilter
// 日志
accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter
activelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilter
classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter
context=org.apache.dubbo.rpc.filter.ContextFilter
consumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
// 链路
trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter
// 监控
monitor=org.apache.dubbo.monitor.support.MonitorFiltermetrics=org.apache.dubbo.monitor.dubbo.MetricsFilter

Dubbo监控中心实现原理

声明MonitorFilter

// 表示支持提供者和消费者
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MonitorFilter implements Filter {@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {// 如果url中存在monitor,则设置了监控中心,收集调用信息if (invoker.getUrl().hasParameter(MONITOR_KEY)) {invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());// 相关计数getConcurrent(invoker, invocation).incrementAndGet(); }return invoker.invoke(invocation); // proceed invocation chain}// concurrent counterprivate AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {String key = invoker.getInterface().getName() + "." + invocation.getMethodName();return concurrents.computeIfAbsent(key, k -> new AtomicInteger());}// 成功时@Overridepublic void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {if (invoker.getUrl().hasParameter(MONITOR_KEY)) {// 收集成功数据collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), (long) invocation.get(MONITOR_FILTER_START_TIME), false);// 计数减一getConcurrent(invoker, invocation).decrementAndGet(); // count down}}// 失败时@Overridepublic void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {if (invoker.getUrl().hasParameter(MONITOR_KEY)) {// 收集失败数据collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), (long) invocation.get(MONITOR_FILTER_START_TIME), true);getConcurrent(invoker, invocation).decrementAndGet(); // count down}}private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {try {URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);// monitorFactory是从spi容器中创建。默认是发个监控中心Monitor monitor = monitorFactory.getMonitor(monitorUrl);if (monitor == null) {return;}// 统计数据URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);// 发送monitor.collect(statisticsURL);} catch (Throwable t) {logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);}}}

以下根据MonitorFactory扩展点对Monitor的Demo实现

// 增加spi扩展配置
// META-INF/services/org.apache.dubbo.monitor.MonitorFactory
// dubbo=dubbo.test.MonitorTest
public class MonitorTest implements MonitorFactory {public static void main(String[] args) throws Exception {ServiceRepository repository = ApplicationModel.getServiceRepository();ServiceDescriptor serviceDescriptor = repository.registerService(GreetingsService.class);MethodDescriptor methodDescriptor = serviceDescriptor.getMethod("sayHi", new Class[]{ String.class});Method method = methodDescriptor.getMethod();String serviceName =  serviceDescriptor.getServiceName();URL url = URL.valueOf("dubbo://127.0.0.1:28092/"+serviceName+"?timeout=12000&monitor=mm");Protocol protocol = new ProtocolFilterWrapper(new DubboProtocol());protocol.export(new Invoker<GreetingsService>() {// ..@Overridepublic Result invoke(Invocation invocation) throws RpcException {System.out.println("yoyoy");return AsyncRpcResult.newDefaultAsyncResult(invocation);}});Invoker invoker = protocol.refer(GreetingsService.class, url);invoker.invoke(new RpcInvocation(method, serviceName, new String[]{"yoyo"}));Thread.sleep(60000);}@Overridepublic Monitor getMonitor(URL url) {return new Monitor() {@Overridepublic void collect(URL statistics) {System.out.println("statistics---------" + statistics.toString());}// ..};}
}

dubbo之Filter监控拦截相关推荐

  1. dubbo 使用 filter 报错解决

    dubbo可以用filter实现类似tomcat filter过滤器. 实现1.接口请求时间监控. 2.打印输入输出日志(输出日志有应用自己决定) 配置时出现报错. No such extension ...

  2. Dubbo之Filter链原理

    本文来说下Dubbo的Filter链原理 文章目录 概述 构造Filter链 Consumer ConsumerContextFilter ActiveLimitFilter FutureFilter ...

  3. spring过滤器Filter 、 拦截器Interceptor 、 切片Aspect 详解

    springboot 过滤器Filter vs 拦截器Interceptor vs 切片Aspect 详解 1 前言 最近接触到了过滤器和拦截器,网上查了查资料,这里记录一下,这篇文章就来仔细剖析下过 ...

  4. 聊聊dubbo的Filter

    序 本文主要研究一下dubbo的Filter Filter dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc ...

  5. Asp.net MVC Filter监控页面性能和运行时间

    本篇文章作用说明: Asp.net MVC Filter监控View实例,监控每个View页面加载的时间,跟踪分析每个页面的加载性能,然后做进一步优化: 问题背景 最近,客户一直反馈系统使用慢,有时候 ...

  6. Dubbo中的监控和管理

    一.Dubbo中的监控 1.原理 原理:服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心. 2.搭建监控服务 3.修改配置文件 修改注册中心的地址: 注意:这个 ...

  7. Filter过滤器拦截路径配置

    在Filter中拦截路径配置有四种形式 具体资源路径拦截:@WebFilter("/index.jsp") //这是指访问index.jsp的时候会经过过滤器 具体目录拦截:@We ...

  8. (十六)ATP应用测试平台——java应用中的过滤器Filter、拦截器Interceptor、参数解析器Resolver、Aop切面,你会了吗?

    前言 过滤器Filter.拦截器Interceptor.参数解析器Resolver.Aop切面是我们应用开发中经常使用到的技术,到底该如何使用这些web附属功能, 本小节我们就分别介绍一下其各自的用法 ...

  9. java拦截器放行_Java Filter(拦截器)

    多个Filter按照在配置文件中配置的filter顺序执行. 在web.xml文件中配置该Filter,使用init-param元素为该Filter配置参数,init-param可接受如下两个子元素: ...

最新文章

  1. 二叉树镜像,交换左右子树
  2. 谷歌正式开源Model Search!自动优化并识别AI模型,最佳模版唾手可得
  3. ACM图灵奖获得者:想从大数据中获益,先解决集成问题!
  4. jquery ajax(实现单独提交某个form)
  5. 一般向量空间的基变换_从希尔伯特空间的角度看线性变换的一般思想和问题
  6. java设计模式_工厂方法
  7. CompletableFuture并行异步处理类使用示例
  8. C语言实现Hanoi算法塔的功能(附完整源码)
  9. 教程|YOLOX目标检测ncnn实现
  10. 执行pip安装的程序:command not found
  11. Springboot项目与vue项目整合打包
  12. Echarts数据可视化series-graph关系图,开发全解+完美注释
  13. session 的工作原理以及使用细节和url编码
  14. C/C++编程学习 - 第3周 ⑥ 温度表达转化
  15. Burp Suite使用介绍
  16. 上学期间你收到最感动的小纸条是什么?
  17. Python 二次指数平滑法 预测
  18. 机器学习项目汇总,值得收藏!
  19. 古代地图的那些趣事儿
  20. 如何用C语言实现【爱心代码】

热门文章

  1. 电脑用着用着突然黑屏怎么处理
  2. 微信客服对接-唯一客服系统文档中心
  3. mybatis主从表关联查询,返回对象带有集合属性
  4. 敏捷开发中如何开好站立会议
  5. 敏捷开发第二阶段每日站立会议(五)
  6. 故宫文创产品一年卖出15亿元
  7. Centos 8 安装qq for linux
  8. Multiple commands produce “*.framework“ Command PhaseScriptExecution failed with a nonzero exit cod
  9. 还有类似的自动采集工具带发布的?
  10. java-Web(Jquery选择器)作业