您的位置:首页 > 运维架构

测试发送 ActiveMq topic消息

2015-04-08 14:20 357 查看
package com.vv.mq.topic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
*
* @author 007
* @date Mar 12, 2015 9:55:14 AM
* @version V1.0
* @Description: TODO(测试发送 ActiveMq topic消息)
*
*/
public class Sender {
static int size = 400000;
static Session session;
static MessageProducer producer;
static Topic topic;
static Connection connection;
static String str = "[{'flag':'1','uid':'22021','time':'1418162222222'}]";

public static void init_connection() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://ip:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("vv.mq.topic");
//		topic = session.createTopic("vv.mq.im.UserLogin");
producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}

public static void sendMessage(String msg) {
MapMessage message;
//TextMessage message2;
try {
//支持多种消息 Text  Map Object Stream
message = session.createMapMessage();
//			message.setText("asda");
//			message = session.createMapMessage();
long t = System.currentTimeMillis();
message.setLong("time", t);
message.setString("text", msg);
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
}
}

public static void close() throws Exception {
connection.close();
}

public static void main(String[] arg) throws Exception {
long start = System.currentTimeMillis();
ExecutorService es = Executors.newFixedThreadPool(10);
final CountDownLatch cdl = new CountDownLatch(size);
init_connection();
for (int a = 0; a < size; a++) {
final int b = a;
es.execute(new Runnable() {
int c = b;

@Override
public void run() {
sendMessage("\t" + String.valueOf(c) + " / " + str);
cdl.countDown();
}
});
}
cdl.await();
es.shutdown();
//session.commit();
long time = System.currentTimeMillis() - start;
System.out.println("插入" + size + "条JSON,共消耗:" + (double) time / 1000 + " s");
System.out.println("平均:" + size / ((double) time / 1000) + " 条/秒");
close();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: