前文介绍了把股票数据写入MySQL的过程,本文记录从MySQL中读取股票数据的过程。

到目前为止,我们在访问股票代码列表时,每次需要通过BaoStock重新下载。本文将把下载的股票代码保存到MySQL中,在后续访问股票列表时,就可以通过MySQL读取股票代码数据,从而避免每次都要重新下载代码列表,也以此来演示从MySQL中读取数据的过程。

主要代码分析

新建源文件,命名为data_center_v8.py,全部内容见文末,v8主要修改了函数get_stock_codes:

def get_stock_codes(date=None, update=False):

该函数用于获取指定日期的A股代码列表,其中:

  • 参数date为日期,默认为None
  • 参数update为是否更新股票列表,默认为False
  • 返回值为A股代码的列表

不同的参数设置,会有3种情况:

  • 若参数update为False,表示从数据库中读取股票列表
  • 若数据库中不存在股票列表的表,或者update为True,则下载指定日期date的交易股票列表
  • 若参数date为空,则返回最近1个交易日的A股代码列表
  • 若参数date不为空,且为交易日,则返回date当日的A股代码列表
  • 若参数date不为空,但不为交易日,则打印提示非交易日信息,程序退出

综上,在默认参数条件下:

  • 若程序是第一次运行,则下载最近1个交易的A股代码列表,并保存在数据库中
  • 若程序不是第一次运行,则会从数据库中读取A股列表
    engine = create_mysql_engine()

创建数据库引擎对象,用判断表是否存在及读写表数据。

    table_name = 'stock_codes'

数据库中股票代码的表名。

    if table_name not in sqlalchemy.inspect(engine).get_table_names() or update:

数据库中不存在股票代码表,或者需要更新股票代码表。

        # 登录baostockbs.login()# 从BaoStock查询股票数据stock_df = bs.query_all_stock(date).get_data()# 如果获取数据长度为0,表示日期date非交易日if 0 == len(stock_df):# 如果设置了参数date,则打印信息提示date为非交易日if date is not None:print('当前选择日期为非交易日或尚无交易数据,请设置date为历史某交易日日期')sys.exit(0)# 未设置参数date,则向历史查找最近的交易日,当获取股票数据长度非0时,即找到最近交易日delta = 1while 0 == len(stock_df):stock_df = bs.query_all_stock(datetime.date.today() - datetime.timedelta(days=delta)).get_data()delta += 1# 注销登录bs.logout()# 筛选股票数据,上证和深证股票代码在sh.600000与sz.39900之间stock_df = stock_df[(stock_df['code'] >= 'sh.600000') & (stock_df['code'] < 'sz.399000')]

通过查询BaoStock更新股票列表代码,上面代码与v1内容相同,可参考v1分析内容。

        stock_df.to_sql(name=table_name, con=engine, if_exists='replace', index=False, index_label=False)

调用DataFrame的to_sql方法将股票代码写入数据库。

        return stock_df['code'].tolist()

返回股票列表。

    else:

从数据库中读取股票代码列表。

        sql_cmd = 'SELECT {} FROM {}'.format('code', table_name)

待执行的sql语句。

        return pd.read_sql(sql=sql_cmd, con=engine)['code'].tolist()

调用read_sql方法,执行sql命令,并返回DataFrame。read_sql的参数sql表示要执行的sql命令, con表示创建的数据库连接,返回值的数据类型是DataFrame。下图展示了我们通过MySQL workbench查看的表stock_codes的内容。我们提取其中的code列,转化为list并返回。

小结

至此,我们完成了股票代码数据的数据库写入和读出功能,可以通过数据库读取股票代码,而不必每次都去BaoStock重新下载。

通过pandas的read_sql和to_sql方法,可以方便地完成DataFrame对MySQL的读写操作。

下一篇文章将记录从MySQL中读取日线及因子数据,并尝试对策略的胜率进行分析。


data_center_v8.py的全部代码如下:

import baostock as bs
import datetime
import sys
import numpy as np
import pandas as pd
import multiprocessing
import sqlalchemy# 可用日线数量约束
g_available_days_limit = 250# BaoStock日线数据字段
g_baostock_data_fields = 'date,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ, psTTM,pcfNcfTTM,isST'def create_mysql_engine():"""创建数据库引擎对象:return: 新创建的数据库引擎对象"""# 引擎参数信息host = 'localhost'user = 'root'passwd = '111111'port = '3306'db = 'db_quant'# 创建数据库引擎对象mysql_engine = sqlalchemy.create_engine('mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),poolclass=sqlalchemy.pool.NullPool)# 如果不存在数据库db_quant则创建mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))# 创建连接数据库db_quant的引擎对象db_engine = sqlalchemy.create_engine('mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),poolclass=sqlalchemy.pool.NullPool)# 返回引擎对象return db_enginedef get_stock_codes(date=None, update=False):"""获取指定日期的A股代码列表若参数update为False,表示从数据库中读取股票列表若数据库中不存在股票列表的表,或者update为True,则下载指定日期date的交易股票列表若参数date为空,则返回最近1个交易日的A股代码列表若参数date不为空,且为交易日,则返回date当日的A股代码列表若参数date不为空,但不为交易日,则打印提示非交易日信息,程序退出:param date: 日期,默认为None:param update: 是否更新股票列表,默认为False:return: A股代码的列表"""# 创建数据库引擎对象engine = create_mysql_engine()# 数据库中股票代码的表名table_name = 'stock_codes'# 数据库中不存在股票代码表,或者需要更新股票代码表if table_name not in sqlalchemy.inspect(engine).get_table_names() or update:# 登录baostockbs.login()# 从BaoStock查询股票数据stock_df = bs.query_all_stock(date).get_data()# 如果获取数据长度为0,表示日期date非交易日if 0 == len(stock_df):# 如果设置了参数date,则打印信息提示date为非交易日if date is not None:print('当前选择日期为非交易日或尚无交易数据,请设置date为历史某交易日日期')sys.exit(0)# 未设置参数date,则向历史查找最近的交易日,当获取股票数据长度非0时,即找到最近交易日delta = 1while 0 == len(stock_df):stock_df = bs.query_all_stock(datetime.date.today() - datetime.timedelta(days=delta)).get_data()delta += 1# 注销登录bs.logout()# 筛选股票数据,上证和深证股票代码在sh.600000与sz.39900之间stock_df = stock_df[(stock_df['code'] >= 'sh.600000') & (stock_df['code'] < 'sz.399000')]# 将股票代码写入数据库stock_df.to_sql(name=table_name, con=engine, if_exists='replace', index=False, index_label=False)# 返回股票列表return stock_df['code'].tolist()# 从数据库中读取股票代码列表else:# 待执行的sql语句sql_cmd = 'SELECT {} FROM {}'.format('code', table_name)# 读取sql,返回股票列表return pd.read_sql(sql=sql_cmd, con=engine)['code'].tolist()def create_data(stock_codes, from_date='1990-12-19', to_date=datetime.date.today().strftime('%Y-%m-%d'),adjustflag='2'):"""下载指定日期内,指定股票的日线数据,计算扩展因子:param stock_codes: 待下载数据的股票代码:param from_date: 日线开始日期:param to_date: 日线结束日期:param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权:return: None"""# 创建数据库引擎对象engine = create_mysql_engine()# 下载股票循环for code in stock_codes:print('正在下载{}...'.format(code))# 登录BaoStockbs.login()# 下载日线数据out_df = bs.query_history_k_data_plus(code, g_baostock_data_fields, start_date=from_date, end_date=to_date,frequency='d', adjustflag=adjustflag).get_data()# 剔除停盘数据if out_df.shape[0]:out_df = out_df[(out_df['volume'] != '0') & (out_df['volume'] != '')]# 如果数据为空,则不创建if not out_df.shape[0]:continue# 删除重复数据out_df.drop_duplicates(['date'], inplace=True)# 日线数据少于g_available_days_limit,则不创建if out_df.shape[0] < g_available_days_limit:continue# 将数值数据转为float型,便于后续处理convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']out_df[convert_list] = out_df[convert_list].astype(float)# 重置索引out_df.reset_index(drop=True, inplace=True)# 计算扩展因子out_df = extend_factor(out_df)# 写入数据库table_name = '{}_{}'.format(code[3:], code[:2])out_df.to_sql(name=table_name, con=engine, if_exists='replace', index=True, index_label='id')def get_code_group(process_num, stock_codes):"""获取代码分组,用于多进程计算,每个进程处理一组股票:param process_num: 进程数:param stock_codes: 待处理的股票代码:return: 分组后的股票代码列表,列表的每个元素为一组股票代码的列表"""# 创建空的分组code_group = [[] for i in range(process_num)]# 按余数为每个分组分配股票for index, code in enumerate(stock_codes):code_group[index % process_num].append(code)return code_groupdef multiprocessing_func(func, args):"""多进程调用函数:param func: 函数名:param args: func的参数,类型为元组,第0个元素为进程数,第1个元素为股票代码列表:return: 包含各子进程返回对象的列表"""# 用于保存各子进程返回对象的列表results = []# 创建进程池with multiprocessing.Pool(processes=args[0]) as pool:# 多进程异步计算for codes in get_code_group(args[0], args[1]):results.append(pool.apply_async(func, args=(codes, *args[2:],)))# 阻止后续任务提交到进程池pool.close()# 等待所有进程结束pool.join()return resultsdef create_data_mp(stock_codes, process_num=61,from_date='1990-12-19', to_date=datetime.date.today().strftime('%Y-%m-%d'), adjustflag='2'):"""使用多进程创建指定日期内,指定股票的日线数据,计算扩展因子:param stock_codes: 待创建数据的股票代码:param process_num: 进程数:param from_date: 日线开始日期:param to_date: 日线结束日期:param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权:return: None"""multiprocessing_func(create_data, (process_num, stock_codes, from_date, to_date, adjustflag,))def extend_factor(df):"""计算扩展因子:param df: 待计算扩展因子的DataFrame:return: 包含扩展因子的DataFrame"""# 使用pipe依次计算涨停、双神及是否为候选股票df = df.pipe(zt).pipe(ss, delta_days=30).pipe(candidate)return dfdef zt(df):"""计算涨停因子若涨停,则因子为True,否则为False以当日收盘价较前一日收盘价上涨9.8%及以上作为涨停判断标准:param df: 待计算扩展因子的DataFrame:return: 包含扩展因子的DataFrame"""df['zt'] = np.where((df['close'].values >= 1.098 * df['preclose'].values), True, False)return dfdef shift_i(df, factor_list, i, fill_value=0, suffix='a'):"""计算移动因子,用于获取前i日或者后i日的因子:param df: 待计算扩展因子的DataFrame:param factor_list: 待移动的因子列表:param i: 移动的步数:param fill_value: 用于填充NA的值,默认为0:param suffix: 值为a(ago)时表示移动获得历史数据,用于计算指标;值为l(later)时表示获得未来数据,用于计算收益:return: 包含扩展因子的DataFrame"""# 选取需要shift的列构成新的DataFrame,进行shift操作shift_df = df[factor_list].shift(i, fill_value=fill_value)# 对新的DataFrame列进行重命名shift_df.rename(columns={x: '{}_{}{}'.format(x, i, suffix) for x in factor_list}, inplace=True)# 将重命名后的DataFrame合并到原始DataFrame中df = pd.concat([df, shift_df], axis=1)return dfdef shift_till_n(df, factor_list, n, fill_value=0, suffix='a'):"""计算范围移动因子用于获取前/后n日内的相关因子,内部调用了shift_i:param df: 待计算扩展因子的DataFrame:param factor_list: 待移动的因子列表:param n: 移动的步数范围:param fill_value: 用于填充NA的值,默认为0:param suffix: 值为a(ago)时表示移动获得历史数据,用于计算指标;值为l(later)时表示获得未来数据,用于计算收益:return: 包含扩展因子的DataFrame"""for i in range(n):df = shift_i(df, factor_list, i + 1, fill_value, suffix)return dfdef ss(df, delta_days=30):"""计算双神因子,即间隔的两个涨停若当日形成双神,则因子为True,否则为False:param df: 待计算扩展因子的DataFrame:param delta_days: 两根涨停间隔的时间不能超过该值,否则不判定为双神,默认值为30:return: 包含扩展因子的DataFrame"""# 移动涨停因子,求取近delta_days天内的涨停情况,保存在一个临时DataFrame中temp_df = shift_till_n(df, ['zt'], delta_days, fill_value=False)# 生成列表,用于后续检索第2天前至第delta_days天前是否有涨停出现col_list = ['zt_{}a'.format(x) for x in range(2, delta_days + 1)]# 计算双神,需同时满足3个条件:# 1、第2天前至第delta_days天前,至少有1个涨停# 2、1天前不是涨停(否则就是连续涨停,不是间隔的涨停)# 3、当天是涨停df['ss'] = temp_df[col_list].any(axis=1) & ~temp_df['zt_1a'] & temp_df['zt']return dfdef ma(df, n=5, factor='close'):"""计算均线因子:param df: 待计算扩展因子的DataFrame:param n: 待计算均线的周期,默认计算5日均线:param factor: 待计算均线的因子,默认为收盘价:return: 包含扩展因子的DataFrame"""# 均线名称,例如,收盘价的5日均线名称为ma_5,成交量的5日均线名称为volume_ma_5name = '{}ma_{}'.format('' if 'close' == factor else factor + '_', n)# 取待计算均线的因子列s = pd.Series(df[factor], name=name, index=df.index)# 利用rolling和mean计算均线数据s = s.rolling(center=False, window=n).mean()# 将均线数据添加到原始的DataFrame中df = df.join(s)# 均线数值保留两位小数df[name] = df[name].apply(lambda x: round(x + 0.001, 2))return dfdef mas(df, ma_list=None, factor='close'):"""计算多条均线因子,内部调用ma计算单条均线:param df: 待计算扩展因子的DataFrame:param ma_list: 待计算均线的周期列表,默认为None:param factor: 待计算均线的因子,默认为收盘价:return: 包含扩展因子的DataFrame"""if ma_list is None:ma_list = []for i in ma_list:df = ma(df, i, factor)return dfdef cross_mas(df, ma_list):"""计算穿均线因子若当日最低价不高于均线价格且当日收盘价不低于均线价格则当日穿均线因子值为True,否则为False:param df: 待计算扩展因子的DataFrame:param ma_list: 均线的周期列表:return: 包含扩展因子的DataFrame"""for i in ma_list:df['cross_{}'.format(i)] = (df['low'] <= df['ma_{}'.format(i)]) & (df['ma_{}'.format(i)] <= df['close'])return dfdef candidate(df):"""计算是否为候选若当日日线同时穿过5、10、20、30日均线且30日均线在60日均线上方且当日形成双神则当日作为候选,该因子值为True,否则为False:param df: 待计算扩展因子的DataFrame:return: 包含扩展因子的DataFrame"""# 均线周期列表ma_list = [5, 10, 20, 30, 60]# 计算均线的因子,保存到临时的DataFrame中temp_df = mas(df, ma_list)# 计算穿多线的因子,保存到临时的DataFrame中temp_df = cross_mas(temp_df, ma_list)# 穿多线因子的列名列表column_list = ['cross_{}'.format(x) for x in ma_list[:-1]]# 计算是否为候选df['candidate'] = temp_df[column_list].all(axis=1) & (temp_df['ma_30'] >= temp_df['ma_60']) & df['ss']return dfif __name__ == '__main__':stock_codes = get_stock_codes()print(stock_codes)

博客内容只用于交流学习,不构成投资建议,盈亏自负!
个人博客:https://coderx.com.cn/(优先更新)
项目最新代码:https://gitee.com/sl/quant_from_scratch
欢迎大家转发、留言。已建微信群用于学习交流,群1已满,群2已创建,感兴趣的读者请扫码加微信!

从MySQL中读取股票数据——从零到实盘10相关推荐

  1. 按日更新股票数据——从零到实盘13

    前文介绍了多进程创建股票数据的过程,整个创建过程大概约10几分钟.在实盘时,每个交易日都有新数据生成,我们没有必要对全面历史时间都进行重新创建计算,只需要下载新产生的日线数据,每次更新需要3分钟左右. ...

  2. 随机从mysql中读取_如何实现MySQL表数据随机读取?从mysql表中读取随机数据

    文章转自 http://blog.efbase.org/2006/10/16/244/ 如何实现MySQL表数据随机读取?从mysql表中读取随机数据?以前在群里讨论过这个问题,比较的有意思.mysq ...

  3. 从钱龙数据中读取股票权息信息导入到数据库

    从钱龙数据中读取股票权息信息导入到数据库 前面写了如果读股票代码和日线数据,下面是如何读股票的权息信息. 钱龙中权息数据存储在QLDATA/history/shase/weight和QLDATA/hi ...

  4. Python 基于Python从mysql表读取千万数据实践

    基于Python 从mysql表读取千万数据实践   by:授客 QQ:1033553122 场景:   有以下两个表,两者都有一个表字段,名为waybill_no,我们需要从tl_waybill_b ...

  5. Spark-SQL从MySQL中加载数据以及将数据写入到mysql中(Spark Shell方式,Spark SQL程序)

    1. JDBC Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中. 1.1. 从MySQ ...

  6. MySQL 中删除的数据都去哪儿了?

    不知道大家有没有想过下面这件事? 我们平时调用 DELETE 在 MySQL 中删除的数据都去哪儿了? 这还用问吗?当然是被删除了啊 那么这里又有个新的问题了,如果在 InnoDB 下,多事务并发的情 ...

  7. spark mysql 写_Spark-SQL从MySQL中加载数据以及将数据写入到mysql中(Spark Shell方式,Spark SQL程序)...

    1. JDBC Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中. 1.1. 从MySQ ...

  8. matlab 十六进制数组,【MATLAB】MATLAB中读取二进制数据文件并加入到矩阵中

    MATLAB中读取二进制数据文件并加入到矩阵中的应用如下: 如果对c语言十分熟悉的话,应该对fopen,fclose,ftell,fseek,fread,fwrite,feof 这些函数非常熟悉了,在 ...

  9. php数据存储mysql_php – 在MySQL中存储路线数据的最佳方式

    我正在开发一个应用程序,它要求我存储一些位置的方向,下面是我试图存储的数据的示例: 方向1 从西部:乘528 East(Beechline),经过机场出口,然后从13号出口驶入Narcoossee R ...

最新文章

  1. 2021年大数据Spark(四十六):Structured Streaming Operations 操作
  2. IoT -- (九) IoT通讯技术选型与模型设计
  3. snmpwalk命令常用方法
  4. c++数字转化为字符串、字符串转换为数字
  5. 厨师刀行业调研报告 - 市场现状分析与发展前景预测(2021-2027年)
  6. java生产者消费者代码_在Java面试中常遇到的技术问题汇总
  7. asp连接linux下的oracle,如何从ASP连接到Oracle Server?
  8. 后端-SpringBoot MySql 连接语句
  9. Flume系列一之架构介绍和安装
  10. 基于JavaWeb的幼儿园信息管理
  11. Android TextToSpeech TTS中文文本转语音(语音合成)
  12. 手机计算机怎么玩24点游戏,计算器上的24点游戏怎么操作
  13. pytorch动态调整学习率之Poly策略
  14. js禁止输入框输入特殊符号或emoji表情
  15. 自己整理的资料 视频格式以及参数含义
  16. 使用GCD 转自 Posted by 唐巧
  17. git如何安装aur_Linux┊一个好用的AUR工具yaourt | 简单.生活
  18. php 合成微信头像,PHP 图片合成、仿微信群头像的方法示例
  19. jme示例代码中的素材在哪
  20. docker-compose docker容器编排插件

热门文章

  1. 怎么给新加的固态硬盘装系统
  2. 计算机二级程序设计提交,程序设计方法与风格(计算机二级复习指导)
  3. OSPF ASBR及4类LSA研究
  4. 【云原生网关】Kong 使用详解
  5. 用户行为分析,就该这么做!
  6. python爬虫爬取网页壁纸图片(《底特律:变人》)
  7. Python hash函数详解
  8. Dynamics CRM 向视图列添加自定义图标和提示信息
  9. 大量冷笑话 (冬天别看哦~)
  10. C# 证书 .cer, .pfx 创建,加解密 导出为 Base64编码文件