您的位置:首页 > 其它

kafka consumer gc问题探讨

2015-03-14 14:42 239 查看
    kafka是一种高性能的分布式消息系统,通常用于大规模的数据处理, 其吞吐量是非常高的,我们的蛮多业务线都用到了kafka,包括实时模型,用户标签,实时点击率等,在开发的过程中间,踩了一些坑,其中consumer gc问题便碰到过几次,在这里探讨下可能的解决方式。

    kafka consumer目前提供两种API, 分别为high-level api 和 SimpleConsumer api, 通常我们都是使用的high-level 的api,简单的创建consumer,通过可配置参数设置数据拉取的offset, 自动提交以及超时时间等,不用关注broker和partitions的选择等。kafka 的consumer会在后台启动线程去获取topic的消息信息,由于其很高的吞吐量,当拉取的消息来不及消费就会造成消息的堆积,时间久了就会出现内存gc的问题,我觉得可能通过以下的方式进行处理:

1. 增大机器的内存和性能、提高程序的异步处理能力。接入kafka的一般是计算密集型的,把计算任务并行化会提高消息的消费能力,而增大内存和提高机器性能更为直接,之前就碰到过将程序从虚拟机迁到实体机器就抗住的情形。

2. 修改consumer的配置或者消费方式, 有几次测试的时候,发现服务停止一段时间再重启就很容易出现gc问题,因为从老的offset开始消费,kafka已经堆积了很多消息了,拉去的数据很快会将内存占满,这个时候可以将consumer的group信息从zookeeper删除或者设置成从最新的开始消费一般能解决问题,但是这样就不能从历史的offset开始消费,会存在数据丢失的问题。其次就是尝试设置下数据拉取的数据块的大小,即fetch.messages.max.bytes,不过这个没有验证过。

3. 使用SimpleConsumer api并限制数据拉取的速度, 这个可以借鉴spark streaming RateLimiter的方式。这里贴出java的代码,参数desiredRate<=0的时候,不限制速度,当>0的时候随着数值的增加,限制降低。

public class RateLimiter {

private long lastSyncTime = System.nanoTime();
private long messagesWrittenSinceSync = 0L;
private long desiredRate = 0;
private long SYNC_INTERVAL = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);

public void setDesiredRate(long desiredRate) {
this.desiredRate = desiredRate;
}

public void waitToPull() {
if (desiredRate <= 0) return;
long now = System.nanoTime();
long elapsedNanosecs = Math.max(now - lastSyncTime, 1);
double rate = Double.valueOf(messagesWrittenSinceSync) * 1000000000 / elapsedNanosecs;
if (rate < desiredRate) {
messagesWrittenSinceSync += 1;
if (now > lastSyncTime + SYNC_INTERVAL) {
lastSyncTime = now;
messagesWrittenSinceSync = 1;
}
} else {
long targetTimeInMills = messagesWrittenSinceSync * 1000 / desiredRate;
long elapsedTimeInMills = elapsedNanosecs / 1000000;
long sleepTimeInMills = targetTimeInMills - elapsedTimeInMills;
if (sleepTimeInMills > 0) {
try {
Thread.sleep(sleepTimeInMills);
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println(sleepTimeInMills);
waitToPull();
}
}
}


4.使用redis来存储数据拉取的状态,按照一定的心跳机制向redis同步状态并修改。5. 一般的处理流程是将任务提交给线程池来执行, 线程池可以通过控制任务队列的大小和队列满之后的操作来控制数据拉取和消费, java 里面ThreadPoolExecutor 可以定制队列大小和队列满的时候可以进行的操作,下面通过RejectedExecutionHandler 来实现队列满的时候调用put操作阻塞任务队列!
public static RejectedExecutionHandler getBlockedConsumerHandler() {
return new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
LOGGER.warn("getRejectedExecutionHandler error! e = {}", e);
}
}
}
};
}
/*线程池的初始化*/
ExecutorService executor = new ThreadPoolExecutor(threads, maxThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10240), getBlockedConsumerHandler());
<>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: