您的位置:首页 > 其它

Flume-ng MemoryChannel 源码分析

2015-02-02 11:10 471 查看


 首先,MemoryChannel继承了BasicChannelSemantics这个类,所以在分析Memory之前,要分析一下它继承的类。              

观察了BasicChannelSemantics这个类的结构,又继承了一个叫AbstarctChannel的抽象类,而AbstarctChannel实现了Channel、Configurable和LifecycleAware这三个接口,而Channel又继承了NameCampoent、LifecycleAware这两个接口。(非常绕的感觉。。。)。绕回来,总结下,就是AbstarctChannel这个抽象类含了Configurable、LifecycleAware、NameCampoent三个接口,并且涵盖了Channel接口的三个方法:put(),take(),getTransaction(),最终要实现这些接口。如下图:

现在,BasicChannelSemantics继承了AbstractChannel,可以继承和改写AbstractChannel这个抽象类的方法。而事实上,BasicChannelSemantics只是覆盖了put(),take(),getTransaction().这些函数。继承了BasicChannelSemantics的MemoryChannel,也同样改写了这些函数,并且还有些其他的函数。具体可以自己去看。然后其中声明了一个抽象函数,如下图:

言归正传,分析下MemoryChannel的组成结构。

如上述所说,Memory授信实现了从BasicChannelSemantics那里继承下来的方法,实现createTransaction()方法,覆盖了start(),stop(),configure()的方法。

1.createTransaction(),用于创建transaction的事务。这个函数又是在哪里被调用的呢,可以看以下BasicChannelSemantics的getTransaction()函数。如下。在getTransaction()中,如果transaction没有,就新建一个transaction,调用了createTransaction(),而所有的Transaction都是放在currentTransaction这个对象里面。

@Override
protected BasicTransactionSemantics createTransaction() {
return new MemoryTransaction(transCapacity, channelCounter);
}


@Override
public Transaction getTransaction() {

if (!initialized) {
synchronized (this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}

BasicTransactionSemantics transaction = currentTransaction.get();
if (transaction == null || transaction.getState().equals(
BasicTransactionSemantics.State.CLOSED)) {
transaction = createTransaction();
currentTransaction.set(transaction);
}
return transaction;
}

2.estimateEventSize(Event)很简单,就是返回event的字节长度。

3.自定义了一个继承BasicTransactionSemantics的类,MemoryTransaction。该类的就是声明了MemoryChannel所对应的事务的类。对event的所有操作都是在这个类里面进行的。在这个类里面有3个非常至关重要的函数:doput(),dotake(),doCommit().

在MemoryChannel中,所有Transaction中的event都是放在全局变量:中,每个Transaction中的event都是放在,而dotake()和doput()则是对takeList和putList和queue进行放入拿出的操作。

3.1dotake();做了这么几件事:1、从queue中拿出event。2、把event放入takeList()。当然这些操作要保证资源的安全,线程同步。

@Override
protected Event doTake() throws InterruptedException {
channelCounter.incrementEventTakeAttemptCount();
if(takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for MemoryTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count");
}
if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
return null;
}
Event event;
synchronized(queueLock) {
event = queue.poll();
}
Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
"signalling existence of entry");
takeList.put(event);

int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
takeByteCounter += eventByteSize;

return event;
}


3.2doput().把event放入putList().

@Override
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);

if (!putList.offer(event)) {
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
putByteCounter += eventByteSize;
}

3.3doCommit().putList里面的events放入queue中,清空putlist和takeList的资源。

@Override
protected void doCommit() throws InterruptedException {
int remainingChange = takeList.size() - putList.size();
if(remainingChange < 0) {
if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
TimeUnit.SECONDS)) {
throw new ChannelException("Cannot commit transaction. Heap space " +
"limit of " + byteCapacity + "reached. Please increase heap space" +
" allocated to the channel as the sinks may not be keeping up " +
"with the sources");
}
if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized(queueLock) {
if(puts > 0 ) {
while(!putList.isEmpty()) {
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
bytesRemaining.release(takeByteCounter);
takeByteCounter = 0;
putByteCounter = 0;

queueStored.release(puts);
if(remainingChange > 0) {
queueRemaining.release(remainingChange);
}
if (puts > 0) {
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) {
channelCounter.addToEventTakeSuccessCount(takes);
}

channelCounter.setChannelSize(queue.size());
}




内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  flume-ng