您的位置:首页 > 其它

中间件activeMQ的使用和介绍

2014-01-08 15:09 459 查看
中间件:中间件是一种独立的系统软件或服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源。中间件位于客户机/ 服务器的操作系统之上,管理计算机资源和网络通讯。是连接两个独立应用程序或独立系统的软件。相连接的系统,即使它们具有不同的接口,但通过中间件相互之间仍能交换信息。执行中间件的一个关键途径是信息传递。通过中间件,应用程序可以工作于多平台或 OS 环境。

java中有好多中间件,比如IBM MQ,JMS,以及马上要讲的activeMQ等。

为什么要使用中间件以及使用中间件的好处就不说了。

到apache官网下载最新版本的activeMQ是apache-activemq-5.9.0。

解压到一个目录下,如果是本机用,基本上不用任何配置。如果配置的话,都在conf下配置即可。具体的请参考apache官方说明。

启动bin下的activemq.bat。如果想看例子,把webapps-demo下的demo复制到webapps下即可。

activemq内置jetty容器,而且在jetty.xml内又这样一段节点:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="port" value="8161"/>
</bean>


熟悉tomcat的都应该明白这是对外访问的端口哦,所以可以通过localhost:8161访问jetty容器了。浏览器输入,即可看到页面,如果不相信是jetty的话可以再8161后随便输入一个路径,是不是返回如下:

Problem accessing /ddd. Reason:

Not Found
Powered by Jetty://


\webapps目录下有admin,顾名思义是管理jetty的啊,和tomcat的manager一样啊。别的不说了。

启动bin下的activemq.bat。

通过查看activemq.xml,这个文件内又有这样一段节点:

<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>


所以如果想访问mq,有好多种方式啊。

最常用的当然是tcp协议啦。

activemq的管理页面有Home | Queues | Topics | Subscribers | Connections | Network | Scheduled | Send

通过看文档知道消息传输有两种模型,queue和topic。

queue:

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。但是消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

topic:

消息生产者将消息发布到topic中,同时有多个消息消费者消费该消息。和queue不同,发布到topic的消息会被所有订阅者消费。

以上各特点可以通过实际代码验证。

下面就写个代码区测试测试了。

发送:

package com.mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,// admin,admin
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("testQueue");

producer = session.createProducer(destination);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 5; i++) {
TextMessage message = session.createTextMessage("测试发送" + i);
producer.send(message);
}
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}

接受:

package com.mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,// admin,admin
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("testQueue");
consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}


用户名和密码都是admin,如果不知道,就用ActiveMQConnection.DEFAULT_USER和ActiveMQConnection.DEFAULT_PASSWORD去替代吧。

自己运行看看吧。

下面展示topic:

发送:

package com.mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Topic destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,// admin,admin
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("testTopic");

producer = session.createProducer(destination);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 5; i++) {
TextMessage message = session.createTextMessage("测试发送" + i);
producer.send(message);
}
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}


接收者:

package com.mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Topic destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,// admin,admin
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("testTopic");
consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.println("收到" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}


区别都用红线标明了,先运行接收者(多run几次),再运行发送者。切换控制台看吧。

如果你切换控制台,你会看到。queue方式下发送的消息被某个接收者接收后,别的接收者就接收不到啦。而topic方式下,发送者的消息会被所有接收者接收。

好了,大概就介绍这么多吧。

activemq和IBM MQ的机制差不多,但是IBM MQ要比activeMq复杂的多。

有些项目,特别是银行项目用中间件比较多。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activeMQ 中间件