2 从create_new_thread看连接线程
2013-12-20 10:02
330 查看
一、数据包格式
在分析create_new_thread()之前,先看一下数据包格式,所有的Mysql Net包可以归纳为:1、握手阶段(当客户端开始连接时):
从服务器到客户端:握手初始化包;从客户端到服务器:客户端认证包;从服务器到客户端:OK包、Error包。
2、命令包(客户端到服务器端的任一要求):
从客户端到服务器端:命令包;从服务器到客户端:OK包、Error包、结果集包。
备注1:每种包都有一定的格式,每种包还另有一个Net包头(说明包的长度和序列号);一个Net包可以分割成多个TCP包,或者多个Net包被打包成一个TCP包(根据长度)。
备注2:一个结果集包由一系列的包组成,其中有结果集包包头(列数)、查询返回的列包(属性包)、查询返回的行包、EOF包。
二、create_new_thread
create_new_thread()曾增加thread_id后,会调用(thread_scheduler)->add_connection (thd),即create_thread_to_handle_connection(THD *thd)。三、create_thread_to_handle_connection
void create_thread_to_handle_connection(THD *thd) { if (cached_thread_count > wake_thread) { /* Get thread from cache */ thread_cache.push_back(thd); //static I_List<THD> thread_cache; 此处将thd加到该链表中,可以设想,之后被唤醒的连接线程应该会去获得这个thd作为它的thd wake_thread++; mysql_cond_signal(&COND_thread_cache); } else { //初次调用时,线程池中还没有线程,需要创建一个线程。 char error_message_buff[MYSQL_ERRMSG_SIZE]; /* Create new thread to handle connection */ int error; thread_created++; threads.append(thd); //threads的类型为I_List<THD>, DBUG_PRINT("info",(("creating thread %lu"), thd->thread_id)); thd->prior_thr_create_utime= thd->start_utime= my_micro_time(); if ((error= mysql_thread_create(key_thread_one_connection, &thd->real_id, &connection_attrib, handle_one_connection, //线程处理函数,会进一步调用do_handle_one_connection(thd) (void*) thd))) { /* purecov: begin inspected */ DBUG_PRINT("error", ("Can't create thread to handle request (error %d)", error)); thread_count--; thd->killed= THD::KILL_CONNECTION; // Safety mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_connection_count); --connection_count; mysql_mutex_unlock(&LOCK_connection_count); statistic_increment(aborted_connects,&LOCK_status); /* Can't use my_error() since store_globals has not been called. */ my_snprintf(error_message_buff, sizeof(error_message_buff), ER_THD(thd, ER_CANT_CREATE_THREAD), error); net_send_error(thd, ER_CANT_CREATE_THREAD, error_message_buff, NULL); close_connection(thd); mysql_mutex_lock(&LOCK_thread_count); delete thd; mysql_mutex_unlock(&LOCK_thread_count); return; /* purecov: end */ } } mysql_mutex_unlock(&LOCK_thread_count); DBUG_PRINT("info",("Thread created")); }
四、do_handle_one_connection
handle_one_connection()-------->do_handle_one_connection(thd)1、先介绍一个结构
/* Functions used when manipulating threads */ struct scheduler_functions { uint max_threads; bool (*init)(void); bool (*init_new_connection_thread)(void); //调用init_new_connection_handler_thread,进一步调用my_thread_init() void (*add_connection)(THD *thd); //本例调用create_thread_to_handle_connection void (*thd_wait_begin)(THD *thd, int wait_type); void (*thd_wait_end)(THD *thd); void (*post_kill_notification)(THD *thd); bool (*end_thread)(THD *thd, bool cache_thread); void (*end)(void); };
2、do_handle_one_connection
void do_handle_one_connection(THD *thd_arg) { THD *thd= thd_arg; thd->thr_create_utime= my_micro_time(); if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0)) //根据前面,最终调用my_thread_init(),主要作用是初始化一个struct st_my_thread_var,并将其指针赋给THR_KEY_mysys(这是个共享的pthread_key_t),同时增加thread_id. { close_connection(thd, ER_OUT_OF_RESOURCES); statistic_increment(aborted_connects,&LOCK_status); MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); return; } /* If a thread was created to handle this connection: increment slow_launch_threads counter if it took more than slow_launch_time seconds to create the thread. */ if (thd->prior_thr_create_utime) { ulong launch_time= (ulong) (thd->thr_create_utime - thd->prior_thr_create_utime); if (launch_time >= slow_launch_time*1000000L) statistic_increment(slow_launch_threads, &LOCK_status); thd->prior_thr_create_utime= 0; } /* handle_one_connection() is normally the only way a thread would start and would always be on the very high end of the stack , therefore, the thread stack always starts at the address of the first local variable of handle_one_connection, which is thd. We need to know the start of the stack so that we could check for stack overruns. */ thd->thread_stack= (char*) &thd; if (setup_connection_thread_globals(thd)) //调用thd->store_globals(),实际上它获得THR_KEY_mysys,其中重新设置thread_id(由mysqld自行定义),这样我们可以将THD移到其他线程。? return; for (;;) { bool rc; rc= thd_prepare_connection(thd); //其中调用lex_start()(词法分析相关,先不管),login_connection()(其中会调用check_connection(),查看用户权限,之后想客户端发送"Welcome"),之后调用MYSQL_AUDIT_NOTIFY_CONNECTION_CONNECT(thd),最后调用prepare_new_connection_state(thd)(Initialize THD to handle queries,其中会调用thd->init_for_queries()) if (rc) goto end_thread; while (thd_is_connection_alive(thd)) { mysql_audit_release(thd); //Release any resources associated with the current thd. if (do_command(thd)) //Read one command from connection and execute it (query or simple command). break; } end_connection(thd); //Close an established connection end_thread: close_connection(thd); //Close a connection. 调用thd->disconnect(),进一步调用close_active_vio() if (MYSQL_CALLBACK_ELSE(thread_scheduler, end_thread, (thd, 1), 0)) //本例调用one_thread_per_connection_end,这里是关键点,线程可能加入到缓存中,并在此阻塞直到主线程唤醒。见后文第五部分。 return; // Probably no-threads /* If end_thread() returns, we are either running with thread-handler=no-threads or this thread has been schedule to handle the next connection. */ thd= current_thd; //这里thd将成为新的thd,这里的current_thd是宏,会调用my_pthread_getspecific_ptr(THD*,THR_THD),THR_THD为线程专有变量,由刚刚前面one_thread_per_connection_end()----->cache_thread()----->thd->store_globals()设置 (my_pthread_setspecific_ptr(THR_THD, this) || my_pthread_setspecific_ptr(THR_MALLOC, &mem_root)) thd->thread_stack= (char*) &thd; //至此这个线程好像什么也没发生一样,又可以继续处理新的连接发送过来的命令了 } }
四、do_command
Read one command from connection and execute it (query or simple command).
该函数主要是先使用my_net_read(net)读取一个packet,再用dispatch_command()加以解析。
实际上,mysql客户端在握手阶段(握手初始化包,认证包,OK包)之后,客户端会马上向服务器发送一个命令包(COM_QUERY命令,具体内容为"\003select @@version_comment limit 1"),服务器会将服务器版本发给客户端,客户端出现"mysql>"这样的命令提示符。之后该线程就会出现阻塞读,等待客户端的新的命令的到来,直到超时。
下面我们在客户端敲入exit命令,因为我想看看end_thread()做了些什么。毕竟do_command()解析命令部分十分复杂,只能在后篇再续。
服务器读取到命令,my_net_read()返回,实际上只收到了一个字节(不过一个字节足矣),就是命令号1(COM_QUIT命令)。
bool dispatch_command(enum enum_server_command command, THD *thd, char* packet, uint packet_length) { ... case COM_QUIT: /* We don't calculate statistics for this command */ general_log_print(thd, command, NullS); net->error=0; // Don't give 'abort' message thd->stmt_da->disable_status(); // Don't send anything back error=TRUE; // End server break; ... }dispatch_command()之后还做了一些清理工作,返回true,(注意设置thd->command=COM_SLEEP)。回答do_handle_one_connection中,break出循环,调用end_connection(),close_connection(),end_thread等。
五、end_thread——one_thread_per_connection_end
bool one_thread_per_connection_end(THD *thd, bool put_in_cache) { DBUG_ENTER("one_thread_per_connection_end"); unlink_thd(thd); //主要--connection_count; thread_count--; delete thd; if (put_in_cache) put_in_cache= cache_thread(); mysql_mutex_unlock(&LOCK_thread_count); if (put_in_cache) DBUG_RETURN(0); // Thread is reused 线程重用,所以从此处返回 /* It's safe to broadcast outside a lock (COND... is not deleted here) */ DBUG_PRINT("signal", ("Broadcasting COND_thread_count")); DBUG_LE***E; // Must match DBUG_ENTER() my_thread_end(); mysql_cond_broadcast(&COND_thread_count); pthread_exit(0); //这里线程退出 return 0; // Avoid compiler warnings }
这里看一下cache_thread():Store thread in cache for reuse by new connections
static bool cache_thread() { mysql_mutex_assert_owner(&LOCK_thread_count); if (cached_thread_count < thread_cache_size && //此处thread_cache_size为8 ! abort_loop && !kill_cached_threads) { /* Don't kill the thread, just put it in cache for reuse */ DBUG_PRINT("info", ("Adding thread to cache")); cached_thread_count++; #ifdef H***E_PSI_INTERFACE /* Delete the instrumentation for the job that just completed, before parking this pthread in the cache (blocked on COND_thread_cache). */ if (likely(PSI_server != NULL)) PSI_server->delete_current_thread(); #endif while (!abort_loop && ! wake_thread && ! kill_cached_threads) mysql_cond_wait(&COND_thread_cache, &LOCK_thread_count); //该线程阻塞在这里了,等待主线程的唤醒。 cached_thread_count--; //该线程被唤醒,从此处继续执行,这是wake_thread至少为1 if (kill_cached_threads) mysql_cond_signal(&COND_flush_thread_cache); if (wake_thread) { THD *thd; wake_thread--; thd= thread_cache.get(); //这里获得thd(从链表中取出),之前在create_thread_to_handle_connection里有thread_cache.push_back(thd); thd->thread_stack= (char*) &thd; // For store_globals,这个堆栈的管理看来需要好好研究下。 (void) thd->store_globals(); #ifdef H***E_PSI_INTERFACE /* Create new instrumentation for the new THD job, and attach it to this running pthread. */ if (likely(PSI_server != NULL)) { PSI_thread *psi= PSI_server->new_thread(key_thread_one_connection, thd, thd->thread_id); if (likely(psi != NULL)) PSI_server->set_thread(psi); } #endif /* THD::mysys_var::abort is associated with physical thread rather than with THD object. So we need to reset this flag before using this thread for handling of new THD object/connection. */ thd->mysys_var->abort= 0; thd->thr_create_utime= thd->start_utime= my_micro_time(); threads.append(thd); //将这个thd加到了threads这个链表中(threads应该是链接所有运行的线程的thd) return(1); } } return(0); }如果还记得本篇前面create_thread_to_handle_connection函数中,首先判断如果(cached_thread_count > wake_thread) ,则thread_cache.push_back(thd); wake_thread++; mysql_cond_signal(&COND_thread_cache); 从而唤醒一个线程,而无需再新建线程了。
补充:I_List<THD>的结构(这是一个链表):
class base_ilist { struct ilink *first; struct ilink last; inline void empty() { first= &last; last.prev= &first; } ... }; struct ilink { struct ilink **prev,*next; ... };下图为插入一个元素的情况,最后一个为last。注意prev指向前一个元素的next,即prev=&a->next。
相关文章推荐
- 线程ava.lang.OutOfMemoryError: unable to create new native thread
- JVM最多能创建多少个线程:unabletocreatenewnativethread
- 线程ava.lang.OutOfMemoryError: unable to create new native thread
- Java中OutOfMemoryError与unable to create new native thread(JVM创建大量线程)的关系
- 关于服务进程申请线程数量的限制unable to create new native thread
- JVM最多能创建多少个线程: unable to create new native thread
- Java中OutOfMemoryError与unable to create new native thread(JVM创建大量线程)的关系
- JVM最多能创建多少个线程: unable to create new native thread
- Java中OutOfMemoryError与unable to create new native thread(JVM创建大量线程)的关系
- Dubbo之——线程模型(Linux 用户线程数限制导致的 java.lang.OutOfMemoryError: unable to create new native thread 异常)
- java.lang.OutOfMemoryError: unable to create new native thread无法创建本地线程分析
- 记一次tomcat线程创建异常调优:unable to create new native thread
- hbase-创建连接报错 java.lang.OutOfMemoryError: unable to create new native thread
- JVM最多能创建多少个线程: unable to create new native thread
- JVM最多能创建多少个线程: unable to create new native thread
- Java中OutOfMemoryError与unable to create new native thread(JVM创建大量线程)的关系
- 记一次tomcat线程创建异常调优:unable to create new native thread
- Java中OutOfMemoryError与unable to create new native thread(JVM创建大量线程)的关系
- JVM最多能创建多少个线程: unable to create new native thread
- Can't create a new thread (errno 11) 解决办法 mysql无法连接