测试发送 ActiveMq topic消息
2015-04-08 14:20
357 查看
package com.vv.mq.topic; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * * @author 007 * @date Mar 12, 2015 9:55:14 AM * @version V1.0 * @Description: TODO(测试发送 ActiveMq topic消息) * */ public class Sender { static int size = 400000; static Session session; static MessageProducer producer; static Topic topic; static Connection connection; static String str = "[{'flag':'1','uid':'22021','time':'1418162222222'}]"; public static void init_connection() throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://ip:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); topic = session.createTopic("vv.mq.topic"); // topic = session.createTopic("vv.mq.im.UserLogin"); producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } public static void sendMessage(String msg) { MapMessage message; //TextMessage message2; try { //支持多种消息 Text Map Object Stream message = session.createMapMessage(); // message.setText("asda"); // message = session.createMapMessage(); long t = System.currentTimeMillis(); message.setLong("time", t); message.setString("text", msg); producer.send(message); } catch (JMSException e) { e.printStackTrace(); } } public static void close() throws Exception { connection.close(); } public static void main(String[] arg) throws Exception { long start = System.currentTimeMillis(); ExecutorService es = Executors.newFixedThreadPool(10); final CountDownLatch cdl = new CountDownLatch(size); init_connection(); for (int a = 0; a < size; a++) { final int b = a; es.execute(new Runnable() { int c = b; @Override public void run() { sendMessage("\t" + String.valueOf(c) + " / " + str); cdl.countDown(); } }); } cdl.await(); es.shutdown(); //session.commit(); long time = System.currentTimeMillis() - start; System.out.println("插入" + size + "条JSON,共消耗:" + (double) time / 1000 + " s"); System.out.println("平均:" + size / ((double) time / 1000) + " 条/秒"); close(); } }
相关文章推荐
- spring整合activemq发送MQ消息[Topic模式]实例,activemqmq
- spring activeMQ 整合(一): 一个简单的demo,测试消息的发送与接收
- spring整合activemq发送MQ消息[Topic模式]实例
- java activeMQ消息的发送与接收
- spring+activemq 发送10W消息报端口被占用的异常分析以及topic持久化订阅
- JAVA ActiveMQ消息发送和接收
- Spring+ActiveMQ消息持久化,Topic持久化订阅
- Spring + ActiveMQ兑现jms发送消息
- ActiveMQ Topic发布订阅消息
- Spring + ActiveMQ实现jms发送消息
- Spring + ActiveMQ实现jms发送消息
- MQ系列3 使用Spring发送,消费topic和queue消息 activeMQ
- activemq+spring 持久化发送消息
- JMS学习十一(Spring+ActiveMQ消息持久化,Topic持久化订阅)
- activemq+spring 持久化发送消息
- jms+spring+activemq配置(发送和接收消息)
- spring activeMQ 整合(二): 重发机制(消息发送失败后的重新发送)
- jms+spring+activemq配置(发送和接收消息)
- Spring + ActiveMQ兑现jms发送消息
- mq 使用Spring发送,消费topic和queue消息