【vn.py学习笔记(五)】vn.py Base、Log、Oms、Email Engine源码阅读

  • 写在前面
  • 1 BaseEngine
  • 2 LogEngine
  • 3 OmsEngine
    • 3.1 构造函数
    • 3.2 订单数据获取函数
    • 3.3 订单事件处理函数
  • 4 EmailEngine
  • 参考资料

写在前面

  笔者刚接触量化投资,对量化投资挺感兴趣,在闲暇时间进行量化投资的学习,只能进行少量资金进行量化实践。目前在进行基于vnpy的A股市场的量化策略学习,主要尝试攻克的技术难点在:A股市场日线数据的免费获取维护、自动下单交易、全市场选股程序、选股策略的回测程序、基于机器学习的股票趋势预测。
  欢迎志同道合的朋友加我QQ(1163962054)交流。
  分享的github仓库:https://github.com/PanAndy/quant_share。


  顺着上一篇《【vn.py学习笔记(四)】vn.py MainEngine源码阅读》继续阅读vnpy/trader/engine.py的内容。下一篇内容将来学习一下vnpy/trader/constant.py里定义的vn.py的常量,同时来分析一下vn.py中订单的生命历程。我之前看的vn.py官方课程里面讲的,订单的生命历程对精细化交易非常重要,趁此机会整理一下。
  vnpy/trader/engine.py文件内还有三个引擎LogEngine、OmsEngine、EmailEngine,和一个引擎的基类BaseEngine,vn.py代码中所有的引擎都派生于引擎基类BaseEngine。接下来我将按引擎进行源码分析。

1 BaseEngine

  BaseEngine定义了vn.py架构里所有应用级引擎的基类,构造函数需要传进主引擎、事件引擎、引擎名,此三者也是一个应用与系统交互必不可少的部分,还定义了一个关闭引擎的函数close,源码如下:

class BaseEngine(ABC):"""Abstract class for implementing an function engine."""def __init__(self,main_engine: MainEngine,event_engine: EventEngine,engine_name: str,):""" """self.main_engine = main_engineself.event_engine = event_engineself.engine_name = engine_namedef close(self):""""""pass

2 LogEngine

  LogEngine是vn.py的日志引擎模块,它主要负责将处理日志事件,输出控制台或者输出到日志文件中。它主要是对python内置logging模块的封装。代码如下:

class LogEngine(BaseEngine):"""Processes log event and output with logging module."""def __init__(self, main_engine: MainEngine, event_engine: EventEngine):""" """super(LogEngine, self).__init__(main_engine, event_engine, "log")if not SETTINGS["log.active"]:returnself.level: int = SETTINGS["log.level"]self.logger: Logger = logging.getLogger("VN Trader")self.logger.setLevel(self.level)self.formatter = logging.Formatter("%(asctime)s  %(levelname)s: %(message)s")self.add_null_handler()# 根据配置文件设置日志的输出方式if SETTINGS["log.console"]:self.add_console_handler()if SETTINGS["log.file"]:self.add_file_handler()# 注册主引擎日志事件的监控函数self.register_event()def add_null_handler(self) -> None:"""Add null handler for logger."""null_handler = logging.NullHandler()self.logger.addHandler(null_handler)def add_console_handler(self) -> None:"""Add console output of log."""console_handler = logging.StreamHandler()console_handler.setLevel(self.level)console_handler.setFormatter(self.formatter)self.logger.addHandler(console_handler)def add_file_handler(self) -> None:"""Add file output of log."""today_date = datetime.now().strftime("%Y%m%d")filename = f"vt_{today_date}.log"log_path = get_folder_path("log")file_path = log_path.joinpath(filename)file_handler = logging.FileHandler(file_path, mode="a", encoding="utf8")file_handler.setLevel(self.level)file_handler.setFormatter(self.formatter)self.logger.addHandler(file_handler)def register_event(self) -> None:""" """self.event_engine.register(EVENT_LOG, self.process_log_event)def process_log_event(self, event: Event) -> None:"""Process log event."""log = event.dataself.logger.log(log.level, log.msg)

3 OmsEngine

  OmsEngine负责实现vn.py的订单管理系统功能,包括获取最新tick数据、订单、交易、仓位、账户、合约、活动订单等等功能,称为订单数据获取函数;也包括OmsEngine对tick事件的处理函数、对订单事件的处理函数、对账户事件的处理函数、对合约事件的处理函数,称为订单数据处理函数;另外,还有一些功能函数,事件的监听的注册和向主引擎添加订单获取获取功能。接下来分功能对源码进行分析。

3.1 构造函数

  构造函数里定义OmsEngine进行订单数据维护的数据结构,基本都是字典,包括ticks、orders、traders、positions、accounts、contracts、active_orders,见名知义。构造函数里还向main_engine里添加了订单数据获取函数,向事件引擎注册了订单管理系统的处理函数,源码如下:

class OmsEngine(BaseEngine):"""Provides order management system function for VN Trader."""def __init__(self, main_engine: MainEngine, event_engine: EventEngine):""" """super(OmsEngine, self).__init__(main_engine, event_engine, "oms")self.ticks: Dict[str, TickData] = {}self.orders: Dict[str, OrderData] = {}self.trades: Dict[str, TradeData] = {}self.positions: Dict[str, PositionData] = {}self.accounts: Dict[str, AccountData] = {}self.contracts: Dict[str, ContractData] = {}self.active_orders: Dict[str, OrderData] = {}self.add_function()self.register_event()def add_function(self) -> None:"""Add query function to main engine."""self.main_engine.get_tick = self.get_tickself.main_engine.get_order = self.get_orderself.main_engine.get_trade = self.get_tradeself.main_engine.get_position = self.get_positionself.main_engine.get_account = self.get_accountself.main_engine.get_contract = self.get_contractself.main_engine.get_all_ticks = self.get_all_ticksself.main_engine.get_all_orders = self.get_all_ordersself.main_engine.get_all_trades = self.get_all_tradesself.main_engine.get_all_positions = self.get_all_positionsself.main_engine.get_all_accounts = self.get_all_accountsself.main_engine.get_all_contracts = self.get_all_contractsself.main_engine.get_all_active_orders = self.get_all_active_ordersdef register_event(self) -> None:""""""self.event_engine.register(EVENT_TICK, self.process_tick_event)self.event_engine.register(EVENT_ORDER, self.process_order_event)self.event_engine.register(EVENT_TRADE, self.process_trade_event)self.event_engine.register(EVENT_POSITION, self.process_position_event)self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)

3.2 订单数据获取函数

  订单数据获取函数就是提供了一个OmsEngine引擎内部数据结构的对外访问接口,比较神奇的是,它在add_function函数里,把一些数据获取函数绑定到main_engine上了,这样main_engine也就可以获取到订单状态数据了。

    def get_tick(self, vt_symbol: str) -> Optional[TickData]:"""Get latest market tick data by vt_symbol."""return self.ticks.get(vt_symbol, None)def get_order(self, vt_orderid: str) -> Optional[OrderData]:"""Get latest order data by vt_orderid."""return self.orders.get(vt_orderid, None)def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:"""Get trade data by vt_tradeid."""return self.trades.get(vt_tradeid, None)def get_position(self, vt_positionid: str) -> Optional[PositionData]:"""Get latest position data by vt_positionid."""return self.positions.get(vt_positionid, None)def get_account(self, vt_accountid: str) -> Optional[AccountData]:"""Get latest account data by vt_accountid."""return self.accounts.get(vt_accountid, None)def get_contract(self, vt_symbol: str) -> Optional[ContractData]:"""Get contract data by vt_symbol."""return self.contracts.get(vt_symbol, None)def get_all_ticks(self) -> List[TickData]:"""Get all tick data."""return list(self.ticks.values())def get_all_orders(self) -> List[OrderData]:"""Get all order data."""return list(self.orders.values())def get_all_trades(self) -> List[TradeData]:"""Get all trade data."""return list(self.trades.values())def get_all_positions(self) -> List[PositionData]:"""Get all position data."""return list(self.positions.values())def get_all_accounts(self) -> List[AccountData]:"""Get all account data."""return list(self.accounts.values())def get_all_contracts(self) -> List[ContractData]:"""Get all contract data."""return list(self.contracts.values())def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:"""Get all active orders by vt_symbol.If vt_symbol is empty, return all active orders."""if not vt_symbol:return list(self.active_orders.values())else:active_orders = [orderfor order in self.active_orders.values()if order.vt_symbol == vt_symbol]return active_orders

3.3 订单事件处理函数

  OmsEngine的订单事件处理函数主要负责在发生订单事件时,对订单路由系统的数据进行更新。由此可见,订单路由系统的功能更类似于对全局的订单相关的数据进行维护,保证系统数据状态正常可查。代码如下:

    def process_tick_event(self, event: Event) -> None:""" """tick = event.dataself.ticks[tick.vt_symbol] = tickdef process_order_event(self, event: Event) -> None:""" """order = event.dataself.orders[order.vt_orderid] = order# If order is active, then update data in dict.if order.is_active():self.active_orders[order.vt_orderid] = order# Otherwise, pop inactive order from in dictelif order.vt_orderid in self.active_orders:self.active_orders.pop(order.vt_orderid)def process_trade_event(self, event: Event) -> None:""" """trade = event.dataself.trades[trade.vt_tradeid] = tradedef process_position_event(self, event: Event) -> None:""" """position = event.dataself.positions[position.vt_positionid] = positiondef process_account_event(self, event: Event) -> None:""" """account = event.dataself.accounts[account.vt_accountid] = accountdef process_contract_event(self, event: Event) -> None:""""""contract = event.dataself.contracts[contract.vt_symbol] = contract

4 EmailEngine

   EmailEngine就负责实现向邮件发送信息,它负责交易信号的通知功能。源码没有什么不同的地方,就很类似事件引擎,新开一个线程,不断尝试从队列中读取邮件数据,如果队列中有邮件,由调包发送。每次调用 send_email函数发送邮件时,向队列中添加一条邮件数据。事件引擎理解之后,EmailEngine代码没有什么特别难的地方。
   这里还是简单介绍一下缓存读取的原理。缓存读取的原理其实是生产者消费者模式,它是开发过程中常用的一种模式。生产者模块负责产生数据(send_email),放入缓存区(queue),这些数据由另一个消费者模块从缓冲区中取出并进行消费者相应的处理(run)。这一模式的优点在于:

  • 解耦:缓存区的存在可以让生产者和消费者降低互相之间的依赖性,一个模块代码变化,不会直接影响到另一个模块;
  • 并发:由于缓冲区的存在,生产者和消费者不是直接调用,而是两个独立的并发主体,生产者产生数据之后把它放入缓冲区,就继续生产数据,不依赖消费者的处理速度。
class EmailEngine(BaseEngine):"""Provides email sending function for VN Trader."""def __init__(self, main_engine: MainEngine, event_engine: EventEngine):""" """super(EmailEngine, self).__init__(main_engine, event_engine, "email")self.thread: Thread = Thread(target=self.run)self.queue: Queue = Queue()self.active: bool = Falseself.main_engine.send_email = self.send_emaildef send_email(self, subject: str, content: str, receiver: str = "") -> None:"""EmailEngine不是总是运行着的,只有在第一次发送邮件的时候,引擎线程启动"""# Start email engine when sending first email.if not self.active:self.start()# Use default receiver if not specified.if not receiver:receiver = SETTINGS["email.receiver"]msg = EmailMessage()msg["From"] = SETTINGS["email.sender"]msg["To"] = receivermsg["Subject"] = subjectmsg.set_content(content)self.queue.put(msg)def run(self) -> None:""" """while self.active:try:msg = self.queue.get(block=True, timeout=1)with smtplib.SMTP_SSL(SETTINGS["email.server"], SETTINGS["email.port"]) as smtp:smtp.login(SETTINGS["email.username"], SETTINGS["email.password"])smtp.send_message(msg)except Empty:passdef start(self) -> None:""" """self.active = Trueself.thread.start()def close(self) -> None:""" """if not self.active:returnself.active = Falseself.thread.join()

参考资料

  1. Python 队列(Queue)用法

【vn.py学习笔记(五)】vn.py Base、Log、Oms、Email Engine源码阅读相关推荐

  1. OpenCV学习笔记(三):图像对比度、亮度调整源码

    OpenCV学习笔记(三):图像对比度.亮度调整源码 主函数: #include <opencv2/opencv.hpp>using namespace cv;using namespac ...

  2. Python学习笔记:7.5.1 Django快速建站 -源码版本管理

    前言:本文是学习网易微专业的<python全栈工程师 - Django快速建站>课程的笔记,欢迎学习交流.同时感谢老师们的精彩传授! 一.课程目标 了解源码版本管理的意义 掌握初步的git ...

  3. 华为鸿蒙系统学习笔记5-华为方舟编译器正式开源及相关源码下载

    8月9日至8月11日,华为面向全球开发者的2019年开发者大会,将在东莞松山湖举行.据悉,今年也是华为第一次在华为松山湖基地欧洲小镇里举办开发者大会. 前不久,余承东曾在微博上预告:"让我们 ...

  4. PostgreSQL学习笔记YY(2)--Ubuntu下使用DDD调试查看源码(原创)

    说明:前一段时间一直在FreeBSD的操作系统环境下进行实验,但是由于多次安装桌面系统失败,所以无法及时更新日志.昨天换了Ubuntu系统,在Unix 环境下摸索了3天之后,终于在Ubuntu上编译, ...

  5. react学习笔记 react-router-dom react-redux基础使用及手写基础源码 组件反射 react原理

    vdom diff 高效的diff算法 新老vdom树比较 更新只需要更新节点 数据变化检测 batch dom读写 组件多重继承 //parent components export default ...

  6. detectron2入门学习一:实现FruitsNut水果坚果分割任务以源码阅读

    学习目标: 学习detectron2数据集的注册以及基本的训练推理 一.工程文件下载与数据集准备: 整体的工程文件下载地址: https://github.com/fenglingbai/Fruits ...

  7. 【vn.py学习笔记(八)】vn.py utility、BarGenerator、ArrayManager源码阅读

    [vn.py学习笔记(八)]vn.py utility.BarGenerator.ArrayManager源码阅读 写在前面 1 工具函数 2 BarGenerator 2.1 update_tick ...

  8. 【vn.py学习笔记(二)】vn.py底层接口 学习笔记

    [vn.py学习笔记(二)]vn.py底层接口 学习笔记 1 CTP API的工作原理 1.1 CTP介绍 1.2 API功能介绍 1.3 CTP API文件 1.4 API 通用规则 2 CTP A ...

  9. 【vn.py学习笔记(三)】vn.py事件引擎 学习笔记

    [vn.py学习笔记(三)]vn.py事件引擎 学习笔记 1 时间驱动 2 事件驱动 3 事件引擎工作流程 4 事件引擎结构 4.1 事件队列 4.2 事件处理线程 4.3 事件处理函数字典/通用事件 ...

最新文章

  1. 海南医学院计算机,海南医学院医学信息学院
  2. 今天说的是必须要熟练掌握的归并排序
  3. python资料书-《Python数据分析与应用》——图书配套资料下载
  4. deepin启动盘制作工具_balenaEtcher for mac(启动盘制作工具) v1.5.70已更新
  5. python k线顶分型_【缠论】分型、笔的定义及其程序化
  6. 关于调试windows services的方法
  7. 深入理解JavaScript的闭包特性如何给循环中的对象添加事件
  8. 解决SQL单用户模式不能转为多用户模式
  9. Auto Layout 和 Constraints
  10. 第三章 汇编语言和汇编软件
  11. Oracle 实验7 存储过程
  12. 以太坊服务器是什么_OKEX区块链60讲 | 第33集:什么是以太坊?
  13. GPIO应用开发方法【ZT】
  14. 排序算法之 插入排序
  15. Struts1框架轻易入门,经典示例
  16. 超声成像突破衍射极限,有望检测肿瘤
  17. python当中的列表函数和列表推导式
  18. 金工计算机测试题,金工考试题精选.doc
  19. 使用IDEA创建一个Solidity项目
  20. [SOA介绍]什么是SOA?

热门文章

  1. KingScada 组合框/列表框使用
  2. 计算机病毒防护软件有哪些,电脑防护软件排名
  3. docker容器添加微软雅黑字体
  4. 健康——基本运动的卡路里计算公式
  5. niuke题霸SQ/L篇
  6. DB210.5版本的官网下载和安装
  7. README文档模板 含下载地址0积分 项目文档模板 springboot文档示例 readme参考文档 目录跳转 项目架构 使用说明 目录结构 等等 (含附件下载)
  8. 【Linux】树莓派控制光强传感器(C、python手把手教学)
  9. 如何破解wayos禁止二级路由
  10. 真实世界的Haskell(影印版)