【vn.py学习笔记(八)】vn.py utility、BarGenerator、ArrayManager源码阅读
【vn.py学习笔记(八)】vn.py utility、BarGenerator、ArrayManager源码阅读
- 写在前面
- 1 工具函数
- 2 BarGenerator
- 2.1 update_tick
- 2.2 update_bar
- 2.3 update_bar_minute_window
- 2.4 update_bar_hour_window
- 2.5 on_hour_bar
- 2.6 generate
- 3 ArrayManager
- 学习资料
写在前面
笔者刚接触量化投资,对量化投资挺感兴趣,在闲暇时间进行量化投资的学习,只能进行少量资金进行量化实践。目前在进行基于vnpy的A股市场的量化策略学习,主要尝试攻克的技术难点在:A股市场日线数据的免费获取维护、自动下单交易、全市场选股程序、选股策略的回测程序、基于机器学习的股票趋势预测。现阶段的计划是阅读vn.py的源码,学习vn.py架构机制,在学习的过程中,会以分享的形式记录,以加深对vn.py的理解,有不对的地方欢迎大家批评指正。
欢迎志同道合的朋友加我QQ(1163962054)交流。
分享的github仓库:https://github.com/PanAndy/quant_share。
这次来学习一下vnpy/trader/utility.py下的内容,utility.py下的内容可以分为三个部分:工具函数、BarGenerator、ArrayManager,其中工具函数部分比较好理解,只是对通用的一些功能进行的封装;BarGenerator是K线合成器,负责根据实时接收的tick数据合成1分钟k线,并借此合成n分钟K线;ArrayManager是指标计算辅助类,负责维护一定量的历史数据,以供常见指标如sma、ema、atr等的计算。BarGenerator和ArrayManager是这次重点关注的内容。
1 工具函数
utility.py提供的工具函数是主要是对合约代码的转换、路径的读取、json文件读写、数值位数的设置、日志等相关的功能,这些函数主要是对基本功能的封装,没有特别复杂的算法。工具函数的接口如下:
# 合约代码的转换
def extract_vt_symbol(vt_symbol: str) -> Tuple[str, Exchange]:
def generate_vt_symbol(symbol: str, exchange: Exchange) -> str:# 路径的读取
def _get_trader_dir(temp_name: str) -> Tuple[Path, Path]:
def get_file_path(filename: str) -> Path:
def get_folder_path(folder_name: str) -> Path:
def get_icon_path(filepath: str, ico_name: str) -> str:# json文件读写
def load_json(filename: str) -> dict:
def save_json(filename: str, data: dict) -> None:# 数值位数设置
def round_to(value: float, target: float) -> float:
def floor_to(value: float, target: float) -> float:
def ceil_to(value: float, target: float) -> float:
def get_digits(value: float) -> int:def virtual(func: Callable) -> Callable:# 日志相关
def _get_file_logger_handler(filename: str) -> logging.FileHandler:
def get_file_logger(filename: str) -> logging.Logger:
2 BarGenerator
BarGenerator类用于从tick数据中生成1分钟bar数据,也可以用于从1分钟的bar数据中合成x分钟或者x小时的bar。对于合成x分钟的bar,x必须能被60整除,如2、3、5、6、10、15、20、30;对于合成x小时的bar,x可以是任何整数。BarGenerator的主要函数有6个:
- update_tick,负责向BarGenerator类输入一个最新的tick,使用输入的tick更新bar。
- update_bar,负责向BarGenerator类输入一个最新的bar,可以使用这个方法生成x分钟/小时的新bar,它会调用update_bar_minute_window或者update_bar_hour_window。
- update_bar_minute_window,由1分钟bar生成x分钟bar的逻辑。
- update_bar_hour_window,由1分钟bar生成1小时bar的逻辑。
- on_hour_bar,由1小时bar生成x小时bar的逻辑。
- generate,强制把当前缓存着的k线合成完毕并推送出来,它主要用于郑州商品交易所每天在收盘后缺失收盘最终的tick推送导致行情记录的时候,最后的k线推不出来。
接下来,我们逐个来看具体实现算法。
2.1 update_tick
害,本来想画个流程图直观一些的,发现判断太多了,还不如python代码看着清晰,我就只在代码里添加了一些注释说明了。update_tick的基本逻辑就是先对tick数据进行过滤,然后判断self.bar是否为空,如果为空,说明是新的一分钟;如果self.bar不为空,判断接收到的tick与self.bar是否处于同一分钟, 如果不处于同一分钟,则是新一分钟,把当前的self.bar推送出去。接下来,如果是新的一分钟,则新创建一个BarData对象,用于后续累积更新,如果不是新的一分钟,由使用当前接收到的tick对self.bar进行累积更新。随后,如果self.last_tick非空,则可以计算self.bar的在这一分钟内的成交量。最后,更新self.last_tick进行缓存。
def update_tick(self, tick: TickData) -> None:"""Update new tick data into generator."""# 判断是否走完了一分钟new_minute = False# Filter tick data with 0 last price# 最新成交价为0if not tick.last_price:return# Filter tick data with older timestamp# 过滤掉收到的过去的tickif self.last_tick and tick.datetime < self.last_tick.datetime:returnif not self.bar:# self.bar为None,那收到的tick就是新的一分钟的ticknew_minute = Trueelif ((self.bar.datetime.minute != tick.datetime.minute)or (self.bar.datetime.hour != tick.datetime.hour)):# self.bar不为None,判断是否到了下一分钟,如果到了下一分钟,就给self.bar推出去。self.bar.datetime = self.bar.datetime.replace(second=0, microsecond=0)# 已经过了当着这一分钟了,把已经合成的bar推出去self.on_bar(self.bar)new_minute = Trueif new_minute:# 新的一分钟,新生成一个bar对象# 初始化barself.bar = BarData(symbol=tick.symbol,exchange=tick.exchange,interval=Interval.MINUTE,datetime=tick.datetime,gateway_name=tick.gateway_name,open_price=tick.last_price,high_price=tick.last_price,low_price=tick.last_price,close_price=tick.last_price,open_interest=tick.open_interest)else:# 将当前tick的信息更新到bar里self.bar.high_price = max(self.bar.high_price, tick.last_price)if tick.high_price > self.last_tick.high_price:self.bar.high_price = max(self.bar.high_price, tick.high_price)self.bar.low_price = min(self.bar.low_price, tick.last_price)if tick.low_price < self.last_tick.low_price:self.bar.low_price = min(self.bar.low_price, tick.low_price)self.bar.close_price = tick.last_priceself.bar.open_interest = tick.open_interestself.bar.datetime = tick.datetimeif self.last_tick:# 当前品种、全天交易到当前tick时的成交量,而不是最新的一笔tick的成交量volume_change = tick.volume - self.last_tick.volumeself.bar.volume += max(volume_change, 0)self.last_tick = tick
2.2 update_bar
update_bar函数的角色相当于一个调度员,根据self.interval的粒度来选择是调用self.update_bar_minute_window还是调用self.update_bar_hour_window,最终相应粒度的bar数据合成完成之后,都会回调self.on_window_bar函数。
def update_bar(self, bar: BarData) -> None:"""Update 1 minute bar into generator"""if self.interval == Interval.MINUTE:self.update_bar_minute_window(bar)else:self.update_bar_hour_window(bar)
2.3 update_bar_minute_window
update_bar_hour_window的实现逻辑和update_tick的逻辑差不多,它在一个函数实现了缓存x分钟的逻辑,主要是缓存1分钟bar的逻辑在update_tick中已经实现了。update_bar_minute_window的逻辑就是将一分钟bar积累起来,当累积的数目达到目标self.window时,进行推送。
def update_bar_minute_window(self, bar: BarData) -> None:""""""# If not inited, create window bar objectif not self.window_bar:dt = bar.datetime.replace(second=0, microsecond=0)self.window_bar = BarData(symbol=bar.symbol,exchange=bar.exchange,datetime=dt,gateway_name=bar.gateway_name,open_price=bar.open_price,high_price=bar.high_price,low_price=bar.low_price)# Otherwise, update high/low price into window barelse:self.window_bar.high_price = max(self.window_bar.high_price,bar.high_price)self.window_bar.low_price = min(self.window_bar.low_price,bar.low_price)# Update close price/volume into window barself.window_bar.close_price = bar.close_priceself.window_bar.volume += int(bar.volume)self.window_bar.open_interest = bar.open_interest# Check if window bar completedif not (bar.datetime.minute + 1) % self.window:self.on_window_bar(self.window_bar)self.window_bar = None# Cache last bar objectself.last_bar = bar
2.4 update_bar_hour_window
update_bar_hour_window的实现逻辑和update_tick的逻辑也差不多,只是将分钟级别的合成升级到小时级别的合成。如果self.hour_bar为None,由新建一个BarData对象;如果self.hour_bar不空None,则判断当前接收bar的分钟是不是第59分钟,如果是,则说明1小时bar合成完成,缓存进finished_bar中,后面给推送出去,并把self.hour_bar清空;如果当前接收的bar是位于新的一个小时,则把缓存的一小时bar推送出去,开启新的一小时bar缓存;其他情况,则说明在当前一小时内,继续累积更新缓存的bar;接着,如果finished_bar不为None,则说明有可以推送出去的bar,调用self.on_hour_bar,继续进行累积x小时bar的逻辑;最后,把当前接收到的bar数据缓存到self.last_bar中。
def update_bar_hour_window(self, bar: BarData) -> None:""""""# If not inited, create window bar objectif not self.hour_bar:dt = bar.datetime.replace(minute=0, second=0, microsecond=0)self.hour_bar = BarData(symbol=bar.symbol,exchange=bar.exchange,datetime=dt,gateway_name=bar.gateway_name,open_price=bar.open_price,high_price=bar.high_price,low_price=bar.low_price)returnfinished_bar = None# If minute is 59, update minute bar into window bar and pushif bar.datetime.minute == 59:self.hour_bar.high_price = max(self.hour_bar.high_price,bar.high_price)self.hour_bar.low_price = min(self.hour_bar.low_price,bar.low_price)self.hour_bar.close_price = bar.close_priceself.hour_bar.volume += int(bar.volume)self.hour_bar.open_interest = bar.open_interestfinished_bar = self.hour_barself.hour_bar = None# If minute bar of new hour, then push existing window barelif bar.datetime.hour != self.hour_bar.datetime.hour:finished_bar = self.hour_bardt = bar.datetime.replace(minute=0, second=0, microsecond=0)self.hour_bar = BarData(symbol=bar.symbol,exchange=bar.exchange,datetime=dt,gateway_name=bar.gateway_name,open_price=bar.open_price,high_price=bar.high_price,low_price=bar.low_price)# Otherwise only update minute barelse:self.hour_bar.high_price = max(self.hour_bar.high_price,bar.high_price)self.hour_bar.low_price = min(self.hour_bar.low_price,bar.low_price)self.hour_bar.close_price = bar.close_priceself.hour_bar.volume += int(bar.volume)self.hour_bar.open_interest = bar.open_interest# Push finished window barif finished_bar:self.on_hour_bar(finished_bar)# Cache last bar objectself.last_bar = bar
2.5 on_hour_bar
on_hour_bar就和update_bar_minute_window的逻辑一样了,将由update_bar_hour_window产生的一小时bar积累起来,当累积数目达到目标self.window时,进行推送。
def on_hour_bar(self, bar: BarData) -> None:""""""if self.window == 1:self.on_window_bar(bar)else:if not self.window_bar:self.window_bar = BarData(symbol=bar.symbol,exchange=bar.exchange,datetime=bar.datetime,gateway_name=bar.gateway_name,open_price=bar.open_price,high_price=bar.high_price,low_price=bar.low_price)else:self.window_bar.high_price = max(self.window_bar.high_price,bar.high_price)self.window_bar.low_price = min(self.window_bar.low_price,bar.low_price)self.window_bar.close_price = bar.close_priceself.window_bar.volume += int(bar.volume)self.window_bar.open_interest = bar.open_interestself.interval_count += 1if not self.interval_count % self.window:self.interval_count = 0self.on_window_bar(self.window_bar)self.window_bar = None
2.6 generate
generate函数的作用就是强制把当前缓存着的bar推送出去。
def generate(self) -> Optional[BarData]:"""Generate the bar data and call callback immediately."""bar = self.barif self.bar:bar.datetime = bar.datetime.replace(second=0, microsecond=0)self.on_bar(bar)self.bar = Nonereturn bar
3 ArrayManager
ArrayManager是时间序列容器,用于按时间序列缓存bar数据,提供技术指标的计算。ArrayManager的整体结构图(来源于《全实战进阶系统 - CTA策略》)如下图所示,它提供的函数分为四类:init函数、update_bar、@property函数、技术指标函数。
- init函数中定义了缓存K线的计数count、需要缓存的最小K线数量size、是否缓存足够K数据标识inited。
- update_bar函数采用切片平移的方式更新最新数据,当K线数量达到size大小时,将inited设置为true。
- @property函数提供了访问缓存K线数据的方式。
- 技术指标函数提供了利用缓存K线数据计算指标的方法,它们既可以返回最新一个周期的指标值,也可以返回计算出的所有指标序列,可以通过参数array控制。
学习资料
- vn.py community 《全实战进阶系统 - CTA策略》课程8-K线自定义合成
【vn.py学习笔记(八)】vn.py utility、BarGenerator、ArrayManager源码阅读相关推荐
- 【vn.py学习笔记(五)】vn.py Base、Log、Oms、Email Engine源码阅读
[vn.py学习笔记(五)]vn.py Base.Log.Oms.Email Engine源码阅读 写在前面 1 BaseEngine 2 LogEngine 3 OmsEngine 3.1 构造函数 ...
- Mr.J-- jQuery学习笔记(三十二)--jQuery属性操作源码封装
扫码看专栏 jQuery的优点 jquery是JavaScript库,能够极大地简化JavaScript编程,能够更方便的处理DOM操作和进行Ajax交互 1.轻量级 JQuery非常轻巧 2.强大的 ...
- resnet50网络结构_学习笔记(一):分析resnet源码理解resnet网络结构
最近在跑实验的过程中一直在使用resnet50和resnet34,为了弄清楚网络的结构和原理的实现,打开resnet的源码进行了学习. 残差网络学习的原理 针对神经网络过深而导致的学习准确率饱和甚至是 ...
- 【学习笔记】Keras库下的resnet源码分析
Keras库下的resnet源码应用及解读 其实我也不知道这种东西有没有写下来的必要,但是跑代码的时候总摸鱼总归是不好的.虽然很简单,不过我也大概做个学习记录,写给小白看的.源码来自keras文档,大 ...
- Caffe学习笔记(4)--------用师兄的源码都差点没跑通觉得自己智商真的捉急!...
在自己不知道哪个旮旯荡了一个不知道什么版本的VGG16更改多次未曾成功之后... 又厚着脸皮去找师兄求教了... 结果发现是自己荡的模型都不对...而且数据输入的格式也不对...然后师兄很无奈的把自己 ...
- Linux学习笔记15—RPM包的安装OR源码包的安装
RPM安装命令 1. 安装一个rpm包 rpm –ivh 包名 "-i" : 安装的意思 "-v" : 可视化 "-h" : 显示安装进度 ...
- Linux内核学习笔记——内核页表隔离KPTI机制(源码分析)
KPTI(Kernel PageTable Isolation)全称内核页表隔离,它通过完全分离用户空间与内核空间页表来解决页表泄露. KPTI中每个进程有两套页表--内核态页表与用户态页表(两个地址 ...
- 小迪渗透测试学习笔记(四)基础入门-WEB源码拓展
前言: WEB源码在安全测试中是非常重要的信息来源,可以用来代码审 计漏洞也可以用来做信息突破口,其中WEB源码有很多技术需要简明分析. 比如:获取某ASP源码后可以采用默认数据库下载为突破,获取某其 ...
- 【vn.py学习笔记(二)】vn.py底层接口 学习笔记
[vn.py学习笔记(二)]vn.py底层接口 学习笔记 1 CTP API的工作原理 1.1 CTP介绍 1.2 API功能介绍 1.3 CTP API文件 1.4 API 通用规则 2 CTP A ...
最新文章
- 调用vba_Python VS JavaScript,谁将是替代VBA最好语言
- FD_CLOEXEC用法及原因_转
- 设计模式的理解:状态模式(State) 和备忘录模式(Memento)
- storm目录结构及在zk中的目录结构
- 安卓欢迎界面和activity之间的跳转问题
- html中的文本格式化标签+多媒体标签+关于IE浏览器兼容的问题(干货!)
- [转载] 希腊字母读音表
- 20145236 《Java程序设计》 第6周学习总结
- 软件测试必学之python+unittest+requests+HTMLRunner编写接口自动化测试集
- Linux入门-安装篇(Debian 服务器版)
- 找工作,如何写好一份漂亮的简历,给你借鉴一下
- [转载]浅谈敏捷管理在软件项目中应用
- python实现微信自动发信息软件_Python实现给微信好友自动发送消息的示例
- 如何进行企业设备管理?
- 无心剑中译阿道司.赫胥黎《冥思月亮》
- 超火的微信渐变国旗头像,一键生成!!
- c语言正弦函数求导,正弦函数求导公式基本推导
- 【运筹帷幄】关于阿里云服务器自己开启的6010端口
- CMake Error: The current CMakeCache.txt directory is different...
- 树莓派4B安装Ubuntu Mate20.04