您的位置:首页 > 其它

activeMq笔记

2015-08-18 22:23 239 查看

安装

下载地址:http://activemq.apache.org/download.html

安装教程: http://gerrard-ok.iteye.com/blog/1766203
解压缩:

运行: ./activemq start

.Net使用

教程:http://www.cnblogs.com/madyina/p/4121458.html#3249312

下载:http://activemq.apache.org/nms/activemq-downloads.html

还有一个下载地址比较全: http://archive.apache.org/dist/activemq/apache-nms/
控制台:http://ip:8161/

用户名: admin, 密码:admin

发送连接: tcp://192.168.16.23:61616?wireFormat.maxInactivityDuration=0

接收连接:failover:(tcp://192.168.16.23:61616?wireFormat.maxInactivityDuration=0&maxInactivityDurationInitalDelay=30000&connection.AsyncSend=true)

Send方法:

public class FileUploadedMq : IActiveMq
{
private static string ConnString;
public static ConnectionFactory Factory { get; private set; }

//private static IMessageProducer MsgProducer;

public static event Action<FileUploadedModel> Recved;

static FileUploadedMq()
{
ConnString = dbo.GetDbConnString(MqTypeEnum.FileUpload.ToString());

Factory = new ConnectionFactory(ConnString);
}

public void Send(FileUploadedModel Msg)
{
try
{
using (IConnection connection = Factory.CreateConnection())
{
//通过连接创建Session会话
using (ISession session = connection.CreateSession())
{
//通过会话创建生产者,方法里面new出来的是MQ中的Queue
var MsgProducer = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("FileUploaded"));
//创建一个发送的消息对象
var TxtMsg = MsgProducer.CreateTextMessage();
TxtMsg.Properties.SetString("filter", "FileUploaded");

//给这个对象赋实际的消息
TxtMsg.Text = Msg.ToJson();
//设置消息对象的属性,这个很重要哦,是Queue的过滤条件,也是P2P消息的唯一指定属性
//生产者把消息发送出去,几个枚举参数MsgDeliveryMode是否长链,MsgPriority消息优先级别,发送最小单位,当然还有其他重载
MsgProducer.Send(TxtMsg, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
}
}
}
catch (Exception ex)
{
InfoTypeEnum.Error.LogTo(ex.Message);
}

}
}


注册Receive:

public class ActiveMq
{
public static FileUploadedMq FileUploaded = new FileUploadedMq();
public static FileResolvedMq FileResolved = new FileResolvedMq();
public static FileFastDfsSavedMq FileFastDfsSaved = new FileFastDfsSavedMq();
// public static EmailUploadMq EmailUpload=new EmailUploadMq();

public static bool RegisteFileUploaded(Action<FileUploadedModel> msgRecv, Action<FileUploadedModel, Exception> ErrorFunc = null)
{
return RegisteMq(MqTypeEnum.FileUpload, o =>
{
if (msgRecv == null) return;
msgRecv(o.FromJson<FileUploadedModel>());
}, (a, b) =>
{
if (ErrorFunc == null) return;
ErrorFunc(a.FromJson<FileUploadedModel>(), b);
});
}

public static bool RegisteFileResolved(Action<FileResolvedModel> msgRecv, Action<FileResolvedModel, Exception> ErrorFunc = null)
{
return RegisteMq(MqTypeEnum.FileResolved, o =>
{
if (msgRecv == null) return;
msgRecv(o.FromJson<FileResolvedModel>());
}, (a, b) =>
{
if (ErrorFunc == null) return;
ErrorFunc(a.FromJson<FileResolvedModel>(), b);
});
}

public static bool RegisteFileFastDfsSaved(Action<FileFastDfsSavedModel> msgRecv, Action<FileFastDfsSavedModel, Exception> ErrorFunc = null)
{
return RegisteMq(MqTypeEnum.FileFastDfsSaved, o =>
{
if (msgRecv == null) return;
msgRecv(o.FromJson<FileFastDfsSavedModel>());
}, (a, b) =>
{
if (ErrorFunc == null) return;
ErrorFunc(a.FromJson<FileFastDfsSavedModel>(), b);
});
}

private static bool RegisteMq(MqTypeEnum Name, Action<string> msgRecv, Action<string, Exception> ErrorFunc)
{
if (msgRecv == null) return false;

var QueueName = Name.ToString();
try
{
var factory = FileResolvedMq.Factory;

//通过工厂构建连接
IConnection connection = factory.CreateConnection();
//这个是连接的客户端名称标识
//connection.ClientId = Environment.MachineName + "FileResolvedMqListener";

//启动连接,监听的话要主动启动连接
connection.Start();
//通过连接创建一个会话
ISession session = connection.CreateSession();
//通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(QueueName));//, "filter='FileResolved'");

session.DeleteDestination(QueueName);

//注册监听事件
consumer.Listener += message =>
{
var txt = (ITextMessage)message;
if (txt == null || txt.Text.HasValue() == false) { return; }

var model = txt.Text;

try
{
msgRecv.Invoke(model);
}
catch (Exception e)
{
InfoTypeEnum.Error.LogTo(QueueName + ":在处理过程出现错误!" + connection.ClientId, model.ToJson(), e.Message);

if (ErrorFunc != null)
{
try
{
ErrorFunc(model, e);
}
catch { }
}
}
};
return true;
}
catch (Exception ex)
{
InfoTypeEnum.Error.LogTo(QueueName + ":" + ex.Message);
return false;
}
}
}


问题

1. 查看消息显示: Error! Exception occurred while processing this request, check the log for more information!

原因: 安装了 JRE8 , 改到 JRE7!

参考:http://bbs.csdn.net/topics/390811825

安装老版本:http://www.java.com/zh_CN/download/faq/other_jreversions.xml
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: