SequoiaDB 系列之六 :源码分析之coord节点
2015-12-10 14:32
435 查看
好久不见。
在上一篇SequoiaDB 系列之五 :源码分析之main函数,有讲述进程开始运行时,会根据自身的角色,来初始化不同的CB(控制块,control block)。
在之前的一篇SequoiaDB 系列之四 :架构简析中,我们简单过了一遍SequoiaDB的架构和各个节点的角色。
今天来看看SequoiaDB的coord角色。
首先,需要有个大致的轮廓:
coord节点主要承担代理的角色。作为SequoiaDB集群对外的接头人,它转发消息给其它节点,组合(combine)不同节点返回的数据,把结果返回给client。
catalog节点主要存储meta数据,比如集群中有哪些组,每个组的状态;每个组上有哪些节点,有哪些集合(Collection),哪些集合是主子表等等。
data节点主要是管理存储的数据,它接受coord转发过来的CRUD等操作,并记录同步日志(最终一致性)。
在注册CB的函数中:
coord注册这几个CB之后,就开始注册和启动服务:
具体函数在_KRCB::init()中,不再表述。_KRCB::init()会根据节点的角色,启动不同的服务。
客户端连接到coord,coord便会启动一个线程,为该连接服务。
服务线程监听到client的连接,启动一个EDU_TYPE_AGENT类型的线程,单独为client服务。
下面讲述coord节点的最主要的功能——消息转发
coord的启动初,会初始化一些必要的全局变量。在SequoiaDB中,会初始化很多command,拿创建集合空间来说,在文件SequoiaDB/engine/rtn/rtnCoord.cpp 中
嗯,上面的代码有点MFC中消息映射的感觉。
来看看 RTN_COORD_CMD_ADD 宏:
宏主要是new一个对象,再把对象插入到_cmdMap中,这样在程序初始化时候,便会有一系列的command对象存储在_cmdMap中。
另外,对SequoiaDB而言,所有的command操作,都是在查询操作的基础上做的,服务端用一些方法区别开是真正的查询,还是command。SequoiaDB的命令,是以$开头的字符串。
前提简述完毕,现在假设client连接上了coord,coord也创建了一个线程,为这个client服务。
_processMsg方法:
coord节点上的processor,是pmdCoordProcessor的一个实例,是用来做数据转发的,不同于真正做数据处理的pmdDataProcessor。
pmdCoordProcessor的处理过程
以创建集合空间的command为例,看看 rtnCoordCMDListCollectionSpace 的 execute做了什么:
该函数的主体,构造了另外一个数据包,然后执行 executeOnCataGroup ( (CHAR*)pCreateReq, pRouteAgent, cb, NULL, NULL ) ;这一句上。跟进这一函数:
rtnCoordSendRequestToPrimary就不再详细跟进描述了,根据函数名,大致就可以了解一个大概,是把数据发送到指定组(此处是catalog组)的主节点。
coord上的其它command或者operator也是采用类似的方法来转发消息给其它节点,就不再一一赘述了。
综合全文的讲述,coord处理client请求的流程
发送请求给coord节点
coord先揪出这个请求是做什么
交给对应的command处理
查询(本地缓存或者远程获取的)catalog信息
把消息转成节点间的内部消息
转发给目标节点
然后等待返回数据
再把返回数据交给处理线程
线程把返回结果发送给client
=====>THE END<=====
在上一篇SequoiaDB 系列之五 :源码分析之main函数,有讲述进程开始运行时,会根据自身的角色,来初始化不同的CB(控制块,control block)。
在之前的一篇SequoiaDB 系列之四 :架构简析中,我们简单过了一遍SequoiaDB的架构和各个节点的角色。
今天来看看SequoiaDB的coord角色。
首先,需要有个大致的轮廓:
coord节点主要承担代理的角色。作为SequoiaDB集群对外的接头人,它转发消息给其它节点,组合(combine)不同节点返回的数据,把结果返回给client。
catalog节点主要存储meta数据,比如集群中有哪些组,每个组的状态;每个组上有哪些节点,有哪些集合(Collection),哪些集合是主子表等等。
data节点主要是管理存储的数据,它接受coord转发过来的CRUD等操作,并记录同步日志(最终一致性)。
在注册CB的函数中:
void _pmdController::registerCB( SDB_ROLE dbrole ) { if ( SDB_ROLE_DATA == dbrole ) { ... } else if ( SDB_ROLE_COORD == dbrole ) { PMD_REGISTER_CB( sdbGetTransCB() ) ; // TRANS PMD_REGISTER_CB( sdbGetCoordCB() ) ; // COORD PMD_REGISTER_CB( sdbGetFMPCB () ) ; // FMP } ... // 每个节点都会注册的控制块 PMD_REGISTER_CB( sdbGetDMSCB() ) ; // DMS PMD_REGISTER_CB( sdbGetRTNCB() ) ; // RTN PMD_REGISTER_CB( sdbGetSQLCB() ) ; // SQL PMD_REGISTER_CB( sdbGetAggrCB() ) ; // AGGR PMD_REGISTER_CB( sdbGetPMDController() ) ; // CONTROLLER }
coord注册这几个CB之后,就开始注册和启动服务:
具体函数在_KRCB::init()中,不再表述。_KRCB::init()会根据节点的角色,启动不同的服务。
客户端连接到coord,coord便会启动一个线程,为该连接服务。
INT32 pmdTcpListenerEntryPoint ( pmdEDUCB *cb, void *pData ) { ... while ( !cb->isDisconnected() && !pListerner->isClosed() ) { SOCKET s ; rc = pListerner->accept ( &s, NULL, NULL ) ; if ( SDB_TIMEOUT == rc || SDB_TOO_MANY_OPEN_FD == rc ) { rc = SDB_OK ; continue ; } if ( rc && PMD_IS_DB_DOWN ) { rc = SDB_OK ; goto done ; } else if ( rc ) { PD_LOG ( PDERROR, "Failed to accept socket in TcpListener(rc=%d)", rc ) ; if ( pListerner->isClosed() ) { break ; } else { continue ; } } cb->incEventCount() ; ++mondbcb->numConnects ; void *pData = NULL ; *((SOCKET *) &pData) = s ; if ( !krcb->isActive() ) { ossSocket newsock ( &s ) ; newsock.close () ; continue ; } rc = eduMgr->startEDU ( EDU_TYPE_AGENT, pData, &agentEDU ) ; if ( rc ) { PD_LOG( ( rc == SDB_QUIESCED ? PDWARNING : PDERROR ), "Failed to start edu, rc: %d", rc ) ; ossSocket newsock ( &s ) ; newsock.close () ; continue ; } } //while ( ! cb->isDisconnected() ) ... }
服务线程监听到client的连接,启动一个EDU_TYPE_AGENT类型的线程,单独为client服务。
下面讲述coord节点的最主要的功能——消息转发
coord的启动初,会初始化一些必要的全局变量。在SequoiaDB中,会初始化很多command,拿创建集合空间来说,在文件SequoiaDB/engine/rtn/rtnCoord.cpp 中
RTN_COORD_CMD_BEGIN ... RTN_COORD_CMD_ADD( COORD_CMD_LISTCOLLECTIONSPACES, rtnCoordCMDListCollectionSpace ) ... RTN_COORD_OP_END
嗯,上面的代码有点MFC中消息映射的感觉。
来看看 RTN_COORD_CMD_ADD 宏:
#define RTN_COORD_CMD_ADD( cmdName, cmdClass ) {\ rtnCoordCommand *pObj = SDB_OSS_NEW cmdClass();\ _cmdMap.insert ( COORD_CMD_MAP::value_type (cmdName, pObj ));}
宏主要是new一个对象,再把对象插入到_cmdMap中,这样在程序初始化时候,便会有一系列的command对象存储在_cmdMap中。
另外,对SequoiaDB而言,所有的command操作,都是在查询操作的基础上做的,服务端用一些方法区别开是真正的查询,还是command。SequoiaDB的命令,是以$开头的字符串。
前提简述完毕,现在假设client连接上了coord,coord也创建了一个线程,为这个client服务。
INT32 _pmdLocalSession::run() { INT32 rc = SDB_OK ; UINT32 msgSize = 0 ; CHAR *pBuff = NULL ; INT32 buffSize = 0 ; pmdEDUMgr *pmdEDUMgr = NULL ; if ( !_pEDUCB ) { rc = SDB_SYS ; goto error ; } pmdEDUMgr = _pEDUCB->getEDUMgr() ; while ( !_pEDUCB->isDisconnected() && !_socket.isClosed() ) { _pEDUCB->resetInterrupt() ; _pEDUCB->resetInfo( EDU_INFO_ERROR ) ; _pEDUCB->resetLsn() ; rc = recvData( (CHAR*)&msgSize, sizeof(UINT32) ) ; // 收取数据包的前四个字节,代表该数据包有多大 if ( rc ) { if ( SDB_APP_FORCED != rc ) { PD_LOG( PDERROR, "Session[%s] failed to recv msg size, " "rc: %d", sessionName(), rc ) ; } break ; } if ( msgSize == (UINT32)MSG_SYSTEM_INFO_LEN ) // 如果包长度是 MSG_SYSTEM_INFO_LEN (-1),则这是一个请求系统信息包,coord会返回本机的字节序列给client { // 每个连接的第一个包,一定是长度标记为 MSG_SYSTEM_INFO_LEN 的包,否则字节序不正确,所有的数据都不能保证能正确解析(万一数据库运行在大端机上呢) rc = _recvSysInfoMsg( msgSize, &pBuff, buffSize ) ; if ( rc ) { break ; } rc = _processSysInfoRequest( pBuff ) ; if ( rc ) { break ; } _setHandshakeReceived() ; } else if ( msgSize < sizeof(MsgHeader) || msgSize > SDB_MAX_MSG_LENGTH ) // 对包的大小做出了限制,包长超过某值或者小于某值的包,都会导致连接中断 { PD_LOG( PDERROR, "Session[%s] recv msg size[%d] is less than " "MsgHeader size[%d] or more than max msg size[%d]", sessionName(), msgSize, sizeof(MsgHeader), SDB_MAX_MSG_LENGTH ) ; rc = SDB_INVALIDARG ; break ; } else { pBuff = getBuff( msgSize + 1 ) ; if ( !pBuff ) { rc = SDB_OOM ; break ; } buffSize = getBuffLen() ; *(UINT32*)pBuff = msgSize ; rc = recvData( pBuff + sizeof(UINT32), msgSize - sizeof(UINT32), PMD_RECV_DATA_AFTER_LENGTH_TIMEOUT ) ; // 到此处,说明程序可以愉快的接受client的发送的数据包了 if ( rc ) { if ( SDB_APP_FORCED != rc ) { PD_LOG( PDERROR, "Session[%s] failed to recv msg[len: %u], " "rc: %d", sessionName(), msgSize - sizeof(UINT32), rc ) ; } break ; } _pEDUCB->incEventCount() ; pBuff[ msgSize ] = 0 ; if ( SDB_OK != ( rc = pmdEDUMgr->activateEDU( _pEDUCB ) ) ) { PD_LOG( PDERROR, "Session[%s] activate edu failed, rc: %d", sessionName(), rc ) ; break ; } rc = _processMsg( (MsgHeader*)pBuff ) ; // 收到数据包,开始处理,该函数在结合代码讲解 if ( rc ) { break ; } if ( SDB_OK != ( rc = pmdEDUMgr->waitEDU( _pEDUCB ) ) ) { PD_LOG( PDERROR, "Session[%s] wait edu failed, rc: %d", sessionName(), rc ) ; break ; } } } // end while done: disconnect() ; return rc ; error: goto done ; }
_processMsg方法:
INT32 _pmdLocalSession::_processMsg( MsgHeader * msg ) { INT32 rc = SDB_OK ; const CHAR *pBody = NULL ; INT32 bodyLen = 0 ; rtnContextBuf contextBuff ; INT32 opCode = msg->opCode ; rc = _onMsgBegin( msg ) ; // 对数据包做前期处理,例如改数据包是不是需要返回,(若出错)需不需要回滚,并初始化好回复的数据包头部 if ( SDB_OK == rc ) { rc = _processor->processMsg( msg, contextBuff, // 我是项目经理,这个包就交给processor处理去吧,我要的是结果。 _replyHeader.contextID, // processor在不同的节点中,指向不同的对象(咦,这不是多态么?),因此也有不同的处理方式 _needReply ) ; pBody = contextBuff.data() ; // pBody指向要返回的数据,避免拷贝(提高执行效率) bodyLen = contextBuff.size() ; // 数据长度,不表 _replyHeader.numReturned = contextBuff.recordNum() ; // 返回的数据共有多少条记录 _replyHeader.startFrom = (INT32)contextBuff.getStartFrom() ; // 应该从哪一条开始读 if ( SDB_OK != rc ) { if ( _needRollback ) // 当执行过程中例如(insert, delete等),出错了,需要把数据复原 { INT32 rcTmp = rtnTransRollback( eduCB(), getDPSCB() ) ; if ( rcTmp ) { PD_LOG( PDERROR, "Session[%s] failed to rollback trans " "info, rc: %d", sessionName(), rcTmp ) ; } _needRollback = FALSE ; } } } if ( _needReply ) // 需要回复,那就再处理一下把 { if ( rc && bodyLen == 0 ) // 执行过程出错,那就返回出错信息 { _errorInfo = utilGetErrorBson( rc, _pEDUCB->getInfo( EDU_INFO_ERROR ) ) ; pBody = _errorInfo.objdata() ; bodyLen = (INT32)_errorInfo.objsize() ; _replyHeader.numReturned = 1 ; } _replyHeader.header.opCode = MAKE_REPLY_TYPE(opCode) ; // 填充回复数据包中的字段 _replyHeader.flags = rc ; _replyHeader.header.messageLength = sizeof( _replyHeader ) + bodyLen ; INT32 rcTmp = _reply( &_replyHeader, pBody, bodyLen ) ; // 把包发送给client if ( rcTmp ) { PD_LOG( PDERROR, "Session[%s] failed to send response, rc: %d", sessionName(), rcTmp ) ; disconnect() ; } } _onMsgEnd( rc, msg ) ; rc = SDB_OK ; return rc ; }
coord节点上的processor,是pmdCoordProcessor的一个实例,是用来做数据转发的,不同于真正做数据处理的pmdDataProcessor。
INT32 _pmdCoordProcessor::processMsg( MsgHeader *msg, rtnContextBuf &contextBuff, INT64 &contextID, BOOLEAN &needReply ) { ... rc = _processCoordMsg( msg, _replyHeader, contextBuff ) ; // 转给另一个函数(_processCoordMsg)处理,下面讲述 if ( SDB_COORD_UNKNOWN_OP_REQ == rc ) { contextBuff.release() ; rc = _pmdDataProcessor::processMsg( msg, contextBuff, // 如果上一个函数处理后,返回的错误是一个 SDB_COORD_UNKNOWN_OP_REQ类型,则交给pmdDataProcessor处理 contextID, needReply ) ; } ... }
pmdCoordProcessor的处理过程
INT32 _pmdCoordProcessor::_processCoordMsg( MsgHeader *msg, MsgOpReply &replyHeader, rtnContextBuf &contextBuff ) { INT32 rc = SDB_OK ; if ( NULL != _pErrorObj ) { SDB_OSS_DEL _pErrorObj ; _pErrorObj = NULL ; } if ( NULL != _pResultBuff ) { _pResultBuff = NULL ; } CoordCB *pCoordcb = _pKrcb->getCoordCB(); rtnCoordProcesserFactory *pProcesserFactory = pCoordcb->getProcesserFactory(); if ( MSG_AUTH_VERIFY_REQ == msg->opCode ) { rc = SDB_COORD_UNKNOWN_OP_REQ ; goto done ; } else if ( MSG_BS_INTERRUPTE == msg->opCode || MSG_BS_INTERRUPTE_SELF == msg->opCode || MSG_BS_DISCONNECT == msg->opCode ) { } else if ( !getClient()->isAuthed() ) // 没有用用户和密码登录,就收到了数据包的,就先尝试用默认的用户名和密码,先取得数据库的授权,否则无法做操作 { rc = getClient()->authenticate( "", "" ) ; if ( rc ) { goto done ; } } switch ( msg->opCode ) // 开始检查client要做什么样的操作了 { case MSG_BS_GETMORE_REQ : // get more操作,coord不做处理,先标记成 SDB_COORD_UNKNOWN_OP_REQ,交给其它地方处理 rc = SDB_COORD_UNKNOWN_OP_REQ ; break ; case MSG_BS_QUERY_REQ: // 查询操作,这个是重点。所有的command { MsgOpQuery *pQueryMsg = ( MsgOpQuery * )msg ; CHAR *pQueryName = pQueryMsg->name ; SINT32 queryNameLen = pQueryMsg->nameLength ; if ( queryNameLen > 0 && '$' == pQueryName[0] ) // 如果查询的name字段,是用$开头的字符串,则认为这个是command,要走command处理 { rtnCoordCommand *pCmdProcesser = pProcesserFactory->getCommandProcesser( pQueryMsg ) ; // 找到command的对象,上文中有描述所有的command都在初始化的时候,存入_cmdMap中 if ( NULL != pCmdProcesser ) { rc = pCmdProcesser->execute( ( CHAR *)msg, // 找到了,就开始command处理了 msg->messageLength, eduCB(), replyHeader, &contextBuff ) ; break ; } } // 如果没有找到,则走入 default代码块 } default: { rtnContextBase *pContext = NULL ; rtnCoordOperator *pOperator = pProcesserFactory->getOperator( msg->opCode ) ; // 交给operator处理,operator是类似于command的几个特殊的处理对象,数量比较少,此处不表 rc = pOperator->execute( ( CHAR* )msg, // 转发给对应的operator类实例 msg->messageLength, eduCB(), replyHeader, &contextBuff ) ; ... } }
以创建集合空间的command为例,看看 rtnCoordCMDListCollectionSpace 的 execute做了什么:
INT32 rtnCoordCMDCreateCollectionSpace::execute( CHAR *pReceiveBuffer, SINT32 packSize, pmdEDUCB *cb, MsgOpReply &replyHeader, rtnContextBuf *buf ) { ... MsgOpQuery *pCreateReq = (MsgOpQuery *)pReceiveBuffer; // 构造一个 MSG_CAT_CREATE_COLLECTION_SPACE_REQ 的数据包 pCreateReq->header.routeID.value = 0; pCreateReq->header.TID = cb->getTID(); pCreateReq->header.opCode = MSG_CAT_CREATE_COLLECTION_SPACE_REQ; // 数据包的类型 rc = executeOnCataGroup ( (CHAR*)pCreateReq, pRouteAgent, cb, NULL, NULL ) ; if ( rc ) { PD_LOG ( PDERROR, "create collectionspace failed, rc = %d", rc ) ; goto error ; } done : replyHeader.flags = rc ; PD_TRACE_EXITRC ( SDB_RTNCOCMDCRCS_EXE, rc ) ; return rc; error : goto done ; }
该函数的主体,构造了另外一个数据包,然后执行 executeOnCataGroup ( (CHAR*)pCreateReq, pRouteAgent, cb, NULL, NULL ) ;这一句上。跟进这一函数:
INT32 rtnCoordCommand::executeOnCataGroup ( CHAR *pBuffer, netMultiRouteAgent *pRouteAgent, pmdEDUCB *cb, rtnContextCoord *pContext, CoordGroupList *pGroupList, std::vector<BSONObj> *pReplyObjs ) { INT32 rc = SDB_OK; ... retry : rc = rtnCoordGetCatGroupInfo( cb, isNeedRefresh, catGroupInfo ); // 查询catalog的信息,主要是获取到catalog组的主节点的服务地址 if ( rc ) { probe = 100 ; goto error ; PD_LOG ( PDERROR, "Execute on catalogue node failed, failed to get " "catalogue group info(rc=%d)", rc ); } rc = rtnCoordSendRequestToPrimary( pBuffer, catGroupInfo, sendNodes, // 跟了这么久,做了那么多的准备,这一句才是真开始了,有兴趣可以自己看一下 :) pRouteAgent, MSG_ROUTE_CAT_SERVICE, cb ); if ( rc ) { probe = 200 ; goto error ; } rc = rtnCoordGetReply( cb, sendNodes, replyQue, // 等待并收取远程节点处理的返回信息 MAKE_REPLY_TYPE(((MsgHeader*)pBuffer)->opCode) ) ; ... }
rtnCoordSendRequestToPrimary就不再详细跟进描述了,根据函数名,大致就可以了解一个大概,是把数据发送到指定组(此处是catalog组)的主节点。
coord上的其它command或者operator也是采用类似的方法来转发消息给其它节点,就不再一一赘述了。
综合全文的讲述,coord处理client请求的流程
发送请求给coord节点
coord先揪出这个请求是做什么
交给对应的command处理
查询(本地缓存或者远程获取的)catalog信息
把消息转成节点间的内部消息
转发给目标节点
然后等待返回数据
再把返回数据交给处理线程
线程把返回结果发送给client
=====>THE END<=====
相关文章推荐
- Could not parse mapping document from input stream
- visual studio各个版本的差异
- Oracle数据库10个小问题
- IOS atomic与nonatomic,assign,copy与retain的定义和区别
- HDU 5573 Binary Tree 2015上海现场赛B题
- gulpfile.js 中的坑--注入篇(gulp-inject,gulp-wiredep)
- js取json数据笔记
- Linux学习笔记(二) - sort,uniq,cut,wc 命令详解
- leetcode:226 Invert Binary Tree-每日编程第七题
- Swift使用Playground编程
- MyEclipse SVN的使用
- CSS文件动态加载(续)
- Mac系统的JDK8默认安装路径
- Windowsコマンドの使い方(基本操作)
- LaunchMode加载模式
- leetcode:100 Same Tree-每日编程第六题
- (BUG) SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder"问题解决 .
- React-Native入门
- [原创][Verilog]个人.v文件书写规范
- java异常处理【转载】