Message Queues: Common zmq Communication Patterns

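2012-12-27 18:23
zmq (ZeroMQ) is a message queue library. It carries data over in-process, inter-process, TCP, and multicast transports, and it delivers that data as discrete messages rather than as a raw socket byte stream. The official site offers downloads, usage guides, and documentation, and covers the library fairly thoroughly.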

The common patterns are Request-Reply, Publish-Subscribe, and Parallel Pipeline.

Request-Reply

request

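zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REQ);
//  Connect to the server (endpoint matches the bind in the server snippet)
socket.connect ("tcp://localhost:5555");
//  Send the request: exactly 5 bytes, no trailing NUL
zmq::message_t request (5);
memcpy (request.data (), "Hello", 5);
socket.send (request);
//  Get the reply
zmq::message_t reply;
socket.recv (&reply);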


server

zmq::context_t context (1);
zmq::socket_t socket(context, ZMQ_REP);
socket.bind ("tcp://*:5555");
while (true) {
    zmq::message_t request;
    //  Wait for next request from client
    socket.recv (&request);
    std::cout << "Received Hello" << std::endl;
    //  Do some 'work'
    sleep (1);
    //  Send reply back to client
    zmq::message_t reply (5);
    memcpy ((void *) reply.data (), "World", 5);
    socket.send (reply);
}
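
The client and server above rely on a strict alternation: a REQ socket sends, then receives, then sends again, and a REP socket does the mirror image. The following is a minimal sketch of that rule in plain C++, with no ZeroMQ dependency. `ReqStateModel` is a hypothetical name introduced here for illustration and is not part of zmq.hpp; in the real library a call made out of order fails with the `EFSM` error instead.

```cpp
#include <stdexcept>

// Toy model of the send/recv alternation that a ZMQ_REQ socket enforces.
// ReqStateModel is a hypothetical illustration, not a cppzmq class.
class ReqStateModel {
public:
    // A request may only be sent while no reply is outstanding.
    void send() {
        if (awaiting_reply_)
            throw std::logic_error("send called while a reply is pending");
        awaiting_reply_ = true;
    }

    // A reply may only be received after a request has been sent.
    void recv() {
        if (!awaiting_reply_)
            throw std::logic_error("recv called before send");
        awaiting_reply_ = false;
    }

    bool awaiting_reply() const { return awaiting_reply_; }

private:
    bool awaiting_reply_ = false;
};
```

Calling `send()` twice in a row on this model throws, which mirrors why the client snippet must call `recv` before issuing its next request.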




Publish-Subscribe

publisher

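zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind ("tcp://*:5556");
//  Build an update and publish it ("Hello" is only an example payload)
zmq::message_t message (5);
memcpy (message.data (), "Hello", 5);
publisher.send (message);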

subscriber

zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
//  An empty filter subscribes to every message
const char *filter = "";
subscriber.setsockopt (ZMQ_SUBSCRIBE, filter, strlen (filter));
zmq::message_t update;
subscriber.recv(&update);
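
The `ZMQ_SUBSCRIBE` filter is a byte prefix: a message is delivered when it begins with one of the socket's filters, and the empty filter used above matches everything. The following sketch shows that matching rule in plain C++ under those assumptions. `matches_filter` and `would_deliver` are hypothetical helper names, not ZeroMQ functions.

```cpp
#include <string>
#include <vector>

// Returns true when `message` starts with `filter`, byte for byte.
// An empty filter matches every message. (Hypothetical helper.)
bool matches_filter(const std::string& message, const std::string& filter) {
    return message.compare(0, filter.size(), filter) == 0;
}

// A message is delivered if any subscription matches it.
// With no subscriptions at all, nothing is delivered. (Hypothetical helper.)
bool would_deliver(const std::string& message,
                   const std::vector<std::string>& filters) {
    for (const auto& filter : filters) {
        if (matches_filter(message, filter)) return true;
    }
    return false;
}
```

For example, `would_deliver("10001 25 50", {"10001"})` is true, while the same message with the filter list `{"10002"}` is not delivered.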




Parallel Pipeline

ventilator

zmq::context_t context (1);
//  Socket to send messages on
zmq::socket_t sender (context, ZMQ_PUSH);
sender.bind ("tcp://*:5557");
//  Signal the sink that the batch is starting
zmq::socket_t sink (context, ZMQ_PUSH);
sink.connect ("tcp://localhost:5558");
zmq::message_t message (2);
memcpy (message.data (), "0", 2);   //  copies '0' plus the terminating NUL
sink.send (message);
//  Start sending tasks into the pipeline
int workload = 100;   //  example value (assumed; the original leaves it undefined)
message.rebuild (10);
snprintf ((char *) message.data (), 10, "%d", workload);
sender.send (message);


worker

zmq::context_t context(1);

//  Socket to receive messages on
zmq::socket_t receiver(context, ZMQ_PULL);
receiver.connect("tcp://localhost:5557");

//  Socket to send messages to
zmq::socket_t sender(context, ZMQ_PUSH);
sender.connect("tcp://localhost:5558");

//  Process tasks forever
while (1) {
    zmq::message_t message;
    receiver.recv (&message);
    //  Send an empty result to the sink
    message.rebuild ();
    sender.send (message);
}


sink

//  Prepare our context and socket
zmq::context_t context(1);
zmq::socket_t receiver(context,ZMQ_PULL);
receiver.bind("tcp://*:5558");

//  Wait for start of batch
zmq::message_t message;
receiver.recv(&message);
//  Receive a result from a worker
receiver.recv(&message);
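
In this pipeline the ventilator's PUSH socket hands tasks to its connected workers in round-robin order. The following sketch is an idealized model of that distribution in plain C++, assuming every worker is already connected before the first task is sent and no queue fills up. `distribute_round_robin` is a hypothetical function written for illustration, not a ZeroMQ API.

```cpp
#include <cstddef>
#include <vector>

// Assigns task indices 0..task_count-1 to workers in round-robin order.
// Returns one list of task indices per worker. (Hypothetical model.)
std::vector<std::vector<int>> distribute_round_robin(int task_count,
                                                     std::size_t worker_count) {
    std::vector<std::vector<int>> assigned(worker_count);
    if (worker_count == 0) return assigned;   // nobody to send to
    for (int task = 0; task < task_count; ++task) {
        assigned[static_cast<std::size_t>(task) % worker_count].push_back(task);
    }
    return assigned;
}
```

With 7 tasks and 3 workers the model gives the first worker tasks 0, 3, 6, the second 1, 4, and the third 2, 5. In a real run, workers that connect late receive fewer tasks, so the actual split can be less even than this.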

