您的位置:首页 > 编程语言 > Java开发

使用Active MQ在.net和java系统之间通信

2015-03-03 15:24 363 查看

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现

一.特性列表

⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

⒍ 支持通过JDBC和journal提供高速的消息持久化

⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点

⒏ 支持Ajax

⒐ 支持与Axis的整合

⒑ 可以很容易得调用内嵌JMS provider,进行测试

二、Jms规范里的两种message传输方式Topic和Queue,两者的对比如下:

Topic

Queue


概要

Publish Subscribe messaging 发布订阅消息

Point-to-Point 点对点

有无状态

topic数据默认不落地,是无状态的。

Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。

完整性保障

并不保证publisher发布的每条数据,Subscriber都能接受到。

Queue保证每条数据都能被receiver接收。

消息是否会丢失

一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。

Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。

消息发布接收策略

一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器

一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

三、安装

1.首先去http://activemq.apache.org/download.html 下载最新版本

2.解压后,目录如下:

+bin (windows下面的bat和unix/linux下面的sh)

+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)

+data (默认是空的)

+docs (index,replease版本里面没有文档,-.-b不知道为啥不带)

+example (几个例子

+lib (activemMQ使用到的lib)

-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)

-LICENSE.txt

-NOTICE.txt

-README.txt

-user-guide.html

3.使用bin\activemq.bat启动服务

四、创建.Net Producer

1:      class Program

2:      {

3:          static IConnectionFactory _factory = null;

4:          static IConnection _connection = null;

5:          static ITextMessage _message = null;

6:          static ITextMessage _message2 = null;

7:          static void Main(string[] args)

8:          {

9:              //创建工厂

10:              _factory = new ConnectionFactory("tcp://localhost:61616/");

11:              try

12:              {

13:                  //创建连接

14:                  using (_connection = _factory.CreateConnection())

15:                  {

16:                      //创建会话

17:                      using (ISession session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

18:                      {

19:                          //创建一个主题

20:                          IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");

21:   

22:                          //创建生产者

23:                          IMessageProducer producer = session.CreateProducer(destination);

24:   

25:                          int counter = 0;

26:                          Console.WriteLine("请输入你要发送数据,然后回车!");

27:

28:                          while (true)

29:                          {

30:                              string msg = Console.ReadLine();

31:                              if (msg != string.Empty)

32:                              {

33:                                  Console.BackgroundColor = ConsoleColor.Blue;

34:                                  Console.Beep();

35:                                  counter++;

36:

37:                                  //创建一个文本消息

38:                                  _message = producer.CreateTextMessage("This is .Net AcitveMQ Message....");

39:                                  //_message2 = producer.CreateTextMessage("<Root><First>Test</First><Root>");

40:

41:                                  Console.WriteLine("正在发送第{0}组发送数据...........", counter);

42:                                  //发送消息

43:                                  producer.Send(_message, MsgDeliveryMode.Persistent, MsgPriority.Normal,

44:                                      TimeSpan.MaxValue);

45:   

46:                                  Console.WriteLine("'"+_message.Text+"'已经发送!");

47:                                  //producer.Send(_message2, MsgDeliveryMode.Persistent, MsgPriority.Normal,

48:                                  //    TimeSpan.MinValue);

49:                                  //Console.WriteLine("'" + _message2.Text + "'已经发送!");

50:                                  producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);

51:                                  Console.WriteLine("你输入的数据'" + msg + "'已经发送!");

52:                                  Console.WriteLine("**************************************************");

53:                                  Console.BackgroundColor = ConsoleColor.Black;

54:                                  Console.WriteLine("请输入你要发送数据,然后回车!");

55:                              }

56:                          }

57:                      }

58:                  }

59:              }

60:              catch (Exception ex)

61:              {

62:                  Console.WriteLine(ex.ToString());

63:              }

64:              Console.ReadLine();

65:          }

66:      }


五、创建Java Consumer

1:  import javax.jms.Connection;

2:  import javax.jms.ConnectionFactory;

3:  import javax.jms.Session;

4:  import javax.jms.TextMessage;

5:  import javax.jms.Topic;

6:  import javax.jms.TopicSubscriber;

7:   

8:  import org.apache.activemq.ActiveMQConnectionFactory;

9:   

10:  public class ActiveMQConsumer {

11:      public static void main(String[] args) {

12:          // ConnectionFactory :连接工厂,JMS 用它创建连接

13:          ConnectionFactory connectionFactory;

14:          // Connection :JMS 客户端到JMS Provider 的连接

15:          Connection connection = null;

16:          // Session: 一个发送或接收消息的线程

17:          Session session;

18:          Topic topic;

19:          // 消费者,消息接收者

20:          TopicSubscriber subscriber;

21:          connectionFactory = new ActiveMQConnectionFactory(

22:                  //ActiveMQConnection.DEFAULT_USER,

23:                  //ActiveMQConnection.DEFAULT_PASSWORD,

24:                  "tcp://localhost:61616");

25:          try {

26:              // 构造从工厂得到连接对象

27:              connection = connectionFactory.createConnection();

28:              connection.setClientID("Customer");

29:              // 启动

30:              connection.start();

31:              // 获取操作连接

32:              session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

33:              topic = session.createTopic("topic");

34:              subscriber= session.createDurableSubscriber(topic, "Customer", null, false);

35:

36:              while (true) {

37:                  //TextMessage message = (TextMessage) subscriber.receive(10000);

38:                  //TextMessage message = (TextMessage) subscriber.receiveNoWait();

39:                  TextMessage message= (TextMessage) subscriber.receive();

40:                  if (null != message) {

41:                      System.out.println("收到消息" + message.getText());

42:

43:                  } else {

44:                      break;

45:                  }

46:              }

47:          } catch (Exception e) {

48:              e.printStackTrace();

49:          } finally {

50:              try {

51:                  if (null != connection)

52:                      connection.close();

53:              } catch (Throwable ignore) {

54:              }

55:          }

56:      }

  57:  }
文章出处:http://www.cnblogs.com/youchun/archive/2013/11/30/ActiveMQ.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq .net jav
相关文章推荐