您的位置:首页 > 数据库 > MySQL

mysql多线程问题

2016-09-19 14:53 309 查看


mysql多线程问题

By Cnangel on June
30, 2015 2:11 PM | No Comments

mysql多线程处理不好,经常会发生coredump,见使用Mysql出core一文。


单线程

一般情况下,单线程连接mysql代码如下:
/*
single_thread_mysql_client.cpp
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <mysql/mysql.h>
#include <pthread.h>
#include <unistd.h>

#define DBHOST      "localhost"
#define DBUSER      "pca"
#define DBPASS      "pca"
#define DBPORT      3306
#define DBNAME      "dxponline"
#define DBSOCK      NULL //"/var/lib/mysql/mysql.sock"
#define DBPCNT      0

int main()
{
MYSQL_RES *result;
MYSQL_ROW row;
MYSQL_FIELD *field;
unsigned int num_fields;
unsigned int i;
const char *pStatement = "SHOW TABLES";
mysql_library_init(0, NULL, NULL);
MYSQL *mysql = mysql_init(NULL);
unsigned int timeout = 3000;
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);

if (mysql_real_connect(mysql, DBHOST, DBUSER, DBPASS, DBNAME, DBPORT, DBSOCK, DBPCNT) == NULL)
{
printf("connect failed: %s\n", mysql_error(mysql));
mysql_close(mysql);
mysql_library_end();
return 0;
}

printf("connect succssfully\n");

if (0 != mysql_real_query(mysql, pStatement, strlen(pStatement)))
{
printf("query failed: %s\n", mysql_error(mysql));
mysql_close(mysql);
mysql_library_end();
return 0;
}

result = mysql_store_result(mysql);

if (result == NULL)
{
printf("fetch result failed: %s\n", mysql_error(mysql));
mysql_close(mysql);
mysql_library_end();
return 0;
}

num_fields = mysql_num_fields(result);
printf("numbers of result: %d\n", num_fields);

while (NULL != (field = mysql_fetch_field(result)))
{
printf("field name: %s\n", field->name);
}

while (NULL != (row = mysql_fetch_row(result)))
{
unsigned long *lengths;
lengths = mysql_fetch_lengths(result);

for (i = 0; i < num_fields; i++)
{
printf("{%.*s} ", (int) lengths[i], row[i] ? row[i] : "NULL");
}

printf("\n");
}

mysql_free_result(result);
mysql_close(mysql);
mysql_library_end();
return 0;
}


执行

make single_thread_mysql_client LDFLAGS="-g -O2 -L/usr/lib64/mysql -lmysqlclient -lpthread -lz -lm -lssl -lcrypto -ldl"


即可获得对应单线程二进制。


多线程

多线程主要需要注意以下几点
mysql_library_init
 和 
mysql_library_end
 需要放入主线程;
连接句柄需要多个才能加快并发,而连接句柄由 
mysql_init
 生成,而 
mysql_init
跟随机函数 
rand
 有点相似,第一次需要初始化后才能线程安全,所以需要使用
mysql_thread_init
 和 
mysql_thread_end
 两个函数来保证线程安全;

一般多线程连接mysql代码如下
/*
muti_thread_mysql_client.cpp
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <mysql/mysql.h>
#include <pthread.h>
#include <unistd.h>

#define THREAD_NUM  4
#define DBHOST      "localhost"
#define DBUSER      "pca"
#define DBPASS      "pca"
#define DBPORT      3306
#define DBNAME      "dxponline"
#define DBSOCK      NULL //"/var/lib/mysql/mysql.sock"
#define DBPCNT      0

typedef struct ThreadArgsST
{
int id;
pthread_t *thread_id;
} ThreadArgs;

void *func(void *arg)
{
ThreadArgs *args = (ThreadArgs *)arg;
MYSQL_RES *result;
MYSQL_ROW row;
MYSQL_FIELD *field;
unsigned int num_fields;
unsigned int i;
unsigned int timeout = 3000;
const char *pStatement = "SHOW TABLES";
mysql_thread_init();
MYSQL *mysql = mysql_init(NULL);

if (mysql == NULL)
{
printf("[%ld][%d]mysql init failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
return (void *)0;
}

mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);

if (mysql_real_connect(mysql, DBHOST, DBUSER, DBPASS, DBNAME, DBPORT, DBSOCK, DBPCNT) == NULL)
{
printf("[%ld][%d]connect failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
mysql_close(mysql);
mysql_thread_end();
return (void *)0;
}

printf("[%ld][%d]connect succssfully\n", *args->thread_id, args->id);

if (0 != mysql_real_query(mysql, pStatement, strlen(pStatement)))
{
printf("[%ld][%d]query failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
mysql_close(mysql);
mysql_thread_end();
return (void *)0;
}

result = mysql_store_result(mysql);

if (result == NULL)
{
printf("[%ld][%d]fetch result failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
mysql_close(mysql);
mysql_thread_end();
return (void *)0;
}

num_fields = mysql_num_fields(result);
printf("[%ld][%d]numbers of result: %d\n", *args->thread_id, args->id, num_fields);

while (NULL != (field = mysql_fetch_field(result)))
{
printf("[%ld][%d]field name: %s\n", *args->thread_id, args->id, field->name);
}

while (NULL != (row = mysql_fetch_row(result)))
{
unsigned long *lengths;
lengths = mysql_fetch_lengths(result);

for (i = 0; i < num_fields; i++)
{
printf("[%ld][%d]{%.*s} ", *args->thread_id, args->id, (int) lengths[i], row[i] ? row[i] : "NULL");
}

printf("\n");
}

mysql_free_result(result);
mysql_close(mysql);
mysql_thread_end();
return (void *)0;
}

int main(int argc, char *argv[])
{
int thread_num;

if (argc == 2)
{
thread_num = atoi(argv[1]);
}
else
{
thread_num = THREAD_NUM;
}

mysql_library_init(0, NULL, NULL);
printf("argc: %d and thread_num: %d\n", argc, thread_num);

do
{
pthread_t *pTh = new pthread_t[thread_num];
ThreadArgs *pArgs = new ThreadArgs[thread_num];
int i;

for (i = 0; i < thread_num; i ++)
{
pArgs[i].id = i;
pArgs[i].thread_id = &pTh[i];

if (0 != pthread_create(&pTh[i], NULL, func, &pArgs[i]))
{
printf("pthread_create failed\n");
continue;
}
}

for (i = 0; i < thread_num; i ++)
{
pthread_join(pTh[i], NULL);
}

delete[] pTh;
delete[] pArgs;
}
while (0);

mysql_library_end();
return 0;
}


执行

make muti_thread_mysql_client LDFLAGS="-g -O2 -L/usr/lib64/mysql -lmysqlclient -lpthread -lz -lm -lssl -lcrypto -ldl"


即可获得对应单线程二进制。

连接数与连接句柄是一一对应关系,故一般使用长连接,所以需要连接池,所以上面的代码可以有优化的空间,代码见:

/*
muti_thread_mysql_client_pool.cpp
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <mysql/mysql.h>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#include <string>

#define THREAD_NUM  4
#define DBHOST      "localhost"
#define DBUSER      "pca"
#define DBPASS      "pca"
#define DBPORT      3306
#define DBNAME      "dxponline"
#define DBSOCK      NULL //"/var/lib/mysql/mysql.sock"
#define DBPCNT      0

using namespace std;

class CBlockQueue;
typedef struct ThreadArgsST
{
int id;
pthread_t *thread_id;
CBlockQueue *pQueue;
} ThreadArgs;

class CMutex
{
public:
CMutex()
{
pthread_mutex_init(&_mutex, NULL);
}
~CMutex()
{
pthread_mutex_destroy(&_mutex);
}

int32_t lock()
{
return pthread_mutex_lock(&_mutex);
}

int32_t unlock()
{
return pthread_mutex_unlock(&_mutex);
}

int32_t trylock()
{
return pthread_mutex_trylock(&_mutex);
}

private:
pthread_mutex_t _mutex;
};

class CGlobalFunction
{
public:
static MYSQL *connect()
{
unsigned int timeout = 3000;
mysql_thread_init();
MYSQL *mysql = mysql_init(NULL);

if (mysql == NULL)
{
printf("mysql init failed: %s\n", mysql_error(mysql));
return NULL;
}

mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);

if (mysql_real_connect(mysql, DBHOST, DBUSER, DBPASS, DBNAME, DBPORT, DBSOCK, DBPCNT) == NULL)
{
printf("connect failed: %s\n", mysql_error(mysql));
mysql_close(mysql);
mysql_thread_end();
return NULL;
}

printf("connect succssfully\n");
return mysql;
}
};

class CBlockQueue : public CMutex
{
public:
CBlockQueue() : _size(512)
{
}
~CBlockQueue()
{
}
void set_size(int size)
{
_size = size;
}
int size()
{
this->lock();
int size = q.size();
this->unlock();
return size;
}
bool push(void *m)
{
this->lock();
// TODO
/*
if (q.size() > _size)
{
this->unlock();
fprintf(stderr, "[QUEUE_IS_FULL]queue size over limit from push: %d\n", _size);
return false;
}
*/
q.push(m);
this->unlock();
return true;
}

void *pop()
{
this->lock();

if (q.empty())
{
this->unlock();
fprintf(stderr, "[QUEUE_IS_EMPTY]queue is no item from pop");
return NULL;
}

void *m = q.front();
q.pop();
this->unlock();
return m;
}

private:
queue q;
int _size;
};

void *func(void *arg)
{
ThreadArgs *args = (ThreadArgs *)arg;
MYSQL_RES *result;
MYSQL_ROW row;
MYSQL_FIELD *field;
bool pushed = true;
unsigned int num_fields;
unsigned int i;
const char *pStatement = "SHOW TABLES";
MYSQL *db = (MYSQL *)args->pQueue->pop();

if (db == NULL)
{
db = CGlobalFunction::connect();

if (db == NULL)
{
printf("[%ld][%d]mysql connect failed\n", *args->thread_id, args->id);
return (void *)0;
}
}

if (0 != mysql_real_query(db, pStatement, strlen(pStatement)))
{
printf("[%ld][%d]query failed: %s\n", *args->thread_id, args->id, mysql_error(db));
args->pQueue->push(db);
return (void *)0;
}

result = mysql_store_result(db);

if (result == NULL)
{
printf("[%ld][%d]fetch result failed: %s\n", *args->thread_id, args->id, mysql_error(db));
args->pQueue->push(db);
return (void *)0;
}

num_fields = mysql_num_fields(result);
printf("[%ld][%d]numbers of result: %d\n", *args->thread_id, args->id, num_fields);

while (NULL != (field = mysql_fetch_field(result)))
{
printf("[%ld][%d]field name: %s\n", *args->thread_id, args->id, field->name);
}

while (NULL != (row = mysql_fetch_row(result)))
{
unsigned long *lengths;
lengths = mysql_fetch_lengths(result);

for (i = 0; i < num_fields; i++)
{
printf("[%ld][%d]{%.*s} ", *args->thread_id, args->id, (int) lengths[i], row[i] ? row[i] : "NULL");
}

printf("\n");
}

mysql_free_result(result);
args->pQueue->push(db);
return (void *)0;
}

int main(int argc, char *argv[])
{
CBlockQueue queue;
int thread_num;

if (argc == 2)
{
thread_num = atoi(argv[1]);
}
else
{
thread_num = THREAD_NUM;
}

mysql_library_init(0, NULL, NULL);
printf("argc: %d and thread_num: %d\n", argc, thread_num);

do
{
int i;
pthread_t *pTh = new pthread_t[thread_num];
ThreadArgs *pArgs = new ThreadArgs[thread_num];

for (i = 0; i < thread_num; i ++)
{
pArgs[i].id = i;
pArgs[i].thread_id = &pTh[i];
pArgs[i].pQueue = &queue;

if (0 != pthread_create(&pTh[i], NULL, func, &pArgs[i]))
{
printf("pthread_create failed\n");
continue;
}
}

for (i = 0; i < thread_num; i ++)
{
pthread_join(pTh[i], NULL);
}

delete[] pTh;
delete[] pArgs;
int qsize = queue.size();

for (i = 0; i < qsize; i ++)
{
MYSQL *db = (MYSQL *)queue.pop();

if (NULL != db)
{
mysql_close(db);
mysql_thread_end();
}
}
}
while (0);

mysql_library_end();
return 0;
}


执行

make muti_thread_mysql_client_pool LDFLAGS="-g -O2 -L/usr/lib64/mysql -lmysqlclient -lpthread -lz -lm -lssl -lcrypto -ldl"


即可获得对应单线程二进制。

上述代码就是利用队列来保持mysql连接,达到优化连接数。


总结

mysql连接与多线程处理不好,可能会造成很多问题,如
*MySQL Connection failed (#2058): This handle is already connected. Use a separate handle for each connection.*

Error in my_thread_global_end(): 1 threads didn't exit

甚至出现coredump

关于多线程连接mysql优化的思想,其实可以扩展到其他连接,如HTTP、Socket等连接中;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: