您的位置:首页 > 其它

ActiveMq 自学(二)

2016-05-18 16:55 267 查看

ActiveMq 自学(二)

上节讲了简单的字符串交互。这节讲一下在传输文件中使用activemq的实例。

先上代码:这里和上节基本相同 只是传输对象有些不同---BlobMessage
Producer.java:
public static void main(String[] args) {
JFileChooser fileChooser = new JFileChooser();
fileChooser.setDialogTitle("请选择文件!");
if(fileChooser.showOpenDialog(null)!=JFileChooser.APPROVE_OPTION){
return;
}
try {
File file = fileChooser.getSelectedFile();
System.out.println(file);
String brokerURL = "tcp://locahost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://locahost:8161/fileserver/file/";
ConnectionFactory  factory = new ActiveMQConnectionFactory(brokerURL);
System.out.println(factory);
Connection connect = factory.createConnection();
connect.start();
System.out.println(connect);
ActiveMQSession session = (ActiveMQSession) connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination =  session.createQueue("File_Queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//设置成持久化(客户端离线后在上线也可以收到文件)
//构造BlobMessage,用来传输文件
BlobMessage blobMessage = session.createBlobMessage(file);
blobMessage.setStringProperty("FILE.NAME", file.getName());
blobMessage.setLongProperty("FILE.SIZE", file.length());
System.out.println("开始发送文件:" + file.getName() + ",文件大小:"+ file.length() + " 字节");
//发送
producer.send(blobMessage);
producer.close();
session.close();
connect.close();
} catch (Exception e) {
System.err.println("Producer异常"+e);
}

这里生产者使用了mq自带的文件系统,原理是把上传的文件先保存到mq的fileserver中 然后在通过它去作中转给消费者。

Consumer.java

public static void main(String[] args) {
String brokerURL = "tcp://10.10.8.222:61616?jms.blobTransferPolicy.defaultUploadUrl=http://10.10.8.222:8161/fileserver/file/";
//获取ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
//创建Connection
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("File_Queue");
//创建consumer
MessageConsumer consumer = session.createConsumer(destination);
//设置监听,当有信息的时候触发
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof BlobMessage){
//是文件信息
BlobMessage blobMessage = (BlobMessage) message;
try {
String file_name = blobMessage.getStringProperty("FILE.NAME");
System.out.println("接收文件:"+file_name);
InputStream in = blobMessage.getInputStream();
InputStreamReader reader = new InputStreamReader(in,"UTF-8");
BufferedReader br = new BufferedReader(reader);
String content = null;
while ((content = br.readLine())!=null) {
System.out.println(content);
}
in.close();
((ActiveMQBlobMessage)blobMessage).deleteFile();//注意处理完后需要手工删除服务器端文件
} catch (Exception e) {
System.err.println("Consumer异常"+e);
}
}
}
});
} catch (Exception e) {
System.out.println("异常");
}
}

消费者获取文件内容并打印。

这里使用到了MQ的文件服务器,实现了异步上传文件功能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ActiveMQ