前文介绍了MySQL的安装和配置过程,本文记录将股票数据写入到MySQL的过程。

安装pymysql

使用python实现与MySQL的数据读写时,需要安装相关的包。由于我们搭建开发环境时选择的是Anaconda,大部分包已经被默认安装好,这里只需要手动安装pymysql:

pip install pymysql

主要代码分析

新建源文件,命名为data_center_v7.py,全部内容见文末,v7主要涉及2个方面改动:

新增创建数据库引擎对象函数

def create_mysql_engine():

该函数用于创建数据库引擎对象,返回值为新创建的数据库引擎对象。

    host = 'localhost'user = 'root'passwd = '111111'port = '3306'db = 'db_quant'

定义引擎参数信息。

    mysql_engine = sqlalchemy.create_engine('mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),poolclass=sqlalchemy.pool.NullPool)

创建数据库引擎对象,用于后面判断是否需要创建数据库。

    mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))

如果不存在数据库db_quant则创建。

    db_engine = sqlalchemy.create_engine('mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),poolclass=sqlalchemy.pool.NullPool)

创建连接数据库db_quant的引擎对象。

    return db_engine

返回引擎对象。

修改创建数据函数

def create_data(stock_codes, from_date='1990-12-19', to_date=datetime.date.today().strftime('%Y-%m-%d'),adjustflag='2'):"""下载指定日期内,指定股票的日线数据,计算扩展因子:param stock_codes: 待下载数据的股票代码:param from_date: 日线开始日期:param to_date: 日线结束日期:param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权:return: None"""# 创建数据库引擎对象engine = create_mysql_engine()

这里创建数据库引擎对象,用于后续将数据写入数据库。

    # 下载股票循环for code in stock_codes:print('正在下载{}...'.format(code))# 登录BaoStockbs.login()# 下载日线数据out_df = bs.query_history_k_data_plus(code, g_baostock_data_fields, start_date=from_date, end_date=to_date,frequency='d', adjustflag=adjustflag).get_data()# 剔除停盘数据if out_df.shape[0]:out_df = out_df[(out_df['volume'] != '0') & (out_df['volume'] != '')]# 如果数据为空,则不创建if not out_df.shape[0]:continue# 删除重复数据out_df.drop_duplicates(['date'], inplace=True)# 日线数据少于g_available_days_limit,则不创建if out_df.shape[0] < g_available_days_limit:continue# 将数值数据转为float型,便于后续处理convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']out_df[convert_list] = out_df[convert_list].astype(float)# 重置索引out_df.reset_index(drop=True, inplace=True)# 计算扩展因子out_df = extend_factor(out_df)

以上内容与v3相同,可参考v3分析内容。

        # 写入数据库table_name = '{}_{}'.format(code[3:], code[:2])out_df.to_sql(name=table_name, con=engine, if_exists='replace', index=True, index_label='id')

使用DataFrame的to_sql方法,将数据写入MySQL。这里每只股票的数据会形成一张表,表名的格式为“代码_市场”,例如股票sh.600158中体产业的数据,会保存在600158_sh中。

查看写入数据结果

打开MySQL Workbench,点击左侧Local instance MySQL80位置,如下图所示:

第一次登录需要输入密码,登录后就可以查看本机数据库内容,导航窗口的Schemas标签内容如下:

可以看到MySQL中包含了我们创建db_quant数据库,点击db_quant左侧下拉三角,再点击其下方Table左侧的下拉三角,就可以看到我们创建的所有股票的表:

我们可以右键点击任意一张表,然后选择Select Rows - Limit 1000,就可以查看具体的数据内容:

要查看更多数据内容或者操作数据,就可以脚本窗口编写SQL语句,执行相关的操作。

小结

至此,我们完成了股票数据的创建,并把数据写入到MySQL中。

下一篇文章将记录从MySQL中读取数据的过程。


data_center_v7.py的全部代码如下:

import baostock as bs
import datetime
import sys
import numpy as np
import pandas as pd
import multiprocessing
import sqlalchemy# 可用日线数量约束
g_available_days_limit = 250# BaoStock日线数据字段
g_baostock_data_fields = 'date,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ, psTTM,pcfNcfTTM,isST'def create_mysql_engine():"""创建数据库引擎对象:return: 新创建的数据库引擎对象"""# 引擎参数信息host = 'localhost'user = 'root'passwd = '111111'port = '3306'db = 'db_quant'# 创建数据库引擎对象mysql_engine = sqlalchemy.create_engine('mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),poolclass=sqlalchemy.pool.NullPool)# 如果不存在数据库db_quant则创建mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))# 创建连接数据库db_quant的引擎对象db_engine = sqlalchemy.create_engine('mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),poolclass=sqlalchemy.pool.NullPool)# 返回引擎对象return db_enginedef get_stock_codes(date=None):"""获取指定日期的A股代码列表若参数date为空,则返回最近1个交易日的A股代码列表若参数date不为空,且为交易日,则返回date当日的A股代码列表若参数date不为空,但不为交易日,则打印提示非交易日信息,程序退出:param date: 日期:return: A股代码的列表"""# 登录baostockbs.login()# 从BaoStock查询股票数据stock_df = bs.query_all_stock(date).get_data()# 如果获取数据长度为0,表示日期date非交易日if 0 == len(stock_df):# 如果设置了参数date,则打印信息提示date为非交易日if date is not None:print('当前选择日期为非交易日或尚无交易数据,请设置date为历史某交易日日期')sys.exit(0)# 未设置参数date,则向历史查找最近的交易日,当获取股票数据长度非0时,即找到最近交易日delta = 1while 0 == len(stock_df):stock_df = bs.query_all_stock(datetime.date.today() - datetime.timedelta(days=delta)).get_data()delta += 1# 注销登录bs.logout()# 筛选股票数据,上证和深证股票代码在sh.600000与sz.39900之间stock_df = stock_df[(stock_df['code'] >= 'sh.600000') & (stock_df['code'] < 'sz.399000')]# 返回股票列表return stock_df['code'].tolist()def create_data(stock_codes, from_date='1990-12-19', to_date=datetime.date.today().strftime('%Y-%m-%d'),adjustflag='2'):"""下载指定日期内,指定股票的日线数据,计算扩展因子:param stock_codes: 待下载数据的股票代码:param from_date: 日线开始日期:param to_date: 日线结束日期:param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权:return: None"""# 创建数据库引擎对象engine = create_mysql_engine()# 下载股票循环for code in stock_codes:print('正在下载{}...'.format(code))# 登录BaoStockbs.login()# 下载日线数据out_df = bs.query_history_k_data_plus(code, g_baostock_data_fields, start_date=from_date, end_date=to_date,frequency='d', adjustflag=adjustflag).get_data()# 剔除停盘数据if out_df.shape[0]:out_df = out_df[(out_df['volume'] != '0') & (out_df['volume'] != '')]# 如果数据为空,则不创建if not out_df.shape[0]:continue# 删除重复数据out_df.drop_duplicates(['date'], inplace=True)# 日线数据少于g_available_days_limit,则不创建if out_df.shape[0] < g_available_days_limit:continue# 将数值数据转为float型,便于后续处理convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']out_df[convert_list] = out_df[convert_list].astype(float)# 重置索引out_df.reset_index(drop=True, inplace=True)# 计算扩展因子out_df = extend_factor(out_df)# 写入数据库table_name = '{}_{}'.format(code[3:], code[:2])out_df.to_sql(name=table_name, con=engine, if_exists='replace', index=True, index_label='id')def get_code_group(process_num, stock_codes):"""获取代码分组,用于多进程计算,每个进程处理一组股票:param process_num: 进程数:param stock_codes: 待处理的股票代码:return: 分组后的股票代码列表,列表的每个元素为一组股票代码的列表"""# 创建空的分组code_group = [[] for i in range(process_num)]# 按余数为每个分组分配股票for index, code in enumerate(stock_codes):code_group[index % process_num].append(code)return code_groupdef multiprocessing_func(func, args):"""多进程调用函数:param func: 函数名:param args: func的参数,类型为元组,第0个元素为进程数,第1个元素为股票代码列表:return: 包含各子进程返回对象的列表"""# 用于保存各子进程返回对象的列表results = []# 创建进程池with multiprocessing.Pool(processes=args[0]) as pool:# 多进程异步计算for codes in get_code_group(args[0], args[1]):results.append(pool.apply_async(func, args=(codes, *args[2:],)))# 阻止后续任务提交到进程池pool.close()# 等待所有进程结束pool.join()return resultsdef create_data_mp(stock_codes, process_num=61,from_date='1990-12-19', to_date=datetime.date.today().strftime('%Y-%m-%d'), adjustflag='2'):"""使用多进程创建指定日期内,指定股票的日线数据,计算扩展因子:param stock_codes: 待创建数据的股票代码:param process_num: 进程数:param from_date: 日线开始日期:param to_date: 日线结束日期:param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权:return: None"""multiprocessing_func(create_data, (process_num, stock_codes, from_date, to_date, adjustflag,))def extend_factor(df):"""计算扩展因子:param df: 待计算扩展因子的DataFrame:return: 包含扩展因子的DataFrame"""# 使用pipe依次计算涨停、双神及是否为候选股票df = df.pipe(zt).pipe(ss, delta_days=30).pipe(candidate)return dfdef zt(df):"""计算涨停因子若涨停,则因子为True,否则为False以当日收盘价较前一日收盘价上涨9.8%及以上作为涨停判断标准:param df: 待计算扩展因子的DataFrame:return: 包含扩展因子的DataFrame"""df['zt'] = np.where((df['close'].values >= 1.098 * df['preclose'].values), True, False)return dfdef shift_i(df, factor_list, i, fill_value=0, suffix='a'):"""计算移动因子,用于获取前i日或者后i日的因子:param df: 待计算扩展因子的DataFrame:param factor_list: 待移动的因子列表:param i: 移动的步数:param fill_value: 用于填充NA的值,默认为0:param suffix: 值为a(ago)时表示移动获得历史数据,用于计算指标;值为l(later)时表示获得未来数据,用于计算收益:return: 包含扩展因子的DataFrame"""# 选取需要shift的列构成新的DataFrame,进行shift操作shift_df = df[factor_list].shift(i, fill_value=fill_value)# 对新的DataFrame列进行重命名shift_df.rename(columns={x: '{}_{}{}'.format(x, i, suffix) for x in factor_list}, inplace=True)# 将重命名后的DataFrame合并到原始DataFrame中df = pd.concat([df, shift_df], axis=1)return dfdef shift_till_n(df, factor_list, n, fill_value=0, suffix='a'):"""计算范围移动因子用于获取前/后n日内的相关因子,内部调用了shift_i:param df: 待计算扩展因子的DataFrame:param factor_list: 待移动的因子列表:param n: 移动的步数范围:param fill_value: 用于填充NA的值,默认为0:param suffix: 值为a(ago)时表示移动获得历史数据,用于计算指标;值为l(later)时表示获得未来数据,用于计算收益:return: 包含扩展因子的DataFrame"""for i in range(n):df = shift_i(df, factor_list, i + 1, fill_value, suffix)return dfdef ss(df, delta_days=30):"""计算双神因子,即间隔的两个涨停若当日形成双神,则因子为True,否则为False:param df: 待计算扩展因子的DataFrame:param delta_days: 两根涨停间隔的时间不能超过该值,否则不判定为双神,默认值为30:return: 包含扩展因子的DataFrame"""# 移动涨停因子,求取近delta_days天内的涨停情况,保存在一个临时DataFrame中temp_df = shift_till_n(df, ['zt'], delta_days, fill_value=False)# 生成列表,用于后续检索第2天前至第delta_days天前是否有涨停出现col_list = ['zt_{}a'.format(x) for x in range(2, delta_days + 1)]# 计算双神,需同时满足3个条件:# 1、第2天前至第delta_days天前,至少有1个涨停# 2、1天前不是涨停(否则就是连续涨停,不是间隔的涨停)# 3、当天是涨停df['ss'] = temp_df[col_list].any(axis=1) & ~temp_df['zt_1a'] & temp_df['zt']return dfdef ma(df, n=5, factor='close'):"""计算均线因子:param df: 待计算扩展因子的DataFrame:param n: 待计算均线的周期,默认计算5日均线:param factor: 待计算均线的因子,默认为收盘价:return: 包含扩展因子的DataFrame"""# 均线名称,例如,收盘价的5日均线名称为ma_5,成交量的5日均线名称为volume_ma_5name = '{}ma_{}'.format('' if 'close' == factor else factor + '_', n)# 取待计算均线的因子列s = pd.Series(df[factor], name=name, index=df.index)# 利用rolling和mean计算均线数据s = s.rolling(center=False, window=n).mean()# 将均线数据添加到原始的DataFrame中df = df.join(s)# 均线数值保留两位小数df[name] = df[name].apply(lambda x: round(x + 0.001, 2))return dfdef mas(df, ma_list, factor='close'):"""计算多条均线因子,内部调用ma计算单条均线:param df: 待计算扩展因子的DataFrame:param ma_list: 待计算均线的周期列表:param factor: 待计算均线的因子,默认为收盘价:return: 包含扩展因子的DataFrame"""for i in ma_list:df = ma(df, i, factor)return dfdef cross_mas(df, ma_list):"""计算穿均线因子若当日最低价不高于均线价格且当日收盘价不低于均线价格则当日穿均线因子值为True,否则为False:param df: 待计算扩展因子的DataFrame:param ma_list: 均线的周期列表:return: 包含扩展因子的DataFrame"""for i in ma_list:df['cross_{}'.format(i)] = (df['low'] <= df['ma_{}'.format(i)]) & (df['ma_{}'.format(i)] <= df['close'])return dfdef candidate(df):"""计算是否为候选若当日日线同时穿过5、10、20、30日均线且30日均线在60日均线上方且当日形成双神则当日作为候选,该因子值为True,否则为False:param df: 待计算扩展因子的DataFrame:return: 包含扩展因子的DataFrame"""# 均线周期列表ma_list = [5, 10, 20, 30, 60]# 计算均线的因子,保存到临时的DataFrame中temp_df = mas(df, ma_list)# 计算穿多线的因子,保存到临时的DataFrame中temp_df = cross_mas(temp_df, ma_list)# 穿多线因子的列名列表column_list = ['cross_{}'.format(x) for x in ma_list[:-1]]# 计算是否为候选df['candidate'] = temp_df[column_list].all(axis=1) & (temp_df['ma_30'] >= temp_df['ma_60']) & df['ss']return dfif __name__ == '__main__':stock_codes = get_stock_codes()create_data_mp(stock_codes)

博客内容只用于交流学习,不构成投资建议,盈亏自负!
个人博客:https://coderx.com.cn/(优先更新)
欢迎大家转发、留言。已建微信群用于学习交流,群1已满,群2已创建,感兴趣的读者请扫码加微信!

股票数据写入MySQL——从零到实盘9相关推荐

  1. 利用Tushare将股票数据写入MySql数据库

    一.使用的工具 a.SQLAlchemy. b .MySql. c.python3.7 二.学习资料 SQLAlchemy https://www.osgeo.cn/sqlalchemy/orm/tu ...

  2. python写入mysql数据库_python调用http接口,数据写入mysql数据库并下载录音文件

    写个脚本一共完成了三件事: 第一,python调用http接口, 第二,把调用到的数据写入mysql数据库, 第三,python调用wsdl接口,获取录音文件, import time import ...

  3. php 编写mysql_php编写数据写入mysql问题

    我刚写好的:前台是htm界面填写数据 留言板... 我刚写好的:前台是 htm界面 填写数据 留言板 你的姓名: 你的性别:男 女 你的email: 你的留言内容: 后台是: if(isset($_P ...

  4. Spark将数据写入Mysql

    前言 我在很早之前用spark读取本地文件然后使用如下代码将数据写入到mysql df.write.format("jdbc").mode(SaveMode.Append).opt ...

  5. Python爬取股票数据存入mysql数据库,获取股票(最新、最高、今开、成交量、成交额、量比、换手率、涨幅等)支持多线程+数据库连接池

    项目简介 (Python)爬虫 + MySQL + Redis项目. 爬取下来的数据可用于后续的数据分析(我计划将其用于我的毕业设计). 未来会将数据分析的可视化部署到服务器上, 并添加股票降价通知的 ...

  6. OPC服务器软件Kepware Kepserver实现与Mysql数据库连接交互(三)Kepserver 数据写入mysql数据库

    在上篇教程:OPC服务器软件Kepware Kepserver实现与Mysql数据库连接交互(一)中我们学习了MySQL数据库简介.OPC服务器软件Kepserver软件介绍.MySQL5.5数据库安 ...

  7. kepserver写入mysql_Kepserver连接Mysql教程(三)Kepserver 数据写入mysql数据库

    五.MySQL的ODBC驱动下载并安装 5.1 .下载 5.2. 配置数据源 5.3.点击"系统DSN",并点击"添加"(下图是已经添加好的) 5.4.选择&q ...

  8. kepserver写入mysql_OPC服务器软件Kepware Kepserver实现与Mysql数据库连接交互(三)Kepserver 数据写入mysql数据库...

    在上篇教程:OPC服务器软件Kepware Kepserver实现与Mysql数据库连接交互(一)中我们学习了MySQL数据库简介.OPC服务器软件Kepserver软件介绍.MySQL5.5数据库安 ...

  9. python读取excel文件数据写入MySQL数据库(入门级)

    写入前准备 1.window电脑提前安装好MySQL 2.知道自己MySQL的密码和用户名 3.提前建好要写入的数据库和数据表 1.首先测试写入一条数据进去MySQL在尝试批量写入 1.1第一数据类型 ...

最新文章

  1. pytorch cycleGAN代码学习1
  2. 封装php连接数据库返回方法
  3. JDK 1.5 环境变量的配置
  4. 对称加密-DES解密
  5. (转)OL2中设置鼠标的样式
  6. priority_queue 优先队列
  7. 欠20万信用卡卡奴自救方法
  8. mfc实验报告心得体会_mfc实验报告.doc
  9. webservice 传输数据过大,解析失败
  10. 图解机器学习—算法原理与Python语言实现(文末留言送书)
  11. linux系统发送短信,Linux系统的短信收发怎么实现?
  12. Java岗大厂面试百日冲刺 - 日积月累,每日三题【Day8】 —— Redis2
  13. Apache CXF前端应用(Frontend)
  14. 安卓神秘事件之点击事件不响应
  15. 蚂蚁花呗的交易分期(有别于账单分期)
  16. 周易六十四卦——蹇卦
  17. 8、双目测距及3D重建python
  18. UI设计中色彩搭配使用技巧
  19. html背景透明图片固定,请问在HTML中如何把一张图片的背景设定为透明的?
  20. 华为数通笔记-NSR

热门文章

  1. android 自定义 radiobutton布局,RadioButton的自定义布局
  2. dell服务器做阵列文档,DELL服务器做RAID5磁盘阵列图文教程
  3. 产品经理技能树之 渠道运营
  4. 谈谈数据分析 caoz_让我们谈谈开放数据…
  5. 沁恒(WCH)和RT-Thread共建RISC-V应用生态
  6. oracle trunc命令,Oracle的trunc函数
  7. 每秒8700万次,双11数据库峰值新纪录背后的关键力量
  8. springboot根据模板导出word
  9. 含含乐代理为球场运动员提供吸烟替代解决方案
  10. [ 英语 ] 如何解决那些让人恼火的介词?