您的位置:首页 > 其它

ActiveMQ 一个简单的例子

2009-05-24 00:29 561 查看
ActiveMQ

的一个简单示例




function StorePage(){d=document;t=d.selection?(d.selection.type!='None'?d.selection.createRange().text:''):(d.getSelection?d.getSelection():'');void(keyit=window.open('http://www.365key.com/storeit.aspx?t='+escape(d.title)+'&u='+escape(d.location.href)+'&c='+escape(t),'keyit','scrollbars=no,width=475,height=575,left=75,top=20,status=no,resizable=yes'));keyit.focus();}


最近由于公司项目需要,开始学习
JMS
,用的是
ActiveMQ
。由于这方面网上的例子不是很多,而且有的也不完整。于是经过几天的摸索学习,写了一个简单的小例子,现在贴出来与大家分享。

ProducerTool.java
用于发送消息:



package
homework;





import
javax.jms.Connection;



import
javax.jms.DeliveryMode;



import
javax.jms.Destination;



import
javax.jms.JMSException;



import
javax.jms.MessageProducer;



import
javax.jms.Session;



import
javax.jms.TextMessage;





import
org.apache.activemq.ActiveMQConnection;



import
org.apache.activemq.ActiveMQConnectionFactory;







public

class
ProducerTool ...
{





private
String user = ActiveMQConnection.DEFAULT_USER;





private
String password = ActiveMQConnection.DEFAULT_PASSWORD;





private
String url = ActiveMQConnection.DEFAULT_BROKER_URL;





private
String subject = "TOOL.DEFAULT";





private
Destination destination =
null
;





private
Connection connection =
null
;





private
Session session =
null
;





private
MessageProducer producer =
null
;





//
初始化





private

void
initialize()
throws
JMSException, Exception ...
{



ActiveMQConnectionFactory connectionFactory =
new
ActiveMQConnectionFactory(



user, password, url);



connection = connectionFactory.createConnection();



session = connection.createSession(
false
, Session.AUTO_ACKNOWLEDGE);



destination = session.createQueue(subject);



producer = session.createProducer(destination);



producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);



}





//
发送消息





public

void
produceMessage(String message)
throws
JMSException, Exception ...
{



initialize();



TextMessage msg = session.createTextMessage(message);



connection.start();



System.out.println("Producer:->Sending message: " + message);



producer.send(msg);



System.out.println("Producer:->Message sent complete!");



}





//
关闭连接





public

void
close()
throws
JMSException ...
{



System.out.println("Producer:->Closing connection");



if
(producer !=
null
)



producer.close();



if
(session !=
null
)



session.close();



if
(connection !=
null
)



connection.close();



}



}





ConsumerTool.java
用于接受消息,我用的是基于消息监听的机制,需要实现
MessageListener
接口,这个接口有个
onMessage
方法,当接受到消息的时候会自动调用这个函数对消息进行处理。



package
homework;





import
javax.jms.Connection;



import
javax.jms.Destination;



import
javax.jms.JMSException;



import
javax.jms.MessageConsumer;



import
javax.jms.Session;



import
javax.jms.MessageListener;



import
javax.jms.Message;



import
javax.jms.TextMessage;





import
org.apache.activemq.ActiveMQConnection;



import
org.apache.activemq.ActiveMQConnectionFactory;







public

class
ConsumerTool
implements
MessageListener ...
{





private
String user = ActiveMQConnection.DEFAULT_USER;





private
String password = ActiveMQConnection.DEFAULT_PASSWORD;





private
String url = ActiveMQConnection.DEFAULT_BROKER_URL;





private
String subject = "TOOL.DEFAULT";





private
Destination destination =
null
;





private
Connection connection =
null
;





private
Session session =
null
;





private
MessageConsumer consumer =
null
;





//
初始化





private

void
initialize()
throws
JMSException, Exception ...
{



ActiveMQConnectionFactory connectionFactory =
new
ActiveMQConnectionFactory(



user, password, url);



connection = connectionFactory.createConnection();



session = connection.createSession(
false
, Session.AUTO_ACKNOWLEDGE);



destination = session.createQueue(subject);



consumer = session.createConsumer(destination);





}





//
消费消息





public

void
consumeMessage()
throws
JMSException, Exception ...
{



initialize();



connection.start();





System.out.println("Consumer:->Begin listening...");



//
开始监听



consumer.setMessageListener(
this
);



// Message message = consumer.receive();



}





//
关闭连接





public

void
close()
throws
JMSException ...
{



System.out.println("Consumer:->Closing connection");



if
(consumer !=
null
)



consumer.close();



if
(session !=
null
)



session.close();



if
(connection !=
null
)



connection.close();



}





//
消息处理函数





public

void
onMessage(Message message) ...
{





try
...
{





if
(message
instanceof
TextMessage) ...
{



TextMessage txtMsg = (TextMessage) message;



String msg = txtMsg.getText();



System.out.println("Consumer:->Received: " + msg);





}
else
...
{



System.out.println("Consumer:->Received: " + message);



}





}
catch
(JMSException e) ...
{



// TODO Auto-generated catch block



e.printStackTrace();



}



}



}





如果想主动的去接受消息,而不用消息监听的话,把
consumer.setMessageListener(this)
改为
Message message = consumer.receive()
,手动去调用
MessageConsumer

receive
方法即可。

下面是测试类
Test.java




package
homework;





import
javax.jms.JMSException;







public

class
Test ...
{







/** */

/**



*
@param
args



*/





public

static

void
main(String[] args)
throws
JMSException, Exception ...
{



// TODO Auto-generated method stub



ConsumerTool consumer =
new
ConsumerTool();



ProducerTool producer =
new
ProducerTool();



//
开始监听



consumer.consumeMessage();





//
延时
500
毫秒之后发送消息



Thread.sleep(500);



producer.produceMessage("Hello, world!");



producer.close();





//
延时
500
毫秒之后停止接受消息



Thread.sleep(500);



consumer.close();



}



}



以上就是我学习
ActiveMQ
之后写的一个简单的例子,希望对你有帮助,如果有什么错误还请指正。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: