消息队列-zmq常用通信模式
2012-12-27 18:23
399 查看
zmq是一个消息队列。可以在进程内、进程间、TCP、多播中,以消息为单位传输数据,而不是socket的字节流。官方主页上有下载、使用、文档,蛮全的。
常用模式有:Request-Reply,Publish-Subscribe,Parallel Pipeline。
Request-Reply
request
server
Publish-Subscribe
publisher
subscriber
Parallel Pipeline
ventilator
worker
sink
常用模式有:Request-Reply,Publish-Subscribe,Parallel Pipeline。
Request-Reply
request
zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REQ); //Send the request zmq::message_t request(6); memcpy ((void *)request.data(), "Hello", 5); socket.send(request); //Get the reply zmq::message_t reply; socket.recv(&reply);
server
zmq::context_t context (1); zmq::socket_t socket(context, ZMQ_REP); socket.bind ("tcp://*:5555"); while (true) { zmq::message_t request; // Wait for next request from client socket.recv (&request); std::cout << "Received Hello" << std::endl; // Do some 'work' sleep (1); // Send reply back to client zmq::message_t reply (5); memcpy ((void *) reply.data (), "World", 5); socket.send (reply); }
Publish-Subscribe
publisher
zmq::context_t context (1); zmq::socket_t publisher (context, ZMQ_PUB); publisher.bind("tcp://*:5556"); publisher.send(message);
subscriber
zmq::context_t context (1); zmq::socket_t subscriber (context, ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); const char *filter = ""; subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter)); zmq::message_t update; subscriber.recv(&update);
Parallel Pipeline
ventilator
zmq::context_t context (1); // Socket to send messages on zmq::socket_t sender(context, ZMQ_PUSH); sender.bind("tcp://*:5557"); // 通知sink开始处理任务 zmq::socket_t sink(context, ZMQ_PUSH); sink.connect("tcp://localhost:5558"); zmq::message_t message(2); memcpy(message.data(), "0", 1); sink.send(message); //开始往pipeline发送数据 message.rebuild(10); sprintf ((char *) message.data(), "%d", workload); sender.send(message);
worker
zmq::context_t context(1); // Socket to receive messages on zmq::socket_t receiver(context, ZMQ_PULL); receiver.connect("tcp://localhost:5557"); // Socket to send messages to zmq::socket_t sender(context, ZMQ_PUSH); sender.connect("tcp://localhost:5558"); // Process tasks forever while (1) { receiver.recv(&message); // Send results to sink message.rebuild(); sender.send(message); }
sink
// Prepare our context and socket zmq::context_t context(1); zmq::socket_t receiver(context,ZMQ_PULL); receiver.bind("tcp://*:5558"); // Wait for start of batch zmq::message_t message; receiver.recv(&message); //receive from worker receiver.recv(&message);
相关文章推荐
- 消息队列-zmq常用通信模式
- zmq常用通信模式
- linux进程通信-消息队列
- IPC通信:Posix消息队列的属性设置
- 消息队列通信方式为什么在内核和用户空间进行四次的数据拷贝
- [老老实实学WCF] 第九篇 消息通信模式(上) 请求应答与单向
- linux编程---进程通信---消息队列
- 进程通信-----消息队列
- 实现基于VxWorks的消息队列通信机制的C/S通信
- 管道、消息队列、共享内存几种IPC通信,简单的代码
- SpringBoot的RabbitMQ消息队列: 五、第四模式"Routing"
- 【C语言】【unix c】两个进程通过消息队列实现进程间的通信
- 异步消息队列zeromq实现服务器间高性能通信
- 常用消息队列对比
- Linux进程通信之消息队列的双向通信
- 操作系统 进程间的通信 之 信号 消息队列 共享内存 浅析
- 消息队列(MQ)之常用消息队列简介
- 【Linux】IPC通信之消息队列
- redis 消息队列发布订阅模式spring boot实现
- android异步图片加载三之handler+线程池+消息队列模式+缓存