1.代码入口

/usr/bin/neutron-l3-agent

import sysfrom neutron.cmd.eventlet.agents.l3 import mainif __name__ == "__main__":sys.exit(main())

neutron.cmd.eventlet.agent.l3:

from neutron.agent import l3_agentdef main():l3_agent.main()

2.创建Service

2.1

l3_agent.main():

def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):register_opts(cfg.CONF)common_config.init(sys.argv[1:])config.setup_logging()config.setup_privsep()server = neutron_service.Service.create(binary='neutron-l3-agent',topic=topics.L3_AGENT,report_interval=cfg.CONF.AGENT.report_interval,manager=manager)service.launch(cfg.CONF, server).wait()

2.2

创建服务,其中服务的manager类为 neutron.agent.l3.agent.L3NATAgentWithStateReport

    @classmethoddef create(cls, host=None, binary=None, topic=None, manager=None,report_interval=None, periodic_interval=None,periodic_fuzzy_delay=None):""":param host: defaults to cfg.CONF.host:param binary: 可执行文件名:param topic:  'neutron-' :param manager: manager类名:param report_interval: 将agent状态上报给L3Plugin的间隔:param periodic_interval: 执行周期性同步任务的间隔:param periodic_fuzzy_delay: defaults to cfg.CONF.periodic_fuzzy_delay"""if not host:host = cfg.CONF.hostif not binary:binary = os.path.basename(inspect.stack()[-1][1])if not topic:topic = binary.rpartition('neutron-')[2]topic = topic.replace("-", "_")if not manager:manager = cfg.CONF.get('%s_manager' % topic, None)if report_interval is None:report_interval = cfg.CONF.report_intervalif periodic_interval is None:periodic_interval = cfg.CONF.periodic_intervalif periodic_fuzzy_delay is None:periodic_fuzzy_delay = cfg.CONF.periodic_fuzzy_delay# 在初始化的过程中加载一个manager对象service_obj = cls(host, binary, topic, manager,report_interval=report_interval,periodic_interval=periodic_interval,periodic_fuzzy_delay=periodic_fuzzy_delay)return service_obj

2.3加载manager对象 neutron.agent.l3.agent.L3NATAgentWithStateReport

class L3NATAgentWithStateReport(L3NATAgent):def __init__(self, host, conf=None):# 2.4下面这行加载一堆插件super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)self.agent_state = {'binary': 'neutron-l3-agent','host': host,'availability_zone': self.conf.AGENT.availability_zone,'topic': topics.L3_AGENT,'configurations': {'agent_mode': self.conf.agent_mode,'handle_internal_only_routers':self.conf.handle_internal_only_routers,'external_network_bridge': self.conf.external_network_bridge,'gateway_external_network_id':self.conf.gateway_external_network_id,'interface_driver': self.conf.interface_driver,'log_agent_heartbeats': self.conf.AGENT.log_agent_heartbeats},'start_flag': True,'agent_type': lib_const.AGENT_TYPE_L3}report_interval = self.conf.AGENT.report_interval# 2.5将状态报告定时任务加入心跳报告中if report_interval:self.heartbeat = loopingcall.FixedIntervalLoopingCall(self._report_state)self.heartbeat.start(interval=report_interval)

2.4加载ProcessMonitor, interface_driver,namespace_manager,metadata_driver,以及PrefixDelegation,这一步是L3NATAgent中,具体是在上面注释的那一行中

2.5 将状态报告定时任务加入心跳报告中

3.启动Service

def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):register_opts(cfg.CONF)common_config.init(sys.argv[1:])config.setup_logging()config.setup_privsep()server = neutron_service.Service.create(binary='neutron-l3-agent',topic=topics.L3_AGENT,report_interval=cfg.CONF.AGENT.report_interval,manager=manager)# 这一行将启动service与等待异常退出放在一起了service.launch(cfg.CONF, server).wait()

service.lauch:

def launch(conf, service, workers=1, restart_method='reload'):if workers is not None and workers <= 0:raise ValueError(_("Number of workers should be positive!"))if workers is None or workers == 1:launcher = ServiceLauncher(conf, restart_method=restart_method)else:launcher = ProcessLauncher(conf, restart_method=restart_method)launcher.launch_service(service, workers=workers)return launcher

最重要的一句是

launcher.launch_service(service, workers=workers)

    def launch_service(self, service, workers=1):if workers is not None and workers != 1:raise ValueError(_("Launcher asked to start multiple workers"))_check_service_base(service)service.backdoor_port = self.backdoor_portself.services.add(service)

self.services.add(service):

    def add(self, service):"""Add a service to a list and create a thread to run it.:param service: service to run"""self.services.append(service)self.tg.add_thread(self.run_service, service, self.done)

self.tg.add_thread:

    def add_thread(self, callback, *args, **kwargs):gt = self.pool.spawn(callback, *args, **kwargs)th = Thread(gt, self, link=False)self.threads.append(th)gt.link(_on_thread_done, self, th)return th

这一行把self.run_service加到了后台绿色线程池里面

self.run_service:

    @staticmethoddef run_service(service, done):"""Service start wrapper.:param service: service to run:param done: event to wait on until a shutdown is triggered:returns: None"""try:service.start()except Exception:LOG.exception('Error starting thread.')raise SystemExit(1)else:done.wait()

service.start,将report_state与periodic_tasks加入定时任务,然后执行after_start

    def start(self):self.manager.init_host()super(Service, self).start()if self.report_interval:pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)pulse.start(interval=self.report_interval,initial_delay=self.report_interval)self.timers.append(pulse)if self.periodic_interval:if self.periodic_fuzzy_delay:initial_delay = random.randint(0, self.periodic_fuzzy_delay)else:initial_delay = Noneperiodic = loopingcall.FixedIntervalLoopingCall(self.periodic_tasks)periodic.start(interval=self.periodic_interval,initial_delay=initial_delay)self.timers.append(periodic)self.manager.after_start()

after_start:after_start将process_routers_loop加入了绿色线程池

    def after_start(self):eventlet.spawn_n(self._process_routers_loop)LOG.info("L3 agent started")# Do the report state before we do the first full sync.self._report_state()self.pd.after_start()

self._process_routers_loop:

    def _process_routers_loop(self):LOG.debug("Starting _process_routers_loop")pool = eventlet.GreenPool(size=8)while True:pool.spawn_n(self._process_router_update)

self._process_router_update,这个函数是用来在节点上实际更新路由信息的

    def _process_router_update(self):for rp, update in self._queue.each_update_to_next_router():LOG.debug("Starting router update for %s, action %s, priority %s",update.id, update.action, update.priority)if update.action == queue.PD_UPDATE:self.pd.process_prefix_update()LOG.debug("Finished a router update for %s", update.id)continuerouter = update.routerif update.action != queue.DELETE_ROUTER and not router:try:update.timestamp = timeutils.utcnow()routers = self.plugin_rpc.get_routers(self.context,[update.id])except Exception:msg = "Failed to fetch router information for '%s'"LOG.exception(msg, update.id)self._resync_router(update)continueif routers:router = routers[0]if not router:removed = self._safe_router_removed(update.id)if not removed:self._resync_router(update)else:rp.fetched_and_processed(update.timestamp)LOG.debug("Finished a router update for %s", update.id)continue# 下面这段话是处理路由信息更新的try:self._process_router_if_compatible(router)except n_exc.RouterNotCompatibleWithAgent as e:log_verbose_exc(e.msg, router)# Was the router previously handled by this agent?if router['id'] in self.router_info:LOG.error("Removing incompatible router '%s'",router['id'])self._safe_router_removed(router['id'])except Exception:log_verbose_exc("Failed to process compatible router: %s" % update.id,router)self._resync_router(update)continueLOG.debug("Finished a router update for %s", update.id)rp.fetched_and_processed(update.timestamp)

这段代码要处理self._queue里面的数据,self._queue里面的router数据是通过periodic_task轮询路由信息得到的

    @periodic_task.periodic_task(spacing=1, run_immediately=True)def periodic_sync_routers_task(self, context):if not self.fullsync:returnLOG.debug("Starting fullsync periodic_sync_routers_task")try:with self.namespaces_manager as ns_manager:self.fetch_and_sync_all_routers(context, ns_manager)except n_exc.AbortSyncRouters:self.fullsync = True

self.fetch_and_sync_all_routers,这段代码是将agent上的路由与plugin上的数据库中的路由信息同步

    def fetch_and_sync_all_routers(self, context, ns_manager):# 之前的routers:prev_router_ids = set(self.router_info)# 现在的routers:curr_router_ids = set()timestamp = timeutils.utcnow()router_ids = []chunk = []is_snat_agent = (self.conf.agent_mode ==lib_const.L3_AGENT_MODE_DVR_SNAT)try:router_ids = self.plugin_rpc.get_router_ids(context)for i in range(0, len(router_ids), self.sync_routers_chunk_size):chunk = router_ids[i:i + self.sync_routers_chunk_size]routers = self.plugin_rpc.get_routers(context, chunk)LOG.debug('Processing :%r', routers)for r in routers:curr_router_ids.add(r['id'])ns_manager.keep_router(r['id'])if r.get('distributed'):# need to keep fip namespaces as wellext_net_id = (r['external_gateway_info'] or {}).get('network_id')if ext_net_id:ns_manager.keep_ext_net(ext_net_id)elif is_snat_agent and not r.get('ha'):ns_manager.ensure_snat_cleanup(r['id'])update = queue.RouterUpdate(r['id'],queue.PRIORITY_SYNC_ROUTERS_TASK,router=r,timestamp=timestamp)self._queue.add(update)except oslo_messaging.MessagingTimeout:if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:self.sync_routers_chunk_size = max(self.sync_routers_chunk_size / 2,SYNC_ROUTERS_MIN_CHUNK_SIZE)LOG.error('Server failed to return info for routers in ''required time, decreasing chunk size to: %s',self.sync_routers_chunk_size)else:LOG.error('Server failed to return info for routers in ''required time even with min chunk size: %s. ''It might be under very high load or ''just inoperable',self.sync_routers_chunk_size)raiseexcept oslo_messaging.MessagingException:failed_routers = chunk or router_idsLOG.exception("Failed synchronizing routers '%s' ""due to RPC error", failed_routers)raise n_exc.AbortSyncRouters()self.fullsync = FalseLOG.debug("periodic_sync_routers_task successfully completed")if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:self.sync_routers_chunk_size = min(self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,SYNC_ROUTERS_MAX_CHUNK_SIZE)# 将变化后的router情况加入self._queuefor router_id in prev_router_ids - curr_router_ids:ns_manager.keep_router(router_id)update = queue.RouterUpdate(router_id,queue.PRIORITY_SYNC_ROUTERS_TASK,timestamp=timestamp,action=queue.DELETE_ROUTER)self._queue.add(update)

现在回到self._process_router_update,里面有一句self._process_router_if_compatible

    def _process_router_update(self):for rp, update in self._queue.each_update_to_next_router():..................try:self._process_router_if_compatible(router)except n_exc.RouterNotCompatibleWithAgent as e:..................except Exception:..................LOG.debug("Finished a router update for %s", update.id)rp.fetched_and_processed(update.timestamp)

self._process_router_if_compatible,重点是最后两句

    def _process_router_if_compatible(self, router):..............if router['id'] not in self.router_info:self._process_added_router(router)else:self._process_updated_router(router)

如果内存中self.router_info没有该router信息,那么添加router,否则修改 router

添加router, self._process_added_router:

    def _process_added_router(self, router):self._router_added(router['id'], router)ri = self.router_info[router['id']]ri.router = routerri.process()registry.notify(resources.ROUTER, events.AFTER_CREATE, self, router=ri)self.l3_ext_manager.add_router(self.context, router)

最后由l3_ext_manager处理,即由extension处理。同样的,更新router也是这样,最终由l3_extension_manager处理

    def _process_updated_router(self, router):ri = self.router_info[router['id']]is_dvr_only_agent = (self.conf.agent_mode in[lib_const.L3_AGENT_MODE_DVR,l3_constants.L3_AGENT_MODE_DVR_NO_EXTERNAL])is_ha_router = getattr(ri, 'ha_state', None) is not Noneif router.get('ha') and not is_dvr_only_agent and is_ha_router:self.check_ha_state_for_router(router['id'], router.get(l3_constants.HA_ROUTER_STATE_KEY))ri.router = routerregistry.notify(resources.ROUTER, events.BEFORE_UPDATE,self, router=ri)ri.process()registry.notify(resources.ROUTER, events.AFTER_UPDATE, self, router=ri)self.l3_ext_manager.update_router(self.context, router)

至此,完成了l3 agent启动到与plugin上数据库同步的流程分析。关于增加与删除router,则是直接利用rpc通信进行的,这里不再叙述。

如果有任何问题,欢迎私信讨论

openstack(Queens) neutron-l3-agent 代码解析1(从命令行启动到同步plugin数据)相关推荐

  1. python解析库_Python命令行解析库argparse

    原博文 2014-08-13 05:48 − 2.7之后python不再对optparse模块进行扩展,python标准库推荐使用argparse模块对命令行进行解析. 1.example 有一道面试 ...

  2. PerfLib 2.0 计数器 removal 失败,退出代码为 2。命令行: C:\Windows\system32\unlodctr.exe /m:hkengperfctr.xml

    sql server 2019卸载后重装反复出现: PerfLib 2.0 计数器 removal 失败,退出代码为 2.命令行: C:\Windows\system32\unlodctr.exe / ...

  3. python多任务、面向对象、命令行启动动态绑定端口号静态web服务器代码实现

    一.静态web服务器-多任务 多任务web服务器:使用多线程,比进程更节省资源,支持多用户同时访问,可以同时处理多个客户端请求 实现步骤 若客户端与服务端建立连接,则创建子线程,使用子线程处理客户端请 ...

  4. C++ 简化 推箱子 小游戏 完整代码 参考网络资料 命令行运行 仅供初学者参考交流

    C++ 简化 推箱子 小游戏 完整代码 参考网络资料 命令行运行 仅供初学者参考交流 说明:学做了4关推箱子, 仅供初学者参考可用g++ 编译,可以将内容复制到TXT文件,将后缀改为".cp ...

  5. 【TypeScript】通过node-cmd使用代码,执行cmd命令行

    使用第三方库 node-cmd 执行 cmd 命令行命令 具体实现代码如下: const cmdShell = require('node-cmd')async function cmd(domain ...

  6. python 命令行解析模块_Python命令行解析模块详解

    python2.7 怎么解析命令行输入的中文参数 本文实例讲述了python读取命令行参数的方法.分享给大家供大家参考.具体分析如下: 如果想对python脚本传参数,python中对应的argc, ...

  7. python运行代码不成功_命令行执行python模块时提示包找不到的问题

    庄稼人不是专职python开发的道友,虽然与python相识已多年,可惜相识不相知,只是偶尔借助pydev写一些简单的小工具. 多年来,一直困惑于这样一个问题:同样的工程,同样的代码,使用pydev可 ...

  8. python心脏线绘制代码_C++和Java命令行绘制心形图代码分享

    C++和Java命令行绘制心形图案 心形线 心形线,是一个圆上的固定一点在它绕着与其相切且半径相同的另外一个圆周滚动时所形成的轨迹,因其形状像心形而得名. 心脏线亦为蚶线的一种.在曼德博集合正中间的图 ...

  9. python argv 详解_python解析传入的命令行参数 argv

    python解析命令行参数主要有三种方法:sys.argv.argparse解析.getopt解析 方法一:sys.argv -- 命令行执行:python test_命令行传参.py 1,2,3 1 ...

最新文章

  1. 主流框架中DOMContentLoaded事件的实现
  2. YOLOv3 best_iou问题
  3. python进程池multiprocessing.Pool运行错误:The freeze_support() line can be omitted if the program is not g
  4. php读取excel的公式,PHPExcel在解析xlsx文件中的公式时返回零“0”
  5. ccf--20140903--字符串匹配
  6. 复盘阿里城市大脑这3年
  7. 怎樣制作线段动画_OPPO又开发布会!这两个PPT动画太炫了,荣获网友清一色好评...
  8. AD维护管理工具详解(一)dcdiag
  9. 实例--[QSerialPort]串口通信_vortex_新浪博客
  10. 优先级(HTML、CSS)
  11. 【从0到1搭建LoRa物联网】12、LoRa网关与平台通讯协议
  12. OpenID实现多系统整合的用户同步解决方案
  13. 【手写源码-设计模式7】-桥接模式-基于苹果小米手机刷机场景
  14. 网络安全职业_如何开始网络安全职业
  15. 北邮iptv用WindowsMediaplayer打不开的解决的方法
  16. 设置电脑 保护视力 还有桌面默认颜色
  17. Windows平台上使用Qt(MinGW)调用基于VS编写的周立功CAN卡Dll文件
  18. 微信小程序的基本操作
  19. TunesKit Audio Converter for Mac(音频格式转换软件)
  20. VMware-ovftool命令行部署与导出镜像

热门文章

  1. CCNP精粹系列之三十二--BGP下一跳问题,推荐
  2. 双通输入法源码公布 by 尉迟方
  3. Active Directory操作主机详解
  4. Linux系统内存管理之伙伴系统分析 - 旭东的博客 - 博客园
  5. 一个严重损坏Excel深度修复案例
  6. 基于catalog 创建RMAN存储脚本
  7. linux 字符串转换函数 simple_strtoul 简介
  8. linux shell 删除文本 较长行
  9. linux nDPI 协议检测 源码分析
  10. docker 不使用缓存重建镜像