您的位置:首页 > 编程语言 > Java开发

数据处理---Java数据处理之消息队列

2015-10-09 13:43 465 查看
前面说了RMI,这是一个同步分布式调用的必备;但是为了实现异步的分布式处理,不得不说到的就是消息队列了。对任何架构或应用来说,消息队列都是一个至关重要的组件,它具有多方面的优点:

1. 解耦性

消息队列在处理过程提供了中间插入了一个隐含的、基于数据的接口层。这样就可以把不同的系统边界隔离开来,每个系统做专门的事情,提供独立的服务,需要调用的时候传输给第三方。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2. 可扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;不需要改变代码、不需要调节参数即可完成。不同的应用可能处理的能力不一样,我们隔离开来,可以方便优化系统并行规模,这样获得较强的灵活性和应对峰值处理能力

3. 可恢复性

当系统的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。

4.异步处理

消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。

消息处理的过程看似很简单: 生产消息----------接收消息-----------消费消息 (典型的生产者消费者模式,中间的接收消息中心,就是消息中间件)

Java提供了JMS(java message service)来处理消息队列。

JMS对象模型包含如下几个要素:

1)连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。

2)JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。

3)JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。

4)JMS目的(Destination),又称为消息队列,是实际的消息源。

5)JMS生产者和消费者。生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。

6)JMS消息通常有两种类型:

① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。

② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。

JMS定义了五种不同的消息正文格式,以及调用的消息类型,

· StreamMessage-Java原始值的数据流

· MapMessage-一套名称-值对

· TextMessage-一个字符串对象

· ObjectMessage-一个序列化的 Java对象

· BytesMessag--一个未解释字节的数据流

具体代码就不多说了,这里举个ActiveMQ(又是Aapache家的)的例子。

1.下载apache-activemq-5.12.0,我用的是这个版本的,解压,其内容:



2.启动Broker. D:\app\apache-activemq-5.12.0\bin\actibemq start



3.浏览器查看 http://localhost:8161
可以对queue进行管理,查看状态(默认admin/admin);

可以试试里面的几个例子,如websocket聊天





4.通过代码生成/消费消息:

生成者:

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.Message;

import javax.jms.MessageProducer;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

public static void main(String[] args) throws Exception {

String jmsProviderAddress = "tcp://localhost:61616";// 地址

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(

jmsProviderAddress);// 连接器

Connection connection = connectionFactory.createConnection();// 创建连接

Session session = connection.createSession(false,

Session.AUTO_ACKNOWLEDGE);// 打开会话

Destination dest = session.createQueue("demoQueue");// 消息目的地

MessageProducer producer = session.createProducer(dest);// 消息发送者

Message message = null;

for (int i = 1; i < 10000; i++) {

//Thread.sleep(1000);

message = session.createTextMessage("hello world "+i);// 消息

producer.send(message);// 发送

}

producer.close();// 关闭

session.close();

connection.close();

}

}

消费者:

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

public static void main(String[] args) throws JMSException {

String jmsProviderAddress = "tcp://localhost:61616";// 地址

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(

jmsProviderAddress);// 连接器

Connection connection = connectionFactory.createConnection();// 创建连接

Session session = connection.createSession(false,

Session.AUTO_ACKNOWLEDGE);// 打开会话

String destinationName = "demoQueue";

Destination dest = session.createQueue(destinationName);// 消息目的地

MessageConsumer consumer = session.createConsumer(dest);

connection.start();

consumer.setMessageListener(new MessageListener(){

int cnt = 0;

public void onMessage(Message arg0) {

TextMessage textMessage = (TextMessage) arg0;

try {

String text = textMessage.getText();

cnt++;

if(cnt%100==0){

System.out.println(System.currentTimeMillis()+"=" + text);

}

} catch (JMSException e) {

e.printStackTrace();

}

}});

try {

Thread.sleep(1000000);

} catch (InterruptedException e) {

e.printStackTrace();

}

consumer.close();

session.close();

connection.close();

}

}

可以通过浏览器查看queue里面的数据有没有变化。

其他的MQ产品不少,包括JBoss的 HornetQ,BEA WebLogic Server JMS,IBM的WebSphere MQ,webMethods(这个webmethods很有意思,有机会我再吐槽下,web你知道method你知道,所以webmethod你也知道???!!!)的JMS+ -,应该说最多的是TIBCO Software的EMS(TIBCO)。

MQ家族不小,还有很多,支持不同语言的,例如:

MSMQ 当然不用说了,是微软的;

RabbitMQ 是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现;

ZeroMQ 引用官方的说法: “ZeroMQ是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: