说明

万丈高楼平地起

按照前面的规划,开始有序推进我的【15% 资金加速器】计划。这一步是通过某个源,获取分钟级数据,然后送到第一个ADBS。

Sniffer : 读取数据并发送到入队列。一开始我会把文件以离线形式上传到某个folder,所以sniffer读文件源,后续我会找到接口源。

嗯,先吐槽一下云服务商,去年的云主机价格贵了好多。腾讯1M的竟然500多一年,还算是几个大厂里面涨的少的。我觉得5M的500一年我还能接受,所以感觉是5倍的不值。

而且挺怪的是,IT设备一直在降价,内存条我一年前买4000块,现在是2千4;固态硬盘也差不多都在降。所以带宽反而涨咯?反正我觉得目前国内云服务市场的不那么市场,提升单价(而不是提升使用)是最容易摧毁这个市场的。曾经我还有考虑过建立云服务器集群,现在已经没这个想法了,有个2~3台就行,配置低的够用就行,还是走自建局域网比较靠谱。说起来我最早是搞光网络的,实在不明白这带宽是怎么提不上去的… 比造路造房子要简单很多啊,比5nm也要简单吧。

感恩的是国内还是有不少新的云服务商让薅羊毛,今天买了天翼云(看起来还是比较大方的)的一台5M主机,400多一年。大约在2个小时左右,我就完成了云主机的切换动作,以后大可以走开箱即用,过期即扔的策略。同一时刻只要有个2~3台就行了。

使用Jupyter的话,建议还是要5M的带宽,否则感觉等的时间太长了(估计是很多js之类的要下载)

内容

steady but firmly, I will made it

1 计划

嗯,话说又刷了一下网页,咻的一下就出来,爽的很,哈哈哈。

先建一个文件夹Step1DataETL, 之后启动了之后,新的数据就放在这里,而Sniffer也会去这里取数更新。初期在实操的时候采用手动的方式去下载和更新数据。

所以这次的任务包括两部分:

  • 1 Action1:在jupyter中开发好ETL过程,算出结果
  • 2 Action 2:在ADBS中将ETL过程复现,然后启动(多个)Worker去计算并校验

如同算法一样,在后续的步骤中都是如此。可能唯一的缺点是,这种运行方式需要在每一个时隙计算,所以会产生几十万次的数据库查询、计算,然后入库;不过庆幸的是,即使是这种情况,由于计算主要发生在局域网内,甚至是单个主机内,所以开销并不大。而且,这种全量计算相当于建立installed base只是进行一次,最多花费1天(?),之后都是秒级完成。

2 从某量化平台获取实验数据

我选择 沪深300ETF(510300)作为研究标的,具体原因就不解释了,反正我觉得可以。

Data looked like this :

所以sniffer要做的事就是将数据简单清洗后送到入队列,主要包括了主键计算,计算时隙,变量名称映射等。

Step1中的ETL要做的主要任务是根据周期,计算出若干通用的基础特征, that’s all。

3 Sniffer Part

读取数据然后将最新的部分刷新到队列

  • 1 有一个文件夹,里面会放入若干文件
  • 2 有一个缓存,记得已经读取过哪些文件(Redis - Mongo)
  • 3 读取最新时隙
  • 4 读取最新的文件,然后将主时隙之后的新数据存入队列

3.1 数据库集群-缓存

这里需要有一个元数据缓存,可以使用mymeta集群来存储。主要的考虑是这个元数据库集群可以在公网上访问,并且传输的数据很小,这样对带宽也没什么压力。

关于这个公网集群的设计上,有两点需要注意/改进点。

在服务端,日志还是会不断膨胀,可以考虑在容器的启动时,不特别存储日志,而是通过docker自动管理日志的删减。

在使用端,要考虑到允许硬刷新的方式来重新建立连接,这样数据库集群的地址即使变了也不必担心。这个是需要埋在各个项目的设计里的。

本次的实验暂时不考虑数据库方面的改动,但是在使用时会使用Redis + Mongo的缓存读取方式。

  • 1 在Redis中创建唯一的缓存名称
  • 2 在Mongo集群中设立缓存表(Buffer.SnifferRedisMongo)
  • 3 数据刷新时同时向Redis(+TTL)和Mongo更新
  • 4 读取时优先从Redis读取, 如果没有数据则向Mongo读取,并将数据同步写入Redis + TTL

在Redis中创建唯一的缓存名称

缓存变量的命名按照 3X3的方式


必填的参数

  • redis_class: BUFF 缓存类
  • project : microservice
  • subproject: Quant
  • app: data
  • subapp: Step1ETL
  • var: buffer_dict

通过RedisAgent来创建变量,会同时检查冲突(当前是否已经有了)
BUFF.microservice.Quant.pf.data.Step1ETL.af.gp.0.uf.buffer_dict

redis_agent_host = 'http://172.17.0.1:24021/'
redis_buff_var = {'redis_class':'BUFF','project':'microservice','subproject':'Quant',
'app':'data','subapp':'Step1ETL','var':'buffer_dict'}
req.post(redis_agent_host + 'redis_naming_test/',json =  redis_buff_var).json(){'data': None,'msg': 'NOT Existed,  <<<<<<  BUFF.microservice.Quant.pf.data.Step1ETL.af.gp.0.uf.buffer_dict','status': True}

在Mongo集群中设立缓存表(Buffer.SnifferRedisMongo)

本质上这个是为了更好的支持Redis的持久化创建的:我希望每个在内存中的变量都有ttl,同时也希望当再次需要时可以容易的再进入内存。

这样的连接还是比较有意思,直接就可以创建连接了

cur_machine = get_machine_name()
cur_machine='m7'
print('Current Machine', cur_machine)w = WMongo('w')
target_server = 'mymeta'
cur_w = w.TryConnectionOnceAndForever(server_name =target_server)Wmongo_v9000.012
设置当前连接 local
>>> Switching To Mymeta
设置当前连接 local
在CN001访问mymeta,通用
当前机器的名称: 48831f790d1d
1.当前使用的MongeAgent:http://172.17.0.1:24011/
2.Tier1:meta, Tier2:servers
3.ConnectionHash:e8d1bc791049988d89465d5ce24d993b
4.FilterDict:{'my_server_pkey': 'mymeta'}
5.Limits:1
6.Sort:
7.Skip:0
>>> Hit Records
当前机器的局网: None
【I】目标服务的机器:Clulster, 目标服务的机器局网:NO_LAN
【I】采用wan方式连接目标主机
Wmongo_v9000.012
设置当前连接 local
此时建立连接
target connection hash: 38aeabac8113f89488d5f530dca85827

接下来只要设置好两个变量就可以

序号 变量名 解释
1 redis_var_key 存储在redis中的键,索引
2 redis_var_val 存储在redid中的值

这个函数多创建了几个索引,但都属于默认的索引

tier1 ='Buffer'
tier2 ='SnifferRedisMongo'
index_var = 'redis_var_key'cur_w.ensure_mongo_index(tier1 =tier1, tier2 =tier2, key_index=index_var)
{'data': {'redis_var_key': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
{'data': {'_is_enable_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
{'data': {'_create_time_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
{'data': {'_update_time_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
{'data': {'_ch001_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
{'data': {'_ch001_cnt_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}

读取最新时隙

这个在刚开始其实并不能读到(表内为空),可以先试一下连接,然后就要往里面灌数据了,然后才能继续测试这个功能。

# 读取redis变量
redis_var_name = 'BUFF.microservice.Quant.pf.data.Step1ETL.af.gp.0.uf.buffer_dict'
ttl = 10000
buffer_mongo_name = 'mymeta'
buffer_mongo_w = from_pickle(buffer_mongo_name)
buffer_mongo_tier1 = 'Buffer'
buffer_mongo_tier2 = 'SnifferRedisMongo'
# 1 直接读取
buffer_dict = req.post(redis_agent_host + 'getv/',json ={'k':redis_var_name}).json()['data']
print('[1]直接从redis读取  buffer_dict', buffer_dict)
# 2 如果为空则去mongo读取
if buffer_dict is None :print('[2]向mongo发起查询 buffer_dict')buffer_data_list = buffer_mongo_w.query_recs(tier1=buffer_mongo_tier1, tier2 = buffer_mongo_tier2,filter_dict ={'redis_var_key':redis_var_name},silent=True)['data']if len(buffer_data_list):print('[3]查询到结果,并重新写入redis')buffer_dict = buffer_data_list[0]['redis_var_val']# 设置新的buffer变量resp_dict = req.post(redis_agent_host + 'setv/',json ={'k':redis_var_name, 'v':buffer_dict,'ex_seconds':ttl}).json()if resp_dict['status']:print('[4]写入redis成功')else:buffer_dict = {}buffer_dict['already_read_files'] = []# 当前的文件列表
cur_data_files = [x for x in os.listdir(data_path) if not x.startswith('.') or x.endswith('.csv')]
# 缓存中的文件列表
buffer_data_file = buffer_dict['already_read_files']
# 本次要读取的差集
gap_files = sorted(list(set(cur_data_files) - set(buffer_data_file)))
print(gap_files)[1]直接从redis读取  buffer_dict None
[2]向mongo发起查询 buffer_dict
['510300_20230215.csv']

到这里又是一个小段落,关于程序所需要使用的缓存(Redis和Mongo)方法。

接下直接对于数据进行处理,按照要求将原始数据转为一下字段

先从量化平台下载数据

然后进行一些变换:

  • 1 变量的筛选/映射

采用简单的映射,这看起来有点像AETL,但我不想让这步操作过于规范化。最主要是因为这步是负责接入的,逻辑很简单,但是变化又很多。而AETL最好基于一些固定的Base进行操作。

  • 2 其他变量的增加/计算
# 将沪深300增加对应的字段
def sniffer_work(some_dict):res_dict = {}res_dict['market'] = 'SH'res_dict['data_slot'] = ts2ord(inverse_time_str(some_dict['original_data_dt']))res_dict['data_dt'] = slot_ord2str(res_dict['data_slot'] )res_dict['code'] = some_dict['code'].replace('.XSHG','')# 计算主键res_dict['rec_id'] = '_'.join( [ str(res_dict['market']),str(res_dict['code']),str(res_dict['data_slot']) ])# 其他价格res_dict['open'] = some_dict['open']res_dict['close'] = some_dict['close']res_dict['high'] = some_dict['high']res_dict['low'] = some_dict['low']res_dict['vol'] = some_dict['vol']res_dict['amt'] = some_dict['total_turnover']res_dict['trades'] = some_dict['trades']return res_dict 

主要是校验一下数据时间是否一致(因为做了统一的时隙转换)

  • 3 数据投送

sniffer在处理好数据之后,就要向入队列发送。在此之前,我先试一下读取「当前已处理」的最大时隙,看看如果没有缓存的时候如何处理。

在处理完成后,就要给redis和mongo进行缓存变量的更新,包括当前处理的文件名和「当前已处理」最大时隙。

碰到了一个幺蛾子,感受上特别毁三观,记录一下。
1. 换了frp服务器,当时比较忙,没有在portal上修改ip,但是以为改了
2. 写入数据后发现portal的页面不刷新数据条数
3. 看到monto 3T上显示数据服务,但是没有表了第一反应是程序出bug了,还是首先怀疑自己
第二反应是磁盘掉了,怀疑pcie转接
第三反应是xfs格式化的问题
很惶恐,我的微服务、底层数据库啊
...
最后发现,是没有改地址。忙过头了,自己吓自己。

因为队列长度十万的限制,所以分两次写入数据

# 获取已处理的时隙
already_done_data_slot = buffer_dict.get('already_done_data_slot') or 0
sel = handled_df['data_slot'] >already_done_data_slot
submit_df = handled_df[sel]project_name ='MyQuantBase'
gs_id = 'rec_id'step1_stream_in = 'step1_stream_in'
step1_stream_workin = 'step1_stream_workin'
step1_stream_workout = 'step1_stream_workout'step1_mongo_in = 'step1_mongo_in'
step1_mongo_out = 'step1_mongo_out'
step1_mongo_meta = 'step1_mongo_meta'full_name_step1_stream_in = '%s.%s' % (project_name,step1_stream_in)
print(full_name_step1_stream_in)# 第一部分(不超过10万条)
part1_df = submit_df.iloc[:90000]
part2_df = submit_df.iloc[90000:]part1_listofdict = part1_df.to_dict(orient='records')
part2_listofdict = part2_df.to_dict(orient='records')resp = req.post(redis_agent_host + 'batch_add_msg/',json ={'stream_name':full_name_step1_stream_in,'msg_dict_list':part1_listofdict,'maxlen':100000}).json()resp = req.post(redis_agent_host + 'batch_add_msg/',json ={'stream_name':full_name_step1_stream_in,'msg_dict_list':part2_listofdict,'maxlen':100000}).json()
print(resp)

最后更新缓存

buffer_dict['already_read_files'].append(gap_files[0])
buffer_dict['already_done_data_slot'] = int(submit_df['data_slot'].max())resp_dict = req.post(redis_agent_host + 'setv/',json ={'k':redis_var_name, 'v':buffer_dict,'ex_seconds':ttl}).json()redis_buffer_dict = {'redis_var_key':redis_var_name,'redis_var_val':buffer_dict}w = WMongo('w')
target_server = 'mymeta'
try:cur_w = from_pickle(target_server)color_print('【Loading cur_w】from pickle')
except:w = WMongo('w')cur_w = w.TryConnectionOnceAndForever(server_name =target_server)to_pickle(cur_w, target_server)tier1 ='Buffer'
tier2 ='SnifferRedisMongo'
cur_w.insert_or_update_with_key(tier1 = tier1, tier2 = tier2, data_listofdict = [redis_buffer_dict], key_name='redis_var_key')

在Mongo里有了就不会掉了

4 AETL Part

这部分用来生成基础变量,这些变量是生成交易信号的基础。

按AETL的规范,从实例开始,构建指标计算

注意,原来的Dummy处理是不对的,要按照顺序来(默认的Dummy处理的是非时序数据)。也要确保统计的周期足够。

这部分的调试要放在终端中进行,这样可以直接取数。

任务通道:默认的情况下ABDS会创建一个通道「_ch001」,Dummy也在使用这个通道进行任务的流转。

Redis队列里里的每个元素是什么?

**这个概念需要明确一下,Redis队列里,一条数据有时是数据,有时候也可以是任务。**例如,当用户发起一系列实体解析请求时,每条数据可能是一段文字,那么Worker可以一次获取n条处理。在当前场景下,因为每个任务的取数实际上是rolling的,如果把数据都放进队列是不合适的,所以每条数据实际上是一个任务。

Worker每次提取一个任务,根据任务中的data_slot,追溯过去n个周期的数据,然后进行计算、更新。以本次为例,最大的回顾周期是24000,所以Worker在获得当前任务的数据时隙后,向Mongo发起取数,然后在获得的数据内进行各种计算。

从本质上说,这种方式是将pandas rolling的动作延展到了队列上,最大的好处是提供了持久化的工作机制,同时也提供了超大型计算的基础(通过数据库存放超大型数据,每次任务所提取的数据是相对小的)。

接下来,使用终端进行Worker的开发:

核心的转换部分

        max_T = max(T_list)print('最大回顾周期',max_T)recs = cur_w.query_recs(tier1 = tier1, tier2 = tier2,filter_dict = {'$and':[{'market':'SH','code':code}]},limits= max_T, sort_tuple_list = [('data_slot',-1)])['data']# print(len(recs))# 如果有充足的数据才计算res_dict = {}if len(recs) == max_T:res_df = pd.DataFrame(recs).sort_values(['data_slot'])for T in T_list:res_dict['open_T_%s' % T] = float(res_df.iloc[-T]['open'])res_dict['high_T_%s' % T] = res_df.iloc[-T:]['high'].apply(float).max()res_dict['low_T_%s' % T] = res_df.iloc[-T:]['low'].apply(float).min()res_dict['vol_T_%s' % T] = res_df.iloc[-T:]['vol'].apply(float).sum()res_dict['amt_T_%s' % T] = res_df.iloc[-T:]['amt'].apply(float).sum()res_dict['trades_T_%s' % T] = res_df.iloc[-T:]['trades'].apply(float).sum()res_dict['vol_T_mean_%s' % T] = res_df.iloc[-T:]['vol'].apply(float).mean()res_dict['amt_T_mean_%s' % T] = res_df.iloc[-T:]['amt'].apply(float).mean()res_dict['trades_T_mean_%s' % T] = res_df.iloc[-T:]['trades'].apply(float).mean()res_dict['close_T_mean_%s' %T] = res_df.iloc[-T:]['close'].apply(float).mean()

结果如下:


这样就可以了,接下来就是最后一部分工作,整合并运行。

仔细配置了Worker,设置了一些索引之后,开始正式运行。跑一天我观察一下结果再看看是否要调整。


看起来是正常的

Python 算法交易实验49 Step1 DataETL相关推荐

  1. Python 算法交易实验55 ADBS:QuantData

    说明 更快的切入实战 我希望在开发的过程中也可以看到一些信号或者模拟结果在当前市场下的表现,这样有助于更好的评估方法的健壮性,特性等. 例如,我希望持续的看到 这样的图(to 2023-3-6) 从上 ...

  2. Python 算法交易实验56 ADBS:QuantData-灌入离线数据

    说明 上一回说到,通过ADBS构建了一个分钟级的实时数据源.这次打算将RQ的静态数据也灌入这个ADBS. 内容 1 下载数据 start_date = '2000-01-01' end_date = ...

  3. python算法交易工程师_清华编程高手尹成带你基于算法实践python量化交易

    清华编程高手尹成带你基于算法实践python量化交易 量化交易是指以先进的数学模型替代人为的主观判断,利用计算机技术从庞大的历史数据中海选能带来超额收益的多种"大概率"事件以制定策 ...

  4. 实战:基于技术分析的Python算法交易

    译者 | Tianyu 出品 | AI科技大本营(ID:rgznai100) 本文是用 Python 做交易策略回测系列文章的第四篇.上个部分介绍了以下几个方面内容: 介绍了 zipline 回测框架 ...

  5. 金融科技、算法交易、量化金融必读书:Python金融大数据分析第2版

    银行本质上是技术公司. --胡戈•班齐格 近来,Python无疑是金融业的重要策略性技术平台之一.到2018年底,这已经不再是个问题:全世界的金融机构现在都尽最大努力利用Python及其强大的数据分析 ...

  6. Python算法:决策树分类

    Python算法:决策树分类 文章目录 Python算法:决策树分类 一.前言 二.决策树算法原理介绍 1.决策树原理 2.决策树构造 3.交叉验证 三.决策树算法函数介绍 1. train_test ...

  7. python实现蒙特卡洛算法_用Python实现基于蒙特卡洛算法小实验

    用Python实现基于蒙特卡洛算法小实验 蒙特卡洛算法思想 蒙特卡洛(Monte Carlo)法是一类随机算法的统称,提出者是大名鼎鼎的数学家冯· 诺伊曼 ,他在20世纪40年代中期用驰名世界的赌城- ...

  8. python实验原理_Python实现蒙特卡洛算法小实验过程详解

    蒙特卡洛算法思想 蒙特卡洛(Monte Carlo)法是一类随机算法的统称,提出者是大名鼎鼎的数学家冯·诺伊曼,他在20世纪40年代中期用驰名世界的赌城-摩纳哥的蒙特卡洛来命名这种方法. 通俗的解释一 ...

  9. python统计套利_清华编程高手尹成带你基于算法实践python量化交易

    清华编程高手尹成带你基于算法实践python量化交易 量化交易是指以先进的数学模型替代人为的主观判断,利用计算机技术从庞大的历史数据中海选能带来超额收益的多种"大概率"事件以制定策 ...

最新文章

  1. Mac下pycharm如何安装tensorflow
  2. 终极大招——怎么在学术会议上有所收获?
  3. 产品打包工具的制作,ant,编译源码,打jar包,打tag,打war包,备份release版本等...
  4. mysql安装教程8.0.21安装,mysql 8.0.21 安装配置方法图文教程
  5. ARKit从入门到精通(9)-ARKit让飞机跟着镜头飞起来
  6. Android 4.0新增Space及GridLayout初谈
  7. surfaceView中的线程问题
  8. Sql Server 行转列学习 根据学生表、课程表、学生成绩表统计每个学生的各科成绩和他的总成绩、平均成绩...
  9. Linux安装配置CI框架
  10. 遥感原理与应用_专家报告 | 叶绿素荧光卫星遥感—原理与应用
  11. Excel VBA内部函数大全
  12. 应用计算机测定线性电阻伏安特性实验器材,实验一电路元件伏安特性的测试
  13. 响应式 - 创建自适应的响应式字体
  14. [附源码]java毕业设计校园拓展活动管理系统
  15. NOT EXISTS真的不走索引么?如何优化NOT EXISTS!
  16. [Python私活案例]24行代码,轻松赚取400元,运用Selenium爬取39万条数据
  17. 2019年7月28日 恶心人
  18. android 联系人操作: ContentProvider往通讯录添加联系人和获取联系人
  19. 2021年JAVA面试~初识集合Map(二)
  20. 最强找茬微信小程序源码修复版,已更新微信授权

热门文章

  1. ubuntu系统下使用jenkins自动构建Android项目
  2. android 五彩纸屑动画,HTML5 Canvas五彩纸屑粒子动画特效
  3. GDAL VSI文件扩展(virtual_file_systems扩展)
  4. 类微信米聊App语聊功能研究
  5. 低效能人士的7个习惯
  6. 求助:web项目中关于大华摄像机通过RTSP在摄像头、NVR/EVS取流的问题
  7. PS基础——自由变换
  8. Photoshop“自由变形”工具(转)
  9. PS 的打开文件和自由变换
  10. 全球技术协会ISACA在成立五十周年之际发起专注于未来的倡议