【Orleans开胃菜系列2】连接Connect源码简易分析
2018-08-08 14:19
549 查看
code[class*="language-"],pre[class*="language-"] { color: #333; background: none; font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; text-align: left; white-space: pre; word-spacing: normal; line-height: 1.4 }
pre[class*="language-"] { padding: .8em; overflow: auto; background: #f5f5f5 }
:not(pre)>code[class*="language-"] { padding: .1em; white-space: normal; background: #f5f5f5 }
.token.comment,.token.blockquote { color: #969896 }
.token.cdata { color: #183691 }
.token.doctype,.token.punctuation,.token.variable,.token.macro.property { color: #333 }
.token.operator,.token.important,.token.keyword,.token.rule,.token.builtin { color: #a71d5d }
.token.string,.token.url,.token.regex,.token.attr-value { color: #183691 }
.token.property,.token.number,.token.boolean,.token.entity,.token.atrule,.token.constant,.token.symbol,.token.command,.token.code { color: #0086b3 }
.token.tag,.token.selector,.token.prolog { color: #63a35c }
.token.function,.token.namespace,.token.pseudo-element,.token.class,.token.class-name,.token.pseudo-class,.token.id,.token.url-reference .token.variable,.token.attr-name { color: #795da3 }
.token.entity { cursor: help }
.token.title,.token.title .token.punctuation { font-weight: bold; color: #1d3e81 }
.token.list { color: #ed6a43 }
.token.inserted { background-color: #eaffea; color: #55a532 }
.token.deleted { background-color: #ffecec; color: #bd2c00 }
.token.bold { font-weight: bold }
.token.italic { font-style: italic }
.language-json .token.property { color: #183691 }
.language-markup .token.tag .token.punctuation { color: #333 }
code.language-css,.language-css .token.function { color: #0086b3 }
.language-yaml .token.atrule { color: #63a35c }
code.language-yaml { color: #183691 }
.language-ruby .token.function { color: #333 }
.language-markdown .token.url { color: #795da3 }
.language-makefile .token.symbol { color: #795da3 }
.language-makefile .token.variable { color: #183691 }
.language-makefile .token.builtin { color: #0086b3 }
.language-bash .token.keyword { color: #0086b3 }
html body { font-family: "Helvetica Neue", Helvetica, "Segoe UI", Arial, freesans, sans-serif; font-size: 16px; line-height: 1.6; color: #333; background-color: #fff; overflow: initial }
html body>:first-child { margin-top: 0 }
html body h1,html body h2,html body h3,html body h4,html body h5,html body h6 { line-height: 1.2; margin-top: 1em; margin-bottom: 16px; color: #000 }
html body h1 { font-size: 2.25em; font-weight: 300; padding-bottom: .3em }
html body h2 { font-size: 1.75em; font-weight: 400; padding-bottom: .3em }
html body h3 { font-size: 1.5em; font-weight: 500 }
html body h4 { font-size: 1.25em; font-weight: 600 }
html body h5 { font-size: 1.1em; font-weight: 600 }
html body h6 { font-size: 1em; font-weight: 600 }
html body h1,html body h2,html body h3,html body h4,html body h5 { font-weight: 600 }
html body h5 { font-size: 1em }
html body h6 { color: #5c5c5c }
html body strong { color: #000 }
html body del { color: #5c5c5c }
html body a:not([href]) { color: inherit; text-decoration: none }
html body a { color: #08c; text-decoration: none }
html body a:hover { color: #00a3f5; text-decoration: none }
html body img { max-width: 100% }
html body>p { margin-top: 0; margin-bottom: 16px }
html body>ul,html body>ol { margin-bottom: 16px }
html body ul,html body ol { padding-left: 2em }
html body ul.no-list,html body ol.no-list { padding: 0; list-style-type: none }
html body ul ul,html body ul ol,html body ol ol,html body ol ul { margin-top: 0; margin-bottom: 0 }
html body li { margin-bottom: 0 }
html body li.task-list-item { list-style: none }
html body li>p { margin-top: 0; margin-bottom: 0 }
html body .task-list-item-checkbox { margin: 0 .2em .25em -1.8em; vertical-align: middle }
html body .task-list-item-checkbox:hover { cursor: pointer }
html body blockquote { margin: 16px 0; font-size: inherit; padding: 0 15px; color: #5c5c5c; border-left: 4px solid #d6d6d6 }
html body blockquote>:first-child { margin-top: 0 }
html body blockquote>:last-child { margin-bottom: 0 }
html body hr { height: 4px; margin: 32px 0; background-color: #d6d6d6; border: 0 none }
html body table { margin: 10px 0 15px 0; border-collapse: collapse; border-spacing: 0; display: block; width: 100%; overflow: auto }
html body table th { font-weight: bold; color: #000 }
html body table td,html body table th { border: 1px solid #d6d6d6; padding: 6px 13px }
html body dl { padding: 0 }
html body dl dt { padding: 0; margin-top: 16px; font-size: 1em; font-style: italic; font-weight: bold }
html body dl dd { padding: 0 16px; margin-bottom: 16px }
html body code { font-family: Menlo, Monaco, Consolas, "Courier New", monospace; font-size: .85em !important; color: #000; background-color: #f0f0f0; padding: .2em 0 }
html body code::before,html body code::after { letter-spacing: -0.2em; content: " " }
html body pre>code { padding: 0; margin: 0; font-size: .85em !important; white-space: pre; background: transparent; border: 0 }
html body .highlight { margin-bottom: 16px }
html body .highlight pre,html body pre { padding: 1em; overflow: auto; font-size: .85em !important; line-height: 1.45; border: #d6d6d6 }
html body .highlight pre { margin-bottom: 0 }
html body pre code,html body pre tt { display: inline; max-width: initial; padding: 0; margin: 0; overflow: initial; line-height: inherit; background-color: transparent; border: 0 }
html body pre code::before,html body pre tt::before,html body pre code::after,html body pre tt::after { content: normal }
html body p,html body blockquote,html body ul,html body ol,html body dl,html body pre { margin-top: 0; margin-bottom: 16px }
html body kbd { color: #000; border: 1px solid #d6d6d6; border-bottom: 2px solid #c7c7c7; padding: 2px 4px; background-color: #f0f0f0 }
.markdown-preview { width: 100%; height: 100% }
.markdown-preview .pagebreak,.markdown-preview .newpage { page-break-before: always }
.markdown-preview pre.line-numbers { position: relative; padding-left: 3.8em; counter-reset: linenumber }
.markdown-preview pre.line-numbers>code { position: relative }
.markdown-preview pre.line-numbers .line-numbers-rows { position: absolute; top: 1em; font-size: 100%; left: 0; width: 3em; letter-spacing: -1px; border-right: 1px solid #999 }
.markdown-preview pre.line-numbers .line-numbers-rows>span { display: block; counter-increment: linenumber }
.markdown-preview pre.line-numbers .line-numbers-rows>span::before { content: counter(linenumber); color: #999; display: block; padding-right: .8em; text-align: right }
.markdown-preview .mathjax-exps .MathJax_Display { text-align: center !important }
.markdown-preview:not([for="preview"]) .code-chunk .btn-group { display: none }
.markdown-preview:not([for="preview"]) .code-chunk .status { display: none }
.markdown-preview:not([for="preview"]) .code-chunk .output-div { margin-bottom: 16px }
.scrollbar-style::-webkit-scrollbar { width: 8px }
.scrollbar-style::-webkit-scrollbar-track { background-color: transparent }
.scrollbar-style::-webkit-scrollbar-thumb { background-color: rgba(150,150,150,0.66); border: 4px solid rgba(150,150,150,0.66) }
html body[for="html-export"]:not([data-presentation-mode]) { position: relative; width: 100%; height: 100%; top: 0; left: 0; margin: 0; padding: 0; overflow: auto }
html body[for="html-export"]:not([data-presentation-mode]) .markdown-preview { position: relative; top: 0 }
html body[for="html-export"]:not([data-presentation-mode]) #sidebar-toc-btn { position: fixed; bottom: 8px; left: 8px; font-size: 28px; cursor: pointer; color: inherit; z-index: 99; width: 32px; text-align: center; opacity: .4 }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] #sidebar-toc-btn { opacity: 1 }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc { position: fixed; top: 0; left: 0; width: 300px; height: 100%; padding: 32px 0 48px 0; font-size: 14px; overflow: auto; background-color: inherit }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc::-webkit-scrollbar { width: 8px }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc::-webkit-scrollbar-track { background-color: transparent }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc::-webkit-scrollbar-thumb { background-color: rgba(150,150,150,0.66); border: 4px solid rgba(150,150,150,0.66) }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc a { text-decoration: none }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc ul { padding: 0 1.6em; margin-top: .8em }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc li { margin-bottom: .8em }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc ul { list-style-type: none }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .markdown-preview { left: 300px; width: calc(100% - 300px); padding: 2em calc(50% - 457px - 150px); margin: 0 }
html body[for="html-export"]:not([data-presentation-mode]):not([html-show-sidebar-toc]) .markdown-preview { left: 50% }
html body[for="html-export"]:not([data-presentation-mode]):not([html-show-sidebar-toc]) .md-sidebar-toc { display: none }
从方法看,只是一个允许重试的简单客户端启动方法。追踪进去会发现关于重试逻辑的实践,Socket编程的实践,基于内存的消息队列的实践,以及依赖注入的实践。在看源码的基础上,最好能配合一些理论书籍来看。理论指导实践,实践反馈理论,才是技术成长的步骤。
这篇文章只涉及Connect所引用方法的部分说明,一步一步来加深理解。
本来我是打算把orleans研究透之后再来写一篇,但看了一周之后,发现connect里面调用了很多类,每个类又有很多方法,这样下去没有尽头,到最终估计什么也写不成。
分析源码本来就是循序渐进的过程,也是一个熟悉框架/原理/实践的过程。直接跳过这个步骤,必然损失良多。所以这部分就叫开胃菜吧。在查看connect的过程中,会接触到越来越多的知识。
本篇暂不涉及数据持久化,主要依赖.netcore内置方法操纵内存实现。
Timer
SemaphoreSlim
SemaphoreSlim 实现
BlockingCollection介绍
利用BlockingCollection实现生产者和消费者队列
Interlocked
SpinWait
两阶段提交
Monitor
Queue
Stack
TaskCompletionSource
基于Task的异步模式--全面介绍
System.Collections.Concurrent
ConcurrentDictionary
ConcurrentDictionary 对决 Dictionary+Locking
Socket微软官方文档
Socket博客园
AutoResetEvent
ManualResetEvent
ManualResetEventSlim
ActivatorUtilities
扩展.net-使用.netcore进行依赖注入
UseLocalhostClustering 用来配置连接参数:端口/ClusterId/ServiceId等。 配置一个连接本地silo的客户端,也有其他类型的如: UseServiceProviderFactory,UseStaticClustering
Build用来注册默认服务和构建容器,扩展了解依赖注入知识。微软自带Microsoft.Extensions.DependencyInjection库
这里的LockAsync内部用了SemaphoreSlim.Wait,需要扩展了解它和lock的区别,以及本地信号量和系统信号量。
这里用state来维护生命周期
transport用来维护客户端消息管理。
RunClientMessagePump用来处理接收分发消息。
WaitMessage里面利用了BlockingCollection.Take
这里面用ConcurrentDictionary<GuidId, LocalObjectData>来判断ObserverId是否存在,不存在移除。
如果存在,利用Queue
如果启动成功,异步调用LocalObjectMessagePumpAsync,然后利用Queue
transport.SendMessage
第一步先获取活动的网关(silo),如没有则建立GatewayConnection
第二步启动Connection
Connect--调用socket创建连接
Start--GatewayClientReceiver间接调用Socket来接收消息,
Orleans用于处理定时或延时回调作业。
创建一个简单的connect,里面有这么多沟沟渠渠,但本质上来说,最底层是利用Socket套接字机制来实施机制。在Socket的基础之上,又封装维护了一层GatewayConnection和GatewayClientReceiver来实现网关(Silo)的操作,比如重试/监控/熔断等,再结合Timer,Queue
如果您完全熟悉异步编程,并行编程,Socket网络编程。又对分布式/微服务理论有较深的理解,那么orleans实现机制,对您来说可能是相对容易。
本期结束,下期更精彩!
pre[class*="language-"] { padding: .8em; overflow: auto; background: #f5f5f5 }
:not(pre)>code[class*="language-"] { padding: .1em; white-space: normal; background: #f5f5f5 }
.token.comment,.token.blockquote { color: #969896 }
.token.cdata { color: #183691 }
.token.doctype,.token.punctuation,.token.variable,.token.macro.property { color: #333 }
.token.operator,.token.important,.token.keyword,.token.rule,.token.builtin { color: #a71d5d }
.token.string,.token.url,.token.regex,.token.attr-value { color: #183691 }
.token.property,.token.number,.token.boolean,.token.entity,.token.atrule,.token.constant,.token.symbol,.token.command,.token.code { color: #0086b3 }
.token.tag,.token.selector,.token.prolog { color: #63a35c }
.token.function,.token.namespace,.token.pseudo-element,.token.class,.token.class-name,.token.pseudo-class,.token.id,.token.url-reference .token.variable,.token.attr-name { color: #795da3 }
.token.entity { cursor: help }
.token.title,.token.title .token.punctuation { font-weight: bold; color: #1d3e81 }
.token.list { color: #ed6a43 }
.token.inserted { background-color: #eaffea; color: #55a532 }
.token.deleted { background-color: #ffecec; color: #bd2c00 }
.token.bold { font-weight: bold }
.token.italic { font-style: italic }
.language-json .token.property { color: #183691 }
.language-markup .token.tag .token.punctuation { color: #333 }
code.language-css,.language-css .token.function { color: #0086b3 }
.language-yaml .token.atrule { color: #63a35c }
code.language-yaml { color: #183691 }
.language-ruby .token.function { color: #333 }
.language-markdown .token.url { color: #795da3 }
.language-makefile .token.symbol { color: #795da3 }
.language-makefile .token.variable { color: #183691 }
.language-makefile .token.builtin { color: #0086b3 }
.language-bash .token.keyword { color: #0086b3 }
html body { font-family: "Helvetica Neue", Helvetica, "Segoe UI", Arial, freesans, sans-serif; font-size: 16px; line-height: 1.6; color: #333; background-color: #fff; overflow: initial }
html body>:first-child { margin-top: 0 }
html body h1,html body h2,html body h3,html body h4,html body h5,html body h6 { line-height: 1.2; margin-top: 1em; margin-bottom: 16px; color: #000 }
html body h1 { font-size: 2.25em; font-weight: 300; padding-bottom: .3em }
html body h2 { font-size: 1.75em; font-weight: 400; padding-bottom: .3em }
html body h3 { font-size: 1.5em; font-weight: 500 }
html body h4 { font-size: 1.25em; font-weight: 600 }
html body h5 { font-size: 1.1em; font-weight: 600 }
html body h6 { font-size: 1em; font-weight: 600 }
html body h1,html body h2,html body h3,html body h4,html body h5 { font-weight: 600 }
html body h5 { font-size: 1em }
html body h6 { color: #5c5c5c }
html body strong { color: #000 }
html body del { color: #5c5c5c }
html body a:not([href]) { color: inherit; text-decoration: none }
html body a { color: #08c; text-decoration: none }
html body a:hover { color: #00a3f5; text-decoration: none }
html body img { max-width: 100% }
html body>p { margin-top: 0; margin-bottom: 16px }
html body>ul,html body>ol { margin-bottom: 16px }
html body ul,html body ol { padding-left: 2em }
html body ul.no-list,html body ol.no-list { padding: 0; list-style-type: none }
html body ul ul,html body ul ol,html body ol ol,html body ol ul { margin-top: 0; margin-bottom: 0 }
html body li { margin-bottom: 0 }
html body li.task-list-item { list-style: none }
html body li>p { margin-top: 0; margin-bottom: 0 }
html body .task-list-item-checkbox { margin: 0 .2em .25em -1.8em; vertical-align: middle }
html body .task-list-item-checkbox:hover { cursor: pointer }
html body blockquote { margin: 16px 0; font-size: inherit; padding: 0 15px; color: #5c5c5c; border-left: 4px solid #d6d6d6 }
html body blockquote>:first-child { margin-top: 0 }
html body blockquote>:last-child { margin-bottom: 0 }
html body hr { height: 4px; margin: 32px 0; background-color: #d6d6d6; border: 0 none }
html body table { margin: 10px 0 15px 0; border-collapse: collapse; border-spacing: 0; display: block; width: 100%; overflow: auto }
html body table th { font-weight: bold; color: #000 }
html body table td,html body table th { border: 1px solid #d6d6d6; padding: 6px 13px }
html body dl { padding: 0 }
html body dl dt { padding: 0; margin-top: 16px; font-size: 1em; font-style: italic; font-weight: bold }
html body dl dd { padding: 0 16px; margin-bottom: 16px }
html body code { font-family: Menlo, Monaco, Consolas, "Courier New", monospace; font-size: .85em !important; color: #000; background-color: #f0f0f0; padding: .2em 0 }
html body code::before,html body code::after { letter-spacing: -0.2em; content: " " }
html body pre>code { padding: 0; margin: 0; font-size: .85em !important; white-space: pre; background: transparent; border: 0 }
html body .highlight { margin-bottom: 16px }
html body .highlight pre,html body pre { padding: 1em; overflow: auto; font-size: .85em !important; line-height: 1.45; border: #d6d6d6 }
html body .highlight pre { margin-bottom: 0 }
html body pre code,html body pre tt { display: inline; max-width: initial; padding: 0; margin: 0; overflow: initial; line-height: inherit; background-color: transparent; border: 0 }
html body pre code::before,html body pre tt::before,html body pre code::after,html body pre tt::after { content: normal }
html body p,html body blockquote,html body ul,html body ol,html body dl,html body pre { margin-top: 0; margin-bottom: 16px }
html body kbd { color: #000; border: 1px solid #d6d6d6; border-bottom: 2px solid #c7c7c7; padding: 2px 4px; background-color: #f0f0f0 }
.markdown-preview { width: 100%; height: 100% }
.markdown-preview .pagebreak,.markdown-preview .newpage { page-break-before: always }
.markdown-preview pre.line-numbers { position: relative; padding-left: 3.8em; counter-reset: linenumber }
.markdown-preview pre.line-numbers>code { position: relative }
.markdown-preview pre.line-numbers .line-numbers-rows { position: absolute; top: 1em; font-size: 100%; left: 0; width: 3em; letter-spacing: -1px; border-right: 1px solid #999 }
.markdown-preview pre.line-numbers .line-numbers-rows>span { display: block; counter-increment: linenumber }
.markdown-preview pre.line-numbers .line-numbers-rows>span::before { content: counter(linenumber); color: #999; display: block; padding-right: .8em; text-align: right }
.markdown-preview .mathjax-exps .MathJax_Display { text-align: center !important }
.markdown-preview:not([for="preview"]) .code-chunk .btn-group { display: none }
.markdown-preview:not([for="preview"]) .code-chunk .status { display: none }
.markdown-preview:not([for="preview"]) .code-chunk .output-div { margin-bottom: 16px }
.scrollbar-style::-webkit-scrollbar { width: 8px }
.scrollbar-style::-webkit-scrollbar-track { background-color: transparent }
.scrollbar-style::-webkit-scrollbar-thumb { background-color: rgba(150,150,150,0.66); border: 4px solid rgba(150,150,150,0.66) }
html body[for="html-export"]:not([data-presentation-mode]) { position: relative; width: 100%; height: 100%; top: 0; left: 0; margin: 0; padding: 0; overflow: auto }
html body[for="html-export"]:not([data-presentation-mode]) .markdown-preview { position: relative; top: 0 }
html body[for="html-export"]:not([data-presentation-mode]) #sidebar-toc-btn { position: fixed; bottom: 8px; left: 8px; font-size: 28px; cursor: pointer; color: inherit; z-index: 99; width: 32px; text-align: center; opacity: .4 }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] #sidebar-toc-btn { opacity: 1 }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc { position: fixed; top: 0; left: 0; width: 300px; height: 100%; padding: 32px 0 48px 0; font-size: 14px; overflow: auto; background-color: inherit }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc::-webkit-scrollbar { width: 8px }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc::-webkit-scrollbar-track { background-color: transparent }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc::-webkit-scrollbar-thumb { background-color: rgba(150,150,150,0.66); border: 4px solid rgba(150,150,150,0.66) }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc a { text-decoration: none }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc ul { padding: 0 1.6em; margin-top: .8em }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc li { margin-bottom: .8em }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc ul { list-style-type: none }
html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .markdown-preview { left: 300px; width: calc(100% - 300px); padding: 2em calc(50% - 457px - 150px); margin: 0 }
html body[for="html-export"]:not([data-presentation-mode]):not([html-show-sidebar-toc]) .markdown-preview { left: 50% }
html body[for="html-export"]:not([data-presentation-mode]):not([html-show-sidebar-toc]) .md-sidebar-toc { display: none }
简要说明
// Connection code: the client returned by StartClientWithRetries is disposed when the using block exits.
using (var client = await StartClientWithRetries())
{
}
从方法看,只是一个允许重试的简单客户端启动方法。追踪进去会发现关于重试逻辑的实践,Socket编程的实践,基于内存的消息队列的实践,以及依赖注入的实践。在看源码的基础上,最好能配合一些理论书籍来看。理论指导实践,实践反馈理论,才是技术成长的步骤。
这篇文章只涉及Connect所引用方法的部分说明,一步一步来加深理解。
本来我是打算把orleans研究透之后再来写一篇,但看了一周之后,发现connect里面调用了很多类,每个类又有很多方法,这样下去没有尽头,到最终估计什么也写不成。
分析源码本来就是循序渐进的过程,也是一个熟悉框架/原理/实践的过程。直接跳过这个步骤,必然损失良多。所以这部分就叫开胃菜吧。在查看connect的过程中,会接触到越来越多的知识。
本篇暂不涉及数据持久化,主要依赖.netcore内置方法操纵内存实现。
您会接触到的扩展知识
扩展知识之Timer&TimerQueueTimer
Timer 在设置的间隔后生成事件,并提供生成重复事件的选项
TimerQueue 时间队列
扩展知识之信号量
SemaphoreSlim
SemaphoreSlim 实现
//信号量 SemaphoreSlim 表示Semaphore的轻量级替代,它限制了可以同时访问资源或资源池的线程数 >>Release 释放 >> Wait 等待。 信号量有两种类型:本地信号量和命名系统信号量。前者仅在应用程序本地可见。后者在整个操作系统中是可见的,并且适用于进程间同步。SemaphoreSlim是Semaphore类的轻量级替代,它不使用Windows内核信号量。与Semaphore类不同,SemaphoreSlim类不支持命名系统信号量。您只能将其用作本地信号量。在单个应用程序内进行同步时,推荐使用SemaphoreSlim类。
扩展知识之BlockingCollection
BlockingCollection介绍
利用BlockingCollection实现生产者和消费者队列
BlockingCollection 为实现 IProducerConsumerCollection<T> 的线程安全集合提供阻塞和限制功能。 >> Take >> Add 有这个类型,扩展知识之Interlocked
Interlocked
Interlocked为多个线程共享的变量提供原子操作。 >>Add >>Decrement以原子操作的形式递减指定变量的值并存储结果。 >>Increment以原子操作的形式递增指定变量的值并存储结果 >>Exchange >>CompareExchange >>Read 个人想法:和Redis的Increment/Decrement类似,部分情况下可以取代Redis的increment/decrement,提高速度。扩展知识之SpinWait
SpinWait
两阶段提交
Monitor
SpinWait 为基于旋转的等待提供支持。 SpinWait是一种值类型,这意味着低级代码可以使用SpinWait而不必担心不必要的分配开销。SpinWait通常不适用于普通应用程序。在大多数情况下,您应该使用.NET Framework提供的同步类,例如Monitor >> SpinOnce扩展知识之Queue&Stack
Queue
Stack
Queue<T> 表示先进先出的对象集合,此类将通用队列实现为循环数组。存储在队列<T>中的对象在一端插入并从另一端移除。 >Enqueue >Dequeue >Peek Stack<T> 表示具有相同指定类型的实例的可变大小后进先出(LIFO)集合。 >Push >Pop >PeeK ConcurrentQueue <T> 表示线程安全的先进先出的对象集合 ConcurrentStack <T> 表示线程安全的后进先出(LIFO)集合 如果需要以与存储在集合中的顺序相同的顺序访问信息,请使用Queue <T>。如果需要以相反的顺序访问信息,请使用Stack <T>。使用ConcurrentQueue <T>或ConcurrentStack <T> 如果您需要同时从多个线程访问该集合。扩展知识之Task
TaskCompletionSource
基于Task的异步模式--全面介绍
TaskCompletionSource表示未绑定到委托的Task <TResult>的生产者端,通过Task属性提供对使用者端的访问。扩展知识之线程安全的集合
System.Collections.Concurrent
ConcurrentDictionary
ConcurrentDictionary 对决 Dictionary+Locking
System.Collections.Concurrent提供了多个线程安全的集合类;只要有多个线程并发访问集合,就应使用这些类来代替System.Collections和System.Collections.Generic命名空间中的对应类型。 但是,通过当前集合实现的其中一个接口访问的成员(包括扩展方法)不保证是线程安全的,并且可能需要由调用者同步。 ConcurrentDictionary:表示可以由多个线程同时访问的键/值对的线程安全集合 对于ConcurrentDictionary <TKey,TValue>类上的所有其他操作,所有这些操作都是原子操作并且是线程安全的。唯一的例外是接受委托的方法,即AddOrUpdate和GetOrAdd。对于字典的修改和写入操作,ConcurrentDictionary <TKey,TValue>使用细粒度锁定来确保线程安全。(对字典的读取操作是以无锁方式执行的。)但是,这些方法的委托在锁外部调用,以避免在锁定下执行未知代码时可能出现的问题。因此,这些委托执行的代码不受操作的原子性保证。
扩展知识之网络编程
Socket微软官方文档
Socket博客园
Socket 类提供一组丰富的方法和属性进行网络通信 TCP协议 >BeginConnect >EndConnect >BeginSend >EndSend >BeginReceive >EndReceive >BeginAccept >EndAccept UDP协议 >BeginSendTo >EndSendTo >BeginReceiveFrom >EndReceiveFrom
扩展知识之线程通知:
AutoResetEvent
ManualResetEvent
ManualResetEventSlim
AutoResetEvent允许线程通过信令相互通信。通常,当线程需要对资源的独占访问时,可以使用此类。 >Set释放线程 >WaitOne等待线程 ManualResetEvent 通知一个或多个等待线程发生了事件 ManualResetEventSlim 当等待时间预期非常短,并且事件未跨越进程边界时,您可以使用此类以获得比ManualResetEvent更好的性能扩展知识之依赖注入:
ActivatorUtilities
扩展.net-使用.netcore进行依赖注入
服务可以通过两种机制来解析: IServiceProvider ActivatorUtilities – 允许在依赖关系注入容器中创建没有服务注册的对象。 ActivatorUtilities 用于面向用户的抽象,例如标记帮助器、MVC 控制器、SignalR 集线器和模型绑定器。 >ActivatorUtilities.CreateInstance >ActivatorUtilities.GetServiceOrCreateInstance
Client连接代码。
// Connection code: obtain the client, run the client work, then wait for a key press
// before the using block disposes the client.
using (var client = await StartClientWithRetries())
{
    await DoClientWork(client);
    Console.ReadKey();
}
重点分析StartClientWithRetries
UseLocalhostClustering 用来配置连接参数:端口/ClusterId/ServiceId等。 配置一个连接本地silo的客户端,也有其他类型的如: UseServiceProviderFactory,UseStaticClustering
Build用来注册默认服务和构建容器,扩展了解依赖注入知识。微软自带Microsoft.Extensions.DependencyInjection库
/// <summary>
/// Builds a cluster client configured for localhost clustering (cluster id "dev",
/// service id "HelloWorldApp", console logging) and connects it, passing
/// <c>RetryFilter</c> to <c>Connect</c>.
/// </summary>
/// <returns>The connected client. The caller owns it and must dispose it.</returns>
/// <remarks>
/// If <c>Connect</c> throws, the client is disposed before the exception is rethrown,
/// so a failed instance is never leaked to the caller.
/// </remarks>
private static async Task<IClusterClient> StartClientWithRetries()
{
    // 'attempt' is declared outside this snippet; presumably the counter read by RetryFilter - confirm.
    attempt = 0;

    IClusterClient client = new ClientBuilder()
        .UseLocalhostClustering()
        .Configure<ClusterOptions>(options =>
        {
            options.ClusterId = "dev";
            options.ServiceId = "HelloWorldApp";
        })
        .ConfigureLogging(logging => logging.AddConsole())
        .Build();

    try
    {
        await client.Connect(RetryFilter);
    }
    catch
    {
        // A client whose connection attempt failed cannot be reused, so release it here
        // instead of leaving an undisposed instance behind.
        client.Dispose();
        throw;
    }

    Console.WriteLine("Client successfully connect to silo host");
    return client;
}
先来看下connect
这里的LockAsync内部用了SemaphoreSlim.Wait,需要扩展了解它和lock的区别,以及本地信号量和系统信号量。
这里用state来维护生命周期
/// <summary>
/// Connects the client: under <c>initLock</c> it starts the runtime client (when it is an
/// <c>OutsideRuntimeClient</c>) and then the cluster client lifecycle, recording progress in <c>state</c>.
/// </summary>
/// <param name="retryFilter">Optional callback forwarded to <c>OutsideRuntimeClient.Start</c>; null by default.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when <c>state</c> is already <c>LifecycleState.Starting</c> on entry to the locked section,
/// which means an earlier call set it and never reached <c>Started</c>.
/// </exception>
public async Task Connect(Func<Exception, Task<bool>> retryFilter = null)
{
    // Check once before waiting for the lock.
    this.ThrowIfDisposedOrAlreadyInitialized();
    using (await this.initLock.LockAsync().ConfigureAwait(false))
    {
        // Check again now that the lock is held; another caller may have finished in the meantime.
        this.ThrowIfDisposedOrAlreadyInitialized();
        if (this.state == LifecycleState.Starting)
        {
            throw new InvalidOperationException("A prior connection attempt failed. This instance must be disposed.");
        }

        // state stays at Starting if anything below throws, which is what the check above detects.
        this.state = LifecycleState.Starting;
        if (this.runtimeClient is OutsideRuntimeClient orc) await orc.Start(retryFilter).ConfigureAwait(false);
        await this.clusterClientLifecycle.OnStart().ConfigureAwait(false);
        this.state = LifecycleState.Started;
    }
}
看下orc.Start
/// <summary>
/// Runs <c>StartInternal</c> via <c>Task.Run</c>, awaits it, and then logs that the client has started.
/// </summary>
/// <param name="retryFilter">Optional callback passed through to <c>StartInternal</c>; null by default.</param>
public async Task Start(Func<Exception, Task<bool>> retryFilter = null)
{
    // Deliberately avoid capturing the current synchronization context during startup and execute on the default scheduler.
    // This helps to avoid any issues (such as deadlocks) caused by executing with the client's synchronization context/scheduler.
    await Task.Run(() => this.StartInternal(retryFilter)).ConfigureAwait(false);

    // Logged only after StartInternal has completed without throwing.
    logger.Info(ErrorCode.ProxyClient_StartDone, "{0} Started OutsideRuntimeClient with Global Client ID: {1}", BARS, CurrentActivationAddress.ToString() + ", client GUID ID: " + handshakeClientId);
}
重要的StartInternal
gateways获取网关列表。
transport用来维护客户端消息管理。
RunClientMessagePump用来处理接收分发消息。
/// <summary>
/// Performs the client start-up sequence in order: initialize the gateway list provider and
/// require at least one gateway, create and start the <c>ClientMessageCenter</c> transport,
/// launch the background message pump, resolve grain types, create the type-map refresh timer,
/// start client statistics, and initialize streaming.
/// </summary>
/// <param name="retryFilter">
/// Callback handed to <c>ExecuteWithRetries</c> for each retryable step; may be null,
/// in which case failures are not retried.
/// </param>
private async Task StartInternal(Func<Exception, Task<bool>> retryFilter)
{
    // Initialize the gateway list provider, since information from the cluster is required to successfully
    // initialize subsequent services.
    // A one-element array is used so the lambda below can flip the flag and keep it across retries.
    var initializedGatewayProvider = new[] {false};
    await ExecuteWithRetries(async () =>
        {
            // Initialize the provider only once, even if this delegate is retried.
            if (!initializedGatewayProvider[0])
            {
                await this.gatewayListProvider.InitializeGatewayListProvider();
                initializedGatewayProvider[0] = true;
            }

            var gateways = await this.gatewayListProvider.GetGateways();
            if (gateways.Count == 0)
            {
                var gatewayProviderType = this.gatewayListProvider.GetType().GetParseableName();
                var err = $"Could not find any gateway in {gatewayProviderType}. Orleans client cannot initialize.";
                logger.Error(ErrorCode.GatewayManager_NoGateways, err);
                throw new SiloUnavailableException(err);
            }
        },
        retryFilter);

    var generation = -SiloAddress.AllocateNewGeneration(); // Client generations are negative
    transport = ActivatorUtilities.CreateInstance<ClientMessageCenter>(this.ServiceProvider, localAddress, generation, handshakeClientId);
    transport.Start();
    CurrentActivationAddress = ActivationAddress.NewActivationAddress(transport.MyAddress, handshakeClientId);

    listeningCts = new CancellationTokenSource();
    var ct = listeningCts.Token;
    listenForMessages = true;

    // Keeping this thread handling it very simple for now. Just queue task on thread pool.
    // The loop re-enters RunClientMessagePump after logging an exception, until listenForMessages
    // is cleared or the token is cancelled.
    // NOTE(review): Ignore() is defined elsewhere; presumably it marks the task as fire-and-forget - confirm.
    Task.Run(
        () =>
        {
            while (listenForMessages && !ct.IsCancellationRequested)
            {
                try
                {
                    RunClientMessagePump(ct);
                }
                catch (Exception exc)
                {
                    logger.Error(ErrorCode.Runtime_Error_100326, "RunClientMessagePump has thrown exception", exc);
                }
            }
        },
        ct).Ignore();

    await ExecuteWithRetries(
        async () => this.GrainTypeResolver = await transport.GetGrainTypeResolver(this.InternalGrainFactory),
        retryFilter);

    this.typeMapRefreshTimer = new AsyncTaskSafeTimer(
        this.logger,
        RefreshGrainTypeResolver,
        null,
        this.typeMapRefreshInterval,
        this.typeMapRefreshInterval);

    ClientStatistics.Start(transport, clientId);

    await ExecuteWithRetries(StreamingInitialize, retryFilter);

    // Runs 'task' repeatedly until it completes without throwing.
    // When shouldRetry is null the exception filter does not match and the exception propagates.
    // Otherwise the callback decides: false rethrows the exception, true runs the task again.
    async Task ExecuteWithRetries(Func<Task> task, Func<Exception, Task<bool>> shouldRetry)
    {
        while (true)
        {
            try
            {
                await task();
                return;
            }
            catch (Exception exception) when (shouldRetry != null)
            {
                var retry = await shouldRetry(exception);
                if (!retry) throw;
            }
        }
    }
}
重点关注下StartInternal里面ClientMessageCenter的初始化
用来处理消息分发等,也涉及网关部分调用。
/// <summary>
/// Creates the client message center: stores the injected services, derives the client's
/// <c>MyAddress</c> from the local IP address and generation, and creates the gateway manager,
/// the inbound message collection, the gateway connection map and the statistics hooks.
/// The instance starts with <c>Running</c> set to false.
/// </summary>
/// <param name="localAddress">Local IP address combined with port 0 to form <c>MyAddress</c>.</param>
/// <param name="gen">Generation value passed to <c>SiloAddress.New</c>.</param>
/// <param name="clientId">Identifier stored in <c>ClientId</c>.</param>
public ClientMessageCenter(
    IOptions<GatewayOptions> gatewayOptions,
    IOptions<ClientMessagingOptions> clientMessagingOptions,
    IPAddress localAddress,
    int gen,
    GrainId clientId,
    IGatewayListProvider gatewayListProvider,
    SerializationManager serializationManager,
    IRuntimeClient runtimeClient,
    MessageFactory messageFactory,
    IClusterConnectionStatusListener connectionStatusListener,
    ExecutorService executorService,
    ILoggerFactory loggerFactory,
    IOptions<NetworkingOptions> networkingOptions,
    IOptions<StatisticsOptions> statisticsOptions)
{
    this.loggerFactory = loggerFactory;
    this.openConnectionTimeout = networkingOptions.Value.OpenConnectionTimeout;
    this.SerializationManager = serializationManager;
    this.executorService = executorService;
    lockable = new object();
    MyAddress = SiloAddress.New(new IPEndPoint(localAddress, 0), gen);
    ClientId = clientId;
    this.RuntimeClient = runtimeClient;
    this.messageFactory = messageFactory;
    this.connectionStatusListener = connectionStatusListener;
    Running = false;
    GatewayManager = new GatewayManager(gatewayOptions.Value, gatewayListProvider, loggerFactory);
    // Inbound messages are held in a BlockingCollection.
    PendingInboundMessages = new BlockingCollection<Message>();
    gatewayConnections = new Dictionary<Uri, GatewayConnection>();
    numMessages = 0;
    // One WeakReference slot per configured client sender bucket.
    grainBuckets = new WeakReference[clientMessagingOptions.Value.ClientSenderBuckets];
    logger = loggerFactory.CreateLogger<ClientMessageCenter>();
    if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Proxy grain client constructed");
    // Statistic callback: counts live gateway connections while holding the lock on the map.
    IntValueStatistic.FindOrCreate(
        StatisticNames.CLIENT_CONNECTED_GATEWAY_COUNT,
        () =>
        {
            lock (gatewayConnections)
            {
                return gatewayConnections.Values.Count(conn => conn.IsLive);
            }
        });
    statisticsLevel = statisticsOptions.Value.CollectionLevel;
    // Queue tracking is created only when the collection level asks for queue statistics.
    if (statisticsLevel.CollectQueueStats())
    {
        queueTracking = new QueueTrackingStatistic("ClientReceiver", statisticsOptions);
    }
}
关注下StartInternal的RunClientMessagePump
WaitMessage里面利用了BlockingCollection.Take
/// <summary>
/// Receives application messages from the transport and routes them until
/// <c>listenForMessages</c> becomes false or the wait returns <c>null</c>.
/// Responses go to <c>ReceiveResponse</c>; requests and one-way messages go to the
/// local object dispatcher; any other direction is logged as an error.
/// </summary>
/// <param name="ct">Cancellation token passed to the transport's wait call.</param>
private void RunClientMessagePump(CancellationToken ct)
{
    incomingMessagesThreadTimeTracking?.OnStartExecution();

    while (listenForMessages)
    {
        var incoming = transport.WaitMessage(Message.Categories.Application, ct);
        if (incoming == null)
        {
            // A null message means the wait was cancelled.
            break;
        }

        // The first message may carry a client id that differs from the handshake id
        // (it may have been modified to include the cluster name), so adopt it once.
        if (!firstMessageReceived)
        {
            firstMessageReceived = true;
            if (handshakeClientId.Equals(incoming.TargetGrain))
            {
                clientId = handshakeClientId;
            }
            else
            {
                clientId = incoming.TargetGrain;
                transport.UpdateClientId(clientId);
                CurrentActivationAddress = ActivationAddress.GetAddress(
                    transport.MyAddress,
                    clientId,
                    CurrentActivationAddress.Activation);
            }
        }

        var direction = incoming.Direction;
        if (direction == Message.Directions.Response)
        {
            ReceiveResponse(incoming);
        }
        else if (direction == Message.Directions.OneWay || direction == Message.Directions.Request)
        {
            this.localObjects.Dispatch(incoming);
        }
        else
        {
            logger.Error(ErrorCode.Runtime_Error_100327, $"Message not supported: {incoming}.");
        }
    }

    incomingMessagesThreadTimeTracking?.OnStopExecution();
}
RunClientMessagePump里面的ReceiveResponse
这里主要是对response做一些判断处理。
/// <summary>
/// Handles a response message: drops duplicate-request rejections, then looks up the
/// callback registered under the response id and invokes it. A response with no
/// registered callback is logged as a warning.
/// </summary>
/// <param name="response">The response message received from the transport.</param>
public void ReceiveResponse(Message response)
{
    if (logger.IsEnabled(LogLevel.Trace)) logger.Trace("Received {0}", response);

    // Duplicate-request rejections are ignored.
    bool isDuplicateRejection =
        response.Result == Message.ResponseTypes.Rejection
        && response.RejectionType == Message.RejectionTypes.DuplicateRequest;
    if (isDuplicateRejection)
    {
        return;
    }

    if (!callbacks.TryGetValue(response.Id, out CallbackData pending))
    {
        logger.Warn(ErrorCode.Runtime_Error_100011, "No callback for response message: " + response);
        return;
    }

    // We need to import the RequestContext here as well.
    // Unfortunately, it is not enough, since CallContext.LogicalGetData will not flow "up"
    // from task completion source into the resolved task.
    // RequestContextExtensions.Import(response.RequestContextData);
    pending.DoCallback(response);
}

//DoCallBack
/// <summary>
/// Completes this callback with the given response unless it is already completed or
/// the response is a transient rejection that the shared data decides to resend.
/// On completion it stops the stopwatch (when request statistics are collected),
/// unregisters the message and finally invokes the shared response callback.
/// </summary>
/// <param name="response">The response message to deliver.</param>
public void DoCallback(Message response)
{
    if (this.IsCompleted)
    {
        return;
    }

    var stats = this.shared.RequestStatistics;

    lock (this)
    {
        // Re-check under the lock: the first check above ran without it.
        if (this.IsCompleted)
        {
            return;
        }

        bool isTransientRejection =
            response.Result == Message.ResponseTypes.Rejection
            && response.RejectionType == Message.RejectionTypes.Transient;
        if (isTransientRejection && this.shared.ShouldResend(this.Message))
        {
            return;
        }

        this.IsCompleted = true;
        if (stats.CollectApplicationRequestsStats)
        {
            this.stopwatch.Stop();
        }

        this.shared.Unregister(this.Message);
    }

    if (stats.CollectApplicationRequestsStats)
    {
        stats.OnAppRequestsEnd(this.stopwatch.Elapsed);
    }

    // The callback runs outside the CallbackData lock; holding the lock for this
    // unrelated operation would not be good practice.
    this.shared.ResponseCallback(response, this.context);
}
//this.shared.Unregister(this.Message);
RunClientMessagePump里面的消息分发Dispatch(message)
这里面用ConcurrentDictionary<GuidId, LocalObjectData>来判断ObserverId是否存在,不存在移除。
如果存在,利用Queue<Message>将消息加入该对象的消息队列。
如果启动成功,异步调用LocalObjectMessagePumpAsync,然后利用Queue<Message>的Dequeue依次取出消息进行处理。
/// <summary>
/// Drains the message queue of one local object: dequeues each message, skips expired
/// ones, invokes the target through the object's invoker, and then either reports the
/// exception or (for non one-way messages) sends the result back. When the queue is
/// empty it clears <c>objectData.Running</c> and exits.
/// </summary>
/// <param name="objectData">The local object whose queued messages are processed.</param>
private async Task LocalObjectMessagePumpAsync(LocalObjectData objectData)
{
    while (true)
    {
        try
        {
            Message current;
            bool queueDrained;
            lock (objectData.Messages)
            {
                queueDrained = objectData.Messages.Count == 0;
                if (queueDrained)
                {
                    // Clear the running flag under the same lock that guards the queue.
                    objectData.Running = false;
                    current = null;
                }
                else
                {
                    current = objectData.Messages.Dequeue();
                }
            }

            if (queueDrained)
            {
                break;
            }

            if (ExpireMessageIfExpired(current, MessagingStatisticsGroup.Phase.Invoke))
            {
                continue;
            }

            RequestContextExtensions.Import(current.RequestContextData);
            var invocation = (InvokeMethodRequest)current.GetDeserializedBody(this.serializationManager);
            var target = (IAddressable)objectData.LocalObject.Target;

            object outcome = null;
            Exception failure = null;
            try
            {
                // Exceptions thrown inside this scope are captured here and handed to
                // ReportException below instead of reaching the outer catch.
                var pendingResult = objectData.Invoker.Invoke(target, invocation);
                if (pendingResult != null)
                {
                    // The promise is null for one-way messages.
                    outcome = await pendingResult;
                }
            }
            catch (Exception exc)
            {
                // The exception needs to be reported in the log or propagated back to the caller.
                failure = exc;
            }

            if (failure != null)
            {
                this.ReportException(current, failure);
            }
            else if (current.Direction != Message.Directions.OneWay)
            {
                this.SendResponseAsync(current, outcome);
            }
        }
        catch (Exception)
        {
            // ignore, keep looping.
        }
    }
}
SendResponseAsync经过序列化,DeepCopy,赋值各种请求参数等各种操作以后,来到最关键的部分
transport.SendMessage
第一步先获取活动的网关(silo),如没有则建立GatewayConnection
第二步启动Connection
Connect--调用socket创建连接
Start--GatewayClientReceiver间接调用Socket来接收消息。
/// <summary>
/// Picks (or creates) a gateway connection for <paramref name="msg"/> and queues the
/// message on it. Selection order:
/// 1. the message's target silo, if it is one of the live gateways;
/// 2. round robin over the live gateways for system-target or unordered messages;
/// 3. otherwise the grain's bucket, so messages to one grain share a connection.
/// A newly created connection is started first; if it is not live afterwards, or if
/// queueing throws <see cref="InvalidOperationException"/>, the message is passed to
/// <c>RejectOrResend</c>. Messages are ignored (with an error log) when the message
/// center is not running, and rejected when no gateway is available.
/// </summary>
/// <param name="msg">The message to send.</param>
public void SendMessage(Message msg)
{
    GatewayConnection gatewayConnection = null;
    bool startRequired = false;

    if (!Running)
    {
        this.logger.Error(ErrorCode.ProxyClient_MsgCtrNotRunning, $"Ignoring {msg} because the Client message center is not running");
        return;
    }

    // If there's a specific gateway specified, use it
    if (msg.TargetSilo != null && GatewayManager.GetLiveGateways().Contains(msg.TargetSilo.ToGatewayUri()))
    {
        Uri addr = msg.TargetSilo.ToGatewayUri();
        lock (lockable)
        {
            if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
            {
                gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
                gatewayConnections[addr] = gatewayConnection;
                if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Creating gateway to {0} for pre-addressed message", addr);
                startRequired = true;
            }
        }
    }
    // For untargeted messages to system targets, and for unordered messages, pick a next connection in round robin fashion.
    else if (msg.TargetGrain.IsSystemTarget || msg.IsUnordered)
    {
        // Get the cached list of live gateways.
        // Pick a next gateway name in a round robin fashion.
        // See if we have a live connection to it.
        // If Yes, use it.
        // If not, create a new GatewayConnection and start it.
        // If start fails, we will mark this connection as dead and remove it from the GetCachedLiveGatewayNames.
        lock (lockable)
        {
            int msgNumber = numMessages;
            numMessages = unchecked(numMessages + 1);
            IList<Uri> gatewayNames = GatewayManager.GetLiveGateways();
            int numGateways = gatewayNames.Count;
            if (numGateways == 0)
            {
                RejectMessage(msg, "No gateways available");
                logger.Warn(ErrorCode.ProxyClient_CannotSend, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
                return;
            }

            // The counter is incremented with wraparound, so msgNumber can be negative.
            // A signed remainder would then be negative and the indexer would throw;
            // an unsigned remainder always lands in [0, numGateways).
            int gatewayIndex = unchecked((int)((uint)msgNumber % (uint)numGateways));
            Uri addr = gatewayNames[gatewayIndex];
            if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
            {
                gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
                gatewayConnections[addr] = gatewayConnection;
                if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayUnordered, "Creating gateway to {0} for unordered message to grain {1}", addr, msg.TargetGrain);
                startRequired = true;
            }
            // else - Fast path - we've got a live gatewayConnection to use
        }
    }
    // Otherwise, use the buckets to ensure ordering.
    else
    {
        var index = msg.TargetGrain.GetHashCode_Modulo((uint)grainBuckets.Length);
        lock (lockable)
        {
            // Repeated from above, at the declaration of the grainBuckets array:
            // Requests are bucketed by GrainID, so that all requests to a grain get routed through the same bucket.
            // Each bucket holds a (possibly null) weak reference to a GatewayConnection object. That connection instance is used
            // if the WeakReference is non-null, is alive, and points to a live gateway connection. If any of these conditions is
            // false, then a new gateway is selected using the gateway manager, and a new connection established if necessary.
            var weakRef = grainBuckets[index];
            if ((weakRef != null) && weakRef.IsAlive)
            {
                gatewayConnection = weakRef.Target as GatewayConnection;
            }

            if ((gatewayConnection == null) || !gatewayConnection.IsLive)
            {
                var addr = GatewayManager.GetLiveGateway();
                if (addr == null)
                {
                    RejectMessage(msg, "No gateways available");
                    logger.Warn(ErrorCode.ProxyClient_CannotSend_NoGateway, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
                    return;
                }

                if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_NewBucketIndex, "Starting new bucket index {0} for ordered messages to grain {1}", index, msg.TargetGrain);

                if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
                {
                    gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
                    gatewayConnections[addr] = gatewayConnection;
                    if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayToGrain, "Creating gateway to {0} for message to grain {1}, bucket {2}, grain id hash code {3}X", addr, msg.TargetGrain, index, msg.TargetGrain.GetHashCode().ToString("x"));
                    startRequired = true;
                }

                grainBuckets[index] = new WeakReference(gatewayConnection);
            }
        }
    }

    // Start the connection outside the lock; only connections created above need it.
    if (startRequired)
    {
        gatewayConnection.Start();
        if (!gatewayConnection.IsLive)
        {
            // if failed to start Gateway connection (failed to connect), try sending this msg to another Gateway.
            RejectOrResend(msg);
            return;
        }
    }

    try
    {
        gatewayConnection.QueueRequest(msg);
        if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_QueueRequest, "Sending message {0} via gateway {1}", msg, gatewayConnection.Address);
    }
    catch (InvalidOperationException)
    {
        // This exception can be thrown if the gateway connection we selected was closed since we checked (i.e., we lost the race)
        // If this happens, we reject if the message is targeted to a specific silo, or try again if not
        RejectOrResend(msg);
    }
}
/// <summary>
/// Opens the socket to this gateway, retrying up to
/// <c>ClientMessageCenter.CONNECT_RETRY_COUNT</c> times. Does nothing when the message
/// center is not running, when this connection is already marked non-live, or when the
/// current socket is already connected. On a retry it gives up early if the gateway is
/// no longer in the live list, and otherwise waits so that attempts are at least
/// <c>ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY</c> apart. If no attempt succeeds
/// the connection is marked dead.
/// </summary>
public void Connect()
{
    if (!MsgCenter.Running)
    {
        if (Log.IsEnabled(LogLevel.Debug))
        {
            Log.Debug(ErrorCode.ProxyClient_MsgCtrNotRunning, "Ignoring connection attempt to gateway {0} because the proxy message center is not running", Address);
        }

        return;
    }

    // Yes, we take the lock around a Sleep. The point is to ensure that no more than one thread can try this at a time.
    // There's still a minor problem as written -- if the sending thread and receiving thread both get here, the first one
    // will try to reconnect, eventually do so, and then the other will try to reconnect even though it doesn't have to...
    // Hopefully the initial "if" statement will prevent that.
    lock (Lockable)
    {
        if (!IsLive)
        {
            if (Log.IsEnabled(LogLevel.Debug))
            {
                Log.Debug(ErrorCode.ProxyClient_DeadGateway, "Ignoring connection attempt to gateway {0} because this gateway connection is already marked as non live", Address);
            }

            // A connection already marked as dead is never reconnected.
            return;
        }

        for (var attempt = 0; attempt < ClientMessageCenter.CONNECT_RETRY_COUNT; attempt++)
        {
            try
            {
                if (Socket != null)
                {
                    if (Socket.Connected)
                    {
                        return;
                    }

                    // Clean up the old socket before reconnecting.
                    MarkAsDisconnected(Socket);
                }

                bool triedBefore = lastConnect != default(DateTime);
                if (triedBefore)
                {
                    // We already tried at least once in the past to connect to this GW.
                    // If we are no longer connected to this GW and it is no longer in the list returned
                    // from the GatewayProvider, consider this connection dead right away.
                    if (!MsgCenter.GatewayManager.GetLiveGateways().Contains(Address))
                    {
                        break;
                    }

                    // Wait at least ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY between connection attempts.
                    var elapsed = DateTime.UtcNow - lastConnect;
                    if (elapsed < ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY)
                    {
                        var remaining = ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY - elapsed;
                        if (Log.IsEnabled(LogLevel.Debug))
                        {
                            Log.Debug(ErrorCode.ProxyClient_PauseBeforeRetry, "Pausing for {0} before trying to connect to gateway {1} on trial {2}", remaining, Address, attempt);
                        }

                        Thread.Sleep(remaining);
                    }
                }

                lastConnect = DateTime.UtcNow;
                Socket = new Socket(Silo.Endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                Socket.EnableFastpath();
                SocketManager.Connect(Socket, Silo.Endpoint, this.openConnectionTimeout);
                NetworkingStatisticsGroup.OnOpenedGatewayDuplexSocket();
                MsgCenter.OnGatewayConnectionOpen();
                // The preamble identifies this client to the gateway.
                SocketManager.WriteConnectionPreamble(Socket, MsgCenter.ClientId);
                Log.Info(ErrorCode.ProxyClient_Connected, "Connected to gateway at address {0} on trial {1}.", Address, attempt);
                return;
            }
            catch (Exception ex)
            {
                Log.Warn(ErrorCode.ProxyClient_CannotConnect, $"Unable to connect to gateway at address {Address} on trial {attempt} (Exception: {ex.Message})");
                MarkAsDisconnected(Socket);
            }
        }

        // Failed too many times (or the gateway left the live list) -- give up.
        MarkAsDead();
    }
}
GatewayConnection的Start会调用到GatewayClientReceiver的Run方法,利用BlockingCollection
/// <summary>
/// Receive loop of the gateway receiver: until cancellation is requested it fills the
/// receive buffer, decodes every complete message in it and queues each one on the
/// message center. Any exception resets the buffer, is logged as a warning and is
/// rethrown.
/// </summary>
protected override void Run()
{
    try
    {
        while (!Cts.IsCancellationRequested)
        {
            int received = FillBuffer(buffer.BuildReceiveBuffer());
            if (received != 0)
            {
                buffer.UpdateReceivedData(received);

                // One read may contain several complete messages; decode them all.
                while (buffer.TryDecodeMessage(out Message decoded))
                {
                    gatewayConnection.MsgCenter.QueueIncomingMessage(decoded);
                    if (Log.IsEnabled(LogLevel.Trace))
                    {
                        Log.Trace("Received a message from gateway {0}: {1}", gatewayConnection.Address, decoded);
                    }
                }
            }
        }
    }
    catch (Exception ex)
    {
        buffer.Reset();
        Log.Warn(ErrorCode.ProxyClientUnhandledExceptionWhileReceiving, $"Unexpected/unhandled exception while receiving: {ex}. Restarting gateway receiver for {gatewayConnection.Address}.", ex);
        throw;
    }
}
关注SafeTimerBase类
Orleans用于处理定时或延时回调作业。
总结
创建一个简单的connect,里面有这么多沟沟渠渠,但本质上来说,最底层是利用Socket套接字机制来实现通信。在Socket的基础之上,又封装维护了一层GatewayConnection和GatewayClientReceiver来实现网关(Silo)的操作,比如重试/监控/熔断等,再结合Timer、Queue<Message>、BlockingCollection<Message>等来完成消息的排队与分发。
如果您完全熟悉异步编程、并行编程、Socket网络编程,又对分布式/微服务理论有较深的理解,那么Orleans的实现机制对您来说可能相对容易理解。
本期结束,下期更精彩!
相关文章推荐
- jQuery-1.9.1源码分析系列(十五) 动画处理——外篇
- Libevent源码分析-----连接监听器evconnlistener
- Android进阶系列--源码分析模板方法模式在AsyncTask的运用
- jQuery源码分析系列
- Tomcat源码分析(二)--连接处理
- apache kafka系列之源码分析走读-kafka内部模块分析
- 【Spring源码分析系列】启动component-scan类扫描加载过程
- QUnit系列 -- 5.QUnit源码分析之<大致结构>
- Java集合系列之HashMap源码分析
- 【Spark SQL 源码分析系列文章】
- android缓存系列:DiskLruCache源码分析
- jQuery源码分析系列:.domManip() .buildFragment() .clean()
- 【Spring源码分析系列】bean的加载
- Android进阶系列之源码分析Activity的启动流程
- jQuery-1.9.1源码分析系列(十) 事件系统——事件委托
- lucene4.5源码分析系列:lucene默认索引的文件格式-总述
- 【Netty源码分析】客户端connect服务端过程
- connect 端口分配源码分析
- smack 源码分析- PacketWriter (android上实现长连接)【2】
- tomcat源码分析连接coyote catalina