您的位置:首页 > 其它

zmq ipc

2017-01-04 22:18 183 查看
/*************************************************************************
*      > File Name: ZMQ_PipeSocket.h
*      > Author: wangzhicheng
*      > Mail: 2363702560@qq.com
*      > Created Time: 2017-01-06
*      > statement: ZMQ wrapper class (ZMQ封装类)
*************************************************************************/
#ifndef ZMQ_PIPE_SOCKET_H
#define ZMQ_PIPE_SOCKET_H
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>
#include <string>
namespace zmqpipe
{
using namespace std;
class ZMQ_PIPESocket
{
private:
void *m_pContext;
void *m_pSocket;
int m_nSocketType;
string m_strSendServer;                 // 发送的服务器地址
public:
ZMQ_PIPESocket();
~ZMQ_PIPESocket();
/*
* @brief 初始化zmq
* */
bool Init(int SocketType, const string &addr, int sendhwm = 1000, int recvhwm = 1000, int sendtimeout = 1000);
/*
* @brief 发送info指向的对象
* @len 对象的大小 字节数
* @return 发送成功返回true
* */
bool Send(void *info, int len);
/*
* @brief 发送string对象
* @return 发送成功返回true
* */
bool Send(const string &strSend);
/*
* @brief 接收数据 接收的数据由info指向
* @return 接收成功返回true
* */
bool Recv(void *info);
/*
* @brief 接收string对象数据
* @return 接收成功返回true
* */
bool Recv(string &strRecv);
/*
* @brief 获取发送服务器地址
* */
inline void GetSendAddr(string &strOut)
{
strOut = m_strSendServer;
}
};
}

#endif

/*************************************************************************
*      > File Name: ZMQ_PipeSocket.cpp
*      > Author: wangzhicheng
*      > Mail: 2363702560@qq.com
*      > Created Time: 2017-01-06
*      > statement: ZMQ wrapper class (ZMQ封装类)
*************************************************************************/
#include "ZMQ_PipeSocket.h"
namespace zmqpipe
{
ZMQ_PIPESocket::ZMQ_PIPESocket()
{
m_pContext = NULL;
m_pSocket = NULL;
}
bool ZMQ_PIPESocket::Init(int SocketType, const string &addr, int sendhwm, int recvhwm, int sendtimeout)
{
m_pContext = zmq_ctx_new();
//	m_pContext = zmq_init(1);
if(!m_pContext) return false;
m_pSocket = zmq_socket(m_pContext, SocketType);
if(!m_pSocket) return false;
int rc;
switch(SocketType)
{
case ZMQ_REP:
case ZMQ_PULL:
case ZMQ_PUB:
rc = zmq_bind(this->m_pSocket, addr.c_str());
break;
case ZMQ_REQ:
case ZMQ_PUSH:
case ZMQ_SUB:
rc = zmq_connect(this->m_pSocket, addr.c_str());
break;
default:
return false;
}
if(rc) return false;
rc = zmq_setsockopt(m_pSocket, ZMQ_SNDHWM, &sendhwm, sizeof(sendhwm));
if(rc) return false;
rc = zmq_setsockopt(m_pSocket, ZMQ_RCVHWM, &recvhwm, sizeof(recvhwm));
if(rc) return false;
rc = zmq_setsockopt(m_pSocket, ZMQ_SNDTIMEO, &sendtimeout, sizeof(sendtimeout));
if(rc) return false;
m_strSendServer = addr;

return true;
}
/*
* @brief 发送info指向的对象
* @len 对象的大小 字节数
* */
bool ZMQ_PIPESocket::Send(void *info, int len)
{
int rc;
zmq_msg_t msg;
rc = zmq_msg_init_size(&msg, len);
if(rc) return false;
memcpy(zmq_msg_data(&msg), (char *)info, len);
rc = zmq_msg_send(&msg, this->m_pSocket, 0);

return rc == len;
}
/*
* @brief 发送string对象
* */
bool ZMQ_PIPESocket::Send(const string &strSend)
{
int rc;
size_t len = strSend.size();
zmq_msg_t msg;
rc = zmq_msg_init_size(&msg, len);
if(rc) return false;
memcpy(zmq_msg_data(&msg), strSend.c_str(), len);
size_t Len = zmq_msg_send(&msg, this->m_pSocket, 0);

return Len == len;
}
/*
* @brief 接收数据 接收的数据由info指向
* */
bool ZMQ_PIPESocket::Recv(void *info)
{
int rc;
zmq_msg_t msg;
rc = zmq_msg_init(&msg);
if(rc) return false;
int len = zmq_msg_recv(&msg, this->m_pSocket, 0);	// 阻塞方式
if(len <= 0) return false;
memcpy(info, (char *)zmq_msg_data(&msg), len);
if(zmq_msg_close(&msg)) return false;

return true;
}
/*
* @brief 接收string对象数据
* */
bool ZMQ_PIPESocket::Recv(string &strRecv)
{
int rc;
zmq_msg_t msg;
rc = zmq_msg_init(&msg);
if(rc) return false;
int len = zmq_msg_recv(&msg, this->m_pSocket, 0);	// 阻塞方式
if(len <= 0) return false;
strRecv.assign((char *)zmq_msg_data(&msg), len);
if(zmq_msg_close(&msg)) return false;

return true;
}
ZMQ_PIPESocket::~ZMQ_PIPESocket()
{
if(m_pSocket)
{
zmq_close(m_pSocket);
m_pSocket = NULL;
}
if(m_pContext)
{
zmq_ctx_destroy(m_pContext);
m_pContext = NULL;
}
}
}

/*************************************************************************
> File Name: test.cpp
> Author: wangzhicheng
> Mail: 2363702560@qq.com
> Created Time: Wed 04 Jan 2017 09:12:07 PM AWST
************************************************************************/

#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include "ZMQ_PipeSocket.h"
using namespace zmqpipe;
//static const string ADDR0 = "ipc://127.0.0.1.ipc";
//static const string ADDR1 = "ipc://127.0.0.1.ipc";
static const string ADDR0 = "inproc://#0";
static const string ADDR1 = "inproc://#1";
static const chrono::milliseconds dura(1000);
void Send()
{
string str = "abc";
ZMQ_PIPESocket sender0;
ZMQ_PIPESocket sender1;
//	if(!sender0.Init(ZMQ_PUSH, ADDR0, 1, 1))
if(!sender0.Init(ZMQ_PUB, ADDR0))
{
cerr << "sender init failed...!" << endl;
return;
}
//	if(!sender1.Init(ZMQ_PUSH, ADDR1))
/*	if(!sender1.Init(ZMQ_PUB, ADDR1))
{
cerr << "sender init failed...!" << endl;
return;
}*/
while(1)
{
if(!sender0.Send(str))
{
cerr << str << " send failed...!" << endl;
}
/*		if(!sender1.Send(str))
{
cerr << str << " send failed...!" << endl;
}*/
//		this_thread::sleep_for(dura);
}
}
void Recv0()
{
string str;
ZMQ_PIPESocket receiver;
//	if(!receiver.Init(ZMQ_PULL, ADDR0, 1, 1))
if(!receiver.Init(ZMQ_SUB, ADDR0))
{
cerr << "receiver init failed...!" << endl;
return;
}
while(1)
{
if(!receiver.Recv(str))
{
cerr << str << " recv failed...!" << endl;
}
//		else cout << str << " has been received...!" << endl;
//		this_thread::sleep_for(dura);
}
}
void Recv1()
{
string str;
ZMQ_PIPESocket receiver;
//	if(!receiver.Init(ZMQ_PULL, ADDR1, 1, 1))
if(!receiver.Init(ZMQ_SUB, ADDR0))
{
cerr << "receiver init failed...!" << endl;
return;
}
while(1)
{
if(!receiver.Recv(str))
{
cerr << str << " recv failed...!" << endl;
}
//		else cout << str << " has been received...!" << endl;
//		this_thread::sleep_for(dura);
}
}
int main()
{
thread th0(Send);
thread th1(Recv0);
thread th2(Recv1);
th0.join();
th1.join();
th2.join();

return 0;
}

CC=g++
all:
$(CC) -std=c++11 -g -o TestZMQ_IPC main.cpp ZMQ_PipeSocket.h ZMQ_PipeSocket.cpp -lzmq -pthread -lpthread
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: