[并发并行]_[线程模型]_[Pthread线程使用模型之三 客户端/服务端模型(Client/Server]
2016-12-25 15:55
525 查看
Pthread线程使用模型之三 客户端/服务端模型(Client/Server)
场景
1.在客户端/服务端模型时,客户端向服务端请求一些数据集的操作. 服务端执行执行操作独立的(多进程或跨网络)– 客户端可以等待服务端响应再做其他任务或者做一些并行的操作,在一段时间后被通知时再去查询结果. 虽然客户端等待服务端是最简单的方式, 但是这种情况极少使用, 因为它不具备速度和性能优势, 同步的只适合获取一些普通的资源. 比如Socket i/O的非阻塞或异步模型.2.在做一些复杂的命令行程序时(单元测试),也需要这种模型.比如实现并发的方法测试,或者需要异步网络通讯的测试,或者通按键的功能执行(1: 执行导入. 2: 执行输出 q:执行退出)
说明
Server的作用就是并发的处理请求并分发任务.Client可以同步等待或者异步执行, 在实际的项目开发中, 一般是Socket i/o来实现 Client和Server.
例子
1.程序server.c, 每个线程都在反复的读,接着输出, 输入几行. 当程序在运行时, 你能看到线程在不同的顺序输出提醒, 也有其他线程在输出之前先提示. 但是你将不会发现一个提示或一个输出在prompt服务器里穿插在提示和读取(stdin)之间.2.request_t结构体定义了对服务器的每个请求. 未处理的requests使用next来连接成链表.
3.tty_server_t结构体提供server线程的上下文.它有两个同步对象(mutex and request). 一个标记位表明了这个服务是否正在运行, 同时一个requests列表表明了有哪些未处理的请求.
4.主程序和客户端线程使用同步对象client_mutex和clients_done来通讯, 而不是使用pthread_join.
5.创建detach线程的两种方法:
// 方法1: pthread_t thread; pthread_attr_t detached_attr; status = pthread_attr_init (&detached_attr); if (status != 0) err_abort (status, "Init attributes object"); status = pthread_attr_setdetachstate ( &detached_attr, PTHREAD_CREATE_DETACHED); if (status != 0) err_abort (status, "Set detach state"); tty_server.running = 1; status = pthread_create (&thread, &detached_attr, tty_server_routine, NULL); // 方法2: pthread_t thread; status = pthread_create (&thread, NULL, tty_server_routine, NULL); pthread_detach(thread);
/* * server.c * * Demonstrate a client/server threading model. * * Special notes: On a Solaris system, call thr_setconcurrency() * to allow interleaved thread execution, since threads are not * timesliced. */ #include <pthread.h> #include <math.h> #include "errors.h" #define CLIENT_THREADS 4 /* Number of clients */ #define REQ_READ 1 /* Read with prompt */ #define REQ_WRITE 2 /* Write */ #define REQ_QUIT 3 /* Quit server */ /* * Internal to server "package" -- one for each request. */ typedef struct request_tag { struct request_tag *next; /* Link to next */ int operation; /* Function code */ int synchronous; /* Non-zero if synchronous */ int done_flag; /* Predicate for wait */ pthread_cond_t done; /* Wait for completion */ char prompt[32]; /* Prompt string for reads */ char text[128]; /* Read/write text */ } request_t; /* * Static context for the server */ typedef struct tty_server_tag { request_t *first; request_t *last; int running; pthread_mutex_t mutex; pthread_cond_t request; } tty_server_t; tty_server_t tty_server = { NULL, NULL, 0, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER}; /* * Main program data */ int client_threads; pthread_mutex_t client_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t clients_done = PTHREAD_COND_INITIALIZER; /* * The server start routine. It waits for a request to appear * in tty_server.requests using the request condition variable. * It processes requests in FIFO order. If a request is marked * "synchronous" (synchronous != 0), the server will set done_flag * and signal the request's condition variable. The client is * responsible for freeing the request. If the request was not * synchronous, the server will free the request on completion. */ void *tty_server_routine (void *arg) { static pthread_mutex_t prompt_mutex = PTHREAD_MUTEX_INITIALIZER; request_t *request; int operation, len; int status; while (1) { status = pthread_mutex_lock (&tty_server.mutex); if (status != 0) err_abort (status, "Lock server mutex"); /* * Wait for data */ while (tty_server.first == NULL) { status = pthread_cond_wait ( &tty_server.request, &tty_server.mutex); if (status != 0) err_abort (status, "Wait for request"); } request = tty_server.first; tty_server.first = request->next; if (tty_server.first == NULL) tty_server.last = NULL; status = pthread_mutex_unlock (&tty_server.mutex); if (status != 0) err_abort (status, "Unlock server mutex"); /* * Process the data */ operation = request->operation; switch (operation) { case REQ_QUIT: break; case REQ_READ: if (strlen (request->prompt) > 0) printf (request->prompt); if (fgets (request->text, 128, stdin) == NULL) request->text[0] = '\0'; /* * Because fgets returns the newline, and we don't want it, * we look for it, and turn it into a null (truncating the * input) if found. It should be the last character, if it is * there. */ len = strlen (request->text); if (len > 0 && request->text[len-1] == '\n') request->text[len-1] = '\0'; break; case REQ_WRITE: puts (request->text); break; default: break; } if (request->synchronous) { status = pthread_mutex_lock (&tty_server.mutex); if (status != 0) err_abort (status, "Lock server mutex"); request->done_flag = 1; status = pthread_cond_signal (&request->done); if (status != 0) err_abort (status, "Signal server condition"); status = pthread_mutex_unlock (&tty_server.mutex); if (status != 0) err_abort (status, "Unlock server mutex"); } else free (request); if (operation == REQ_QUIT) break; } return NULL; }
5.tty_server_routine用来持续处理客户端请求直到退出请求发出. 如果一个请求标记为同步(synchronous不为0),那么server设置done_flag的值并且通知done条件变量. 当请求是同步的, 那么客户端有责任去释放request包. 如果请求是异步的, 那么server在完成request时会释放.
void tty_server_request ( int operation, int sync, const char *prompt, char *string) { request_t *request; int status; status = pthread_mutex_lock (&tty_server.mutex); if (status != 0) err_abort (status, "Lock server mutex"); if (!tty_server.running) { pthread_t thread; pthread_attr_t detached_attr; status = pthread_attr_init (&detached_attr); if (status != 0) err_abort (status, "Init attributes object"); status = pthread_attr_setdetachstate ( &detached_attr, PTHREAD_CREATE_DETACHED); if (status != 0) err_abort (status, "Set detach state"); tty_server.running = 1; status = pthread_create (&thread, &detached_attr, tty_server_routine, NULL); if (status != 0) err_abort (status, "Create server"); /* * Ignore an error in destroying the attributes object. * It's unlikely to fail, there's nothing useful we can * do about it, and it's not worth aborting the program * over it. */ pthread_attr_destroy (&detached_attr); } /* * Create and initialize a request structure. */ request = (request_t*)malloc (sizeof (request_t)); if (request == NULL) errno_abort ("Allocate request"); request->next = NULL; request->operation = operation; request->synchronous = sync; if (sync) { request->done_flag = 0; status = pthread_cond_init (&request->done, NULL); if (status != 0) err_abort (status, "Init request condition"); } if (prompt != NULL) strncpy (request->prompt, prompt, 32); else request->prompt[0] = '\0'; if (operation == REQ_WRITE && string != NULL) strncpy (request->text, string, 128); else request->text[0] = '\0'; /* * Add the request to the queue, maintaining the first and * last pointers. */ if (tty_server.first == NULL) { tty_server.first = request; tty_server.last = request; } else { (tty_server.last)->next = request; tty_server.last = request; } /* * Tell the server that a request is available. */ status = pthread_cond_signal (&tty_server.request); if (status != 0) err_abort (status, "Wake server"); /* * If the request was "synchronous", then wait for a reply. */ if (sync) { while (!request->done_flag) { status = pthread_cond_wait ( &request->done, &tty_server.mutex); if (status != 0) err_abort (status, "Wait for sync request"); } if (operation == REQ_READ) { if (strlen (request->text) > 0) strcpy (string, request->text); else string[0] = '\0'; } status = pthread_cond_destroy (&request->done); if (status != 0) err_abort (status, "Destroy request condition"); free (request); } status = pthread_mutex_unlock (&tty_server.mutex); if (status != 0) err_abort (status, "Unlock mutex"); }
6.tty_server_request 用来初始化一个发送到tty server的请求. 如果tty server线程没有在运行, 启动一个, 并创建一个属性对象(detached_attr), 并设置detachstate属性值为 PTHREAD_CREATE_DETACHED.
PTHREAD_CREATE_JOINABLE – pthread_create创建时默认就是joinable, 意味着通过pthread_create创建的线程id能被用来join with这个线程,查询线程的返回值或者取消它.
PTHREAD_CREATE_DETACHED – 通过pthread_create创建的id是不能再用饿了,当线程终止时, 任何被线程使用的资源都会被系统迅速回收. 也可以在pthread_create之后通过pthread_detach转换为detach线程. 建议如果不需要控制线程, 就把它设置为detach.
7.分配和初始化一个server的请求(request_t)包. 如果请求是同步的, 初始化request里的条件变量done(注意配合使用的还是server的mutex,这里和server共用mutex,如果是同步请求的话,server只能等待client的请求完成并释放mutex才能继续执行下一个request,效率并不高)
/* * Client routine -- multiple copies will request server. */ void *client_routine (void *arg) { int my_number = (int)arg, loops; char prompt[32]; char string[128], formatted[128]; int status; sprintf (prompt, "Client %d> ", my_number); while (1) { tty_server_request (REQ_READ, 1, prompt, string); if (strlen (string) == 0) break; for (loops = 0; loops < 4; loops++) { sprintf ( formatted, "(%d#%d) %s", my_number, loops, string); tty_server_request (REQ_WRITE, 0, NULL, formatted); sleep (1); } } status = pthread_mutex_lock (&client_mutex); if (status != 0) err_abort (status, "Lock client mutex"); client_threads--; if (client_threads <= 0) { status = pthread_cond_signal (&clients_done); if (status != 0) err_abort (status, "Signal clients done"); } status = pthread_mutex_unlock (&client_mutex); if (status != 0) err_abort (status, "Unlock client mutex"); return NULL; } int main (int argc, char *argv[]) { pthread_t thread; int count; int status; #ifdef sun /* * On Solaris 2.5, threads are not timesliced. To ensure * that our threads can run concurrently, we need to * increase the concurrency level to CLIENT_THREADS. */ DPRINTF (("Setting concurrency level to %d\n", CLIENT_THREADS)); thr_setconcurrency (CLIENT_THREADS); #endif /* * Create CLIENT_THREADS clients. */ client_threads = CLIENT_THREADS; for (count = 0; count < client_threads; count++) { status = pthread_create (&thread, NULL, client_routine, (void*)count); if (status != 0) err_abort (status, "Create client thread"); } status = pthread_mutex_lock (&client_mutex); if (status != 0) err_abort (status, "Lock client mutex"); while (client_threads > 0) { status = pthread_cond_wait (&clients_done, &client_mutex); if (status != 0) err_abort (status, "Wait for clients to finish"); } status = pthread_mutex_unlock (&client_mutex); if (status != 0) err_abort (status, "Unlock client mutex"); printf ("All clients done\n"); tty_server_request (REQ_QUIT, 1, NULL, NULL); return 0; }
相关文章推荐
- [并发并行]_[线程模型]_[Pthread线程使用模型之三 客户端/服务端模型(Client/Server]
- [并发并行]_[线程模型]_[Pthread线程使用模型之一管道Pipeline]
- [并发并行]_[线程模型]_[Pthread线程使用模型之一管道Pipeline]
- [并发并行]_[线程模型]_[Pthread线程使用模型之二 工作组work crew]
- [并发并行]_[线程模型]_[Pthread线程使用模型之二 工作组work crew]
- [并发并行]_[C/C++]_[使用线程本地存储Thread Local Storage(TLS)-win32线程和pthread线程比较]
- spring-oauth-server实践:使用授权方式四:client_credentials 模式的客户端和服务端交互
- [并发并行]_[C/C++]_[使用线程本地存储Thread Local Storage(TLS)-win32线程和pthread线程比较]
- [并发并行]_[任务停止]_[使用Pthread的线程本地存储来停止任务执行]
- php中soap使用,SoapServer服务端编写,SoapClient客户端编写
- ZSChatServer线程设计模型---客户端测试程序
- java 和 C++ Socket通信(java作为服务端server,C++作为客户端client,解决中文乱码问题GBK和UTF8)
- [企业化NET]Window Server 2008 R2[3]-SVN 服务端 和 客户端 基本使用
- java 和 C++ Socket通信(java作为服务端server,C++作为客户端client,解决中文乱码问题GBK和UTF8)
- libevent基础:用libevent写服务端server程序和客户端client程序
- [并发并行]_[pthread]_[使用线程池并发复制文件]
- 基于Android平台实战无线点餐系统(客户端(Client)和服务端(Server))
- java 和 C++ Socket通信(java作为服务端server,C++作为客户端client,解决中文乱码问题GBK和UTF8)
- java 和 C++ Socket通信(java作为服务端server,C++作为客户端client,解决中文乱码问题GBK和UTF8)
- AS3 服务端、客户端协议路由简单写法 ---- AS3 Server、Client Router Demo