Druid Kafka indexing task停不下来,不进行Handoff
2016-12-28 19:16
274 查看
问题描述:Kafka indexing task,过了taskDuration仍在运行、ingest实时数据,不生成segments,导致coordinator console中最近的segments都是0B(即没有生成segments,数据都在middlemanager上)。
原因分析:
1). 默认配置中supervisor的workerThreads(default == min(10, taskCount)),有时会导致lock up。该issue会在以后的版本中改进,目前处理方法为将workerThreads调大一点((taskCount
* replicas + 1))。可以参见:https://groups.google.com/forum/#!topic/druid-user/TnZx0rjl03Q
2). 0.9.1.1源码中调用java定时执行任务函数ScheduleExecutorService. scheduleAtFixedRate() ,未做异常处理。在java doc中,如果定时任务执行过程中遇到发生异常,则后面的任务将不再执行。
KafkaSupervisor类中的定时方法
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);
其中buildRunTask()最终执行的任务是
void runInternal()
{
possiblyRegisterListener();
upda
4000
tePartitionDataFromKafka();
discoverTasks();
updateTaskStatus();
checkTaskDuration();
checkPendingCompletionTasks();
checkCurrentTaskState();
createNewTasks();
}
解决方法:给相应方法加上try and catch。
private class RunNotice implements Notice{
@Override
public void handle() throws ExecutionException, InterruptedException{
long nowTime = System.currentTimeMillis();
if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
return;
}
lastRunTime = nowTime;
try{
runInternal();
}catch(InterruptedException | ExecutionException | TimeoutException e){
log.info("periodic action fails!!!", e);
}
}
}
原因分析:
1). 默认配置中supervisor的workerThreads(default == min(10, taskCount)),有时会导致lock up。该issue会在以后的版本中改进,目前处理方法为将workerThreads调大一点((taskCount
* replicas + 1))。可以参见:https://groups.google.com/forum/#!topic/druid-user/TnZx0rjl03Q
2). 0.9.1.1源码中调用java定时执行任务函数ScheduleExecutorService. scheduleAtFixedRate() ,未做异常处理。在java doc中,如果定时任务执行过程中遇到发生异常,则后面的任务将不再执行。
KafkaSupervisor类中的定时方法
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);
其中buildRunTask()最终执行的任务是
void runInternal()
{
possiblyRegisterListener();
upda
4000
tePartitionDataFromKafka();
discoverTasks();
updateTaskStatus();
checkTaskDuration();
checkPendingCompletionTasks();
checkCurrentTaskState();
createNewTasks();
}
解决方法:给相应方法加上try and catch。
private class RunNotice implements Notice{
@Override
public void handle() throws ExecutionException, InterruptedException{
long nowTime = System.currentTimeMillis();
if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
return;
}
lastRunTime = nowTime;
try{
runInternal();
}catch(InterruptedException | ExecutionException | TimeoutException e){
log.info("periodic action fails!!!", e);
}
}
}
相关文章推荐
- [simple type, class io.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig] value failed:
- DRUID: kafka-indexing-service如何使用
- Druid Kafka indexing service (Druid 导入Kafka 数据)
- Spring Boot使用Druid进行维度的统计和监控
- 通过java 向kafka写入数据,druid从kafka加载数据
- 使用 Kafka 和 MongoDB 进行 Go 异步处理
- Druid通过Kafka加载数据
- 使用Flume+Kafka+SparkStreaming进行实时日志分析
- 在pring-data-jpa中使用druid进行数据库连接的配置
- spark接收kafka的数据运行spark程序节点的task数据倾斜
- Druid Kafka indexing service (上篇 )
- 从Kafka topic中获取数据并在Storm中进行分析
- 使用Apache Flink和Kafka进行大数据流处理
- 即将上线的Kafka 集群(用CM部署的)无法使用“--bootstrap-server”进行消费,怎么破?
- 将ES中的数据推到kafka中,用kylin进行OLAP计算,发现统计数据对不上
- 使用nodejs对kafka、zookeeper数据进行消费consumer
- Kafka使用Java客户端进行访问的示例代码
- 从Kafka topic中获取数据并在Spark中进行分析
- 基于Kafka 0.9版本 使用ACL进行权限控制
- 使用alibaba Druid 对sql 语句进行校验