量化投资之工具篇一:Backtrader从入门到精通(3)-Cerebro源代码解读
前面两篇文章已经一步一步展示了如何使用backtrader以及使用backtrader的一些重要概念和注意事项。但是你要真正灵活地使用backtrader实现自己的策略,还需要了解backtrader各个组成部分。本文开始,对backtrader的类进行详细的说明。为了让大家更能深入了解backtrader的运行机制,咱们基于源代码进行解读。
代码架构中,会用到元类,我们先了解元类在backtrader中的应用。
元类
在backtrader的类定义中,经常出现如下定义:
class Cerebro(with_metaclass(MetaParams, object)):class MetaLineRoot(metabase.MetaParams):class WriterBase(with_metaclass(bt.MetaParams, object))
里面经常出现的metaclass,就是元类的意思。
在backtrader中,基本上所有类从元类继承。我们从顶往下看,先看metabase类。
class MetaBase(type):def doprenew(cls, *args, **kwargs):return cls, args, kwargsdef donew(cls, *args, **kwargs):_obj = cls.__new__(cls, *args, **kwargs)sreturn _obj, args, kwargsdef dopreinit(cls, _obj, *args, **kwargs):return _obj, args, kwargsdef doinit(cls, _obj, *args, **kwargs):_obj.__init__(*args, **kwargs)return _obj, args, kwargsdef dopostinit(cls, _obj, *args, **kwargs):return _obj, args, kwargsdef __call__(cls, *args, **kwargs):cls, args, kwargs = cls.doprenew(*args, **kwargs)_obj, args, kwargs = cls.donew(*args, **kwargs)_obj, args, kwargs = cls.dopreinit(_obj, *args, **kwargs)_obj, args, kwargs = cls.doinit(_obj, *args, **kwargs)_obj, args, kwargs = cls.dopostinit(_obj, *args, **kwargs)return _obj
这个元类类继承type,前面实例中还有object。这些是啥区别?这个就得从python的特性说起。在python中,一切皆为对象。甚至数据类型,以及数值都是对象。不信看看:
print(type(2))
<class ‘int’>
print(isinstance(2, object))
True
print(isinstance(2, int))
True
数值2是int类的实例,也是int超类object的实例。记住:所有类的最顶层基类是object。
print(isinstance(object, type))
Trueprint(isinstance(int, type))
True
这个看出来啥呢?object类是type的实例,所有的类都继承自object,也就是所有类都是type生成的。type就是Python在背后用来创建所有类的元类。是总结一句话:实例由类生成,类由type生成。
那元类又是啥?元类就是用来创建这些类(对象)的,元类就是类的类。
这些关系如下代码即可明了:
name = 'bob'
print("name.__class__ is %s"%name.__class__)
print("name.__class__.__class__ is %s"%name.__class__.__class__)class Bar(object): passprint("Bar.__class__ is %s"%Bar.__class__)
print("Bar.__class__.__class__ is %s"%Bar.__class__.__class__)mybar=Bar()
print("mybar.__class__ is %s"%mybar.__class__)
print("mybar.__class__.__class__ is %s"%mybar.__class__.__class__)
结果如下:
name.__class__ is <class ‘str’>
name.__class__ .__class__is <class ‘type’>
Bar.__class__ is <class ‘type’>
Bar.__class__ .__class__.class is <class ‘type’>
mybar.__class__ is <class ‘main.Bar’>
mybar.__class__ .__class__.class is <class ‘type’>
总之一句话:实例的类是类(创建它的类),类的类是type。
普通类和元类创建类啥区别?比较复杂,我们这里关注影响backtrader架构的的特点。
普通类和元类创建的类的一个重要区别就是:
普通类实例化的时候,先用__new__构造新的空对象,然后调用__init__方法,去初始化这个对象。如果要调用__call__,需要使用实例后加一个括号显式调用。
元类创建类实例化的时候,首先调用元类的__call__,然后才是__new__和__init__,这样有啥好处?由于要先调用__call__,我们可以控制类的生成,比如说,我们可以控制生成类的参数(普通类代码中固定死了)。实际上99%的没啥用。就是构造架构的时候简化下代码,代价是代码太难读了。
回到咱们的MetaBase类,该类定义了doprenew/donew/dopreinit/doinit/dopostinit,其中donew中调用类的__new__创建对象和doinit函数对__init__对对象进行初始化。特别注意__call__函数,直接完成预处理、实例化、初始化,后处理一系列动作,而且这个处理不需要显式调用,在类进行实例化的时候就会完成。所有MetaBase(或者其子类)创建的类都会先调用这个函数,包括Cerebro、lineseries、strategies、broker、indicator等等。
这些功能类并没有继承MetaBase,而是继承MetaBase的子类MetaParams:
class MetaParams(MetaBase):def __new__(meta, name, bases, dct):# Remove params from class definition to avoid inheritance# (and hence "repetition")newparams = dct.pop('params', ())packs = 'packages'newpackages = tuple(dct.pop(packs, ())) # remove before creationfpacks = 'frompackages'fnewpackages = tuple(dct.pop(fpacks, ())) # remove before creation# Create the new class - this pulls predefined "params"cls = super(MetaParams, meta).__new__(meta, name, bases, dct)# Pulls the param class out of it - default is the empty classparams = getattr(cls, 'params', AutoInfoClass)# Pulls the packages class out of it - default is the empty classpackages = tuple(getattr(cls, packs, ()))fpackages = tuple(getattr(cls, fpacks, ()))# get extra (to the right) base classes which have a param attributemorebasesparams = [x.params for x in bases[1:] if hasattr(x, 'params')]# Get extra packages, add them to the packages and put all in the classfor y in [x.packages for x in bases[1:] if hasattr(x, packs)]:packages += tuple(y)for y in [x.frompackages for x in bases[1:] if hasattr(x, fpacks)]:fpackages += tuple(y)cls.packages = packages + newpackagescls.frompackages = fpackages + fnewpackages# Subclass and store the newly derived params classcls.params = params._derive(name, newparams, morebasesparams)return clsdef donew(cls, *args, **kwargs):clsmod = sys.modules[cls.__module__]# import specified packagesfor p in cls.packages:if isinstance(p, (tuple, list)):p, palias = pelse:palias = ppmod = __import__(p)plevels = p.split('.')if p == palias and len(plevels) > 1: # 'os.path' not aliasedsetattr(clsmod, pmod.__name__, pmod) # set 'os' in moduleelse: # aliased and/or dotsfor plevel in plevels[1:]: # recurse down the modpmod = getattr(pmod, plevel)setattr(clsmod, palias, pmod)# import from specified packages - the 2nd part is a string or iterablefor p, frompackage in cls.frompackages:if isinstance(frompackage, string_types):frompackage = (frompackage,) # make it a tuplefor fp in frompackage:if isinstance(fp, (tuple, list)):fp, falias = fpelse:fp, falias = fp, fp # assumed is string# complain "not string" without fp (unicode vs bytes)pmod = __import__(p, fromlist=[str(fp)])pattr = getattr(pmod, fp)setattr(clsmod, falias, pattr)for basecls in cls.__bases__:setattr(sys.modules[basecls.__module__], falias, pattr)# Create params and set the values from the kwargsparams = cls.params()for pname, pdef in cls.params._getitems():setattr(params, pname, kwargs.pop(pname, pdef))# Create the object and set the params in place_obj, args, kwargs = super(MetaParams, cls).donew(*args, **kwargs)_obj.params = params_obj.p = params # shorter alias# Parameter values have now been set before __init__return _obj, args, kwargs
需要重点关注的是donew函数,在这个函数里面分别提取package,frompackage和params并赋值,前面两个对我们影响不大,关键是params(最后几行代码)。
如何提取参数了?比如在Cerebro类中,参数定义为元组:
params = (('preload', True),('runonce', True),('maxcpus', None),('stdstats', True),('oldbuysell', False),('oldtrades', False),('lookahead', 0),('exactbars', False),('optdatas', True),('optreturn', True),('objcache', False),('live', False),('writer', False),('tradehistory', False),('oldsync', False),('tz', None),('cheat_on_open', False),('broker_coo', True),('quicknotify', False),)
用起来就不方便了,这里通过setattr直接赋值到具体的独立参数里,方便进行访问。
总结,元组在backtrader中的应用(目前看出来的):
- 所有类实例化的时候通过原来的__call__完成,不同的类可以进行一些特殊化的处理。
- 参数通过donew完成映射。
总之,元类在backtrader中就是做繁琐无聊的工作,让具体功能类专注于功能的实现。
下面开始具体类的分析,先从backtrader中最重要的Cerebro开始。
Cerebro
cerebro是backtrader系统的中心控制系统,主要的工作包括:
- 收集所有的输入(data feeds),执行者(strategies),观测者(Observers)、评价者(Analyzers)以及文档(Writers),保证系统在任何时候都正常运行。
- 执行回测或者实时数据输入以及交易。
- 返回结果。
- 画图。
下面我们一步一步地结合源代码来分析Cerebro运行机制。
特别说明下:
- Cerebro只是搭建了运行的架构,很多细节都是在组件中实现,这里重点关注Cerebro的运行机制。
- 另外,还有一部分代码过于非常琐碎,对我们使用影响不大,也会省略。
- 当然还有一些代码我现在也没看到应用场景,或者看错了,先记录于此,后续随着深入学习后再更正和补充。
Cerebro的初始化
Cerebro通过如下代码进行初始化:
cerebro = bt.Cerebro(**kwargs)
这里实例化了一个Cerebro,首先调用是Cerebro类的__init__函数(再此之前还会调用元类的一些处理,例如参数的统一处理,参见前述元类的介绍):
def __init__(self):self._dolive = Falseself._doreplay = Falseself._dooptimize = Falseself.stores = list()self.feeds = list()self.datas = list()self.datasbyname = collections.OrderedDict()self.strats = list()self.optcbs = list() # holds a list of callbacks for opt strategiesself.observers = list()self.analyzers = list()self.indicators = list()self.sizers = dict()self.writers = list()self.storecbs = list()self.datacbs = list()self.signals = list()self._signal_strat = (None, None, None)self._signal_concurrent = Falseself._signal_accumulate = Falseself._dataid = itertools.count(1)self._broker = BackBroker()#初始化一个缺省的broker实例。self._broker.cerebro = selfself._tradingcal = None # TradingCalendar()self._pretimers = list()self._ohistory = list()self._fhistory = None
可以看出,初始化函数只是设置了类属性(只描述公共属性,私有属性主要用于类函数实现,具体用到的时候在讨论),这些属性如下表所示,具体如何使用我们在。
名称 | 定义 | 说明 |
---|---|---|
stores | list | 用于存储接收到的其他组件(例如data)消息(通知)。 |
feeds | list | 用于存储数据源(data Feeds)的基类。 |
datas | list | 用于存储数据源。那这个和feeds啥区别?区别就是feeds是data的基类。前面说了,整个框架用了大量类的方法,很复杂,但是简化了应用。我们主要关注应用方法。两者之间的的关系,我们后续在data类中再详述。 |
datasbyname | OrderedDict | 就是存储数据源的名称。存储的方式是有序的字典(python中,字典dict是没有顺序的)。 |
strats | list | 用于存储策略(strategies)类 |
optcbs | list | 存储优化策略的回调。对执行完的优化策略进行处理的回调。啥是优化策略?本系列文章1有过介绍。 |
observers | list | 保存observers类,具体信息在介绍该类的时候再说明。由于Cerebro是整个系统的中心,需要控制多个部件协调行动,所以需要保存这些部件的类(或实例)。 |
analyzers | list | 保存analyzers类,具体信息在介绍该类的时候再说明。 |
indicators | list | 保存indicators类,具体信息在介绍该类的时候再说明。 |
writers | list | 保存writers类,具体信息在介绍该类的时候再说明。 |
storecbs | list | 保存对notify_store消息处理的回调.对于收到存储在store消息中可以定制处理过程。 |
datacbs | list | 保存对data类消息处理的回调。同上类似。 |
signals | list | 保存信号。这种主要用于通过signal strategy进行回测的机制。 |
sizers | dict | 保存sizers类,具体信息在介绍该类的时候再说明。 |
Cerebro类还支持一系列的参数(这些参数在元类中统一进行处理),实例化的时候通过**kwargs传递,参数以元组的元组形式定义,如下:
params = (('preload', True),('runonce', True),('maxcpus', None),('stdstats', True),('oldbuysell', False),('oldtrades', False),('lookahead', 0),('exactbars', False),('optdatas', True),('optreturn', True),('objcache', False),('live', False),('writer', False),('tradehistory', False),('oldsync', False),('tz', None),('cheat_on_open', False),('broker_coo', True),('quicknotify', False),)
具体信息如下表所示:
参数 | 取值范围 | 缺省值 | 含义 |
---|---|---|---|
preload | TrueFalse | TRUE | 是否为Strategies预加载传递给Cerebro的不同数据源。通常我们选择为TRUE。 |
runonce | TrueFalse | True | 前面描述过,runonce是indicators类对数据访问方法的优化以加快速度,使用的是矢量化模式来提高算法的运行速度,后续如果有大量数据需要进行回测的时候将有极大的优势。strategies和Observers通常是基于事件对数据进行处理。 |
maxcpus | None->运行系统可用的核 | None | 指定可以用于优化的CPU核。 |
stdstats | TrueFalse | True | 如果为True的话为创建已缺省的Observers,包括Broker,Buysell和Trades。 |
oldbuysell | TrueFalse | FALSE | 创建Observers的时候,使用新的bugsell还是之前实现的。没有特殊需求,我们使用新代码。 |
oldtrades | TrueFalse | FALSE | 创建Observers的时候,使用新的bugsell还是之前实现的。没有特殊需求,我们使用新代码。 |
lookahead | 数值 | 0 | 这个主要用于扩展数据的时候,扩大数据的缓存的大小。通常设置缺省值即可。 |
exactbars | False,数值 | False | 这里用于指示如何缓存数据以节约内存。False就是Lines对象数据都会加入到内存以方便计算。如果采用其他值,会有一些特殊处理减少内存,可能对画图,runonce等机制造成影响。咱们现在机器这么强,内存这么大,而且白菜价,就不要节约了,缺省False就好。 |
optdatas | True False | True | 这个也是速度处理的优化机制,主要是数据预装载、runonce等,可以提高效率。设置成缺省值就行了 |
optreturn | True False | True | 同上,也是对不同类的优化机制,可以提高效率,设置为缺省值即可。 |
objcache | True False | False | 一种实验性质的方法,没啥用,缺省值。 |
live | True False | False | 是否实时输入数据,这个咱们用不上,缺省值。 |
writer | True False | False | 设置为True的话,为自动创建一个writer,缺省输出日志到stdout。 |
tradehistory | True False | False | 设置为True的话,会记录所有策略的每一次交易信息。当然也可以在strategies里面设置 |
oldsync | True False | False | 新版本之后(1.9.0.99之后)提供新来的datas同步机制。如果你要用老的同步机制,可以设置为True。谁这么无聊呢?大家都喜新厌旧。 |
tz | None,String | None | 记录时区信息,缺省None,使用的就是UTC,对于中国,就是“UTC+8”。对于回测,时区并不重要。 |
cheat_on_open | True False | False | 这个设置为True的话,会调用strategies的next_open方法,这个方法在next之前调用,主要用于在对订单的评估,可以基于前一天的open价发起一个订单。具体有啥用,还没想到有啥场景,咱们记住这个茬,说不定哪个策略能用到 |
broker_coo | True False | True | 和上面差不多,设置为True,broker调用set_coo开启‘cheat_on_open’。开启这一个参数,上一个参数cheat_on_open也必须打开。 |
quicknotify | True False | False | 就是在next之前发起broker的通知。回测没啥意义,只有实时数据可以快速通知。 |
可以看出,Cerebro基本没做啥实质性的工作,只是初始化了一堆容器,真正的处理还是在run之后。
增加数据源(Data Feeds)
增加数据源最常见的方式就是cerebro.adddata(data),data就是必须是已经实例化的data feed。
例如:
data = bt.feeds.PandasData(dataname=stock_hfq_df, fromdate=start_date, todate=end_date) # 加载数据
cerebro.adddata(data)
也可以是resample和replay
cerebro.resampledata(data,timeframe=bt.TimeFrame.Weeks)
cerebro.replaydatadata(data, timeframe=bt.TimeFrame.Days)
系统可以接受任何数量的data feeds,包括普通的数据以及resample/replay(这俩啥意思?后面Data Feeds再细说)的数据。
下面看adddata代码:
def adddata(self, data, name=None):'''Adds a ``Data Feed`` instance to the mix.If ``name`` is not None it will be put into ``data._name`` which ismeant for decoration/plotting purposes.'''if name is not None:data._name = namedata._id = next(self._dataid)data.setenvironment(self)self.datas.append(data)self.datasbyname[data._name] = datafeed = data.getfeed()if feed and feed not in self.feeds:self.feeds.append(feed)if data.islive():self._dolive = Truereturn data
整个流程简述如下:
- 记录下data的名称
- 分配data的ID,注意,这里从1开始。
- 本Cerebro和data建立关联关系:data调用setenvironment记录自己现在归当前Cerebro管,同时Cerebro将data加入到datas列表中,两人建立关系了。
- 获取data的feed,如果feed有效,加入到feeds列表中。feed是data的基类。
- 最后记录是否实时数据。
再看看resampdata的代码:
def resampledata(self, dataname, name=None, **kwargs):'''Adds a ``Data Feed`` to be resample by the systemIf ``name`` is not None it will be put into ``data._name`` which ismeant for decoration/plotting purposes.Any other kwargs like ``timeframe``, ``compression``, ``todate`` whichare supported by the resample filter will be passed transparently'''if any(dataname is x for x in self.datas):dataname = dataname.clone()dataname.resample(**kwargs)self.adddata(dataname, name=name)self._doreplay = Truereturn dataname
整个函数流程关键点简述如下:
- 首先要判断resample目的dataname(data对象实例)是不是已经保存在datas列表中。如果是已经保存的,那么不能直接resample了,需要clone一个新的data对象。
- 然后就调用data feeds的resample函数对数据进行处理,具体处理方法后续在data feeds类中再讨论。
- resample后的data就直接调用adddata函数来处理了,同时记录._doreplay标记。
replaydata的代码:
def replaydata(self, dataname, name=None, **kwargs):'''Adds a ``Data Feed`` to be replayed by the systemIf ``name`` is not None it will be put into ``data._name`` which ismeant for decoration/plotting purposes.Any other kwargs like ``timeframe``, ``compression``, ``todate`` whichare supported by the replay filter will be passed transparently'''if any(dataname is x for x in self.datas):dataname = dataname.clone()dataname.replay(**kwargs)self.adddata(dataname, name=name)self._doreplay = Truereturn dataname
和resample处理差不多,就是需要使用data的replay函数进行处理后然后加入到Cerebro的datas列表中。
加入策略
Cerebro加入策略就更简单了:
cerebro.addstrategy(TestStrategy)
看看源代码做了啥?也很简单:
def addstrategy(self, strategy, *args, **kwargs):'''Adds a ``Strategy`` class to the mix for a single pass run.Instantiation will happen during ``run`` time.args and kwargs will be passed to the strategy as they are duringinstantiation.Returns the index with which addition of other objects (like sizers)can be referenced'''self.strats.append([(strategy, args, kwargs)])return len(self.strats) - 1
函数注释写得很清楚,就是将strategies类加入到Cerebro用于存储的容器中(strats),返回索引,这个索引可以供其他对象引用。特别注意的是,这里只是存储了strategies类以及传递给它的参数,strategies只有在run的时候才会实例化。
加入其他部件
我们可以理解Cerebro是军队的的指挥中心,要打仗了,先把部队准备好。除了主力部队strategies之外,咱们还得一些配合主力行动的部队,主要包括writer(记录过程),analyzer(分析结果)以及observer(观察交易过程),提供的函数分别是:
- addwriter
- addanalyzer
- addobserver (或者 addobservermulti)
几个函数源代码如下:
def addwriter(self, wrtcls, *args, **kwargs):'''Adds an ``Writer`` class to the mix. Instantiation will be done at``run`` time in cerebro'''self.writers.append((wrtcls, args, kwargs))def addsizer(self, sizercls, *args, **kwargs):'''Adds a ``Sizer`` class (and args) which is the default sizer for anystrategy added to cerebro'''self.sizers[None] = (sizercls, args, kwargs)def addanalyzer(self, ancls, *args, **kwargs):'''Adds an ``Analyzer`` class to the mix. Instantiation will be done at``run`` time'''self.analyzers.append((ancls, args, kwargs))def addobserver(self, obscls, *args, **kwargs):'''Adds an ``Observer`` class to the mix. Instantiation will be done at``run`` time'''self.observers.append((False, obscls, args, kwargs))def addobservermulti(self, obscls, *args, **kwargs):'''Adds an ``Observer`` class to the mix. Instantiation will be done at``run`` timeIt will be added once per "data" in the system. A use case is abuy/sell observer which observes individual datas.A counter-example is the CashValue, which observes system-wide values'''self.observers.append((True, obscls, args, kwargs))
代码都简单,就是将各个部队装入对应的容器,注意,这里装入的都是类以及参数,在run的时候实例化。这些部队各自功能,后续在各自类中详述。
更改broker
大家应该记得初始化的时候实例化了一个缺省的broker,如果你希望提供自己的broker,可以通过如下方法重写:
broker = MyBroker()
cerebro.broker = broker
有人说,broker我记得是私有属性啊,咋直接赋值操作了?这个用的是property(装饰器),等同通过setbroker/getbroker设置/读取。
好了,部队ready,开始run了。
开始run
用户使用如下代码开始run:
result = cerebro.run(**kwargs)
注意,run函数是带参数的。
run函数代码解读
run的源代码如下(比较长,分几段进行解析)
def run(self, **kwargs):'''The core method to perform backtesting. Any ``kwargs`` passed to itwill affect the value of the standard parameters ``Cerebro`` wasinstantiated with.If ``cerebro`` has not datas the method will immediately bail out.It has different return values:- For No Optimization: a list contanining instances of the Strategyclasses added with ``addstrategy``- For Optimization: a list of lists which contain instances of theStrategy classes added with ``addstrategy``'''self._event_stop = False # Stop is requestedif not self.datas:return [] # nothing can be runpkeys = self.params._getkeys()for key, val in kwargs.items():if key in pkeys:setattr(self.params, key, val)# Manage activate/deactivate object cachelinebuffer.LineActions.cleancache() # clean cacheindicator.Indicator.cleancache() # clean cachelinebuffer.LineActions.usecache(self.p.objcache)indicator.Indicator.usecache(self.p.objcache)self._dorunonce = self.p.runonceself._dopreload = self.p.preloadself._exactbars = int(self.p.exactbars)if self._exactbars:self._dorunonce = False # something is saving memory, no runonceself._dopreload = self._dopreload and self._exactbars < 1self._doreplay = self._doreplay or any(x.replaying for x in self.datas)if self._doreplay:# preloading is not supported with replay. full timeframe bars# are constructed in realtimeself._dopreload = Falseif self._dolive or self.p.live:# in this case both preload and runonce must be offself._dorunonce = Falseself._dopreload = Falseself.runwriters = list()# Add the system default writer if requestedif self.p.writer is True:wr = WriterFile()self.runwriters.append(wr)# Instantiate any other writersfor wrcls, wrargs, wrkwargs in self.writers:wr = wrcls(*wrargs, **wrkwargs)self.runwriters.append(wr)# Write down if any writer wants the full csv outputself.writers_csv = any(map(lambda x: x.p.csv, self.runwriters))self.runstrats = list()
要点:
- 记录_event_stop为False,说明开始启动了,需要通过stop来停止。
- 检查是否有数据源(datas),没有的话,没法跑,退出。
- 检查下run函数携带的参数在不在Cerebro的参数表里面,有的话,更新Cerebro实例的参数,说明Cerebro参数在初始化的时候可以设置,在run的时候也可以设置。
- 紧接着就是参数的的处理,根据参数设置一些标记(参数的含义参见前述的表格,绝大部分用不上),清空缓存,为系统运行做好准备。
- 如果参数指示要求提供writers(记录日志),那么就实例化一个缺省的writers。同时看看还有没有其他的writers(通过addwriter添加的),有的话,就实例化。两者都加入到runwriters容器,然后看看这些writers有没有参数要求输出csv,任意一个有这个参数,记录到writers_csv标志中。注意这里Any的用法。
- 初始化运行策略容器(runstrats)以供后用。
下面继续:
if self.signals: # allow processing of signalssignalst, sargs, skwargs = self._signal_stratif signalst is None:# Try to see if the 1st regular strategy is a signal strategytry:signalst, sargs, skwargs = self.strats.pop(0)except IndexError:pass # Nothing thereelse:if not isinstance(signalst, SignalStrategy):# no signal ... reinsert at the beginningself.strats.insert(0, (signalst, sargs, skwargs))signalst = None # flag as not presetnif signalst is None: # recheck# Still None, create a default onesignalst, sargs, skwargs = SignalStrategy, tuple(), dict()# Add the signal strategyself.addstrategy(signalst,_accumulate=self._signal_accumulate,_concurrent=self._signal_concurrent,signals=self.signals,*sargs,**skwargs)if not self.strats: # Datas are present, add a strategyself.addstrategy(Strategy)iterstrats = itertools.product(*self.strats)if not self._dooptimize or self.p.maxcpus == 1:# If no optimmization is wished ... or 1 core is to be used# let's skip process "spawning"for iterstrat in iterstrats:runstrat = self.runstrategies(iterstrat)self.runstrats.append(runstrat)if self._dooptimize:for cb in self.optcbs:cb(runstrat) # callback receives finished strategyelse:if self.p.optdatas and self._dopreload and self._dorunonce:for data in self.datas:data.reset()if self._exactbars < 1: # datas can be full lengthdata.extend(size=self.params.lookahead)data._start()if self._dopreload:data.preload()pool = multiprocessing.Pool(self.p.maxcpus or None)for r in pool.imap(self, iterstrats):self.runstrats.append(r)for cb in self.optcbs:cb(r) # callback receives finished strategypool.close()if self.p.optdatas and self._dopreload and self._dorunonce:for data in self.datas:data.stop()if not self._dooptimize:# avoid a list of list for regular casesreturn self.runstrats[0]return self.runstrats
要点如下:
最前面是处理信号相关功能。如果要处理信号,首先看_signal_strat容器中有没有记录信号策略类(SignalStrategy,通过signal_strategy加入的,至于这个东西的用途,咱们在strategies再介绍)。如果没有的话,看看普通strategies容器strats第一个策略是不是SignalStrategy类(用到了isinstance这个函数),还不是的话(记得把普通策略类返回),就创建一个空的signal_strategy。听起来有点绕吧,总之就是用户自己通过signal_strategy函数添加的signalStrategy,普通strategy容器中的signalStrategy以及缺省创建的signalstrategy,三个按优先顺序,必须插入一个信号策略类,注意还是保存在普通strategies容器strats中,只是参数设置为signal。这里关键要记住的是,如果要使用信号回测方式,那么一定要加一个信号策略类。
如果用户没有设定策略,Cerebro局加一个缺省的strategies类。大家还记得系列文章(1)中最简单程序没有加策略,系统还是一样可以运行。
下面一行代码就比较重要了,使用了product将所有的strategies类存放到iterstrats迭代器中。
紧接着的处理就运行strategy,分为两类处理:
- 如果不进行性能优化,或者参数中maxcpus为1:那么直接按循序调用runstrategies(下一步详述)运行进行策略的运行。如果是优化策略(就是一个策略输入多个参数进行优化评估)的处理。执行完的策略还可以调用回调(回调可通过optcallback增加)进行处理。
- 对于需要进行性能优化的处理:首先对数据进行reset/start/preload的处理(具体处理,后续在data feeds类详述),然后根据maxcpus参数启用多线程,在多线程中调用runstrategies进行策略运行。同样对于优化策略进行回调处理。执行完毕,还得调用data的stop方法。
runstrategies函数代码解读
下面看runstrategies:
def runstrategies(self, iterstrat, predata=False):'''Internal method invoked by ``run```to run a set of strategies'''self._init_stcount()self.runningstrats = runstrats = list()for store in self.stores:store.start()if self.p.cheat_on_open and self.p.broker_coo:# try to activate in brokerif hasattr(self._broker, 'set_coo'):self._broker.set_coo(True)if self._fhistory is not None:self._broker.set_fund_history(self._fhistory)for orders, onotify in self._ohistory:self._broker.add_order_history(orders, onotify)self._broker.start()for feed in self.feeds:feed.start()if self.writers_csv:wheaders = list()for data in self.datas:if data.csv:wheaders.extend(data.getwriterheaders())for writer in self.runwriters:if writer.p.csv:writer.addheaders(wheaders)# self._plotfillers = [list() for d in self.datas]# self._plotfillers2 = [list() for d in self.datas]if not predata:for data in self.datas:data.reset()if self._exactbars < 1: # datas can be full lengthdata.extend(size=self.params.lookahead)data._start()if self._dopreload:data.preload()for stratcls, sargs, skwargs in iterstrat:sargs = self.datas + list(sargs)try:strat = stratcls(*sargs, **skwargs)except bt.errors.StrategySkipError:continue # do not add strategy to the mixif self.p.oldsync:strat._oldsync = True # tell strategy to use old clock updateif self.p.tradehistory:strat.set_tradehistory()runstrats.append(strat)tz = self.p.tzif isinstance(tz, integer_types):tz = self.datas[tz]._tzelse:tz = tzparse(tz)
要点如下:
策略运行计数。
初始化一个容器runningstrats用于存储正在运行的策略。
然后是对部件(Cerebro控制的部队)的参数设置和初始化:信号存储(store)的启动(start);broker参数的设置和启动。feed(feed哪来的?adddata的时候获取的)的启动。Writers的参数设置。
如果predata为否的话,那么现在就开始调用preload预加载数据。部分性能优化情况下,在run的时候就preload了。
下面重点来了:从iterstrat迭代器(run调用本函数带进来的,携带的是加入的strategies及其参数)提取每一个strategy和参数,进行strategy的实例化和初始化,并返回正在执行的strategy的实例,记录到runstrats容器中。如下代码非常重要,使用了元类的方法进行处理(这就是元类的好处,极大的简化处理,代价是难懂):
strat = stratcls(*sargs, **skwargs)
记录下时区,用datas类存储的时区或者参数输入时区。
继续代码:
if runstrats:
#loop separated for claritydefaultsizer = self.sizers.get(None, (None, None, None))for idx, strat in enumerate(runstrats):if self.p.stdstats:strat._addobserver(False, observers.Broker)if self.p.oldbuysell:strat._addobserver(True, observers.BuySell)else:strat._addobserver(True, observers.BuySell,barplot=True)if self.p.oldtrades or len(self.datas) == 1:strat._addobserver(False, observers.Trades)else:strat._addobserver(False, observers.DataTrades)for multi, obscls, obsargs, obskwargs in self.observers:strat._addobserver(multi, obscls, *obsargs, **obskwargs)for indcls, indargs, indkwargs in self.indicators:strat._addindicator(indcls, *indargs, **indkwargs)for ancls, anargs, ankwargs in self.analyzers:strat._addanalyzer(ancls, *anargs, **ankwargs)sizer, sargs, skwargs = self.sizers.get(idx, defaultsizer)if sizer is not None:strat._addsizer(sizer, *sargs, **skwargs)strat._settz(tz)strat._start()for writer in self.runwriters:if writer.p.csv:writer.addheaders(strat.getwriterheaders())if not predata:for strat in runstrats:strat.qbuffer(self._exactbars, replaying=self._doreplay)for writer in self.runwriters:writer.start()# Prepare timersself._timers = []self._timerscheat = []for timer in self._pretimers:# preprocess tzdata if neededtimer.start(self.datas[0])if timer.params.cheat:self._timerscheat.append(timer)else:self._timers.append(timer)if self._dopreload and self._dorunonce:if self.p.oldsync:self._runonce_old(runstrats)else:self._runonce(runstrats)else:if self.p.oldsync:self._runnext_old(runstrats)else:self._runnext(runstrats)for strat in runstrats:strat._stop()
要点如下:
- 开始针对运行的strategy实例,分别加入Observers、indicators、analyzer以及sizer类,并调用start函数开始启动,主要进行一些初始的处理,后续在strategies类中说明。
- 对于运行的writers(存储在runwriters容器中,run的时候实例化),设置表头,并启动。
- 设定定时器,并开始对strategies执行_runonce/_runonce_old/_runnext_old/_runnexth函数,这些后面单独描述。
- 后面一段代码就不贴了,主要就是停止各部件。如果是优化策略运行的话,还要对运行结果进行分析。
下面开始runnext和runonce代码的解读,先看最常用的runnext:
_runnext函数代码解读
def _runnext(self, runstrats):'''Actual implementation of run in full next mode. All objects have its``next`` method invoke on each data arrival'''datas = sorted(self.datas,key=lambda x: (x._timeframe, x._compression))datas1 = datas[1:]data0 = datas[0]d0ret = Truers = [i for i, x in enumerate(datas) if x.resampling]rp = [i for i, x in enumerate(datas) if x.replaying]rsonly = [i for i, x in enumerate(datas)if x.resampling and not x.replaying]onlyresample = len(datas) == len(rsonly)noresample = not rsonlyclonecount = sum(d._clone for d in datas)ldatas = len(datas)ldatas_noclones = ldatas - clonecountlastqcheck = Falsedt0 = date2num(datetime.datetime.max) - 2 # default at maxwhile d0ret or d0ret is None:# if any has live data in the buffer, no data will wait anythingnewqcheck = not any(d.haslivedata() for d in datas)if not newqcheck:# If no data has reached the live status or all, wait for# the next incoming datalivecount = sum(d._laststatus == d.LIVE for d in datas)newqcheck = not livecount or livecount == ldatas_nocloneslastret = False# Notify anything from the store even before moving datas# because datas may not move due to an error reported by the storeself._storenotify()if self._event_stop: # stop if requestedreturnself._datanotify()if self._event_stop: # stop if requestedreturn# record starting time and tell feeds to discount the elapsed time# from the qcheck valuedrets = []qstart = datetime.datetime.utcnow()for d in datas:qlapse = datetime.datetime.utcnow() - qstartd.do_qcheck(newqcheck, qlapse.total_seconds())drets.append(d.next(ticks=False))d0ret = any((dret for dret in drets))if not d0ret and any((dret is None for dret in drets)):d0ret = Noneif d0ret:dts = []for i, ret in enumerate(drets):dts.append(datas[i].datetime[0] if ret else None)# Get index to minimum datetimeif onlyresample or noresample:dt0 = min((d for d in dts if d is not None))else:dt0 = min((d for i, d in enumerate(dts)if d is not None and i not in rsonly))dmaster = datas[dts.index(dt0)] # and timemasterself._dtmaster = dmaster.num2date(dt0)self._udtmaster = num2date(dt0)# slen = len(runstrats[0])# Try to get something for those that didn't returnfor i, ret in enumerate(drets):if ret: # dts already contains a valid datetime for this icontinue# try to get a data by checking with a masterd = datas[i]d._check(forcedata=dmaster) # check to force outputif d.next(datamaster=dmaster, ticks=False): # retrydts[i] = d.datetime[0] # good -> store# self._plotfillers2[i].append(slen) # mark as fillelse:# self._plotfillers[i].append(slen) # mark as emptypass# make sure only those at dmaster level end up deliveringfor i, dti in enumerate(dts):if dti is not None:di = datas[i]rpi = False and di.replaying # to check behaviorif dti > dt0:if not rpi: # must see all ticks ...di.rewind() # cannot deliver yet# self._plotfillers[i].append(slen)elif not di.replaying:# Replay forces tick fill, else force heredi._tick_fill(force=True)# self._plotfillers2[i].append(slen) # mark as fillelif d0ret is None:# meant for things like live feeds which may not produce a bar# at the moment but need the loop to run for notifications and# getting resample and others to produce timely barsfor data in datas:data._check()else:lastret = data0._last()for data in datas1:lastret += data._last(datamaster=data0)if not lastret:# Only go extra round if something was changed by "lasts"break# Datas may have generated a new notification after nextself._datanotify()if self._event_stop: # stop if requestedreturnif d0ret or lastret: # if any bar, check timers before brokerself._check_timers(runstrats, dt0, cheat=True)if self.p.cheat_on_open:for strat in runstrats:strat._next_open()if self._event_stop: # stop if requestedreturnself._brokernotify()if self._event_stop: # stop if requestedreturnif d0ret or lastret: # bars produced by data or filtersself._check_timers(runstrats, dt0, cheat=False)for strat in runstrats:strat._next()if self._event_stop: # stop if requestedreturnself._next_writers(runstrats)# Last notification chance before stoppingself._datanotify()if self._event_stop: # stop if requestedreturnself._storenotify()if self._event_stop: # stop if requestedreturn
要点如下:
- 首先按照周期大小排序datas,将具有最小周期的data作为data0,其他的放到datas1容器中。
- 将resample和replay的数据索引单独保存,做好几个标记。计算clone数据(啥时候clone?加入resample和replay数据的时候都会clone)的个数、非clone数据的个数以及数据总的数量。
- dt0初始化最大值为最大日期(9999年12月31日)的多边格里高利度序数(每一日期对应一个独一无二的数,方便程序处理)。
- 如果所有数据是live或者没有新的数据(数据如何更新,后续讲data类的时候再详述),那么就需要等待(newqcheck这一块的处理)。
- 识别有没有收到stop消息(从store容器中读取),在等待数据的时候,可能因为失败没有进一步的数据。
- 所有的data都会调用do_qcheck(输入等待的时间,至于干啥用的,以后在data类的时候再详述)以及next。
- 后面一段确实比较难以理解,其总体思路是寻找主data(dmaster),通常应该是周期最短的数据。然后其他数据以此数据作为基准。(目前理解有限,后续在解读Data相关类的时候针对此场景再补充分析)
- 在此过程中,需要调用_brokernotify和_datanotify检测是否有停止信号。
- 数据完备(bar成功产生之后),就调用策略的next方法,这样就进入到我们定制策略next中进行处理了。
_runonce函数代码解读
runonce之前专门讲过,其特点是采取矢量模式,直接对数据进行处理,以提高效率。系统缺省采用runonce的方式。
def _runonce(self, runstrats):'''Actual implementation of run in vector mode.Strategies are still invoked on a pseudo-event mode in which ``next``is called for each data arrival'''for strat in runstrats:strat._once()strat.reset() # strat called next by next - reset lines# The default once for strategies does nothing and therefore# has not moved forward all datas/indicators/observers that# were homed before calling once, Hence no "need" to do it# here again, because pointers are at 0datas = sorted(self.datas,key=lambda x: (x._timeframe, x._compression))while True:# Check next incoming date in the datasdts = [d.advance_peek() for d in datas]dt0 = min(dts)if dt0 == float('inf'):break # no data delivers anything# Timemaster if needed be# dmaster = datas[dts.index(dt0)] # and timemasterslen = len(runstrats[0])for i, dti in enumerate(dts):if dti <= dt0:datas[i].advance()# self._plotfillers2[i].append(slen) # mark as fillelse:# self._plotfillers[i].append(slen)passself._check_timers(runstrats, dt0, cheat=True)if self.p.cheat_on_open:for strat in runstrats:strat._oncepost_open()if self._event_stop: # stop if requestedreturnself._brokernotify()if self._event_stop: # stop if requestedreturnself._check_timers(runstrats, dt0, cheat=False)for strat in runstrats:strat._oncepost(dt0)if self._event_stop: # stop if requestedreturnself._next_writers(runstrats)
要点如下:
- 开始三行代码是关键,针对每个strategy,运行_once方法并进行reset。实际上就是将数据索引指向0.
- 仍然将数据按照周期(timeframe)大小排序。
- 调用advance_peek看下一个时间(索引为1,还记得索引为1的含义?),最近时间记为dt0.
- 对于最近时间的的data调用advance(嗯,又是数据的处理,后面再具体讲吧)
- 如果设置了cheat_on_open,那么在策略的next之前调用next_open方法。
- 检测是否收到broker的stop消息。
- 下面开始调用策略的、_oncepost方法,这个方法中会调用到strategy的next方法,怎么玩的?必须要到数据类中探索了。
总的来看,Cerebro的run过程,实际上按照时间驱动数据不断运行,在此过程中协调其他部件协同工作。
画图
最后可以调用plot方法进行可视化输出:
def plot(self, plotter=None, numfigs=1, iplot=True, start=None, end=None,width=16, height=9, dpi=300, tight=True, use=None,**kwargs):'''Plots the strategies inside cerebroIf ``plotter`` is None a default ``Plot`` instance is created and``kwargs`` are passed to it during instantiation.``numfigs`` split the plot in the indicated number of charts reducingchart density if wished``iplot``: if ``True`` and running in a ``notebook`` the charts will bedisplayed inline``use``: set it to the name of the desired matplotlib backend. It willtake precedence over ``iplot````start``: An index to the datetime line array of the strategy or a``datetime.date``, ``datetime.datetime`` instance indicating the startof the plot``end``: An index to the datetime line array of the strategy or a``datetime.date``, ``datetime.datetime`` instance indicating the endof the plot``width``: in inches of the saved figure``height``: in inches of the saved figure``dpi``: quality in dots per inches of the saved figure``tight``: only save actual content and not the frame of the figure'''if self._exactbars > 0:returnif not plotter:from . import plotif self.p.oldsync:plotter = plot.Plot_OldSync(**kwargs)else:plotter = plot.Plot(**kwargs)# pfillers = {self.datas[i]: self._plotfillers[i]# for i, x in enumerate(self._plotfillers)}# pfillers2 = {self.datas[i]: self._plotfillers2[i]# for i, x in enumerate(self._plotfillers2)}figs = []for stratlist in self.runstrats:for si, strat in enumerate(stratlist):rfig = plotter.plot(strat, figid=si * 100,numfigs=numfigs, iplot=iplot,start=start, end=end, use=use)# pfillers=pfillers2)figs.append(rfig)plotter.show()return figs
这个过程比较简单,就是引入plotter模块,针对每一个运行的strategy画图。贴字段代码的目的,就是告诉大家,Cerebro没干啥,都是驱动别人干活。
总结
Cerebro解读完了,读完好像还是一头雾水,没看懂系统到底咋玩的啊!这就对了,因为Cerebor就没干啥,我们需要深入了解Cerebro驱动的部件是如何工作的,反过来你就会理解Cerebor。
关于这个文档的顺序,本来我想从底层部件代码解读,但是想到没有一个整体的框架,看部件的代码也很难理解。所以先从Cerebro开始,从整体上看Backtrader是怎么运行的,再看看每一个部件工作机制以及如何在整个系统中起作用,最后在整体上完全掌握。所以,现在看的不太明白也没啥,只要知道Cerebro的运行顺序,以及在哪里用到什么部件就行了。
下一篇文章我们开始解读data相关类,看看数据是如何保存以及如何驱动的。
量化投资之工具篇一:Backtrader从入门到精通(3)-Cerebro源代码解读相关推荐
- 量化投资之工具篇一:Backtrader从入门到精通(8)-交易系统代码详解
本文将介绍Backtrader的交易系统,包括Order.Broker.Trade和Sizer等和交易相关关键类. Order(订单) 这个有翻译为订单,也有翻译为委托单的,后续统一为订单. 如之前文 ...
- 量化投资之工具篇一:Backtrader从入门到精通(2)-重要概念以及平台的使用技巧
上一篇文章从总体上介绍了backtrader的功能和使用方法,这篇文章将从局部进行更加细致的讨论,为后续我们使用backtrader打下更加坚实的基础. 目录 一些重要概念 数据源(data feed ...
- 量化投资之工具篇一:Backtrader从入门到精通(4)-Data相关类源代码解读
前面的文章通过源代码详解Cerebro是中心系统,负责控制各个部件(例如Strategy,Data feeds,Observers等)进行协同工作. 简而言之,量化最重要的就是两点:数据和策略,其他的 ...
- python 策略回测期货_量化投资实战教程(1)—基于backtrader的简单买入卖出策略
都说Python可以用于量化投资,但是很多人都不知道该怎么做,甚至觉得是非常高深的知识,其实并非如此,任何人都可以在只有一点Python的基础上回测一个简单的策略. Backtrader是一个基于Py ...
- 量化投资实战教程(1)—基于backtrader的简单买入卖出策略
都说Python可以用于量化投资,但是很多人都不知道该怎么做,甚至觉得是非常高深的知识,其实并非如此,任何人都可以在只有一点Python的基础上回测一个简单的策略. Backtrader是一个基于Py ...
- 【答读者问26】量化投资框架哪家强?backtrader vs zipline vs 聚宽 vs 米筐
云子量化免费阅读传送链接 今天有读者咨询一个backtrader与聚宽米筐对比的问题,想要了解下backtrader与米筐聚宽各自的优缺点. 先不谈这个问题,我们回顾下初衷,我们想要用这些框架做什么呢 ...
- 量化投资之宏观篇 | 达里欧谈美国社会的矛盾及如何改良
今天,你AI了没? 关注:决策智能与机器学习,每天学点AI干货 正文共:13520字 16图 预计阅读时间:34分钟,建议收藏阅读 本文来自旁解生活与投资(id:Invest WinEver),作者: ...
- 量化策略研究员 - 工具篇
- 编程知识 [python] - 找思路,做接口,建立策略框架,计算速度不是太慢的情况下是编程主力 [c++] - 计算要求高的情况下用,主要是编写 DLL 计算模块给 python 用 [mysq ...
- 量化投资交易python工具干货大全
http://www.newsmth.net/nForum/#!article/Python/128763 最近程序化交易很热,量化也是我很感兴趣的一块. 国内量化交易的平台有几家,我个人比较喜欢用的 ...
最新文章
- LINUX自旋锁详解
- BZOJ 3168 Luogu P4100 [HEOI2013]钙铁锌硒维生素 (矩阵求逆、二分图匹配)
- 计算机网络协议(一)
- Duilib教程-控件练习
- Redis简单命令(部分示例代码)
- mysql 安装服务 w_MySQL的安装与配置
- C语言警告warning C4018: '' : signed/unsigned mismatch
- 你想要提升前端效率的方法,都在这里
- 北京大学生物信息学(3)
- 网上抢购茅台催生黄牛党:必须严打各类抢购软件
- JUC并发编程学习笔记
- 无设备云控系统(ipad协议)
- 使用webots的MPC的移动机器人轨迹跟踪控制
- 区块链:雷电网络开发预览版
- 数字PID控制算法原理及Matlab仿真
- 易中天品三国之:《大江东去》
- 提醒严肃认真的投机者们注意如下事实
- 抖音姓名测试软件,抖音账号权重查询工具玩抖音的你必须要知道
- CHB-STATCOM 整体仿真模型 低电容下 CHB-STATCOM 直流侧电容电压的平衡控制
- 阿里云服务器安骑士高危漏洞