Flume-ng MemoryChannel 源码分析
2015-02-02 11:10
471 查看
首先,MemoryChannel继承了BasicChannelSemantics这个类,所以在分析Memory之前,要分析一下它继承的类。
观察了BasicChannelSemantics这个类的结构,又继承了一个叫AbstarctChannel的抽象类,而AbstarctChannel实现了Channel、Configurable和LifecycleAware这三个接口,而Channel又继承了NameCampoent、LifecycleAware这两个接口。(非常绕的感觉。。。)。绕回来,总结下,就是AbstarctChannel这个抽象类含了Configurable、LifecycleAware、NameCampoent三个接口,并且涵盖了Channel接口的三个方法:put(),take(),getTransaction(),最终要实现这些接口。如下图:
现在,BasicChannelSemantics继承了AbstractChannel,可以继承和改写AbstractChannel这个抽象类的方法。而事实上,BasicChannelSemantics只是覆盖了put(),take(),getTransaction().这些函数。继承了BasicChannelSemantics的MemoryChannel,也同样改写了这些函数,并且还有些其他的函数。具体可以自己去看。然后其中声明了一个抽象函数,如下图:
言归正传,分析下MemoryChannel的组成结构。
如上述所说,Memory授信实现了从BasicChannelSemantics那里继承下来的方法,实现createTransaction()方法,覆盖了start(),stop(),configure()的方法。
1.createTransaction(),用于创建transaction的事务。这个函数又是在哪里被调用的呢,可以看以下BasicChannelSemantics的getTransaction()函数。如下。在getTransaction()中,如果transaction没有,就新建一个transaction,调用了createTransaction(),而所有的Transaction都是放在currentTransaction这个对象里面。
@Override
public Transaction getTransaction() {
if (!initialized) {
synchronized (this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}
BasicTransactionSemantics transaction = currentTransaction.get();
if (transaction == null || transaction.getState().equals(
BasicTransactionSemantics.State.CLOSED)) {
transaction = createTransaction();
currentTransaction.set(transaction);
}
return transaction;
}
2.estimateEventSize(Event)很简单,就是返回event的字节长度。
3.自定义了一个继承BasicTransactionSemantics的类,MemoryTransaction。该类的就是声明了MemoryChannel所对应的事务的类。对event的所有操作都是在这个类里面进行的。在这个类里面有3个非常至关重要的函数:doput(),dotake(),doCommit().
在MemoryChannel中,所有Transaction中的event都是放在全局变量:中,每个Transaction中的event都是放在,而dotake()和doput()则是对takeList和putList和queue进行放入拿出的操作。
3.1dotake();做了这么几件事:1、从queue中拿出event。2、把event放入takeList()。当然这些操作要保证资源的安全,线程同步。
3.2doput().把event放入putList().
@Override
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
if (!putList.offer(event)) {
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
putByteCounter += eventByteSize;
}
3.3doCommit().putList里面的events放入queue中,清空putlist和takeList的资源。
@Override
protected void doCommit() throws InterruptedException {
int remainingChange = takeList.size() - putList.size();
if(remainingChange < 0) {
if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
TimeUnit.SECONDS)) {
throw new ChannelException("Cannot commit transaction. Heap space " +
"limit of " + byteCapacity + "reached. Please increase heap space" +
" allocated to the channel as the sinks may not be keeping up " +
"with the sources");
}
if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized(queueLock) {
if(puts > 0 ) {
while(!putList.isEmpty()) {
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
bytesRemaining.release(takeByteCounter);
takeByteCounter = 0;
putByteCounter = 0;
queueStored.release(puts);
if(remainingChange > 0) {
queueRemaining.release(remainingChange);
}
if (puts > 0) {
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) {
channelCounter.addToEventTakeSuccessCount(takes);
}
channelCounter.setChannelSize(queue.size());
}
首先,MemoryChannel继承了BasicChannelSemantics这个类,所以在分析Memory之前,要分析一下它继承的类。
观察了BasicChannelSemantics这个类的结构,又继承了一个叫AbstarctChannel的抽象类,而AbstarctChannel实现了Channel、Configurable和LifecycleAware这三个接口,而Channel又继承了NameCampoent、LifecycleAware这两个接口。(非常绕的感觉。。。)。绕回来,总结下,就是AbstarctChannel这个抽象类含了Configurable、LifecycleAware、NameCampoent三个接口,并且涵盖了Channel接口的三个方法:put(),take(),getTransaction(),最终要实现这些接口。如下图:
现在,BasicChannelSemantics继承了AbstractChannel,可以继承和改写AbstractChannel这个抽象类的方法。而事实上,BasicChannelSemantics只是覆盖了put(),take(),getTransaction().这些函数。继承了BasicChannelSemantics的MemoryChannel,也同样改写了这些函数,并且还有些其他的函数。具体可以自己去看。然后其中声明了一个抽象函数,如下图:
言归正传,分析下MemoryChannel的组成结构。
如上述所说,Memory授信实现了从BasicChannelSemantics那里继承下来的方法,实现createTransaction()方法,覆盖了start(),stop(),configure()的方法。
1.createTransaction(),用于创建transaction的事务。这个函数又是在哪里被调用的呢,可以看以下BasicChannelSemantics的getTransaction()函数。如下。在getTransaction()中,如果transaction没有,就新建一个transaction,调用了createTransaction(),而所有的Transaction都是放在currentTransaction这个对象里面。
@Override protected BasicTransactionSemantics createTransaction() { return new MemoryTransaction(transCapacity, channelCounter); }
@Override
public Transaction getTransaction() {
if (!initialized) {
synchronized (this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}
BasicTransactionSemantics transaction = currentTransaction.get();
if (transaction == null || transaction.getState().equals(
BasicTransactionSemantics.State.CLOSED)) {
transaction = createTransaction();
currentTransaction.set(transaction);
}
return transaction;
}
2.estimateEventSize(Event)很简单,就是返回event的字节长度。
3.自定义了一个继承BasicTransactionSemantics的类,MemoryTransaction。该类的就是声明了MemoryChannel所对应的事务的类。对event的所有操作都是在这个类里面进行的。在这个类里面有3个非常至关重要的函数:doput(),dotake(),doCommit().
在MemoryChannel中,所有Transaction中的event都是放在全局变量:中,每个Transaction中的event都是放在,而dotake()和doput()则是对takeList和putList和queue进行放入拿出的操作。
3.1dotake();做了这么几件事:1、从queue中拿出event。2、把event放入takeList()。当然这些操作要保证资源的安全,线程同步。
@Override protected Event doTake() throws InterruptedException { channelCounter.incrementEventTakeAttemptCount(); if(takeList.remainingCapacity() == 0) { throw new ChannelException("Take list for MemoryTransaction, capacity " + takeList.size() + " full, consider committing more frequently, " + "increasing capacity, or increasing thread count"); } if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) { return null; } Event event; synchronized(queueLock) { event = queue.poll(); } Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " + "signalling existence of entry"); takeList.put(event); int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); takeByteCounter += eventByteSize; return event; }
3.2doput().把event放入putList().
@Override
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
if (!putList.offer(event)) {
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
putByteCounter += eventByteSize;
}
3.3doCommit().putList里面的events放入queue中,清空putlist和takeList的资源。
@Override
protected void doCommit() throws InterruptedException {
int remainingChange = takeList.size() - putList.size();
if(remainingChange < 0) {
if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
TimeUnit.SECONDS)) {
throw new ChannelException("Cannot commit transaction. Heap space " +
"limit of " + byteCapacity + "reached. Please increase heap space" +
" allocated to the channel as the sinks may not be keeping up " +
"with the sources");
}
if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized(queueLock) {
if(puts > 0 ) {
while(!putList.isEmpty()) {
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
bytesRemaining.release(takeByteCounter);
takeByteCounter = 0;
putByteCounter = 0;
queueStored.release(puts);
if(remainingChange > 0) {
queueRemaining.release(remainingChange);
}
if (puts > 0) {
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) {
channelCounter.addToEventTakeSuccessCount(takes);
}
channelCounter.setChannelSize(queue.size());
}
相关文章推荐
- 【Flume】【源码分析】深入flume-ng的三大组件——source,channel,sink
- flume-ng源码阅读memory-channel(原创)
- Flume NG源码分析(七)ChannelSelector
- 【Flume】【源码分析】深入flume-ng的三大组件——source,channel,sink
- 【Flume】【源码分析】深入flume-ng的三大组件——source,channel,sink
- flume channel monitor实现源码分析
- 【Java】【Flume】Flume-NG启动过程源码分析(三)
- 【Java】【Flume】Flume-NG启动过程源码分析(一)
- 【Java】【Flume】Flume-NG启动过程源码分析(一)
- Flume-NG内置计数器(监控)源码级分析
- Flume-NG启动过程源码分析(二)(原创)
- 【Java】【Flume】Flume-NG启动过程源码分析(一)
- Flume之ChannelSelector源码分析
- 日志收集之flume-ng源码分析
- 【Java】【Flume】Flume-NG启动过程源码分析(三)
- Flume-NG启动过程源码分析(一)(原创)
- Flume-NG源码阅读之FileChannel
- 【Java】【Flume】Flume-NG启动过程源码分析(二)
- Flume 实战(2)--Flume-ng-sdk源码分析
- Flume NG源码分析(二)支持运行时动态修改配置的配置模块