说完了polling agent,咱们接着说notification agent,打开setup.cfg文件,找到入口 ceilometer-agent-notification = ceilometer.cmd.agent_notification:main,打开文件,发现启动了NotificationService服务,然后看start方法,核心代码如下

[mw_shl_code=applescript,true]self.listeners, self.pipeline_listeners = [], []

self._configure_main_queue_listeners(transporter, event_transporter)[/mw_shl_code]

意思是配置主消息队列监听器(主消息队列是指exchange为nova,cinder,neutron等的消息队列),来到该方法下,

[mw_shl_code=applescript,true]notification_manager = self._get_notifications_manager(transporter)

for ext in notification_manager:

handler = ext.obj

for new_tar in handler.get_targets(cfg.CONF):

if new_tar not in targets:

targets.append(new_tar)

endpoints.append(handler)

urls = cfg.CONF.notification.messaging_urls or [None]

for url in urls:

transport = messaging.get_transport(url)

listener = messaging.get_notification_listener(

transport, targets, endpoints)

listener.start()

self.listeners.append(listener)

[/mw_shl_code]

首先调用命名空间是ceilometer.notification的plugins(还在setup.cfg中),然后获取相关的endpoins和targets,最后启动监听器,最重要的是启动了监听器

我们不断找监听器的代码,最后发现是在oslo_messaging中的server.py文件中,看他的start方法

[mw_shl_code=applescript,true]if self._executor is not None:

return

try:

listener = self.dispatcher._listen(self.transport)

except driver_base.TransportDriverError as ex:

raise ServerListenError(self.target, ex)

self._executor = self._executor_cls(self.conf, listener,

self.dispatcher)

self._executor.start()[/mw_shl_code]

这里还是用到了plugin,命名空间是oslo.messaging.executors,调用执行器执行代码,我们debug获得plugin是哪个,结果是eventlet,找到他的代码,

文件在/usr/lib/python2.7/site-packages/oslo_messaging/_executors/impl_eventlet.py中,看他的start方法,核心如下

[mw_shl_code=applescript,true]@excutils.forever_retry_uncaught_exceptions

def _executor_thread():

try:

while self._running:

incoming = self.listener.poll()

if incoming is not None:

self._dispatch(incoming)

except greenlet.GreenletExit:

return[/mw_shl_code]

意思就是不断调用listener的poll方法获得incoming,(意思就是不断获取消息队列上的消息),如果有,就执行self._dispatch(incoming)

我们找到listener,

[mw_shl_code=applescript,true]listener = AMQPListener(self, conn)

for target, priority in targets_and_priorities:

conn.declare_topic_consumer(

exchange_name=self._get_exchange(target),

topic='%s.%s' % (target.topic, priority),

callback=listener, queue_name=pool)

return listener[/mw_shl_code]

可以看出其实就是设置了许多消费者,监听消息队列上的信息,exchange_name就是nova,cinder之类的,topic是notification,priority类似与info之类

设置这样的消费者就监听了主消息队列上的消息,如果获得了消息(比如某事件触发来了compute.instance.update),然后执行self._dispatch(incoming),我们找到该方法,

[mw_shl_code=applescript,true]for screen, callback in self._callbacks_by_priority.get(priority, []):

if screen and not screen.match(ctxt, publisher_id, event_type,

metadata, payload):

continue

localcontext.set_local_context(ctxt)

try:

if executor_callback:

ret = executor_callback(callback, ctxt, publisher_id,

event_type, payload, metadata)

else:

ret = callback(ctxt, publisher_id, event_type, payload,

metadata)

ret = NotificationResult.HANDLED if ret is None else ret

if self.allow_requeue and ret == NotificationResult.REQUEUE:

return ret

finally:

localcontext.clear_local_context()

return NotificationResult.HANDLED[/mw_shl_code]

_callbacks_by_priority是filter_rule和endpoint的方法的dict,首先判断是否匹配(事件类型之类的),找到某个endpoint的方法,执行它即可。该回调方法的作用一般是publish消息。

以compute的notification中的instance.py的Memory为例子,回调方法执行其父类NotificationBase的info方法,

[mw_shl_code=applescript,true]notification = messaging.convert_to_old_notification_format(

'info', ctxt, publisher_id, event_type, payload, metadata)

self.to_samples_and_publish(context.get_admin_context(), notification)[/mw_shl_code]

将消息发布出去即可,但是在发布过程中,需要调用Memory的get_sample方法返回处理过的消息。重要的是,发布消息的途径还是在pipeline中定义的,如果消息匹配到了相应的meter,就会按照pipeline中相应的sink发布消息。

notification源码分析_Ceilometer之notification agent代码分析相关推荐

  1. 卖家工具箱源码_我的测试和代码分析工具箱

    卖家工具箱源码 上周,我们在LINEAS成立了一个"测试技能小组",该小组用于交换有关测试的知识. 各种各样的问题反复出现的一个问题是:有哪些工具可以测试和分析代码? 因此,这是我 ...

  2. vn.py源码解读(六、主引擎代码分析---策略模块)

    之前在讲MainEngine的时候,有这样一个代码: me.addApp(ctaStrategy) 这里,我们来看一下MainEngine里面这个addApp函数的代码: def addApp(sel ...

  3. vn.py源码解读(五、主引擎代码分析----CTP模块)

    上一篇文章讲了MainEngine中的初始化函数,重点是DataEngine的讲解.有了对行情数据的处理,还需要有行情数据的来源.在MainEngine的初始化函数后面的一个函数就是addGatewa ...

  4. android 特殊用户通知用法汇总--Notification源码分析

    一直用的android手机,用过这么多的app,平时也会遇到有趣的通知提醒,在这里先总结两种吧,notification和图标数字,有的以后看到再研究.还有,推广一下哈,刚刚建立一个Q群5446459 ...

  5. 【Android 10 源码】healthd 模块 HAL 2.0 分析

    Android 9 引入了从 health@1.0 HAL 升级的主要版本 android.hardware.health HAL 2.0.这一新 HAL 具有以下优势: 框架代码和供应商代码之间的区 ...

  6. vue+django 微博舆情系统源码、深度学习+舆情扩散消失分析、舆情紧急等级、属地分析、按话题、情感预测、话题评论获取、提取观点、正面负面舆情、按区域检测舆情

    项目背景 315又马上要到了,现在有开始对食品安全话题的关注地提升了,因此,本文系统对微博的食品安全话题进行分析,有如下的功能 1.展示当前食品安全事件相关的热点信息以及提供根据食品关键词,食品安全类 ...

  7. Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果

    相关 <Postgresql源码(76)执行器专用元组格式TupleTableSlot> <Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果> &l ...

  8. 抖音seo优化源码搭建/搜索排名系统,技术理论分析搭建中。

    抖音seo系统源码SaaS+源码私有化部署搭建,抖音seo源码,抖音seo系统源码,抖音seo系统搭建部署,抖音已经成为了当今最为流行的短视频平台之一,拥有着庞大的用户群体和海量的视频资源.对于一些商 ...

  9. [附源码]计算机毕业设计JAVA学生考试成绩分析系统

    [附源码]计算机毕业设计JAVA学生考试成绩分析系统 项目运行 环境配置: Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(In ...

  10. Postgresql源码(69)常规锁细节分析

    相关: <Postgresql源码(40)Latch的原理分析和应用场景> <Postgresql源码(67)LWLock锁的内存结构与初始化> <Postgresql源 ...

最新文章

  1. VS2017中使用码云上传项目以及问题汇总
  2. android主题编辑器,使用 Theme Editor 设计应用主题背景
  3. JetBrains CLion C++ IDE连接wsl2(Ubuntu)时,报错“Unable to establish SSL connection“解决方案
  4. linux7安装pgsql数据库,CentOS7下PostgreSQL安装
  5. which kinds of error message will prevent business transaction save
  6. 零基础逆向工程24_C++_01_类_this指针_继承本质_多层继承
  7. 验证身份证合法性的js
  8. 2018年AI如何发展?普华永道做出了8点预测 | 报告下载
  9. java webengine_webview – JavaFX 8 WebEngine:如何从java到console.log()从java到System.out?
  10. uos系统虚拟机_UOS开箱体验
  11. Unity使用HDR做天空盒
  12. 生信过程中的各种文件格式
  13. 手机支付宝服务器安全证书安装不了,手机上如何安装支付宝的安全证书?
  14. 读书——我本将心向明月,奈何明月照沟渠。知我者谓我心忧,不知我者谓我何求。...
  15. PeopleSoft开发:创建查询QUERY
  16. 深度学习中的Lipschitz约束:泛化与生成模型
  17. 微商怎么找客源,新手做微商如何找客源的
  18. 嵩天老师python爬虫笔记整理week3
  19. 在信用证支付的情况下,空运单可否作成以银行为收货人?在此种情况下可否起到约束进口方付款的作用?...
  20. Android11 SystemUI启动流程源码分析(一)——SystemUIApplication的创建

热门文章

  1. 怎么查看ip地址下的php文件夹,pe下查看原系统ip的方法
  2. ajax参数是json数据类型,如何保护$ .ajax数据类型:json Post参数
  3. 计算机里的东西太多,电脑里的东西太多,怎么样清理一下啊
  4. html前端页面的基本骨架是,web前端入门到实战:css实现的骨架屏方案
  5. apex英雄机器人探路者怎么玩_Apex英雄探路者机器人实战技巧攻略[多图]
  6. TIM提示“个人文件夹被占用,请稍候再登录”怎么解决
  7. 谷歌Chrome浏览器欲推门户网站聚合 正测试新“探索”页面
  8. Java核心类库篇5——异常
  9. shiro框架,自定义realm注入service失败解决办法
  10. 解决springBoot 的templates中html引入css文件失败