mysql thread conn_MySQL源码阅读2-连接与线程管理
本篇是第二篇,MySQL初始化完成之后,便进入一个死循环中,接受客户端请求,并完成客户端的命令(如果在window下启动多个listener,则分别启动线程监听)。该篇介绍MySQL服务中的连接与线程管理。主要将介绍与连接相关的类与流程。
1. 类图与类之间关系
与连接和线程管理相关的代码主要在mysql-server-8.0/sql/conn_handler中,主要包括以下类:Connection_handler_manager:单例类,用来管理连接处理器;
Connection_handler:连接处理的抽象类,具体实现由其子类实现;
Per_thread_connection_handler:继承了Connection_handler,每一个连接用单独的线程处理,默认为该实现,通过thread_handling参数可以设置;
One_thread_connection_handler:继承了Connection_handler,所有连接用同一个线程处理;
Plugin_connection_handler:继承了Connection_handler,支持由Plugin具体实现的handler,例如线程池;
Connection_acceptor:一个模版类,以模版方式支持不同的监听实现(三个listener)。并且提供一个死循环,用以监听连接;
Mysqld_socket_listener:实现以Socket的方式监听客户端的连接事件,并且支持TCP socket和Unix socket,即对应的TCP_socket和Unix_socket;
Shared_mem_listener:通过共享内存的监听方式;
Named_pipe_listener:通过命名管道来监听和接收客户请求;
Channel_info:连接信道的抽象类,具体实现有Channel_info_local_socket和Channel_info_tcpip_socket,
Channel_info_local_socket:与本地方式与服务器进行交互;
Channel_info_tcpip_socket:以TCP/IP方式与服务器进行交互;
TCP_socket:TCP socket,与MySQL服务不同机器的连接访问;
Unix_socket:Unix socket,与MySQL服务相同主机的连接访问;
以上为连接管理中用到的类,类之间关系如下图所示。其中,三个Listener将作为Connection_acceptor模版类作为具体的Listener。简单关系就是通过listener监听请求,并创建连接Channel_Info的具体类,然后通过单例Connection_handler_manager指派给具体的Connection_Handler进行调度。下面将详细介绍整个流程。连接相关的类图
2. 连接处理流程
本章节主要介绍连接处理的整个过程,下图是连接处理的流程图。
下面将详细介绍整个流程的过程。
2.1 监听处理框架
初始化之后,进入Connection_acceptor::connection_event_loop,对于window来说是由三个线程分别调用Socket/NamePipe/SharedMemory的connection_event_loop,以下是(mysql-server-8.0/sql/conn_handler/connection_acceptor.h)connection_event_loop的代码。
void connection_event_loop() {
Connection_handler_manager *mgr =
Connection_handler_manager::get_instance();
while (!connection_events_loop_aborted()) {
Channel_info *channel_info = m_listener->listen_for_connection_event();
if (channel_info != NULL) mgr->process_new_connection(channel_info);
}
}
该函数完成的功能就是通过监听客户请求listen_for_connection_event(),然后处理请求process_new_connection(),其中,connection_events_loop_aborted()为一个flag函数表示是否被终止;具体是哪个listener则是由模版填充。这里主要关注Mysqld_socket_listener。
2.2Mysqld_socket_listener
在介绍监听之前需要对该listener进行初始化,即setup_listener函数。具体监听的socket列表是在初始化的setup_listener中完成的,在m_socket_map中第一个存的是admin socket,后面跟着是普通的socket,具体函数链为:setup_listener -> 填充socket map -> setup_connection_events -> add_socket_to_listener。所以listener会监听多个socket,且admin是列表中的第一个socket。
1)获取请求
listen_for_connection_event监听了fd的连接请求,支持了poll或select方式,这里没有采用epoll的方式,但是这个并不会对MySQL的性能有太大的影响,这是因为MySQL的主要瓶颈不在这。
在监听到有新请求之后,调用Mysqld_socket_listener::get_ready_socket获得已经ready的socket,具体逻辑如下(对于采用poll或select均是如是逻辑):检查是否有admin的socket请求,如果有将admin的socket返回;
否则返回第一个ready的MySQL socket;
2)接收请求
在获得有效的socket之后,进入accept_connection,进入(inline_)mysql_socket_accept,调用accept接收请求。
接到connect socket之后,构造一个Channel_info,为Channel_info_local_socket(为unix_socket时,同一机器访问时)或Channel_info_tcpip_socket(tcp socket时),将该对象作为listen_for_connection_event的返回值。并且所有与连接的相关的信息都保存在Channel_info中。
当然在以上阶段的任一个时候出现错误,那么都将关闭该socket。
2.3 处理新连接(process_new_connection)
处理新连接的代码逻辑如下:首先检查服务是否已经停止,然后check_and_incr_conn_count增加连接计数,如果超过连接上限,那么将拒绝连接,并关闭连接;
如果通过,则将该连接请求加到Connection_handler(Per_thread_connection_handler)中add_connection函数;
void Connection_handler_manager::process_new_connection(
Channel_info *channel_info) {
if (connection_events_loop_aborted() ||
!check_and_incr_conn_count(channel_info->is_admin_connection())) {
channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
delete channel_info;
return;
}
if (m_connection_handler->add_connection(channel_info)) {
inc_aborted_connects();
delete channel_info;
}
}
下面是两个函数的具体逻辑。
1)判断连接上限
check_and_incr_conn_count中做一个简单的判断,即:当前连接数如果大于设置的最大连接数,并且当前请求不是管理员连接,那么将拒绝;
如果没有超过上限,或者是管理员连接,那么将运行连接。也就是说,当连接满了之后,管理员还是可以登陆的。
个人感觉这里存在一个问题,就是假如一开始将最大连接数设置得比较大,如10000,并且并发请求量也到达了该数量,但是后续请求量下降了,那么还是会存在10000个线程,并不会自动缩小线程个数。
2) add_connection
add_connection 是将连接放置线程中,如果有空闲的线程,那么将直接利用空闲的线程;否则将创建一个新的线程来处理新连接。
bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
int error = 0;
my_thread_handle id;
if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;
/*There are no idle threads avaliable to take up the newconnection. Create a new thread to handle the connection*/
channel_info->set_prior_thr_create_utime();
error =
mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
handle_connection, (void *)channel_info);
if (error) {
// 错误处理... }
Global_THD_manager::get_instance()->inc_thread_created();
DBUG_PRINT("info", ("Thread created"));
return false;
}
check_idle_thread_and_enqueue_connection 做的事情就是检查当前是否有空闲状态的线程;有的话,则将channel_info加入到队列中,并向空闲线程发送信号;否则创建新线程,并执行handle_connection函数。
hangle_connection函数逻辑如下:首先是初始化线程需要的内存;
然后创建一个THD对象init_new_thd;
创建/或重用psi对象,并加到thd对象;
将thd对象加到thd manager中;
调用thd_prepare_connection做验证,主要有以下:调用thd_prepare_connection->login_connection->check_connection,验证登陆用户,通过获得客户端ip,做acl检查客户主机(acl_check_host),通过vio_keepalive设置保活;
acl_authenticate校验客户端,并且更新thd的上下文,包括了字符集和用户名(parse_com_change_user_packet);
do_auth_once:用指定的plugin名去检验用户的权限;并且后面会更新thd的用户和账号;检查用户的acl,并更新密码锁(check_and_update_password_lock_state);
检查该用户是否能够作为代理用户与其他相关授权检查(acl_find_proxy_user);ssl检查;密码是否超时等;检查单个用户是否超过最大连接;如果以上检查全部通过,那么将执行prepare_new_connection_state准备处理用户请求;检查是否有压缩配置,有的话准备压缩相关上下文;
申请内存,并将动态系统变量复制到线程独享,如字符集/事务设置/最大包等变量;
执行初始化的命令,并处理错误下面便是do_command(thd);执行单条query或者command,后续会详细介绍该函数的调用栈;
end_connection,关闭一个已经建立的连接,release_user_connection释放用户连接,并且释放线程回线程池;close_connection,disconnect关闭当前会话的vio;退出当前用户,db的限制,acl等上下文释放资源thd中的资源,处理其他信息。
最后会等待下一个连接,Per_thread_connection_handler::block_until_new_connection,监听接收前面提到的check_idle_thread_and_enqueue_connection发过来的信号;
extern "C" {
static void *handle_connection(void *arg) {
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
Connection_handler_manager *handler_manager =
Connection_handler_manager::get_instance();
Channel_info *channel_info = static_cast(arg);
bool pthread_reused MY_ATTRIBUTE((unused)) = false;
if (my_thread_init()) { //初始化需要的内存 // 错误处理 my_thread_exit(0);
return NULL;
}
for (;;) {
THD *thd = init_new_thd(channel_info);
if (thd == NULL) {
// 错误处理... break; // We are out of resources, no sense in continuing. }
#ifdef HAVE_PSI_THREAD_INTERFACE if (pthread_reused) {
/*Reusing existing pthread:Create new instrumentation for the new THD job,and attach it to this running pthread.*/
PSI_thread *psi = PSI_THREAD_CALL(new_thread)(key_thread_one_connection,
thd, thd->thread_id());
PSI_THREAD_CALL(set_thread_os_id)(psi);
PSI_THREAD_CALL(set_thread)(psi);
}
/* Find the instrumented thread */
PSI_thread *psi = PSI_THREAD_CALL(get_thread)();
/* Save it within THD, so it can be inspected */
thd->set_psi(psi);
#endif/* HAVE_PSI_THREAD_INTERFACE */ mysql_thread_set_psi_id(thd->thread_id());
mysql_thread_set_psi_THD(thd);
mysql_socket_set_thread_owner(
thd->get_protocol_classic()->get_vio()->mysql_socket);
thd_manager->add_thd(thd);
if (thd_prepare_connection(thd))
handler_manager->inc_aborted_connects();
else {
while (thd_connection_alive(thd)) {
if (do_command(thd)) break;
}
end_connection(thd);
}
close_connection(thd, 0, false, false);
thd->get_stmt_da()->reset_diagnostics_area();
thd->release_resources();
// Clean up errors now, before possibly waiting for a new connection.#if OPENSSL_VERSION_NUMBER < 0x10100000L ERR_remove_thread_state(0);
#endif/* OPENSSL_VERSION_NUMBER < 0x10100000L */ thd_manager->remove_thd(thd);
Connection_handler_manager::dec_connection_count();
#ifdef HAVE_PSI_THREAD_INTERFACE /*Delete the instrumentation for the job that just completed.*/
thd->set_psi(NULL);
PSI_THREAD_CALL(delete_current_thread)();
#endif/* HAVE_PSI_THREAD_INTERFACE */
delete thd;
// Server is shutting down so end the pthread. if (connection_events_loop_aborted()) break;
channel_info = Per_thread_connection_handler::block_until_new_connection();
if (channel_info == NULL) break;
pthread_reused = true;
if (connection_events_loop_aborted()) {
// 错误处理 }
}
my_thread_end();
my_thread_exit(0);
return NULL;
}
} // extern "C"
以上便是整个线程和连接的管理流程。
3. 总结
本文主要介绍了MySQL服务中的线程维护和连接初始化的处理流程,具体每一个指令怎么处理的,是在do_command中执行的,后续将会阅读到。
如果上述中有误,往指正~
上篇:Wenguang Liu:MySQL源码阅读1-启动初始化zhuanlan.zhihu.com
下篇:Wenguang Liu:MySQL源码阅读3-THD对象zhuanlan.zhihu.com
mysql thread conn_MySQL源码阅读2-连接与线程管理相关推荐
- Druid源码阅读3-DruidDataSource连接池的基本原理
DruidDataSource数据库连接池的的本质,实际上是一个利用ReentrentLock和两个Condition组成的生产者和消费者模型. 1.DruidDataSource中的锁 在Druid ...
- tidb mysql 协议_TiDB源码阅读(二) TiDB中的MySQL协议
当 SQL 通过上一篇所描述的路径来到这里, dispatch 这个函数如它的名字一样,对不同类型的 SQL 语句 case 到不同的函数中,那咱们来看看这个函数吧,在这里: server/conn. ...
- mysql thd_MySQL源码阅读3-THD对象
这里用单独的一篇介绍THD对象,目前看来THD对象是线程处理用户语句请求的核心类.然而该类实在是太大了,单单类的定义就差不多有了3500行.因此,这里不能够完全理解透,所以介绍得也不会全.望谅解- 1 ...
- Alibaba Druid 源码阅读(五)数据库连接池 连接关闭探索
Alibaba Druid 源码阅读(五)数据库连接池 连接关闭探索 简介 在上文中探索了数据库连接池的获取,下面接着初步来探索下数据库连接的关闭,看看其中具体执行了那些操作 连接关闭 下面的具体的代 ...
- Alibaba Druid 源码阅读(四) 数据库连接池中连接获取探索
Alibaba Druid 源码阅读(四) 数据库连接池中连接获取探索 简介 上文中分析了数据库连接池的初始化部分,接下来我们来看看获取连接部分的代码 数据库连接池中连接获取 下面的相关的代码,在代码 ...
- mysql 1260,MYSQL 源码阅读 六
前期节要 MYSQL源码阅读 一 MYSQL源码阅读 二 MYSQL源码阅读 三 MYSQL 源码阅读 四 MYSQL 源码阅读 五 上次有两个问题没搞明白 1 是 为什么一定要开启调试线程 ? 因为 ...
- Alibaba Druid 源码阅读(三) 数据库连接池初始化探索
Alibaba Druid 源码阅读(三) 数据库连接池初始化探索 简介 上文中探索了Alibaba Druid的连接池初始化和获取连接的关键代码,接下来详细看看初始化部分 数据库连接池初始化 对整个 ...
- gh-ost大表DDL工具源码阅读
gh-ost大表DDL工具源码阅读 最终目的 开发环境与测试数据库准备 一个简单的ddl案例 debug分析程序执行过程 vscode debug配置 变量介绍 核心处理逻辑 分析我的需求 最终目的 ...
- MyBatis 源码阅读 -- 核心操作篇
核心操作包是 MyBatis 进行数据库查询和对象关系映射等工作的包.该包中的类能完成参数解析.数据库查询.结果映射等主要功能.在主要功能的执行过程中还会涉及缓存.懒加载.鉴别器处理.主键自增.插件支 ...
最新文章
- 蚂蚁森林合种计划(2020.10.31,7天有效,每周更新)
- 三层神经网络前向后向传播示意图
- asp.net UrlRewrite 技术的实现
- HOG特征向量的代码
- python 好用的库存尾货女装_女装店主:做尾货有人能赚大钱,新手千万别碰,文茵告诉你原因...
- 函数的参数-在函数内部使用方法修改可变参数会影响外部实参
- 七、Web服务器——Junit单元测试 反射 注解学习笔记
- Java 3D编程实践_Java 3D编程实践——网络上的三维动画[学习笔记]
- 36数字在排序数组中出现的次数
- Office 2007中的config.xml个性定制说明
- set nocount on的疑问 set nocount on作用 set nocount on什么意思
- 【Java 8 新特性】Java LocalDate 详解
- 【android】安卓高仿京东分类页
- ucla计算机科学博士排名,加州大学洛杉矶分校专业排名一览及最强专业推荐(QS世界大学排名)...
- 频繁默认网关不可用_win7系统默认网关不可用频繁掉线的解决方法
- 两款程序员的好帮手——BitNami,Hoo WinTail
- xlive.dll缺失怎么办
- matlab 显示程序进度条,matlab学习---------------进度条waitbar
- 微信公众号怎么生成带粉丝关注统计的渠道二维码
- 电脑连接移动设备android驱动程序,安卓手机连接电脑操作最简单的方法介绍