【rabbitmq】解决SimpleAmqpClient创建连接时阻塞的问题
2016-02-27 15:13
609 查看
如果在单线程的程序使用SimpleAmqpClient-v2.4,在使用接口Channel::Create()连接到rabbitmq时,如果网络中断或者ip端口地址不对的时候,程序就会一直阻塞在这个调用上,没有返回值没有异常提示,这种情况如果你想提示个错误什么的就无能为力了,Panda工作中也遇到这个问题,我想:如果他能提供一个连接超时异常就好了,毕竟SimpleAmqpClient只是对另外一个c语言开源项目rabbitmq-c的封装,而且我记得rabbitmq-c是支持我所说的功能的。下面请跟随我一起一步一步完成这个事情吧。
第一步:确定SimpleAmqpClient不支持我要的功能
简化了channel.h里有关创建连接的关键代码
这样的话只能自己修改库的代码了
第二步:找出关键调用
先来看一下Channel::Channel(…)
然后在rabbitmq-c项目头文件amqp.h中找到创建非阻塞socket的函数
第三步:代码实现
有方向了,终于可以快乐的写代码o(∩_∩)o 。根据设计模式的开闭原则:我们做的事情更好的是扩展而不是修改现有的功能,所以比较优雅的方案应该是增加一个工厂函数生成创建一个channel,做法如下:
在Channel.h增加两个函数
然后在Channel.cpp实现
这样就大功告成了。使用例子如下:
第一步:确定SimpleAmqpClient不支持我要的功能
简化了channel.h里有关创建连接的关键代码
class Channel : boost::noncopyable { public: typedef boost::shared_ptr<Channel> ptr_t; // 创建连接 static ptr_t Create(const std::string &host = "127.0.0.1", int port = 5672, const std::string &username = "guest", const std::string &password = "guest", const std::string &vhost = "/", int frame_max = 131072); // 创建ssl连接 static ptr_t CreateSecure(const std::string &path_to_ca_cert="", const std::string &host = "127.0.0.1", const std::string &path_to_client_key="", const std::string &path_to_client_cert="", int port = 5671, const std::string &username = "guest", const std::string &password = "guest", const std::string &vhost = "/", int frame_max = 131072, bool verify_hostname = true); // 从uri创建连接 static ptr_t CreateFromUri(const std::string &uri, int frame_max = 131072); // 从uri创建ssl连接 static ptr_t CreateSecureFromUri(const std::string &uri, const std::string &path_to_ca_cert, const std::string &path_to_client_key="", const std::string &path_to_client_cert="", bool verify_hostname = true, int frame_max = 131072); );没有发现一个参数可以设置超时,然后我上github问了一下库的作者,证实确实是这样了
这样的话只能自己修改库的代码了
第二步:找出关键调用
先来看一下Channel::Channel(…)
然后在rabbitmq-c项目头文件amqp.h中找到创建非阻塞socket的函数
第三步:代码实现
有方向了,终于可以快乐的写代码o(∩_∩)o 。根据设计模式的开闭原则:我们做的事情更好的是扩展而不是修改现有的功能,所以比较优雅的方案应该是增加一个工厂函数生成创建一个channel,做法如下:
在Channel.h增加两个函数
/** * 以非阻塞的方法创建Channel * author: panxianzhan * @param timeout 最大等待事件,为NULL时采用阻塞方式打开 */ explicit Channel(const std::string &host, int port, const std::string &username, const std::string &password, const std::string &vhost, int frame_max, timeval* ); /** * 工厂方法 * 以非阻塞的方法创建Channel * author: panxianzhan * @param timeout 最大等待事件,为NULL时采用阻塞方式打开 */ static ptr_t CreateNoBlock(const std::string &host = "127.0.0.1", int port = 5672, const std::string &username = "guest", const std::string &password = "guest", const std::string &vhost = "/", int frame_max = 131072, timeval* timeout = NULL) { return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, timeout); }
然后在Channel.cpp实现
Channel::Channel(const std::string &host, int port, const std::string &username, const std::string &password, const std::string &vhost, int frame_max, timeval* timeout) : m_impl(new Detail::ChannelImpl) { m_impl->m_connection = amqp_new_connection(); if (NULL == m_impl->m_connection) { throw std::bad_alloc(); } try { amqp_socket_t *socket = amqp_tcp_socket_new(m_impl->m_connection); int sock = amqp_socket_open_noblock(socket, host.c_str(), port, timeout); } //如果连接超时,下面这一行就会抛出异常 m_impl->CheckForError(sock); m_impl->DoLogin(username, password, vhost, frame_max); } catch (...) { amqp_destroy_connection(m_impl->m_connection); throw; } m_impl->SetIsConnected(true); }
这样就大功告成了。使用例子如下:
int main() { timeval tv = {0}; tv.tv_usec = 200 * 1000; //等待200毫秒 try { Channel::ptr_t channel = Channel::CreateNoBlock( "127.0.0.1", 5567,"guest", "guest", "/", 131072, &tv); ... ... } catch (AmqpLibraryException& ex) { //提示连接失败; } return 0; }
相关文章推荐
- 基于 Apache 在本地配置多个虚拟主机
- 边缘检测
- HDU 5294 Tricks Device(最短路+最大流)
- 在Eclipse中使用JUnit4进行单元测试(高级篇)
- 第三届_银行密码
- MVVM设计模式具体实现
- Ulua_toLua_基本案例(一)
- 004_Http之response响应头
- 左侧容器高度随着右侧容器的高度改变而改变
- CodeForces - 280D k-Maximum Subsequence Sum 线段树模拟费用流操作
- listview单个刷新item
- moveZeroes--JavaScript
- SpringMVC实现poi 解析excel 导入导出
- log4j【8】(slf4j)
- adb断开解决方法
- 如何配置JAVA的环境变量、Tomcat环境变量
- UVA294DIvisors(唯一分解定理+约数个数)
- java,冒泡排序法,网上查阅
- MySQL数据库基准压力测试工具之MySQLSlap使用实例
- sql行转列