
[Java / Distributed Systems] Learning Message Middleware: Getting Started with ActiveMQ (Hands-on Test)

2017-05-27 15:17
Download:

http://www.apache.org/dyn/closer.cgi?filename=/activemq/5.14.5/apache-activemq-5.14.5-bin.zip&action=download

1. Create an ActiveMQ queue (Queue)

Run apache-activemq-5.14.5\bin\activemq.bat

Open http://localhost:8161/admin/ in a browser.

Create a queue named HalloQueue.



Detailed steps (excerpted from the official documentation)

activemq.bat

From a console window, change to the installation directory and run ActiveMQ:

cd [activemq_install_dir]


where activemq_install_dir is the directory in which ActiveMQ was installed, e.g., c:\Program Files\ActiveMQ-5.x.

Then type:

bin\activemq start
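
Once the broker has started, a quick way to confirm it is listening is to probe its TCP ports. The following is a minimal sketch, not part of ActiveMQ: the class name PortProbe is made up for illustration, and the two ports are taken from this post (8161 from the web console URL, 5672 from the default ACTIVEMQ_PORT in the example source). A different broker configuration may use other ports.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ActiveMQ: checks whether a TCP port accepts connections.
class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed ports: 8161 (web console) and 5672 (AMQP default used by the example).
        int[] ports = {8161, 5672};
        for (int port : ports) {
            System.out.println("localhost:" + port + " open: " + isOpen("localhost", port, 1000));
        }
    }
}
```

If either port reports as closed, the broker is probably still starting or failed to start; the console window that ran bin\activemq start is the first place to look.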


Access the web console

In a web browser you can access the url http://localhost:8161/admin/ to access the ActiveMQ web console.

The broker may ask for credentials the first time you log in to the web console. The default username and password are admin/admin. You can configure this in the conf/jetty-realm.properties file.
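
The credential prompt can also be answered from code. The sketch below assumes the web console uses HTTP Basic authentication (this matches the browser login prompt, but check your jetty configuration) and uses the default admin/admin credentials mentioned above. WebConsoleCheck and its methods are illustrative names, not an ActiveMQ API.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: sends one authenticated GET to the web console.
class WebConsoleCheck {

    // Builds the value of an HTTP Basic "Authorization" header.
    static String basicAuthHeader(String user, String password) {
        String raw = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Returns the HTTP status code of a GET request, or -1 if the server cannot be reached.
    static int status(String url, String user, String password) {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(2000);
            conn.setReadTimeout(2000);
            conn.setRequestProperty("Authorization", basicAuthHeader(user, password));
            int code = conn.getResponseCode();
            conn.disconnect();
            return code;
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        // Assumed URL and credentials: the defaults quoted in this post.
        System.out.println(status("http://localhost:8161/admin/", "admin", "admin"));
    }
}
```

A 401 status would suggest that the credentials were changed in conf/jetty-realm.properties, while -1 means the console could not be reached at all.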

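2. Build and run the official example with Maven (Topic)

Change to the directory apache-activemq-5.14.5\examples\amqp\java

Run the command mvn install in cmd; a target folder will appear in the directory.

Run the following commands in two separate cmd windows: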

java -cp target/amqp-example-0.1-SNAPSHOT.jar example.Listener
D:\java类库\消息中间件\apache-activemq-5.14.5\examples\amqp\java>java -cp target/amqp-example-0.1-SNAPSHOT.jar example.Listener
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Waiting for messages...


java -cp target/amqp-example-0.1-SNAPSHOT.jar example.Publisher
D:\java类库\消息中间件\apache-activemq-5.14.5\examples\amqp\java>java -cp target/amqp-example-0.1-SNAPSHOT.jar example.Publisher
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sent 1000 messages
Sent 2000 messages
Sent 3000 messages
Sent 4000 messages
Sent 5000 messages
Sent 6000 messages
Sent 7000 messages
Sent 8000 messages
Sent 9000 messages
Sent 10000 messages


D:\java类库\消息中间件\apache-activemq-5.14.5\examples\amqp\java>java -cp target/amqp-example-0.1-SNAPSHOT.jar example.Listener
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Waiting for messages...
Received 1000 messages.
Received 2000 messages.
Received 3000 messages.
Received 4000 messages.
Received 5000 messages.
Received 6000 messages.
Received 7000 messages.
Received 8000 messages.
Received 9000 messages.
Received 10000 messages.
Received 10001 in 8.23 seconds


Demo source:

package example;

import org.apache.qpid.jms.*;
import javax.jms.*;

class Listener {

    public static void main(String[] args) throws JMSException {

        final String TOPIC_PREFIX = "topic://";

        // Connection settings come from environment variables, with defaults.
        String user = env("ACTIVEMQ_USER", "admin");
        String password = env("ACTIVEMQ_PASSWORD", "password");
        String host = env("ACTIVEMQ_HOST", "localhost");
        int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672"));

        String connectionURI = "amqp://" + host + ":" + port;
        // Optional first argument: destination name (defaults to the topic "event").
        String destinationName = arg(args, 0, "topic://event");

        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);

        Connection connection = factory.createConnection(user, password);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // A "topic://" prefix selects a topic; any other name is treated as a queue.
        Destination destination = null;
        if (destinationName.startsWith(TOPIC_PREFIX)) {
            destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length()));
        } else {
            destination = session.createQueue(destinationName);
        }

        MessageConsumer consumer = session.createConsumer(destination);
        long start = System.currentTimeMillis();
        // count holds the id expected next, so it starts at 1.
        long count = 1;
        System.out.println("Waiting for messages...");
        while (true) {
            // Blocks until a message arrives.
            Message msg = consumer.receive();
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                if ("SHUTDOWN".equals(body)) {
                    long diff = System.currentTimeMillis() - start;
                    // count is one ahead here, hence "Received 10001" after 10000 messages.
                    System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
                    connection.close();
                    try {
                        Thread.sleep(10);
                    } catch (Exception e) {}
                    // Note: the example exits with status 1 even on a clean shutdown.
                    System.exit(1);
                } else {
                    try {
                        // Report messages that arrive out of sequence.
                        if (count != msg.getIntProperty("id")) {
                            System.out.println("mismatch: " + count + "!=" + msg.getIntProperty("id"));
                        }
                    } catch (NumberFormatException ignore) {
                    }

                    if (count == 1) {
                        // Start timing at the first message.
                        start = System.currentTimeMillis();
                    } else if (count % 1000 == 0) {
                        System.out.println(String.format("Received %d messages.", count));
                    }
                    count++;
                }

            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }

    // Returns the environment variable's value, or defaultValue if it is not set.
    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if (rc == null)
            return defaultValue;
        return rc;
    }

    // Returns args[index], or defaultValue if that argument was not supplied.
    private static String arg(String[] args, int index, String defaultValue) {
        if (index < args.length)
            return args[index];
        else
            return defaultValue;
    }
}
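
The Listener run shown earlier ends with "Received 10001 in 8.23 seconds" although the Publisher sent 10000 messages. Reading the source, the counter starts at 1 and always holds the id expected next, so it is one ahead by the time SHUTDOWN arrives. The broker-free sketch below replays only that counting logic; CounterSketch and countAtShutdown are illustrative names, not part of the example.

```java
// Hypothetical, broker-free replay of the Listener's counting logic.
class CounterSketch {

    // Mirrors the Listener loop: count starts at 1 and is incremented after each data message.
    static long countAtShutdown(int dataMessages) {
        long count = 1;
        for (int id = 1; id <= dataMessages; id++) {
            if (count != id) {
                System.out.println("mismatch: " + count + "!=" + id);
            }
            count++;
        }
        // This is the value the Listener prints when SHUTDOWN is received.
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countAtShutdown(10000)); // prints 10001
    }
}
```

So the final figure is the number of data messages plus one, not a sign of a duplicated message.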


package example;

import org.apache.qpid.jms.*;
import javax.jms.*;

class Publisher {

    public static void main(String[] args) throws Exception {

        final String TOPIC_PREFIX = "topic://";

        // Connection settings come from environment variables, with defaults.
        String user = env("ACTIVEMQ_USER", "admin");
        String password = env("ACTIVEMQ_PASSWORD", "password");
        String host = env("ACTIVEMQ_HOST", "localhost");
        int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672"));

        String connectionURI = "amqp://" + host + ":" + port;
        // Optional first argument: destination name (defaults to the topic "event").
        String destinationName = arg(args, 0, "topic://event");

        int messages = 10000;
        int size = 256;

        // Note: body is built here but never sent; each message below carries "#:" + i.
        String DATA = "abcdefghijklmnopqrstuvwxyz";
        String body = "";
        for (int i = 0; i < size; i++) {
            body += DATA.charAt(i % DATA.length());
        }

        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);

        Connection connection = factory.createConnection(user, password);
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // A "topic://" prefix selects a topic; any other name is treated as a queue.
        Destination destination = null;
        if (destinationName.startsWith(TOPIC_PREFIX)) {
            destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length()));
        } else {
            destination = session.createQueue(destinationName);
        }

        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        // Send numbered messages; the "id" property lets the Listener check ordering.
        for (int i = 1; i <= messages; i++) {
            TextMessage msg = session.createTextMessage("#:" + i);
            msg.setIntProperty("id", i);
            producer.send(msg);
            if ((i % 1000) == 0) {
                System.out.println(String.format("Sent %d messages", i));
            }
        }

        // Tell the Listener to stop, then wait briefly before closing the connection.
        producer.send(session.createTextMessage("SHUTDOWN"));
        Thread.sleep(1000 * 3);
        connection.close();
        System.exit(0);
    }

    // Returns the environment variable's value, or defaultValue if it is not set.
    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if (rc == null)
            return defaultValue;
        return rc;
    }

    // Returns args[index], or defaultValue if that argument was not supplied.
    private static String arg(String[] args, int index, String defaultValue) {
        if (index < args.length)
            return args[index];
        else
            return defaultValue;
    }

}
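
Both classes choose between a topic and a queue by checking the destination name for the topic:// prefix. That rule can be tried on its own, without a broker. A small sketch; DestinationName, isTopic and bareName are hypothetical names introduced here, and only the prefix rule itself comes from the example source.

```java
// Hypothetical helper isolating the example's topic-versus-queue rule.
class DestinationName {

    static final String TOPIC_PREFIX = "topic://";

    // True when the example would call session.createTopic for this name.
    static boolean isTopic(String destinationName) {
        return destinationName.startsWith(TOPIC_PREFIX);
    }

    // The name handed to createTopic (prefix stripped) or createQueue (unchanged).
    static String bareName(String destinationName) {
        return isTopic(destinationName)
                ? destinationName.substring(TOPIC_PREFIX.length())
                : destinationName;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"topic://event", "HalloQueue"}) {
            String kind = isTopic(name) ? "topic" : "queue";
            System.out.println(name + " -> " + kind + " " + bareName(name));
        }
    }
}
```

By this rule, passing HalloQueue as the first command-line argument to both Listener and Publisher should make them use the queue created in step 1 instead of the default topic event.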


Detailed steps

Overview

This is an example of how to use the Java JMS API with ActiveMQ via the AMQP protocol.

Prereqs

Install Java SDK

Install Maven

Building

Run:

mvn install


Running the Examples

In one terminal window run:

java -cp target/amqp-example-0.1-SNAPSHOT.jar example.Listener


In another terminal window run:

java -cp target/amqp-example-0.1-SNAPSHOT.jar example.Publisher


You can control which AMQP server the examples connect to by setting the following environment variables:

ACTIVEMQ_HOST
ACTIVEMQ_PORT
ACTIVEMQ_USER
ACTIVEMQ_PASSWORD
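
How these variables turn into a connection URI can be seen without starting a broker. The sketch below copies the lookup rule and the defaults (localhost, 5672) from the example source but reads from a Map, so it can be tried with arbitrary values. ConnectionSettings and connectionUri are illustrative names, not part of the example.

```java
import java.util.Map;

// Hypothetical helper replaying how the example resolves its connection settings.
class ConnectionSettings {

    // Same rule as the example's env() helper: use the variable if set, else the default.
    static String env(Map<String, String> environment, String key, String defaultValue) {
        String value = environment.get(key);
        return value == null ? defaultValue : value;
    }

    // Builds the AMQP connection URI the way Listener and Publisher do.
    static String connectionUri(Map<String, String> environment) {
        String host = env(environment, "ACTIVEMQ_HOST", "localhost");
        int port = Integer.parseInt(env(environment, "ACTIVEMQ_PORT", "5672"));
        return "amqp://" + host + ":" + port;
    }

    public static void main(String[] args) {
        // Uses the real process environment; unset variables fall back to the defaults.
        System.out.println(connectionUri(System.getenv()));
    }
}
```

On Windows cmd, a variable can be set for the current window with, for example, set ACTIVEMQ_HOST=localhost before running the java command. Note that the example's default ACTIVEMQ_PASSWORD is "password", which differs from the web console's default admin/admin.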