消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe
2017-07-04 17:57
531 查看
消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe
前言
介绍
目的
Socket
接口实现
内部结构
Session
Option
Pipe
YPipe
Msg
YQueue
总结
zeromq的英文文档
NetMQ的英文文档
消息队列NetMQ 原理分析1-Context和ZObject
消息队列NetMQ 原理分析2-IO线程和完成端口
消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程
消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe
消息队列NetMQ 原理分析5-Engine,Encord和Decord
消息队列NetMQ 原理分析6-TCP和Inpoc实现
消息队列NetMQ 原理分析7-Device
消息队列NetMQ 原理分析8-不同类型的Socket
消息队列NetMQ 原理分析9-实战
首先
当
在
在第一章创建SocketBase我们介绍了
每个
在
具体创建
当
Affinity
表示哪个线程是可用的,默认为0,表示所有线程在负载均衡都可使用。
Backlog
最大
DelayAttachOnConnect
在创建连接时,延迟在
DelayOnClose
若为
否则直接关闭,默认为
DelayOnDisconnect
若为
否则直接中断管道。默认为
Endianness
字节序,数据在内存中是高到低排还是低到高排。
Identity
响应的
IdentitySize
1个字节用于保存Identity的长度。
IPv4Only
Linger
当Socket关闭时,是否延迟一段时间等待数据发送完毕后再关闭管道
MaxMessageSize
每个消息包最大消息大小
RawSocket
若设置为true,
默认是false,
RecvIdentity
若为true,
ReconnectIvl
设置最小重连时间间隔,单位ms。默认100ms
ReconnectIvlMax
设置最大重连时间间隔,单位ms。默认0(无用)
RecoveryIvl
SendBuffer
发送缓存大小,设置底层传输
ReceiveBuffer
接收缓存大小,设置底层传输
SendHighWatermark
ReceiveHighWatermark
SendLowWatermark
ReceiveLowWatermark
SendTimeout
TcpKeepalive
TCP保持连接设置,默认-1不修改配置
TcpKeepaliveIdle
TCP心跳包在空闲时的时间间隔,默认-1不修改配置
TcpKeepaliveIntvl
TCP心跳包时间间隔,默认-1不修改配置
DisableTimeWait
客户端断开连接时禁用
TryRead
该方法用于判断当前队列是否可读,可读的话第一个对象出队
Unwrite
取消写入消息
写入消息
将消息写入到队列中,若写入未完成则当前消息的指针索引指向当前队列块的后一位。
完成写入
当该部分消息写完时,则会调用Flush完成写入并通知另一个管道消息可读
每个
由于每次向
每次消息写完消息时调用
前言
介绍
目的
Socket
接口实现
内部结构
Session
Option
Pipe
YPipe
Msg
YQueue
总结
前言
介绍
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。 当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。
zeromq的英文文档
NetMQ的英文文档
目的
对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:消息队列NetMQ 原理分析1-Context和ZObject
消息队列NetMQ 原理分析2-IO线程和完成端口
消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程
消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe
消息队列NetMQ 原理分析5-Engine,Encord和Decord
消息队列NetMQ 原理分析6-TCP和Inpoc实现
消息队列NetMQ 原理分析7-Device
消息队列NetMQ 原理分析8-不同类型的Socket
消息队列NetMQ 原理分析9-实战
Socket
上一章最后我们简单介绍了SocketBase和
SessionBase的创建和回收,这一张我们详细介绍
SocketBase和
SessionBase。
首先
SocketBase继承自
Own,即也是
ZObject对象,同时由于
SocketBase需要进行消息的传输,因此它实现了一些结构,包括
IPollEvents、
Pipe.IPipeEvents。
接口实现
internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents{ ... }
IPollEvents事件上一章回收线程已经介绍过,这里不再做过多说明了,简单讲
SocketBase实现该事件只有在回收线程回收
Socket的时候会触发。
Pipe.IPipeEvents:是管道事件,它的签名如下
public interface IPipeEvents { void ReadActivated([NotNull] Pipe pipe); void WriteActivated([NotNull] Pipe pipe); void Hiccuped([NotNull] Pipe pipe); void Terminated([NotNull] Pipe pipe); }
ReadActivated:表示管道可读,管道实际调用
SocketBase或
SessionBase的
ReadActivated方法,而
SocketBase实际会调用
XReadActivated方法。
WriteActivated:表示管道可写,管道实际调用
SocketBase或
SessionBase的
WriteActivated方法,而
SocketBase实际会调用
XWriteActivated方法。
Hiccuped:当连接突然中断时会调用此方法。
WriteActivated:表示管道终止。
内部结构
SocketBase的内部维护着一个字段,用于存放连接/绑定地址和它的管道(若当前
SocketBase是
TCPListener,则无需初始化管道,管道为空)。
private readonly Dictionary<string, Endpoint> m_endpoints = new Dictionary<string, Endpoint>(); private readonly Dictionary<string, Pipe> m_inprocs = new Dictionary<string, Pipe>();
Endpoint对象用于存放
SessionBase和
Pipe或
Listener的引用
private class Endpoint { public Endpoint(Own own, Pipe pipe) { Own = own; Pipe = pipe; } public Own Own { get; } public Pipe Pipe { get; } }
当
SocketBase连接或绑定最后会向将
Endpoint保存到字典中
private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe) { LaunchChild(endpoint); m_endpoints[address] = new Endpoint(endpoint, pipe); }
在
SocketBase断开连接时会移除它
public void TermEndpoint([NotNull] string addr) { ... if (protocol == Address.InProcProtocol) { ... m_inprocs.Remove(addr); } else { ... m_endpoints.Remove(addr); } }
m_inprocs也是一个字典用于存放
inproc协议的连接。
在第一章创建SocketBase我们介绍了
Context创建
SocketBase所做的一些工作,初始化
SocketBase时,会创建MailBox,用于传输
Command。
protected SocketBase([NotNull] Ctx parent, int threadId, int socketId) : base(parent, threadId) { m_options.SocketId = socketId; m_mailbox = new Mailbox("socket-" + socketId); }
每个
SocketBase的命令处理实际都是在工作线程中进行。因此理论上(忽略线程上下文切换时造成的性能损失)线程数越多,
NetMQ的IO吞吐量和工作线程数成正比关系。
在
Context创建
SocketBase会根据
Create静态方法根据不同类型创建不同的
SocketBase
public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId) { switch (type) { case ZmqSocketType.Pair: return new Pair(parent, threadId, socketId); case ZmqSocketType.Pub: return new Pub(parent, threadId, socketId); case ZmqSocketType.Sub: return new Sub(parent, threadId, socketId); case ZmqSocketType.Req: return new Req(parent, threadId, socketId); case ZmqSocketType.Rep: return new Rep(parent, threadId, socketId); case ZmqSocketType.Dealer: return new Dealer(parent, threadId, socketId); case ZmqSocketType.Router: return new Router(parent, threadId, socketId); case ZmqSocketType.Pull: return new Pull(parent, threadId, socketId); case ZmqSocketType.Push: return new Push(parent, threadId, socketId); case ZmqSocketType.Xpub: return new XPub(parent, threadId, socketId); case ZmqSocketType.Xsub: return new XSub(parent, threadId, socketId); case ZmqSocketType.Stream: return new Stream(parent, threadId, socketId); default: throw new InvalidException("SocketBase.Create called with invalid type of " + type); } }
具体创建
SocketBase的工作在上一章已经做了详细的介绍,这里不再复述。
Session
首先和SocketBase一样,
SessionBase也继承自
Own,即也是
ZObject对象,同时由于
SessionBase和
SocketBase存在消息传输,所以它也实现了
IPipeEvents接口,同时它实现了
IProactorEvents接口,在消息收发是会接收到通知。
SessionBase一端和
SocketBase进行消息的通讯,另一端和
Engine存在消息通讯,它实现了
IMsgSink和
IMsgSource接口和
Engine进行消息传输。
internal class SessionBase : Own, Pipe.IPipeEvents, IProactorEvents, IMsgSink, IMsgSource{ }
internal interface IMsgSink { /// <summary> /// 传输消息.成功时返回true. /// </summary> /// <param name="msg">将msg消息写入到管道中</param> bool PushMsg(ref Msg msg); }
internal interface IMsgSource { /// <summary> /// 取一个消息。成功时返回,从管道获取消息写入msg参数中;若失败则返回false,将null写入到msg参数中。 /// </summary> /// <param name="msg">从管道获取消息写入Msg中</param> /// <returns>true if successful - and writes the message to the msg argument</returns> bool PullMsg(ref Msg msg); }
当
SocketBase将消息写入到写管道时,对应的
SessionBase会从读管道读到
SocketBase写入的数据,然后将数据从管道取出生成一个
Msg,
Engine会和
AsyncSocket交互传输数据,关于
Engine下一章再做介绍。
Option
option参数如下
Affinity
表示哪个线程是可用的,默认为0,表示所有线程在负载均衡都可使用。
Backlog
最大
Socket待连接数
DelayAttachOnConnect
在创建连接时,延迟在
Socket和
Session之间创建双向的管道,默认创建连接时立即创建管道
DelayOnClose
若为
true,则在
Socket关闭时
Session先从管道接收所有消息发送出去。
否则直接关闭,默认为
true。
DelayOnDisconnect
若为
true,则在
Pipe通知我们中断时
Socket先将接收所有入队管道消息。
否则直接中断管道。默认为
true.
Endianness
字节序,数据在内存中是高到低排还是低到高排。
Identity
响应的
Identity,每个
Identity用于查找
Socket。
Identiy是一个重复的随机32位整形数字,转换为字节5位字节数组。每个消息的第一部分是
Identity,
IdentitySize
1个字节用于保存Identity的长度。
IPv4Only
Linger
当Socket关闭时,是否延迟一段时间等待数据发送完毕后再关闭管道
MaxMessageSize
每个消息包最大消息大小
RawSocket
若设置为true,
RouterSocket可以接收非
NetMQ发送来的
tcp连接。
默认是false,
Stream在构造函数时会设置为
true,设置为
true时会将
RecvIdentity修改为
false(用
NetMQ接收其他系统发送来的
Socket请求应该用
StreamSocekt,否则由于应用层协议不一样可能会导致一些问题。)
RecvIdentity
若为true,
Identity转发给
Socket。
ReconnectIvl
设置最小重连时间间隔,单位ms。默认100ms
ReconnectIvlMax
设置最大重连时间间隔,单位ms。默认0(无用)
RecoveryIvl
PgmSocket用的
SendBuffer
发送缓存大小,设置底层传输
Socket的发送缓存大小,初始为0
ReceiveBuffer
接收缓存大小,设置底层传输
Socket的接收缓存大小,初始为0
SendHighWatermark
Socket发送的管道的最大消息数,当发送水位达到最大时会阻塞发送。
ReceiveHighWatermark
Socket接收管道的最大消息数
SendLowWatermark
Socket发送低水位,消息的最小数量单位,每次达到多少消息数量才向Session管道才激活写事件。默认1000
ReceiveLowWatermark
Socket接收低水位,消息的最小数量单位,每次达到多少消息数量
Session管道才激活读事件。默认1000
SendTimeout
Socket发送操作超时时间
TcpKeepalive
TCP保持连接设置,默认-1不修改配置
TcpKeepaliveIdle
TCP心跳包在空闲时的时间间隔,默认-1不修改配置
TcpKeepaliveIntvl
TCP心跳包时间间隔,默认-1不修改配置
DisableTimeWait
客户端断开连接时禁用
TIME_WAITTCP状态
Pipe
在上一章我们讲到过在SocketBase和
SessionBase是通过2条单向管道进行消息传输,传输的消息单位是
Msg,消息管道是
YPipe<Msg>类型,那么
YPipe<>又是什么呢?
YPipe
Ypipe内部实际维护这一个
YQueue类型的先进先出队列,
YPipe向外暴露了一下方法:
TryRead
该方法用于判断当前队列是否可读,可读的话第一个对象出队
public bool TryRead(out T value) { if (!CheckRead()) { value = default(T); return false; } value = m_queue.Pop(); return true; }
Unwrite
取消写入消息
public bool Unwrite(ref T value) { if (m_flushToIndex == m_queue.BackPos) return false; value = m_queue.Unpush(); return true; }
写入消息
将消息写入到队列中,若写入未完成则当前消息的指针索引指向当前队列块的后一位。
public void Write(ref T value, bool incomplete) { m_queue.Push(ref value); // Move the "flush up to here" pointer. if (!incomplete) { m_flushToIndex = m_queue.BackPos; } }
完成写入
当该部分消息写完时,则会调用Flush完成写入并通知另一个管道消息可读
public void Flush() { if (m_state == State.Terminating) return; if (m_outboundPipe != null && !m_outboundPipe.Flush()) SendActivateRead(m_peer); }
Msg
写入的消息单位是Msg,它实现了多条数据的存储,当每次数据写完还有数据带写入时通过将Flag标记为
More表示消息还没写入完。
YQueue
YQueue是由一个个
trunk组成的,每个
trunk就是一个消息块,每个消息块可能包含多个
Msg,主要由写入消息时是否还有更多消息带写入(
Flag)决定。
trunk是一个双向循环链表,内部维护着一个数组用于存放数据,每个数据会有2个指针,分别指向前一个块和后一个块,每个块还有一个索引,表示当前块在队列中的位置。
private sealed class Chunk { public Chunk(int size, int globalIndex) { Values = new T[size]; GlobalOffset = globalIndex; Debug.Assert(Values != null); } /// <summary>数据</summary> public T[] Values { get; } /// <summary>当前块在队列中的位置</summary> public int GlobalOffset { get; } /// <summary>前一个块</summary> [CanBeNull] public Chunk Previous { get; set; } /// <summary>下一个块</summary> [CanBeNull] public Chunk Next { get; set; } }
每个
chunk默认最多可保存256个部分。
由于每次向
SocketBase写入的
Msg可能有多个部分,因此消息会写入到数组中,所有消息写完后指向
trunk的指针才会后移一位。
YQueue有以下字段
//用于记录当前块消息的个数,默认为256 private readonly int m_chunkSize; // 当队列是空的时,下一个块指向null,首尾块都指向初始化的一个块,开始位置的块仅用于队列的读取(front/pop),最后位置的仅用于队列的写入(back/push)。 // 开始位置 private volatile Chunk m_beginChunk; //chunk的当前可读位置索引 private int m_beginPositionInChunk; //指向后一个块 private Chunk m_backChunk; //chunk的最后一个可读位置索引 private int m_backPositionInChunk; //指向后一个块 private Chunk m_endChunk; //chunk的下一个可写位置索引 private int m_endPosition; //当达到最大Msg数量时,扩展一个chunk,最大为256个块 private Chunk m_spareChunk; 当前trunk头部在整个队列中的的索引位置 private int m_nextGlobalIndex;
YPipe写入
Msg实际是向
YQueue入队
public void Push(ref T val) { m_backChunk.Values[m_backPositionInChunk] = val; //指向后一个块 m_backChunk = m_endChunk; //索引更新到最后可读位置 m_backPositionInChunk = m_endPosition; //下一个可写位置向后移动一位 m_endPosition++; if (m_endPosition != m_chunkSize) return; //到达最后一个位置则需要扩充一个块 Chunk sc = m_spareChunk; if (sc != m_beginChunk) { //已经扩充了块则更新下一个块的位置 m_spareChunk = m_spareChunk.Next; m_endChunk.Next = sc; sc.Previous = m_endChunk; } else { //新建一个块,并更新索引位置 m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex); m_nextGlobalIndex += m_chunkSize; m_endChunk.Next.Previous = m_endChunk; } m_endChunk = m_endChunk.Next; 当前块的局部位置从0开始 m_endPosition = 0; }
每次消息写完消息时调用
YPipe的
Flush方法完成当前消息的写入
public bool Flush() { //只有一条Msg if (m_flushFromIndex == m_flushToIndex) { return true; } //将m_lastAllowedToReadIndex更新为flushToIndex if (Interlocked.CompareExchange(ref m_lastAllowedToReadIndex, m_flushToIndex, m_flushFromIndex) != m_flushFromIndex) { //没有数据写入时,lastAllowedToReadIndex为-1,表示没有数据可读,因此这里不需要关系线程安全 Interlocked.Exchange(ref m_lastAllowedToReadIndex, m_flushToIndex); m_flushFromIndex = m_flushToIndex; return false; } 有数据写入时更新指针 m_flushFromIndex = m_flushToIndex; return true; }
总结
该篇在上一片的基础上对SocketBase和
SessionBase进行了一些细节上的补充。同时,对
NetMQ的配置参数进行了一些介绍,最后对消息管道进行了简单讲解。
相关文章推荐
- 消息队列NetMQ 原理分析2-IO线程和完成端口
- 消息队列NetMQ 原理分析3-命令产生/处理和回收线程
- 消息队列NetMQ 原理分析1-Context和ZObject
- 消息队列NetMQ 原理分析5-StreamEngine、Encord和Decord
- Socket、Session、Option和Pipe
- Session原理分析
- Session原理分析
- IPC消息队列使用详细分析
- IPC消息队列使用详细分析
- 链式队列的c++代码和原理以及编译分析
- 从汇编分析Win32消息原理
- 从汇编分析Win32消息原理
- 简要分析Ogre渲染队列的实现原理(一)
- 消息队列技术的介绍和原理(MQ)
- AsyncWeb原理分析(八)——session的实现
- 内核等待队列机制原理分析
- 【简单Web服务器搭建】基于Socket实现的最简单的Web服务器【ASP.NET原理分析】
- 【本文已迁移到“程序员文摘” http://programmerdigest.cn/category/lajp】Linux下消息队列和socket绝对速度比拼
- win32API+C分析Windows消息运行原理
- 简要分析Ogre渲染队列的实现原理(一)