
[Code] How add and delete document requests are handled

2013-07-05 21:04, 363 views





Who receives the update request first?

The answer is SolrDispatchFilter. Let's look at a diagram first:



1. SolrDispatchFilter.doFilter() receives every update request first and chooses the right core for it:

      a. If the request specifies a core (for an index request) or cores (for a search request), the request is given to the specified core(s).

      b. If not, the collection is tried: the first leader core is found and the request is sent to it. Usually the

    Then doFilter finds the right request handler for the request. The request handler would be:

  


The REST API endpoint of the request decides which handler is used.
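As a rough illustration of a request path selecting a handler, the sketch below keeps a map from path to handler name. The class HandlerRegistrySketch, its methods and the registered entries are hypothetical and only stand in for the idea; in Solr the handlers are registered in the core's configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: resolve a request path to a handler name.
// None of these names are Solr classes.
class HandlerRegistrySketch {
    private final Map<String, String> handlersByPath = new HashMap<>();

    // Associates a request path with a handler name.
    void register(String path, String handlerName) {
        handlersByPath.put(path, handlerName);
    }

    // Returns the handler registered for the path, or null if none matches.
    String lookup(String path) {
        return handlersByPath.get(path);
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register("/update", "UpdateRequestHandler");
        registry.register("/select", "SearchHandler");
        System.out.println(registry.lookup("/update"));
    }
}
```

An unknown path returns null here; a real dispatcher would have to turn that into an error response.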

2. Now we have a handler and a core to process the request. The request is passed to core.execute(handler, req, rsp), and the specified core deals with the rest.

The core adds some headers to the response, then super.handleRequestBody(req, rsp) is called.

  


Above is the full code of handleRequestBody(……). It does the following:

1. Create all the processors in the processor chain and return the first processor (LogUpdateProcessor).

 The processor chain:


2. Get a document loader according to the request handler, then call Loader.load(req, rsp, stream).

 .load() calls processUpdate, which processes each kind of command separately. Every update command is passed to the processor created in step 1, LogUpdateProcessor, and each processor in turn calls the next processor in the chain.

 A commit that accompanies an add or delete request is removed during this step; the commit is executed in step 3.

 If we receive an index request in JSON format, the document loader here is JsonLoader. We discuss only the processing of an add-document request, which follows the same path as a delete request.

 Let's look into JsonLoader.processUpdate():

void processUpdate() throws IOException
{
  int ev = parser.nextEvent();
  while( ev != JSONParser.EOF ) {

    switch( ev )
    {
      case JSONParser.ARRAY_START:
        handleAdds();
        break;

      case JSONParser.STRING:
        if( parser.wasKey() ) {
          String v = parser.getString();
          if( v.equals( UpdateRequestHandler.ADD ) ) {
            int ev2 = parser.nextEvent();
            if (ev2 == JSONParser.OBJECT_START) {
              processor.processAdd( parseAdd() );
            } else if (ev2 == JSONParser.ARRAY_START) {
              handleAdds();
            } else {
              assertEvent(ev2, JSONParser.OBJECT_START);
            }
          }
          else if( v.equals( UpdateRequestHandler.COMMIT ) ) {
            CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
            cmd.waitSearcher = true;
            parseCommitOptions( cmd );
            processor.processCommit( cmd );
          }
          else if( v.equals( UpdateRequestHandler.OPTIMIZE ) ) {
            CommitUpdateCommand cmd = new CommitUpdateCommand(req, true);
            cmd.waitSearcher = true;
            parseCommitOptions( cmd );
            processor.processCommit( cmd );
          }
          else if( v.equals( UpdateRequestHandler.DELETE ) ) {
            handleDeleteCommand();
          }
          else if( v.equals( UpdateRequestHandler.ROLLBACK ) ) {
            processor.processRollback( parseRollback() );
          }
          else {
            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
                "Unknown command: "+v+" ["+parser.getPosition()+"]" );
          }
          break;
        }
        // fall through

      case JSONParser.LONG:
      case JSONParser.NUMBER:
      case JSONParser.BIGNUMBER:
      case JSONParser.BOOLEAN:
      case JSONParser.NULL:
        log.info( "can't have a value here! "
            +JSONParser.getEventString(ev)+" "+parser.getPosition() );

      case JSONParser.OBJECT_START:
      case JSONParser.OBJECT_END:
      case JSONParser.ARRAY_END:
        break;

      default:
        log.info("Noggit UNKNOWN_EVENT_ID:"+ev);
        break;
    }
    // read the next event
    ev = parser.nextEvent();
  }
}


So for ADD, processor.processAdd(cmd) is called. Here, processor is LogUpdateProcessor.

LogUpdateProcessor.processAdd(cmd):

public void processAdd(AddUpdateCommand cmd) throws IOException {
  if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }

  // call delegate first so we can log things like the version that get set later
  if (next != null) next.processAdd(cmd);

  // Add a list of added id's to the response
  if (adds == null) {
    adds = new ArrayList<String>();
    toLog.add("add",adds);
  }

  if (adds.size() < maxNumToLog) { // 10
    long version = cmd.getVersion();
    String msg = cmd.getPrintableId();
    if (version != 0) msg = msg + " (" + version + ')';
    adds.add(msg);
  }

  numAdds++;
}


It first calls next.processAdd(cmd), which is DistributedUpdateProcessor.processAdd(cmd). Only then does LogUpdateProcessor.processAdd(cmd) write its log information.
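The delegate-first pattern can be sketched in isolation. In the minimal example below the head of the chain passes the command on before recording anything, and it caps how many ids it records while still counting every add. All class names here are hypothetical and the "work" processor merely stands in for the rest of the chain; this is not Solr's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a processor chain; these are not Solr's classes.
abstract class ProcessorSketch {
    protected final ProcessorSketch next;

    ProcessorSketch(ProcessorSketch next) {
        this.next = next;
    }

    // Default behaviour: pass the command to the next processor, if any.
    void processAdd(String docId, List<String> trace) {
        if (next != null) next.processAdd(docId, trace);
    }
}

// Plays the role of the logging processor at the head of the chain.
class LoggingProcessorSketch extends ProcessorSketch {
    private final int maxNumToLog;
    final List<String> adds = new ArrayList<>();
    int numAdds = 0;

    LoggingProcessorSketch(ProcessorSketch next, int maxNumToLog) {
        super(next);
        this.maxNumToLog = maxNumToLog;
    }

    @Override
    void processAdd(String docId, List<String> trace) {
        // Delegate first, so work done further down the chain happens before logging.
        super.processAdd(docId, trace);
        // Record at most maxNumToLog ids, but count every add.
        if (adds.size() < maxNumToLog) adds.add(docId);
        numAdds++;
        trace.add("log:" + docId);
    }
}

// Stand-in for whatever processor does the real work further down the chain.
class WorkerProcessorSketch extends ProcessorSketch {
    WorkerProcessorSketch() {
        super(null);
    }

    @Override
    void processAdd(String docId, List<String> trace) {
        trace.add("work:" + docId);
        super.processAdd(docId, trace);
    }
}

class ChainSketchDemo {
    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        LoggingProcessorSketch head = new LoggingProcessorSketch(new WorkerProcessorSketch(), 2);
        for (String id : new String[] {"a", "b", "c"}) head.processAdd(id, trace);
        System.out.println(trace);
        System.out.println(head.adds + " of " + head.numAdds);
    }
}
```

The trace shows the worker entry for each document before the matching log entry, which is the ordering the comment "call delegate first" in the original code asks for.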

Now the request is handled by DistributedUpdateProcessor.processAdd(cmd). DistributedUpdateProcessor is the key piece of SolrCloud: it decides which slice the request should be sent to.

The following code hashes the command, then decides which nodes the request should be sent to and whether this node is a leader. The forwardToLeader flag is set to indicate whether the request must be forwarded to the leader.

int hash = 0;
if (zkEnabled) {
  zkCheck();
  hash = hash(cmd);
  nodes = setupRequest(hash);
} else {
  isLeader = getNonZkLeaderAssumption(req);
}
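To make the idea of routing by hash concrete, here is a minimal sketch that maps a document id to a slice index by splitting the 32-bit integer range into equal parts. It rests on several assumptions: String.hashCode() is only a stand-in for the real hash function, the equal-sized ranges are an illustrative choice, and SliceRouterSketch and its methods are hypothetical names. The internals of hash(cmd) and setupRequest(hash) are not shown in this article.

```java
// Hypothetical sketch of hash-based routing; not Solr's implementation.
class SliceRouterSketch {
    private final int numSlices;

    SliceRouterSketch(int numSlices) {
        if (numSlices <= 0) throw new IllegalArgumentException("numSlices must be positive");
        this.numSlices = numSlices;
    }

    // Stand-in hash function (an assumption of this sketch).
    static int hash(String docId) {
        return docId.hashCode();
    }

    // Maps a 32-bit hash to a slice index by splitting the int range into equal parts.
    int sliceFor(int hash) {
        long offset = (long) hash - Integer.MIN_VALUE; // shift into 0 .. 2^32 - 1
        long rangeSize = (1L << 32) / numSlices;       // width of one slice's range
        int index = (int) (offset / rangeSize);
        // The last slice absorbs the remainder left by integer division.
        return Math.min(index, numSlices - 1);
    }

    public static void main(String[] args) {
        SliceRouterSketch router = new SliceRouterSketch(3);
        for (String id : new String[] {"doc-1", "doc-2", "doc-3"}) {
            System.out.println(id + " -> slice " + router.sliceFor(hash(id)));
        }
    }
}
```

Because the slice depends only on the id's hash, the same document id is always routed to the same slice, whichever node first receives the request.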


if (!forwardToLeader) {
  // clone the original doc
  SolrInputDocument clonedDoc = cmd.solrDoc.deepCopy();
  dropCmd = versionAdd(cmd, clonedDoc);
  cmd.solrDoc = clonedDoc;
}


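If the request does not have to be forwarded to the leader (for example, because this node is the leader), versionAdd(cmd, clonedDoc) is called. This function calls the local method that adds the document to the index.

If there are any nodes to distribute to, the following runs: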

if (nodes != null) {
  params = new ModifiableSolrParams(req.getParams());
  params.set(DISTRIB_UPDATE_PARAM,
             (isLeader ?
              DistribPhase.FROMLEADER.toString() :
              DistribPhase.TOLEADER.toString()));
  params.remove("commit"); // this will be distributed from the local commit
  cmdDistrib.distribAdd(cmd, nodes, params);  // key function for distributing the command
}


cmdDistrib.distribAdd(cmd, nodes, params) sends this add request (cmd) to all of the nodes.
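The parameter handling before distribution can be sketched with a plain map: copy the incoming parameters, tag the copy with the phase, and drop the commit flag so that the receiving nodes do not commit on their own. DistribParamsSketch, the Phase enum and the key string "update.distrib" are assumptions made for this sketch; the real code uses ModifiableSolrParams, DistribPhase and the DISTRIB_UPDATE_PARAM constant.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of preparing parameters for a forwarded update.
class DistribParamsSketch {
    enum Phase { TOLEADER, FROMLEADER }

    // Assumed key name, used only for illustration.
    static final String DISTRIB_UPDATE_PARAM = "update.distrib";

    static Map<String, String> forwardedParams(Map<String, String> original, boolean isLeader) {
        // Work on a copy so the original request parameters stay untouched.
        Map<String, String> params = new HashMap<>(original);
        // A leader sends FROMLEADER to its replicas; any other node sends TOLEADER.
        params.put(DISTRIB_UPDATE_PARAM, (isLeader ? Phase.FROMLEADER : Phase.TOLEADER).toString());
        // The commit is handled separately, so it is not forwarded with the add.
        params.remove("commit");
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> request = new HashMap<>();
        request.put("commit", "true");
        request.put("wt", "json");
        System.out.println(forwardedParams(request, true));
    }
}
```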

3. Process commit or rollback, if the request contains those commands.

4. Call processor.finish() to finish processing the request and complete the servlet response.
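The overall ordering of these steps can be summarised in a small sketch: the update commands go through the processor, then the commit if one was requested, then finish(). The interface, the recording implementation and the handle method are hypothetical, and running finish() in a finally block is a choice made for this sketch so that it is called even when a command fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the request-handling order; not Solr's classes.
class UpdateFlowSketch {
    interface Processor {
        void processAdd(String doc);
        void processCommit();
        void finish();
    }

    // Records the order of calls so the flow can be inspected.
    static class RecordingProcessor implements Processor {
        final List<String> events = new ArrayList<>();
        @Override public void processAdd(String doc) { events.add("add:" + doc); }
        @Override public void processCommit() { events.add("commit"); }
        @Override public void finish() { events.add("finish"); }
    }

    static void handle(List<String> docs, boolean commitRequested, Processor processor) {
        try {
            // Load the request: every add goes to the head of the chain.
            for (String doc : docs) processor.processAdd(doc);
            // The commit runs only after all update commands.
            if (commitRequested) processor.processCommit();
        } finally {
            // finish() always runs last.
            processor.finish();
        }
    }

    public static void main(String[] args) {
        RecordingProcessor processor = new RecordingProcessor();
        handle(Arrays.asList("doc1", "doc2"), true, processor);
        System.out.println(processor.events);
    }
}
```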