您的位置:首页 > 其它

对log进行多线程C处理

2016-04-01 00:00 323 查看
一、功能描述

对log进行数据筛取,用判断和调用函数的方式,获取record_time、function_name、Error_state、client_ip、tv_id、description等数据并插入到本地数据库中error_log表中

二、log模型

[2016-03-09 16:54:05:726] HttpSendLiveStream ERROR!!

[473 58.254.216.34:18297 TV4016]:: caught up by live stream! curr:17775[19886->19885]

[2016-03-09 16:54:07:219] LiveStateCheck:: TV6047 not received pkt for 15 seconds!!

[2016-03-09 16:54:07:951] LiveStateCheck:: TV6046 not received pkt for 15 seconds!!

[2016-03-09 16:54:08:105] HlsCheckStealingLink:: HLS Relay :client 209.126.64.42 is stealing!

修改后的log模型

[2016-03-10 14:52:32:187] SystemInit INFO1!!
ReadConfig ........

[2016-03-10 14:52:32:188] ReadConfig ERROR!!
uknown element pName AdvertisementChannel, content TV999999999

[2016-03-10 14:52:32:800] OpenBWFileList ERROR!!
open bwlist file[../../tools/BWList/BlackList/BlackList-0.ini] failed.

最终效果;



三、错误和修改

1.发送和接收MSG时,发现消息不全,或空

两个线程send、rcv MSG时,都要自己定义MSG结构体变量和分配内存
2.发送、接收少量消息后,程序挂掉





多线程下对消息队列进行操作时,要lock互斥锁,且必须的语句才锁上,减少互斥时间

msgsnd、msgrcv函数要指定IPC_NOWAIT函数控制位,表示不忽略0,操作失败时不等待而导致消息队列阻塞
3.msgid无效,提示消息队列不存在



msgget去获取msgid时,指定的key如果是用自定义的get_key去获取已经存在的消息队列,这个key如果不是指定的消息队列如果已经被销毁,则会出错;指定key为0,总是创建新的消息队列





把msgid当第四个参数通过pthread_create传入线程函数时,可能创建线程函数已经结束,而线程由于系统时间片分配问题未启动,msgid就null传入线程函数;提升msgid为全局变量

4.其他

创建线程后,应自己设置为死循环存在,来不断的处理数据

编译语句:gcc log_c_multithread.c -o work -I /usr/local/mysql/include -L /usr/local/mysql/lib/ -lmysqlclient

四、完整代码

https://github.com/CharlotteLock/Cool-Code/blob/master/%E5%AF%B9log%E8%BF%9B%E8%A1%8C%E5%A4%9A%E7%BA%BF%E7%A8%8Bc%E5%A4%84%E7%90%86.c

//Scriptname:log_analyse
//Author:charlotte
//date:2016/03/17
//Purpose: sleve log and connect save on database


/*defina head file*/
#include "stdio.h"
#include "stdlib.h"
#include "string.h"    /*use function about string*/
#include "mysql.h"    /*use mysql DB*/
#include "pthread.h"   /*use multi threads*/
#include "sys/ipc.h"   /*call message queue*/
#include "sys/time.h"   /*call sleep*/


#define MAX_LINE 1024
/*define variable*/
int sign=0,sign_exit=1;   /*sign to mark no.2error,sign_exit to mark thread1 exit*/
char log_name[MAX_LINE]="record_model.log";  /*no use*/
char log_path[MAX_LINE]="record_model.log";
int record_time = 0;  /*store YMD*/
int record_time_day = 0; /*store hms*/
char record_day_time[MAX_LINE];
char function_name[MAX_LINE];
int sid;
char IP[MAX_LINE];
char tv_id[MAX_LINE];
char description[MAX_LINE];
const char* error_status="ERROR";
const char mysqlServer[20] = "localhost";
int lines=0;   /*order to check source maybe occur error*/
int msgid;    /*set global variable to avoid lose*/
pthread_t thread[2];   /*define thread variable,two threads*/
pthread_mutex_t mut;   /*define mutex locks*/
typedef struct mymsg /*define msg struct */
{
/*long ltype;*/
char record_day_time[MAX_LINE];
char function_name[MAX_LINE];
char description[MAX_LINE];
}MSG;


/*performance function*/


int check_error( char buf[])
{
int i,j;
for ( i=0; buf[i] != '\0'; i++)
{
if ( buf[i] != error_status[0])
continue;
j = 0;
while ( error_status[j] != '\0' && buf[i+j] != '\0')
{
j++;
if ( error_status[j] != buf[i+j])
break;
}
if ( error_status[j] == '\0')
return 1;
}
return 0;
}
void save_error(char buf[])
{
int i,j,colon_count;
colon_count = 0;
for ( i=1; buf[i] != '\0'; i++)  /* skip no.1 char [*/
{
if ( buf[i-1] == '[')       /* save date time,begin*/
{
j = 0;
while ( buf[i+j] != ' ')  /*separate by blank*/
{
if ( buf[ i+j] != '-') /*gain date*/
record_time = record_time * 10 + buf[i+j]-48;   /*ASCII conversion*/
j++;
}
j++;
while ( buf[i+j] != ']') /* gain time*/
{
if ( buf[ i+j] != ':' && colon_count<3)
record_time_day = record_time_day * 10 + buf[i+j]-48;
else
colon_count++;
j++;
}
}
i=i+j+2; /*move to right step*/
j=0;
while ( buf[ i+j] != ' ') /* gain function name*/
{
function_name[j] = buf[i+j];
j++;
}
break;
}
}
void save_error2(char buf[])
{
int i,j;
for ( i=1; buf[i] != '\0'; i++)  /* skip no.1 char [*/
{
if ( buf[i-1] == '[')       /* save ip*/
{
j = 0;
while ( buf[i+j] != ' ') /* skip number*/
{
j++;
}
i=i+j+1;
j=0;
while ( buf[i+j] != ' ') /* save ip*/
{
IP[j] = buf[i+j];
j++;
}
i=i+j+1;
j=0;
while ( buf[i+j] != ']') /* save tv id */
{
tv_id[j] = buf[i+j];
j++;
}
}
j++;
for( ; buf[ i+j] != ' '; j++) /* move right step*/
continue;
i=i+j+1;
j=0;
for ( ; buf[i] != '\0'; i++)
{description[j] = buf[i]; /* save description*/
j++;}
}
}
void analyse_log(char buf[])
{
int length,sign_send=1;
MSG *new_msg;
new_msg=(MSG *)malloc(sizeof(MSG));
if ( sign == 0)     /*judge sign,half deal or full deal*/
{
if ( check_error(buf) == 0) /* search error ,not return*/
;
else                       /* process error line*/
{
save_error(buf);
sign=1;
}
}
else if ( sign == 1)  /* half deal,process ip line*/
{
save_error2(buf);  /* get complete data*/
sign=0;
sprintf(record_day_time,"%d%d",record_time,record_time_day); /* send data by message*/
strcpy(new_msg->record_day_time,record_day_time);
strcpy(new_msg->function_name,function_name);
strcpy(new_msg->description,description);
/*new_msg->ltype='L';*/
length = sizeof(MSG);
while(sign_send)         /*set sign,until send ok*/
{
pthread_mutex_lock(&mut);      /* mutex cpu locks*/
if(msgsnd(msgid,new_msg,length,IPC_NOWAIT)==0)
{sign_send=0;pthread_mutex_unlock(&mut);}
pthread_mutex_unlock(&mut);      /* unlock cpu */
}
printf("send ok.\n");        /*initialize data before send ok*/
record_time=0;
record_time_day=0;
memset(function_name,'\0',sizeof(function_name));
memset(IP,'\0',sizeof(IP));
memset(tv_id,'\0',sizeof(tv_id));
memset(description,'\0',sizeof(description));
}


}
void read_log()        /*read log and store the required data*/
{
char buf[MAX_LINE];
FILE *fp;
int len,i;
if ((fp = fopen(log_path,"r")) == NULL) /* continue before valid log*/
{
perror("fail to read");
exit(1);
}
i=1;
while(fgets(buf,MAX_LINE,fp) != NULL ) /* process log line by line*/
{
lines++;
len = strlen(buf);
buf[len-1] = '\0';
i++;
analyse_log(buf);   /* call function to do second step :analyse*/
}
pthread_mutex_lock(&mut);      /* mutex cpu locks*/
sign_exit=0;
pthread_mutex_unlock(&mut);
printf("read_thread exit,%d\n",sign_exit);
}


void *thread1()   /*define threads function1 to read log and get data*/
{
read_log();
}
void insert_into_DB()
{
int length;           /*use to count msg size*/
MSG *new_msg;          /*use to save received msg*/
new_msg=(MSG *)malloc(sizeof(MSG));
length = sizeof(MSG);
while(1)           /*dead circulation to receive msg and insert into DB*/
{
pthread_mutex_lock(&mut);      /* mutex cpu locks*/
if(msgrcv(msgid,new_msg,length,0,IPC_NOWAIT)==-1) /*MQid,msg vessel,msg type(no require),set to avoid MQ block;success return no negative num*/
{
pthread_mutex_unlock(&mut);     /*unlock MQ*/
if(sign_exit==0)break;      /*if no message and the sign show no continue send msg,exit*/
printf("no message,wait a monment.\n");  /*if no message,continue,wait*/
}
else
{
pthread_mutex_unlock(&mut);     /*unlock MQ*/
printf("rcv msg successful.\n");
char ibuf[1024];
MYSQL mysql,*sock;
mysql_init(&mysql);       /*initialize mysql handle*/
if ( !(sock=mysql_real_connect( &mysql,"127.0.0.1","root","123456","operation_monitor",3306,NULL,0))) /*return sock*/
{/*connect DB*/
fprintf(stderr,"Couldn't connect to engine!\n%s\n\n",mysql_error(&mysql));
perror(" connect mysql error!\n");
exit(1);
}/*save inserts statement into ibuf*/
sprintf( ibuf,"insert into error_log(id,function_name,description,record_time) values('','%s','%s','%s')",new_msg->function_name,new_msg->description,new_msg->record_day_time);
if ( mysql_query(sock,ibuf))  /*operate executive*/
{
printf("insert data error!i No.%d\n",lines);
}
else
{
printf("insert ok.\n");
}
mysql_close(&mysql);
mysql_close(sock);
}
}
printf("insert_thread exit.\n");
}
void *thread2()   /*define threads function2 to insert data into DB*/
{
insert_into_DB();
}


void thread_create()
{
pthread_create(&thread[0],NULL,thread1,NULL); /*create thread,but maybe threads no start,attention incomming local parameters lose*/
pthread_create(&thread[1],NULL,thread2,NULL);
}


void thread_wait()
{
pthread_join(thread[0],NULL);  /*wait thread finish*/
pthread_join(thread[1],NULL);
}


int get_key()
{
int key;
key=ftok(".",'s');
return key;
}


/*function main*/
int main()
{
/*MSG *msg;       /define about msg no use/
key_t key;       /define key to get existed message queue/
key=get_key();
if(key<0)
{
perror("get key error.");
exit(1);
}*/
msgid=msgget(0,IPC_CREAT|0644);  /*get a new MQ*/
if(msgid<0)
{
perror("msgget error.");
exit(1);
}
printf("get msg success\n");
/*msg=(MSG*)malloc(sizeof(MSG));
if(msg==NULL)
{
perror("malloc error.");
exit(1);
}
*/
pthread_mutex_init(&mut,NULL);     /*initialize mutex locks variable mut*/
thread_create();     /*call function to create thread*/
thread_wait();      /*wait thread run end*/
return 0;
}




修改的,修改了名称,匹配的log模型,按条,正则匹配

//Scriptname:log_analyse
//Author:charlotte
//date:2016/03/17
//Purpose: sleve log and connect save on database

/*defina head file*/
#include "stdio.h"
#include "stdlib.h"
#include "string.h"    /*use function about string*/
#include "mysql.h"    /*use mysql DB*/
#include "pthread.h"   /*use multi threads*/
#include "sys/ipc.h"   /*call message queue*/
#include "sys/time.h"   /*call sleep*/
#include "regex.h"
#include "memory.h"

#define MAX_LINE 1024
/*define variable*/
int sign=0,sign_exit=1;   /*sign to mark no.2error,sign_exit to mark thread1 exit*/
char log_path[MAX_LINE]="record.log";
char record_day_time[MAX_LINE];
char function_name[MAX_LINE];
int sid;
char description[MAX_LINE];
const char mysqlServer[20] = "127.0.0.1";
const char mysqlUser[20] = "root";
const char mysqlPasswd[20] = "123456";
const char mysqlDBName[20] = "operation_monitor";
int lines=0;   /*order to check source maybe occur error*/
int msgid;    /*set global variable to avoid lose*/
pthread_t thread[2];   /*define thread variable,two threads*/
pthread_mutex_t mut;   /*define mutex locks*/
typedef struct mymsg /*define msg struct */
{
char record_day_time[MAX_LINE];
char function_name[MAX_LINE];
char description[MAX_LINE];
}MSG;

/*performance function*/

int check_error( char buf[])
{
char errbuf[1024];
regex_t reg;
int err,pmatch_size=10;
regmatch_t pmatch[pmatch_size];       //Regular Expression ERROR
if ( regcomp( ®, "(.*)ERROR(.*)",REG_EXTENDED|REG_ICASE) < 0)   //compile RE
{
regerror( err,®, errbuf, sizeof( errbuf));
printf ( "err:%s\n", errbuf);
}
err = regexec( ®, buf, pmatch_size, pmatch, 0);  // RE object string
if ( err == REG_NOMATCH)
{
return 0;
}
else if ( err)
{
regerror( err, ®, errbuf, sizeof(errbuf));
printf ( "err:%s\n",errbuf);
}
else
{
printf ("match ok.\n");
return 1;
}
}
int save_error(char buf[])
{
char record_day_time_mid[1024];
char errbuf[1024];
regex_t reg;
int err,pmatch_size=10,i,mid_size;
regmatch_t pmatch[pmatch_size];    // RE ERROR and save data that need
if ( regcomp( ®, "\\[(.*)-(.*)-(.*)[ ](.*):(.*):(.*):(.*)\\](.*) ERROR(.*)",REG_EXTENDED|REG_ICASE) < 0)
{
regerror( err,®, errbuf, sizeof( errbuf));
printf ( "err:%s\n", errbuf);
}
err = regexec( ®, buf, pmatch_size, pmatch, 0);
if ( err == REG_NOMATCH)
{
return 0;
}
else if ( err)
{
regerror( err, ®, errbuf, sizeof(errbuf));
printf ( "err:%s\n",errbuf);
}
for ( i=0; i<pmatch_size && pmatch[i].rm_so != -1; i++)  // read buf by point_match
{
int se_len = pmatch[i].rm_eo - pmatch[i].rm_so;
if( se_len && i >=1 && i <=6)
{
memset( record_day_time_mid,'\0',sizeof(record_day_time_mid));
memcpy( record_day_time_mid, buf+pmatch[i].rm_so, se_len);
mid_size = sizeof(record_day_time_mid);
strncat( record_day_time, record_day_time_mid, mid_size);
}
else if( se_len && i == 8)
{
memset( function_name, '\0', sizeof( function_name));
memcpy( function_name, buf+pmatch[i].rm_so, se_len);
}
}
}
void save_error2(char buf[])
{
char errbuf[1024];
regex_t reg;
int err,pmatch_size=10,i,buf_size;
regmatch_t pmatch[pmatch_size];   // RE ERROR next part
if ( regcomp( ®, "^\\[(.*)\\](.*)",REG_EXTENDED|REG_ICASE) < 0)  // end by next record
{
regerror( err,®, errbuf, sizeof( errbuf));
printf ( "err:%s\n", errbuf);
}
err = regexec( ®, buf, pmatch_size, pmatch, 0);
if ( err == REG_NOMATCH)
{
buf_size = strlen( buf);
strncat( description, buf, buf_size);
}
else if ( err)
{
regerror( err, ®, errbuf, sizeof(errbuf));
printf ( "err:%s\n",errbuf);
}
else
{
sign = 0;      // set sign ,a full record
}
}
int analyse_log(char buf[])
{
int length,sign_send=1;
MSG *new_msg;
new_msg=(MSG *)malloc(sizeof(MSG));
if ( sign == 0)     /*judge sign,half deal or full deal*/
{
if ( check_error(buf) == 0) /* search error ,not return*/
;
else                       /* process error line*/
{
save_error(buf);
sign=1;
}
}
else if ( sign == 1)  /* half deal,process ip line*/
{
save_error2(buf);  /* get complete data*/
if ( sign == 1)
return 0;
strcpy(new_msg->record_day_time,record_day_time);
strcpy(new_msg->function_name,function_name);
strcpy(new_msg->description,description);
length = sizeof(MSG);
while(sign_send)         /*set sign,until send ok*/
{
pthread_mutex_lock(&mut);      /* mutex cpu locks*/
if(msgsnd(msgid,new_msg,length,IPC_NOWAIT)==0)
{sign_send=0;pthread_mutex_unlock(&mut);}
else pthread_mutex_unlock(&mut);      /* unlock cpu */
}
printf("send ok.\n");        /*initialize data before send ok*/
memset(function_name,'\0',sizeof(function_name));
memset(record_day_time,'\0',sizeof(record_day_time));
memset(description,'\0',sizeof(description));
analyse_log(buf);
}

}
void read_log()        /*read log and store the required data*/
{
char buf[MAX_LINE];
FILE *fp;
int len,i;
if ((fp = fopen(log_path,"r")) == NULL) /* continue before valid log*/
{
perror("fail to read");
exit(1);
}
i=1;
while(fgets(buf,MAX_LINE,fp) != NULL ) /* process log line by line*/
{
lines++;
len = strlen(buf);
buf[len-1] = '\0';
i++;
analyse_log(buf);   /* call function to do second step :analyse*/
}
pthread_mutex_lock(&mut);      /* mutex cpu locks*/
sign_exit=0;
pthread_mutex_unlock(&mut);
printf("read_thread exit,%d\n",sign_exit);
}

void *read_log_analyse()   /*define threads function1 to read log and get data*/
{
read_log();
}
void insert_into_DB()
{
int length;           /*use to count msg size*/
MSG *new_msg;          /*use to save received msg*/
new_msg=(MSG *)malloc(sizeof(MSG));
length = sizeof(MSG);
while(1)           /*dead circulation to receive msg and insert into DB*/
{
pthread_mutex_lock(&mut);      /* mutex cpu locks*/
if(msgrcv(msgid,new_msg,length,0,IPC_NOWAIT)==-1) /*MQid,msg vessel,msg type(no require),set to avoid MQ block;success return no negative num*/
{
pthread_mutex_unlock(&mut);     /*unlock MQ*/
if(sign_exit==0)break;      /*if no message and the sign show no continue send msg,exit*/
printf("no message,wait a monment.\n");  /*if no message,continue,wait*/
sleep(1);
}
else
{
pthread_mutex_unlock(&mut);     /*unlock MQ*/
printf("rcv msg successful.\n");
char ibuf[1024];
MYSQL mysql,*sock;
mysql_init(&mysql);       /*initialize mysql handle*/
if ( !(sock=mysql_real_connect( &mysql,"127.0.0.1","root","123456","operation_monitor",3306,NULL,0))) /*return sock*/
{/*connect DB*/
fprintf(stderr,"Couldn't connect to engine!\n%s\n\n",mysql_error(&mysql));
perror(" connect mysql error!\n");
exit(1);
}/*save inserts statement into ibuf*/
sprintf( ibuf,"insert into error_log(id,function_name,description,record_time) values('','%s','%s','%s')",new_msg->function_name,new_msg->description,new_msg->record_day_time);
if ( mysql_query(sock,ibuf))  /*operate executive*/
{
printf("insert data error!i No.%d\n",lines);
}
else
{
printf("insert ok.\n");
}
mysql_close(&mysql);
mysql_close(sock);
}
}
printf("insert_thread exit.\n");
}
void *insert_data_DB()   /*define threads function2 to insert data into DB*/
{
insert_into_DB();
}

void thread_create()
{
pthread_create(&thread[0],NULL,read_log_analyse,NULL); /*create thread,but maybe threads no start,attention incomming local parameters lose*/
pthread_create(&thread[1],NULL,insert_data_DB,NULL);
}

void thread_wait()
{
pthread_join(thread[0],NULL);  /*wait thread finish*/
pthread_join(thread[1],NULL);
}

int get_key()
{
int key;
key=ftok(".",'s');
return key;
}

/*function main*/
int main()
{
msgid=msgget(0,IPC_CREAT|0644);  /*get a new MQ*/
if(msgid<0)
{
perror("msgget error.");
exit(1);
}
printf("get msg success\n");
pthread_mutex_init(&mut,NULL);     /*initialize mutex locks variable mut*/
thread_create();     /*call function to create thread*/
thread_wait();      /*wait thread run end*/
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息