【vn.py学习笔记(八)】vn.py utility、BarGenerator、ArrayManager源码阅读

  • 写在前面
  • 1 工具函数
  • 2 BarGenerator
    • 2.1 update_tick
    • 2.2 update_bar
    • 2.3 update_bar_minute_window
    • 2.4 update_bar_hour_window
    • 2.5 on_hour_bar
    • 2.6 generate
  • 3 ArrayManager
  • 学习资料

写在前面

  笔者刚接触量化投资,对量化投资挺感兴趣,在闲暇时间进行量化投资的学习,只能进行少量资金进行量化实践。目前在进行基于vnpy的A股市场的量化策略学习,主要尝试攻克的技术难点在:A股市场日线数据的免费获取维护、自动下单交易、全市场选股程序、选股策略的回测程序、基于机器学习的股票趋势预测。现阶段的计划是阅读vn.py的源码,学习vn.py架构机制,在学习的过程中,会以分享的形式记录,以加深对vn.py的理解,有不对的地方欢迎大家批评指正。
  欢迎志同道合的朋友加我QQ(1163962054)交流。
  分享的github仓库:https://github.com/PanAndy/quant_share。


  这次来学习一下vnpy/trader/utility.py下的内容,utility.py下的内容可以分为三个部分:工具函数、BarGenerator、ArrayManager,其中工具函数部分比较好理解,只是对通用的一些功能进行的封装;BarGenerator是K线合成器,负责根据实时接收的tick数据合成1分钟k线,并借此合成n分钟K线;ArrayManager是指标计算辅助类,负责维护一定量的历史数据,以供常见指标如sma、ema、atr等的计算。BarGenerator和ArrayManager是这次重点关注的内容。

1 工具函数

  utility.py提供的工具函数是主要是对合约代码的转换、路径的读取、json文件读写、数值位数的设置、日志等相关的功能,这些函数主要是对基本功能的封装,没有特别复杂的算法。工具函数的接口如下:

# 合约代码的转换
def extract_vt_symbol(vt_symbol: str) -> Tuple[str, Exchange]:
def generate_vt_symbol(symbol: str, exchange: Exchange) -> str:# 路径的读取
def _get_trader_dir(temp_name: str) -> Tuple[Path, Path]:
def get_file_path(filename: str) -> Path:
def get_folder_path(folder_name: str) -> Path:
def get_icon_path(filepath: str, ico_name: str) -> str:# json文件读写
def load_json(filename: str) -> dict:
def save_json(filename: str, data: dict) -> None:# 数值位数设置
def round_to(value: float, target: float) -> float:
def floor_to(value: float, target: float) -> float:
def ceil_to(value: float, target: float) -> float:
def get_digits(value: float) -> int:def virtual(func: Callable) -> Callable:# 日志相关
def _get_file_logger_handler(filename: str) -> logging.FileHandler:
def get_file_logger(filename: str) -> logging.Logger:

2 BarGenerator

  BarGenerator类用于从tick数据中生成1分钟bar数据,也可以用于从1分钟的bar数据中合成x分钟或者x小时的bar。对于合成x分钟的bar,x必须能被60整除,如2、3、5、6、10、15、20、30;对于合成x小时的bar,x可以是任何整数。BarGenerator的主要函数有6个:

  • update_tick,负责向BarGenerator类输入一个最新的tick,使用输入的tick更新bar。
  • update_bar,负责向BarGenerator类输入一个最新的bar,可以使用这个方法生成x分钟/小时的新bar,它会调用update_bar_minute_window或者update_bar_hour_window。
  • update_bar_minute_window,由1分钟bar生成x分钟bar的逻辑。
  • update_bar_hour_window,由1分钟bar生成1小时bar的逻辑。
  • on_hour_bar,由1小时bar生成x小时bar的逻辑。
  • generate,强制把当前缓存着的k线合成完毕并推送出来,它主要用于郑州商品交易所每天在收盘后缺失收盘最终的tick推送导致行情记录的时候,最后的k线推不出来。

接下来,我们逐个来看具体实现算法。

2.1 update_tick

  害,本来想画个流程图直观一些的,发现判断太多了,还不如python代码看着清晰,我就只在代码里添加了一些注释说明了。update_tick的基本逻辑就是先对tick数据进行过滤,然后判断self.bar是否为空,如果为空,说明是新的一分钟;如果self.bar不为空,判断接收到的tick与self.bar是否处于同一分钟, 如果不处于同一分钟,则是新一分钟,把当前的self.bar推送出去。接下来,如果是新的一分钟,则新创建一个BarData对象,用于后续累积更新,如果不是新的一分钟,由使用当前接收到的tick对self.bar进行累积更新。随后,如果self.last_tick非空,则可以计算self.bar的在这一分钟内的成交量。最后,更新self.last_tick进行缓存。

    def update_tick(self, tick: TickData) -> None:"""Update new tick data into generator."""# 判断是否走完了一分钟new_minute = False# Filter tick data with 0 last price# 最新成交价为0if not tick.last_price:return# Filter tick data with older timestamp# 过滤掉收到的过去的tickif self.last_tick and tick.datetime < self.last_tick.datetime:returnif not self.bar:# self.bar为None,那收到的tick就是新的一分钟的ticknew_minute = Trueelif ((self.bar.datetime.minute != tick.datetime.minute)or (self.bar.datetime.hour != tick.datetime.hour)):# self.bar不为None,判断是否到了下一分钟,如果到了下一分钟,就给self.bar推出去。self.bar.datetime = self.bar.datetime.replace(second=0, microsecond=0)# 已经过了当着这一分钟了,把已经合成的bar推出去self.on_bar(self.bar)new_minute = Trueif new_minute:# 新的一分钟,新生成一个bar对象# 初始化barself.bar = BarData(symbol=tick.symbol,exchange=tick.exchange,interval=Interval.MINUTE,datetime=tick.datetime,gateway_name=tick.gateway_name,open_price=tick.last_price,high_price=tick.last_price,low_price=tick.last_price,close_price=tick.last_price,open_interest=tick.open_interest)else:# 将当前tick的信息更新到bar里self.bar.high_price = max(self.bar.high_price, tick.last_price)if tick.high_price > self.last_tick.high_price:self.bar.high_price = max(self.bar.high_price, tick.high_price)self.bar.low_price = min(self.bar.low_price, tick.last_price)if tick.low_price < self.last_tick.low_price:self.bar.low_price = min(self.bar.low_price, tick.low_price)self.bar.close_price = tick.last_priceself.bar.open_interest = tick.open_interestself.bar.datetime = tick.datetimeif self.last_tick:# 当前品种、全天交易到当前tick时的成交量,而不是最新的一笔tick的成交量volume_change = tick.volume - self.last_tick.volumeself.bar.volume += max(volume_change, 0)self.last_tick = tick

2.2 update_bar

  update_bar函数的角色相当于一个调度员,根据self.interval的粒度来选择是调用self.update_bar_minute_window还是调用self.update_bar_hour_window,最终相应粒度的bar数据合成完成之后,都会回调self.on_window_bar函数。

    def update_bar(self, bar: BarData) -> None:"""Update 1 minute bar into generator"""if self.interval == Interval.MINUTE:self.update_bar_minute_window(bar)else:self.update_bar_hour_window(bar)

2.3 update_bar_minute_window

  update_bar_hour_window的实现逻辑和update_tick的逻辑差不多,它在一个函数实现了缓存x分钟的逻辑,主要是缓存1分钟bar的逻辑在update_tick中已经实现了。update_bar_minute_window的逻辑就是将一分钟bar积累起来,当累积的数目达到目标self.window时,进行推送。

    def update_bar_minute_window(self, bar: BarData) -> None:""""""# If not inited, create window bar objectif not self.window_bar:dt = bar.datetime.replace(second=0, microsecond=0)self.window_bar = BarData(symbol=bar.symbol,exchange=bar.exchange,datetime=dt,gateway_name=bar.gateway_name,open_price=bar.open_price,high_price=bar.high_price,low_price=bar.low_price)# Otherwise, update high/low price into window barelse:self.window_bar.high_price = max(self.window_bar.high_price,bar.high_price)self.window_bar.low_price = min(self.window_bar.low_price,bar.low_price)# Update close price/volume into window barself.window_bar.close_price = bar.close_priceself.window_bar.volume += int(bar.volume)self.window_bar.open_interest = bar.open_interest# Check if window bar completedif not (bar.datetime.minute + 1) % self.window:self.on_window_bar(self.window_bar)self.window_bar = None# Cache last bar objectself.last_bar = bar

2.4 update_bar_hour_window

  update_bar_hour_window的实现逻辑和update_tick的逻辑也差不多,只是将分钟级别的合成升级到小时级别的合成。如果self.hour_bar为None,由新建一个BarData对象;如果self.hour_bar不空None,则判断当前接收bar的分钟是不是第59分钟,如果是,则说明1小时bar合成完成,缓存进finished_bar中,后面给推送出去,并把self.hour_bar清空;如果当前接收的bar是位于新的一个小时,则把缓存的一小时bar推送出去,开启新的一小时bar缓存;其他情况,则说明在当前一小时内,继续累积更新缓存的bar;接着,如果finished_bar不为None,则说明有可以推送出去的bar,调用self.on_hour_bar,继续进行累积x小时bar的逻辑;最后,把当前接收到的bar数据缓存到self.last_bar中。

    def update_bar_hour_window(self, bar: BarData) -> None:""""""# If not inited, create window bar objectif not self.hour_bar:dt = bar.datetime.replace(minute=0, second=0, microsecond=0)self.hour_bar = BarData(symbol=bar.symbol,exchange=bar.exchange,datetime=dt,gateway_name=bar.gateway_name,open_price=bar.open_price,high_price=bar.high_price,low_price=bar.low_price)returnfinished_bar = None# If minute is 59, update minute bar into window bar and pushif bar.datetime.minute == 59:self.hour_bar.high_price = max(self.hour_bar.high_price,bar.high_price)self.hour_bar.low_price = min(self.hour_bar.low_price,bar.low_price)self.hour_bar.close_price = bar.close_priceself.hour_bar.volume += int(bar.volume)self.hour_bar.open_interest = bar.open_interestfinished_bar = self.hour_barself.hour_bar = None# If minute bar of new hour, then push existing window barelif bar.datetime.hour != self.hour_bar.datetime.hour:finished_bar = self.hour_bardt = bar.datetime.replace(minute=0, second=0, microsecond=0)self.hour_bar = BarData(symbol=bar.symbol,exchange=bar.exchange,datetime=dt,gateway_name=bar.gateway_name,open_price=bar.open_price,high_price=bar.high_price,low_price=bar.low_price)# Otherwise only update minute barelse:self.hour_bar.high_price = max(self.hour_bar.high_price,bar.high_price)self.hour_bar.low_price = min(self.hour_bar.low_price,bar.low_price)self.hour_bar.close_price = bar.close_priceself.hour_bar.volume += int(bar.volume)self.hour_bar.open_interest = bar.open_interest# Push finished window barif finished_bar:self.on_hour_bar(finished_bar)# Cache last bar objectself.last_bar = bar

2.5 on_hour_bar

  on_hour_bar就和update_bar_minute_window的逻辑一样了,将由update_bar_hour_window产生的一小时bar积累起来,当累积数目达到目标self.window时,进行推送。

    def on_hour_bar(self, bar: BarData) -> None:""""""if self.window == 1:self.on_window_bar(bar)else:if not self.window_bar:self.window_bar = BarData(symbol=bar.symbol,exchange=bar.exchange,datetime=bar.datetime,gateway_name=bar.gateway_name,open_price=bar.open_price,high_price=bar.high_price,low_price=bar.low_price)else:self.window_bar.high_price = max(self.window_bar.high_price,bar.high_price)self.window_bar.low_price = min(self.window_bar.low_price,bar.low_price)self.window_bar.close_price = bar.close_priceself.window_bar.volume += int(bar.volume)self.window_bar.open_interest = bar.open_interestself.interval_count += 1if not self.interval_count % self.window:self.interval_count = 0self.on_window_bar(self.window_bar)self.window_bar = None

2.6 generate

  generate函数的作用就是强制把当前缓存着的bar推送出去。

    def generate(self) -> Optional[BarData]:"""Generate the bar data and call callback immediately."""bar = self.barif self.bar:bar.datetime = bar.datetime.replace(second=0, microsecond=0)self.on_bar(bar)self.bar = Nonereturn bar

3 ArrayManager

  ArrayManager是时间序列容器,用于按时间序列缓存bar数据,提供技术指标的计算。ArrayManager的整体结构图(来源于《全实战进阶系统 - CTA策略》)如下图所示,它提供的函数分为四类:init函数、update_bar、@property函数、技术指标函数。

  • init函数中定义了缓存K线的计数count、需要缓存的最小K线数量size、是否缓存足够K数据标识inited。
  • update_bar函数采用切片平移的方式更新最新数据,当K线数量达到size大小时,将inited设置为true。
  • @property函数提供了访问缓存K线数据的方式。
  • 技术指标函数提供了利用缓存K线数据计算指标的方法,它们既可以返回最新一个周期的指标值,也可以返回计算出的所有指标序列,可以通过参数array控制。

学习资料

  1. vn.py community 《全实战进阶系统 - CTA策略》课程8-K线自定义合成

【vn.py学习笔记(八)】vn.py utility、BarGenerator、ArrayManager源码阅读相关推荐

  1. 【vn.py学习笔记(五)】vn.py Base、Log、Oms、Email Engine源码阅读

    [vn.py学习笔记(五)]vn.py Base.Log.Oms.Email Engine源码阅读 写在前面 1 BaseEngine 2 LogEngine 3 OmsEngine 3.1 构造函数 ...

  2. Mr.J-- jQuery学习笔记(三十二)--jQuery属性操作源码封装

    扫码看专栏 jQuery的优点 jquery是JavaScript库,能够极大地简化JavaScript编程,能够更方便的处理DOM操作和进行Ajax交互 1.轻量级 JQuery非常轻巧 2.强大的 ...

  3. resnet50网络结构_学习笔记(一):分析resnet源码理解resnet网络结构

    最近在跑实验的过程中一直在使用resnet50和resnet34,为了弄清楚网络的结构和原理的实现,打开resnet的源码进行了学习. 残差网络学习的原理 针对神经网络过深而导致的学习准确率饱和甚至是 ...

  4. 【学习笔记】Keras库下的resnet源码分析

    Keras库下的resnet源码应用及解读 其实我也不知道这种东西有没有写下来的必要,但是跑代码的时候总摸鱼总归是不好的.虽然很简单,不过我也大概做个学习记录,写给小白看的.源码来自keras文档,大 ...

  5. Caffe学习笔记(4)--------用师兄的源码都差点没跑通觉得自己智商真的捉急!...

    在自己不知道哪个旮旯荡了一个不知道什么版本的VGG16更改多次未曾成功之后... 又厚着脸皮去找师兄求教了... 结果发现是自己荡的模型都不对...而且数据输入的格式也不对...然后师兄很无奈的把自己 ...

  6. Linux学习笔记15—RPM包的安装OR源码包的安装

    RPM安装命令 1. 安装一个rpm包 rpm –ivh 包名 "-i" : 安装的意思 "-v" : 可视化 "-h" : 显示安装进度 ...

  7. Linux内核学习笔记——内核页表隔离KPTI机制(源码分析)

    KPTI(Kernel PageTable Isolation)全称内核页表隔离,它通过完全分离用户空间与内核空间页表来解决页表泄露. KPTI中每个进程有两套页表--内核态页表与用户态页表(两个地址 ...

  8. 小迪渗透测试学习笔记(四)基础入门-WEB源码拓展

    前言: WEB源码在安全测试中是非常重要的信息来源,可以用来代码审 计漏洞也可以用来做信息突破口,其中WEB源码有很多技术需要简明分析. 比如:获取某ASP源码后可以采用默认数据库下载为突破,获取某其 ...

  9. 【vn.py学习笔记(二)】vn.py底层接口 学习笔记

    [vn.py学习笔记(二)]vn.py底层接口 学习笔记 1 CTP API的工作原理 1.1 CTP介绍 1.2 API功能介绍 1.3 CTP API文件 1.4 API 通用规则 2 CTP A ...

最新文章

  1. 调用vba_Python VS JavaScript,谁将是替代VBA最好语言
  2. FD_CLOEXEC用法及原因_转
  3. 设计模式的理解:状态模式(State) 和备忘录模式(Memento)
  4. storm目录结构及在zk中的目录结构
  5. 安卓欢迎界面和activity之间的跳转问题
  6. html中的文本格式化标签+多媒体标签+关于IE浏览器兼容的问题(干货!)
  7. [转载] 希腊字母读音表
  8. 20145236 《Java程序设计》 第6周学习总结
  9. 软件测试必学之python+unittest+requests+HTMLRunner编写接口自动化测试集
  10. Linux入门-安装篇(Debian 服务器版)
  11. 找工作,如何写好一份漂亮的简历,给你借鉴一下
  12. [转载]浅谈敏捷管理在软件项目中应用
  13. python实现微信自动发信息软件_Python实现给微信好友自动发送消息的示例
  14. 如何进行企业设备管理?
  15. 无心剑中译阿道司.赫胥黎《冥思月亮》
  16. 超火的微信渐变国旗头像,一键生成!!
  17. c语言正弦函数求导,正弦函数求导公式基本推导
  18. 【运筹帷幄】关于阿里云服务器自己开启的6010端口
  19. CMake Error: The current CMakeCache.txt directory is different...
  20. 树莓派4B安装Ubuntu Mate20.04

热门文章

  1. 激活office时,提示CScript错误:无法找到ospp.vbs脚本引擎vbscript
  2. 【单片机仿真项目】外部中断0和1控制两位数码管进行计数
  3. 中国电子邮件的发展史
  4. AT91Sam9260的网卡驱动
  5. 如何更新VLC媒体播放器
  6. 查找素材终极神器,视频片段查找神器!
  7. 我国高速公路交通荷载标准研究
  8. 一个意外错误使你无法删除该文件夹。【错误0x80070091:目录不是空的】Bandizip压缩软件解决方法
  9. 一卡通系统中的前置机的设置
  10. 阿里妈妈广告源码示例