gen_tcp发送缓冲区以及水位线问题分析
2013-06-07 12:56
405 查看
前段时间有同学在线上问了个问题:
服务器端我是这样设的:gen_tcp:listen(8000,[{active,false},{recbuf,1},{buffer,1}]).
客户端是这样设的:gen_tcp:connect(“localhost”,8000,[{active,false},{high_watermark,2},{low_watermark,1},{sndbuf,1},{buffer,1}]).
我客户端每次gen_tcp:send()发送一个字节,前6个字节返回ok,第7个字节阻塞
服务端每次gen_tcp:recv(_,0)接收一个字节,接收三个字节后,客户端的第7次发送返回。
按我的理解的话:应该是服务器端可以接收2个字节+sndbuf里的一个字节,第4个字节客户端就该阻塞的,可事实不时这样,求分析
这个问题确实还是比较复杂,涉及到gen_tcp的发送缓冲区和接收缓冲区,水位线等问题,其中接收缓冲区的问题在这篇以及这篇
博文里面讲的比较清楚了,今天我们重点来分析下发送缓冲区和水位线的问题。
在开始分析前,我们需要熟悉几个gen_tcp的选项,更多参见
这里:
{delay_send,Boolean}
Normally,whenanErlangprocesssendstoasocket,thedriverwilltrytoimmediatelysendthedata.Ifthatfails,thedriverwilluseanymeansavailabletoqueueupthemessagetobesentwhenevertheoperatingsystemsaysitcanhandleit.Setting{delay_send,
true}willmakeallmessagesqueueup.Thismakesthemessagesactuallysentontothenetworkbelargerbutfewer.TheoptionactuallyaffectstheschedulingofsendrequestsversusErlangprocessesinsteadofchanginganyrealpropertyofthesocket.Needless
tosayitisanimplementationspecificoption.Defaultisfalse.
{high_msgq_watermark,Size}(TCP/IPsockets)
Thesocketmessagequeuewillbesetintoabusystatewhentheamountofdataqueuedonthemessagequeuereachesthislimit.NotethatthislimitonlyconcernsdatathathavenotyetreachedtheERTSinternalsocketimplementation.Defaultvalueusedis8
kB.
Sendersofdatatothesocketwillbesuspendedifeitherthesocketmessagequeueisbusy,orthesocketitselfisbusy.
Formoreinformationseethelow_msgq_watermark,high_watermark,andlow_watermarkoptions.
Notethatdistributionsocketswilldisabletheuseofhigh_msgq_watermarkandlow_msgq_watermark,andwillinsteadusethedistributionbufferbusylimitwhichisasimilarfeature.
{high_watermark,Size}(TCP/IPsockets)
ThesocketwillbesetintoabusystatewhentheamountofdataqueuedinternallybytheERTSsocketimplementationreachesthislimit.Defaultvalueusedis8kB.
Sendersofdatatothesocketwillbesuspendedifeitherthesocketmessagequeueisbusy,orthesocketitselfisbusy.
Formoreinformationseethelow_watermark,high_msgq_watermark,andlow_msqg_watermarkoptions.
{low_msgq_watermark,Size}(TCP/IPsockets)
Ifthesocketmessagequeueisinabusystate,thesocketmessagequeuewillbesetinanotbusystatewhentheamountofdataqueuedinthemessagequeuefallsbelowthislimit.NotethatthislimitonlyconcernsdatathathavenotyetreachedtheERTSinternal
socketimplementation.Defaultvalueusedis4kB.
Sendersthathavebeensuspendedduetoeitherabusymessagequeueorabusysocket,willberesumedwhenneitherthesocketmessagequeue,northesocketarebusy.
Formoreinformationseethehigh_msgq_watermark,high_watermark,andlow_watermarkoptions.
Notethatdistributionsocketswilldisabletheuseofhigh_msgq_watermarkandlow_msgq_watermark,andwillinsteadusethedistributionbufferbusylimitwhichisasimilarfeature.
{low_watermark,Size}(TCP/IPsockets)
Ifthesocketisinabusystate,thesocketwillbesetinanotbusystatewhentheamountofdataqueuedinternallybytheERTSsocketimplementationfallsbelowthislimit.Defaultvalueusedis4kB.
Sendersthathavebeensuspendedduetoeitherabusymessagequeueorabusysocket,willberesumedwhenneitherthesocketmessagequeue,northesocketarebusy.
Formoreinformationseethehigh_watermark,high_msgq_watermark,andlow_msgq_watermarkoptions.
这选项里面两对高低水位线的设置,以及delay_send选项,对发送缓冲区的影响很大。
gen_tcp:send的行为在之前的
博文中分析的比较到位了,建议同学先看看这篇文章垫底下。
我们知道每个erlang的进程都有个消息队列,其他进程要和他通信就需要透过发消息给他,把通讯的内容在消息里面交代清楚。进程消息队列里面一旦有消息,erlang的VM就会马上准备调度该进程来让进程执行,处理消息。这个进程的消息队列机制每个erlang入门的书籍都写的非常清楚。那么port呢?在Erlang的早期,Port是和进程一样的地位,接口,使用方式。Port作为Erlang对外的IO的执行单位,也拥有自己的消息队列,当进程把消息发送给port的时候,port通常也是把消息保存在消息队列中,然后VM就会调度这个port。等到port被调度执行的时候,port把队列里面的消息消耗掉,发送到网络或者执行相应IO的操作。port的调度和erlang的进程的调度是一样的,都非常讲究公平调度。
我们来考证下port和进程消息发送的接口。我们知道!符号是erlang:send的语法糖,当我们给Port!msg或者Pid!msg,最终都是调用erlang:send来发送消息。后面不知道为什么,erlang的设计者专门为port设计了port_command系列函数专门为port发送消息。
我们来考证下:
erlang:send->BIF_RETTYPEsend_3(BIF_ALIST_3)->do_send源码在bif.c中我们来看看:
viewsource
print?
诸位看到了吧!1.erlang:send接受二种对象:port和process2.发送到port的消息走的和erts_port_command是一样的路。
喝口水,保存体力,重新温习下二点:1.port有消息队列。2.port也是公平调度。
有了上面的知识铺垫,我们其实就比较好明白上面选项中的水位线做什么的。和每个消息队列一样,为了防止发送者和接收者能力的失衡,通常都会设置高低水位线来保护队列不至于太大把
系统撑爆。上面的{high_watermark,Size},{low_watermark,Size}就是干这个用的。
那port是如何保护自己的呢?答案是:
当消息量达到高水位线的时候,port进入busy状态,这时候会把发送进程suspend起来,等消息达到低水位线的时候,解除busy状态,同时让发送进程继续执行。
证明上面的说法,参考下port_command
文档:
port_command(Port,Data,OptionList)->boolean()
Types:
Port=port()|atom()
Data=iodata()
Option=force|nosuspend
OptionList=[Option]
Sendsdatatoaport.port_command(Port,Data,[])equalsport_command(Port,Data).
Iftheportcommandisabortedfalseisreturned;otherwise,trueisreturned.
Iftheportisbusy,thecallingprocesswillbesuspendeduntiltheportisnotbusyanymore.
CurrentlythefollowingOptionsarevalid:
force
Thecallingprocesswillnotbesuspendediftheportisbusy;instead,theportcommandisforcedthrough.Thecallwillfailwithanotsupexceptionifthedriverof
theportdoesnotsupportthis.FormoreinformationseetheERL_DRV_FLAG_SOFT_BUSYdriverflag.
nosuspend
Thecallingprocesswillnotbesuspendediftheportisbusy;instead,theportcommandisabortedandfalseisreturned.
那如何知道一个port进入busy状态,因为这个状态通常很严重,发送进程被挂起,会引起很大的latency.
幸亏erlang考虑周到,参看这里:
erlang:system_monitor(MonitorPid,Options)->MonSettings
busy_port
Ifaprocessinthesystemgetssuspendedbecauseitsendstoabusyport,amessage{monitor,SusPid,busy_port,Port}issenttoMonitorPid.SusPidisthepidthat
gotsuspendedwhensendingtoPort.
系统会很友好的把发生busy_port的进程发出来,我们就可以知道那个进程进程碰到高水位线被挂起了,方面我们后面调整水位线避免这种情况发生。
当用户调用gen_tcp:send要发送数据的时候最终都会调用port_command来具体执行,那么我们来看下它是如何运作的:
viewsource
print?
从源码分析来看,我们看到port_command如果看到port要执行command命令就会调用erts_port_output,而后者会做复杂的判断,来决定如何调用call_driver_outputv。
这个复杂的流程就是msgq_watermark水位线发挥作用地方,我们暂时不分析,等后面讲msgq_watermark的时候一起。
目前只需要知道最终gen_tcp:send发松数据会调用portdriver的outputv回调函数输出就好了。
接着源码分析:
viewsource
print?
对于inet_drv(gen_tcp)的例子来讲就是会调用tcp_sendv来把消息转变成网络封包发送出去。
好吧,喝口水,休息下。这里我们梳理下我们的数据路线:
gen_tcp:send->port_command->erts_port_output->call_driver_outputv->tcp_inet_commandv->tcp_sendv
大家要牢记在心。
继续接着我们参照源码来分析下水位线的实现:
viewsource
print?
从源码我们可以分析出几点:
1.水位线设置是可以继承的。
2.高低水位线默认是8K/4K.
3.进入高水位后,port进入busy状态。
4.当消息消耗到小于低水位线,busy解除。
这个水位线的说明和文档解释的一样,接下来我们稍微看看delay_send的实现原理,还是继续上源码:
viewsource
print?
从源码分析我们可以知道当tcp_sendv发送数据前看下:
1.delay_send标志是否设置,如果设置就不尝试调用sock_sendv发送。
2.调用sock_sendv发送网络数据,剩下的部分数据保存到驱动的队列去。
3.如果队列有数据的话,就把把epoll的写事件挂载上。
4.后续epoll会通知socket可写的时候,会调用tcp_inet_drv_output
5.tcp_inet_drv_output->tcp_inet_output继续把之前在队列里面的数据透过sock_sendv再次发送到网络
步骤3和4之间需要时间,依赖于epoll的写事件发生的以及port调度的时间点。
所以简单的说:delay_send就是在第一阶段不尝试发送数据,直接把数据推入port的消息队列去,等后面epoll说socket可写的时候一起发送出去。
这种做法的好处是gen_tcp:send马上就可以返回,因为sock_send通常要耗费几十us的时间,可用在对发送的latency很敏感的场合。
到这里为止,我们清楚的分析了数据是如何在port的各个链条里面流动.
再回顾下:当gen_tcp:send数据无法离开通过网络发送出去的时候,会暂时保留在port的消息队列里面,当消息队列满(到高水位线)的时候,port就会busy,抑制发送者推送更多的数据。当epoll探测到socket可写的时候,vm会调用tcp_inet_output把消息队列里面的数据,拉到网络去,这个过程中,队列里面的数据会越来越少,少到低水位线的时候,解除busy,好让发送者发送更多的数据。
再喝口水,我们接着分析msgq_watermark.这又是很大的一个坑,大家要坚持住,好像又是一顿分析!
首先,还是普及下知识:
从R16B的发布note里面我们摘抄和port相关的重大变化就是:
—Latencyofsignalssentfromprocessestoports—Signals
fromprocessestoportswherepreviouslyalwaysdelivered
immediately.Thiskeptlatencyforsuchcommunicationtoa
minimum,butitcouldcauselockcontentionwhichwasvery
expensiveforthesystemasawhole.Inordertokeepthis
latencylowalsointhefuture,mostsignalsfromprocesses
toportsarebydefaultstilldeliveredimmediatelyaslong
asnoconflictsoccur.Suchconflictsincludenotbeingable
toacquiretheportlock,butalsoincludeotherconflicts.
Whenaconflictoccur,thesignalwillbescheduledfor
deliveryatalatertime.Ascheduledsignaldeliverymay
causeahigherlatencyforthisspecificcommunication,but
improvestheoverallperformanceofthesystemsinceit
reducelockcontentionbetweenschedulers.Thedefault
behaviorofonlyschedulingdeliveryofthesesignalson
conflictcanbechangedbypassingthe+sppcommandlineflag
toerl(1).Thebehaviorcanalsobechangedonportbasis
usingtheparallelismoptionoftheopen_port/2BIF.
简单的说,过去进程给port发送数据的时候,都是立即锁定port,调用call_driver_outputv来消耗数据,干完活解锁。这样对于单个请求来讲,latency最低。但是如果系统有多个进程给同一个port发送数据,锁的碰撞率就会很高,势必影响到port的吞吐量。所以新的VM引入了port的parallelism这个概念,也就是说当锁冲突的时候,不是在那里傻傻的等,而是把要发送的数据引用计数后,通知port调度器在合适的时间,也就是说在
port不忙的时候,择机call_driver_outputv来消耗数据,这样就会大大提高吞吐量。
考证下代码:
viewsource
print?
分析了源码可以知道:
1.port并行发送的行为为了和过去的版本兼容默认是关闭的,但是可以用+spp全局打开
2.在open_port的时候通过参数{parallelism,true}来个别打开这个选项。
{parallelism,Boolean}
Setschedulerhintforportparallelism.Ifsettotrue,theVMwillscheduleporttaskswhenitbythiscanimprovetheparallelisminthesystem.Ifsettofalse,theVMwilltrytoperformporttasksimmediatelyandbythisimprovingthelatencyattheexpense
ofparallelism.Thedefaultcanbesetonsystemstartupbypassingthe+sppcommandlineargumenttoerl(1).
那开了这个选项后,对gen_tcp(port)有什么影响呢?最明显的区别是过去call_driver_outputv是排队执行的,谁先拿到锁,谁先执行。那么在driver_outputv里面如果消息消耗不了,有可能会把数据加到port的消息队列去。这个我们前面分析过,每个消息队列有高低水位线来控制,总能保证消息在一定的量。但是parallelism了后,当port在忙着做call_driver_outputv的时候,其他进程就不等了,直接把消息加引用计数保存到一个地方去,然后请求port调度器稍后调度执行这个消息,它就立即返回了。
各位看官看出来问题了吗?如果不做控制的话,每个进程都会积累很多消息,都等着port调度器后续执行。所以port调度器就有义务来为这部分消息做水位线的控制,这就很自然的引入了msgq_watermark选项。
是不是有点复杂?更复杂的还有呢?引入msgq_watermark项后,那这些发送进程就可能被挂起,那如何唤醒它呢?我们先回答这个问题,上代码:
viewsource
print?
viewsource
print?
调用流程是:erlang:port_command->erts_internal:port_command->erts_internal_port_command_3
从erts_internal_port_command_3代码可以看出来,当我们调用发送数据的时候,遇到ERTS_PORT_OP_BUSY时候,系统会被挂起,等被唤醒的时候,发生trap,erlangvm会在合适的时间再重新调用erts_internal_port_command_3完成之前未完成的事情。
温习下:从port_command这层来讲,parallelism执行的时候,如果底层的port出现busy,调用进程会经历挂起,唤醒,重新调用这套流程。
现在的问题是什么时候会出现ERTS_PORT_OP_BUSY?
从前面的分析,我们知道port_command->erts_port_output,那么我们来看下什么情况下会返回ERTS_PORT_OP_BUSY:
viewsource
print?
代码很清楚的说明了,一旦到达高水位线调用set_busy_port就会导致后续的erts_port_output调用直接返回ERTS_PORT_OP_BUSY,简单干脆。
再喝口水,接着分析,现在到了代码验证的阶段了。
重新温习下前面我们分析过的:
在parallelism模式下,当port在忙着做call_driver_outputv的时候,其他进程就不等了,直接把消息加引用计数保存到一个地方去,然后请求port调度器稍后调度执行这个消息,它就立即返回了。
viewsource
print?
从上面的代码我们知道,当port不忙的时候,就会直接调用call_driver_outputv干活,否则就会把消息延迟放到port调度去。这个环节的调用流程是
erts_port_output->erts_schedule_proc2port_signal->erts_port_task_schedule->enqueue_task->enqueue_proc2port_data.
enqueue_proc2port_data这个函数顾名思义就是把进程发送到port的数据保存到port_task的某个队列去。那么在这个地方做水位线检查是最合适的。
好吧,我们的msg_watermark相关的东西要出场了。
viewsource
print?
从源码分析来看:
1.MSGQ高低水位线也是8/4K,最小值是1,高不封顶。
2.它影响的是每个port调度器认为队列的bpq->low和bpq->high
有了这个知识,我们就很容易分析enqueue_proc2port_data是如何在数据入队列时限定高低水位线的,同时一定会有个配套的dequeued_proc2port_data在出数据队列的时候解除busy状态的。
继续代码之旅:
viewsource
print?
代码很明白的说:
任务入队列的时候超过水位线会设置ERTS_PTS_FLG_BUSY_PORT_Q状态,出队列的时候发现低于水位线的时候,会调用check_unset_busy_port_q来唤醒被挂起的发送进程。
到现在任务入队列的流程我们基本明白了,但是何时会调用dequeued_proc2port_data出队列呢?
答案是erts_port_task_execute。还记得前面分析的时候说:这些任务会入队列,port调度器会在适当的时候执行这些任务。
继续看代码:
viewsource
print?
任务调度器发现发现是ERTS_PORT_TASK_PROC_SIG类型的任务,就会调用该任务的callback,然后把该任务出队列。还记得那个任务callback叫什么吗?好记性,就是port_sig_outputv。他会真正执行把数据拖到网络的具体事情。
山路十八弯,好不容易才把整个流程给串起来,是不是很复杂!确实很复杂,前段时间感冒的时候,躺在床上想了好几天,才把事情的来龙去脉差不多搞清楚,做高性能服务器的不容易呀,来点掌声,为我们自己鼓励下!
总结:这个水位线官方文档写的不清不楚,还是源码靠谱!watermark和msgq_watermark的本质差别就是一个是控制port消息队列的水位线,一个控制进程在给并行port发送数据时,条件暂时无法满足时候,数据暂缓到port调度器时候,port调度器队列的水位线,也可以理解为inflight的数据的水位线。
服务器端我是这样设的:gen_tcp:listen(8000,[{active,false},{recbuf,1},{buffer,1}]).
客户端是这样设的:gen_tcp:connect(“localhost”,8000,[{active,false},{high_watermark,2},{low_watermark,1},{sndbuf,1},{buffer,1}]).
我客户端每次gen_tcp:send()发送一个字节,前6个字节返回ok,第7个字节阻塞
服务端每次gen_tcp:recv(_,0)接收一个字节,接收三个字节后,客户端的第7次发送返回。
按我的理解的话:应该是服务器端可以接收2个字节+sndbuf里的一个字节,第4个字节客户端就该阻塞的,可事实不时这样,求分析
这个问题确实还是比较复杂,涉及到gen_tcp的发送缓冲区和接收缓冲区,水位线等问题,其中接收缓冲区的问题在
博文里面讲的比较清楚了,今天我们重点来分析下发送缓冲区和水位线的问题。
在开始分析前,我们需要熟悉几个gen_tcp的选项,更多参见
这里:
{delay_send,Boolean}
Normally,whenanErlangprocesssendstoasocket,thedriverwilltrytoimmediatelysendthedata.Ifthatfails,thedriverwilluseanymeansavailabletoqueueupthemessagetobesentwhenevertheoperatingsystemsaysitcanhandleit.Setting{delay_send,
true}willmakeallmessagesqueueup.Thismakesthemessagesactuallysentontothenetworkbelargerbutfewer.TheoptionactuallyaffectstheschedulingofsendrequestsversusErlangprocessesinsteadofchanginganyrealpropertyofthesocket.Needless
tosayitisanimplementationspecificoption.Defaultisfalse.
{high_msgq_watermark,Size}(TCP/IPsockets)
Thesocketmessagequeuewillbesetintoabusystatewhentheamountofdataqueuedonthemessagequeuereachesthislimit.NotethatthislimitonlyconcernsdatathathavenotyetreachedtheERTSinternalsocketimplementation.Defaultvalueusedis8
kB.
Sendersofdatatothesocketwillbesuspendedifeitherthesocketmessagequeueisbusy,orthesocketitselfisbusy.
Formoreinformationseethelow_msgq_watermark,high_watermark,andlow_watermarkoptions.
Notethatdistributionsocketswilldisabletheuseofhigh_msgq_watermarkandlow_msgq_watermark,andwillinsteadusethedistributionbufferbusylimitwhichisasimilarfeature.
{high_watermark,Size}(TCP/IPsockets)
ThesocketwillbesetintoabusystatewhentheamountofdataqueuedinternallybytheERTSsocketimplementationreachesthislimit.Defaultvalueusedis8kB.
Sendersofdatatothesocketwillbesuspendedifeitherthesocketmessagequeueisbusy,orthesocketitselfisbusy.
Formoreinformationseethelow_watermark,high_msgq_watermark,andlow_msqg_watermarkoptions.
{low_msgq_watermark,Size}(TCP/IPsockets)
Ifthesocketmessagequeueisinabusystate,thesocketmessagequeuewillbesetinanotbusystatewhentheamountofdataqueuedinthemessagequeuefallsbelowthislimit.NotethatthislimitonlyconcernsdatathathavenotyetreachedtheERTSinternal
socketimplementation.Defaultvalueusedis4kB.
Sendersthathavebeensuspendedduetoeitherabusymessagequeueorabusysocket,willberesumedwhenneitherthesocketmessagequeue,northesocketarebusy.
Formoreinformationseethehigh_msgq_watermark,high_watermark,andlow_watermarkoptions.
Notethatdistributionsocketswilldisabletheuseofhigh_msgq_watermarkandlow_msgq_watermark,andwillinsteadusethedistributionbufferbusylimitwhichisasimilarfeature.
{low_watermark,Size}(TCP/IPsockets)
Ifthesocketisinabusystate,thesocketwillbesetinanotbusystatewhentheamountofdataqueuedinternallybytheERTSsocketimplementationfallsbelowthislimit.Defaultvalueusedis4kB.
Sendersthathavebeensuspendedduetoeitherabusymessagequeueorabusysocket,willberesumedwhenneitherthesocketmessagequeue,northesocketarebusy.
Formoreinformationseethehigh_watermark,high_msgq_watermark,andlow_msgq_watermarkoptions.
这选项里面两对高低水位线的设置,以及delay_send选项,对发送缓冲区的影响很大。
gen_tcp:send的行为在之前的
博文中分析的比较到位了,建议同学先看看这篇文章垫底下。
我们知道每个erlang的进程都有个消息队列,其他进程要和他通信就需要透过发消息给他,把通讯的内容在消息里面交代清楚。进程消息队列里面一旦有消息,erlang的VM就会马上准备调度该进程来让进程执行,处理消息。这个进程的消息队列机制每个erlang入门的书籍都写的非常清楚。那么port呢?在Erlang的早期,Port是和进程一样的地位,接口,使用方式。Port作为Erlang对外的IO的执行单位,也拥有自己的消息队列,当进程把消息发送给port的时候,port通常也是把消息保存在消息队列中,然后VM就会调度这个port。等到port被调度执行的时候,port把队列里面的消息消耗掉,发送到网络或者执行相应IO的操作。port的调度和erlang的进程的调度是一样的,都非常讲究公平调度。
我们来考证下port和进程消息发送的接口。我们知道!符号是erlang:send的语法糖,当我们给Port!msg或者Pid!msg,最终都是调用erlang:send来发送消息。后面不知道为什么,erlang的设计者专门为port设计了port_command系列函数专门为port发送消息。
我们来考证下:
erlang:send->BIF_RETTYPEsend_3(BIF_ALIST_3)->do_send源码在bif.c中我们来看看:
Sint |
do_send(Process*p,Etermto,Etermmsg, int suspend,Eterm*refp){ |
... |
if (is_internal_pid(to)){ |
... |
} else if (is_external_pid(to)){ |
... |
return remote_send(p,dep,to,to,msg,suspend); |
} else if (is_atom(to)){ |
... |
} else if (is_external_port(to) |
&&(external_port_dist_entry(to) |
==erts_this_dist_entry)){ |
erts_dsprintf_buf_t*dsbufp=erts_create_logger_dsbuf(); |
erts_dsprintf(dsbufp, |
"Discardingmessage%Tfrom%Tto%Tinanold" |
"incarnation(%d)ofthisnode(%d)\n" , |
msg, |
p->common.id, |
to, |
external_port_creation(to), |
erts_this_node->creation); |
erts_send_error_to_logger(p->group_leader,dsbufp); |
return 0; |
} else if (is_internal_port(to)){ |
... |
pt=erts_port_lookup(portid,ERTS_PORT_SFLGS_INVALID_LOOKUP); |
... |
switch (erts_port_command(p,ps_flags,pt,msg,refp)){ |
case ERTS_PORT_OP_CALLER_EXIT: |
... |
} |
喝口水,保存体力,重新温习下二点:1.port有消息队列。2.port也是公平调度。
有了上面的知识铺垫,我们其实就比较好明白上面选项中的水位线做什么的。和每个消息队列一样,为了防止发送者和接收者能力的失衡,通常都会设置高低水位线来保护队列不至于太大把
系统撑爆。上面的{high_watermark,Size},{low_watermark,Size}就是干这个用的。
那port是如何保护自己的呢?答案是:
当消息量达到高水位线的时候,port进入busy状态,这时候会把发送进程suspend起来,等消息达到低水位线的时候,解除busy状态,同时让发送进程继续执行。
证明上面的说法,参考下port_command
文档:
port_command(Port,Data,OptionList)->boolean()
Types:
Port=port()|atom()
Data=iodata()
Option=force|nosuspend
OptionList=[Option]
Sendsdatatoaport.port_command(Port,Data,[])equalsport_command(Port,Data).
Iftheportcommandisabortedfalseisreturned;otherwise,trueisreturned.
Iftheportisbusy,thecallingprocesswillbesuspendeduntiltheportisnotbusyanymore.
CurrentlythefollowingOptionsarevalid:
force
Thecallingprocesswillnotbesuspendediftheportisbusy;instead,theportcommandisforcedthrough.Thecallwillfailwithanotsupexceptionifthedriverof
theportdoesnotsupportthis.FormoreinformationseetheERL_DRV_FLAG_SOFT_BUSYdriverflag.
nosuspend
Thecallingprocesswillnotbesuspendediftheportisbusy;instead,theportcommandisabortedandfalseisreturned.
那如何知道一个port进入busy状态,因为这个状态通常很严重,发送进程被挂起,会引起很大的latency.
幸亏erlang考虑周到,参看
erlang:system_monitor(MonitorPid,Options)->MonSettings
busy_port
Ifaprocessinthesystemgetssuspendedbecauseitsendstoabusyport,amessage{monitor,SusPid,busy_port,Port}issenttoMonitorPid.SusPidisthepidthat
gotsuspendedwhensendingtoPort.
系统会很友好的把发生busy_port的进程发出来,我们就可以知道那个进程进程碰到高水位线被挂起了,方面我们后面调整水位线避免这种情况发生。
当用户调用gen_tcp:send要发送数据的时候最终都会调用port_command来具体执行,那么我们来看下它是如何运作的:
/*Commandshouldbeoftheform |
**{PID,close} |
**{PID,{command,io-list}} |
**{PID,{connect,New_PID}} |
*/ |
ErtsPortOpResult |
erts_port_command(Process*c_p, |
int flags, |
Port*port, |
Etermcommand, |
Eterm*refp) |
{ |
... |
if (is_tuple_arity(command,2)){ |
Etermcntd; |
tp=tuple_val(command); |
cntd=tp[1]; |
if (is_internal_pid(cntd)){ |
if (tp[2]==am_close){ |
if (!erts_port_synchronous_ops) |
refp=NULL; |
flags&=~ERTS_PORT_SIG_FLG_NOSUSPEND; |
return erts_port_exit(c_p,flags,port,cntd,am_normal,refp); |
} else if (is_tuple_arity(tp[2], |
tp=tuple_val(tp[2]); |
if (tp[1]==am_command){ |
if (!(flags&ERTS_PORT_SIG_FLG_NOSUSPEND) |
&&!erts_port_synchronous_ops) |
refp=NULL; |
return erts_port_output(c_p,flags,port,cntd,tp[2],refp); |
} |
else if (tp[1] |
if (!erts_port_synchronous_ops) |
refp=NULL; |
flags&=~ERTS_PORT_SIG_FLG_NOSUSPEND; |
return erts_port_connect(c_p,flags,port,cntd,tp[2],refp); |
} |
} |
} |
} |
} |
... |
} |
ErtsPortOpResult |
erts_port_output(Process*c_p, |
int flags, |
Port*prt, |
Etermfrom, |
Etermlist, |
Eterm*refp) |
{ |
... |
try_call=(force_immediate_call /*crashdumping*/ |
||!(sched_flags&(invalid_flags |
|ERTS_PTS_FLGS_FORCE_SCHEDULE_OP))); |
if (drv->outputv){ |
try_call_state.pre_chk_sched_flags=0; /*alreadychecked*/ |
if (force_immediate_call) |
try_call_res=force_imm_drv_call(&try_call_state); |
else |
try_call_res=try_imm_drv_call(&try_call_state); |
switch (try_call_res){ |
case ERTS_TRY_IMM_DRV_CALL_OK: |
call_driver_outputv(flags&ERTS_PORT_SIG_FLG_BANG_OP, |
c_p?c_p->common.id:ERTS_INVALID_PID, |
from, |
prt, |
drv, |
evp); |
if (force_immediate_call) |
finalize_force_imm_drv_call(&try_call_state); |
else |
finalize_imm_drv_call(&try_call_state); |
/*Fallthrough...*/ |
... |
case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: |
sched_flags=try_call_state.sched_flags; |
case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: |
/*Scheduleoutputv()callinstead...*/ |
break ; |
... |
} |
static ERTS_INLINE void |
call_driver_outputv( int bang_op, |
Etermcaller, |
Etermfrom, |
Port*prt, |
erts_driver_t*drv, |
ErlIOVec*evp) |
{ |
/* |
*if(bang_op) |
*wearepartofa"Prt!{From,{command,Data}}"operation |
*else |
*wearepartofacalltoport_command/[2,3] |
*behaveaccordingly... |
*/ |
if (bang_op&&from!=ERTS_PORT_GET_CONNECTED(prt)) |
send_badsig(prt); |
else { |
... |
prt->caller=caller; |
(*drv->outputv)((ErlDrvData)prt->drv_data,evp); |
prt->caller=NIL; |
prt->bytes_out+=size; |
erts_smp_atomic_add_nob(&erts_bytes_out,size); |
} |
... |
} |
这个复杂的流程就是msgq_watermark水位线发挥作用地方,我们暂时不分析,等后面讲msgq_watermark的时候一起。
目前只需要知道最终gen_tcp:send发松数据会调用portdriver的outputv回调函数输出就好了。
接着源码分析:
static struct erl_drv_entrytcp_inet_driver_entry |
{ |
tcp_inet_init, /*inet_initwilladdthisdriver!!*/ |
tcp_inet_start, |
tcp_inet_stop, |
tcp_inet_command, |
tcp_inet_drv_input, |
tcp_inet_drv_output, |
"tcp_inet" , |
NULL, |
NULL, |
tcp_inet_ctl, |
tcp_inet_timeout, |
tcp_inet_commandv, |
... |
} |
static void tcp_inet_commandv(ErlDrvData |
{ |
tcp_descriptor*desc=(tcp_descriptor*)e; |
desc->inet.caller=driver_caller(desc->inet.port); |
DEBUGF(( "tcp_inet_commanv(%ld){s=%d\r\n" , |
( long )desc->inet.port,desc->inet.s)); |
if (!IS_CONNECTED(INETP(desc))){ |
if (desc->tcp_add_flags&TCP_ADDF_DELAYED_CLOSE_SEND){ |
desc->tcp_add_flags&=~TCP_ADDF_DELAYED_CLOSE_SEND; |
inet_reply_error_am(INETP(desc),am_closed); |
} |
else |
inet_reply_error(INETP(desc),ENOTCONN); |
} |
else if (tcp_sendv(desc, |
inet_reply_ok(INETP(desc)); |
DEBUGF(( "tcp_inet_commandv(%ld)}\r\n" ,( long )desc->inet.port)); |
} |
好吧,喝口水,休息下。这里我们梳理下我们的数据路线:
gen_tcp:send->port_command->erts_port_output->call_driver_outputv->tcp_inet_commandv->tcp_sendv
大家要牢记在心。
继续接着我们参照源码来分析下水位线的实现:
/*inet_drv.c*/ |
#defineINET_LOPT_TCP_HIWTRMRK27/*setlocalhighwatermark*/ |
#defineINET_LOPT_TCP_LOWTRMRK28/*setlocallowwatermark*/ |
#defineINET_HIGH_WATERMARK(1024*8)/*8kpendinghigh=>busy*/ |
#defineINET_LOW_WATERMARK(1024*4)/*4kpending=>allowmore*/ |
typedef struct { |
... |
int high; /*highwatermark*/ |
int low; /*lowwatermark*/ |
... |
}tcp_descriptor; |
static ErlDrvDatatcp_inet_start(ErlDrvPortport, char *args) |
{ |
... |
desc->high=INET_HIGH_WATERMARK; |
desc->low=INET_LOW_WATERMARK; |
... |
} |
static int inet_set_opts(inet_descriptor* char *ptr, int len) |
{ |
... |
case INET_LOPT_TCP_HIWTRMRK: |
if (desc->stype==SOCK_STREAM){ |
tcp_descriptor*tdesc=(tcp_descriptor*)desc; |
if (ival<0)ival=0; |
if (tdesc->low>ival) |
tdesc->low=ival; |
tdesc->high=ival; |
} |
continue ; |
case INET_LOPT_TCP_LOWTRMRK: |
if (desc->stype==SOCK_STREAM){ |
tcp_descriptor*tdesc=(tcp_descriptor*)desc; |
if (ival<0)ival=0; |
if (tdesc->high<ival) |
tdesc->high=ival; |
tdesc->low=ival; |
} |
continue ; |
... |
} |
/*Copyadescriptor,bycreatinganewportwithsamesettings |
*asthedescriptordesc. |
*returnNULLonerror(SYSTEM_LIMITnoportsavail) |
*/ |
static tcp_descriptor*tcp_inet_copy(tcp_descriptor*desc,SOCKETs, |
ErlDrvTermDataowner, int *err) |
{ |
... |
copy_desc->high=desc->high; |
copy_desc->low=desc->low; |
... |
} |
static int tcp_sendv(tcp_descriptor* |
{ |
... |
if ((sz=driver_sizeq(ix))>0){ |
driver_enqv(ix,ev,0); |
if (sz+ev->size>=desc->high){ |
DEBUGF(( "tcp_sendv(%ld):s=%d,senderforcedbusy\r\n" , |
( long )desc->inet.port,desc->inet.s)); |
desc->inet.state|=INET_F_BUSY; /*markforlow-watermark*/ |
desc->inet.busy_caller=desc->inet.caller; |
set_busy_port(desc->inet.port,1); |
if (desc->send_timeout!=INET_INFINITY){ |
desc->busy_on_send=1; |
driver_set_timer(desc->inet.port,desc->send_timeout); |
} |
return 1; |
} |
... |
} |
/*socketreadyforouput: |
**1.INET_STATE_CONNECTING=>nonblockconnect? |
**2.INET_STATE_CONNECTED=>writeoutput |
*/ |
static int tcp_inet_output(tcp_descriptor* HANDLE event) |
{ |
... |
if (driver_deq(ix,n)<=desc->low){ |
if (IS_BUSY(INETP(desc))){ |
desc->inet.caller=desc->inet.busy_caller; |
desc->inet.state&=~INET_F_BUSY; |
set_busy_port(desc->inet.port,0); |
/*ifwehaveatimerthencancelandsendoktoclient*/ |
if (desc->busy_on_send){ |
driver_cancel_timer(desc->inet.port); |
desc->busy_on_send=0; |
} |
inet_reply_ok(INETP(desc)); |
} |
} |
... |
} |
1.水位线设置是可以继承的。
2.高低水位线默认是8K/4K.
3.进入高水位后,port进入busy状态。
4.当消息消耗到小于低水位线,busy解除。
这个水位线的说明和文档解释的一样,接下来我们稍微看看delay_send的实现原理,还是继续上源码:
/*TCPadditionalflags*/ |
#defineTCP_ADDF_DELAY_SEND1 |
static int inet_set_opts(inet_descriptor* char *ptr, int len) |
{ |
... |
case INET_LOPT_TCP_DELAY_SEND: |
if (desc->stype==SOCK_STREAM){ |
tcp_descriptor*tdesc=(tcp_descriptor*)desc; |
if (ival) |
tdesc->tcp_add_flags|=TCP_ADDF_DELAY_SEND; |
else |
tdesc->tcp_add_flags&=~TCP_ADDF_DELAY_SEND; |
} |
continue ; |
... |
} |
/* |
**Sendnon-blockingvectordata |
*/ |
static int tcp_sendv(tcp_descriptor* |
{ |
... |
if (INETP(desc)->is_ignored){ |
INETP(desc)->is_ignored|=INET_IGNORE_WRITE; |
n=0; |
} else if (desc->tcp_add_flags |
n=0; |
} else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, |
vsize,&n,0))){ |
if ((sock_errno()!=ERRNO_BLOCK)&&(sock_errno()!=EINTR)){ |
int err=sock_errno(); |
DEBUGF(( "tcp_sendv(%ld):s=%d," |
"sock_sendv(size=2)errno=%d\r\n" , |
( long )desc->inet.port,desc->inet.s,err)); |
return tcp_send_error(desc,err); |
} |
n=0; |
} |
else { |
DEBUGF(( "tcp_sendv(%ld):s=%d,onlysent" |
LLU "/%dof" LLU "/%d bytes/items\r\n" , |
( long )desc->inet.port,desc->inet.s, |
(llu_t)n,vsize,(llu_t)ev->size,ev->vsize)); |
} |
DEBUGF(( "tcp_sendv(%ld):s=%d,Sendfailed,queuing\r\n" , |
( long )desc->inet.port,desc->inet.s)); |
driver_enqv(ix,ev,n); |
if (!INETP(desc)->is_ignored) |
sock_select(INETP(desc),(FD_WRITE|FD_CLOSE),1); |
... |
} |
static void tcp_inet_drv_output(ErlDrvData |
{ |
( void )tcp_inet_output((tcp_descriptor*)data,( HANDLE )event); |
} |
/*socketreadyforouput: |
**1.INET_STATE_CONNECTING=>nonblockconnect? |
**2.INET_STATE_CONNECTED=>writeoutput |
*/ |
static int tcp_inet_output(tcp_descriptor* HANDLE event) |
{ |
... |
else if (IS_CONNECTED(INETP(desc))) |
for (;;){ |
int vsize; |
ssize_tn; |
SysIOVec*iov; |
if ((iov=driver_peekq(ix,&vsize))==NULL){ |
sock_select(INETP(desc),FD_WRITE,0); |
send_empty_out_q_msgs(INETP(desc)); |
goto done; |
} |
vsize=vsize>MAX_VSIZE?MAX_VSIZE:vsize; |
DEBUGF(( "tcp_inet_output(%ld):s=%d,Abouttosend%ditems\r\n" , |
( long )desc->inet.port,desc->inet.s,vsize)); |
if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s,iov,vsize,&n,0))){ |
if ((sock_errno()!=ERRNO_BLOCK)&&(sock_errno()!=EINTR)){ |
DEBUGF(( "tcp_inet_output(%ld):sock_sendv(%d)errno=%d\r\n" , |
( long )desc->inet.port,vsize,sock_errno())); |
ret=tcp_send_error(desc,sock_errno()); |
goto done; |
} |
goto done; |
} |
if (driver_deq(ix,n)<=desc->low){ |
if (IS_BUSY(INETP(desc))){ |
desc->inet.caller=desc->inet.busy_caller; |
desc->inet.state&=~INET_F_BUSY; |
set_busy_port(desc->inet.port,0); |
/*ifwehaveatimerthencancelandsendoktoclient*/ |
if (desc->busy_on_send){ |
driver_cancel_timer(desc->inet.port); |
desc->busy_on_send=0; |
} |
inet_reply_ok(INETP(desc)); |
} |
} |
... |
} |
1.delay_send标志是否设置,如果设置就不尝试调用sock_sendv发送。
2.调用sock_sendv发送网络数据,剩下的部分数据保存到驱动的队列去。
3.如果队列有数据的话,就把把epoll的写事件挂载上。
4.后续epoll会通知socket可写的时候,会调用tcp_inet_drv_output
5.tcp_inet_drv_output->tcp_inet_output继续把之前在队列里面的数据透过sock_sendv再次发送到网络
步骤3和4之间需要时间,依赖于epoll的写事件发生的以及port调度的时间点。
所以简单的说:delay_send就是在第一阶段不尝试发送数据,直接把数据推入port的消息队列去,等后面epoll说socket可写的时候一起发送出去。
这种做法的好处是gen_tcp:send马上就可以返回,因为sock_send通常要耗费几十us的时间,可用在对发送的latency很敏感的场合。
到这里为止,我们清楚的分析了数据是如何在port的各个链条里面流动.
再回顾下:当gen_tcp:send数据无法离开通过网络发送出去的时候,会暂时保留在port的消息队列里面,当消息队列满(到高水位线)的时候,port就会busy,抑制发送者推送更多的数据。当epoll探测到socket可写的时候,vm会调用tcp_inet_output把消息队列里面的数据,拉到网络去,这个过程中,队列里面的数据会越来越少,少到低水位线的时候,解除busy,好让发送者发送更多的数据。
再喝口水,我们接着分析msgq_watermark.这又是很大的一个坑,大家要坚持住,好像又是一顿分析!
首先,还是普及下知识:
从R16B的
—Latencyofsignalssentfromprocessestoports—Signals
fromprocessestoportswherepreviouslyalwaysdelivered
immediately.Thiskeptlatencyforsuchcommunicationtoa
minimum,butitcouldcauselockcontentionwhichwasvery
expensiveforthesystemasawhole.Inordertokeepthis
latencylowalsointhefuture,mostsignalsfromprocesses
toportsarebydefaultstilldeliveredimmediatelyaslong
asnoconflictsoccur.Suchconflictsincludenotbeingable
toacquiretheportlock,butalsoincludeotherconflicts.
Whenaconflictoccur,thesignalwillbescheduledfor
deliveryatalatertime.Ascheduledsignaldeliverymay
causeahigherlatencyforthisspecificcommunication,but
improvestheoverallperformanceofthesystemsinceit
reducelockcontentionbetweenschedulers.Thedefault
behaviorofonlyschedulingdeliveryofthesesignalson
conflictcanbechangedbypassingthe+sppcommandlineflag
toerl(1).Thebehaviorcanalsobechangedonportbasis
usingtheparallelismoptionoftheopen_port/2BIF.
简单的说,过去进程给port发送数据的时候,都是立即锁定port,调用call_driver_outputv来消耗数据,干完活解锁。这样对于单个请求来讲,latency最低。但是如果系统有多个进程给同一个port发送数据,锁的碰撞率就会很高,势必影响到port的吞吐量。所以新的VM引入了port的parallelism这个概念,也就是说当锁冲突的时候,不是在那里傻傻的等,而是把要发送的数据引用计数后,通知port调度器在合适的时间,也就是说在
port不忙的时候,择机call_driver_outputv来消耗数据,这样就会大大提高吞吐量。
考证下代码:
int erts_port_parallelism=0; |
void |
erl_start( int argc, char **argv) |
{ |
... |
else if (has_prefix( "pp" , |
arg=get_arg(sub_param+2,argv[i+1],&i); |
if (sys_strcmp(arg, "true" )==0) |
erts_port_parallelism=1; |
else if (sys_strcmp(arg, "false" )==0) |
erts_port_parallelism=0; |
... |
} |
static Port* |
open_port(Process*p,Etermname,Etermsettings, int *err_typep, int *err_nump) |
{ |
... |
opts.parallelism=erts_port_parallelism; |
... |
} else if (option==am_parallelism){ |
if (*tp==am_true) |
opts.parallelism=1; |
else if (*tp |
opts.parallelism=0; |
else |
... |
} |
1.port并行发送的行为为了和过去的版本兼容默认是关闭的,但是可以用+spp全局打开
2.在
{parallelism,Boolean}
Setschedulerhintforportparallelism.Ifsettotrue,theVMwillscheduleporttaskswhenitbythiscanimprovetheparallelisminthesystem.Ifsettofalse,theVMwilltrytoperformporttasksimmediatelyandbythisimprovingthelatencyattheexpense
ofparallelism.Thedefaultcanbesetonsystemstartupbypassingthe+sppcommandlineargumenttoerl(1).
那开了这个选项后,对gen_tcp(port)有什么影响呢?最明显的区别是过去call_driver_outputv是排队执行的,谁先拿到锁,谁先执行。那么在driver_outputv里面如果消息消耗不了,有可能会把数据加到port的消息队列去。这个我们前面分析过,每个消息队列有高低水位线来控制,总能保证消息在一定的量。但是parallelism了后,当port在忙着做call_driver_outputv的时候,其他进程就不等了,直接把消息加引用计数保存到一个地方去,然后请求port调度器稍后调度执行这个消息,它就立即返回了。
各位看官看出来问题了吗?如果不做控制的话,每个进程都会积累很多消息,都等着port调度器后续执行。所以port调度器就有义务来为这部分消息做水位线的控制,这就很自然的引入了msgq_watermark选项。
是不是有点复杂?更复杂的还有呢?引入msgq_watermark项后,那这些发送进程就可能被挂起,那如何唤醒它呢?我们先回答这个问题,上代码:
%%erlang.erl |
port_command( Port , Data ) -> |
case case erts_internal:port_command ( Port , Data , []) of |
Ref when erlang:is_reference ( Ref ) -> receive { Ref , Res } -> Res end ; |
Res -> Res |
end of |
true->true; |
Error -> erlang:error ( Error ,[ Port , Data ]) |
end . |
/* |
*erts_internal:port_command/3isusedbythe |
*erlang:port_command/2anderlang:port_command/3 |
*BIFs. |
*/ |
BIF_RETTYPEerts_internal_port_command_3(BIF_ALIST_3) |
{ |
... |
prt=lookup_port(BIF_P,BIF_ARG_1); |
if (!prt) |
BIF_RET(am_badarg); |
... |
switch (erts_port_output(BIF_P,flags,prt,prt->common.id,BIF_ARG_2,&ref)){ |
case ERTS_PORT_OP_CALLER_EXIT: |
case ERTS_PORT_OP_BADARG: |
case ERTS_PORT_OP_DROPPED: |
ERTS_BIF_PREP_RET(res,am_badarg); |
break ; |
case ERTS_PORT_OP_BUSY: |
ASSERT(!(flags&ERTS_PORT_SIG_FLG_FORCE)); |
if (flags&ERTS_PORT_SIG_FLG_NOSUSPEND) |
ERTS_BIF_PREP_RET(res,am_false); |
else { |
erts_suspend(BIF_P,ERTS_PROC_LOCK_MAIN,prt); |
ERTS_BIF_PREP_YIELD3(res,bif_export[BIF_erts_internal_port_command_3], |
BIF_P,BIF_ARG_1,BIF_ARG_2,BIF_ARG_3); |
} |
break ; |
case ERTS_PORT_OP_BUSY_SCHEDULED: |
ASSERT(!(flags&ERTS_PORT_SIG_FLG_FORCE)); |
/*Fallthrough...*/ |
case ERTS_PORT_OP_SCHEDULED: |
ASSERT(is_internal_ref(ref)); |
ERTS_BIF_PREP_RET(res,ref); |
break ; |
} |
从erts_internal_port_command_3代码可以看出来,当我们调用发送数据的时候,遇到ERTS_PORT_OP_BUSY时候,系统会被挂起,等被唤醒的时候,发生trap,erlangvm会在合适的时间再重新调用erts_internal_port_command_3完成之前未完成的事情。
温习下:从port_command这层来讲,parallelism执行的时候,如果底层的port出现busy,调用进程会经历挂起,唤醒,重新调用这套流程。
现在的问题是什么时候会出现ERTS_PORT_OP_BUSY?
从前面的分析,我们知道port_command->erts_port_output,那么我们来看下什么情况下会返回ERTS_PORT_OP_BUSY:
ErtsPortOpResult |
erts_port_output(Process*c_p, |
int flags, |
Port*prt, |
Etermfrom, |
Etermlist, |
Eterm*refp) |
{ |
... |
/* |
*Assumescallerhavecheckedthatportisvalid... |
*/ |
sched_flags=erts_smp_atomic32_read_nob(&prt->sched.flags); |
if (sched_flags&(busy_flgs|ERTS_PTS_FLG_EXIT)) |
return ((sched_flags&ERTS_PTS_FLG_EXIT) |
?ERTS_PORT_OP_DROPPED |
:ERTS_PORT_OP_BUSY); |
... |
} |
void |
set_busy_port(ErlDrvPortdprt, int on) |
{ |
... |
if (on){ |
flags=erts_smp_atomic32_read_bor_acqb(&prt->sched.flags, |
ERTS_PTS_FLG_BUSY_PORT); |
if (flags&ERTS_PTS_FLG_BUSY_PORT) |
return ; /*Alreadybusy |
... |
} |
再喝口水,接着分析,现在到了代码验证的阶段了。
重新温习下前面我们分析过的:
在parallelism模式下,当port在忙着做call_driver_outputv的时候,其他进程就不等了,直接把消息加引用计数保存到一个地方去,然后请求port调度器稍后调度执行这个消息,它就立即返回了。
ErtsPortOpResult |
erts_port_output(Process*c_p, |
int flags, |
Port*prt, |
Etermfrom, |
Etermlist, |
Eterm*refp) |
{ |
... |
if (drv->outputv){ |
... |
} |
... |
if (!try_call){ |
int i; |
/*Needtoincreaserefconallbinaries*/ |
for (i=1;i<evp->vsize;i++) |
if (bvp[i]) |
driver_binary_inc_refc(bvp[i]); |
} |
else { |
int i; |
ErlIOVec*new_evp; |
ErtsTryImmDrvCallResulttry_call_res; |
ErtsTryImmDrvCallStatetry_call_state |
=ERTS_INIT_TRY_IMM_DRV_CALL_STATE( |
c_p, |
prt, |
ERTS_PORT_SFLGS_INVALID_LOOKUP, |
invalid_flags, |
!refp, |
am_command); |
try_call_state.pre_chk_sched_flags=0; /*alreadychecked*/ |
if (force_immediate_call) |
try_call_res=force_imm_drv_call(&try_call_state); |
else |
try_call_res=try_imm_drv_call(&try_call_state); |
switch (try_call_res){ |
case ERTS_TRY_IMM_DRV_CALL_OK: |
call_driver_outputv(flags&ERTS_PORT_SIG_FLG_BANG_OP, |
c_p?c_p->common.id:ERTS_INVALID_PID, |
from, |
prt, |
drv, |
evp); |
if (force_immediate_call) |
finalize_force_imm_drv_call(&try_call_state); |
else |
finalize_imm_drv_call(&try_call_state); |
/*Fallthrough...*/ |
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: |
..... |
case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: |
sched_flags=try_call_state.sched_flags; |
case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: |
/*Scheduleoutputv()callinstead...*/ |
break ; |
} |
sigdp=erts_port_task_alloc_p2p_sig_data(); |
sigdp->flags=ERTS_P2P_SIG_TYPE_OUTPUTV; |
sigdp->u.outputv.from=from; |
sigdp->u.outputv.evp=evp; |
sigdp->u.outputv.cbinp=cbin; |
port_sig_callback=port_sig_outputv; |
} |
res=erts_schedule_proc2port_signal(c_p, |
prt, |
c_p?c_p->common.id:ERTS_INVALID_PID, |
refp, |
sigdp, |
task_flags, |
port_sig_callback); |
if (res!=ERTS_PORT_OP_SCHEDULED){ |
if (drv->outputv) |
cleanup_scheduled_outputv(evp,cbin); |
else |
cleanup_scheduled_output(buf); |
return res; |
} |
} |
static int |
port_sig_outputv(Port*prt,erts_aint32_tstate, int op,ErtsProc2PortSigData*sigdp) |
{ |
Etermreply; |
switch (op){ |
case ERTS_PROC2PORT_SIG_EXEC: |
/*Executionofascheduledoutputv()call*/ |
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); |
if (state&ERTS_PORT_SFLGS_INVALID_LOOKUP) |
reply=am_badarg; |
else { |
call_driver_outputv(sigdp->flags&ERTS_P2P_SIG_DATA_FLG_BANG_OP, |
sigdp->caller, |
sigdp->u.outputv.from, |
prt, |
prt->drv_ptr, |
sigdp->u.outputv.evp); |
reply=am_true; |
} |
break ; |
... |
} |
ErtsPortOpResult |
erts_schedule_proc2port_signal(Process*c_p, |
Port*prt, |
Etermcaller, |
Eterm*refp, |
ErtsProc2PortSigData*sigdp, |
int task_flags, |
ErtsProc2PortSigCallbackcallback) |
{ |
... |
/*Scheduleportclosecallforlaterexecution...*/ |
sched_res=erts_port_task_schedule(prt->common.id, |
NULL, |
ERTS_PORT_TASK_PROC_SIG, |
sigdp, |
callback, |
task_flags); |
... |
} |
/* |
*Scheduleatask. |
*/ |
int |
erts_port_task_schedule(Etermid, |
ErtsPortTaskHandle*pthp, |
ErtsPortTaskTypetype, |
...) |
{ |
... |
if (!enqueue_task(pp,ptp,sigdp,ns_pthlp,&act)){ |
reset_handle(ptp); |
if (ns_pthlp&&!(act&ERTS_PTS_FLG_EXIT)) |
goto abort_nosuspend; |
else |
goto fail; |
} |
... |
} |
static ERTS_INLINE int |
enqueue_task(Port*pp, |
ErtsPortTask*ptp, |
ErtsProc2PortSigData*sigdp, |
ErtsPortTaskHandleList*ns_pthlp, |
erts_aint32_t*flagsp) |
{ |
... |
pp->sched.taskq.in.last=ptp; |
flags=enqueue_proc2port_data(pp,sigdp,flags); |
res=1; |
... |
} |
erts_port_output->erts_schedule_proc2port_signal->erts_port_task_schedule->enqueue_task->enqueue_proc2port_data.
enqueue_proc2port_data这个函数顾名思义就是把进程发送到port的数据保存到port_task的某个队列去。那么在这个地方做水位线检查是最合适的。
好吧,我们的msg_watermark相关的东西要出场了。
/*erl_driver.h*/ |
#defineERL_DRV_BUSY_MSGQ_LIM_MAX(ERL_DRV_BUSY_MSGQ_DISABLED-1) |
#defineERL_DRV_BUSY_MSGQ_LIM_MIN((ErlDrvSizeT)1) |
/*inet_drv.c*/ |
#defineINET_HIGH_MSGQ_WATERMARK(1024*8)/*8kpendinghigh=>busy*/ |
#defineINET_LOW_MSGQ_WATERMARK(1024*4)/*4kpending=>allowmore*/ |
static ErlDrvDatatcp_inet_start(ErlDrvPortport, char *args) |
{ |
... |
q_high=INET_HIGH_MSGQ_WATERMARK; |
q_low=INET_LOW_MSGQ_WATERMARK; |
... |
if (q_high<ERL_DRV_BUSY_MSGQ_LIM_MIN) |
q_high=ERL_DRV_BUSY_MSGQ_LIM_MIN; |
else if (q_high |
q_high=ERL_DRV_BUSY_MSGQ_LIM_MAX; |
erl_drv_busy_msgq_limits(port,&q_low,&q_high); |
... |
} |
/*erl_port_task.c*/ |
/* |
*erl_drv_busy_msgq_limits()iscalledbydriverseitherreadingor |
*writingthelimits. |
* |
*Alimitofzeroisinterpretedasareadonlyrequest(usinga |
*limitofzerowouldnotbeuseful).Othervaluesareinterpreted |
*asawrite-readrequest. |
*/ |
void |
erl_drv_busy_msgq_limits(ErlDrvPortdport,ErlDrvSizeT*lowp,ErlDrvSizeT*highp) |
{ |
... |
Port*pp=erts_drvport2port(dport,NULL); |
ErtsPortTaskBusyPortQ*bpq=pp->sched.taskq.bpq; |
... |
if (!low) |
low=(ErlDrvSizeT)erts_smp_atomic_read_nob(&bpq->low); |
else { |
if (bpq->high<low) |
bpq->high=low; |
erts_smp_atomic_set_relb(&bpq->low,(erts_aint_t)low); |
written=1; |
} |
if (!high) |
high=bpq->high; |
else { |
if (low>high){ |
low=high; |
erts_smp_atomic_set_relb(&bpq->low,(erts_aint_t)low); |
} |
bpq->high=high; |
written=1; |
} |
... |
} |
1.MSGQ高低水位线也是8/4K,最小值是1,高不封顶。
2.它影响的是每个port调度器认为队列的bpq->low和bpq->high
有了这个知识,我们就很容易分析enqueue_proc2port_data是如何在数据入队列时限定高低水位线的,同时一定会有个配套的dequeued_proc2port_data在出数据队列的时候解除busy状态的。
继续代码之旅:
static ERTS_INLINEerts_aint32_t |
enqueue_proc2port_data(Port*pp, |
ErtsProc2PortSigData*sigdp, |
erts_aint32_tflags) |
{ |
ErtsPortTaskBusyPortQ*bpq=pp->sched.taskq.bpq; |
... |
if (!(flags&ERTS_PTS_FLG_BUSY_PORT_Q)&&qsz>bpq->high){ |
flags=erts_smp_atomic32_read_bor_acqb(&pp->sched.flags, |
ERTS_PTS_FLG_BUSY_PORT_Q); |
flags|=ERTS_PTS_FLG_BUSY_PORT_Q; |
qsz=(ErlDrvSizeT)erts_smp_atomic_read_acqb(&bpq->size); |
if (qsz<(ErlDrvSizeT)erts_smp_atomic_read_nob(&bpq->low)){ |
flags=(erts_smp_atomic32_read_bor_relb( |
&pp->sched.flags, |
ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)); |
flags|=ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; |
} |
} |
... |
} |
static ERTS_INLINE void |
dequeued_proc2port_data(Port*pp,ErlDrvSizeTsize) |
{ |
ErtsPortTaskBusyPortQ*bpq; |
erts_aint32_tflags; |
ErlDrvSizeTqsz; |
ASSERT(pp->sched.taskq.bpq); |
if (size==0) |
return ; |
bpq=pp->sched.taskq.bpq; |
qsz=(ErlDrvSizeT)erts_smp_atomic_add_read_acqb(&bpq->size, |
(erts_aint_t)-size); |
ASSERT(qsz+size>qsz); |
flags=erts_smp_atomic32_read_nob(&pp->sched.flags); |
if (!(flags&ERTS_PTS_FLG_BUSY_PORT_Q)) |
return ; |
if (qsz<(ErlDrvSizeT)erts_smp_atomic_read_acqb(&bpq->low)) |
check_unset_busy_port_q(pp,flags,bpq); |
} |
/* |
*Busyportqueuemanagement |
*/ |
static erts_aint32_t |
check_unset_busy_port_q(Port*pp, |
erts_aint32_tflags, |
ErtsPortTaskBusyPortQ*bpq) |
{ |
ErlDrvSizeTqsize,low; |
int resume_procs=0; |
ASSERT(bpq); |
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); |
erts_port_task_sched_lock(&pp->sched); |
qsize=(ErlDrvSizeT)erts_smp_atomic_read_nob(&bpq->size); |
low=(ErlDrvSizeT)erts_smp_atomic_read_nob(&bpq->low); |
if (qsize<low){ |
erts_aint32_tmask=~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q |
|ERTS_PTS_FLG_BUSY_PORT_Q); |
flags=erts_smp_atomic32_read_band_relb(&pp->sched.flags,mask); |
if ((flags&ERTS_PTS_FLGS_BUSY)==ERTS_PTS_FLG_BUSY_PORT_Q) |
resume_procs=1; |
} |
else if (flags |
flags=erts_smp_atomic32_read_band_relb(&pp->sched.flags, |
~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); |
flags&=~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; |
} |
erts_port_task_sched_unlock(&pp->sched); |
if (resume_procs) |
erts_port_resume_procs(pp); |
return flags; |
} |
任务入队列的时候超过水位线会设置ERTS_PTS_FLG_BUSY_PORT_Q状态,出队列的时候发现低于水位线的时候,会调用check_unset_busy_port_q来唤醒被挂起的发送进程。
到现在任务入队列的流程我们基本明白了,但是何时会调用dequeued_proc2port_data出队列呢?
答案是erts_port_task_execute。还记得前面分析的时候说:这些任务会入队列,port调度器会在适当的时候执行这些任务。
继续看代码:
int |
erts_port_task_execute(ErtsRunQueue*runq,Port**curr_port_pp) |
{ |
... |
pp=pop_port(runq); |
if (!pp){ |
res=0; |
goto done; |
} |
... |
while (1){ |
erts_aint32_ttask_state; |
ErtsPortTask*ptp; |
ptp=select_task_for_exec(pp,&execq,&processing_busy_q); |
if (!ptp) |
break ; |
... |
switch (ptp->type){ |
case ERTS_PORT_TASK_PROC_SIG:{ |
ErtsProc2PortSigData*sigdp=&ptp->u.alive.td.psig.data; |
ASSERT((state&ERTS_PORT_SFLGS_DEAD)==0); |
if (!pp->sched.taskq.bpq) |
reds+=ptp->u.alive.td.psig.callback(pp, |
state, |
ERTS_PROC2PORT_SIG_EXEC, |
sigdp); |
else { |
ErlDrvSizeTsize=erts_proc2port_sig_command_data_size(sigdp); |
reds+=ptp->u.alive.td.psig.callback(pp, |
state, |
ERTS_PROC2PORT_SIG_EXEC, |
sigdp); |
dequeued_proc2port_data(pp,size); |
} |
break ; |
} |
... |
} |
山路十八弯,好不容易才把整个流程给串起来,是不是很复杂!确实很复杂,前段时间感冒的时候,躺在床上想了好几天,才把事情的来龙去脉差不多搞清楚,做高性能服务器的不容易呀,来点掌声,为我们自己鼓励下!
总结:这个水位线官方文档写的不清不楚,还是源码靠谱!watermark和msgq_watermark的本质差别就是一个是控制port消息队列的水位线,一个控制进程在给并行port发送数据时,条件暂时无法满足时候,数据暂缓到port调度器时候,port调度器队列的水位线,也可以理解为inflight的数据的水位线。
相关文章推荐
- 未公开的gen_tcp:unrecv以及接收缓冲区行为分析
- gen_tcp接受链接时enfile的问题分析及解决
- TCP同步传送数据示例以及可能出现问题分析
- TCP同步传送数据示例以及可能出现问题分析
- gen_tcp发送进程被挂起起因分析及对策
- UNIX网络编程6 从tcpdump了解到的TCP/UDP发送限制和缓冲区问题
- [转载]TCP的网络编程中一些典型的问题,以及一些分析和解决方案
- 网络编程中TCP、UDP的发送缓冲区问题(转)
- 利用TCP从客户端往服务器端发送一个文件过程中发生的问题
- 睿城_技术02----说一下关于平台在属性查询,以及空间查询分析等时候遇到的问题
- tcp socket的发送与接收缓冲区(转)
- hbase region lookups流程以及rpc线程卡死问题分析
- Android UDP通信的简单实例和用法归纳,以及与TCP对比、常见问题
- 邮件服务器发送收到553的问题分析
- iOS支付宝支付(Alipay)详细接入流程以及项目中遇到的问题分析
- net.tcp协议的wcf服务在远程计算机无法调用问题分析
- Linux中TCP和UDP发送和接收缓冲区
- 使用setsockopt()接口,设置TCP的接收与发送超时,Invalid argument错误问题
- 使用setsockopt()接口,设置TCP的接收与发送超时,Invalid argument错误问题
- TCP粘包问题分析和解决(全)