您的位置:首页 > 产品设计 > UI/UE

Druid Kafka indexing task停不下来,不进行Handoff

2016-12-28 19:16 274 查看
问题描述:Kafka indexing task,过了taskDuration仍在运行、ingest实时数据,不生成segments,导致coordinator console中最近的segments都是0B(即没有生成segments,数据都在middlemanager上)。

原因分析:

1).  默认配置中supervisor的workerThreads(default == min(10, taskCount)),有时会导致lock up。该issue会在以后的版本中改进,目前处理方法为将workerThreads调大一点((taskCount
* replicas + 1))。可以参见:https://groups.google.com/forum/#!topic/druid-user/TnZx0rjl03Q

2).  0.9.1.1源码中调用java定时执行任务函数ScheduleExecutorService. scheduleAtFixedRate() ,未做异常处理。在java doc中,如果定时任务执行过程中遇到发生异常,则后面的任务将不再执行。 

 KafkaSupervisor类中的定时方法

scheduledExec.scheduleAtFixedRate(

buildRunTask(),
 ioConfig.getStartDelay().getMillis(),
 Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
 TimeUnit.MILLISECONDS

);

其中buildRunTask()最终执行的任务是

void runInternal()

{

possiblyRegisterListener();
 upda
4000
tePartitionDataFromKafka();
 discoverTasks();
 updateTaskStatus();
 checkTaskDuration();
 checkPendingCompletionTasks();
 checkCurrentTaskState();
 createNewTasks();

}

解决方法:给相应方法加上try and catch。

private class RunNotice implements Notice{

    @Override
    public void handle() throws ExecutionException, InterruptedException{
    long nowTime = System.currentTimeMillis();
    if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
        return;
    }

    lastRunTime = nowTime;
    try{

        runInternal();
     }catch(InterruptedException | ExecutionException | TimeoutException e){

        log.info("periodic action fails!!!", e);
     }

   }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: