一般来说,一个股票信息应该保存在一张表中。但是由于我机器资源限制,且我希望尽快频率的抓取数据。所以每天我将所有股票的实时交易信息放在daily_temp库中的一个以日期命名的表中。主力动向信息也是如此。但是盘后分析股票时,我们会以单只股票进行分析。这样就需要跨越很多天,而这样的设计将导致需要查询若干个表,且随着日期增加,读取的表也将增加。我觉得这样是不合适的。(转载请指明出于breaksoftware的csdn博客)

目前我们系统繁忙的时间和交易时间同步。为了最大幅度的利用资源,我决定在盘后对每日的数据按照股票代码进行拆分备份。这样我们就可以查询一张表得到该股票所有历史数据。

拆分备份实时交易信息

首先我们要从股票基本信息表中读取所有股票代码

    def _get_all_share_ids(self):date_info = time.strftime('%Y_%m_%d')trade_table_name = "trade_info_%s" % (date_info)share_ids = fetch_data.get_data(fetch_data.select_db(self._daily_temp_conn_name, trade_table_name, ["share_id"],{}, pre = "distinct"))return share_ids

fetch_data是我用于封装获取数据的接口。其中包含通过网络获取数据,通过数据库获取数据和通过正则拆分数据

class select_db:def __init__(self, conn_name, table_name, select_columns, conditions, pre="", extend=""):self._conn_name = conn_nameself._table_name = table_nameself._select_columns = select_columnsself._conditions = conditionsself._pre = preself._extend = extenddef get_data(self):db_manager = mysql_manager()conn = db_manager.get_mysql_conn(self._conn_name)result = conn.select(self._table_name, self._select_columns, self._conditions, self._pre, self._extend)return resultclass query_http:def __init__(self, url):self._url = urldef get_data(self):res = ""tried = Falsewhile True:try:socket.setdefaulttimeout(15)req = urllib2.Request(self._url)res_data = urllib2.urlopen(req)res = res_data.read()breakexcept Exception as e:LOG_ERROR("request error: %s %s"  % (self._url ,e))if tried:breakelse:tried = Truereturn resclass regular_split:def __init__(self, regular_name, data):self._regular_name = regular_nameself._data = datadef get_data(self):regular_split_mgr = regular_split_manager()ret_array = regular_split_mgr.get_split_data(self._data, self._regular_name)return ret_arraydef get_data(query_item):if False == hasattr(query_item, "get_data"):return Noneresult = query_item.get_data()return result

下一步通过股票代码查询当天所有数据

    def _bak_trade_info(self, share_id):date_info = time.strftime('%Y_%m_%d')table_name = "trade_info_%s" % (date_info)db_manager = mysql_manager()conn = db_manager.get_mysql_conn(self._daily_temp_conn_name)fields_array = ["today_open","yesterday_close","cur","today_high","today_low","compete_buy_price","compete_sale_price","trade_num","trade_price","buy_1_num","buy_1_price","buy_2_num","buy_2_price","buy_3_num","buy_3_price","buy_4_num","buy_4_price","buy_5_num","buy_5_price","sale_1_num","sale_1_price","sale_2_num","sale_2_price","sale_3_num","sale_3_price","sale_4_num","sale_4_price","sale_5_num","sale_5_price","time_date_str","time_str"]daily_data = conn.select(table_name, fields_array, {"share_id":[share_id, "="]})self._bak_single_market_maker_info(share_id, daily_data)

由于抓取时间和数据源时间存在差异,所以我们可能会抓取到交易时间之外的数据。于是我们要对这些数据进行归一化处理。比如我们有11.29、11:31和11:32三个数据,则对交易时间之外的数据11:31和11:32数据归一为11:30的数据并保存。

    def _bak_single_market_maker_info(self, share_id, daily_data):daily_data_list = []has_between_11_30_and_13_00 = Falseafter_15_00 = Falsekeys_list = []for item in daily_data:item_list = list(item)date_str = item[-2] + " " + item[-1]today_11_30 = date_str[:date_str.find(" ")] + " 11:30:00" today_13_00 = date_str[:date_str.find(" ")] + " 13:00:00"today_15_00 = date_str[:date_str.find(" ")] + " 15:00:00"today_11_30_int = time.mktime(time.strptime(today_11_30,'%Y-%m-%d %H:%M:%S'))today_13_00_int = time.mktime(time.strptime(today_13_00,'%Y-%m-%d %H:%M:%S'))today_15_00_int = time.mktime(time.strptime(today_15_00,'%Y-%m-%d %H:%M:%S'))date_int = time.mktime(time.strptime(date_str,'%Y-%m-%d %H:%M:%S'))if date_int >= today_11_30_int and date_int < today_13_00_int:if has_between_11_30_and_13_00:continueelse:has_between_11_30_and_13_00 = Trueif date_int >= today_15_00_int:if after_15_00:continueelse:after_15_00 = Trueif date_int in keys_list:continueelse:keys_list.append(date_int)item_list.insert(0, date_int)del item_list[-1]del item_list[-1]daily_data_list.append(item_list)keys_array = ["time","today_open","yesterday_close","cur","today_high","today_low","compete_buy_price","compete_sale_price","trade_num","trade_price","buy_1_num","buy_1_price","buy_2_num","buy_2_price","buy_3_num","buy_3_price","buy_4_num","buy_4_price","buy_5_num","buy_5_price","sale_1_num","sale_1_price","sale_2_num","sale_2_price","sale_3_num","sale_3_price","sale_4_num","sale_4_price","sale_5_num","sale_5_price"]share_trade_info_table_name = "trade_info_detail_" +share_idself._create_table_if_not_exist(share_id, share_trade_info_table_name)stock_conn_manager_obj = stock_conn_manager()conn = stock_conn_manager_obj.get_conn(share_id)conn.insert_data(share_trade_info_table_name, keys_array, daily_data_list)

此处我们并没有使用直接检查并创建表的方式,而是使用了_create_table_if_not_exist方法

    def _create_table_if_not_exist(self, share_id, table_name):stock_conn_manager_obj = stock_conn_manager()conn_name = stock_conn_manager_obj.get_conn_name(share_id)prepare_table_obj = prepare_table(conn_name, "trade_info")prepare_table_obj.prepare(table_name)

为什么要这么用?因为我们要将三千多支股票信息保存分片到300个不同的数据库中。那么当前这支股票在哪个库中,则需要一个中间层去代理管理。

@singleton
class stock_conn_manager():def __init__(self):passdef get_conn(self, share_id):conn_name = self.get_conn_name(share_id)db_manager = mysql_manager()conn = db_manager.get_mysql_conn(conn_name)return conndef get_conn_name(self, share_id):share_id_int = int(share_id)share_id_part_no = share_id_int % 300conn_name = "stock_part_%d" % (share_id_part_no)return conn_name

stock_conn_manager类将股票代码和300取余数,得出分片ID。然后连接该ID对应的库。这层设计非常重要,因为不仅此处我们备份数据要用到,之后对全部股票进行分析时也要用到它。

拆分备份主力动向信息

主要逻辑同实时交易信息。故只贴出代码

class bak_today_market_maker(job_base):def __init__(self):self._db_manager = mysql_manager()self._daily_temp_conn_name = "daily_temp"def run(self):share_ids = self._get_all_share_ids()for share_id  in share_ids:self._bak_market_maker_info(share_id[0])LOG_INFO("run bak_today_market_maker")def _bak_market_maker_info(self, share_id):date_info = time.strftime('%Y_%m_%d')table_name = "market_maker_%s" % (date_info)fields_array =["time_str", "price", "up_percent", "market_maker_net_inflow", "market_maker_net_inflow_per","huge_inflow", "huge_inflow_per", "large_inflow", "large_inflow_per", "medium_inflow", "medium_inflow_per", "small_inflow", "small_inflow_per"]daily_data = fetch_data.get_data(fetch_data.select_db(self._daily_temp_conn_name, table_name, fields_array, {"share_id":[share_id, "="]}))self._bak_single_market_maker_info(share_id, daily_data)def _bak_single_market_maker_info(self, share_id, daily_data):daily_data_list = []has_between_11_30_and_13_00 = Falseafter_15_00 = Falsekeys_list = []for item in daily_data:item_list = list(item)date_str = item[0]today_11_30 = date_str[:date_str.find(" ")] + " 11:30:00" today_13_00 = date_str[:date_str.find(" ")] + " 13:00:00"today_15_00 = date_str[:date_str.find(" ")] + " 15:00:00"today_11_30_int = time.mktime(time.strptime(today_11_30,'%Y-%m-%d %H:%M:%S'))today_13_00_int = time.mktime(time.strptime(today_13_00,'%Y-%m-%d %H:%M:%S'))today_15_00_int = time.mktime(time.strptime(today_15_00,'%Y-%m-%d %H:%M:%S'))date_int = time.mktime(time.strptime(date_str,'%Y-%m-%d %H:%M:%S'))if date_int >= today_11_30_int and date_int < today_13_00_int:if has_between_11_30_and_13_00:continueelse:has_between_11_30_and_13_00 = Trueif date_int >= today_15_00_int:if after_15_00:continueelse:after_15_00 = Trueif date_int in keys_list:continueelse:keys_list.append(date_int)item_list[0] = date_intdaily_data_list.append(item_list)keys_array =["time", "price", "up_percent", "market_maker_net_inflow", "market_maker_net_inflow_per","huge_inflow", "huge_inflow_per", "large_inflow", "large_inflow_per", "medium_inflow", "medium_inflow_per", "small_inflow", "small_inflow_per"]share_market_maker_table_name = "market_maker_detail_" + share_idself._create_table_if_not_exist(share_id, share_market_maker_table_name)stock_conn_manager_obj = stock_conn_manager()conn = stock_conn_manager_obj.get_conn(share_id)conn.insert_data(share_market_maker_table_name, keys_array, daily_data_list)def _get_all_share_ids(self):date_info = time.strftime('%Y_%m_%d')trade_table_name = "trade_info_%s" % (date_info)share_ids = fetch_data.get_data(fetch_data.select_db(self._daily_temp_conn_name, trade_table_name, ["share_id"],{}, pre = "distinct"))return share_idsdef _create_table_if_not_exist(self, share_id, table_name):stock_conn_manager_obj = stock_conn_manager()conn_name = stock_conn_manager_obj.get_conn_name(share_id)prepare_table_obj = prepare_table(conn_name, "market_maker")prepare_table_obj.prepare(table_name)

实时交易和主力动向拆分备份的任务配置如下。因为这两个数据库比较大,我给每个任务留了一个小时的处理时间。

[bak_today_market_maker]
type=cron
class=bak_today_market_maker
day_of_week=1-5
hour=16
minute=50
timezone = Asia/Shanghai[bak_today_trade]
type=cron
class=bak_today_trade
day_of_week=1-5
hour=15
minute=50
timezone = Asia/Shanghai

码农技术炒股之路——实时交易信息、主力动向信息分库备份相关推荐

  1. 码农技术炒股之路——抓取股票基本信息、实时交易信息、主力动向信息

    从本节开始,我们开始介绍各个抓取和备份业务.(转载请指明出于breaksoftware的csdn博客) 因为我们数据库很多,数据库中表也很多,所以我们需要一个自动检测并创建数据库和表的功能.在< ...

  2. 码农技术炒股之路——数据库管理器、正则表达式管理器

    我选用的数据库是Mysql.选用它是因为其可以满足我的需求,而且资料多.因为作为第三方工具,难免有一些配置问题.所以本文也会讲解一些和Mysql配置及开发相关的问题.(转载请指明出于breaksoft ...

  3. 码农技术炒股之路——任务管理器

    系统任务和普通任务都是通过任务管理器调度的.它们的区别是:系统任务在程序运行后即不会被修改,而普通任务则会被修改.(转载请指明出于breaksoftware的csdn博客) 为什么要有这样的设计?因为 ...

  4. 码农技术炒股之路——配置管理器、日志管理器

    配置管理器和日志管理器是项目中最为独立的模块.我们可以很方便将其剥离出来供其他Python工程使用.文件的重点将是介绍Python单例和logging模块的使用.(转载请指明出于breaksoftwa ...

  5. 码农技术炒股之路——抓取日线数据、计算均线和除权数据

    日线数据是股票每日收盘后的信息.这块数据不用实时抓取,所以并不占用宝贵的交易时间的资源.于是我们抓取完数据后直接往切片后的数据库中保存.(转载请指明出于breaksoftware的csdn博客) 抓取 ...

  6. 一个30岁男人转型码农的平凡之路

    今天给大家带来的是一个转行的故事,一个30岁才开始学习编程的小白,资质平平,真正的零基础. 他的故事和那些大众喜欢的.夸张的.甚至虚假的华丽转身不同,一点也不精彩.一点也不鸡汤,平淡如水,但是能反映出 ...

  7. php-fpm哪里下载_如何在centos系统下找到php-fpm的位置 - 翟码农技术博客

    reboot重启了服务器后,所有的服务都需要重新启动. 启动php-fpm时,使用如下命令systemctl start php-fpm.service 提示:Failed to restart ph ...

  8. 一个普通码农的Linux之路

    1. Hi,大家好,我是奔跑的码仔,是一名长期混迹于Linux江湖,靠Linux吃饭的程序员.生活在一个IT大环境不好的二线城市,大家也知道,这里的程序员本来就很稀少,况且是Linux程序员呢,就更是 ...

  9. php redis管道,php redis pipeline怎么用 - 翟码农技术博客

    网上一大堆文章都在说pipeline怎么提升性能,我只是想知道安装好phpredis库之后,代码上如何写来开启管道模式,也就是下面这一小撮代码而已.$pipe = $redis->multi(R ...

最新文章

  1. 10000字的Pandas核心操作知识大全!
  2. linux windows并发模型,Linux并发服务器模型四 -- poll
  3. 简单的PHP和MYSQL做投票系统_php mysql简单投票系统
  4. django 三天写个人博客
  5. UOJ #310 黎明前的巧克力 (FWT)
  6. Windows Print Spooler服务最新漏洞CVE-2021-34527详细分析
  7. Python类的封装
  8. ROS学习之节点间话题通信的
  9. Android 应用 之路 百度地图API使用(3)
  10. 5W-Lora电台的远距离传输优势
  11. 八 .数据库(多表查询)
  12. 周期置换加密算法用c语言实现,古典密码实验报告.doc
  13. 两男子骑摩托车抢夺宴席礼金 警方:嫌疑人已被抓获
  14. 集群ddos_《DNS攻击防范科普系列2》 -DNS服务器怎么防DDoS攻击
  15. 考研_数学二_中值定理_证明题_辅助函数的设法
  16. centos7 关闭自动yum更新
  17. 语音处理/语音识别基础(五)- 声音的音量,过零率,音高的计算
  18. 通过Excel VBA对序列实现自动分级
  19. 执笔写流年,焚纸闻墨香
  20. win10修复引导工具怎么用【系统天地】

热门文章

  1. 1. 编程规范和编程安全指南--python
  2. 分离颜色通道(split)和多通道融合(merge)
  3. ml不是内部或外部命令_美国飞机制造商波音公司采用VR技术训练宇航员 ; Snap Lens Studio推出支持自定义ML驱动的Snapchat镜头...
  4. Revit:概念建模环境技能学习 Revit: Conceptual Modeling Environment
  5. 一图带你入门Linux 存储I/O栈
  6. 二叉树:最近的公共祖先 Lowest Common Ancestor of a Binary Tree
  7. l-lsblk查看设备可用块设备
  8. [NOI2005]维护数列
  9. vmware虚拟机启动centOs黑屏
  10. 【Python之路】第二篇--初识Python