您的位置:首页 > 其它

【rabbitmq】解决SimpleAmqpClient创建连接时阻塞的问题

2016-02-27 15:13 609 查看
如果在单线程的程序使用SimpleAmqpClient-v2.4,在使用接口Channel::Create()连接到rabbitmq时,如果网络中断或者ip端口地址不对的时候,程序就会一直阻塞在这个调用上,没有返回值没有异常提示,这种情况如果你想提示个错误什么的就无能为力了,Panda工作中也遇到这个问题,我想:如果他能提供一个连接超时异常就好了,毕竟SimpleAmqpClient只是对另外一个c语言开源项目rabbitmq-c的封装,而且我记得rabbitmq-c是支持我所说的功能的。下面请跟随我一起一步一步完成这个事情吧。

第一步:确定SimpleAmqpClient不支持我要的功能

简化了channel.h里有关创建连接的关键代码

class  Channel : boost::noncopyable
{
public:
typedef boost::shared_ptr<Channel> ptr_t;

// 创建连接
static ptr_t Create(const std::string &host = "127.0.0.1",
int port = 5672,
const std::string &username = "guest",
const std::string &password = "guest",
const std::string &vhost = "/",
int frame_max = 131072);
// 创建ssl连接
static ptr_t CreateSecure(const std::string &path_to_ca_cert="",
const std::string &host = "127.0.0.1",
const std::string &path_to_client_key="",
const std::string &path_to_client_cert="",
int port = 5671,
const std::string &username = "guest",
const std::string &password = "guest",
const std::string &vhost = "/",
int frame_max = 131072,
bool verify_hostname = true);

// 从uri创建连接
static ptr_t CreateFromUri(const std::string &uri, int frame_max = 131072);

// 从uri创建ssl连接
static ptr_t CreateSecureFromUri(const std::string &uri,
const std::string &path_to_ca_cert,
const std::string &path_to_client_key="",
const std::string &path_to_client_cert="",
bool verify_hostname = true,
int frame_max = 131072);
);
没有发现一个参数可以设置超时,然后我上github问了一下库的作者,证实确实是这样了



这样的话只能自己修改库的代码了

第二步:找出关键调用

先来看一下Channel::Channel(…)



然后在rabbitmq-c项目头文件amqp.h中找到创建非阻塞socket的函数



第三步:代码实现

有方向了,终于可以快乐的写代码o(∩_∩)o 。根据设计模式的开闭原则:我们做的事情更好的是扩展而不是修改现有的功能,所以比较优雅的方案应该是增加一个工厂函数生成创建一个channel,做法如下:

在Channel.h增加两个函数

/**
* 以非阻塞的方法创建Channel
* author: panxianzhan
* @param timeout 最大等待事件,为NULL时采用阻塞方式打开
*/
explicit Channel(const std::string &host,
int port,
const std::string &username,
const std::string &password,
const std::string &vhost,
int frame_max,
timeval*
);

/**
* 工厂方法
* 以非阻塞的方法创建Channel
* author: panxianzhan
* @param timeout 最大等待事件,为NULL时采用阻塞方式打开
*/
static ptr_t CreateNoBlock(const std::string &host = "127.0.0.1",
int port = 5672,
const std::string &username = "guest",
const std::string &password = "guest",
const std::string &vhost = "/",
int frame_max = 131072,
timeval* timeout = NULL)
{
return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, timeout);
}


然后在Channel.cpp实现

Channel::Channel(const std::string &host,
int port,
const std::string &username,
const std::string &password,
const std::string &vhost,
int frame_max,
timeval* timeout) :
m_impl(new Detail::ChannelImpl)
{
m_impl->m_connection = amqp_new_connection();

if (NULL == m_impl->m_connection)
{
throw std::bad_alloc();
}

try
{
amqp_socket_t *socket = amqp_tcp_socket_new(m_impl->m_connection);
int sock = amqp_socket_open_noblock(socket, host.c_str(), port, timeout);
}

//如果连接超时,下面这一行就会抛出异常
m_impl->CheckForError(sock);

m_impl->DoLogin(username, password, vhost, frame_max);
}
catch (...)
{
amqp_destroy_connection(m_impl->m_connection);
throw;
}

m_impl->SetIsConnected(true);
}


这样就大功告成了。使用例子如下:

int main()
{
timeval tv = {0};
tv.tv_usec = 200 * 1000; //等待200毫秒
try
{
Channel::ptr_t channel = Channel::CreateNoBlock(
"127.0.0.1", 5567,"guest", "guest", "/", 131072, &tv);
...
...
} catch (AmqpLibraryException& ex)
{
//提示连接失败;
}
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: