说明

上一回说到,通过ADBS构建了一个分钟级的实时数据源。这次打算将RQ的静态数据也灌入这个ADBS。

内容

1 下载数据

start_date = '2000-01-01'
end_date = '2023-10-12'
df = get_price('510300.XSHG',start_date=start_date,end_date=end_date,frequency='1m')
df =df.reset_index()df.to_csv('510300_0312_all.csv',index=False)

下载文件到本地

然后上传到服务器。(By the way, 我在ucloud上的服务器时间快到了,有一个文件上传下载服务还是有用的,到时候挪到现在的天翼云上)

2 对齐数据(列约束)

将离线数据上传到jupyter


现在这个数据要受到列的约束,我觉得这个是ADBS对于数据源的约束:在强调稳定性的同时对灵活性有一定压制,不过我还是觉得是可接受的。

看看现在实时数据存储的维度

3 送往队列

import pandas as pd
import time
def get_time_str1(ts = None,bias_hours=0):ts = ts or time.time()return time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(ts + bias_hours*3600)) def inverse_time_str(time_str  = None, bias_hours = 0):ts = time.mktime (time.strptime(time_str,'%Y-%m-%d %H:%M:%S')) + bias_hours* 3600return ts import time
# 将标准时间戳转为时间轴时序 day/hour/minute
def ts2ord(ts = None, tx_level='minute', bias_hours=8):ts = ts or time.time()tx_level = tx_level.lower()assert tx_level in ['day','hour','minute'], '只有 day, hour, minute三个级别'if tx_level == 'minute':res = (ts + bias_hours * 3600)//60elif tx_level == 'hour':res = (ts + bias_hours * 3600)//3600else:res = (ts + bias_hours * 3600)//86400return int(res)
# 将时序映射回字符
def slot_ord2str(some_slot, tx_level='minute', bias_hours=-8, fs=None):tx_level = tx_level.lower()assert tx_level in ['day','hour','minute'], '只有 day, hour, minute三个级别'if tx_level == 'minute':res = get_time_str1(some_slot*60, bias_hours=bias_hours)elif tx_level == 'hour':res = get_time_str1(some_slot*3600, bias_hours=bias_hours)else:res = get_time_str1(some_slot*86400, bias_hours=bias_hours)return resimport tqdm
def slice_list_by_batch2(some_list, batch_num):batch_list =list(range(0, len(some_list) + batch_num , batch_num))res_list = []for i in range(len(batch_list)-1):tem_list = some_list[batch_list[i]:batch_list[i+1]]res_list.append(tem_list)return res_listdef is_q_available(stream_name, maxlen = 100000, new_task_len = 10000, redis_agent = None,connection_hash =None  ):cur_redis_agent = redis_agentcur_len_resp = req.post(cur_redis_agent + 'len_of_queue/',json ={'stream_name':stream_name,'connection_hash':connection_hash}).json()if cur_len_resp['status']:cur_len = cur_len_resp['data']if cur_len + new_task_len >=maxlen:return False else:return True else:print('Connection Error')return False #  基于并发方法,向数据库存数【队列Write相关-写入消息】- 其实是使用pipeline - 最好单次一万左右
def parrallel_write_msg(stream_name, data_listofdict = None ,maxlen = None, time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False):cur_redis_agent = redis_agentcur_maxlen = maxlen or 100000# 默认十秒超时time_out = time_out or 30print('>>> 并发写Stream')tick11 = time.time()resp_dict = req.post(cur_redis_agent + 'batch_add_msg/',json ={'connection_hash':connection_hash,'stream_name':stream_name,'msg_dict_list':data_listofdict,'maxlen':cur_maxlen,'is_return_msg_id_list':is_return_msg_id_list},timeout=time_out).json()tick13 = time.time()print('写入任务数据 {:.2f}'.format(tick13 -tick11))return resp_dict# ========= 准备数据
df = pd.read_csv('./Step1DataETL/510300_0312_all.csv')
df1 = pd.DataFrame()
df1['data_dt'] = df['datetime']
df1['open'] = df['open']
df1['close'] = df['close']
df1['high'] = df['high']
df1['low'] = df['low']
df1['vol'] = df['volume']
df1['amt'] = df['total_turnover']data_source = 'RQ'
code ='510300'
market ='SH'
df1['data_source'] = data_source
df1['code'] = code
df1['market'] = market
df1['data_slot'] = df1['data_dt'].apply(inverse_time_str).apply(ts2ord)
df1['rec_id'] = df1['data_source'] + '_' + df1['market'] + '_' + df1['code'].apply(str) \+ '_' + df1['data_slot'].apply(str)data_list = df1.to_dict(orient='records')# ========= 停等写入
batch_num = 10000
data_list2 = slice_list_by_batch2(data_list, batch_num)
redis_agent_host = 'http://172.17.0.1:24021/'
redis_agent = redis_agent_host
redis_connection_hash =None# 这个sniffer盯的是上ak接口
gs_id = 'rec_id'
target_q_max_len = 100000
project_name ='QuantData'
step1_stream_in = 'step1_stream_in'
target_q_name = '%s.%s' % (project_name, step1_stream_in)import requests as req
for tem_data_list in data_list2:# 判断队列是否可以写入is_target_q_available = is_q_available(target_q_name,maxlen = target_q_max_len, new_task_len = batch_num, redis_agent = redis_agent,connection_hash =redis_connection_hash)if is_target_q_available:cur_len_resp = req.post(redis_agent + 'len_of_queue/',json ={'stream_name':target_q_name,'connection_hash':redis_connection_hash}).json()q_len = cur_len_resp['data']print('{} Q has {} Messages' .format(target_q_name,q_len))tick101 = time.time()print('>>> Writing To Stream ')write_resp = parrallel_write_msg(target_q_name, data_listofdict = tem_data_list ,maxlen = target_q_max_len, time_out = 30,redis_agent = redis_agent,connection_hash =redis_connection_hash, is_return_msg_id_list=True)
#         print(write_resp)print('Spends {:.2f}' .format(time.time()-tick101))else:print('Q is full ,wait ... ')time.sleep(15)


这样就算好了。下一步是把基础ADBS的基础ETL给调好。

Python 算法交易实验56 ADBS:QuantData-灌入离线数据相关推荐

  1. Python 算法交易实验55 ADBS:QuantData

    说明 更快的切入实战 我希望在开发的过程中也可以看到一些信号或者模拟结果在当前市场下的表现,这样有助于更好的评估方法的健壮性,特性等. 例如,我希望持续的看到 这样的图(to 2023-3-6) 从上 ...

  2. Python 算法交易实验49 Step1 DataETL

    说明 万丈高楼平地起 按照前面的规划,开始有序推进我的[15% 资金加速器]计划.这一步是通过某个源,获取分钟级数据,然后送到第一个ADBS. Sniffer : 读取数据并发送到入队列.一开始我会把 ...

  3. python算法交易工程师_清华编程高手尹成带你基于算法实践python量化交易

    清华编程高手尹成带你基于算法实践python量化交易 量化交易是指以先进的数学模型替代人为的主观判断,利用计算机技术从庞大的历史数据中海选能带来超额收益的多种"大概率"事件以制定策 ...

  4. 实战:基于技术分析的Python算法交易

    译者 | Tianyu 出品 | AI科技大本营(ID:rgznai100) 本文是用 Python 做交易策略回测系列文章的第四篇.上个部分介绍了以下几个方面内容: 介绍了 zipline 回测框架 ...

  5. python量化交易入门学习 之用优矿的数据接口文档

    数据研究里面都是接口调用方法,  需要自己填充对应的参数 之前以为可以直接点击获得某个股票的直接调用的代码呢--可以优化一下  查询股票, 选择股票 直接生成代码. 研究数据包括了左侧的一些大类信息 ...

  6. Python量化交易09——使用证券宝获取金融行情数据(baostock)

    接上一篇文章介绍怎么用Tushare获取日k数据后,本章教大家怎么用证券宝获取数据. baostock 的官网链接:A股K线数据 - www.baostock.com 这是免费的一个库,数据种类也很很 ...

  7. 金融科技、算法交易、量化金融必读书:Python金融大数据分析第2版

    银行本质上是技术公司. --胡戈•班齐格 近来,Python无疑是金融业的重要策略性技术平台之一.到2018年底,这已经不再是个问题:全世界的金融机构现在都尽最大努力利用Python及其强大的数据分析 ...

  8. python 量化交易知识

    1.历史:原先股票交易市场是由交易员来进行买卖股票,现在越来越由机器自动化代替. 2.银行中量化交易员中,就是在复杂的金融衍生品中,分析投资标的的价值,来以某种策略管理仓位,进行买进卖出. 3.名词: ...

  9. Python算法:决策树分类

    Python算法:决策树分类 文章目录 Python算法:决策树分类 一.前言 二.决策树算法原理介绍 1.决策树原理 2.决策树构造 3.交叉验证 三.决策树算法函数介绍 1. train_test ...

最新文章

  1. pandas索引复合索引dataframe数据、索引其中一个水平(level)的特定数据列(index a column of a level)
  2. linux学习之sed grep
  3. python打包成exe闪退_脚本程序打包后,黑框一闪而过,程序不能运行
  4. ESP8266固件的下载
  5. 设计一个函数能够取出字符串中指定的字符
  6. 【Linux】一步一步学Linux——fc命令(224)
  7. 更换checkbox的原有样式
  8. HEU 2036 Paths on a Grid
  9. ASP.NET Core 5.0新增功能摘要
  10. 再暴BBSxp 7.0 Beta 2漏洞
  11. 算法课讨论 深究哈密顿图
  12. 计算机主机检测不到耳机,win10电脑检测不到耳机的原因及处理方法
  13. 运算放大器---转换速率(slew rate)
  14. 一个屌丝程序猿的人生(六十三)
  15. ao能连接oracle吗,[转载]使用AO连接ORACLE数据库
  16. js禁止鼠标右键的菜单事件
  17. Evm链原生代币转账
  18. 免费英文写作批改网站
  19. 好记性不如烂笔头(一)——局域网可以Ping通,但Socket无法连接
  20. 昇腾Atlas200DK学习笔记(一)——环境部署

热门文章

  1. 金蝶注册表服务器地址,金蝶KIS客户端修改IP连接服务器的方法
  2. html两个th合并单元格,10、HTML表格(table 、th、tr、td、合并单元格)的简单认识...
  3. <LeetCode天梯>Day043 Fizz Buzz(按部就班) | 初级算法 | Python
  4. 通用单口RJ45种类
  5. java .点 是什么_java
  6. 旅游景点微信小程序开发方案
  7. 慕学生鲜xadmin登录不成功解决办法
  8. Unity同时接入ShareSdk和微派支付sdk(二)
  9. Python学习笔记-字符串
  10. 能效管理很重要——施耐德电气全球高级副总裁 、APC大中华区总裁黄陈宏博士...