您的位置:首页 > 其它

消息队列NetMQ 原理分析3-命令产生/处理和回收线程

2017-04-27 17:22 477 查看
消息队列NetMQ 原理分析3-命令产生/处理和回收线程

前言

介绍

目的

命令

命令结构

命令产生

命令处理

创建Socket(SocketBase)

创建连接

创建绑定

回收线程

初始化回收线程

MailBox

释放SocketBase

发送回收命令

处理回收命令

SocketBase回收

终止处理

终止SocketBase

终止当前Socket关联的Session

终止管道

总结

前言

介绍

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。

zeromq的英文文档

NetMQ的英文文档

目的

对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:

消息队列NetMQ 原理分析1-Context和ZObject

消息队列NetMQ 原理分析2-IO线程和完成端口

消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程

消息队列NetMQ 原理分析4-Session、Option和Pipe

消息队列NetMQ 原理分析5-Engine

消息队列NetMQ 原理分析6-TCP和Inpoc实现

消息队列NetMQ 原理分析7-Device

消息队列NetMQ 原理分析8-不同类型的Socket

消息队列NetMQ 原理分析9-实战


友情提示: 看本系列文章时最好获取源码,更有助于理解。



命令

命令结构

Command定义如下

internal struct Command
{
public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] object arg = null) : this()
{
Destination = destination;
CommandType = type;
Arg = arg;
}
[CanBeNull]
public ZObject Destination { get; }
public CommandType CommandType { get; }
[CanBeNull]
public object Arg { get; private set; }
public override string ToString()
{
return base.ToString() + "[" + CommandType + ", " + Destination + "]";
}
}

其包含了3个信息:调用者,命令类型和命令参数。

命令产生

还记的《消息队列NetMQ 原理分析1-Context和ZObject》中我们介绍过NetMQ中的命令类型吗?待处理命令全部会存放着
Socket
的信箱中。当
Socket
有命令(连接完成、发送完成或接受完成等)需要处理时调用基类
ZObject
SendCommand
方法。

private void SendCommand([NotNull] Command cmd)
{
m_ctx.SendCommand(cmd.Destination.ThreadId, cmd);
}




ZObject
实际调用
Context
的SendCommand方法

public void SendCommand(int threadId, [NotNull] Command command)
{
m_slots[threadId].Send(command);
}

m_slots[threadId]
保存的是当前IO线程的IO信箱
IOThreadMailbox
,在《消息队列NetMQ 原理分析2-IO线程和完成端口》

我们简单介绍了
IOThreadMailbox
的结构。

[NotNull] private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");

IOThreadMailbox
中维护这一个
Command
管道,该管道实际就是一个先进先出队列,详细解析会在第四章进行介绍。

public void Send(Command command)
{
bool ok;
lock (m_sync)
{
//向管道写入命令
m_commandPipe.Write(ref command, false);
//成功写入会返回false,表示有命令需要处理
ok = m_commandPipe.Flush();
}
if (!ok)
{
//向完成端口传递信号
m_proactor.SignalMailbox(this);
}
}

public bool TryRecv(out Command command)
{
return m_commandPipe.TryRead(out command);
}

public void RaiseEvent()
{
if (!m_disposed)
{
m_mailboxEvent.Ready();
}
}

IOThreadMailbox
的主要就是这三个方法

当有命令来的时候调用
Send
方法向管道(队列)写入命令。写完时,会向完成端口传递信号。

当有命令需要处理时调用
TryRecv
方法读取

当完成端口接收到信号需要命令处理时,调用
RaiseEvent
(实际是信箱的IO线程的
RaiseEvent
方法)进行处理命令。

public void SignalMailbox(IOThreadMailbox mailbox)
{
//该方法会向完成端口的队列中插入一个信号状态
m_completionPort.Signal(mailbox);
}


命令处理

当有命令需要处理时,完成端口会接收到信号。

private void Loop()
{
...
int timeout = ExecuteTimers();
int removed;
if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
continue;
for (int i = 0; i < removed; i++)
{
try
{
if (completionStatuses[i].OperationType == OperationType.Signal)
{
var mailbox = (IOThreadMailbox)completionStatuses[i].State;
mailbox.RaiseEvent();
}
...
}
...
}
...
}

在线程轮询方法
Loop
中,当接收到需要处理的数据时,首先会判断是否是信号,若为信号,则将状态(参数)转化为
IOThreadMailbox
类型,同时调用
RaiseEvent
方法处理命令。

public void Ready()
{
Command command;
while (m_mailbox.TryRecv(out command))
command.Destination.ProcessCommand(command);
}

当有命令需要处理时,会调用
IOThreadMailbox
TryRecv
方法从管道(队列,先进先出)中获取第一个命令进行处理。

创建Socket(SocketBase)

在介绍回收线程工作之前,我们先看下创建一个新的
Socket
做了哪些工作,这里的
Socket
实际是NetMQ中的
SocketBase


RequestSocket socket = new RequestSocket();
socket.Connect("tcp://127.0.0.1:12345");

NetMQSocket
是NetMQ的
Socket
的基类。

public RequestSocket(string connectionString = null) : base(ZmqSocketType.Req, connectionString, DefaultAction.Connect)
{

}

internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultAction defaultAction)
{
m_socketHandle = NetMQConfig.Context.CreateSocket(socketType);
m_netMqSelector = new NetMQSelector();
Options = new SocketOptions(this);
m_socketEventArgs = new NetMQSocketEventArgs(this);

Options.Linger = NetMQConfig.Linger;

if (!string.IsNullOrEmpty(connectionString))
{
var endpoints =
connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
.Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));

foreach (string endpoint in endpoints)
{
if (endpoint[0] == '@')
{
Bind(endpoint.Substring(1));
}
else if (endpoint[0] == '>')
{
Connect(endpoint.Substring(1));
}
else if (defaultAction == DefaultAction.Connect)
{
Connect(endpoint);
}
else
{
Bind(endpoint);
}
}
}
}

首先会根据
Socket
的类型创建对应的
Socket
,调用的是
Context
CreateSocket
方法。具体的请看创建SocketBase。最终创建方法是调用
SocketBase
Create
方法

public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
{
switch (type)
{
...
case ZmqSocketType.Req:
return new Req(parent, threadId, socketId);
...
default:
throw new InvalidException("SocketBase.Create called with invalid type of " + type);
}
}

创建完后,就对地址进行解析。若有多个地址,则可用,分隔。

var endpoints =
connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
.Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));

解析完成后则用默认的方式进行绑定或连接,如
RequestSocket
默认为连接,而
ResponseSocket
则为绑定。

创建连接

首先对地址进行解析,判断当前是tcp还是其他协议。然后会根据协议类型创建对应的Socket,具体的协议类型分析请查看《消息队列NetMQ 原理分析6-TCP和Inpoc实现》

private static void DecodeAddress([NotNull] string addr, out string address, out string protocol)
{
const string protocolDelimeter = "://";
int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);

protocol = addr.Substring(0, protocolDelimeterIndex);
address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);
}


负载均衡选择一个IO线程。

创建
Session
,
Socket
Session
的关系如图所示



创建管道,创建管道会创建一对单向管道,形成“一个”双向管道。头尾分别连接
Socket
Session
,如上图所示。创建管道完毕后需要设置管道的回调事件,管道1设置回调为
Socket
的回调方法,管道2设置为
Session
的回调方法。


具体关于
Session
Pipe
的内容请查看《消息队列NetMQ 原理分析4-Session、Option和Pipe》。



处理
Socket
Session
的关系

protected void LaunchChild([NotNull] Own obj)
{
// Specify the owner of the object.
obj.SetOwner(this);
// Plug the object into the I/O thread.
SendPlug(obj);
// Take ownership of the object.
SendOwn(this, obj);
}


Session
的宿主设置为该
Socket


private void SetOwner([NotNull] Own owner)
{
Debug.Assert(m_owner == null);
m_owner = owner;
}


为IO对象设置
Session
,当管道有数据交互时,
Session
的回调方法就会触发。

protected void SendPlug([NotNull] Own destination, bool incSeqnum = true)
{
if (incSeqnum)
destination.IncSeqnum();
SendCommand(new Command(destination, CommandType.Plug));
}

SessionBase
ProcessPlug
会被触发

protected override void ProcessPlug()
{
m_ioObject.SetHandler(this);
if (m_connect)
StartConnecting(false);
}


将当前
Session
加入到
Socket
Session
集合中,

protected void SendOwn([NotNull] Own destination, [NotNull] Own obj)
{
destination.IncSeqnum();
SendCommand(new Command(destination, CommandType.Own, obj));
}

SocketBase
的父类方法
SendOwn
(Own方法)方法会被触发,将
Session
加入到集合中

protected override void ProcessOwn(Own obj)
{
...
// Store the reference to the owned object.
m_owned.Add(obj);
}

创建绑定

首先对地址进行解析,判断当前是tcp还是其他协议。然后会根据协议类型创建对应的
Socket
,具体的协议类型分析请查看《消息队列NetMQ 原理分析6-TCP和Inpoc实现》

private static void DecodeAddress([NotNull] string addr, out string address, out string protocol)
{
const string protocolDelimeter = "://";
int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);

protocol = addr.Substring(0, protocolDelimeterIndex);
address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);
}


负载均衡选择一个IO线程。

处理
Socket
Session
的关系

protected void LaunchChild([NotNull] Own obj)
{
// Specify the owner of the object.
obj.SetOwner(this);
// Plug the object into the I/O thread.
SendPlug(obj);
// Take ownership of the object.
SendOwn(this, obj);
}


Listener
的宿主设置为该
Socket


private void SetOwner([NotNull] Own owner)
{
Debug.Assert(m_owner == null);
m_owner = owner;
}


为IO对象设置
Listener
,当管道有数据交互是,
Listener
的回调方法就会触发。

protected void SendPlug([NotNull] Own destination, bool incSeqnum = true)
{
if (incSeqnum)
destination.IncSeqnum();
SendCommand(new Command(destination, CommandType.Plug));
}

Listener
ProcessPlug
会被触发

protected override void ProcessPlug()
{
m_ioObject.SetHandler(this);
m_ioObject.AddSocket(m_handle);
//接收异步socket
Accept();
}


将当前
Listener
加入到
Socket
Listener
集合中,

protected void SendOwn([NotNull] Own destination, [NotNull] Own obj)
{
destination.IncSeqnum();
SendCommand(new Command(destination, CommandType.Own, obj));
}

SocketBase
的父类方法
SendOwn
(Own方法)方法会被触发,将
Listener
加入到集合中

protected override void ProcessOwn(Own obj)
{
...
// Store the reference to the owned object.
m_owned.Add(obj);
}


SocketBase
的创建处理就完成了



回收线程

(垃圾)回收线程是专门处理(清理)异步关闭的
Socket
的线程,它在NetMQ中起到至关重要的作用。

internal class Reaper : ZObject, IPollEvents
{
...
}

Reaper
是一个ZObject对象,同时实现了
IPollEvents
接口,该接口的作用是当有信息接收或发送时进行处理。回收线程实现了
InEvent
方法。

internal interface IPollEvents : ITimerEvent
{
void InEvent();
void OutEvent();
}

InEvent
方法实现和IO线程的
Ready
方法很像,都是遍历需要处理的命令进行处理。

public void InEvent()
{
while (true)
{
Command command;
if (!m_mailbox.TryRecv(0, out command))
break;
command.Destination.ProcessCommand(command);
}
}

初始化回收线程

public Reaper([NotNull] Ctx ctx, int threadId)
: base(ctx, threadId)
{
m_sockets = 0;
m_terminating = false;

string name = "reaper-" + threadId;
m_poller = new Utils.Poller(name);

m_mailbox = new Mailbox(name);

m_mailboxHandle = m_mailbox.Handle;
m_poller.AddHandle(m_mailboxHandle, this);
m_poller.SetPollIn(m_mailboxHandle);
}


初始化回收线程是会创建一个
Poller
对象,用于轮询回收
SocketBase


初始化回收线程会创建一个
Mailbox
对象用于
Command
的收发

MailBox

internal class Mailbox : IMailbox{
...
}

MailBox
IO线程
IOThreadMailbox
一样,实现了
IMailbox
接口。

释放SocketBase

当有
SocketBase
需要释放时,会向完成端口发送
Reap
信号。

public void Close()
{
// Mark the socket as disposed
m_disposed = true;
//工作线程向Socket邮箱发送Reap信号
//回收线程会做剩下的工作
SendReap(this);
}

发送回收命令

向回收线程的邮箱发送当前
SocketBase
的回收命令

protected void SendReap([NotNull] SocketBase socket)
{
SendCommand(new Command(m_ctx.GetReaper(), CommandType.Reap, socket));
}

处理回收命令

Reap
接收到释放信号进行处理

protected override void ProcessReap(SocketBase socket)
{
// Add the socket to the poller.
socket.StartReaping(m_poller);
++m_sockets;
}

SocketBase回收

将当前
Socket
的加入到回收线程的中,当
Socket
接收到数据时,由回收线程回调该Socket的处理事件进行处理。

当前Socket终止处理

最后确认释放

internal void StartReaping([NotNull] Poller poller)
{
m_poller = poller;
m_handle = m_mailbox.Handle;
m_poller.AddHandle(m_handle, this);
m_poller.SetPollIn(m_handle);
Terminate();
CheckDestroy();
}


终止处理

终止
Socket
时,直接终止即可


默认情况下
NetMQ
Linger
值被设置为-1,就是说如果网络读写没有进行完是不能退出的。如果
Linger
被设置为0,那么中断时会丢弃一切未完成的网络操作。如果
Linger
被设置的大于0,那么将等待
Linger
毫秒用来完成未完成的网络读写,在指定的时间里完成或者超时都会立即返回。



若终止的是
Session
,则需要发送请求清理关联Socket的当前
Session
对象

protected void Terminate()
{
...
if (m_owner == null)
{
// 释放的是Socket,Owner为空
ProcessTerm(m_options.Linger);
}
else
{
// 释放的是Session则会关联一个Socket
SendTermReq(m_owner, this);
}
}

终止SocketBase

终止
SocketBase
时,需要先中断当前
SocketBase
关联的
SessionBase


然后增加需要终端请求响应的个数,当全部都响应了则处理第四步骤

清空当前关联的
Session
集合

最后当
Session
全部终止后发送给当前
Socket
宿主终端响应(TermAck)

protected override void ProcessTerm(int linger)
{
...
// 断开所有session的连接
foreach (Own it in m_owned)
{
SendTerm(it, linger);
}
RegisterTermAcks(m_owned.Count);
m_owned.Clear();
CheckTermAcks();
}

终止当前Socket关联的Session

如果终端管道命令在终止命令前处理了,则立即终止当前
Session


标记当前准备终止

Ligner
大于0 则等到N毫秒后再终止终止
Socket
Session
之间的管道

检查管道是否还有数据要读取

protected override void ProcessTerm(int linger)
{
if (m_pipe == null)
{
ProceedWithTerm();
return;
}

m_pending = true;

if (linger > 0)
{
Debug.Assert(!m_hasLingerTimer);
m_ioObject.AddTimer(linger, LingerTimerId);
m_hasLingerTimer = true;
}
// 是否需要等待一定时间后消息处理完再终止管道.
m_pipe.Terminate(linger != 0);

// TODO: Should this go into pipe_t::terminate ?
// In case there's no engine and there's only delimiter in the
// pipe it wouldn't be ever read. Thus we check for it explicitly.
m_pipe.CheckRead();
}

终止管道
管道状态如下所示

private enum State
{
/// <summary> Active 表示在中断命令开始前的状态 </summary>
Active,
/// <summary> Delimited 表示在终端命令接收前从管道接收到分隔符</summary>
Delimited,
/// <summary> Pending 表示中断命令已经从管道接收,但是仍有待定消息可读</summary>
Pending,
/// <summary> Terminating 表示所有待定消息都已经读取等待管道终止确认信号返回 </summary>
Terminating,
/// <summary> Terminated 表示终止命令是由用户显示调用 </summary>
Terminated,
/// <summary> Double_terminated 表示用户调用了终止命令同时管道也调用了终止命令 </summary>
DoubleTerminated
}


终止当前管道

若当前状态为
Terminated
DoubleTerminated
Terminating
不再处理终止命令

public void Terminate(bool delay)
{
//判断当前状态是否可处理终止命令
...

if (m_state == State.Active)
{
// 向另一个管道发送终止命令然后等待确认终止
SendPipeTerm(m_peer);
m_state = State.Terminated;
}
else if (m_state == State.Pending && !m_delay)
{
// 若有待处理数据,但是不等待直接终止,则向另一个管道发送确认终止.
m_outboundPipe = null;
SendPipeTermAck(m_peer);
m_state = State.Terminating;
}
else if (m_state == State.Pending)
{
//若有待处理数据但是需要等到则不处理.
}
else if (m_state == State.Delimited)
{
//若已经获取到限定符但是还没有收到终止命令则忽略定界符,然后发送终止命令给另一个管道
SendPipeTerm(m_peer);
m_state = State.Terminated;
}
else
{
// 没有其他状态
Debug.Assert(false);
}
//停止向外发送的消息
m_outActive = false;

if (m_outboundPipe != null)
{
//抛弃未发送出的消息.
Rollback();

// 这里不会再先查水位,所以即使管道满了也可再写入,向管道写入定界符 .
var msg = new Msg();
msg.InitDelimiter();
m_outboundPipe.Write(ref msg, false);
Flush();
}
}


终止另一个管道

protected override void ProcessPipeTerm()
{
// 这是一个简单的例子有道管道终止
//若没有更多待处理消息需要读取,或者这个管道已经丢去待处理数据,我们直接将状态设置为正在终止(terminating),否则我们搁置待处理状态直到所有待处理消息被发送
if (m_state == State.Active)
{
if (!m_delay)
{
//不需要等到消息处理
m_state = State.Terminating;
m_outboundPipe = null;
//发送终止确认
SendPipeTermAck(m_peer);
}
else
m_state = State.Pending;
return;
}
// 若定界符碰巧在终止命令之前到达,将状态改为正在终止
if (m_state == State.Delimited)
{
m_state = State.Terminating;
m_outboundPipe = null;
SendPipeTermAck(m_peer);
return;
}
// 当管道并发关闭,则状态改为DoubleTerminated
if (m_state == State.Terminated)
{
m_state = State.DoubleTerminated;
m_outboundPipe = null;
SendPipeTermAck(m_peer);
return;
}
// pipe_term is invalid in other states.
Debug.Assert(false);
}


确认终止

protected override void ProcessPipeTermAck()
{
// 通知Socket或Session中断当前管道 .
Debug.Assert(m_sink != null);
m_sink.Terminated(this);

// 若正则处理或double_terminated这里不做任何事
// 简化释放管道,在已终止状态,我们必须在释放这个管道之前确认
//其他状态都是非法的
if (m_state == State.Terminated)
{
m_outboundPipe = null;
SendPipeTermAck(m_peer);
}
else
Debug.Assert(m_state == State.Terminating || m_state == State.DoubleTerminated);

// 删除所有管道中的未读消息,然后释放流入管道
var msg = new Msg();
while (m_inboundPipe.TryRead(out msg))
{
msg.Close();
}

m_inboundPipe = null;
}

整体回收
Socket
流程图如下:



public virtual void InEvent()
{
// 回收线程命令会调用此事件
try
{
ProcessCommands(0, false);
}
catch
{
// ignored
}
finally
{
CheckDestroy();
}
}

private void CheckDestroy()
{
// socket释放完则做最后的清除和释放工作.
if (m_destroyed)
{
// 从回收线程移除轮询
m_poller.RemoveHandle(m_handle);
// 释放socke.
DestroySocket(this);
// 通知已释放.
SendReaped();
// Deallocate.
base.ProcessDestroy();
}
}

总结

该篇介绍命令处理方式和回收线程回收
Socket
,顺便介绍了下创建
SocketBase
的细节性问题。以便对释放
Socket
有更清晰的认识。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐