您的位置:首页 > 编程语言 > Java开发

ActiveMQ服务器之间传输对象,项目A发送对象到项目B接收发送对象《一》

2017-06-15 20:07 411 查看
新手刚刚学习ActiveMQ,从网上找了很多案例,查阅请教之后总结,接收端监听有些小问题,使用线程来接收,希望可以多多评论互相学习。如有相同请联系删除。

项目A发送消息端:

// TODO Auto-generated method stub
ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS
// Provider 的连接
Connection connection = null; // Session: 一个发送或接收消息的线程
Session session; // Destination :消息的目的地;消息发送给谁.
Destination destination; // MessageProducer:消息发送者
MessageProducer producer; // TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");

try { // 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取

ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session
.createObjectMessage();
msg.setObject((Serializable) user);
//将对象发送
producer.send(msg);

session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
项目B 接受对象:

因为使用监听一直触发不了,没有解决,所以使用项目运行时开启线程持续接收消息。

开启线程:

public class MyListener  implements ServletContextListener   {

private MyThread myThread;

public void contextDestroyed(ServletContextEvent e) {
if (myThread != null && myThread.isInterrupted()) {
myThread.interrupt();
}
}

public void contextInitialized(ServletContextEvent e) {
String str = null;
if (str == null && myThread == null) {
myThread = new MyThread();
myThread.start(); // servlet 上下文初始化时启动 socket
}
}

}
线程开启接收消息:
public class MyThread extends Thread {

public void run() {

while (!this.isInterrupted()) {// 线程未中断执行循环
try {
Thread.sleep(200); // 每隔2000ms执行一次
} catch (InterruptedException e) {
e.printStackTrace();
}

// ------------------ 开始执行 ---------------------------

// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;

connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");

try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);

consumer.setMessageListener(new MessageListener() {

@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
User user;
try {
user = (User) ((ObjectMessage) message).getObject();
if (message != null) {
// User s = (User) message.getObject();
System.out.println("收到的消息对象:"
+ user.getLoginName());
user.setCreateBy(new User("1"));
user.setUpdateBy(new User("1"));
//使用getBean方式获取Bean 因注解方式扫描不到.
SystemService systemService = (SystemService)ApplicationContextHandle.getBean("systemService");
systemService.saveUser(user);

}

} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});

} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
}
配置Web.xml 配置线程监听。项目开启时启动线程

<listener>
<listener-class>com.thinkgem.jeesite.modules.test.web.MyListener</listener-class>
</listener>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq spring