您的位置:首页 > 编程语言 > C#

ActiveMQ在C#中的应用

2015-04-20 10:45 120 查看


ActiveMQ在C#中的应用

分类:
C#2010-06-04 16:4715793人阅读评论(14)收藏举报
activemqc#testingpropertiessession服务器

ActiveMQ是个好东东,不必多说。ActiveMQ提供多种语言支持,如Java, C, C++, C#, Ruby, Perl, Python, PHP等。由于我在windows下开发GUI,比较关心C++和C#,其中C#的ActiveMQ很简单,Apache提供NMS(.Net Messaging Service)支持.Net开发,只需如下几个步骤即能建立简单的实现。C++的应用相对麻烦些,稍后写文章介绍。

1、去ActiveMQ官方网站下载最新版的ActiveMQ,网址:http://activemq.apache.org/download.html。我之前下的是5.3.1,5.3.2现在也已经出来了。

2、去ActiveMQ官方网站下载最新版的Apache.NMS,网址:http://activemq.apache.org/nms/download.html,需要下载Apache.NMS和Apache.NMS.ActiveMQ两个bin包,如果对源码感兴趣,也可下载src包。这里要提醒一下,如果下载1.2.0版本的NMS.ActiveMQ,Apache.NMS.ActiveMQ.dll在实际使用中有个bug,即停止ActiveMQ应用时会抛WaitOne函数异常,查看src包中的源码发现是由于Apache.NMS.ActiveMQ-1.2.0-src/src/main/csharp/Transport/InactivityMonitor.cs中的如下代码造成的,修改一下源码重新编译即可。看了一下最新版1.3.0已经修复了这个bug,因此下载最新版即可。

[c-sharp]
view plaincopyprint?

private void StopMonitorThreads()

{
lock(monitor)
{
if(monitorStarted.CompareAndSet(true,
false))
{
AutoResetEvent shutdownEvent = new AutoResetEvent(false);

// Attempt to wait for the Timers to shutdown, but don't wait

// forever, if they don't shutdown after two seconds, just quit.

this.readCheckTimer.Dispose(shutdownEvent);

shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
this.writeCheckTimer.Dispose(shutdownEvent);

shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));

//WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)

this.asyncTasks.Shutdown();
this.asyncTasks =
null;
this.asyncWriteTask = null;

this.asyncErrorTask =
null;
}
}
}

[c-sharp] 
view plaincopyprint?

using System; 
using System.Collections.Generic; 
using System.Text; 
using Apache.NMS; 
using Apache.NMS.ActiveMQ; 
using System.IO; 
using System.Xml.Serialization; 
using System.Runtime.Serialization.Formatters.Binary;

namespace Publish 
{ 
class Program 
{ 
static void Main(string[] args)

{ 
try 
{ 
//Create the Connection Factory 
IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");

using (IConnection connection = factory.CreateConnection())

{ 
//Create the Session 
using (ISession session = connection.CreateSession())

{ 
//Create the Producer for the topic/queue

IMessageProducer prod = session.CreateProducer( 
new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));

//Send Messages 
int i = 0; 

while (!Console.KeyAvailable) 
{ 
ITextMessage msg = prod.CreateTextMessage(); 
msg.Text = i.ToString(); 
Console.WriteLine("Sending: " + i.ToString()); 
prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

System.Threading.Thread.Sleep(5000); 
i++; 
} 
} 
} 

Console.ReadLine(); 
} 
catch (System.Exception e) 
{ 
Console.WriteLine("{0}",e.Message); 
Console.ReadLine(); 
} 
} 
} 
} 

using System;
using System.Collections.Generic;
using System.Text;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System.IO;
using System.Xml.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

namespace Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                //Create the Connection Factory
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    //Create the Session
                    using (ISession session = connection.CreateSession())
                    {
                        //Create the Producer for the topic/queue
                        IMessageProducer prod = session.CreateProducer(
                            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
                        
                        //Send Messages
                        int i = 0;

                        while (!Console.KeyAvailable)
                        {
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = i.ToString();
                            Console.WriteLine("Sending: " + i.ToString());
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

                            System.Threading.Thread.Sleep(5000);
                            i++;
                        }
                    }
                }

                Console.ReadLine();
           }
            catch (System.Exception e)
            {
                Console.WriteLine("{0}",e.Message);
                Console.ReadLine();
            }
        }
    }
}


consumer:

[c-sharp]
view plaincopyprint?

using System;
using System.Collections.Generic;
using System.Text;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System.IO;
using System.Xml.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

namespace Subscribe
{
class Program
{
static void Main(string[] args)

{
try
{
//Create the Connection factory
IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");

//Create the connection
using (IConnection connection = factory.CreateConnection())

{
connection.ClientId = "testing listener";
connection.Start();

//Create the Session
using (ISession session = connection.CreateSession())

{
//Create the Consumer
IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"),
"testing listener", null,
false);

consumer.Listener += new MessageListener(consumer_Listener);

Console.ReadLine();
}
connection.Stop();
connection.Close();
}
}
catch (System.Exception e)
{
Console.WriteLine(e.Message);
}
}

static void consumer_Listener(IMessage message)

{
try
{
ITextMessage msg = (ITextMessage)message;
Console.WriteLine("Receive: " + msg.Text);
}
catch (System.Exception e)
{
Console.WriteLine(e.Message);
}
}
}
}

[c-sharp] 
view plaincopyprint?

ITextMessage msg = prod.CreateTextMessage(); 
msg.Text = i.ToString(); 
msg.Properties.SetString("myFilter", 
"test1"); 
Console.WriteLine("Sending: " + i.ToString()); 
prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = i.ToString();
                            msg.Properties.SetString("myFilter", "test1");
                            Console.WriteLine("Sending: " + i.ToString());
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);


consumer:

[c-sharp]
view plaincopyprint?

//生成consumer时通过参数设置Selector
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"),
"myFilter='test1'");

//生成consumer时通过参数设置Selector
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: