消息队列ActiveMQ
2015-02-02 12:14
183 查看
参考文档:
官网(http://activemq.apache.org/)
开源中国社区(http://www.oschina.net/p/activemq)
专门介绍原理(http://blog.csdn.net/kongjing0815/article/details/8177459)
基础知识
1. 基本信息:
1. 下载activeMQ,(http://activemq.apache.org/activemq-570-release.html)
2. 这里以apache-activemq-5.7.0-bin.tar.gz为例。
3. 下载后,将其拷贝到/home/ll/commlib/下, 执行tar -zxvf apache-activemq-5.7.0-bin.tar.gz
4. cd apache-activemq-5.7.0,可以看见有个README.txt文件。读读有好处。
5. cd ./bin,可以看见一些可执行的脚本。运行./activemq start,启动activemq.
6. activemq默认端口为61616,用netstat -ano | grep 61616,看看该端口是否处于监听状态。
7. ./activemq stop,关闭.
8. 在apache-activemq-5.7.0/conf/目录下,有对MQ进行配置的文件。
9. 监控: http://10.15.144.162:8161/admin/topics.jsp
2. 性能
官方提供数据:2.1W-2.2W/second,64bit双核CPU,SUSE系统。
3. 基础知识
(1)点对点消息(queue)
每个消息只能有一个消费者(无论消费者在消息产生的时候是否处于运行状态,都可以接收消息)。
(2)sub/pub消息(topic)
每个消息,可以有多个消费者。消费者只能得到订阅后,生产者发布的消息。
JMS 规范允许客户创建持久订阅,持久订阅允许消费者消费它在未处于激活状态时发送的消息。
(3)消息的生产者(producer)
由会话创建的一个对象,将消息发送到目的地。
消息的消费者(consumer)
(1)同步消费(receive阻塞式的接收);(2)异步消费(设置回调函数)
(4)传输方式
VM transport: 在VM内部通信,避免了网络传输的开销。不是采用socket通信,而是采用直接调用的方法。
TCP transport: 客户端通过TCP的方式连接到broker。
Failover transport: 是一种重新建立连接的机制,可靠的传输。
<mq url = "failover:(tcp://10.15.144.71:61616,tcp://10.15.144.72:61616)?randomize=false"/>
failover transport,允许重连的机制,建立可靠的传输;
master-slave:消息被复制到slave-broker。当master-broker遇到故障时,slave-broker不丢失任何消息。
设置randomize="false",可以让客户总是尝试连接master broker(slave broker不会接收任何连接,直到它成为master broker).
Discovery transport: 可靠的transport.
(5)持久化存储(persistence) http://activemq.apache.org/persistence.html
AMQ Message
Store: messages都是被保存在data logs中的,同时被reference store进行索引以提高存取速度。当某个data logs中的消息都被消费了,该data log文件就会被标记,以便在下一轮清理中被归档或删除。
Kaha Persistence: 在
Kaha 中,数据被追加到 data logs 中。当不再需要log文件中的数据的时候,log 文件会被丢弃。
JDBC Persistence: 将消息保存在数据库中。
(6)集群模式(clustering)
master-slave,主从模式。消息从Master Broker复制到Slave Broker。master-broker不需要特殊配置,但slave-broker需要特殊配置
4. 一些特性
(1)当多个consumer同时从Queue中取消息时,要注意消息的顺序。Message Groups特性保证了具有相同GroupID的消息被发送给同一个consumer.
(2)JMS Selectors,可以按照关键字对消息进行过滤,某个consumer只接收关心的消息。
(3)Pending Message Limit Strategy。每个consumer都会设置一定的缓存来接收消息,当consumer的消息满时,broker不会再向consumer分发消息。但这时broker的消息就会累积,会将消息存磁盘(会带来性能问题)。consumer的一种策略是新的消息来时,将旧的消息丢掉,这样就broker就不会积累消息了。
(4)AMQ支持同步发送(sync)和异步发送(async)。async的性能更高,但可能会少量丢失消息。
(5)Strict Order Dispatch Policy,可以保证多个consumer按照同样的顺序来接收多个消息。
(6)AMQ可以采用批量确认(Optimized Acknowledgement)的方式提高效率。
(7)使用Producer Flow
Control来控制producer消息的发送速度。可以限制producer在收到broker确认前,能够发送的最大字节数。
CMS (stands for C++ Messaging Service)
activeMQ作为broker,可以支持多种语言的连接(如java, c++, c等)。其中,C++的地址为(http://activemq.apache.org/cms/);开发接口API文档地址(http://activemq.apache.org/cms/api_docs/activemqcpp-3.6.0/html/index.html)。假设使用的版本为activemq-cpp-library-3.5.0.
activemq-cpp-library-3.5.0的编译是比较麻烦的,依赖于APR, APR-Util, CPPUnit, OpenSSL等,可以参考README.txt文件。
开发模型图
4. 包装后的接口代码
点击(此处)折叠或打开
mq_api.h
#pragma once
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <string>
#include <vector>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
typedef void (*MQCbFunc)(string dest_name, unsigned
char *buffer,
size_t buflen);
class ActiveMQAPI: public ExceptionListener, public MessageListener, public DefaultTransportListener
{
public:
ActiveMQAPI(const string& brokerURI, MQCbFunc
pCallBack, const string& clientID = "") :
connection(NULL), session(NULL), pCallBack(pCallBack), brokerURI(brokerURI), clientID(clientID)
{
}
virtual ~ActiveMQAPI() throw ();
void close();
int Init();
/**
*@param dest: topic/queue
name
*@param name: if PersistentMode=true, name is the
durable sub name
*/
int Subscribe(const string& dest, bool
useTopic = true, const string& name =
"", bool PersistentMode = false);
/**
@priority: 0-9,
@timeToLive:The time to live value for this
message in milliseconds. 0, never expire;
*/
int PubMsg(const string& dest, const string& msg, bool
useTopic = true,
bool PersistentMode = false, int priority = 4,
long long timeToLive = 0);
int UnSubscribe(const string& dest, const string& name = "");
virtual void onMessage(const cms::Message* message) throw ();
virtual void onException(const CMSException& ex AMQCPP_UNUSED);
virtual void transportInterrupted();
virtual void transportResumed();
int GetDestCount()
{
return (int) dest_names.size();
}
private:
void cleanup();
private:
Connection* connection;
Session* session;
vector<string> dest_names;
map<string, MessageConsumer*> consumermap; // 消费者容器
map<string, MessageProducer*> producermap; // 生产者容器
map<Destination*, string> destmap;
MQCbFunc pCallBack;
const string brokerURI;
const string clientID;
};
点击(此处)折叠或打开
mq_api.cpp
#include "mq_api.h"
void ActiveMQAPI::close() {
cleanup();
}
ActiveMQAPI::~ActiveMQAPI() throw () {
cleanup();
}
// 初始化
int ActiveMQAPI::Init()
{
try
{
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory(brokerURI);
//create connection
connection = connectionFactory->createConnection();
if (clientID.size() > 0)
connection->setClientID(clientID);
delete connectionFactory;
connectionFactory = NULL;
ActiveMQConnection* amqConnection =
dynamic_cast<ActiveMQConnection*> (connection);
if (amqConnection != NULL)
{
amqConnection->addTransportListener(this);
}
connection->start();
connection->setExceptionListener(this);
//create session
session = connection->createSession(Session::AUTO_ACKNOWLEDGE); // 自动应答
} catch (CMSException& e)
{
e.printStackTrace();
return -1;
}
return 0;
}
// 去broker订阅.
// @dest为 topic或queue的名字
// @useTopic = true,订阅的为topic;否则为queue.
int ActiveMQAPI::Subscribe(const string& dest, bool
useTopic,
const string& name, bool
PersistentMode)
{
if (session == NULL)
return -1;
Destination* destination;
MessageConsumer *consumer = NULL;
map<string, MessageConsumer*>::iterator
it;
it = consumermap.find(dest);
if (it != consumermap.end())
return 0;
else
{
if (useTopic)
destination = session->createTopic(dest);
else
destination = session->createQueue(dest);
dest_names.push_back(dest);
destmap.insert(pair<Destination*, string> (destination, dest));
if (useTopic && PersistentMode)
{
if (name.size() == 0)
return -1;
// 持久订阅
consumer = session->createDurableConsumer(
(cms::Topic*) destination, name, "");
}
else
{
consumer = session->createConsumer(destination);
}
consumermap.insert(pair<string, MessageConsumer*> (dest, consumer));
consumer->setMessageListener(this);
}
return 0;
}
// 发布
int ActiveMQAPI::PubMsg(const string& dest, const string& msg, bool
useTopic,
bool PersistentMode, int priority, long long timeToLive)
{
if (msg.size() == 0 || dest.size() == 0)
return -1;
if (session == NULL)
return -1;
Destination* destination = NULL;
MessageProducer* producer = NULL;
map<string, MessageProducer*>::iterator
it;
it = producermap.find(dest);
if (it != producermap.end())
{
producer = it->second;
}
else
{
if (useTopic)
destination = session->createTopic(dest);
else
destination = session->createQueue(dest);
producer = session->createProducer(destination);
if (PersistentMode)
producer->setDeliveryMode(DeliveryMode::PERSISTENT);
else
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
producermap.insert(pair<string, MessageProducer*> (dest, producer));
}
BytesMessage* message = session->createBytesMessage((unsigned
char*) msg.c_str(), msg.size());
// producer->send(message);
producer->send(message, (PersistentMode == true) ? DeliveryMode::PERSISTENT
: DeliveryMode::NON_PERSISTENT, priority, timeToLive);
delete message;
message = NULL;
return 0;
}
// 取消订阅
int ActiveMQAPI::UnSubscribe(const string& dest, const string& name) {
map<string, MessageConsumer*>::iterator
it_c = consumermap.find(dest);
MessageConsumer* consumer = NULL;
if (it_c != consumermap.end())
{
consumer = it_c->second;
consumermap.erase(it_c);
delete consumer;
consumer = NULL;
}
map<Destination*, string>::iterator
it;
Destination* destination = NULL;
for (it = destmap.begin(); it != destmap.end();)
{
Destination* k = it->first;
const string &v = it->second;
it++;
if (dest.compare(v) == 0)
{
destination = k;
destmap.erase(k);
try
{
if (NULL == destination)
{
printf(
"ActiveMQAPI(UnSubscribe)[%s]: destination[%p] is NULL",
dest.c_str(), destination);
return -1;
}
ActiveMQConnection* amqConnection =
dynamic_cast<ActiveMQConnection*> (connection);
amqConnection->destroyDestination(destination);
} catch (std::exception& e)
{
printf("ActiveMQAPI(UnSubscribe)[%s]: exception[%s]",
dest.c_str(), e.what());
return -1;
}
delete destination;
destination = NULL;
break;
}
}
if (name.size() > 0)
{
if (session == NULL)
return -1;
try
{
session->unsubscribe(name);
} catch (CMSException& e)
{
e.printStackTrace();
return -1;
}
}
return 0;
}
// 消息来时的回调函数
void ActiveMQAPI::onMessage(const Message* message) throw ()
{
try
{
const BytesMessage* msg = dynamic_cast<const BytesMessage*> (message);
unsigned char *buffer = NULL;
size_t buflen = 0;
string destname;
if (msg != NULL)
{
buffer = msg->getBodyBytes();
buflen = msg->getBodyLength();
map<Destination*, string>::iterator
it;
for (it = destmap.begin(); it != destmap.end(); it++)
{
if (msg->getCMSDestination()->equals(*(it->first)))
{
destname = it->second;
}
}
}
else
{
printf("the msg is NULL");
return;
}
pCallBack(destname, buffer, buflen);
if (NULL != buffer)
{
delete[] buffer;
buffer = NULL;
}
}
catch (CMSException& e)
{
e.printStackTrace();
}
}
void ActiveMQAPI::onException(const CMSException& ex
AMQCPP_UNUSED)
{
printf("ActiveMQAPI: CMS Exception occurred[%s]. Shutting down client.",
ex.what());
// exit(1);
}
void ActiveMQAPI::transportInterrupted()
{
printf("ActiveMQAPI: The Connection's Transport has been Interrupted.");
}
void ActiveMQAPI::transportResumed()
{
printf("ActiveMQAPI: The Connection's Transport has been Restored.");
}
void ActiveMQAPI::cleanup()
{
map<Destination*, string>::iterator
it;
for (it = destmap.begin(); it != destmap.end();)
{
Destination* k = it->first;
it++;
destmap.erase(k);
delete k;
k = NULL;
}
map<string, MessageConsumer*>::iterator
it_consumer;
for (it_consumer = consumermap.begin(); it_consumer != consumermap.end();)
{
const string &k = it_consumer->first;
const MessageConsumer* v = it_consumer->second;
it_consumer++;
if (v != NULL)
{
delete v;
v = NULL;
consumermap.erase(k);
}
}
if (session != NULL)
{
session->close();
delete session;
session = NULL;
}
if (connection != NULL)
{
connection->close();
delete connection;
connection = NULL;
}
}
5. 测试代码
点击(此处)折叠或打开
订阅topic
void MQCallBack(string dest_name, unsigned
char *buffer,
size_t buflen)
{
printf("topic[%s] buffer[%s] \n", dest_name.c_str(), string((char*)buffer, buflen).c_str());
}
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
//
ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false",MQCallBack);
if(NULL == mq)
{
printf("new activeMQ error.");
return 0;
}
if(0 != mq->Init())
{
printf("init activeMQ failed.");
delete mq;
mq = NULL;
return 0;
}
mq->Subscribe("bond.mq", true);
while(true)
{
sleep(5);
}
mq->UnSubscribe("bond.mq");
delete mq;
mq = NULL;
activemq::library::ActiveMQCPP::shutdownLibrary();
return 0;
}
订阅后,可以通过 MQ提供的监控平台来查看是否订阅成功。在(http://XX:8161/admin/topics.jsp)主页的
“Connections”下面,通过IP找到运行订阅程序的主机,然后再点击进去,看是否有这个topic。
点击(此处)折叠或打开 发布topic
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
//
ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false",MQCallBack);
if(NULL == mq)
{
printf("new activeMQ error.");
return 0;
}
if(0 != mq->Init())
{
printf("init activeMQ failed.");
delete mq;
mq = NULL;
return 0;
}
int iRet = mq->PubMsg("bond.mq", "the
message to send to the consumers.", true, true);
delete mq;
mq = NULL;
activemq::library::ActiveMQCPP::shutdownLibrary();
return 0;
}
官网(http://activemq.apache.org/)
开源中国社区(http://www.oschina.net/p/activemq)
专门介绍原理(http://blog.csdn.net/kongjing0815/article/details/8177459)
基础知识
1. 基本信息:
1. 下载activeMQ,(http://activemq.apache.org/activemq-570-release.html)
2. 这里以apache-activemq-5.7.0-bin.tar.gz为例。
3. 下载后,将其拷贝到/home/ll/commlib/下, 执行tar -zxvf apache-activemq-5.7.0-bin.tar.gz
4. cd apache-activemq-5.7.0,可以看见有个README.txt文件。读读有好处。
5. cd ./bin,可以看见一些可执行的脚本。运行./activemq start,启动activemq.
6. activemq默认端口为61616,用netstat -ano | grep 61616,看看该端口是否处于监听状态。
7. ./activemq stop,关闭.
8. 在apache-activemq-5.7.0/conf/目录下,有对MQ进行配置的文件。
9. 监控: http://10.15.144.162:8161/admin/topics.jsp
2. 性能
官方提供数据:2.1W-2.2W/second,64bit双核CPU,SUSE系统。
3. 基础知识
(1)点对点消息(queue)
每个消息只能有一个消费者(无论消费者在消息产生的时候是否处于运行状态,都可以接收消息)。
(2)sub/pub消息(topic)
每个消息,可以有多个消费者。消费者只能得到订阅后,生产者发布的消息。
JMS 规范允许客户创建持久订阅,持久订阅允许消费者消费它在未处于激活状态时发送的消息。
(3)消息的生产者(producer)
由会话创建的一个对象,将消息发送到目的地。
消息的消费者(consumer)
(1)同步消费(receive阻塞式的接收);(2)异步消费(设置回调函数)
(4)传输方式
VM transport: 在VM内部通信,避免了网络传输的开销。不是采用socket通信,而是采用直接调用的方法。
TCP transport: 客户端通过TCP的方式连接到broker。
Failover transport: 是一种重新建立连接的机制,可靠的传输。
<mq url = "failover:(tcp://10.15.144.71:61616,tcp://10.15.144.72:61616)?randomize=false"/>
failover transport,允许重连的机制,建立可靠的传输;
master-slave:消息被复制到slave-broker。当master-broker遇到故障时,slave-broker不丢失任何消息。
设置randomize="false",可以让客户总是尝试连接master broker(slave broker不会接收任何连接,直到它成为master broker).
Discovery transport: 可靠的transport.
(5)持久化存储(persistence) http://activemq.apache.org/persistence.html
AMQ Message
Store: messages都是被保存在data logs中的,同时被reference store进行索引以提高存取速度。当某个data logs中的消息都被消费了,该data log文件就会被标记,以便在下一轮清理中被归档或删除。
Kaha Persistence: 在
Kaha 中,数据被追加到 data logs 中。当不再需要log文件中的数据的时候,log 文件会被丢弃。
JDBC Persistence: 将消息保存在数据库中。
(6)集群模式(clustering)
master-slave,主从模式。消息从Master Broker复制到Slave Broker。master-broker不需要特殊配置,但slave-broker需要特殊配置
4. 一些特性
(1)当多个consumer同时从Queue中取消息时,要注意消息的顺序。Message Groups特性保证了具有相同GroupID的消息被发送给同一个consumer.
(2)JMS Selectors,可以按照关键字对消息进行过滤,某个consumer只接收关心的消息。
(3)Pending Message Limit Strategy。每个consumer都会设置一定的缓存来接收消息,当consumer的消息满时,broker不会再向consumer分发消息。但这时broker的消息就会累积,会将消息存磁盘(会带来性能问题)。consumer的一种策略是新的消息来时,将旧的消息丢掉,这样就broker就不会积累消息了。
(4)AMQ支持同步发送(sync)和异步发送(async)。async的性能更高,但可能会少量丢失消息。
(5)Strict Order Dispatch Policy,可以保证多个consumer按照同样的顺序来接收多个消息。
(6)AMQ可以采用批量确认(Optimized Acknowledgement)的方式提高效率。
(7)使用Producer Flow
Control来控制producer消息的发送速度。可以限制producer在收到broker确认前,能够发送的最大字节数。
CMS (stands for C++ Messaging Service)
activeMQ作为broker,可以支持多种语言的连接(如java, c++, c等)。其中,C++的地址为(http://activemq.apache.org/cms/);开发接口API文档地址(http://activemq.apache.org/cms/api_docs/activemqcpp-3.6.0/html/index.html)。假设使用的版本为activemq-cpp-library-3.5.0.
activemq-cpp-library-3.5.0的编译是比较麻烦的,依赖于APR, APR-Util, CPPUnit, OpenSSL等,可以参考README.txt文件。
开发模型图
4. 包装后的接口代码
点击(此处)折叠或打开
mq_api.h
#pragma once
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <string>
#include <vector>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
typedef void (*MQCbFunc)(string dest_name, unsigned
char *buffer,
size_t buflen);
class ActiveMQAPI: public ExceptionListener, public MessageListener, public DefaultTransportListener
{
public:
ActiveMQAPI(const string& brokerURI, MQCbFunc
pCallBack, const string& clientID = "") :
connection(NULL), session(NULL), pCallBack(pCallBack), brokerURI(brokerURI), clientID(clientID)
{
}
virtual ~ActiveMQAPI() throw ();
void close();
int Init();
/**
*@param dest: topic/queue
name
*@param name: if PersistentMode=true, name is the
durable sub name
*/
int Subscribe(const string& dest, bool
useTopic = true, const string& name =
"", bool PersistentMode = false);
/**
@priority: 0-9,
@timeToLive:The time to live value for this
message in milliseconds. 0, never expire;
*/
int PubMsg(const string& dest, const string& msg, bool
useTopic = true,
bool PersistentMode = false, int priority = 4,
long long timeToLive = 0);
int UnSubscribe(const string& dest, const string& name = "");
virtual void onMessage(const cms::Message* message) throw ();
virtual void onException(const CMSException& ex AMQCPP_UNUSED);
virtual void transportInterrupted();
virtual void transportResumed();
int GetDestCount()
{
return (int) dest_names.size();
}
private:
void cleanup();
private:
Connection* connection;
Session* session;
vector<string> dest_names;
map<string, MessageConsumer*> consumermap; // 消费者容器
map<string, MessageProducer*> producermap; // 生产者容器
map<Destination*, string> destmap;
MQCbFunc pCallBack;
const string brokerURI;
const string clientID;
};
点击(此处)折叠或打开
mq_api.cpp
#include "mq_api.h"
void ActiveMQAPI::close() {
cleanup();
}
ActiveMQAPI::~ActiveMQAPI() throw () {
cleanup();
}
// 初始化
int ActiveMQAPI::Init()
{
try
{
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory(brokerURI);
//create connection
connection = connectionFactory->createConnection();
if (clientID.size() > 0)
connection->setClientID(clientID);
delete connectionFactory;
connectionFactory = NULL;
ActiveMQConnection* amqConnection =
dynamic_cast<ActiveMQConnection*> (connection);
if (amqConnection != NULL)
{
amqConnection->addTransportListener(this);
}
connection->start();
connection->setExceptionListener(this);
//create session
session = connection->createSession(Session::AUTO_ACKNOWLEDGE); // 自动应答
} catch (CMSException& e)
{
e.printStackTrace();
return -1;
}
return 0;
}
// 去broker订阅.
// @dest为 topic或queue的名字
// @useTopic = true,订阅的为topic;否则为queue.
int ActiveMQAPI::Subscribe(const string& dest, bool
useTopic,
const string& name, bool
PersistentMode)
{
if (session == NULL)
return -1;
Destination* destination;
MessageConsumer *consumer = NULL;
map<string, MessageConsumer*>::iterator
it;
it = consumermap.find(dest);
if (it != consumermap.end())
return 0;
else
{
if (useTopic)
destination = session->createTopic(dest);
else
destination = session->createQueue(dest);
dest_names.push_back(dest);
destmap.insert(pair<Destination*, string> (destination, dest));
if (useTopic && PersistentMode)
{
if (name.size() == 0)
return -1;
// 持久订阅
consumer = session->createDurableConsumer(
(cms::Topic*) destination, name, "");
}
else
{
consumer = session->createConsumer(destination);
}
consumermap.insert(pair<string, MessageConsumer*> (dest, consumer));
consumer->setMessageListener(this);
}
return 0;
}
// 发布
int ActiveMQAPI::PubMsg(const string& dest, const string& msg, bool
useTopic,
bool PersistentMode, int priority, long long timeToLive)
{
if (msg.size() == 0 || dest.size() == 0)
return -1;
if (session == NULL)
return -1;
Destination* destination = NULL;
MessageProducer* producer = NULL;
map<string, MessageProducer*>::iterator
it;
it = producermap.find(dest);
if (it != producermap.end())
{
producer = it->second;
}
else
{
if (useTopic)
destination = session->createTopic(dest);
else
destination = session->createQueue(dest);
producer = session->createProducer(destination);
if (PersistentMode)
producer->setDeliveryMode(DeliveryMode::PERSISTENT);
else
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
producermap.insert(pair<string, MessageProducer*> (dest, producer));
}
BytesMessage* message = session->createBytesMessage((unsigned
char*) msg.c_str(), msg.size());
// producer->send(message);
producer->send(message, (PersistentMode == true) ? DeliveryMode::PERSISTENT
: DeliveryMode::NON_PERSISTENT, priority, timeToLive);
delete message;
message = NULL;
return 0;
}
// 取消订阅
int ActiveMQAPI::UnSubscribe(const string& dest, const string& name) {
map<string, MessageConsumer*>::iterator
it_c = consumermap.find(dest);
MessageConsumer* consumer = NULL;
if (it_c != consumermap.end())
{
consumer = it_c->second;
consumermap.erase(it_c);
delete consumer;
consumer = NULL;
}
map<Destination*, string>::iterator
it;
Destination* destination = NULL;
for (it = destmap.begin(); it != destmap.end();)
{
Destination* k = it->first;
const string &v = it->second;
it++;
if (dest.compare(v) == 0)
{
destination = k;
destmap.erase(k);
try
{
if (NULL == destination)
{
printf(
"ActiveMQAPI(UnSubscribe)[%s]: destination[%p] is NULL",
dest.c_str(), destination);
return -1;
}
ActiveMQConnection* amqConnection =
dynamic_cast<ActiveMQConnection*> (connection);
amqConnection->destroyDestination(destination);
} catch (std::exception& e)
{
printf("ActiveMQAPI(UnSubscribe)[%s]: exception[%s]",
dest.c_str(), e.what());
return -1;
}
delete destination;
destination = NULL;
break;
}
}
if (name.size() > 0)
{
if (session == NULL)
return -1;
try
{
session->unsubscribe(name);
} catch (CMSException& e)
{
e.printStackTrace();
return -1;
}
}
return 0;
}
// 消息来时的回调函数
void ActiveMQAPI::onMessage(const Message* message) throw ()
{
try
{
const BytesMessage* msg = dynamic_cast<const BytesMessage*> (message);
unsigned char *buffer = NULL;
size_t buflen = 0;
string destname;
if (msg != NULL)
{
buffer = msg->getBodyBytes();
buflen = msg->getBodyLength();
map<Destination*, string>::iterator
it;
for (it = destmap.begin(); it != destmap.end(); it++)
{
if (msg->getCMSDestination()->equals(*(it->first)))
{
destname = it->second;
}
}
}
else
{
printf("the msg is NULL");
return;
}
pCallBack(destname, buffer, buflen);
if (NULL != buffer)
{
delete[] buffer;
buffer = NULL;
}
}
catch (CMSException& e)
{
e.printStackTrace();
}
}
void ActiveMQAPI::onException(const CMSException& ex
AMQCPP_UNUSED)
{
printf("ActiveMQAPI: CMS Exception occurred[%s]. Shutting down client.",
ex.what());
// exit(1);
}
void ActiveMQAPI::transportInterrupted()
{
printf("ActiveMQAPI: The Connection's Transport has been Interrupted.");
}
void ActiveMQAPI::transportResumed()
{
printf("ActiveMQAPI: The Connection's Transport has been Restored.");
}
void ActiveMQAPI::cleanup()
{
map<Destination*, string>::iterator
it;
for (it = destmap.begin(); it != destmap.end();)
{
Destination* k = it->first;
it++;
destmap.erase(k);
delete k;
k = NULL;
}
map<string, MessageConsumer*>::iterator
it_consumer;
for (it_consumer = consumermap.begin(); it_consumer != consumermap.end();)
{
const string &k = it_consumer->first;
const MessageConsumer* v = it_consumer->second;
it_consumer++;
if (v != NULL)
{
delete v;
v = NULL;
consumermap.erase(k);
}
}
if (session != NULL)
{
session->close();
delete session;
session = NULL;
}
if (connection != NULL)
{
connection->close();
delete connection;
connection = NULL;
}
}
5. 测试代码
点击(此处)折叠或打开
订阅topic
void MQCallBack(string dest_name, unsigned
char *buffer,
size_t buflen)
{
printf("topic[%s] buffer[%s] \n", dest_name.c_str(), string((char*)buffer, buflen).c_str());
}
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
//
ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false",MQCallBack);
if(NULL == mq)
{
printf("new activeMQ error.");
return 0;
}
if(0 != mq->Init())
{
printf("init activeMQ failed.");
delete mq;
mq = NULL;
return 0;
}
mq->Subscribe("bond.mq", true);
while(true)
{
sleep(5);
}
mq->UnSubscribe("bond.mq");
delete mq;
mq = NULL;
activemq::library::ActiveMQCPP::shutdownLibrary();
return 0;
}
订阅后,可以通过 MQ提供的监控平台来查看是否订阅成功。在(http://XX:8161/admin/topics.jsp)主页的
“Connections”下面,通过IP找到运行订阅程序的主机,然后再点击进去,看是否有这个topic。
点击(此处)折叠或打开 发布topic
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
//
ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false",MQCallBack);
if(NULL == mq)
{
printf("new activeMQ error.");
return 0;
}
if(0 != mq->Init())
{
printf("init activeMQ failed.");
delete mq;
mq = NULL;
return 0;
}
int iRet = mq->PubMsg("bond.mq", "the
message to send to the consumers.", true, true);
delete mq;
mq = NULL;
activemq::library::ActiveMQCPP::shutdownLibrary();
return 0;
}
相关文章推荐
- 【ActiveMq】ActiveMQ消息队列的使用及应用
- Java activemq消息队列入门学习
- ActiveMQ实现消息队列
- activeMQ死消息队列的管理
- Java ActiveMQ简介以及springboot集成activeMQ实现消息队列监听以及实现MQ延迟
- 工业物联网或系统集成中应用消息队列(ActiveMQ,C#的demo)的场景全面分析
- Spring结合嵌入式ActiveMQ使用消息队列
- Java消息队列总结只需一篇解决ActiveMQ、RabbitMQ、ZeroMQ、Kafka
- Java消息队列--ActiveMq 实战
- ActiveMQ消息队列和spring进行整合实例
- ActiveMQ消息队列主从配置
- ActiveMQ消息队列
- MSMQ?不,太弱了。使用ActiveMQ实现消息队列服务
- ActiveMQ(二):使用队列Queue方式发送消息
- [置顶] spring boot 使用activeMQ实现消息队列简单应用
- spring集成activemq--监听多个消息队列
- ActiveMQ消息队列的应用 C#客户端 Web后端
- activemq 队列消息定时清理
- Spring整合ActiveMQ实现简单的消息队列
- 【ActiveMQ】持久化消息队列的三种方式