ActiveMq 自学(二)
2016-05-18 16:55
267 查看
ActiveMq 自学(二)
上节讲了简单的字符串交互。这节讲一下在传输文件中使用activemq的实例。先上代码:这里和上节基本相同 只是传输对象有些不同---BlobMessage
Producer.java:
public static void main(String[] args) { JFileChooser fileChooser = new JFileChooser(); fileChooser.setDialogTitle("请选择文件!"); if(fileChooser.showOpenDialog(null)!=JFileChooser.APPROVE_OPTION){ return; } try { File file = fileChooser.getSelectedFile(); System.out.println(file); String brokerURL = "tcp://locahost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://locahost:8161/fileserver/file/"; ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL); System.out.println(factory); Connection connect = factory.createConnection(); connect.start(); System.out.println(connect); ActiveMQSession session = (ActiveMQSession) connect.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("File_Queue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT);//设置成持久化(客户端离线后在上线也可以收到文件) //构造BlobMessage,用来传输文件 BlobMessage blobMessage = session.createBlobMessage(file); blobMessage.setStringProperty("FILE.NAME", file.getName()); blobMessage.setLongProperty("FILE.SIZE", file.length()); System.out.println("开始发送文件:" + file.getName() + ",文件大小:"+ file.length() + " 字节"); //发送 producer.send(blobMessage); producer.close(); session.close(); connect.close(); } catch (Exception e) { System.err.println("Producer异常"+e); }
这里生产者使用了mq自带的文件系统,原理是把上传的文件先保存到mq的fileserver中 然后在通过它去作中转给消费者。
Consumer.java
public static void main(String[] args) { String brokerURL = "tcp://10.10.8.222:61616?jms.blobTransferPolicy.defaultUploadUrl=http://10.10.8.222:8161/fileserver/file/"; //获取ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //创建Connection Connection connection; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("File_Queue"); //创建consumer MessageConsumer consumer = session.createConsumer(destination); //设置监听,当有信息的时候触发 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof BlobMessage){ //是文件信息 BlobMessage blobMessage = (BlobMessage) message; try { String file_name = blobMessage.getStringProperty("FILE.NAME"); System.out.println("接收文件:"+file_name); InputStream in = blobMessage.getInputStream(); InputStreamReader reader = new InputStreamReader(in,"UTF-8"); BufferedReader br = new BufferedReader(reader); String content = null; while ((content = br.readLine())!=null) { System.out.println(content); } in.close(); ((ActiveMQBlobMessage)blobMessage).deleteFile();//注意处理完后需要手工删除服务器端文件 } catch (Exception e) { System.err.println("Consumer异常"+e); } } } }); } catch (Exception e) { System.out.println("异常"); } }
消费者获取文件内容并打印。
这里使用到了MQ的文件服务器,实现了异步上传文件功能。
相关文章推荐
- 解析ActiveMQ的使用说明总结
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- activemq报EOFExceptionjvm错误
- ActiveMQ 消息服务(一)
- ActiveMQ 消息服务(二)
- ActiveMQ 消息服务(三)
- Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析
- 基于zookeeper+leveldb搭建activemq集群
- ActiveMQ 实例
- 一台机器上运行多个ActiveMq
- activemq安全设置 设置admin的用户名和密码
- Ubuntu 14.04.1 安装 activemq 5.11.1
- 在Spring中使用ActiveMQ发送邮件
- 多个地市连接MQ,如果较长时间没有消息发送,ActiveMQ的消费端会自动断开连接(topic端)
- 通过Java操作ActiveMQ的代码记录
- 通过spring开发ActiveMQ简单应用
- 【ActiveMQ教程】简介
- 【ActiveMQ教程】点对点(Point-to-Point)消息教程
- 【ActiveMQ教程】发布/订阅(Publish/Subscribe)消息教程
- JMS和消息驱动Bean(MDB)