简单多线程服务器实现
2010-01-15 10:27
423 查看
闲来没事,本来是在学习nio框架的,突然发现对最原始的多线程服务器都不是很了解,遂自己写了个简单的例子。
1 package testmutithreadserver.old;
2
3 import java.io.IOException;
4 import java.net.ServerSocket;
5 import java.net.Socket;
6
7 import testmutithreadserver.old.threadpool.ThreadPool;
8
9 /**
* 简单阻塞式多线程服务器(线程池处理)
*
* @author zhangjun
*
*/
public class Server {
private int port;
private ServerSocket serverSocket;
private ThreadPool threadPool;
private PortListenThread listener;
public Server(int port) {
this.port = port;
threadPool = new ThreadPool();
}
public void start() {
try {
serverSocket = new ServerSocket(port);
listener = new PortListenThread();
listener.start();
} catch (IOException e) {
e.printStackTrace();
}
}
public void shutdown() {
threadPool.shutdown();
listener.finish();
}
private class PortListenThread extends Thread {
private Boolean finish = false;
@Override
public void run() {
while (!finish) {
try {
final Socket socket = serverSocket.accept();
threadPool.execute(new Runnable() {
@Override
public void run() {
new TestMessage(socket).execute();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void finish() {
finish = true;
}
}
public static void main(String[] args) {
int port = 8888;
System.out.println("server is listening on port: " + port);
new Server(port).start();
}
}
这个Server调用的是自己实现的一个基于任务队列的简单线程池:
1 package testmutithreadserver.old.threadpool;
2
3 import java.util.LinkedList;
4
5 /**
6 * 简单线程池 (基于工作队列的同步线程池)
7 *
8 * @author zhangjun
9 *
10 */
11 public class ThreadPool extends ThreadGroup {
12 private final static String THREADPOOL = "thread pool";
13 private final static String WORKTHREAD = "work thread ";
14 private final static int DEFAULTSIZE = Runtime.getRuntime()
15 .availableProcessors() + 1;
16 private LinkedList<Runnable> taskQueue;
17 private boolean isPoolClose = false;
18
19 public ThreadPool() {
20 this(DEFAULTSIZE);
21 }
22
23 public ThreadPool(int size) {
24 super(THREADPOOL);
25 setDaemon(true);
26 taskQueue = new LinkedList<Runnable>();
27 initWorkThread(size);
28 }
29
30 private void initWorkThread(int size) {
31 for (int i = 0; i < size; i++) {
32 new WorkThread(WORKTHREAD + i).start();
33 }
34 try {
35 Thread.sleep(100 * size);
36 } catch (InterruptedException e) {
37 }
38 }
39
40 public synchronized void execute(Runnable task) {
41 if (isPoolClose) {
42 throw new IllegalStateException();
43 }
44 if (task != null) {
45 taskQueue.add(task);
46 notify();
47 }
48 }
49
50 private synchronized Runnable getTask() throws InterruptedException {
51 if (taskQueue.size() == 0) {
52 if (isPoolClose) {
53 return null;
54 }
55 wait();
56 }
57 if (taskQueue.size() == 0) {
58 return null;
59 }
60 return taskQueue.removeFirst();
61 }
62
63 public void shutdown() {
64 waitFinish();
65 synchronized (this) {
66 isPoolClose = true;
67 interrupt();
68 taskQueue.clear();
69 }
70 }
71
72 private void waitFinish() {
73 synchronized (this) {
74 isPoolClose = true;
75 notifyAll();
76 }
77 Thread[] threads = new Thread[activeCount()];
78 enumerate(threads);
79 try {
80 for (Thread t : threads) {
81 t.join();
82 }
83 } catch (InterruptedException e) {
84 //swallow this
85 }
86 }
87
88 private class WorkThread extends Thread {
89
90 public WorkThread(String name) {
91 super(ThreadPool.this, name);
92 }
93
94 @Override
95 public void run() {
96 while (!isInterrupted()) {
97 Runnable task = null;
98 try {
99 task = getTask();
} catch (InterruptedException e) {
//swallow this
}
if (task == null) {
return;
}
try {
task.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}
}
当然也可以直接使用concurrent的线程池,代码几乎不用改变:
1 package testmutithreadserver.concurrent;
2
3 import java.io.IOException;
4 import java.net.ServerSocket;
5 import java.net.Socket;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8
9 import testmutithreadserver.old.TestMessage;
/**
* 简单阻塞式多线程服务器(线程池处理)
*
* @author zhangjun
*
*/
public class Server {
private int port;
private ServerSocket serverSocket;
private ExecutorService threadPool;
private PortListenThread listener;
public Server(int port) {
this.port = port;
threadPool = Executors.newFixedThreadPool(3);
}
public void start() {
try {
serverSocket = new ServerSocket(port);
listener = new PortListenThread();
listener.start();
} catch (IOException e) {
e.printStackTrace();
}
}
public void shutdown() {
threadPool.shutdown();
listener.finish();
}
private class PortListenThread extends Thread {
private Boolean finish = false;
@Override
public void run() {
while (!finish) {
try {
final Socket socket = serverSocket.accept();
threadPool.execute(new Runnable() {
@Override
public void run() {
new TestMessage(socket).execute();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void finish() {
finish = true;
}
}
public static void main(String[] args) {
int port = 8888;
System.out.println("server is listening on port: " + port);
new Server(port).start();
}
}
里边我构造了一个Message接口:
1 package testmutithreadserver.old;
2
3 /**
4 * 通用消息接口
5 *
6 * @author zhangjun
7 *
8 */
9 public interface Message {
void execute();
}
以及实现了一个测试消息类:
1 package testmutithreadserver.old;
2
3 import java.io.BufferedReader;
4 import java.io.IOException;
5 import java.io.InputStreamReader;
6 import java.io.PrintWriter;
7 import java.net.Socket;
8
9 /**
* 测试消息
*
* @author zhangjun
*
*/
public class TestMessage implements Message {
private Socket socket;
public TestMessage(Socket socket) {
this.socket = socket;
}
@Override
public void execute() {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(socket
.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
String s;
while ((s = in.readLine()) != null) {
System.out.println("received message:" + s);
if (s.equals("quit")) {
break;
}
out.println("hello " + s);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (!socket.isClosed()) {
socket.close();
}
} catch (IOException e) {
}
}
}
}
代码很简单,就不用多解释什么了。下一步打算用nio在自己写个非阻塞的服务器。
1 package testmutithreadserver.old;
2
3 import java.io.IOException;
4 import java.net.ServerSocket;
5 import java.net.Socket;
6
7 import testmutithreadserver.old.threadpool.ThreadPool;
8
9 /**
* 简单阻塞式多线程服务器(线程池处理)
*
* @author zhangjun
*
*/
public class Server {
private int port;
private ServerSocket serverSocket;
private ThreadPool threadPool;
private PortListenThread listener;
public Server(int port) {
this.port = port;
threadPool = new ThreadPool();
}
public void start() {
try {
serverSocket = new ServerSocket(port);
listener = new PortListenThread();
listener.start();
} catch (IOException e) {
e.printStackTrace();
}
}
public void shutdown() {
threadPool.shutdown();
listener.finish();
}
private class PortListenThread extends Thread {
private Boolean finish = false;
@Override
public void run() {
while (!finish) {
try {
final Socket socket = serverSocket.accept();
threadPool.execute(new Runnable() {
@Override
public void run() {
new TestMessage(socket).execute();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void finish() {
finish = true;
}
}
public static void main(String[] args) {
int port = 8888;
System.out.println("server is listening on port: " + port);
new Server(port).start();
}
}
这个Server调用的是自己实现的一个基于任务队列的简单线程池:
1 package testmutithreadserver.old.threadpool;
2
3 import java.util.LinkedList;
4
5 /**
6 * 简单线程池 (基于工作队列的同步线程池)
7 *
8 * @author zhangjun
9 *
10 */
11 public class ThreadPool extends ThreadGroup {
12 private final static String THREADPOOL = "thread pool";
13 private final static String WORKTHREAD = "work thread ";
14 private final static int DEFAULTSIZE = Runtime.getRuntime()
15 .availableProcessors() + 1;
16 private LinkedList<Runnable> taskQueue;
17 private boolean isPoolClose = false;
18
19 public ThreadPool() {
20 this(DEFAULTSIZE);
21 }
22
23 public ThreadPool(int size) {
24 super(THREADPOOL);
25 setDaemon(true);
26 taskQueue = new LinkedList<Runnable>();
27 initWorkThread(size);
28 }
29
30 private void initWorkThread(int size) {
31 for (int i = 0; i < size; i++) {
32 new WorkThread(WORKTHREAD + i).start();
33 }
34 try {
35 Thread.sleep(100 * size);
36 } catch (InterruptedException e) {
37 }
38 }
39
40 public synchronized void execute(Runnable task) {
41 if (isPoolClose) {
42 throw new IllegalStateException();
43 }
44 if (task != null) {
45 taskQueue.add(task);
46 notify();
47 }
48 }
49
50 private synchronized Runnable getTask() throws InterruptedException {
51 if (taskQueue.size() == 0) {
52 if (isPoolClose) {
53 return null;
54 }
55 wait();
56 }
57 if (taskQueue.size() == 0) {
58 return null;
59 }
60 return taskQueue.removeFirst();
61 }
62
63 public void shutdown() {
64 waitFinish();
65 synchronized (this) {
66 isPoolClose = true;
67 interrupt();
68 taskQueue.clear();
69 }
70 }
71
72 private void waitFinish() {
73 synchronized (this) {
74 isPoolClose = true;
75 notifyAll();
76 }
77 Thread[] threads = new Thread[activeCount()];
78 enumerate(threads);
79 try {
80 for (Thread t : threads) {
81 t.join();
82 }
83 } catch (InterruptedException e) {
84 //swallow this
85 }
86 }
87
88 private class WorkThread extends Thread {
89
90 public WorkThread(String name) {
91 super(ThreadPool.this, name);
92 }
93
94 @Override
95 public void run() {
96 while (!isInterrupted()) {
97 Runnable task = null;
98 try {
99 task = getTask();
} catch (InterruptedException e) {
//swallow this
}
if (task == null) {
return;
}
try {
task.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}
}
当然也可以直接使用concurrent的线程池,代码几乎不用改变:
1 package testmutithreadserver.concurrent;
2
3 import java.io.IOException;
4 import java.net.ServerSocket;
5 import java.net.Socket;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8
9 import testmutithreadserver.old.TestMessage;
/**
* 简单阻塞式多线程服务器(线程池处理)
*
* @author zhangjun
*
*/
public class Server {
private int port;
private ServerSocket serverSocket;
private ExecutorService threadPool;
private PortListenThread listener;
public Server(int port) {
this.port = port;
threadPool = Executors.newFixedThreadPool(3);
}
public void start() {
try {
serverSocket = new ServerSocket(port);
listener = new PortListenThread();
listener.start();
} catch (IOException e) {
e.printStackTrace();
}
}
public void shutdown() {
threadPool.shutdown();
listener.finish();
}
private class PortListenThread extends Thread {
private Boolean finish = false;
@Override
public void run() {
while (!finish) {
try {
final Socket socket = serverSocket.accept();
threadPool.execute(new Runnable() {
@Override
public void run() {
new TestMessage(socket).execute();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void finish() {
finish = true;
}
}
public static void main(String[] args) {
int port = 8888;
System.out.println("server is listening on port: " + port);
new Server(port).start();
}
}
里边我构造了一个Message接口:
1 package testmutithreadserver.old;
2
3 /**
4 * 通用消息接口
5 *
6 * @author zhangjun
7 *
8 */
9 public interface Message {
void execute();
}
以及实现了一个测试消息类:
1 package testmutithreadserver.old;
2
3 import java.io.BufferedReader;
4 import java.io.IOException;
5 import java.io.InputStreamReader;
6 import java.io.PrintWriter;
7 import java.net.Socket;
8
9 /**
* 测试消息
*
* @author zhangjun
*
*/
public class TestMessage implements Message {
private Socket socket;
public TestMessage(Socket socket) {
this.socket = socket;
}
@Override
public void execute() {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(socket
.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
String s;
while ((s = in.readLine()) != null) {
System.out.println("received message:" + s);
if (s.equals("quit")) {
break;
}
out.println("hello " + s);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (!socket.isClosed()) {
socket.close();
}
} catch (IOException e) {
}
}
}
}
代码很简单,就不用多解释什么了。下一步打算用nio在自己写个非阻塞的服务器。
相关文章推荐
- python多进程、多线程服务器和客户端的简单实现
- C/S模型 多线程服务器实现简单计算工作并回馈客户端
- Java基于Socket实现简单的多线程回显服务器功能示例
- 用Python实现一个简单的多线程TCP服务器的教程
- 【网络】实现简单的TCP、UDP服务器、TCP多进程/多线程服务器
- Java Socket实现一个简单的多线程回显服务器。
- 用 Java 实现一个简单的多线程 web 服务器
- JavaWeb多线程简单Web服务器实现
- 用Python实现一个简单的多线程TCP服务器的教程
- C#实现简单WEB服务器
- 使用select系统调用实现简单的TCP服务器
- 在linux下实现简单聊天系统(三)服务器
- 基于TCP协议用多线程实现并发服务器,实现思路、算法和demo
- Java实例开发05-01 简单的多线程服务器
- 一个简单的文件服务器实现方案
- java基础:Web服务器原理 以及 用java简单实现
- 【Java】Java多线程实现的聊天客户端和服务器
- Java网络与多线程系列之1:实现一个简单的对象池
- Socket 简单实现服务器和客户端
- Java实现通过服务器实现客户端之间的简单群聊