您的位置:首页 > 其它

ZeroMQ的进阶

2015-06-10 16:18 309 查看
上一篇博文我们对ZeroMQ的经典模式做了写Demo让他跑起来了,但实际开发中我们可能面临一些远比上述复杂的场景。这时候我们需要进一步的对经典模式进行扩展,所幸ZeroMQ已经为我们做好了准备工作。

来吧,让我们继续在码上几行ZeroMQ的砖头。

ZeroMQ扩展模式

请求响应代理模式

请求响应模式绑定了请求端和响应端的联系,当我们添加一个新的响应端时则不得不修改响应的请求端配置,这是在是太不scalability了,想分布式必须解耦啊,想解耦就得添加第三方做代理,这个Proxy就是RouterSocet+DealerSokect。如果响应端是无状态的DealerSokect还能公平队列算法提供的负载均衡的效果。



Demo:

发布订阅同步模式

简单的发布订阅模式有个文档由于ZeroMQ收发的速率特别快,可能有些订阅方没来得及启动就已经广播了几条消息,或者中间一旦有些订阅端连接期间同样 会漏过一些消息。在比较苛刻的环境中可能会要求必须所有订阅端到齐才开始广播,或者一旦有订阅端断开连接马上停止广播。如果发布方知道所有的订阅方时,就 可以使用同步的发布订阅模式。这个模式实际启动两个Socket连接,一个是Pub - Sub,另一个是Req - Rep, 运行时订阅方首先启动请求应答队列向发布方告知自己已连接,并定期发送Ping消息告知自己在线,发布方发现所有的订阅方都已到齐,开始启动发布,并能够 在某个订阅方失去几次心跳请求后停止发布,并向MoniterSocket告知订阅方掉线。运行如下图:



发布订阅代理模式

发布订阅代理模式适用于跨网络广播时应用,只需要在一个网络入口设置一个和一个Pub对接的Sub客户端即可。



Sub客户端接收的消息通过Pub广播出去即可,唯一需要注意的是,如果一个消息是多帧消息,保证消息的完整转发即可。一般在在代理的处理代码中加上一个 嵌套的While处理。内层While只有确认一个多帧消息结束才可以break跳出,接受下一个新的消息。

代理简化代码如下:

扩展推拉模式

扩展推拉模式在原来经典的推拉模式的Work上添加了订阅队列, 当Sink一方受到所有的worker推送过来的执行结果后,向所有额Work推送自杀消息,结束Worker线程,如下左图。

当然也可以进一步扩展在三者之外添加一个Manger角色托管一个ResponseSocket,负责Start Worker和Kill Worker。 具体为在Ventilator收到任务时向Manger发送消息告知有任务,Manger负责启动Worker,并向Ventilator返回 Worker线程已启动,在Sink收到全部Worker执行结果后,广播任务完成消息,Worker的订阅队列收到消息后自杀,如右图。

推 拉模式注意一点,在NetMQ中PushSocket的Send的动作时阻塞的,在没有连接到Pull时Send方法将不返回。也就是说他是一个同步发送 的过程,内容一直尝试发送直到超时。所以最好发送前做一次尝试发送这也是为什么Demo中首先发送一个0之后继续发送真正的Task的原因。

当Push连接多个Pull时会启动负载均衡策略保证尽可能平均的将信息分配给Pull,同样如果一个Pull连接了多个Push则会保证尽可能公平的从各个Push中接收信息叫公平队列,注意慢连接的情况,有时候可能在一个Worker首先连接上了Ventilator上,接下了所有任务这导致其他Worker的队列饥渴,所以要保证Work都启动后在开始分发任务。

注:PushSocket和PullSocket都能够Bind和Connect(1 VS N );但只有Push可以Send,只有Pull可以Receive;





Demo:

深入理解NetMQSocket

上文我们使用了各类的Socket,有Response/Request、Push/Pull、Router/Dealer等等,这些Socket都集成值同一个基类NetMQSocket。接下来我们到NetMQSocket里看看还为我们准备什么?

发送接收信息(Send / Receive)

毫无疑问Send和Receive是最重要的两个方法,没有这两个方法我嘛事也做不了。仔细看发现这两个方法都提供对了是否阻塞发送/接受和 HasMore的选项。而且最终发送的都是字节数组,所以如果你总是使用Send发送字符串而不是自己做Encoding工作这里会有一定的损耗,不过这 不是大问题,总要有人来做Encoding的不是。

另一个参数hasMore则是值当前收到消息帧是否还有关联的消息帧,如果hasMore = = true那么你需要收到所有的消息帧合并为一个消息才能解析

通信绑定和连接(Bind / Connecet)

Bind方法将向底层的Socket注册要是使用的通信方式和通信地址,一个NetMQSocket运行多次Bind从而使用不通的地址和协议进行信息交 换,一般来说大部分的NetMQSocket子类都同时支持Bind, Receive 方法,一般来说在通信双方处于比较稳定的一方总是使用Bind来确认通信地址和协议。另一方则使用Connect来执行连接。

轮询(Poll)

内部封装了一个轮询方法,在我们注册了 ReceiveReady / SendReady事件处理函数后负责同步或异步触发这些事件;

订阅(Subscribe)

订阅仅供SubscriberSocket和XSubscriberSocket使用,如果你发现你的额SubscriberSocket收不到订阅信 息,请检查是否代码中少加了Subscribe()方法,该方法提供一个Topic参数重载,允许你订阅特定主题的信息,但是信息根据主题筛选实际是在订 阅端筛分的,也就是说订阅端实际仍然接受了所有的发布方发出的消息。

监控(Monitor)

之前版本的NetMQ并没有提供像其他MQ那样明确的监控角色幸好在3.2版本添加了一个MonitorMQ的队列其他队列可以在发送延时,堆积通信异常 时将信息发往Monitor,这里的Monitor方法指定一个监控Socket,允许在NetMQSocket出现异常或正常事件是将事件信息发送到 MonitorSocket。

代理(Proxy)

Proxy其实是一个独立的类而不是NetMQSocet的方法或事件。 它的主要用处就是如果需要解耦通信双方时可以在Proxy内加入承上、启下的两个NetMQSocet对象在整个通信链路汇总充当代理的角色。如发布订阅代理,请求应答代理等。

简单的Demo:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: