说明

更快的切入实战

我希望在开发的过程中也可以看到一些信号或者模拟结果在当前市场下的表现,这样有助于更好的评估方法的健壮性,特性等。

例如,我希望持续的看到
这样的图(to 2023-3-6)




从上图,以及模拟交易的结果中,我可以对于信号的效果作出评估,也会产生一些新的想法(例如统计过去一段时间内买卖信号的多寡),同时也可以看到按信号产生的订单在最近市场下是否能起到较好的效果。

所以,先要保证持续的数据流入,才可能自动的给出实时的结果。

  • 1 建立ADBS(QuantData),通过Sniffer持续取数
  • 2 衔接ADBS(QuantData)和ADBS(MyQuantBase),完成数据进入量化ADBS链的动作
  • 3 衔接ADBS(MyQuantBase)和ADBS(MyQuantBaseStep2Signals),完成初步的信号提取
  • 4 衔接ADBS(MyQuantBaseStep2Signals)和ADBS(MyQuantBaseStep3Decisions),形成决策
  • 5 衔接ADBS(MyQuantBaseStep3Decisions)和ADBS(MyQuantBaseStep4Trades),形成交易

实际上就是在认为的观察Trades的结果,最终提取出一些规则,然后交给最后一个ADBS替代人进行观察

  • 6 衔接ADBS(MyQuantBaseStep4Trades)和ADBS(MyQuantBaseStep5Watch)

未来,真实的发出交易指令是在决策环节。决策决定要不要买,交易则是具体成没成交。

上面是把真个链条的内容串起来了,本次要做的是第一步的内容。

内容

1 建立ADBS

  • 1 拷贝其他项目的配置文件(config_base.py), 一般来说修改项目名(同时也是数据库名)就可以了;如果是是基于其他服务,还需要改目标服务
  • 2 拷贝初始化文件(init_projects_base.py),一般来说不太需要修改,但是要根据新增的字段建立索引。

本次,在之前market、code和data_slot之外,还要加上 data_source字段的索引。

使用通用的镜像挂载容器进行执行,很快就可以执行完毕,这种操作比较安全(只要项目名也就是数据库名别给错)

2 适配对应的Sniffer

Sniffer的主要职责是拉数(要生成主键)。先试一下效果

速度还是挺快的,除了没有成交单数,和rq的数据看起来一样。所以将镜像升级一下,把包装上

具体的做法是用当前镜像打开一个容器,执行pip,然后tag新版本,再次提交

pip install  akshare
pip install easyquotation

docker的镜像是有层数限制的(好像是110多层),虽然目前还没有达到,还是要稍微注意

...
7a694df0ad6c: Layer already exists
3fd9df553184: Layer already exists
805802706667: Layer already exists'''.count('Layer already exists')30

然后对于ak,分钟数据似乎只能提供5天之内的,我觉得也足够了。历史的数据我可以用rq的数据一次手动补满,然后还可以不定期的手动上传进行校验。

  • ak : 数天内的实时源
  • rq : 数年内的历史源

总体上rq的数据质量应该还是更可靠的,不过初期不必考虑,等我年收益过10万了再说。还有一个很重要的点,我怀疑去做个股是劳多而功少的行为。可能存在这样的假设:对于大量的个股,少部分投资者能获益是因为随机偏差,而其管理的繁琐,以及数据的不及时/不可靠带来的成本过高

所以ETF类的标的是更好的选择,当然,如果不存在这样的ETF,本质上是要构造这样一个组合的,这时候就会买入短期内固定配比的一篮子股票。

我还是希望找到一组或者一对,周期互补的ETF组合来进行交易的。

回到实际的工作,接下来怎么做呢?

实时的sniffer,在工作日的上午9:25分开始,到11:35;下午12:55 到15:05分为止,每个时隙执行查询。

有个细节是,9:30分的数据要不要?因为容易产生噪声,暂时先保留吧,反正我滚动几百分钟,这个可以消弭掉。

既然需要定时执行的部分确定了,那么整个ADBS的就可以开始开发了。手动的部分我觉得可以采用独立与ADBS体系外的单独脚本,调试好,按照规则发往系统入队列就行。

**关于原始数据,其实还有一个比较麻烦的地方,就是除权除息。**这个问题押后考虑吧,主要的工作也就是获得对应的除权除息日和折算关系,然后提前进行计算上的准备,在新的时点进行切换。实时性不高的甚至可以提前清仓,除权日过后重新开仓。(我怀疑可能真有机构这么搞的,至少会减仓)

比起ADBS之间的衔接Sniffer,这种按简单规则从接口取数的Sniffer显然要简单的多。

交易时间限制(先不管节假日,反正每分钟只要10条数据,没关系):


# 获取当前的分钟数
cur_slot = ts2ord()# ts = inverse_time_str('2000-01-01 09:25:00')
# ts = inverse_time_str('2000-01-01 11:35:00')
# ts = inverse_time_str('2000-01-01 12:55:00')
# ts = inverse_time_str('2000-01-01 15:05:00')
# slot = ts2ord(ts) % 1440
morning_start = 565
morning_end = 695
afternoon_start = 775
afternoon_end = 905slot_hour = cur_slot % 1440if (slot_hour >= morning_start and slot_hour <morning_end) or (slot_hour >= afternoon_start and slot_hour<afternoon_end):is_work = True print('【In Worktime】', slot_ord2str(cur_slot))
else:is_work = False print('【Not In Worktime】', slot_ord2str(cur_slot))

在调试的时候转了个圈,发现队列一会有消息,一会没消息。原因是默认的函数里含有了一个另外一个服务器的地址(感觉很像走进科学…)

将sniffer部分进行重写

import osprint('>>>sniffer is Running  ')
runcode =''
for some_app in ['sniffer01_query_akshare.py']:runcode += str(os.system('python3 %s' % some_app))
print('>>>sniffer RunCode :%s ' % runcode)

将sniffer和sniffer01_query_akshare在启动时挂载(覆盖)就完成了Sniffer部分。(注意要调整sniffer的执行周期,之前是默认按天的…)

3 适配 app01_PullToStep1MongoIn

主要就是修改一下使用的Redis Buffer的名称,这个是最早设计的时候没有考虑redis buffer 统计入流量。

这里也是同上,修改名字后挂载就可以。

4 透传 TransWorker

在这个Case中,暂时想不到还要做什么ETL,所以可以写一个透传Worker。原来的DummyWorker有些地方写死,并不好,所以做一个微调,这样就可以实现输入的透传了。

明天先实验将这个ADBS运行起来(虽然worker不通,但是可以入库的)

这里碰到一个小问题,就是AETL默认是不允许数据有空值的,这个在当时的设计有当时的考虑(高稳定),放在这里倒是给自己设了一道坎。

比较合适的解决方式是在sniffer阶段就进行字段的框定和空值的处理,值得注意的是,如果多个数据源都要进入一个ADBS,那么这些数据源只能保持相同的列。

在Sniffer阶段要设计schema,后续的Worker可以透传这个schema。

  • Sniffer有一个特殊的变量 vars_to_handle(代码上非必须,流程上必须)。后续Worker会透传其中的内容。

TransWorker.py

from funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e import * gs_id = 'rec_id'
sample_listofdict = [{'amt': '3171687.0','close': '4.025','code': '510300','data_dt': '2023-03-09 13:49:00','data_slot': '27972829','data_source': 'AK','high': '4.025','low': '4.024','market': 'SH','open': '4.025','rec_id': 'AK_SH_510300_27972829','vol': '7880','_ch001': 0}]af = APIFunc('apifunc_database_model1', listofdict= sample_listofdict, key_id=gs_id)chain_funcname_dict = {}# 透传
@af.route('/MergeOut', is_force=True)
def MergeOut(input_dict = None, para_dict = None):msg = RuleMSG('MergeOut')# 1 【不检查字段】# need_cols = ['some_str']# input_cols = list(input_dict.keys())# if not (set(need_cols) <= set(input_cols)):#     msg.status = False #     msg.rule_result = None #     msg.msg = 'InputSetError'#     msg.data = None  #     return msg.to_dict()try:msg.msg ='ok'msg.status = Truemsg.data = input_dictmsg.rule_result = 1except:msg.status = Falsemsg.rule_result = Nonemsg.data = Nonereturn msg.to_dict()# >>> 添加到chain_funcname_dict
MergeOut = {}
MergeOut['subline_dict'] =None
MergeOut['main_dict'] = {'key_id':gs_id, 'depend_cols':['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt', 'data_source', 'code', 'market', 'data_slot'],'input_cols':['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt', 'data_source', 'code', 'market', 'data_slot']}
MergeOut['para_dict'] = None
chain_funcname_dict['MergeOut'] = MergeOutTransPassChain_session = ['MergeOut']TransPassChain_list = TransPassChain_session
TransPassChain_dict = {}
for k in TransPassChain_list:TransPassChain_dict[k] = chain_funcname_dict[k]if __name__ =='__main__':af.run_chain(chain_funcname_list=TransPassChain_list,chain_funcname_dict=TransPassChain_dict)af.g['ruleresult_frame'][ [x for x in af.g['ruleresult_frame'].columns if x !=gs_id]].sum()

5 修改 app03_PullToStep1MongoOut.py

这里似乎有一个ADBS设计上需要修改的地方:变量类型转换。

原来默认都是字符串,似乎在进行Redis批量写入的时候都转为字符串了。

现在希望能够将变量进行数值类型的转换,因为在使用时,筛选起来比较容易。

在两个地方可以做这个操作,一个是拉到MongoIn的过程,还有一个是拉到MongoOut的过程

保险起见,我们在这两个过程都对数值型的变量进行声明(为Float)

具体的做法就是通过重写一个方法S2M( Stream To Mongo)

...# Pattern: 从队列获取数据,存储到Mongo,然后删除。适用于新增数据。 Stream到Mongo【队列Write相关-删除消息】def S2M(self, stream_name,keyname =None, tier1 = None, tier2 = None,w = None, batch_num = None,sniffer_name = None,log_tier1 = None, log_tier2 = None,time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False ):must_cols = ['tier1','tier2','sniffer_name','keyname']assert all(must_cols), ','.join(must_cols) + ' 不能为空'cur_w = w or self.w cur_redis_agent = redis_agent or self.redis_agentbatch_num = batch_num or 1msg =''log_tier1 = log_tier1 or tier1 log_tier2 = log_tier2 or 'log_sniffer' + get_time_str2()# 默认三十秒超时time_out = time_out or 30tick1 = time.time()qname = stream_namecur_len_resp = req.post(redis_agent + 'len_of_queue/',json ={'stream_name':qname,'connection_hash':connection_hash}).json()q_len = cur_len_resp['data']print('{} Q has {} Messages' .format (qname,q_len))if q_len :q_data_resp = self.xrange(qname, count = batch_num, redis_agent = redis_agent, connection_hash = connection_hash)q_data = pd.DataFrame(q_data_resp['data'])# q_data = pd.DataFrame(cur_lq.xrange(qname, count=batch_num))msg_id_list = list(q_data['_msg_id'])# 去重q_data1 = q_data.drop_duplicates([keyname], keep='last')# 脱掉自动变量:在这种情况,不需要声明其他参数(通道是默认的_ch001)q_data2 = self.filter_and_add_msg(q_data1)q_listofdict = q_data2.to_dict(orient='records')q_db_resp = cur_w.insert_or_update_with_key(tier1 = tier1, tier2 = tier2, data_listofdict = q_listofdict, key_name=keyname)# 完成后删除消息del_cnt = self.xdel(qname, mid_or_list = msg_id_list)msg = 'ok,deliver(del) {} messages from {}' .format(del_cnt['data'] ,qname)else:msg ='no data source {}' .format(qname)tick2 = time.time()duration = round(tick2 -tick1,2)log_dict = {'sniffer': sniffer_name,'duration':duration,'msg': msg }return cur_w.insert_recs(tier1=log_tier1, tier2=log_tier2, data_listofdict =[log_dict])

要修改的部分其实不多,增加一个toDoubleVarList,将对应的变量做一个转换即可。使用python对象的继承方法,将这个对象

class StreamsIO_X(StreamsIO):# Pattern: 从队列获取数据,存储到Mongo,然后删除。适用于新增数据。 Stream到Mongo【队列Write相关-删除消息】def S2M(self, stream_name,keyname =None, tier1 = None, tier2 = None,w = None, batch_num = None,sniffer_name = None,log_tier1 = None, log_tier2 = None,time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False, toDoubleVarList = [] ):...# - toDoubleVarListif len(toDoubleVarList):for _var in toDoubleVarList:q_data2[_var] = q_data2[_var].apply(float)

app03_PullToStep1MongoOut.py

...
sio = StreamsIO_X('sio', w = cur_w ,redis_agent = redis_agent_host)
toDoubleVarList = ['open', 'close', 'high', 'low', 'vol', 'amt','data_slot']sio.S2M(work_out_stream,keyname= keyname, tier1= tier1,tier2=tier2,w =cur_w, batch_num =batch_num,sniffer_name=sniffer_name,log_tier2 =log_tier2, redis_agent = redis_agent_host,toDoubleVarList=toDoubleVarList)

同理,也可以修改app01_PullToStep1MongoIn.py(当时app01并没有直接使用S2M方法)

...# - toDoubleVarListif len(toDoubleVarList):for _var in toDoubleVarList:q_data2[_var] = q_data2[_var].apply(float)

6 补充

如果一开始不小心将mongo的变量格式搞错了,那么可以这么修改

import pymongo
lmongo=pymongo.MongoClient(host='172.17.0.1',port=111,username='aaa',password='bbb',authSource='admin',authMechanism='SCRAM-SHA-1')lmongo.list_database_names()# 声明数据库和集合
the_collection = lmongo['MyQuantBase']['step1_mongo_in']for varname in sorted(['amt','close','data_slot','high','low','open','vol']):command_dict = {'%s' % varname :{'$exists': True, '$type': 'string'}} command_list = [{'$set':{'%s' % varname  :  { '$toDouble': '$%s' % varname }}}]res = the_collection.update_many(command_dict, command_list)print(res.acknowledged)

7 总结

  • 1 本次打通了实时的分钟级取数
  • 2 对ADBS的规范进行了一些强化:例如入数据的变量要确定,不可缺失。同时对透传的Worker进行了一些适配性更改。
  • 3 考虑到有一些字段是要作为过滤器的,而经过Redis队列,很多变量似乎都变字符了。所以在两个ToMongo的App(01,03)增加了转浮点的部分。(在以后的ADBS升级中可以考虑将这些固化到配置文件中)

Python 算法交易实验55 ADBS:QuantData相关推荐

  1. Python 算法交易实验56 ADBS:QuantData-灌入离线数据

    说明 上一回说到,通过ADBS构建了一个分钟级的实时数据源.这次打算将RQ的静态数据也灌入这个ADBS. 内容 1 下载数据 start_date = '2000-01-01' end_date = ...

  2. Python 算法交易实验49 Step1 DataETL

    说明 万丈高楼平地起 按照前面的规划,开始有序推进我的[15% 资金加速器]计划.这一步是通过某个源,获取分钟级数据,然后送到第一个ADBS. Sniffer : 读取数据并发送到入队列.一开始我会把 ...

  3. python算法交易工程师_清华编程高手尹成带你基于算法实践python量化交易

    清华编程高手尹成带你基于算法实践python量化交易 量化交易是指以先进的数学模型替代人为的主观判断,利用计算机技术从庞大的历史数据中海选能带来超额收益的多种"大概率"事件以制定策 ...

  4. 实战:基于技术分析的Python算法交易

    译者 | Tianyu 出品 | AI科技大本营(ID:rgznai100) 本文是用 Python 做交易策略回测系列文章的第四篇.上个部分介绍了以下几个方面内容: 介绍了 zipline 回测框架 ...

  5. 金融科技、算法交易、量化金融必读书:Python金融大数据分析第2版

    银行本质上是技术公司. --胡戈•班齐格 近来,Python无疑是金融业的重要策略性技术平台之一.到2018年底,这已经不再是个问题:全世界的金融机构现在都尽最大努力利用Python及其强大的数据分析 ...

  6. Python算法:决策树分类

    Python算法:决策树分类 文章目录 Python算法:决策树分类 一.前言 二.决策树算法原理介绍 1.决策树原理 2.决策树构造 3.交叉验证 三.决策树算法函数介绍 1. train_test ...

  7. python实现蒙特卡洛算法_用Python实现基于蒙特卡洛算法小实验

    用Python实现基于蒙特卡洛算法小实验 蒙特卡洛算法思想 蒙特卡洛(Monte Carlo)法是一类随机算法的统称,提出者是大名鼎鼎的数学家冯· 诺伊曼 ,他在20世纪40年代中期用驰名世界的赌城- ...

  8. python实验原理_Python实现蒙特卡洛算法小实验过程详解

    蒙特卡洛算法思想 蒙特卡洛(Monte Carlo)法是一类随机算法的统称,提出者是大名鼎鼎的数学家冯·诺伊曼,他在20世纪40年代中期用驰名世界的赌城-摩纳哥的蒙特卡洛来命名这种方法. 通俗的解释一 ...

  9. python统计套利_清华编程高手尹成带你基于算法实践python量化交易

    清华编程高手尹成带你基于算法实践python量化交易 量化交易是指以先进的数学模型替代人为的主观判断,利用计算机技术从庞大的历史数据中海选能带来超额收益的多种"大概率"事件以制定策 ...

最新文章

  1. News Break!沈向洋投资并出任美国版“今日头条”董事长
  2. android的shadowRadius属性说明
  3. 广州的11个辖区_避开人潮,广州7月展览指南,有11个免费
  4. Chrome 开发者工具无法显示服务器正常返回的 HTTP 请求 - Failed to load response data
  5. oracle导出有分区表的用户,分区表导出导入
  6. ssh不能连接 提示WARNING: POSSIBLE DNS SPOOFING DETECTED!处理方法
  7. Linux : Notepad++ 远程连接linux
  8. 1056. 组合数的和(15)-PAT乙级真题
  9. torch.nn.NLLLoss()
  10. Alex 的 Hadoop 菜鸟教程: 第22课 分布式日志收集组件:flume
  11. Go语言 常用日志记录方法
  12. 键盘测试软件 - Vintage Keyboard Analyzer
  13. Android 微信双开
  14. [1151]python连接 redis cluster集群
  15. 手把手教你如何玩转EasyExcel的导入和导出
  16. excel减法函数_数据工作中常用到的EXCEL技巧之文本分析类
  17. int转long Long型
  18. 在联网状态下,有很多网页或者应用无法联网问题,如360安全卫士, Smartscreen筛选器无法访问, 部分网页无法访问等问题的解决方法
  19. esp8266与51单片机通信(看完不会你打我)用手机控制led灯的亮灭
  20. 新浪sae平台wordpress中Buddypress插件上传头像问题

热门文章

  1. .jks文件(JAVA KeyStore)
  2. android中使用百度地图绘制弹出框的覆盖物
  3. c++ 中min和max 函数
  4. Far_planner 代码系列(11)
  5. 看完几十篇“解决remains in conflict报错”的文章后,我决定重新导入项目!(IDEA·SVN)
  6. VSCode打开文件时出现乱码怎么办?
  7. SpringBoot一条龙
  8. SMART S7-200PLC串行自由口通讯(耐压测试仪)
  9. 品牌故事系列之一:APC成长传奇密码:永不停顿的创新
  10. Django sqlite3的实际应用