您的位置:首页 > 编程语言 > Java开发

Java语言深入:关于多线程程序模型研究

2009-07-23 13:08 681 查看
多线程是较复杂程序设计过程中不可缺少的一部分。为了提高应用程序运行的性能,采用多线程的设计是一种比较可行的方案。本文通过介绍使用Java编写的扫描计算机端口的实例,来说明多线程设计中应注意的问题,以及得出经常使用的多线程模型。

本文要求读者具备一定的Java语言基础,对Socket有一定的了解。本文的所有程序在Java SDK 1.4.2编译通过并能正常运行。

现在,我们需要对一台主机扫描其端口,找出哪些端口是open的状态。我们先采用单线程进行处理,程序代码如下:

-------------------------------------------------------------------------------------------------------

import java.io.IOException;

import java.net.Socket;

import java.net.UnknownHostException;

public class PortScannerSingleThread {

public static void main(String[] args) {

String host = null; //第一个参数,目标主机。

int beginport = 1; //第二个参数,开始端口。

int endport = 65535; //第三个参数,结束端口。

try{

host = args[0];

beginport = Integer.parseInt(args[1]);

endport = Integer.parseInt(args[2]);

if(beginport <= 0 || endport >= 65536 || beginport > endport){

throw new Exception("Port is illegal");

}

}catch(Exception e){

System.out.println("Usage: java PortScannerSingleThread host beginport endport");

System.exit(0);

}

for (int i = beginport; i <= endport; i++) {

try {

Socket s = new Socket(host, i);

System.out.println("The port " + i + " is opened at " + host);

}catch (UnknownHostException ex) {

System.err.println(ex);

break;

}catch (IOException ex) {

}

}

}

}

--------------------------------------------------------------------------------------------------------


以上程序中,通过java.net.Socket类来识别端口是否是open状态。程序接受3个参数,第一个参数是主机IP,第二和第三个参数是需要扫描
的起始和中止的端口号(1~65535)。本程序(java PortScannerSingleThread 10.1.1.1 1 1000)运行结
果如下:

The port 25 is opened at 10.1.1.182

The port 110 is opened at 10.1.1.182

The port 135 is opened at 10.1.1.182

...

但是,以上程序运行效率实在不敢恭维,把目标主机端口扫描一遍需要十几分钟甚至更长,估计没有哪个用户可以忍受这样的效率。

所以,提高程序处理效率是必须的,下面的程序通过多线程的方法来进行处理。程序代码如下:

----------------------------------------------------------------------------------------------------------

import java.io.IOException;

import java.net.Socket;

import java.net.UnknownHostException;

public class PortScannerMultiThread {

public static void main(String[] args) {

String host = null;

int beginport = 1;

int endport = 65535;

try{

host = args[0];

beginport = Integer.parseInt(args[1]);

endport = Integer.parseInt(args[2]);

if(beginport <= 0 || endport >= 65536 || beginport > endport){

throw new Exception("Port is illegal");

}

}catch(Exception e){

System.out.println("Usage: java PortScannerSingleThread host beginport endport");

System.exit(0);

}

for (int i = beginport; i <= endport; i++) {

PortProcessor pp = new PortProcessor(host,i); //一个端口创建一个线程

pp.start();

}

}

}

class PortProcessor extends Thread{

String host;

int port;

PortProcessor(String host, int port){

this.host = host;

this.port = port;

}

public void run(){

try{

Socket s = new Socket(host,port);

System.out.println("The port " + port + " is opened at " + host);

}catch(UnknownHostException ex){

System.err.println(ex);

}catch(IOException ioe){

}

}

}

------------------------------------------------------------------------------------------------------------


上程序在for循环结构中创建PortProcessor对象,PortProcessor类是线程类,其关键的Socket在
public void run()方法中实现。此程序比第一个单线程的程序运行效率提高很多倍,几乎在几秒钟内得出结果。所以可见多线程处理是何等的重
要。

程序(java PortScannerMultiThread 10.1.1.100 1 1000)运行结果如下:

The port 25 is opened at 10.1.1.100

The port 42 is opened at 10.1.1.100

The port 88 is opened at 10.1.1.100

...


细对第2个程序分析,不难发现其中的问题:创建的线程个数是不固定的,取决于输入的第二和第三个参数。如果扫描1~100端口,那么主线程就产生100个
线程来分别处理;如果扫描1~10000端口,主线程就会产生10000个线程来进行处理。在JVM中创建如此多的线程同样会带来性能上的问题,因为线程
的创建和消失都是需要花费系统资源的。所以以上的第二个程序也存在明显的不足。

所以,我们需要一个确定数量的线程在JVM中运行,这样就需要了解“线程池”(ThreadPool)的概念。线程池在多线程程序设计中是比不可少的,而且初学者不太容易掌握,下面通过对线程池的介绍,结合第3和第4个程序,引出两种常用的线程池模型。


一种实现线程池的方法是:创建一个”池“,在”池“中增加要处理的数据对象,然后创建一定数量的线程,这些线程对”池“中的对象进行处理。当”池“是空的
时候,每个线程处于等待状态;当往”池“里添加一个对象,通知所有等待的线程来处理(当然一个对象只能有一个线程来处理)。

第二种方法是:同样创建一个”池“,但是在”池“中放的不是数据对象,而是线程,可以把”池“中的一个个线程比喻成一个个”工人“,当没有任务的时候,”工人“们严阵以待;当给”池“添加一个任务后,”工人“就开始处理并直到处理完成。


第3个程序中,定义了List类型的entries作为“池”,这个“池”用来保存需要扫描的端口,List中的元素必须是Object类型,不能用基本
数据类型int往池里添加,而需要用使用Integer。在processMethod()方法中,首先就启动一定数量的PortThread线程,同时
在while循环中通过entries.add(0, new Integer(port))往“池”里添加对象。在PortThread类的run()
方法中通过entry = (Integer)entries.remove(entries.size()-1);取得“池”中的对象,转换成int后
传递给Socket构造方法。

第3个程序如下:

-----------------------------------------------------------------------------------------------------------------------

import java.io.IOException;

import java.net.InetAddress;

import java.net.Socket;

import java.net.UnknownHostException;

import java.util.Collections;

import java.util.LinkedList;

import java.util.List;

public class PortScanner {

private List entries = Collections.synchronizedList(new LinkedList()); //这个”池“比较特别

int numofthreads;

static int port;

int beginport;

int endport;

InetAddress remote = null;

public boolean isFinished(){

if(port >= endport){

return true;

}else{

return false;

}

}

PortScanner(InetAddress addr, int beginport, int endport, int numofthreads){

this.remote = addr;

this.beginport = beginport;

this.endport = endport;

this.numofthreads = numofthreads;

}

public void processMethod(){

for(int i = 0; i < numofthreads; i++){ //创建一定数量的线程并运行

Thread t = new PortThread(remote, entries, this);

t.start();

}

port = beginport;

while(true){

if(entries.size() > numofthreads){

try{

Thread.sleep(1000); //”池“中的内容太多的话就sleep

}catch(InterruptedException ex){

}

continue;

}

synchronized(entries){

if(port > endport) break;

entries.add(0, new Integer(port)); //往”池“里添加对象,需要使用int对应的Integer类

entries.notifyAll();

port++;

}

}

}

public static void main(String[] args) {

String host = null;

int beginport = 1;

int endport = 65535;

int nThreads = 100;

try{

host = args[0];

beginport = Integer.parseInt(args[1]);

endport = Integer.parseInt(args[2]);

nThreads = Integer.parseInt(args[3]);

if(beginport <= 0 || endport >= 65536 || beginport > endport){

throw new Exception("Port is illegal");

}

}catch(Exception e){

System.out.println("Usage: java PortScannerSingleThread host beginport endport nThreads");

System.exit(0);

}

try{

PortScanner scanner = new PortScanner(InetAddress.getByName(host), beginport, endport, nThreads);

scanner.processMethod();

}catch(UnknownHostException ex){

}

}

}

class PortThread extends Thread{

private InetAddress remote;

private List entries;

PortScanner scanner;

PortThread(InetAddress add, List entries, PortScanner scanner){

this.remote = add;

this.entries = entries;

this.scanner = scanner;

}

public void run(){

Integer entry;

while(true){

synchronized(entries){

while(entries.size() == 0){

if(scanner.isFinished()) return;

try{

entries.wait(); //”池“里没内容就只能等了

}catch(InterruptedException ex){

}

}

entry = (Integer)entries.remove(entries.size()-1); //把”池“里的东西拿出来进行处理

}

Socket s = null;

try{

s = new Socket(remote, entry.intValue());

System.out.println("The port of " + entry.toString() + " of the remote " + remote +" is opened.");

}catch(IOException e){

}finally{

try{

if(s != null) s.close();

}catch(IOException e){

}

}

}

}

}

--------------------------------------------------------------------------------------------------------------------------

以上程序需要4个参数,输入java PortScanner 10.1.1.182 1 10000 100运行(第4个参数是线程数),结果前两个程序一样,但是速度比第一个要快,可能比第二个要慢一些。


3个程序是把端口作为“池”中的对象,下面我们看第4个实现方式,把“池”里面的对象定义成是线程类,把具体的任务定义成”池“中线程类的参数。第4个程
序有2个文件组成,分别是ThreadPool.java和PortScannerByThreadPool.java.

ThreadPool.java文件内容如下:

--------------------------------------------------------------------------------------------------------------------------

import java.util.LinkedList;

public class ThreadPool{

private final int nThreads;

private final PoolWorker[] threads;

private final LinkedList queue;

public ThreadPool(int nThreads){

this.nThreads = nThreads;

queue = new LinkedList();

threads = new PoolWorker[nThreads];

for (int i=0; i

threads[i] = new PoolWorker();

threads[i].start();

}

}

public void execute(Runnable r) {

synchronized(queue) {

queue.addLast(r);

queue.notifyAll();

}

}

private class PoolWorker extends Thread {

public void run() {

Runnable r;

while (true) {

synchronized(queue) {

while (queue.isEmpty()) {

try{

queue.wait();

}catch (InterruptedException ignored){

}

}

r = (Runnable) queue.removeFirst();

}

try {

r.run();

}

catch (RuntimeException e) {

}

}

}

}

}

------------------------------------------------------------------------------------------------------------------


ThreadPool.java文件中定义了2个类:ThreadPool和PoolWorker。ThreadPool类中的nThreads变量表示
线程数,PoolWorker数组类型的threads变量表示线程池中的“工人”,这些“工人”的工作就是一直循环处理通过
queue.addLast(r)加入到“池”中的任务。

PortScannerByThreadPool.java文件内容如下:

-------------------------------------------------------------------------------------------------------------------

import java.io.IOException;

import java.net.InetAddress;

import java.net.Socket;

public class PortScannerByThreadPool {

public static void main(String[] args) {

String host = null;

int beginport = 1;

int endport = 65535;

int nThreads = 100;

try{

host = args[0];

beginport = Integer.parseInt(args[1]);

endport = Integer.parseInt(args[2]);

nThreads = Integer.parseInt(args[3]);

if(beginport <= 0 || endport >= 65536 || beginport > endport){

throw new Exception("Port is illegal");

}

}catch(Exception e){

System.out.println("Usage: java PortScannerSingleThread host beginport endport nThreads");

System.exit(0);

}

ThreadPool tp = new ThreadPool(nThreads);

for(int i = beginport; i <= endport; i++){

Scanner ps = new Scanner(host,i);

tp.execute(ps);

}

}

}

class Scanner implements Runnable{

String host;

int port;

Scanner(String host, int port){

this.host = host;

this.port = port;

}

public void run(){

Socket s = null;

try{

s = new Socket(InetAddress.getByName(host),port);

System.out.println("The port of " + port + " is opened.");

}catch(IOException ex){

}finally{

try{

if(s != null) s.close();

}catch(IOException e){

}

}

}

}

---------------------------------------------------------------------------------------------------------------------

PortScannerByThreadPool
是主程序类,处理输入的4个参数(和第3个程序是一样的):主机名、开始端口、结束端口和线程数。Scanner类定义了真正的”任务“。在
PortScannerByThreadPool中通过new ThreadPool(nThreads)创建ThreadPool对象,然后在for循
环中通过new Scanner(host,i)创建”任务“对象,再通过tp.execute(ps)把”任务“对象添加到”池“中。


者可以编译运行第4个程序,得出的结果和前面的是一样的。但是第4和第3个程序之间最大的差别就是:第4个程序会一直运行下去,不会自动结束。在第3个程
序中存在一个isFinished()方法,可以用来判断任务是否处理完毕,而第4个程序中没有这样做。请读者自己思考这个问题。


第3和第4个程序中,我们可以概括出多线程的模型。第3个程序的线程”池“里装的要处理的对象,第4个程序的线程”池“里装的是”工人“,还需要通过定义
”任务“并给把它”派工“给”工人“。我个人比较偏好后者的线程池模型,虽然类的个数多了几个,但逻辑很清晰。不管怎样,第3和第4个程序中关键的部分都
大同小异,就是2个synchronized程序块中的内容,如下(第4个程序中的):

synchronized(queue) {

queue.addLast(r);

queue.notifyAll();

}



synchronized(queue) {

while (queue.isEmpty()) {

try{

queue.wait();

}catch (InterruptedException ignored){

}

}

r = (Runnable) queue.removeFirst();

}


般拿synchronized用来定义方法或程序块,这样可以在多线程同时访问的情况下,保证在一个时刻只能有一个线程对这部分内容进行访问,避免了数据
出错。在第3个程序中通过
List entries = Collections.synchronizedList(new LinkedList())来定义”池“,在第4个
程序中直接用LinkedList queue,都差不多,只是Collections.synchronizedList()可以保证”池“的同步,其
实”池“里的内容访问都是在synchronized定义的程序块中,所以不用Collections.synchronizedList()也是可以
的。

wait()和notifyAll()是很重要的,而且这2个方法是Object基类的方法,所以任何一个类都是可以使用
的。这里说明一个可能产生混淆的问题:queue.wait()并不是说queue对象需要进行等待,而是说queue.wait()所在的线程需要进行
等待,并且释放对queue的锁,把对queue的访问权交给别的线程。如果读者对这2个方法难以理解,建议参考JDK的文档说明。

好了,通过以上4个例子的理解,读者应该能对多线程的程序设计有了一定的理解。第3和第4个程序对应线程模型是非常重要的,可以说是多线程程序设计过程中不可或缺的内容。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: