
An Analysis of gen_tcp Send Buffers and Watermarks

2013-06-07 12:56
A while ago a reader asked me a question online:

My server side is set up like this: gen_tcp:listen(8000, [{active,false}, {recbuf,1}, {buffer,1}]).

My client side like this: gen_tcp:connect("localhost", 8000, [{active,false}, {high_watermark,2}, {low_watermark,1}, {sndbuf,1}, {buffer,1}]).

The client sends one byte per gen_tcp:send() call; the first 6 sends return ok, and the 7th blocks.

The server receives one byte per gen_tcp:recv(_, 0) call; after it has received three bytes, the client's 7th send returns.

My understanding is that the server side should be able to accept 2 bytes plus the one byte in sndbuf, so the client ought to block on the 4th byte; but that is not what happens. Please analyze.
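For readers who want to poke at this themselves, here is a minimal shell sketch of the experiment (my reconstruction, not the reader's original code; it assumes port 8000 is free, and it sends from a spawned helper process, which is fine since any process may send on a gen_tcp socket):

{ok, L} = gen_tcp:listen(8000, [{active,false}, {recbuf,1}, {buffer,1}]).
{ok, C} = gen_tcp:connect("localhost", 8000,
                          [{active,false}, {high_watermark,2},
                           {low_watermark,1}, {sndbuf,1}, {buffer,1}]).
{ok, S} = gen_tcp:accept(L).
%% One byte per send; the 7th call should block until the server drains.
spawn(fun() ->
          [io:format("send ~w -> ~w~n", [N, gen_tcp:send(C, <<$a>>)])
           || N <- lists:seq(1, 7)]
      end).
%% Receiving on the server side eventually releases the blocked sender:
{ok, _} = gen_tcp:recv(S, 1).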

This is indeed a fairly involved question: it touches gen_tcp's send buffer, receive buffer, and the watermarks. The receive-buffer side has already been covered quite thoroughly in two earlier posts, so today we will focus on the send buffer and the watermarks.

Before we start, we need to get familiar with a few gen_tcp options (see the inet documentation for the full list):

{delay_send, Boolean}

Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.

{high_msgq_watermark, Size} (TCP/IP sockets)

The socket message queue will be set into a busy state when the amount of data queued on the message queue reaches this limit. Note that this limit only concerns data that have not yet reached the ERTS internal socket implementation. Default value used is 8 kB.

Senders of data to the socket will be suspended if either the socket message queue is busy, or the socket itself is busy.

For more information see the low_msgq_watermark, high_watermark, and low_watermark options.

Note that distribution sockets will disable the use of high_msgq_watermark and low_msgq_watermark, and will instead use the distribution buffer busy limit which is a similar feature.

{high_watermark, Size} (TCP/IP sockets)

The socket will be set into a busy state when the amount of data queued internally by the ERTS socket implementation reaches this limit. Default value used is 8 kB.

Senders of data to the socket will be suspended if either the socket message queue is busy, or the socket itself is busy.

For more information see the low_watermark, high_msgq_watermark, and low_msgq_watermark options.

{low_msgq_watermark, Size} (TCP/IP sockets)

If the socket message queue is in a busy state, the socket message queue will be set in a not busy state when the amount of data queued in the message queue falls below this limit. Note that this limit only concerns data that have not yet reached the ERTS internal socket implementation. Default value used is 4 kB.

Senders that have been suspended due to either a busy message queue or a busy socket, will be resumed when neither the socket message queue, nor the socket are busy.

For more information see the high_msgq_watermark, high_watermark, and low_watermark options.

Note that distribution sockets will disable the use of high_msgq_watermark and low_msgq_watermark, and will instead use the distribution buffer busy limit which is a similar feature.

{low_watermark, Size} (TCP/IP sockets)

If the socket is in a busy state, the socket will be set in a not busy state when the amount of data queued internally by the ERTS socket implementation falls below this limit. Default value used is 4 kB.

Senders that have been suspended due to either a busy message queue or a busy socket, will be resumed when neither the socket message queue, nor the socket are busy.

For more information see the high_watermark, high_msgq_watermark, and low_msgq_watermark options.

The two pairs of high/low watermarks among these options, together with delay_send, have a large influence on send-buffer behavior.

The behavior of gen_tcp:send was analyzed in detail in an earlier post; I recommend reading that one first as background.

We know that every Erlang process has a message queue: other processes communicate with it by sending it messages that spell out the content of the communication. As soon as a message lands in the queue, the Erlang VM gets ready to schedule the process so it can run and handle the message. Every introductory Erlang book explains this mechanism very clearly. What about ports? In Erlang's early days, a port had the same standing, interface, and usage as a process. As Erlang's unit of external IO, a port also has its own message queue; when a process sends it a message, the port normally stores the message in that queue, and the VM then schedules the port. When the port eventually gets to run, it consumes the messages in its queue, sending them to the network or performing the corresponding IO. Port scheduling works just like process scheduling, and it is equally strict about fairness.

Let's look at the message-sending interfaces for ports and processes. We know that ! is syntactic sugar for erlang:send: whether we write Port ! Msg or Pid ! Msg, it ends up calling erlang:send. Later on (I'm not sure why), the Erlang designers added the port_command family of functions specifically for sending messages to ports.

Let's verify this in the source:

erlang:send -> BIF_RETTYPE send_3(BIF_ALIST_3) -> do_send. The source is in bif.c; let's take a look:


Sint
do_send(Process *p, Eterm to, Eterm msg, int suspend, Eterm *refp) {
    ...
    if (is_internal_pid(to)) {
        ...
    }
    else if (is_external_pid(to)) {
        ...
        return remote_send(p, dep, to, to, msg, suspend);
    }
    else if (is_atom(to)) {
        ...
    }
    else if (is_external_port(to)
             && (external_port_dist_entry(to)
                 == erts_this_dist_entry)) {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp,
                      "Discarding message %T from %T to %T in an old "
                      "incarnation (%d) of this node (%d)\n",
                      msg,
                      p->common.id,
                      to,
                      external_port_creation(to),
                      erts_this_node->creation);
        erts_send_error_to_logger(p->group_leader, dsbufp);
        return 0;
    }
    else if (is_internal_port(to)) {
        ...
        pt = erts_port_lookup(portid, ERTS_PORT_SFLGS_INVALID_LOOKUP);
        ...
        switch (erts_port_command(p, ps_flags, pt, msg, refp)) {
        case ERTS_PORT_OP_CALLER_EXIT:
        ...
    }
There you have it: 1. erlang:send accepts two kinds of destination, a port or a process; 2. a message sent to a port travels the same path as erts_port_command.
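At the Erlang level this means the old bang form and port_command/2 are interchangeable ways of feeding a port. A small sketch of my own ("cat" is just an example external program; the ! form requires that the sender is the port's connected process):

Port = open_port({spawn, "cat"}, [binary]),
Port ! {self(), {command, <<"hello">>}},   %% the erlang:send path
true = port_command(Port, <<"hello">>).    %% the erts_port_command path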

Take a sip of water, save your strength, and go over two points again: 1. a port has a message queue; 2. ports are scheduled fairly.

With that background it is fairly easy to see what the watermarks in the options above are for. As with any message queue, to keep producer and consumer from getting out of balance, one usually sets high and low watermarks to stop the queue from growing until it blows up the system. That is exactly what {high_watermark, Size} and {low_watermark, Size} do.

So how does a port protect itself? The answer:

When the amount of queued data reaches the high watermark, the port enters the busy state and the sending process is suspended; when the queue drains to the low watermark, the busy state is cleared and the sending process resumes execution.

To back this up, see the port_command documentation:

port_command(Port, Data, OptionList) -> boolean()

Types:

Port = port() | atom()

Data = iodata()

Option = force | nosuspend

OptionList = [Option]

Sends data to a port. port_command(Port, Data, []) equals port_command(Port, Data).

If the port command is aborted false is returned; otherwise, true is returned.

If the port is busy, the calling process will be suspended until the port is not busy anymore.

Currently the following Options are valid:

force

The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.

nosuspend

The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.

How do we find out that a port has gone busy? It matters, because this state is usually serious: the sending process gets suspended, which introduces a lot of latency.

Luckily Erlang has this covered; see:

erlang:system_monitor(MonitorPid, Options) -> MonSettings

busy_port

If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.

The system helpfully reports the process that hit a busy port, so we know exactly which process ran into the high watermark and got suspended, which makes it easy to tune the watermarks afterwards so it doesn't happen again.
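As a concrete illustration, here is a minimal monitor sketch of my own (module and function names are made up) that logs every busy_port event:

-module(busy_port_mon).
-export([start/0]).

%% Register a system monitor process that reports busy_port events.
start() ->
    Pid = spawn(fun loop/0),
    erlang:system_monitor(Pid, [busy_port]),
    Pid.

loop() ->
    receive
        {monitor, SusPid, busy_port, Port} ->
            io:format("~p got suspended sending to busy port ~p~n",
                      [SusPid, Port]),
            loop()
    end.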

When user code calls gen_tcp:send, it ultimately goes through port_command; let's see how that works:


/* Command should be of the form
**   {PID, close}
**   {PID, {command, io-list}}
**   {PID, {connect, New_PID}}
*/
ErtsPortOpResult
erts_port_command(Process *c_p,
                  int flags,
                  Port *port,
                  Eterm command,
                  Eterm *refp)
{
    ...
    if (is_tuple_arity(command, 2)) {
        Eterm cntd;
        tp = tuple_val(command);
        cntd = tp[1];
        if (is_internal_pid(cntd)) {
            if (tp[2] == am_close) {
                if (!erts_port_synchronous_ops)
                    refp = NULL;
                flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND;
                return erts_port_exit(c_p, flags, port, cntd, am_normal, refp);
            }
            else if (is_tuple_arity(tp[2], 2)) {
                tp = tuple_val(tp[2]);
                if (tp[1] == am_command) {
                    if (!(flags & ERTS_PORT_SIG_FLG_NOSUSPEND)
                        && !erts_port_synchronous_ops)
                        refp = NULL;
                    return erts_port_output(c_p, flags, port, cntd, tp[2], refp);
                }
                else if (tp[1] == am_connect) {
                    if (!erts_port_synchronous_ops)
                        refp = NULL;
                    flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND;
                    return erts_port_connect(c_p, flags, port, cntd, tp[2], refp);
                }
            }
        }
    }
    ...
}

ErtsPortOpResult
erts_port_output(Process *c_p,
                 int flags,
                 Port *prt,
                 Eterm from,
                 Eterm list,
                 Eterm *refp)
{
    ...
    try_call = (force_immediate_call /* crash dumping */
                || !(sched_flags & (invalid_flags
                                    | ERTS_PTS_FLGS_FORCE_SCHEDULE_OP)));
    if (drv->outputv) {
        try_call_state.pre_chk_sched_flags = 0; /* already checked */
        if (force_immediate_call)
            try_call_res = force_imm_drv_call(&try_call_state);
        else
            try_call_res = try_imm_drv_call(&try_call_state);
        switch (try_call_res) {
        case ERTS_TRY_IMM_DRV_CALL_OK:
            call_driver_outputv(flags & ERTS_PORT_SIG_FLG_BANG_OP,
                                c_p ? c_p->common.id : ERTS_INVALID_PID,
                                from,
                                prt,
                                drv,
                                evp);
            if (force_immediate_call)
                finalize_force_imm_drv_call(&try_call_state);
            else
                finalize_imm_drv_call(&try_call_state);
            /* Fall through... */
        ...
        case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
            sched_flags = try_call_state.sched_flags;
        case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
            /* Schedule outputv() call instead... */
            break;
        ...
}

static ERTS_INLINE void
call_driver_outputv(int bang_op,
                    Eterm caller,
                    Eterm from,
                    Port *prt,
                    erts_driver_t *drv,
                    ErlIOVec *evp)
{
    /*
     * if (bang_op)
     *   we are part of a "Prt ! {From, {command, Data}}" operation
     * else
     *   we are part of a call to port_command/[2,3]
     * behave accordingly...
     */
    if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt))
        send_badsig(prt);
    else {
        ...
        prt->caller = caller;
        (*drv->outputv)((ErlDrvData) prt->drv_data, evp);
        prt->caller = NIL;
        prt->bytes_out += size;
        erts_smp_atomic_add_nob(&erts_bytes_out, size);
    }
    ...
}
From the source we can see that when port_command recognizes a command tuple it calls erts_port_output, which then runs through some fairly involved logic to decide how to invoke call_driver_outputv.

That involved flow is exactly where the msgq_watermark watermarks come into play; we'll set it aside for now and come back to it when we discuss msgq_watermark.

For the moment it is enough to know that data sent via gen_tcp:send eventually leaves through the port driver's outputv callback.

Continuing with the source:


static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
    tcp_inet_drv_input,
    tcp_inet_drv_output,
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
    ...
}

static void tcp_inet_commandv(ErlDrvData e, ErlIOVec *ev)
{
    tcp_descriptor *desc = (tcp_descriptor*)e;
    desc->inet.caller = driver_caller(desc->inet.port);
    DEBUGF(("tcp_inet_commandv(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (!IS_CONNECTED(INETP(desc))) {
        if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_SEND) {
            desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_CLOSE_SEND;
            inet_reply_error_am(INETP(desc), am_closed);
        }
        else
            inet_reply_error(INETP(desc), ENOTCONN);
    }
    else if (tcp_sendv(desc, ev) == 0)
        inet_reply_ok(INETP(desc));
    DEBUGF(("tcp_inet_commandv(%ld)}\r\n", (long)desc->inet.port));
}
For inet_drv (the gen_tcp case) this means tcp_sendv is what turns the message into network packets.

All right, sip of water, short break. Let's lay out the data path we have so far:

gen_tcp:send -> port_command -> erts_port_output -> call_driver_outputv -> tcp_inet_commandv -> tcp_sendv

Keep it firmly in mind.

Now let's walk through the watermark implementation, again following the source:


/* inet_drv.c */
#define INET_LOPT_TCP_HIWTRMRK 27 /* set local high watermark */
#define INET_LOPT_TCP_LOWTRMRK 28 /* set local low watermark */

#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_WATERMARK  (1024*4) /* 4k pending => allow more */

typedef struct {
    ...
    int   high;  /* high watermark */
    int   low;   /* low watermark */
    ...
} tcp_descriptor;

static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
{
    ...
    desc->high = INET_HIGH_WATERMARK;
    desc->low  = INET_LOW_WATERMARK;
    ...
}

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
    ...
    case INET_LOPT_TCP_HIWTRMRK:
        if (desc->stype == SOCK_STREAM) {
            tcp_descriptor* tdesc = (tcp_descriptor*) desc;
            if (ival < 0) ival = 0;
            if (tdesc->low > ival)
                tdesc->low = ival;
            tdesc->high = ival;
        }
        continue;

    case INET_LOPT_TCP_LOWTRMRK:
        if (desc->stype == SOCK_STREAM) {
            tcp_descriptor* tdesc = (tcp_descriptor*) desc;
            if (ival < 0) ival = 0;
            if (tdesc->high < ival)
                tdesc->high = ival;
            tdesc->low = ival;
        }
        continue;
    ...
}

/* Copy a descriptor, by creating a new port with same settings
 * as the descriptor desc.
 * return NULL on error (SYSTEM_LIMIT no ports avail)
 */
static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc, SOCKET s,
                                     ErlDrvTermData owner, int* err)
{
    ...
    copy_desc->high = desc->high;
    copy_desc->low  = desc->low;
    ...
}

static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
    ...
    if ((sz = driver_sizeq(ix)) > 0) {
        driver_enqv(ix, ev, 0);
        if (sz + ev->size >= desc->high) {
            DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
                    (long)desc->inet.port, desc->inet.s));
            desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
            desc->inet.busy_caller = desc->inet.caller;
            set_busy_port(desc->inet.port, 1);
            if (desc->send_timeout != INET_INFINITY) {
                desc->busy_on_send = 1;
                driver_set_timer(desc->inet.port, desc->send_timeout);
            }
            return 1;
        }
    ...
}

/* socket ready for output:
** 1. INET_STATE_CONNECTING => non block connect ?
** 2. INET_STATE_CONNECTED  => write output
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
    ...
    if (driver_deq(ix, n) <= desc->low) {
        if (IS_BUSY(INETP(desc))) {
            desc->inet.caller = desc->inet.busy_caller;
            desc->inet.state &= ~INET_F_BUSY;
            set_busy_port(desc->inet.port, 0);
            /* if we have a timer then cancel and send ok to client */
            if (desc->busy_on_send) {
                driver_cancel_timer(desc->inet.port);
                desc->busy_on_send = 0;
            }
            inet_reply_ok(INETP(desc));
        }
    }
    ...
}
From this source we can conclude a few things:

1. Watermark settings are inherited (tcp_inet_copy copies them to the new port).

2. The default high/low watermarks are 8K/4K.

3. Once the high watermark is reached, the port goes busy.

4. When the queue drains below the low watermark, busy is lifted.
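In practice you tune these per socket. A hedged sketch (assuming Sock is a connected gen_tcp socket; both are ordinary inet options, so they should also be readable back with inet:getopts):

ok = inet:setopts(Sock, [{high_watermark, 128 * 1024},
                         {low_watermark,   64 * 1024}]),
{ok, Opts} = inet:getopts(Sock, [high_watermark, low_watermark]).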

This matches what the documentation says about the watermarks. Next, let's take a quick look at how delay_send is implemented, still following the source:


/* TCP additional flags */
#define TCP_ADDF_DELAY_SEND 1

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
    ...
    case INET_LOPT_TCP_DELAY_SEND:
        if (desc->stype == SOCK_STREAM) {
            tcp_descriptor* tdesc = (tcp_descriptor*) desc;
            if (ival)
                tdesc->tcp_add_flags |= TCP_ADDF_DELAY_SEND;
            else
                tdesc->tcp_add_flags &= ~TCP_ADDF_DELAY_SEND;
        }
        continue;
    ...
}

/*
** Send non-blocking vector data
*/
static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
    ...
    if (INETP(desc)->is_ignored) {
        INETP(desc)->is_ignored |= INET_IGNORE_WRITE;
        n = 0;
    }
    else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) {
        n = 0;
    }
    else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov,
                                        vsize, &n, 0))) {
        if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
            int err = sock_errno();
            DEBUGF(("tcp_sendv(%ld): s=%d, "
                    "sock_sendv(size=2) errno = %d\r\n",
                    (long)desc->inet.port, desc->inet.s, err));
            return tcp_send_error(desc, err);
        }
        n = 0;
    }
    else {
        DEBUGF(("tcp_sendv(%ld): s=%d, only sent "
                LLU "/%d of " LLU "/%d bytes/items\r\n",
                (long)desc->inet.port, desc->inet.s,
                (llu_t)n, vsize, (llu_t)ev->size, ev->vsize));
    }

    DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n",
            (long)desc->inet.port, desc->inet.s));
    driver_enqv(ix, ev, n);
    if (!INETP(desc)->is_ignored)
        sock_select(INETP(desc), (FD_WRITE|FD_CLOSE), 1);
    ...
}

static void tcp_inet_drv_output(ErlDrvData data, ErlDrvEvent event)
{
    (void)tcp_inet_output((tcp_descriptor*)data, (HANDLE)event);
}

/* socket ready for output:
** 1. INET_STATE_CONNECTING => non block connect ?
** 2. INET_STATE_CONNECTED  => write output
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
    ...
    else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            ssize_t n;
            SysIOVec* iov;

            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, iov, vsize, &n, 0))) {
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret = tcp_send_error(desc, sock_errno());
                    goto done;
                }
                goto done;
            }
            if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }
    ...
}
From this we can see what tcp_sendv does around a send:

1. If the delay_send flag is set, it doesn't even attempt sock_sendv.

2. Otherwise it calls sock_sendv to push data to the network; whatever remains is saved on the driver's queue.

3. If the queue has data, the epoll write event is registered.

4. When epoll later reports the socket writable, tcp_inet_drv_output gets called.

5. tcp_inet_drv_output -> tcp_inet_output pushes the previously queued data to the network via sock_sendv again.

The gap between steps 3 and 4 takes time, depending on when the epoll write event fires and when the port gets scheduled.

So, simply put: delay_send skips the first-phase send attempt entirely, pushes the data straight onto the port's message queue, and lets it all go out together once epoll says the socket is writable.

The benefit is that gen_tcp:send returns immediately; sock_sendv typically costs tens of microseconds, so this suits scenarios that are sensitive to send-side latency.
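Enabling it is a one-liner; a sketch (Host and Port are placeholders):

{ok, Sock} = gen_tcp:connect(Host, Port,
                             [binary, {active, false},
                              {delay_send, true}]).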

At this point we have traced clearly how data flows through each link of the port chain.

To recap: when data from gen_tcp:send cannot be pushed out to the network right away, it is parked in the port's message queue; when that queue fills up to the high watermark, the port goes busy, throttling the sender. When epoll detects that the socket is writable, the VM calls tcp_inet_output to drain the queued data to the network; as the queue shrinks below the low watermark, busy is lifted so the sender can push more data.
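One practical consequence of this recap: the busy_on_send timer we saw in tcp_sendv means you can bound how long a sender sits suspended on a busy port. A sketch (Host, Port, Data and handle_slow_peer/1 are placeholders of mine):

{ok, Sock} = gen_tcp:connect(Host, Port,
                             [binary, {active, false},
                              {send_timeout, 5000}]),
case gen_tcp:send(Sock, Data) of
    ok               -> ok;
    {error, timeout} -> handle_slow_peer(Sock)   %% hypothetical handler
end.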

Another sip of water, and on we go to msgq_watermark. This is another big rabbit hole; hang in there, because another long analysis is coming!

First, some background. From the R16B release notes, the major port-related change reads:

— Latency of signals sent from processes to ports — Signals from processes to ports where previously always delivered immediately. This kept latency for such communication to a minimum, but it could cause lock contention which was very expensive for the system as a whole. In order to keep this latency low also in the future, most signals from processes to ports are by default still delivered immediately as long as no conflicts occur. Such conflicts include not being able to acquire the port lock, but also include other conflicts. When a conflict occur, the signal will be scheduled for delivery at a later time. A scheduled signal delivery may cause a higher latency for this specific communication, but improves the overall performance of the system since it reduce lock contention between schedulers. The default behavior of only scheduling delivery of these signals on conflict can be changed by passing the +spp command line flag to erl(1). The behavior can also be changed on port basis using the parallelism option of the open_port/2 BIF.

Simply put: in the past, when a process sent data to a port, it immediately locked the port, called call_driver_outputv to consume the data, finished the job, and unlocked. For a single request this gives the lowest latency, but when several processes send to the same port the lock collision rate climbs, which inevitably hurts the port's throughput. So the new VM introduced port parallelism: when the lock is contested, instead of waiting around stupidly, the sender reference-counts the data to be sent and asks the port scheduler to run call_driver_outputv at a suitable later time, that is, when the port is not busy, which greatly improves throughput.

Checking the code:


int erts_port_parallelism = 0;

void erl_start(int argc, char **argv)
{
    ...
    else if (has_prefix("pp", sub_param)) {
        arg = get_arg(sub_param+2, argv[i+1], &i);
        if (sys_strcmp(arg, "true") == 0)
            erts_port_parallelism = 1;
        else if (sys_strcmp(arg, "false") == 0)
            erts_port_parallelism = 0;
    ...
}

static Port *
open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump)
{
    ...
    opts.parallelism = erts_port_parallelism;
    ...

    else if (option == am_parallelism) {
        if (*tp == am_true)
            opts.parallelism = 1;
        else if (*tp == am_false)
            opts.parallelism = 0;
        else
    ...
}
From this source we learn:

1. For compatibility with older releases, parallel port signal delivery is off by default, but it can be switched on globally with +spp.

2. It can be enabled for an individual port via the {parallelism, true} option at open_port time.
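For example (a sketch; "cat" is just a stand-in external program, and as far as I can tell gen_tcp does not expose this option directly, so it is set at the open_port level, with +spp flipping the global default):

Port = open_port({spawn, "cat"}, [binary, {parallelism, true}]).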

{parallelism, Boolean}

Set scheduler hint for port parallelism. If set to true, the VM will schedule port tasks when it by this can improve the parallelism in the system. If set to false, the VM will try to perform port tasks immediately and by this improving the latency at the expense of parallelism. The default can be set on system startup by passing the +spp command line argument to erl(1).

So what does enabling this do to gen_tcp (a port)? The most visible difference: previously, call_driver_outputv calls were serialized behind the port lock, and whoever grabbed the lock first ran first. Inside driver_outputv, data that cannot be consumed may be appended to the port's message queue; as we analyzed above, that queue is bounded by its high/low watermarks, so the amount of queued data stays under control. Under parallelism, though, while the port is busy in call_driver_outputv, other processes no longer wait: they reference-count the message, stash it aside, ask the port scheduler to schedule it for later execution, and return immediately.

See the problem? Without any control, every process could accumulate a pile of messages, all waiting for the port scheduler to run them later. So the port scheduler has a duty to apply watermarks to that portion of the data too, which naturally brings in the msgq_watermark options.
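These are ordinary inet options too, so they can be set per socket. A sketch (assuming Sock is a gen_tcp socket; the driver's defaults are 8K/4K, as the inet_drv source below shows):

ok = inet:setopts(Sock, [{high_msgq_watermark, 64 * 1024},
                         {low_msgq_watermark,  16 * 1024}]).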

Is it getting complicated? It gets worse. With msgq_watermark in the picture, sending processes may be suspended; how do they get woken up again? Let's answer that first, with code:


%% erlang.erl
port_command(Port, Data) ->
    case case erts_internal:port_command(Port, Data, []) of
             Ref when erlang:is_reference(Ref) ->
                 receive {Ref, Res} -> Res end;
             Res -> Res
         end of
        true -> true;
        Error -> erlang:error(Error, [Port, Data])
    end.

/*
 * erts_internal:port_command/3 is used by the
 * erlang:port_command/2 and erlang:port_command/3
 * BIFs.
 */
BIF_RETTYPE erts_internal_port_command_3(BIF_ALIST_3)
{
    ...
    prt = lookup_port(BIF_P, BIF_ARG_1);
    if (!prt)
        BIF_RET(am_badarg);
    ...
    switch (erts_port_output(BIF_P, flags, prt, prt->common.id, BIF_ARG_2, &ref)) {
    case ERTS_PORT_OP_CALLER_EXIT:
    case ERTS_PORT_OP_BADARG:
    case ERTS_PORT_OP_DROPPED:
        ERTS_BIF_PREP_RET(res, am_badarg);
        break;
    case ERTS_PORT_OP_BUSY:
        ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));
        if (flags & ERTS_PORT_SIG_FLG_NOSUSPEND)
            ERTS_BIF_PREP_RET(res, am_false);
        else {
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, prt);
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_erts_internal_port_command_3],
                                 BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
        break;
    case ERTS_PORT_OP_BUSY_SCHEDULED:
        ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));
        /* Fall through... */
    case ERTS_PORT_OP_SCHEDULED:
        ASSERT(is_internal_ref(ref));
        ERTS_BIF_PREP_RET(res, ref);
        break;
    }
The call chain is: erlang:port_command -> erts_internal:port_command -> erts_internal_port_command_3.

From the erts_internal_port_command_3 code we can see that when a send runs into ERTS_PORT_OP_BUSY, the calling process is suspended; when it is later woken, the BIF traps, and the Erlang VM re-invokes erts_internal_port_command_3 at a suitable time to finish what was left undone.

To review: at the port_command layer, when running with parallelism, if the underlying port turns busy, the calling process goes through the suspend, wake up, retry cycle.
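If you would rather not be suspended at all, port_command/3 with nosuspend gives you a non-blocking send. A sketch of the classic trick (my own illustration; it relies on prim_inet:send itself being built on erlang:port_command, with the inet driver replying via an {inet_reply, Sock, Status} message):

try_send(Sock, Data) ->
    case erlang:port_command(Sock, Data, [nosuspend]) of
        false ->
            {error, busy};                %% port busy; we were not suspended
        true ->
            receive
                {inet_reply, Sock, Status} ->
                    Status                %% ok | {error, Reason}
            end
    end.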

The question now is: when does ERTS_PORT_OP_BUSY actually occur?

From the earlier analysis we know port_command -> erts_port_output, so let's see under what conditions it returns ERTS_PORT_OP_BUSY:

ErtsPortOpResult
erts_port_output(Process *c_p,
                 int flags,
                 Port *prt,
                 Eterm from,
                 Eterm list,
                 Eterm *refp)
{
    ...
    /*
     * Assumes caller have checked that port is valid...
     */
    sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
    if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))
        return ((sched_flags & ERTS_PTS_FLG_EXIT)
                ? ERTS_PORT_OP_DROPPED
                : ERTS_PORT_OP_BUSY);
    ...
}

void set_busy_port(ErlDrvPort dprt, int on)
{
    ...
    if (on) {
        flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags,
                                                ERTS_PTS_FLG_BUSY_PORT);
        if (flags & ERTS_PTS_FLG_BUSY_PORT)
            return; /* Already busy */
    ...
}
The code says it plainly: once the high watermark is hit and set_busy_port is called, every subsequent erts_port_output call returns ERTS_PORT_OP_BUSY straight away. Simple and blunt.

One more sip of water, and on with the analysis; now it's time to verify against the code.

Let's restate what we established earlier:

In parallelism mode, while the port is busy in call_driver_outputv, other processes do not wait; they reference-count the message, stash it aside, ask the port scheduler to schedule it for later execution, and return immediately.


ErtsPortOpResult
erts_port_output(Process *c_p,
                 int flags,
                 Port *prt,
                 Eterm from,
                 Eterm list,
                 Eterm *refp)
{
    ...
    if (drv->outputv) {
        ...
    }
    ...
    if (!try_call) {
        int i;
        /* Need to increase ref c on all binaries */
        for (i = 1; i < evp->vsize; i++)
            if (bvp[i])
                driver_binary_inc_refc(bvp[i]);
    }
    else {
        int i;
        ErlIOVec *new_evp;
        ErtsTryImmDrvCallResult try_call_res;
        ErtsTryImmDrvCallState try_call_state
            = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
                c_p,
                prt,
                ERTS_PORT_SFLGS_INVALID_LOOKUP,
                invalid_flags,
                !refp,
                am_command);

        try_call_state.pre_chk_sched_flags = 0; /* already checked */
        if (force_immediate_call)
            try_call_res = force_imm_drv_call(&try_call_state);
        else
            try_call_res = try_imm_drv_call(&try_call_state);
        switch (try_call_res) {
        case ERTS_TRY_IMM_DRV_CALL_OK:
            call_driver_outputv(flags & ERTS_PORT_SIG_FLG_BANG_OP,
                                c_p ? c_p->common.id : ERTS_INVALID_PID,
                                from,
                                prt,
                                drv,
                                evp);
            if (force_immediate_call)
                finalize_force_imm_drv_call(&try_call_state);
            else
                finalize_imm_drv_call(&try_call_state);
            /* Fall through... */
        case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
            .....
        case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
            sched_flags = try_call_state.sched_flags;
        case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
            /* Schedule outputv() call instead... */
            break;
        }

        sigdp = erts_port_task_alloc_p2p_sig_data();
        sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
        sigdp->u.outputv.from = from;
        sigdp->u.outputv.evp = evp;
        sigdp->u.outputv.cbinp = cbin;
        port_sig_callback = port_sig_outputv;
    }

    res = erts_schedule_proc2port_signal(c_p,
                                         prt,
                                         c_p ? c_p->common.id : ERTS_INVALID_PID,
                                         refp,
                                         sigdp,
                                         task_flags,
                                         port_sig_callback);
    if (res != ERTS_PORT_OP_SCHEDULED) {
        if (drv->outputv)
            cleanup_scheduled_outputv(evp, cbin);
        else
            cleanup_scheduled_output(buf);
        return res;
    }
}

static int
port_sig_outputv(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
{
    Eterm reply;

    switch (op) {
    case ERTS_PROC2PORT_SIG_EXEC:
        /* Execution of a scheduled outputv() call */
        ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
        if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
            reply = am_badarg;
        else {
            call_driver_outputv(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP,
                                sigdp->caller,
                                sigdp->u.outputv.from,
                                prt,
                                prt->drv_ptr,
                                sigdp->u.outputv.evp);
            reply = am_true;
        }
        break;
    ...
}

ErtsPortOpResult
erts_schedule_proc2port_signal(Process *c_p,
                               Port *prt,
                               Eterm caller,
                               Eterm *refp,
                               ErtsProc2PortSigData *sigdp,
                               int task_flags,
                               ErtsProc2PortSigCallback callback)
{
    ...
    /* Schedule port close call for later execution... */
    sched_res = erts_port_task_schedule(prt->common.id,
                                        NULL,
                                        ERTS_PORT_TASK_PROC_SIG,
                                        sigdp,
                                        callback,
                                        task_flags);
    ...
}

/*
 * Schedule a task.
 */
int
erts_port_task_schedule(Eterm id,
                        ErtsPortTaskHandle *pthp,
                        ErtsPortTaskType type,
                        ...)
{
    ...
    if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
        reset_handle(ptp);
        if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
            goto abort_nosuspend;
        else
            goto fail;
    }
    ...
}

static ERTS_INLINE int
enqueue_task(Port *pp,
             ErtsPortTask *ptp,
             ErtsProc2PortSigData *sigdp,
             ErtsPortTaskHandleList *ns_pthlp,
             erts_aint32_t *flagsp)
{
    ...
    pp->sched.taskq.in.last = ptp;
    flags = enqueue_proc2port_data(pp, sigdp, flags);
    res = 1;
    ...
}
From the code above we can see that when the port is not busy, call_driver_outputv is invoked directly to do the work; otherwise the message is deferred into the port scheduler. The call chain for this stage is:

erts_port_output -> erts_schedule_proc2port_signal -> erts_port_task_schedule -> enqueue_task -> enqueue_proc2port_data.

enqueue_proc2port_data, as the name suggests, saves the data a process sent to the port onto one of the port task queues, which makes it the natural place to enforce the watermark check.

All right, our msgq_watermark friends are finally about to take the stage.


/* erl_driver.h */
#define ERL_DRV_BUSY_MSGQ_LIM_MAX (ERL_DRV_BUSY_MSGQ_DISABLED - 1)
#define ERL_DRV_BUSY_MSGQ_LIM_MIN ((ErlDrvSizeT) 1)

/* inet_drv.c */
#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_MSGQ_WATERMARK  (1024*4) /* 4k pending => allow more */

static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
{
    ...
    q_high = INET_HIGH_MSGQ_WATERMARK;
    q_low = INET_LOW_MSGQ_WATERMARK;
    ...
    if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN)
        q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN;
    else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX)
        q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX;
    erl_drv_busy_msgq_limits(port, &q_low, &q_high);
    ...
}

/* erl_port_task.c */
/*
 * erl_drv_busy_msgq_limits() is called by drivers either reading or
 * writing the limits.
 *
 * A limit of zero is interpreted as a read only request (using a
 * limit of zero would not be useful). Other values are interpreted
 * as a write-read request.
 */
void
erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
{
    ...
    Port *pp = erts_drvport2port(dport, NULL);
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
    ...
    if (!low)
        low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
    else {
        if (bpq->high < low)
            bpq->high = low;
        erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
        written = 1;
    }

    if (!high)
        high = bpq->high;
    else {
        if (low > high) {
            low = high;
            erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
        }
        bpq->high = high;
        written = 1;
    }
    ...
}
From this source:

1. The MSGQ high/low watermarks also default to 8K/4K; the minimum is 1, and there is effectively no upper cap.

2. What they actually set are bpq->low and bpq->high on the port scheduler's busy-queue bookkeeping.

With this knowledge it is easy to see how enqueue_proc2port_data applies the watermarks as data enters the queue; and there must be a matching dequeued_proc2port_data that clears the busy state as data leaves.

On with the code tour:


static ERTS_INLINE erts_aint32_t
enqueue_proc2port_data(Port *pp,
                       ErtsProc2PortSigData *sigdp,
                       erts_aint32_t flags)
{
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
    ...
    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
        flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags,
                                                ERTS_PTS_FLG_BUSY_PORT_Q);
        flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
        qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size);
        if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
            flags = (erts_smp_atomic32_read_bor_relb(
                         &pp->sched.flags,
                         ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
            flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
        }
    }
    ...
}

static ERTS_INLINE void
dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
        return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
                                                      (erts_aint_t) -size);
    ASSERT(qsz + size > qsz);
    flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
        return;
    if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low))
        check_unset_busy_port_q(pp, flags, bpq);
}

/*
 * Busy port queue management
 */
static erts_aint32_t
check_unset_busy_port_q(Port *pp,
                        erts_aint32_t flags,
                        ErtsPortTaskBusyPortQ *bpq)
{
    ErlDrvSizeT qsize, low;
    int resume_procs = 0;

    ASSERT(bpq);
    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    erts_port_task_sched_lock(&pp->sched);
    qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
    low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
    if (qsize < low) {
        erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
                               | ERTS_PTS_FLG_BUSY_PORT_Q);
        flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask);
        if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
            resume_procs = 1;
    }
    else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
        flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags,
                                                 ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
        flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
    }

    erts_port_task_sched_unlock(&pp->sched);

    if (resume_procs)
        erts_port_resume_procs(pp);

    return flags;
}
The code states it clearly: when enqueueing a task pushes the queue past the high watermark, the ERTS_PTS_FLG_BUSY_PORT_Q flag is set; when a dequeue finds the queue below the low watermark, check_unset_busy_port_q is called to wake the suspended senders.

By now we basically understand the enqueue path; but when does dequeued_proc2port_data get called to take things off the queue?

The answer is erts_port_task_execute. Remember from the earlier analysis: these tasks go onto a queue, and the port scheduler runs them at an appropriate time.

Continuing with the code:


int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
    ...
    pp = pop_port(runq);
    if (!pp) {
        res = 0;
        goto done;
    }
    ...
    while (1) {
        erts_aint32_t task_state;
        ErtsPortTask *ptp;

        ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
        if (!ptp)
            break;
        ...
        switch (ptp->type) {
        case ERTS_PORT_TASK_PROC_SIG: {
            ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
            ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            if (!pp->sched.taskq.bpq)
                reds += ptp->u.alive.td.psig.callback(pp,
                                                      state,
                                                      ERTS_PROC2PORT_SIG_EXEC,
                                                      sigdp);
            else {
                ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
                reds += ptp->u.alive.td.psig.callback(pp,
                                                      state,
                                                      ERTS_PROC2PORT_SIG_EXEC,
                                                      sigdp);
                dequeued_proc2port_data(pp, size);
            }
            break;
        }
    ...
}
When the task scheduler sees a task of type ERTS_PORT_TASK_PROC_SIG, it invokes the task's callback and then dequeues the task. Remember what that callback was? Good memory: port_sig_outputv, which does the real work of pushing the data out to the network.

A mountain road with eighteen bends; it took real effort to string the whole flow together. Complicated, isn't it! It truly is. While I was down with a cold recently I lay in bed for several days turning it over before the whole story more or less made sense. Building high-performance servers is not easy; a round of applause for all of us!

To sum up: the official documentation for these watermarks is murky, and the source is the reliable reference. The essential difference between watermark and msgq_watermark is this: the former bounds the port's own message queue, while the latter bounds the port scheduler's queue, that is, the data a process hands off when it sends to a parallel port that cannot take it right away; you can think of it as a watermark on in-flight data.