我选用的数据库是Mysql。选用它是因为其可以满足我的需求,而且资料多。因为作为第三方工具,难免有一些配置问题。所以本文也会讲解一些和Mysql配置及开发相关的问题。(转载请指明出于breaksoftware的csdn博客)

数据库管理器

Mysql的安装我就不说了。我先说说和我习惯相关的一个问题:我希望在我Windows系统上可以通过Navicat for Mysql连接到我Ubuntu上的Mysql服务器。这块问题的解决可以参见《允许ubuntu下mysql远程连接》。

然后需要准备Python下进行Mysql开发的一些环境

apt-get install python-dev
apt-get install libmysqld-dev
apt-get install libmysqlclient-dev
updatedb
locate mysql_config
pip install MySQL-python -i http://pypi.douban.com/simple

由于我们要进行分表,所以数据库连接数要进行增大。于是需要修改mysql的配置

max_connections=1000

基础环境配置好后,我们就可以开始进行数据库管理器的设计和实现了。

数据库连接类

数据库连接我们使用PooledDB连接池,使用这个库的最大好处是我们可以不用考虑很多底层的重连和多线程问题。

from DBUtils.PooledDB import PooledDB
class mysql_conn():def __init__(self, host_name, port_num, user_name, password, db_name, charset_name = "utf8"):self._host = host_nameself._port = int(port_num)self._user = user_nameself._passwd = passwordself._db = db_nameself._charset = charset_nameself._pool = Noneself._table_info = {}self.re_connect()

re_connect方法要考虑数据库不存在的情况。

    def re_connect(self):self._try_close_connect()try:self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)LOG_INFO("connect %s success" %(self._db))self.refresh_tables_info()returnexcept MySQLdb.Error, e :if e.args[0] == 1049:self._create_db()else:LOG_WARNING("%s connect error %s" % (self._db, str(e)))returnexcept Exception as e:LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))return 

如果数据库不存在,MySQLdb.Error对象的值是1049,这种场景我们就需要创建数据库。如果发生其他错误,就直接报错

    def _create_db(self):conn = Nonecursor = Nonetry:conn = MySQLdb.connect(host=self._host, port=self._port, user=self._user, passwd=self._passwd)cursor = conn.cursor()sql = """create database if not exists %s""" %(self._db)#LOG_INFO(sql)cursor.execute(sql)conn.select_db(self._db);conn.commit()except MySQLdb.Error, e :LOG_WARNING("%s execute error %s" % (sql, str(e)))finally:try:if cursor:cursor.close()if conn:conn.close()except:pass

创建完数据后,要关闭连接。然后再走一遍数据库连接过程,但是这次就用不判断数据库是否存在了

        try:self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)LOG_INFO("connect %s success" %(self._db))self.refresh_tables_info()returnexcept Exception as e:LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))returnif None == self._pool:LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))return

连接完数据库后,我们需要通过refresh_tables_info获取该库中表的信息。为什么我们需要获取这个信息呢?因为我希望在调用数据库操作时,mysql_conn类已经知晓一些字段的类型和长度,这样就可以将用户传入的数据进行相应的格式化,而从让调用者不用太多关心表字段类型。

    def refresh_tables_info(self):self._table_info = self._get_tables_info()def _get_tables_info(self):tables_info = {}tables_sql = "show tables"tables_name = self.execute(tables_sql, select = True)for table_name_item in tables_name:table_name = table_name_item[0]if 0 == len(table_name):continuecolumns_sql = "show columns from " + table_name table_info = self.execute(columns_sql, select = True)table_name = table_name_item[0]columns_info = self._get_table_info(table_info)if len(columns_info):tables_info[table_name] = columns_inforeturn tables_infodef _get_table_info(self, table_info):columns_info = {}for item in table_info:column_name = item[0]column_type_info = item[1](type, len) = self._get_column_type_info(column_type_info)columns_info[column_name] = {"type":type, "length":len}return columns_infodef _get_column_type_info(self, type_info):re_str = '(\w*)\((\d*),?.*\)'kw = re.findall(re_str, type_info)if len(kw):if len(kw[0]) > 1:return (kw[0][0], kw[0][1])return (None, None)

连接完数据库后,我们需要对表进行一系列操作,比如表查询

    def select(self, table_name, fields_array, conditions, pre = "", extend = ""):fields_str = "," . join(fields_array)conds = []for (column_name, column_data_info) in conditions.items():column_type = self._get_column_type(table_name, column_name)column_data = column_data_info[0]operation = column_data_info[1]if isinstance(column_data, list):new_datas = []for item in column_data:new_data = self._conv_data(item, column_type)try:new_datas.append(new_data)except:LOG_WARNING("%s %s conv error" %(item, column_type))temp_str = "," . join(new_datas)cond = column_name + " " + operation  + " (" + temp_str + ")"conds.append(cond)else:new_data = self._conv_data(column_data, column_type)try:cond = column_name + " " + operation + " " + new_dataconds.append(cond)except:LOG_WARNING("%s %s conv error" %(column_data, column_type))conds_str = " and " . join(conds)sql = "select " + pre + " " + fields_str + " from " + table_nameif len(conds_str) > 0:sql = sql + " where " + conds_strif len(extend) > 0:sql = sql + " " + extenddata_info = self.execute(sql, select = True)return data_info

select方法中table_name是表名;fields_array是需要查询的字段数组;conditions是查询条件的Key/Value对,其中Key是字段名称,Value是个数组,数组的第一个元素是表达式右值,第二个元素是表达式的操作符。比如条件a>1 and b < 2,则conditions是{"a":["1",">"],"b":["2","<"] }。这儿需要考虑表达式右值是一个数组的场景,比如 a in (1,2,3)这样的条件,所以组装条件时做了特殊处理。

在处理表中数据的时候,比如查询语句的条件中有表中字段信息,再比如更新、插入数据语句中也有相关信息,这个时候都需要调用_get_column_type方法获取字段类型,然后通过_conv_data方法将数据进行格式化——当然目前这个函数还不能涵盖所有类型。

    def _get_column_type(self, table_name, column_name):if 0 == len(self._table_info):self.refresh_tables_info()if table_name not in self._table_info.keys():LOG_WARNING("table_%s info in not exist" %(table_name))return "None"if column_name not in self._table_info[table_name].keys():LOG_WARNING("column name %s is not in table %s" % (column_name, table_name))return "None"return self._table_info[table_name][column_name]["type"]def _conv_data(self, data, type):if type == "varchar" or type == "char":return '"%s"' % (data)elif type == "float" or type == "double":try:conv_data = float(data)return "%.8f"  % (conv_data)except Exception as e:LOG_WARNING("conv %s to %s error" % (data, type))return "0"elif type == "tinyint" or type == "bigint" or type == "int":return "%d" % (int(data))

数据的更新操作和插入操作我就不把代码贴出来了。大家可以到之后公布的源码地址里看。

最后说明下操作执行的方法

    def execute(self, sql, select = False, commit=False):try:data = ()conn = self._pool.connection()cursor = conn.cursor()data = cursor.execute(sql)if select:data = cursor.fetchall()if commit:conn.commit()cursor.close()except Exception as e:LOG_WARNING("excute sql error %s" % (str(e)))LOG_ERROR_SQL("%s" % (sql))finally:cursor.close()conn.close()return data

一些操作我们需要数据库服务马上去执行,如创建数据库和创建表操作,因为我们在创建后立即会去使用或者查询相关信息。如果不及时执行,将影响之后的结果。这个场景下我们需要把commit参数设置为True。当然这种方式不要滥用,否则会影响数据库执行效率。

还有一些操作我们需要关心返回结果,比如select指令。这个时候就需要通过fetchall获取全部数据并返回。而创建表等操作则不需要fetchall结果。

连接管理类

因为我们数据库是分库的,而上述每个连接只管理一个数据库的操作,所以我们需要一个连接管理器去管理这些连接。

连接管理类是个单例,它通过modify_conns方法连接每个数据库

@singleton
class mysql_manager():def __init__(self):self._conns = {}def modify_conns(self, conns_info):for (conn_name, conn_info) in conns_info.items():conn_info_hash = frame_tools.hash(json.dumps(conn_info))if conn_name in self._conns.keys():if conn_info_hash in self._conns[conn_name].conns_dict.keys():continueelse:self._conns[conn_name] = mysql_conn_info()for key in conf_keys.mysql_conn_keys:if key not in conn_info.keys():continueconn_obj = mysql_conn(conn_info["host"], conn_info["port"], conn_info["user"], conn_info["passwd"], conn_info["db"], conn_info["charset"])self._conns[conn_name].conns_dict[conn_info_hash] = conn_objself._conns[conn_name].valid = 1self._print_conns()

如果数据库连接信息发生改变,则需要将发生改变的数据库连接置为无效,然后再重新连接并记录

    def add_conns(self, conns_info):self.modify_conns(conns_info)def remove_conns(self, conns_info):for (conn_name, conn_info) in conns_info.items():conn_info_hash = frame_tools.hash(json.dumps(conn_info))if conn_name in  self._conns.keys():if conn_info_hash in self._conns[conn_name].conns_dict.keys():self._conns[conn_name].valid = 0self._print_conns()

连接管理类通过get_mysql_conn方法暴露连接对象

    def get_mysql_conn(self, conn_name):if conn_name not in self._conns.keys():return Noneconn_info = self._conns[conn_name]valid = self._conns[conn_name].validif 0 == valid:return Noneconns_dict_keys = self._conns[conn_name].conns_dict.keys()if len(conns_dict_keys) == 0:return Nonekey = conns_dict_keys[-1]ret_conn = self._conns[conn_name].conns_dict[key]return ret_conn

它还暴露了一个刷新所有数据库中表信息的方法,用于在系统任务中定期更新内存中信息,保证数据稳定写入。

    def refresh_all_conns_tables_info(self):for (conn_name, conn_info) in self._conns.items():conn = self.get_mysql_conn(conn_name)if None != conn:conn.refresh_tables_info()

连接管理配置

我共设计了三种数据库。一种是保存股票基础数据的数据库,其配置是

[stock_db]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=stock
charset=utf8

一个是保存每日实时数据的数据库

[daily_temp]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=daily_temp
charset=utf8

最后一种是按股票代码分类的库,这种库有300个,设计原因我在《码农技术炒股之路——架构和设计》有说明

[stock_part]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=stock_part
charset=utf8
range_max=300

注意range_max这个参数,如果配置中有该参数,则代表其是一个数据库组

class mysql_conf_parser:def parse(self, job_conf_path):cp = ConfigParser.SafeConfigParser()cp.read(job_conf_path)sections = cp.sections()conns_info = {}for section in sections:conn_info = {}for key in conf_keys.mysql_conn_keys:if False == cp.has_option(section, key):LOG_WARNING()continueconn_info[key] = cp.get(section, key)if cp.has_option(section, "range_max"):range_max = int(cp.get(section, "range_max"))db_name_base = conn_info["db"] for index in range(0, range_max):conn_info["db"] = db_name_base + "_" + str(index)section_index_name = section + "_" + str(index)conns_info[section_index_name] = copy.deepcopy(conn_info)else:conns_info[section] = conn_inforeturn conns_info

最终我们将建成下图所示数据库

正则表达式管理器

当我们从数据源获取数据后,需要使用一系列正则将原始数据转换成一组数据。然后才可以将这些数据写入数据库。举个例子,我们先看下正则管理器的配置

[string_comma_regular]
regular_expression_0 = data:\[(.*)\]
regular_expression_1 = "[^"]+"
regular_expression_2 = [^,"]+[hq_sinajs_cn_list]
regular_expression_0 = var hq_str_([^;]*);
regular_expression_1 = ([^,="shz]+)[quotes_money_163]
regular_expression_0 = ([^\r\n]+)
regular_expression_1 = ([^,'\r\n]+)

每一节都是一个正则名称,其下都是以“regular_expression_”开始的关键字。正则执行的顺序从序号小的向序号大的方向执行。我们通过下面的配置解释器读取配置

import ConfigParserclass regulars_manager_conf_parser:def parse(self, regulars_conf_path):cp = ConfigParser.SafeConfigParser()cp.read(regulars_conf_path)sections = cp.sections()regulars_info = {}for section in sections:regular_info = []regular_name_pre = "regular_expression_"for index in range(0, len(cp.options(section))):regular_name = regular_name_pre + str(index)if cp.has_option(section, regular_name):regular_info.append(cp.get(section, regular_name))else:breakregulars_info[section] = regular_inforeturn regulars_info

正则表达式管理通过下面方法维护信息

@singleton
class regular_split_manager():def __init__(self):self._regulars = {}def modify_regulars(self, regulars_info):for (regular_name, regular_info) in regulars_info.items():self._regulars[regular_name] = regulars_infodef add_regulars(self, regulars_info):for (regular_name, regular_info) in regulars_info.items():self._regulars[regular_name] = regular_infodef remove_regulars(self, regulars_info):for (regular_name, regular_info) in regulars_info.items():if regular_name in self._regulars.keys():del self._regulars[regular_name]

通过get_split_data方法可以将数据通过指定的正则名称进行分解,且分解到最后一步

    def get_split_data(self, data, regular_name):data_array = []self._recursion_regular(data, regular_name, 0, data_array)   return data_arraydef _get_regular(self, regular_name, deep):if regular_name not in self._regulars.keys():LOG_WARNING("regular manager has no %s" % (regular_name))return ""if deep >= len(self._regulars[regular_name]):return ""return self._regulars[regular_name][deep]def _recursion_regular(self, data, regular_name, deep, data_array):regular_str = self._get_regular(regular_name, deep)split_data = re.findall(regular_str, data)regualer_next_str = self._get_regular(regular_name, deep + 1)split_array = []if len(regualer_next_str) > 0:for item in split_data:self._recursion_regular(item, regular_name, deep + 1, data_array)else:for item in split_data:split_array.append(item)if len(split_array) > 0:data_array.append(split_array)

有了上述各种管理器,我们已经把主要的准备工作做好。下一篇我将介绍最核心的任务调取管理器,它才是上述管理器最终的使用方。

码农技术炒股之路——数据库管理器、正则表达式管理器相关推荐

  1. 码农技术炒股之路——抓取股票基本信息、实时交易信息、主力动向信息

    从本节开始,我们开始介绍各个抓取和备份业务.(转载请指明出于breaksoftware的csdn博客) 因为我们数据库很多,数据库中表也很多,所以我们需要一个自动检测并创建数据库和表的功能.在< ...

  2. 码农技术炒股之路——配置管理器、日志管理器

    配置管理器和日志管理器是项目中最为独立的模块.我们可以很方便将其剥离出来供其他Python工程使用.文件的重点将是介绍Python单例和logging模块的使用.(转载请指明出于breaksoftwa ...

  3. 码农技术炒股之路——任务管理器

    系统任务和普通任务都是通过任务管理器调度的.它们的区别是:系统任务在程序运行后即不会被修改,而普通任务则会被修改.(转载请指明出于breaksoftware的csdn博客) 为什么要有这样的设计?因为 ...

  4. 码农技术炒股之路——抓取日线数据、计算均线和除权数据

    日线数据是股票每日收盘后的信息.这块数据不用实时抓取,所以并不占用宝贵的交易时间的资源.于是我们抓取完数据后直接往切片后的数据库中保存.(转载请指明出于breaksoftware的csdn博客) 抓取 ...

  5. 码农技术炒股之路——实时交易信息、主力动向信息分库备份

    一般来说,一个股票信息应该保存在一张表中.但是由于我机器资源限制,且我希望尽快频率的抓取数据.所以每天我将所有股票的实时交易信息放在daily_temp库中的一个以日期命名的表中.主力动向信息也是如此 ...

  6. 一个30岁男人转型码农的平凡之路

    今天给大家带来的是一个转行的故事,一个30岁才开始学习编程的小白,资质平平,真正的零基础. 他的故事和那些大众喜欢的.夸张的.甚至虚假的华丽转身不同,一点也不精彩.一点也不鸡汤,平淡如水,但是能反映出 ...

  7. php-fpm哪里下载_如何在centos系统下找到php-fpm的位置 - 翟码农技术博客

    reboot重启了服务器后,所有的服务都需要重新启动. 启动php-fpm时,使用如下命令systemctl start php-fpm.service 提示:Failed to restart ph ...

  8. mt管理器错误信息java_MT管理器_MT管理器手机版_MT管理器清爽版_易玩网

    MT管理器清爽版App是一款非常强大的文件管理软件,在这里你能够轻松的管理自己的文件,这款软件有着非常使用的双窗口,能够增加文件的浏览量,将其置顶,能更快的找到自己想要的文件,有需要的用户赶紧来网下载 ...

  9. mt管理器java_MT管理器app

    <MT管理器app>这是一款帮助大家手机管理文件的软件,在这里你可以看到JAVA平台的双窗口操作模式,而且提供强大的管理功能,可以让你在手机上轻松掌握文件和进行管理,使用起来也是很方便,并 ...

最新文章

  1. 过分了,又双叒叕吃狗粮:因为爱情,才有思科
  2. python中itertools模块介绍---03
  3. f5 2017.09.03故障
  4. 51. N 皇后(回溯算法)
  5. 在Linux中挂载Windows端共享权限设定方法和出现报错的解决办法
  6. 【机器学习】Lasso回归(L1正则,MAP+拉普拉斯先验)
  7. 代码执行漏洞-无字母数字RCE-create_function()
  8. 电脑知识:电脑无法开机解决方案,赶紧收藏吧!
  9. 【WPF】设置DataGrid表头内容居中显示
  10. kotlin之泛型的使用
  11. 数 AI 人物还看今朝!CCAI 2017 人工智能青年论坛即将启航
  12. git分布式版本管理系统和github平台
  13. 手机 html5 hammer drag widget,javascript – HTML5使用Hammer.js拖放事件拖放div上的元素
  14. JavaWeb -- Jsp 自定义标签的使用
  15. linux学习书籍汇总 值得推荐的linux学习书籍
  16. CUDA版本与驱动对应情况
  17. Java生态技术体系科普
  18. 马王堆汉墓帛书‧老子——甲本释文(德经)
  19. 实现“附近的人”的方式原理
  20. 【模拟电路】常用的DC-DC电源电路图

热门文章

  1. io获取 pcl_点云数据可视化之PCL滤波学习
  2. (曲率系列2:)Paper6:Curvature Estimation of 3D Point Cloud Surfaces Through the Fitting of Normal
  3. 4. 编程规范和编程安全指南--go语言
  4. 中职计算机php学啥,计算机专业都学什么主要课程有什么_中职中专网
  5. r语言 断轴 画图_R语言基础画图/绘图/作图
  6. echarts横坐标文字太长显示不完的两种解决办法:rotate旋转文字、调用函数让文字纵向排列
  7. 学习用C#在Unity中创建一个2D Metroidvania游戏
  8. Carrier frequency 和 EARFCN的关系
  9. C语言中整型在计算机中的存储
  10. Windows平台下程序打包流程