您的位置:首页 > 编程语言 > C语言/C++

C语言实现类workerman的功能

2016-08-11 19:14 323 查看
闲来无聊,使用C/C++写了一个类似PHP下的workerman网络框架库

文件目录结构:

pub/

    pub.h                 //公共方法头文件

    pub.cpp            //公共方法实现文件

    pub.conf           //配置文件:可配置日志路径、master pid保存路径、最大错误信息长度、消息头长度、消息体长度(但消息头的前三个字段含义是不变的)

                     header[0]     如果为0,则表示客户刚连接上;如果为1,则表示客户已经连接后端服务

                     header[1]     源用户UID

                     header[2]     目的用户UID

worker/

    worker.h           //worker类的基本定义

    worker.cpp      //worker类 的具体实现

                      setWorkerProcess成员函数:用于定义启用的进程数量

                      msgHandler成员函数:回调方法,用于处理用户发送过来的消息,目前功能比较简单

启动方法:worker start

停止方法:worker stop

重启方法:worker restart

(目前不支持reload)

基本配置如下:

[PUB]
MAX_ERR_MSG_LEN:1024
ERR_LOG_PATH:/home/solariens/cpp/log/err.log
MSG_HEADER_MAX_LEN:8
MSG_BODY_MAX_SIZE:1024
[WORKER]
MAX_EPOLL_SIZE:512
MASTER_PID_PATH:/home/solariens/cpp/run/master.pid
COMMAND_MAX_LEN:32

基本使用方法如下:

main.cpp

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include "worker/worker.h"
#include "pub/pub.h"

/**
 * Sample message callback installed on Worker::msgHandler.
 *
 * Replaces the body of the (by-value) message with a fixed greeting and
 * sends the first `size` bytes of the message to `fd`.
 *
 * @param msg   Copy of the received message; only the local copy is modified.
 * @param size  Number of bytes of `msg` to send.
 * @param fd    Descriptor the reply is written to.
 * @return Always -1.
 */
int recvMsg(struct _msg msg, int size, int fd) {
    static const char kReply[] = "this is a framework of the network!";

    memset(msg.body, 0, sizeof(msg.body));
    memcpy(msg.body, kReply, sizeof(kReply));  // sizeof includes the trailing NUL
    send(fd, (char *)&msg, size, 0);
    return -1;
}

int main(int argc, char *argv[]) {
int port = 8081;
int backlog = 20;
Worker worker(port, backlog, argv[1]);
worker.msgHandler = recvMsg;
worker.setWorkerProcess(4);
Worker::runAll(worker);
return 0;
}


具体源码如下:
pub.h

#ifndef PUB_H_

#define PUB_H_

// Fixed-size message exchanged with clients; Worker::recvMsg receives the raw
// bytes of this struct from the socket, so member order is the wire layout.
// NOTE(review): HEADER_MAX_LEN and BODY_MAX_SIZE are not defined in this
// header as shown - confirm where they come from (build flags or another header).
struct _msg {
// header[0] selects the action in Worker::recvMsg (0 or 1); header[1] and
// header[2] are used there as indexes into Worker::socket_client.
char header[HEADER_MAX_LEN];
// Payload bytes handed to the msgHandler callback.
char body[BODY_MAX_SIZE];
};

// Runtime configuration filled in by init_conf() from "KEY:value" lines of
// pub/pub.conf.
struct _conf {
// MAX_ERR_MSG_LEN: length limit used by write_log for a log entry.
int max_err_msg_len;
// ERR_LOG_PATH: file that write_log appends to.
char err_log_path[64];
// MSG_HEADER_MAX_LEN: parsed from the config; not referenced by the code shown here.
int msg_header_max_len;
// MSG_BODY_MAX_SIZE: parsed from the config; not referenced by the code shown here.
int msg_body_max_size;
// MAX_EPOLL_SIZE: size passed to epoll_create and event capacity for epoll_wait.
int max_epoll_size;
// MASTER_PID_PATH: file holding the master process id.
char master_pid_path[64];
// COMMAND_MAX_LEN: parsed from the config; not referenced by the code shown here.
int command_max_len;
};

// Process-wide configuration; defined in pub.cpp and initialised from init_conf().
extern struct _conf *glb_conf;

// Appends a timestamped line containing the given message to
// glb_conf->err_log_path. A NULL message is ignored.
void write_log(const char *);

// Creates the parent directory of the given file path when it does not exist.
void create_dir(const char *);

// Creates a TCP socket bound to INADDR_ANY:port and puts it in listening
// state. Returns the descriptor, or -1 after logging the error.
int socket_create(int port, int backlog=20);

// Adds O_NONBLOCK to the descriptor's file status flags; failures are logged.
void socket_setnonblock(int);

// Parses <current working directory>/pub/pub.conf into a newly allocated _conf.
struct _conf * init_conf();

#endif


pub.cpp
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <libgen.h>
#include "pub.h"

// Global configuration, loaded during static initialisation (i.e. before
// main() runs) by reading pub/pub.conf relative to the current directory.
struct _conf *glb_conf = init_conf();

/**
 * Appends one line of the form "[YYYY-MM-DD HH:MM:SS] message" to the log
 * file named by glb_conf->err_log_path, creating the log directory first
 * when necessary.
 *
 * The message is truncated to glb_conf->max_err_msg_len bytes when that
 * setting is positive; a non-positive setting disables the limit.
 * Failures to obtain the time or open the log file are reported on stdout
 * and the entry is dropped.
 *
 * @param errmsg  NUL-terminated message; NULL is ignored.
 */
void write_log(const char *errmsg) {
    if (errmsg == NULL) {
        return;
    }

    time_t now;
    if (time(&now) == -1) {
        printf("%s\n", strerror(errno));
        return;
    }

    const struct tm *date = localtime(&now);
    if (date == NULL) {
        printf("%s\n", strerror(errno));
        return;
    }

    create_dir(glb_conf->err_log_path);
    FILE *fp = fopen(glb_conf->err_log_path, "a");
    if (fp == NULL) {
        printf("%s\n", strerror(errno));
        return;
    }

    // A negative precision for "%.*s" means "no limit".
    const int limit = glb_conf->max_err_msg_len > 0 ? glb_conf->max_err_msg_len : -1;

    // tm_year counts from 1900 and tm_mon from 0, so both need an offset to
    // print a calendar date. Writing straight to the file avoids formatting
    // into a fixed-size buffer that a long message could overflow.
    fprintf(fp, "[%04d-%02d-%02d %02d:%02d:%02d] %.*s\n",
            date->tm_year + 1900, date->tm_mon + 1, date->tm_mday,
            date->tm_hour, date->tm_min, date->tm_sec,
            limit, errmsg);
    fclose(fp);
}

/**
 * Ensures the directory that would contain `dirpath` exists.
 *
 * Only the immediate parent is created (no recursive creation). Creation is
 * best-effort: a failing mkdir is not reported here, the caller's subsequent
 * attempt to open the file will surface the problem.
 *
 * @param dirpath  Path of a file whose parent directory is wanted; NULL is ignored.
 */
void create_dir(const char *dirpath) {
    if (dirpath == NULL) {
        return;
    }

    // dirname() may modify its argument, so work on a private copy.
    char *path_copy = strdup(dirpath);
    if (path_copy == NULL) {
        return;
    }

    const char *parent = dirname(path_copy);
    if (access(parent, F_OK) == -1) {
        // Directories need the execute (search) bit; without it files cannot
        // be created or opened inside the directory.
        mkdir(parent, 0755);
    }

    free(path_copy);
}

/**
 * Creates a listening IPv4 TCP socket bound to INADDR_ANY.
 *
 * @param port     TCP port to bind (host byte order).
 * @param backlog  Backlog passed to listen().
 * @return The listening descriptor, or -1 on failure (the error is logged
 *         and no descriptor is left open).
 */
int socket_create(int port, int backlog) {
    const int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        write_log(strerror(errno));
        return -1;
    }

    // Allow rebinding while old connections linger in TIME_WAIT, otherwise a
    // quick stop/start cycle fails with EADDRINUSE. Not fatal if it fails.
    int reuse = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
        write_log(strerror(errno));
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1 ||
        listen(server_fd, backlog) == -1) {
        write_log(strerror(errno));
        close(server_fd);  // do not leak the half-configured socket
        return -1;
    }
    return server_fd;
}

/**
 * Switches a descriptor to non-blocking mode by OR-ing O_NONBLOCK into its
 * file status flags. If either fcntl call fails the error is logged and the
 * descriptor is left as it was.
 *
 * @param fd  Descriptor to modify.
 */
void socket_setnonblock(int fd) {
    const int current = fcntl(fd, F_GETFL);
    if (current != -1 && fcntl(fd, F_SETFL, current | O_NONBLOCK) != -1) {
        return;
    }
    write_log(strerror(errno));
}

/**
 * Loads <current working directory>/pub/pub.conf into a newly allocated
 * _conf and returns it.
 *
 * The file consists of "KEY:value" lines; lines without a ':' (section
 * headers such as "[PUB]", blank lines) are skipped, as are unknown keys.
 * Every field starts out zeroed, so a key missing from the file leaves its
 * field at 0 / empty string. String values longer than the destination
 * field are truncated.
 *
 * If the working directory cannot be determined or the file cannot be
 * opened, a message is printed and the process exits with EXIT_FAILURE.
 *
 * @return Pointer to the parsed configuration (never NULL).
 */
struct _conf *init_conf() {
    char cur_path[1024];
    if (getcwd(cur_path, sizeof(cur_path)) == NULL) {
        printf("get the current path failed\n");
        exit(EXIT_FAILURE);
    }

    // Room for the working directory plus the fixed "/pub/pub.conf" suffix.
    char conf_path[sizeof(cur_path) + 16];
    snprintf(conf_path, sizeof(conf_path), "%s/pub/pub.conf", cur_path);

    FILE *fp = fopen(conf_path, "r");
    if (fp == NULL) {
        printf("open %s failed: %s\n", conf_path, strerror(errno));
        exit(EXIT_FAILURE);
    }

    struct _conf *conf = new _conf();  // value-initialisation zeroes every field

    char line[256];
    while (fgets(line, sizeof(line), fp)) {
        // Strip the line ending (LF or CRLF); the last line may have none.
        line[strcspn(line, "\r\n")] = '\0';

        char *sep = strchr(line, ':');
        if (sep == NULL) {
            continue;
        }
        // Split in place: `line` becomes the key, `value` the text after ':'.
        *sep = '\0';
        const char *key = line;
        const char *value = sep + 1;

        if (!strcmp(key, "MAX_ERR_MSG_LEN")) {
            conf->max_err_msg_len = atoi(value);
        } else if (!strcmp(key, "ERR_LOG_PATH")) {
            snprintf(conf->err_log_path, sizeof(conf->err_log_path), "%s", value);
        } else if (!strcmp(key, "MSG_HEADER_MAX_LEN")) {
            conf->msg_header_max_len = atoi(value);
        } else if (!strcmp(key, "MSG_BODY_MAX_SIZE")) {
            conf->msg_body_max_size = atoi(value);
        } else if (!strcmp(key, "MAX_EPOLL_SIZE")) {
            conf->max_epoll_size = atoi(value);
        } else if (!strcmp(key, "MASTER_PID_PATH")) {
            snprintf(conf->master_pid_path, sizeof(conf->master_pid_path), "%s", value);
        } else if (!strcmp(key, "COMMAND_MAX_LEN")) {
            conf->command_max_len = atoi(value);
        }
    }

    fclose(fp);
    return conf;
}


worker.h
#ifndef WORKER_H_

#define WORKER_H_

// Values of Worker::start_up: SHUT_DOWN is stored by the SIGINT handler,
// START_UP is the initial value and makes monitorWorker re-fork workers.
#define SHUT_DOWN 0

#define START_UP 1

// Multi-process TCP server: a master process forks worker processes that
// accept connections on a shared listening socket and dispatch received
// messages to a user-supplied callback.
// NOTE(review): COMMAND_MAX_LEN and MAX_EPOLL_SIZE are not defined in the
// code shown - confirm where these macros come from.
class Worker{
private:
// Listening socket created in the constructor.
static int socket_fd;
// Number of worker processes requested through setWorkerProcess.
static int worker_process;
// Array of worker_process child pids; -1 marks a free slot.
static int *pids;
// START_UP or SHUT_DOWN.
static int start_up;
// Pid of the master process, recorded by savePid2File.
static pid_t master_pid;
// epoll descriptor created by listen().
static int epfd;
// Number of worker processes the master currently accounts for.
int worker_count;
// Copy of the command string given to the constructor.
char command[COMMAND_MAX_LEN];
// Reads the master pid file and sends SIGINT to that pid.
void stopProc();
// Client descriptor table indexed by the UID bytes of a message header.
static int socket_client[MAX_EPOLL_SIZE];
// Receives one message from the descriptor and dispatches on header[0].
int recvMsg(int);
// Acts on `command` ("start", "stop" or "restart").
void parseCommand();
// Worker event loop (epoll_wait / accept / recvMsg); does not return.
void run();
public:
// Arguments: port, backlog, command string.
Worker(int, int, const char *);
// Forks into the background and detaches from the controlling terminal.
void setDeamon();
// Installs signalHandler for SIGINT, SIGUSR1 and SIGUSR2.
void setSignal();
static void signalHandler(int);
// Writes the current pid to glb_conf->master_pid_path.
void savePid2File();
// Sets the number of worker processes and allocates the pid table.
void setWorkerProcess(int);
// Forks workers until worker_count reaches worker_process.
void forkWorker();
// Creates the epoll instance and registers the listening socket.
void listen();
// Master loop: waits for exited workers and re-forks them while start_up == START_UP.
void monitorWorker();
// User callback invoked by recvMsg for header[0] == 1 messages with
// (message, received byte count, descriptor looked up via header[2]).
int (*msgHandler)(struct _msg, int, int);
// Runs the start-up sequence for the given worker and supervises its children.
static void runAll(Worker &);
};

#endif
worker.cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <signal.h>
#include <sys/epoll.h>
#include <wait.h>
#include "worker.h"
#include "../pub/pub.h"

// Definitions of Worker's static data members.
// (A stray "4000" token that sat between these definitions was removed; it
// was not valid C++ and prevented the file from compiling.)
int Worker::worker_process = 0;                     // no workers until setWorkerProcess is called
int *Worker::pids = NULL;                           // allocated by setWorkerProcess
int Worker::start_up = START_UP;
pid_t Worker::master_pid = 0;                       // set by savePid2File
int Worker::socket_fd = 0;                          // set by the constructor
int Worker::epfd = 0;                               // set by listen()
int Worker::socket_client[MAX_EPOLL_SIZE] = {0};    // 0 means "no descriptor registered"

/**
 * Validates the arguments, executes the command ("start", "stop",
 * "restart") via parseCommand and creates the non-blocking listening socket.
 *
 * The process exits (after logging) when an argument is invalid, when the
 * command does not fit into the command buffer, or when the socket cannot
 * be created. parseCommand itself may also exit, e.g. for "stop".
 *
 * @param port     TCP port to listen on; must be > 0.
 * @param backlog  listen() backlog; must be > 0.
 * @param cmd      Command string; must not be NULL.
 */
Worker::Worker(int port, int backlog, const char *cmd) {
    if (port <= 0 || backlog <= 0 || cmd == NULL) {
        write_log("the parameters of constructor is error!");
        exit(0);
    }
    // Reject instead of overflowing the fixed-size command buffer.
    if (strlen(cmd) >= sizeof(command)) {
        write_log("the command is too long!");
        exit(0);
    }

    worker_count = 0;
    msgHandler = NULL;  // callers install the callback after construction
    memset(command, 0, sizeof(command));
    strcpy(command, cmd);  // length checked above
    parseCommand();

    socket_fd = socket_create(port, backlog);
    if (socket_fd == -1) {
        exit(0);
    }
    socket_setnonblock(socket_fd);
}

/**
 * Interprets the command stored by the constructor. The existence of the
 * master pid file is taken as "a master is running".
 *
 *  - "start":   exits if a master is already running, otherwise returns so
 *               start-up can continue.
 *  - "stop":    signals the running master and exits; exits as well when no
 *               master is running.
 *  - "restart": signals the running master (if any), waits briefly for it
 *               to remove its pid file, then returns so start-up continues.
 *  - anything else is logged and the process exits, so a typo cannot
 *    launch a second server.
 */
void Worker::parseCommand() {
    const bool running = (access(glb_conf->master_pid_path, F_OK) == 0);

    if (!strcmp(command, "start")) {
        if (running) {
            write_log("the process is running !");
            exit(0);
        }
        return;
    }

    if (!strcmp(command, "stop")) {
        if (!running) {
            write_log("the process is not running !");
            exit(0);
        }
        stopProc();
        exit(0);
    }

    if (!strcmp(command, "restart")) {
        if (running) {
            stopProc();
            // The old master removes its pid file while shutting down. Wait
            // (at most ~2 s) for that to happen so the new master does not
            // race it for the listening port or have its freshly written
            // pid file unlinked by the old one.
            for (int attempt = 0;
                 attempt < 20 && access(glb_conf->master_pid_path, F_OK) == 0;
                 ++attempt) {
                usleep(100 * 1000);
            }
        }
        write_log("the process is restarting....!");
        return;
    }

    write_log("unknown command, expected start, stop or restart!");
    exit(0);
}

/**
 * Reads the master pid from glb_conf->master_pid_path and sends it SIGINT.
 *
 * The process exits (after logging) when the pid file cannot be opened, is
 * empty, or does not contain a positive pid. A failing kill() is logged
 * only.
 */
void Worker::stopProc() {
    FILE *fp = fopen(glb_conf->master_pid_path, "r");
    if (fp == NULL) {
        write_log(strerror(errno));
        exit(0);
    }

    char buf[12];
    memset(buf, 0, sizeof(buf));
    // Leave the last byte untouched so the text is always NUL-terminated.
    const size_t got = fread(buf, 1, sizeof(buf) - 1, fp);
    fclose(fp);
    if (got == 0) {
        write_log("the master pid file is empty or unreadable!");
        exit(0);
    }

    const pid_t tmp_pid = atoi(buf);
    // kill() with pid 0 or a negative pid addresses whole process groups
    // (including this process), so only a positive pid is acceptable.
    if (tmp_pid <= 0) {
        write_log("the master pid file does not contain a valid pid!");
        exit(0);
    }

    write_log("the process is stoping.....!");
    if (kill(tmp_pid, SIGINT) == -1) {
        write_log(strerror(errno));
    }
}

/**
 * Turns the calling process into a background daemon.
 *
 * Forks and lets the parent exit; the child starts a new session, changes
 * to "/", clears the umask and points stdin/stdout/stderr at /dev/null.
 * If fork() fails the error is logged and the process simply continues in
 * the foreground.
 */
void Worker::setDeamon() {
    const pid_t pid = fork();
    if (pid < 0) {
        write_log(strerror(errno));
        return;
    }
    if (pid > 0) {
        exit(0);  // parent: the child carries on as the daemon
    }

    if (setsid() == -1) {
        write_log(strerror(errno));
        exit(0);
    }
    if (chdir("/") == -1) {
        write_log(strerror(errno));
    }
    umask(0);

    // Redirect the standard streams instead of closing them. With
    // descriptors 0-2 merely closed, the next epoll instance, log file or
    // accepted connection would reuse those numbers, and any later printf
    // would write into it.
    if (freopen("/dev/null", "r", stdin) == NULL ||
        freopen("/dev/null", "w", stdout) == NULL ||
        freopen("/dev/null", "w", stderr) == NULL) {
        write_log(strerror(errno));
    }
}

/**
 * Records the calling process as the master and writes its pid, as decimal
 * text, to glb_conf->master_pid_path (creating the parent directory when
 * needed). A failure to open the file is logged.
 */
void Worker::savePid2File() {
    // Record the pid before anything can fail: the SIGINT handler and the
    // workers rely on master_pid even when the pid file cannot be written.
    master_pid = getpid();

    create_dir(glb_conf->master_pid_path);
    FILE *fp = fopen(glb_conf->master_pid_path, "w");
    if (fp == NULL) {
        write_log(strerror(errno));
        return;
    }
    fprintf(fp, "%d", static_cast<int>(master_pid));
    fclose(fp);
}

/**
 * Creates the epoll instance (sized by glb_conf->max_epoll_size) and
 * registers the listening socket for EPOLLIN | EPOLLERR | EPOLLHUP.
 *
 * Either step failing is logged and terminates the process: continuing
 * without a usable epoll descriptor would leave the workers spinning on
 * failing epoll_wait calls.
 */
void Worker::listen() {
    epfd = epoll_create(glb_conf->max_epoll_size);
    if (epfd == -1) {
        write_log(strerror(errno));
        exit(0);
    }

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
    event.data.fd = socket_fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &event) == -1) {
        write_log(strerror(errno));
        exit(0);
    }
}

// Clears every UID slot that still refers to `fd`, then closes it, so a
// later connection reusing the descriptor number is not mistaken for the
// old client. Closing also removes the descriptor from the epoll set.
static void forget_and_close(int *table, int slots, int fd) {
    for (int uid = 0; uid < slots; ++uid) {
        if (table[uid] == fd) {
            table[uid] = 0;
        }
    }
    close(fd);
}

/**
 * Event loop of a worker process; never returns.
 *
 * Sets up an epoll instance private to this process, then loops: accepts
 * new connections on the listening socket, closes connections reporting
 * EPOLLERR/EPOLLHUP, and passes readable connections to recvMsg. A negative
 * recvMsg result closes the connection; a non-negative result is used as an
 * index whose socket_client slot is cleared (ignored when out of range).
 *
 * Fatal errors are logged and end the worker with exit(0).
 */
void Worker::run() {
    const int max_events = glb_conf->max_epoll_size;
    const int client_slots =
        static_cast<int>(sizeof(socket_client) / sizeof(socket_client[0]));

    // An epoll descriptor inherited across fork() refers to one interest
    // list shared by all processes, so connections registered by one worker
    // would be reported to the others, where the descriptor numbers mean
    // nothing. Give every worker its own instance.
    if (epfd > 0) {
        close(epfd);
    }
    epfd = epoll_create(max_events);
    if (epfd == -1) {
        write_log(strerror(errno));
        exit(0);
    }

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
    event.data.fd = socket_fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &event) == -1) {
        write_log(strerror(errno));
        exit(0);
    }

    // Lives for the whole lifetime of the worker; released at process exit.
    struct epoll_event *events = new epoll_event[max_events];

    while (1) {
        const int active_num = epoll_wait(epfd, events, max_events, -1);
        if (active_num == -1) {
            if (errno == EINTR) {
                continue;  // interrupted by a signal, just wait again
            }
            write_log(strerror(errno));
            exit(0);
        }

        for (int i = 0; i < active_num; ++i) {
            const int ready_fd = events[i].data.fd;

            if (ready_fd == socket_fd) {
                struct sockaddr_in client_addr;
                memset(&client_addr, 0, sizeof(client_addr));
                socklen_t addr_len = sizeof(client_addr);
                const int client_fd =
                    accept(socket_fd, (struct sockaddr *)&client_addr, &addr_len);
                if (client_fd == -1) {
                    // All workers are woken for one pending connection and
                    // only one wins; the others see EAGAIN, which is normal.
                    if (errno == EAGAIN || errno == EWOULDBLOCK ||
                        errno == EINTR || errno == ECONNABORTED) {
                        continue;
                    }
                    write_log(strerror(errno));
                    exit(0);
                }
                socket_setnonblock(client_fd);
                event.data.fd = client_fd;
                event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &event) == -1) {
                    write_log(strerror(errno));
                    close(client_fd);
                    /*if epoll ctl failed, then send the SIGINT signal to master process and close all resources*/
                    kill(master_pid, SIGINT);
                    exit(0);
                }
                continue;
            }

            if (events[i].events & (EPOLLERR | EPOLLHUP)) {
                // The connection is dead. It must be closed, otherwise the
                // level-triggered event is reported again on every wait.
                forget_and_close(socket_client, client_slots, ready_fd);
                continue;
            }

            const int ret_uid = recvMsg(ready_fd);
            if (ret_uid < 0) {
                forget_and_close(socket_client, client_slots, ready_fd);
            } else if (ret_uid < client_slots) {
                socket_client[ret_uid] = 0;
            }
        }
    }
}

/**
 * Receives one message from `fd` and acts on header[0]:
 *
 *  - 0: registers `fd` in socket_client under the UID in header[1];
 *  - 1: calls msgHandler with the message, the number of bytes received and
 *       the descriptor registered for the UID in header[2], and returns the
 *       handler's result (0 when no handler is installed);
 *  - other values are ignored.
 *
 * UID bytes are read as unsigned values (0-255) and must fall inside
 * socket_client.
 * NOTE(review): when no client is registered for header[2] the handler is
 * still called with descriptor 0 - confirm whether such messages should be
 * dropped instead.
 *
 * @param fd  Connected client descriptor that is ready for reading.
 * @return -1 when the peer closed the connection, recv failed or a UID is
 *         out of range (the caller then closes `fd`); otherwise 0 or the
 *         handler's return value.
 */
int Worker::recvMsg(int fd) {
    struct _msg msg;
    memset(&msg, 0, sizeof(msg));

    const ssize_t size = recv(fd, (char *)&msg, sizeof(msg), 0);
    // 0 means the peer performed an orderly shutdown; treat it like an
    // error so the caller closes the descriptor.
    if (size <= 0) {
        return -1;
    }

    const int client_slots =
        static_cast<int>(sizeof(socket_client) / sizeof(socket_client[0]));

    switch (msg.header[0]) {
        case 0: {
            // The header comes straight from the network; plain char may be
            // signed, which would turn bytes >= 0x80 into negative indexes.
            const int src_uid = static_cast<unsigned char>(msg.header[1]);
            if (src_uid >= client_slots) {
                return -1;
            }
            socket_client[src_uid] = fd;
            break;
        }
        case 1: {
            const int dst_uid = static_cast<unsigned char>(msg.header[2]);
            if (dst_uid >= client_slots) {
                return -1;
            }
            if (msgHandler == NULL) {
                return 0;
            }
            return msgHandler(msg, static_cast<int>(size), socket_client[dst_uid]);
        }
    }
    return 0;
}

void Worker::setWorkerProcess(int num) {
worker_process = num;
pids = new int[worker_process];
for(int i=0; i<worker_process; ++i) {
pids[i] = -1;
}
}

/**
 * Routes SIGINT, SIGUSR1 and SIGUSR2 to Worker::signalHandler.
 */
void Worker::setSignal() {
    const int handled[] = {SIGINT, SIGUSR1, SIGUSR2};
    const size_t count = sizeof(handled) / sizeof(handled[0]);

    for (size_t idx = 0; idx < count; ++idx) {
        signal(handled[idx], Worker::signalHandler);
    }
}

void Worker::forkWorker() {
int i = worker_count;
for(; i<worker_process; ++i) {
pid_t pid = fork();
if (pid < 0) {
write_log(strerror(errno));
exit(0);
} else if (pid > 0) {
for (int j=0; j<worker_process; ++j) {
if (pids[j] == -1) {
pids[j] = pid;
break;
}
}
worker_count++;
} else {
run();
exit(-1);
}
}
}

void Worker::monitorWorker() {
int status, s;
while (1) {
pid_t pid = wait(&status);
s = WEXITSTATUS(status);
if (s == 0) {
for(int i=0; i<worker_count; ++i) {
if (pids[i] == pid) {
pids[i] = -1;
break;
}
}
}
worker_count--;
if (start_up == START_UP) {
forkWorker();
}
}
}

/**
 * Signal handler installed by setSignal.
 *
 * SIGINT received by the master process shuts the server down: every
 * recorded worker is sent SIGKILL, start_up becomes SHUT_DOWN, the master
 * pid file is removed, the listening and epoll descriptors are closed and
 * the master terminates itself with SIGKILL. In any other process SIGINT is
 * ignored. SIGUSR1 and SIGUSR2 are accepted but do nothing.
 *
 * Only async-signal-safe calls (getpid, kill, unlink, close) are made here.
 * Heap memory is deliberately not freed and nothing is logged: the signal
 * may arrive while the interrupted code is inside the allocator or stdio,
 * and the process is about to terminate anyway.
 *
 * @param signo  Number of the delivered signal.
 */
void Worker::signalHandler(int signo) {
    switch (signo) {
        case SIGINT:
            if (master_pid == getpid()) {
                for (int i = 0; i < worker_process; ++i) {
                    if (pids[i] > 0) {
                        kill(pids[i], SIGKILL);
                    }
                }
                start_up = SHUT_DOWN;
                unlink(glb_conf->master_pid_path);
                if (socket_fd > 0) {
                    close(socket_fd);
                }
                if (epfd > 0) {
                    close(epfd);
                }
                kill(master_pid, SIGKILL);
            }
            break;
        case SIGUSR1:
            break;
        case SIGUSR2:
            break;
    }
}

// Start-up sequence for the server; the order of the calls matters.
void Worker::runAll(Worker &worker) {
// Detach into the background first so the pid recorded next is the daemon's.
worker.setDeamon();
// Record the master pid (used by signalHandler and by the "stop" command).
worker.savePid2File();
// Install the signal handlers before any worker exists.
worker.setSignal();
// Create the epoll instance and register the listening socket.
worker.listen();
// Start the configured number of worker processes.
worker.forkWorker();
// Supervise the workers from the master process.
worker.monitorWorker();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息