您的位置:首页 > 其它

消息队列ActiveMQ

2015-02-02 12:14 183 查看
参考文档:

官网(http://activemq.apache.org/)

开源中国社区(http://www.oschina.net/p/activemq)

专门介绍原理(http://blog.csdn.net/kongjing0815/article/details/8177459)

基础知识

1. 基本信息:
1. 下载activeMQ,(http://activemq.apache.org/activemq-570-release.html)

2. 这里以apache-activemq-5.7.0-bin.tar.gz为例。
3. 下载后,将其拷贝到/home/ll/commlib/下, 执行tar -zxvf apache-activemq-5.7.0-bin.tar.gz
4. cd apache-activemq-5.7.0,可以看见有个README.txt文件。读读有好处。
5. cd ./bin,可以看见一些可执行的脚本。运行./activemq start,启动activemq.
6. activemq默认端口为61616,用netstat -ano | grep 61616,看看该端口是否处于监听状态。
7. ./activemq stop,关闭.

8. 在apache-activemq-5.7.0/conf/目录下,有对MQ进行配置的文件。
9. 监控: http://10.15.144.162:8161/admin/topics.jsp
2. 性能

官方提供数据:2.1W-2.2W/second,64bit双核CPU,SUSE系统。

3. 基础知识

(1)点对点消息(queue)

每个消息只能有一个消费者(无论消费者在消息产生的时候是否处于运行状态,都可以接收消息)。

(2)sub/pub消息(topic)

每个消息,可以有多个消费者。消费者只能得到订阅后,生产者发布的消息。

JMS 规范允许客户创建持久订阅,持久订阅允许消费者消费它在未处于激活状态时发送的消息。

(3)消息的生产者(producer)

由会话创建的一个对象,将消息发送到目的地。
消息的消费者(consumer)

(1)同步消费(receive阻塞式的接收);(2)异步消费(设置回调函数)
(4)传输方式
VM transport: 在VM内部通信,避免了网络传输的开销。不是采用socket通信,而是采用直接调用的方法。

TCP transport: 客户端通过TCP的方式连接到broker。

Failover transport: 是一种重新建立连接的机制,可靠的传输。

<mq url = "failover:(tcp://10.15.144.71:61616,tcp://10.15.144.72:61616)?randomize=false"/>

failover transport,允许重连的机制,建立可靠的传输;

master-slave:消息被复制到slave-broker。当master-broker遇到故障时,slave-broker不丢失任何消息。

设置randomize="false",可以让客户总是尝试连接master broker(slave broker不会接收任何连接,直到它成为master broker).

Discovery transport: 可靠的transport.
(5)持久化存储(persistence) http://activemq.apache.org/persistence.html
AMQ Message
Store: messages都是被保存在data logs中的,同时被reference store进行索引以提高存取速度。当某个data logs中的消息都被消费了,该data log文件就会被标记,以便在下一轮清理中被归档或删除。

Kaha Persistence: 在
Kaha 中,数据被追加到 data logs 中。当不再需要log文件中的数据的时候,log 文件会被丢弃。

JDBC Persistence: 将消息保存在数据库中。

(6)集群模式(clustering)
master-slave,主从模式。消息从Master Broker复制到Slave Broker。master-broker不需要特殊配置,但slave-broker需要特殊配置

4. 一些特性

(1)当多个consumer同时从Queue中取消息时,要注意消息的顺序。Message Groups特性保证了具有相同GroupID的消息被发送给同一个consumer.

(2)JMS Selectors,可以按照关键字对消息进行过滤,某个consumer只接收关心的消息。

(3)Pending Message Limit Strategy。每个consumer都会设置一定的缓存来接收消息,当consumer的消息满时,broker不会再向consumer分发消息。但这时broker的消息就会累积,会将消息存磁盘(会带来性能问题)。consumer的一种策略是新的消息来时,将旧的消息丢掉,这样就broker就不会积累消息了。

(4)AMQ支持同步发送(sync)和异步发送(async)。async的性能更高,但可能会少量丢失消息。

(5)Strict Order Dispatch Policy,可以保证多个consumer按照同样的顺序来接收多个消息。

(6)AMQ可以采用批量确认(Optimized Acknowledgement)的方式提高效率。

(7)使用Producer Flow
Control来控制producer消息的发送速度。可以限制producer在收到broker确认前,能够发送的最大字节数。

CMS (stands for C++ Messaging Service)

activeMQ作为broker,可以支持多种语言的连接(如java, c++, c等)。其中,C++的地址为(http://activemq.apache.org/cms/);开发接口API文档地址(http://activemq.apache.org/cms/api_docs/activemqcpp-3.6.0/html/index.html)。假设使用的版本为activemq-cpp-library-3.5.0.

activemq-cpp-library-3.5.0的编译是比较麻烦的,依赖于APR, APR-Util, CPPUnit, OpenSSL等,可以参考README.txt文件。

开发模型图



4. 包装后的接口代码

点击(此处)折叠或打开
mq_api.h


#pragma once

#include <decaf/lang/Thread.h>

#include <decaf/lang/Runnable.h>

#include <decaf/util/concurrent/CountDownLatch.h>

#include <activemq/core/ActiveMQConnectionFactory.h>

#include <activemq/core/ActiveMQConnection.h>

#include <activemq/transport/DefaultTransportListener.h>

#include <activemq/library/ActiveMQCPP.h>

#include <decaf/lang/Integer.h>

#include <activemq/util/Config.h>

#include <decaf/util/Date.h>

#include <cms/Connection.h>

#include <cms/Session.h>

#include <cms/TextMessage.h>

#include <cms/BytesMessage.h>

#include <cms/MapMessage.h>

#include <cms/ExceptionListener.h>

#include <cms/MessageListener.h>

#include <string>

#include <vector>

#include <stdlib.h>

#include <stdio.h>

#include <iostream>

using namespace activemq;

using namespace activemq::core;

using namespace activemq::transport;

using namespace decaf::lang;

using namespace decaf::util;

using namespace decaf::util::concurrent;

using namespace cms;

using namespace std;

typedef void (*MQCbFunc)(string dest_name, unsigned
char *buffer,

size_t buflen);

class ActiveMQAPI: public ExceptionListener, public MessageListener, public DefaultTransportListener

{

public:

ActiveMQAPI(const string& brokerURI, MQCbFunc
pCallBack, const string& clientID = "") :

connection(NULL), session(NULL), pCallBack(pCallBack), brokerURI(brokerURI), clientID(clientID)

{

}

virtual ~ActiveMQAPI() throw ();

void close();

int Init();

/**

*@param dest: topic/queue
name

*@param name: if PersistentMode=true, name is the
durable sub name

*/

int Subscribe(const string& dest, bool
useTopic = true, const string& name =

"", bool PersistentMode = false);

/**

@priority: 0-9,

@timeToLive:The time to live value for this
message in milliseconds. 0, never expire;

*/

int PubMsg(const string& dest, const string& msg, bool
useTopic = true,

bool PersistentMode = false, int priority = 4,

long long timeToLive = 0);

int UnSubscribe(const string& dest, const string& name = "");

virtual void onMessage(const cms::Message* message) throw ();

virtual void onException(const CMSException& ex AMQCPP_UNUSED);

virtual void transportInterrupted();

virtual void transportResumed();

int GetDestCount()

{

return (int) dest_names.size();

}

private:

void cleanup();

private:

Connection* connection;

Session* session;

vector<string> dest_names;

map<string, MessageConsumer*> consumermap; // 消费者容器

map<string, MessageProducer*> producermap; // 生产者容器

map<Destination*, string> destmap;

MQCbFunc pCallBack;

const string brokerURI;

const string clientID;

};

点击(此处)折叠或打开
mq_api.cpp


#include "mq_api.h"

void ActiveMQAPI::close() {

cleanup();

}

ActiveMQAPI::~ActiveMQAPI() throw () {

cleanup();

}

// 初始化

int ActiveMQAPI::Init()

{

try

{

ActiveMQConnectionFactory* connectionFactory =

new ActiveMQConnectionFactory(brokerURI);

//create connection

connection = connectionFactory->createConnection();

if (clientID.size() > 0)

connection->setClientID(clientID);

delete connectionFactory;

connectionFactory = NULL;

ActiveMQConnection* amqConnection =

dynamic_cast<ActiveMQConnection*> (connection);

if (amqConnection != NULL)

{

amqConnection->addTransportListener(this);

}

connection->start();

connection->setExceptionListener(this);

//create session

session = connection->createSession(Session::AUTO_ACKNOWLEDGE); // 自动应答

} catch (CMSException& e)

{

e.printStackTrace();

return -1;

}

return 0;

}

// 去broker订阅.

// @dest为 topic或queue的名字

// @useTopic = true,订阅的为topic;否则为queue.

int ActiveMQAPI::Subscribe(const string& dest, bool
useTopic,

const string& name, bool
PersistentMode)

{

if (session == NULL)

return -1;

Destination* destination;

MessageConsumer *consumer = NULL;

map<string, MessageConsumer*>::iterator
it;

it = consumermap.find(dest);

if (it != consumermap.end())

return 0;

else

{

if (useTopic)

destination = session->createTopic(dest);

else

destination = session->createQueue(dest);

dest_names.push_back(dest);

destmap.insert(pair<Destination*, string> (destination, dest));

if (useTopic && PersistentMode)

{

if (name.size() == 0)

return -1;

// 持久订阅

consumer = session->createDurableConsumer(

(cms::Topic*) destination, name, "");

}

else

{

consumer = session->createConsumer(destination);

}

consumermap.insert(pair<string, MessageConsumer*> (dest, consumer));

consumer->setMessageListener(this);

}

return 0;

}

// 发布

int ActiveMQAPI::PubMsg(const string& dest, const string& msg, bool
useTopic,

bool PersistentMode, int priority, long long timeToLive)

{

if (msg.size() == 0 || dest.size() == 0)

return -1;

if (session == NULL)

return -1;

Destination* destination = NULL;

MessageProducer* producer = NULL;

map<string, MessageProducer*>::iterator
it;

it = producermap.find(dest);

if (it != producermap.end())

{

producer = it->second;

}

else

{

if (useTopic)

destination = session->createTopic(dest);

else

destination = session->createQueue(dest);

producer = session->createProducer(destination);

if (PersistentMode)

producer->setDeliveryMode(DeliveryMode::PERSISTENT);

else

producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

producermap.insert(pair<string, MessageProducer*> (dest, producer));

}

BytesMessage* message = session->createBytesMessage((unsigned
char*) msg.c_str(), msg.size());

// producer->send(message);

producer->send(message, (PersistentMode == true) ? DeliveryMode::PERSISTENT

: DeliveryMode::NON_PERSISTENT, priority, timeToLive);

delete message;

message = NULL;

return 0;

}

// 取消订阅

int ActiveMQAPI::UnSubscribe(const string& dest, const string& name) {

map<string, MessageConsumer*>::iterator
it_c = consumermap.find(dest);

MessageConsumer* consumer = NULL;

if (it_c != consumermap.end())

{

consumer = it_c->second;

consumermap.erase(it_c);

delete consumer;

consumer = NULL;

}

map<Destination*, string>::iterator
it;

Destination* destination = NULL;

for (it = destmap.begin(); it != destmap.end();)

{

Destination* k = it->first;

const string &v = it->second;

it++;

if (dest.compare(v) == 0)

{

destination = k;

destmap.erase(k);

try

{

if (NULL == destination)

{

printf(

"ActiveMQAPI(UnSubscribe)[%s]: destination[%p] is NULL",

dest.c_str(), destination);

return -1;

}

ActiveMQConnection* amqConnection =

dynamic_cast<ActiveMQConnection*> (connection);

amqConnection->destroyDestination(destination);

} catch (std::exception& e)

{

printf("ActiveMQAPI(UnSubscribe)[%s]: exception[%s]",

dest.c_str(), e.what());

return -1;

}

delete destination;

destination = NULL;

break;

}

}

if (name.size() > 0)

{

if (session == NULL)

return -1;

try

{

session->unsubscribe(name);

} catch (CMSException& e)

{

e.printStackTrace();

return -1;

}

}

return 0;

}

// 消息来时的回调函数

void ActiveMQAPI::onMessage(const Message* message) throw ()

{

try

{

const BytesMessage* msg = dynamic_cast<const BytesMessage*> (message);

unsigned char *buffer = NULL;

size_t buflen = 0;

string destname;

if (msg != NULL)

{

buffer = msg->getBodyBytes();

buflen = msg->getBodyLength();

map<Destination*, string>::iterator
it;

for (it = destmap.begin(); it != destmap.end(); it++)

{

if (msg->getCMSDestination()->equals(*(it->first)))

{

destname = it->second;

}

}

}

else

{

printf("the msg is NULL");

return;

}

pCallBack(destname, buffer, buflen);

if (NULL != buffer)

{

delete[] buffer;

buffer = NULL;

}

}

catch (CMSException& e)

{

e.printStackTrace();

}

}

void ActiveMQAPI::onException(const CMSException& ex
AMQCPP_UNUSED)

{

printf("ActiveMQAPI: CMS Exception occurred[%s]. Shutting down client.",

ex.what());

// exit(1);

}

void ActiveMQAPI::transportInterrupted()

{

printf("ActiveMQAPI: The Connection's Transport has been Interrupted.");

}

void ActiveMQAPI::transportResumed()

{

printf("ActiveMQAPI: The Connection's Transport has been Restored.");

}

void ActiveMQAPI::cleanup()

{

map<Destination*, string>::iterator
it;

for (it = destmap.begin(); it != destmap.end();)

{

Destination* k = it->first;

it++;

destmap.erase(k);

delete k;

k = NULL;

}

map<string, MessageConsumer*>::iterator
it_consumer;

for (it_consumer = consumermap.begin(); it_consumer != consumermap.end();)

{

const string &k = it_consumer->first;

const MessageConsumer* v = it_consumer->second;

it_consumer++;

if (v != NULL)

{

delete v;

v = NULL;

consumermap.erase(k);

}

}

if (session != NULL)

{

session->close();

delete session;

session = NULL;

}

if (connection != NULL)

{

connection->close();

delete connection;

connection = NULL;

}

}

5. 测试代码

点击(此处)折叠或打开
订阅topic


void MQCallBack(string dest_name, unsigned
char *buffer,

size_t buflen)

{

printf("topic[%s] buffer[%s] \n", dest_name.c_str(), string((char*)buffer, buflen).c_str());

}

int main()

{

activemq::library::ActiveMQCPP::initializeLibrary();

//

ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false",MQCallBack);

if(NULL == mq)

{

printf("new activeMQ error.");

return 0;

}

if(0 != mq->Init())

{

printf("init activeMQ failed.");

delete mq;

mq = NULL;

return 0;

}

mq->Subscribe("bond.mq", true);

while(true)

{

sleep(5);

}

mq->UnSubscribe("bond.mq");

delete mq;

mq = NULL;

activemq::library::ActiveMQCPP::shutdownLibrary();

return 0;

}

订阅后,可以通过 MQ提供的监控平台来查看是否订阅成功。在(http://XX:8161/admin/topics.jsp)主页的
“Connections”下面,通过IP找到运行订阅程序的主机,然后再点击进去,看是否有这个topic。

点击(此处)折叠或打开 发布topic

int main()

{

activemq::library::ActiveMQCPP::initializeLibrary();

//

ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false",MQCallBack);

if(NULL == mq)

{

printf("new activeMQ error.");

return 0;

}

if(0 != mq->Init())

{

printf("init activeMQ failed.");

delete mq;

mq = NULL;

return 0;

}

int iRet = mq->PubMsg("bond.mq", "the
message to send to the consumers.", true, true);

delete mq;

mq = NULL;

activemq::library::ActiveMQCPP::shutdownLibrary();

return 0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: