zmq ipc
2017-01-04 22:18
183 查看
/************************************************************************* * > File Name: ZMQ_PipeSocket.h * > Author: wangzhicheng * > Mail: 2363702560@qq.com * > Created Time: 2017-01-06 * > statement: ZMQ封装类 *************************************************************************/ #ifndef ZMQ_PIPE_SOCKET_H #define ZMQ_PIPE_SOCKET_H #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <zmq.h> #include <string> namespace zmqpipe { using namespace std; class ZMQ_PIPESocket { private: void *m_pContext; void *m_pSocket; int m_nSocketType; string m_strSendServer; // 发送的服务器地址 public: ZMQ_PIPESocket(); ~ZMQ_PIPESocket(); /* * @brief 初始化zmq * */ bool Init(int SocketType, const string &addr, int sendhwm = 1000, int recvhwm = 1000, int sendtimeout = 1000); /* * @brief 发送info指向的对象 * @len 对象的大小 字节数 * @return 发送成功返回true * */ bool Send(void *info, int len); /* * @brief 发送string对象 * @return 发送成功返回true * */ bool Send(const string &strSend); /* * @brief 接收数据 接收的数据由info指向 * @return 接收成功返回true * */ bool Recv(void *info); /* * @brief 接收string对象数据 * @return 接收成功返回true * */ bool Recv(string &strRecv); /* * @brief 获取发送服务器地址 * */ inline void GetSendAddr(string &strOut) { strOut = m_strSendServer; } }; } #endif /************************************************************************* * > File Name: ZMQ_PipeSocket.cpp * > Author: wangzhicheng * > Mail: 2363702560@qq.com * > Created Time: 2017-01-06 * > statement: ZMQ封装类 *************************************************************************/ #include "ZMQ_PipeSocket.h" namespace zmqpipe { ZMQ_PIPESocket::ZMQ_PIPESocket() { m_pContext = NULL; m_pSocket = NULL; } bool ZMQ_PIPESocket::Init(int SocketType, const string &addr, int sendhwm, int recvhwm, int sendtimeout) { m_pContext = zmq_ctx_new(); // m_pContext = zmq_init(1); if(!m_pContext) return false; m_pSocket = zmq_socket(m_pContext, SocketType); if(!m_pSocket) return false; int rc; switch(SocketType) { case ZMQ_REP: case ZMQ_PULL: case ZMQ_PUB: rc = zmq_bind(this->m_pSocket, addr.c_str()); break; case ZMQ_REQ: case ZMQ_PUSH: case ZMQ_SUB: rc = zmq_connect(this->m_pSocket, addr.c_str()); break; default: return false; } if(rc) return false; rc = zmq_setsockopt(m_pSocket, ZMQ_SNDHWM, &sendhwm, sizeof(sendhwm)); if(rc) return false; rc = zmq_setsockopt(m_pSocket, ZMQ_RCVHWM, &recvhwm, sizeof(recvhwm)); if(rc) return false; rc = zmq_setsockopt(m_pSocket, ZMQ_SNDTIMEO, &sendtimeout, sizeof(sendtimeout)); if(rc) return false; m_strSendServer = addr; return true; } /* * @brief 发送info指向的对象 * @len 对象的大小 字节数 * */ bool ZMQ_PIPESocket::Send(void *info, int len) { int rc; zmq_msg_t msg; rc = zmq_msg_init_size(&msg, len); if(rc) return false; memcpy(zmq_msg_data(&msg), (char *)info, len); rc = zmq_msg_send(&msg, this->m_pSocket, 0); return rc == len; } /* * @brief 发送string对象 * */ bool ZMQ_PIPESocket::Send(const string &strSend) { int rc; size_t len = strSend.size(); zmq_msg_t msg; rc = zmq_msg_init_size(&msg, len); if(rc) return false; memcpy(zmq_msg_data(&msg), strSend.c_str(), len); size_t Len = zmq_msg_send(&msg, this->m_pSocket, 0); return Len == len; } /* * @brief 接收数据 接收的数据由info指向 * */ bool ZMQ_PIPESocket::Recv(void *info) { int rc; zmq_msg_t msg; rc = zmq_msg_init(&msg); if(rc) return false; int len = zmq_msg_recv(&msg, this->m_pSocket, 0); // 阻塞方式 if(len <= 0) return false; memcpy(info, (char *)zmq_msg_data(&msg), len); if(zmq_msg_close(&msg)) return false; return true; } /* * @brief 接收string对象数据 * */ bool ZMQ_PIPESocket::Recv(string &strRecv) { int rc; zmq_msg_t msg; rc = zmq_msg_init(&msg); if(rc) return false; int len = zmq_msg_recv(&msg, this->m_pSocket, 0); // 阻塞方式 if(len <= 0) return false; strRecv.assign((char *)zmq_msg_data(&msg), len); if(zmq_msg_close(&msg)) return false; return true; } ZMQ_PIPESocket::~ZMQ_PIPESocket() { if(m_pSocket) { zmq_close(m_pSocket); m_pSocket = NULL; } if(m_pContext) { zmq_ctx_destroy(m_pContext); m_pContext = NULL; } } } /************************************************************************* > File Name: test.cpp > Author: wangzhicheng > Mail: 2363702560@qq.com > Created Time: Wed 04 Jan 2017 09:12:07 PM AWST ************************************************************************/ #include <iostream> #include <vector> #include <thread> #include <chrono> #include "ZMQ_PipeSocket.h" using namespace zmqpipe; //static const string ADDR0 = "ipc://127.0.0.1.ipc"; //static const string ADDR1 = "ipc://127.0.0.1.ipc"; static const string ADDR0 = "inproc://#0"; static const string ADDR1 = "inproc://#1"; static const chrono::milliseconds dura(1000); void Send() { string str = "abc"; ZMQ_PIPESocket sender0; ZMQ_PIPESocket sender1; // if(!sender0.Init(ZMQ_PUSH, ADDR0, 1, 1)) if(!sender0.Init(ZMQ_PUB, ADDR0)) { cerr << "sender init failed...!" << endl; return; } // if(!sender1.Init(ZMQ_PUSH, ADDR1)) /* if(!sender1.Init(ZMQ_PUB, ADDR1)) { cerr << "sender init failed...!" << endl; return; }*/ while(1) { if(!sender0.Send(str)) { cerr << str << " send failed...!" << endl; } /* if(!sender1.Send(str)) { cerr << str << " send failed...!" << endl; }*/ // this_thread::sleep_for(dura); } } void Recv0() { string str; ZMQ_PIPESocket receiver; // if(!receiver.Init(ZMQ_PULL, ADDR0, 1, 1)) if(!receiver.Init(ZMQ_SUB, ADDR0)) { cerr << "receiver init failed...!" << endl; return; } while(1) { if(!receiver.Recv(str)) { cerr << str << " recv failed...!" << endl; } // else cout << str << " has been received...!" << endl; // this_thread::sleep_for(dura); } } void Recv1() { string str; ZMQ_PIPESocket receiver; // if(!receiver.Init(ZMQ_PULL, ADDR1, 1, 1)) if(!receiver.Init(ZMQ_SUB, ADDR0)) { cerr << "receiver init failed...!" << endl; return; } while(1) { if(!receiver.Recv(str)) { cerr << str << " recv failed...!" << endl; } // else cout << str << " has been received...!" << endl; // this_thread::sleep_for(dura); } } int main() { thread th0(Send); thread th1(Recv0); thread th2(Recv1); th0.join(); th1.join(); th2.join(); return 0; } CC=g++ all: $(CC) -std=c++11 -g -o TestZMQ_IPC main.cpp ZMQ_PipeSocket.h ZMQ_PipeSocket.cpp -lzmq -pthread -lpthread
相关文章推荐
- flume启动No appenders could be found for logger (org.apache.flume.lifecycle.LifecycleSupervisor)
- 用管道实现进程间通信
- 用angularjs封装指令遇到的错误Error: [$compile:tplrt]
- catalog备份数据库及RMAN存储脚本
- Caffe 使用1:准备数据 Caffe DIY-step1: Prepare Data
- 列表、表格与框架
- 对HashMap进行排序
- 求数字特征值
- 如何看待master的50连胜
- 指针操作
- [置顶] tomcat启动慢, Creation of SecureRandom instance for session ID generation using [SHA1PRNG]took [xx] mil
- Intellj Idea 远程调试
- php连接sql_server数据库
- 找X(简单查找)
- golang 图片处理,剪切,base64数据转换,文件存储
- SpringMVC学习总结(五).拦截器的使用
- Javascript闭包——懂不懂由你,反正我是懂了
- SpringMVC对比Struts2
- 【Windows】Windows Server 2008 R2:核心基础架构
- leetcode217