您的位置:首页 > 其它

简单多线程服务器实现

2010-01-15 10:27 423 查看
闲来没事,本来是在学习nio框架的,突然发现对最原始的多线程服务器都不是很了解,遂自己写了个简单的例子。

1 package testmutithreadserver.old;
2
3 import java.io.IOException;
4 import java.net.ServerSocket;
5 import java.net.Socket;
6
7 import testmutithreadserver.old.threadpool.ThreadPool;
8
9 /**
* 简单阻塞式多线程服务器(线程池处理)
*
* @author zhangjun
*
*/
public class Server {

private int port;

private ServerSocket serverSocket;

private ThreadPool threadPool;

private PortListenThread listener;

public Server(int port) {
this.port = port;
threadPool = new ThreadPool();
}

public void start() {
try {
serverSocket = new ServerSocket(port);
listener = new PortListenThread();
listener.start();
} catch (IOException e) {
e.printStackTrace();
}
}

public void shutdown() {
threadPool.shutdown();
listener.finish();
}

private class PortListenThread extends Thread {

private Boolean finish = false;

@Override
public void run() {
while (!finish) {
try {
final Socket socket = serverSocket.accept();
threadPool.execute(new Runnable() {

@Override
public void run() {
new TestMessage(socket).execute();
}
});
} catch (IOException e) {
e.printStackTrace();
}

}
}

public void finish() {
finish = true;
}

}

public static void main(String[] args) {
int port = 8888;
System.out.println("server is listening on port: " + port);
new Server(port).start();
}

}

这个Server调用的是自己实现的一个基于任务队列的简单线程池:

1 package testmutithreadserver.old.threadpool;
2
3 import java.util.LinkedList;
4
5 /**
6 * 简单线程池 (基于工作队列的同步线程池)
7 *
8 * @author zhangjun
9 *
10 */
11 public class ThreadPool extends ThreadGroup {
12 private final static String THREADPOOL = "thread pool";
13 private final static String WORKTHREAD = "work thread ";
14 private final static int DEFAULTSIZE = Runtime.getRuntime()
15 .availableProcessors() + 1;
16 private LinkedList<Runnable> taskQueue;
17 private boolean isPoolClose = false;
18
19 public ThreadPool() {
20 this(DEFAULTSIZE);
21 }
22
23 public ThreadPool(int size) {
24 super(THREADPOOL);
25 setDaemon(true);
26 taskQueue = new LinkedList<Runnable>();
27 initWorkThread(size);
28 }
29
30 private void initWorkThread(int size) {
31 for (int i = 0; i < size; i++) {
32 new WorkThread(WORKTHREAD + i).start();
33 }
34 try {
35 Thread.sleep(100 * size);
36 } catch (InterruptedException e) {
37 }
38 }
39
40 public synchronized void execute(Runnable task) {
41 if (isPoolClose) {
42 throw new IllegalStateException();
43 }
44 if (task != null) {
45 taskQueue.add(task);
46 notify();
47 }
48 }
49
50 private synchronized Runnable getTask() throws InterruptedException {
51 if (taskQueue.size() == 0) {
52 if (isPoolClose) {
53 return null;
54 }
55 wait();
56 }
57 if (taskQueue.size() == 0) {
58 return null;
59 }
60 return taskQueue.removeFirst();
61 }
62
63 public void shutdown() {
64 waitFinish();
65 synchronized (this) {
66 isPoolClose = true;
67 interrupt();
68 taskQueue.clear();
69 }
70 }
71
72 private void waitFinish() {
73 synchronized (this) {
74 isPoolClose = true;
75 notifyAll();
76 }
77 Thread[] threads = new Thread[activeCount()];
78 enumerate(threads);
79 try {
80 for (Thread t : threads) {
81 t.join();
82 }
83 } catch (InterruptedException e) {
84 //swallow this
85 }
86 }
87
88 private class WorkThread extends Thread {
89
90 public WorkThread(String name) {
91 super(ThreadPool.this, name);
92 }
93
94 @Override
95 public void run() {
96 while (!isInterrupted()) {
97 Runnable task = null;
98 try {
99 task = getTask();
} catch (InterruptedException e) {
//swallow this
}
if (task == null) {
return;
}
try {
task.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
}

}
}

当然也可以直接使用concurrent的线程池,代码几乎不用改变:

1 package testmutithreadserver.concurrent;
2
3 import java.io.IOException;
4 import java.net.ServerSocket;
5 import java.net.Socket;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8
9 import testmutithreadserver.old.TestMessage;

/**
* 简单阻塞式多线程服务器(线程池处理)
*
* @author zhangjun
*
*/
public class Server {

private int port;

private ServerSocket serverSocket;

private ExecutorService threadPool;

private PortListenThread listener;

public Server(int port) {
this.port = port;
threadPool = Executors.newFixedThreadPool(3);
}

public void start() {
try {
serverSocket = new ServerSocket(port);
listener = new PortListenThread();
listener.start();
} catch (IOException e) {
e.printStackTrace();
}
}

public void shutdown() {
threadPool.shutdown();
listener.finish();
}

private class PortListenThread extends Thread {

private Boolean finish = false;

@Override
public void run() {
while (!finish) {
try {
final Socket socket = serverSocket.accept();
threadPool.execute(new Runnable() {

@Override
public void run() {
new TestMessage(socket).execute();
}
});
} catch (IOException e) {
e.printStackTrace();
}

}
}

public void finish() {
finish = true;
}

}

public static void main(String[] args) {
int port = 8888;
System.out.println("server is listening on port: " + port);
new Server(port).start();
}
}

里边我构造了一个Message接口:

1 package testmutithreadserver.old;
2
3 /**
4 * 通用消息接口
5 *
6 * @author zhangjun
7 *
8 */
9 public interface Message {

void execute();

}

以及实现了一个测试消息类:

1 package testmutithreadserver.old;
2
3 import java.io.BufferedReader;
4 import java.io.IOException;
5 import java.io.InputStreamReader;
6 import java.io.PrintWriter;
7 import java.net.Socket;
8
9 /**
* 测试消息
*
* @author zhangjun
*
*/
public class TestMessage implements Message {

private Socket socket;

public TestMessage(Socket socket) {
this.socket = socket;
}

@Override
public void execute() {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(socket
.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
String s;
while ((s = in.readLine()) != null) {
System.out.println("received message:" + s);
if (s.equals("quit")) {
break;
}
out.println("hello " + s);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (!socket.isClosed()) {
socket.close();
}
} catch (IOException e) {
}
}
}

}

代码很简单,就不用多解释什么了。下一步打算用nio在自己写个非阻塞的服务器。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: