消息队列NetMQ 原理分析3-命令产生/处理和回收线程
2017-04-27 17:22
477 查看
消息队列NetMQ 原理分析3-命令产生/处理和回收线程
前言
介绍
目的
命令
命令结构
命令产生
命令处理
创建Socket(SocketBase)
创建连接
创建绑定
回收线程
初始化回收线程
MailBox
释放SocketBase
发送回收命令
处理回收命令
SocketBase回收
终止处理
终止SocketBase
终止当前Socket关联的Session
终止管道
总结
zeromq的英文文档
NetMQ的英文文档
消息队列NetMQ 原理分析1-Context和ZObject
消息队列NetMQ 原理分析2-IO线程和完成端口
消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程
消息队列NetMQ 原理分析4-Session、Option和Pipe
消息队列NetMQ 原理分析5-Engine
消息队列NetMQ 原理分析6-TCP和Inpoc实现
消息队列NetMQ 原理分析7-Device
消息队列NetMQ 原理分析8-不同类型的Socket
消息队列NetMQ 原理分析9-实战
其包含了3个信息:调用者,命令类型和命令参数。
我们简单介绍了
当有命令来的时候调用
当有命令需要处理时调用
当完成端口接收到信号需要命令处理时,调用
在线程轮询方法
当有命令需要处理时,会调用
首先会根据
创建完后,就对地址进行解析。若有多个地址,则可用,分隔。
解析完成后则用默认的方式进行绑定或连接,如
负载均衡选择一个IO线程。
创建
创建管道,创建管道会创建一对单向管道,形成“一个”双向管道。头尾分别连接
具体关于
处理
将
为IO对象设置
将当前
负载均衡选择一个IO线程。
处理
将
为IO对象设置
将当前
初始化回收线程是会创建一个
初始化回收线程会创建一个
当前Socket终止处理
最后确认释放
终止处理
终止
默认情况下
若终止的是
终止SocketBase
终止
然后增加需要终端请求响应的个数,当全部都响应了则处理第四步骤
清空当前关联的
最后当
终止当前Socket关联的Session
如果终端管道命令在终止命令前处理了,则立即终止当前
标记当前准备终止
若
检查管道是否还有数据要读取
终止管道
管道状态如下所示
终止当前管道
若当前状态为
终止另一个管道
确认终止
整体回收
前言
介绍
目的
命令
命令结构
命令产生
命令处理
创建Socket(SocketBase)
创建连接
创建绑定
回收线程
初始化回收线程
MailBox
释放SocketBase
发送回收命令
处理回收命令
SocketBase回收
终止处理
终止SocketBase
终止当前Socket关联的Session
终止管道
总结
前言
介绍
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。 当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。
zeromq的英文文档
NetMQ的英文文档
目的
对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:消息队列NetMQ 原理分析1-Context和ZObject
消息队列NetMQ 原理分析2-IO线程和完成端口
消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程
消息队列NetMQ 原理分析4-Session、Option和Pipe
消息队列NetMQ 原理分析5-Engine
消息队列NetMQ 原理分析6-TCP和Inpoc实现
消息队列NetMQ 原理分析7-Device
消息队列NetMQ 原理分析8-不同类型的Socket
消息队列NetMQ 原理分析9-实战
命令
命令结构
Command定义如下internal struct Command { public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] object arg = null) : this() { Destination = destination; CommandType = type; Arg = arg; } [CanBeNull] public ZObject Destination { get; } public CommandType CommandType { get; } [CanBeNull] public object Arg { get; private set; } public override string ToString() { return base.ToString() + "[" + CommandType + ", " + Destination + "]"; } }
其包含了3个信息:调用者,命令类型和命令参数。
命令产生
还记的《消息队列NetMQ 原理分析1-Context和ZObject》中我们介绍过NetMQ中的命令类型吗?待处理命令全部会存放着Socket的信箱中。当
Socket有命令(连接完成、发送完成或接受完成等)需要处理时调用基类
ZObject的
SendCommand方法。
private void SendCommand([NotNull] Command cmd) { m_ctx.SendCommand(cmd.Destination.ThreadId, cmd); }
ZObject实际调用
Context的SendCommand方法
public void SendCommand(int threadId, [NotNull] Command command) { m_slots[threadId].Send(command); }
m_slots[threadId]保存的是当前IO线程的IO信箱
IOThreadMailbox,在《消息队列NetMQ 原理分析2-IO线程和完成端口》
我们简单介绍了
IOThreadMailbox的结构。
[NotNull] private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");
IOThreadMailbox中维护这一个
Command管道,该管道实际就是一个先进先出队列,详细解析会在第四章进行介绍。
public void Send(Command command) { bool ok; lock (m_sync) { //向管道写入命令 m_commandPipe.Write(ref command, false); //成功写入会返回false,表示有命令需要处理 ok = m_commandPipe.Flush(); } if (!ok) { //向完成端口传递信号 m_proactor.SignalMailbox(this); } } public bool TryRecv(out Command command) { return m_commandPipe.TryRead(out command); } public void RaiseEvent() { if (!m_disposed) { m_mailboxEvent.Ready(); } }
IOThreadMailbox的主要就是这三个方法
当有命令来的时候调用
Send方法向管道(队列)写入命令。写完时,会向完成端口传递信号。
当有命令需要处理时调用
TryRecv方法读取
当完成端口接收到信号需要命令处理时,调用
RaiseEvent(实际是信箱的IO线程的
RaiseEvent方法)进行处理命令。
public void SignalMailbox(IOThreadMailbox mailbox) { //该方法会向完成端口的队列中插入一个信号状态 m_completionPort.Signal(mailbox); }
命令处理
当有命令需要处理时,完成端口会接收到信号。private void Loop() { ... int timeout = ExecuteTimers(); int removed; if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed)) continue; for (int i = 0; i < removed; i++) { try { if (completionStatuses[i].OperationType == OperationType.Signal) { var mailbox = (IOThreadMailbox)completionStatuses[i].State; mailbox.RaiseEvent(); } ... } ... } ... }
在线程轮询方法
Loop中,当接收到需要处理的数据时,首先会判断是否是信号,若为信号,则将状态(参数)转化为
IOThreadMailbox类型,同时调用
RaiseEvent方法处理命令。
public void Ready() { Command command; while (m_mailbox.TryRecv(out command)) command.Destination.ProcessCommand(command); }
当有命令需要处理时,会调用
IOThreadMailbox的
TryRecv方法从管道(队列,先进先出)中获取第一个命令进行处理。
创建Socket(SocketBase)
在介绍回收线程工作之前,我们先看下创建一个新的Socket做了哪些工作,这里的
Socket实际是NetMQ中的
SocketBase。
RequestSocket socket = new RequestSocket(); socket.Connect("tcp://127.0.0.1:12345");
NetMQSocket是NetMQ的
Socket的基类。
public RequestSocket(string connectionString = null) : base(ZmqSocketType.Req, connectionString, DefaultAction.Connect) { }
internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultAction defaultAction) { m_socketHandle = NetMQConfig.Context.CreateSocket(socketType); m_netMqSelector = new NetMQSelector(); Options = new SocketOptions(this); m_socketEventArgs = new NetMQSocketEventArgs(this); Options.Linger = NetMQConfig.Linger; if (!string.IsNullOrEmpty(connectionString)) { var endpoints = connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries) .Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a)); foreach (string endpoint in endpoints) { if (endpoint[0] == '@') { Bind(endpoint.Substring(1)); } else if (endpoint[0] == '>') { Connect(endpoint.Substring(1)); } else if (defaultAction == DefaultAction.Connect) { Connect(endpoint); } else { Bind(endpoint); } } } }
首先会根据
Socket的类型创建对应的
Socket,调用的是
Context的
CreateSocket方法。具体的请看创建SocketBase。最终创建方法是调用
SocketBase的
Create方法
public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId) { switch (type) { ... case ZmqSocketType.Req: return new Req(parent, threadId, socketId); ... default: throw new InvalidException("SocketBase.Create called with invalid type of " + type); } }
创建完后,就对地址进行解析。若有多个地址,则可用,分隔。
var endpoints = connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries) .Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));
解析完成后则用默认的方式进行绑定或连接,如
RequestSocket默认为连接,而
ResponseSocket则为绑定。
创建连接
首先对地址进行解析,判断当前是tcp还是其他协议。然后会根据协议类型创建对应的Socket,具体的协议类型分析请查看《消息队列NetMQ 原理分析6-TCP和Inpoc实现》private static void DecodeAddress([NotNull] string addr, out string address, out string protocol) { const string protocolDelimeter = "://"; int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal); protocol = addr.Substring(0, protocolDelimeterIndex); address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length); }
负载均衡选择一个IO线程。
创建
Session,
Socket和
Session的关系如图所示
创建管道,创建管道会创建一对单向管道,形成“一个”双向管道。头尾分别连接
Socket和
Session,如上图所示。创建管道完毕后需要设置管道的回调事件,管道1设置回调为
Socket的回调方法,管道2设置为
Session的回调方法。
具体关于
Session和
Pipe的内容请查看《消息队列NetMQ 原理分析4-Session、Option和Pipe》。
处理
Socket和
Session的关系
protected void LaunchChild([NotNull] Own obj) { // Specify the owner of the object. obj.SetOwner(this); // Plug the object into the I/O thread. SendPlug(obj); // Take ownership of the object. SendOwn(this, obj); }
将
Session的宿主设置为该
Socket
private void SetOwner([NotNull] Own owner) { Debug.Assert(m_owner == null); m_owner = owner; }
为IO对象设置
Session,当管道有数据交互时,
Session的回调方法就会触发。
protected void SendPlug([NotNull] Own destination, bool incSeqnum = true) { if (incSeqnum) destination.IncSeqnum(); SendCommand(new Command(destination, CommandType.Plug)); }
SessionBase的
ProcessPlug会被触发
protected override void ProcessPlug() { m_ioObject.SetHandler(this); if (m_connect) StartConnecting(false); }
将当前
Session加入到
Socket的
Session集合中,
protected void SendOwn([NotNull] Own destination, [NotNull] Own obj) { destination.IncSeqnum(); SendCommand(new Command(destination, CommandType.Own, obj)); }
SocketBase的父类方法
SendOwn(Own方法)方法会被触发,将
Session加入到集合中
protected override void ProcessOwn(Own obj) { ... // Store the reference to the owned object. m_owned.Add(obj); }
创建绑定
首先对地址进行解析,判断当前是tcp还是其他协议。然后会根据协议类型创建对应的Socket,具体的协议类型分析请查看《消息队列NetMQ 原理分析6-TCP和Inpoc实现》
private static void DecodeAddress([NotNull] string addr, out string address, out string protocol) { const string protocolDelimeter = "://"; int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal); protocol = addr.Substring(0, protocolDelimeterIndex); address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length); }
负载均衡选择一个IO线程。
处理
Socket和
Session的关系
protected void LaunchChild([NotNull] Own obj) { // Specify the owner of the object. obj.SetOwner(this); // Plug the object into the I/O thread. SendPlug(obj); // Take ownership of the object. SendOwn(this, obj); }
将
Listener的宿主设置为该
Socket
private void SetOwner([NotNull] Own owner) { Debug.Assert(m_owner == null); m_owner = owner; }
为IO对象设置
Listener,当管道有数据交互是,
Listener的回调方法就会触发。
protected void SendPlug([NotNull] Own destination, bool incSeqnum = true) { if (incSeqnum) destination.IncSeqnum(); SendCommand(new Command(destination, CommandType.Plug)); }
Listener的
ProcessPlug会被触发
protected override void ProcessPlug() { m_ioObject.SetHandler(this); m_ioObject.AddSocket(m_handle); //接收异步socket Accept(); }
将当前
Listener加入到
Socket的
Listener集合中,
protected void SendOwn([NotNull] Own destination, [NotNull] Own obj) { destination.IncSeqnum(); SendCommand(new Command(destination, CommandType.Own, obj)); }
SocketBase的父类方法
SendOwn(Own方法)方法会被触发,将
Listener加入到集合中
protected override void ProcessOwn(Own obj) { ... // Store the reference to the owned object. m_owned.Add(obj); }
SocketBase的创建处理就完成了
回收线程
(垃圾)回收线程是专门处理(清理)异步关闭的Socket的线程,它在NetMQ中起到至关重要的作用。
internal class Reaper : ZObject, IPollEvents { ... }
Reaper是一个ZObject对象,同时实现了
IPollEvents接口,该接口的作用是当有信息接收或发送时进行处理。回收线程实现了
InEvent方法。
internal interface IPollEvents : ITimerEvent { void InEvent(); void OutEvent(); }
InEvent方法实现和IO线程的
Ready方法很像,都是遍历需要处理的命令进行处理。
public void InEvent() { while (true) { Command command; if (!m_mailbox.TryRecv(0, out command)) break; command.Destination.ProcessCommand(command); } }
初始化回收线程
public Reaper([NotNull] Ctx ctx, int threadId) : base(ctx, threadId) { m_sockets = 0; m_terminating = false; string name = "reaper-" + threadId; m_poller = new Utils.Poller(name); m_mailbox = new Mailbox(name); m_mailboxHandle = m_mailbox.Handle; m_poller.AddHandle(m_mailboxHandle, this); m_poller.SetPollIn(m_mailboxHandle); }
初始化回收线程是会创建一个
Poller对象,用于轮询回收
SocketBase。
初始化回收线程会创建一个
Mailbox对象用于
Command的收发
MailBox
internal class Mailbox : IMailbox{ ... }
MailBox和IO线程的
IOThreadMailbox一样,实现了
IMailbox接口。
释放SocketBase
当有SocketBase需要释放时,会向完成端口发送
Reap信号。
public void Close() { // Mark the socket as disposed m_disposed = true; //工作线程向Socket邮箱发送Reap信号 //回收线程会做剩下的工作 SendReap(this); }
发送回收命令
向回收线程的邮箱发送当前SocketBase的回收命令
protected void SendReap([NotNull] SocketBase socket) { SendCommand(new Command(m_ctx.GetReaper(), CommandType.Reap, socket)); }
处理回收命令
Reap接收到释放信号进行处理
protected override void ProcessReap(SocketBase socket) { // Add the socket to the poller. socket.StartReaping(m_poller); ++m_sockets; }
SocketBase回收
将当前Socket的加入到回收线程的中,当
Socket接收到数据时,由回收线程回调该Socket的处理事件进行处理。
当前Socket终止处理
最后确认释放
internal void StartReaping([NotNull] Poller poller) { m_poller = poller; m_handle = m_mailbox.Handle; m_poller.AddHandle(m_handle, this); m_poller.SetPollIn(m_handle); Terminate(); CheckDestroy(); }
终止处理
终止
Socket时,直接终止即可
默认情况下
NetMQ的
Linger值被设置为-1,就是说如果网络读写没有进行完是不能退出的。如果
Linger被设置为0,那么中断时会丢弃一切未完成的网络操作。如果
Linger被设置的大于0,那么将等待
Linger毫秒用来完成未完成的网络读写,在指定的时间里完成或者超时都会立即返回。
若终止的是
Session,则需要发送请求清理关联Socket的当前
Session对象
protected void Terminate() { ... if (m_owner == null) { // 释放的是Socket,Owner为空 ProcessTerm(m_options.Linger); } else { // 释放的是Session则会关联一个Socket SendTermReq(m_owner, this); } }
终止SocketBase
终止
SocketBase时,需要先中断当前
SocketBase关联的
SessionBase
然后增加需要终端请求响应的个数,当全部都响应了则处理第四步骤
清空当前关联的
Session集合
最后当
Session全部终止后发送给当前
Socket宿主终端响应(TermAck)
protected override void ProcessTerm(int linger) { ... // 断开所有session的连接 foreach (Own it in m_owned) { SendTerm(it, linger); } RegisterTermAcks(m_owned.Count); m_owned.Clear(); CheckTermAcks(); }
终止当前Socket关联的Session
如果终端管道命令在终止命令前处理了,则立即终止当前
Session
标记当前准备终止
若
Ligner大于0 则等到N毫秒后再终止终止
Socket和
Session之间的管道
检查管道是否还有数据要读取
protected override void ProcessTerm(int linger) { if (m_pipe == null) { ProceedWithTerm(); return; } m_pending = true; if (linger > 0) { Debug.Assert(!m_hasLingerTimer); m_ioObject.AddTimer(linger, LingerTimerId); m_hasLingerTimer = true; } // 是否需要等待一定时间后消息处理完再终止管道. m_pipe.Terminate(linger != 0); // TODO: Should this go into pipe_t::terminate ? // In case there's no engine and there's only delimiter in the // pipe it wouldn't be ever read. Thus we check for it explicitly. m_pipe.CheckRead(); }
终止管道
管道状态如下所示
private enum State { /// <summary> Active 表示在中断命令开始前的状态 </summary> Active, /// <summary> Delimited 表示在终端命令接收前从管道接收到分隔符</summary> Delimited, /// <summary> Pending 表示中断命令已经从管道接收,但是仍有待定消息可读</summary> Pending, /// <summary> Terminating 表示所有待定消息都已经读取等待管道终止确认信号返回 </summary> Terminating, /// <summary> Terminated 表示终止命令是由用户显示调用 </summary> Terminated, /// <summary> Double_terminated 表示用户调用了终止命令同时管道也调用了终止命令 </summary> DoubleTerminated }
终止当前管道
若当前状态为
Terminated、
DoubleTerminated和
Terminating不再处理终止命令
public void Terminate(bool delay) { //判断当前状态是否可处理终止命令 ... if (m_state == State.Active) { // 向另一个管道发送终止命令然后等待确认终止 SendPipeTerm(m_peer); m_state = State.Terminated; } else if (m_state == State.Pending && !m_delay) { // 若有待处理数据,但是不等待直接终止,则向另一个管道发送确认终止. m_outboundPipe = null; SendPipeTermAck(m_peer); m_state = State.Terminating; } else if (m_state == State.Pending) { //若有待处理数据但是需要等到则不处理. } else if (m_state == State.Delimited) { //若已经获取到限定符但是还没有收到终止命令则忽略定界符,然后发送终止命令给另一个管道 SendPipeTerm(m_peer); m_state = State.Terminated; } else { // 没有其他状态 Debug.Assert(false); } //停止向外发送的消息 m_outActive = false; if (m_outboundPipe != null) { //抛弃未发送出的消息. Rollback(); // 这里不会再先查水位,所以即使管道满了也可再写入,向管道写入定界符 . var msg = new Msg(); msg.InitDelimiter(); m_outboundPipe.Write(ref msg, false); Flush(); } }
终止另一个管道
protected override void ProcessPipeTerm() { // 这是一个简单的例子有道管道终止 //若没有更多待处理消息需要读取,或者这个管道已经丢去待处理数据,我们直接将状态设置为正在终止(terminating),否则我们搁置待处理状态直到所有待处理消息被发送 if (m_state == State.Active) { if (!m_delay) { //不需要等到消息处理 m_state = State.Terminating; m_outboundPipe = null; //发送终止确认 SendPipeTermAck(m_peer); } else m_state = State.Pending; return; } // 若定界符碰巧在终止命令之前到达,将状态改为正在终止 if (m_state == State.Delimited) { m_state = State.Terminating; m_outboundPipe = null; SendPipeTermAck(m_peer); return; } // 当管道并发关闭,则状态改为DoubleTerminated if (m_state == State.Terminated) { m_state = State.DoubleTerminated; m_outboundPipe = null; SendPipeTermAck(m_peer); return; } // pipe_term is invalid in other states. Debug.Assert(false); }
确认终止
protected override void ProcessPipeTermAck() { // 通知Socket或Session中断当前管道 . Debug.Assert(m_sink != null); m_sink.Terminated(this); // 若正则处理或double_terminated这里不做任何事 // 简化释放管道,在已终止状态,我们必须在释放这个管道之前确认 //其他状态都是非法的 if (m_state == State.Terminated) { m_outboundPipe = null; SendPipeTermAck(m_peer); } else Debug.Assert(m_state == State.Terminating || m_state == State.DoubleTerminated); // 删除所有管道中的未读消息,然后释放流入管道 var msg = new Msg(); while (m_inboundPipe.TryRead(out msg)) { msg.Close(); } m_inboundPipe = null; }
整体回收
Socket流程图如下:
public virtual void InEvent() { // 回收线程命令会调用此事件 try { ProcessCommands(0, false); } catch { // ignored } finally { CheckDestroy(); } }
private void CheckDestroy() { // socket释放完则做最后的清除和释放工作. if (m_destroyed) { // 从回收线程移除轮询 m_poller.RemoveHandle(m_handle); // 释放socke. DestroySocket(this); // 通知已释放. SendReaped(); // Deallocate. base.ProcessDestroy(); } }
总结
该篇介绍命令处理方式和回收线程回收Socket,顺便介绍了下创建
SocketBase的细节性问题。以便对释放
Socket有更清晰的认识。
相关文章推荐
- 消息队列NetMQ 原理分析2-IO线程和完成端口
- 消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe
- 消息队列NetMQ 原理分析5-StreamEngine、Encord和Decord
- 消息队列NetMQ 原理分析1-Context和ZObject
- Looper原理及自定义消息处理线程
- 变态的libDispatch源码分析-全局队列异步延时任务处理过程-原理与创建ds
- 利用主线程与子线程间的消息通讯,实现任务处理队列
- 源码分析异步消息处理线程机制(Looper MessageQueue Handler Message)
- 关于android Handler进行线程处理消息队列与线程队列线程无法停止的问题
- 老李推荐:第6章6节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-命令队列
- Android--Handler使用应运及消息机制处理原理分析
- ipcs命令_Linux ipcs 命令用法详解:分析消息队列共享内存和信号量
- 第二人生的源码分析(四十二)实现消息处理的线程类
- 第二人生的源码分析(四十二)实现消息处理的线程类
- 利用主线程与子线程间的消息通讯,实现任务处理队列.子线程中创建不会阻塞执行的窗口
- Handler消息处理分析(UI线程)
- Handler 、 Looper 、Message异步消息处理线程机制( hander消息机制原理)
- Android 系列 4.11使用活动线程队列和处理程序在线程之间发送消息
- 老李推荐:第6章6节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-命令队列
- vc/mfc 进程消息队列,线程消息队列,和系统消息队列,该如何处理