
Flume Monitoring

2016-07-07 11:11
Flume provides a metrics framework that can expose its metrics over HTTP. Enable it by passing -Dflume.monitoring.type=http to the agent at startup:


$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 \
  -Dflume.monitoring.type=http \
  -Dflume.monitoring.port=5653 \
  -Dflume.root.logger=INFO,console

Flume then starts an HTTP server on port 5653; requesting the following URL returns the agent's metrics as JSON:


Example:

Request http://flume-agent-host:5653/metrics. The result is shown below; src-1 is the name of the custom source:
{
  "SOURCE.src-1": {
    "OpenConnectionCount": "0",          // number of connections currently open with clients or sinks (only the Avro source reports this)
    "Type": "SOURCE",
    "AppendBatchAcceptedCount": "1355",  // total number of event batches successfully committed to the channel
    "AppendBatchReceivedCount": "1355",  // total number of event batches received
    "EventAcceptedCount": "28286",       // total number of events successfully written to the channel, for which the source returned success to the sink or RPC client that created them
    "AppendReceivedCount": "0",          // total number of events that arrived one per batch (equivalent to a single append call over RPC)
    "StopTime": "0",                     // time the source was stopped, in milliseconds since the epoch
    "StartTime": "1442566410435",        // time the source was started, in milliseconds since the epoch
    "EventReceivedCount": "28286",       // total number of events the source has received so far
    "AppendAcceptedCount": "0"           // total number of individually appended events written to the channel and acknowledged
  },
  "CHANNEL.ch-1": {
    "EventPutSuccessCount": "28286",     // total number of events successfully written to the channel and committed
    "ChannelFillPercentage": "0.0",      // how full the channel currently is, as a percentage of its capacity
    "Type": "CHANNEL",
    "StopTime": "0",                     // time the channel was stopped, in milliseconds since the epoch
    "EventPutAttemptCount": "28286",     // total number of events the source attempted to write to the channel
    "ChannelSize": "0",                  // number of events currently in the channel
    "StartTime": "1442566410326",        // time the channel was started, in milliseconds since the epoch
    "EventTakeSuccessCount": "28286",    // total number of events successfully taken by the sink
    "ChannelCapacity": "1000000",        // capacity of the channel
    "EventTakeAttemptCount": "313734329512" // total number of times the sink attempted to take events from the channel; events are not necessarily returned each time, since the channel may be empty
  },
  "SINK.sink-1": {
    "Type": "SINK",
    "ConnectionClosedCount": "0",        // number of connections to the next hop or storage system that were closed (e.g. a file closed on HDFS)
    "EventDrainSuccessCount": "28286",   // total number of events the sink successfully wrote out to storage
    "KafkaEventSendTimer": "482493",
    "BatchCompleteCount": "0",           // number of batches equal to the maximum batch size
    "ConnectionFailedCount": "0",        // number of connections to the next hop or storage system closed due to an error (e.g. a newly created HDFS file closed because of a timeout)
    "EventDrainAttemptCount": "0",       // total number of events the sink attempted to write out to storage
    "ConnectionCreatedCount": "0",       // number of connections created to the next hop or storage system (e.g. a new file created on HDFS)
    "BatchEmptyCount": "0",              // number of empty batches; a large value means the source is producing data much more slowly than the sink can drain it
    "StopTime": "0",
    "RollbackCount": "9",
    "StartTime": "1442566411897",
    "BatchUnderflowCount": "0"           // number of batches smaller than the configured maximum batch size; a high value also means the sink is faster than the source
  }
}
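Any HTTP client can poll this endpoint periodically. Below is a minimal sketch using nothing but the JDK; the host flume-agent-host and port 5653 simply mirror the example above, and the JSON is printed rather than parsed:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class FlumeMetricsPoller {
    public static void main(String[] args) throws Exception {
        // flume-agent-host:5653 mirrors the -Dflume.monitoring.port example above
        URL url = new URL("http://flume-agent-host:5653/metrics");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        StringBuilder json = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                json.append(line);
            }
        } finally {
            conn.disconnect();
        }
        // The body is the JSON document shown above; hand it to any JSON parser
        // or monitoring system of your choice.
        System.out.println(json);
    }
}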

Flume can also report its metrics to Ganglia for monitoring. Only one of Ganglia or HTTP monitoring can be enabled at any given time. By default Flume reports metrics to Ganglia once a minute:


Example:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 \
  -Dflume.monitoring.type=ganglia \
  -Dflume.monitoring.pollFrequency=45 \
  -Dflume.monitoring.isGanglia3=true \
  -Dflume.root.logger=INFO,console

# By default Flume reports metrics in Ganglia 3.1 format.
# pollFrequency sets the reporting interval in seconds.
# isGanglia3=true switches reporting to the Ganglia 3 format.

Logging


$ bin/flume-ng agent --conf conf --conf-file example.conf  --name a1 -Dflume.root.logger=INFO,console

# -Dflume.root.logger=INFO,console sends Flume's log output to the console. To write it to a log
# file instead (by default under $FLUME_HOME/logs), change console to LOGFILE; further details can
# be adjusted in $FLUME_HOME/conf/log4j.properties.
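For reference, the LOGFILE-related part of the default log4j.properties looks roughly like the following; treat it as a sketch and check the file shipped with your Flume version for the authoritative values:

flume.root.logger=INFO,LOGFILE
flume.log.dir=./logs
flume.log.file=flume.log

log4j.rootLogger=${flume.root.logger}

log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout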

Custom Flume Components

Flume's pluggable architecture makes it easy to develop custom plugins. Flume itself ships with a rich set of sources, channels, sinks, interceptors and other plugins that cover most production needs; see the Flume user guide for details.

The plugins.d directory

plugins.d is the directory where Flume, by convention, looks for custom components. At startup Flume automatically adds the files under this directory to the classpath; alternatively, you can add your own files to the classpath manually with the --classpath,-C <cp> option when starting flume-ng, as shown below.
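A sketch reusing the example.conf command from the monitoring section; /path/to/my-plugin.jar is a placeholder for your own jar:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 \
  --classpath /path/to/my-plugin.jar \
  -Dflume.root.logger=INFO,console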
The layout under plugins.d:


plugins.d/xxx/lib    - the plugin's jar(s)
plugins.d/xxx/libext - the plugin's dependency jars
plugins.d/xxx/native - native libraries, e.g. .so files

Interceptors

An interceptor (Interceptor) is a simple plugin component that sits between a source and the channels the source writes to. Before the events received by a source are written to the corresponding channels, an interceptor can transform or drop them. Each interceptor instance only processes events received by the same source. An interceptor may drop or transform events based on any criteria, and it may return as many or as few events as it likes, but they must come from the original batch passed in. Because an interceptor must finish its work before events are written to the channel, an RPC source (and any other source that can time out) only responds to the client or sink that sent the events after the interceptor has successfully transformed them. Therefore, avoid heavy, time-consuming processing in an interceptor; if you really must do it, adjust the relevant timeout settings accordingly. Flume ships with several interceptor types, such as the timestamp interceptor, the host interceptor, and the regex filtering interceptor; see Flume Interceptors for more.

Interceptors are typically used to analyze events and to drop them when needed. To write one, you only need a class implementing the Interceptor interface together with a Builder class implementing the Interceptor$Builder interface. Every Builder class must have a public no-argument constructor, which Flume uses to instantiate it. The interceptor is configured through the Context instance passed to the Builder; all required parameters have to be passed through that Context. Below is the implementation of the timestamp interceptor:

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class TimestampInterceptor implements Interceptor {

    private final boolean preserveExisting;

    /**
     * This constructor can only be called by the Builder.
     */
    private TimestampInterceptor(boolean preserveExisting) {
        this.preserveExisting = preserveExisting;
    }

    @Override
    public void initialize() {
        // no-op
    }

    /**
     * Modifies events in-place.
     */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        if (preserveExisting && headers.containsKey(Constants.TIMESTAMP)) {
            // we must preserve the existing timestamp
        } else {
            long now = System.currentTimeMillis();
            headers.put(Constants.TIMESTAMP, Long.toString(now));
        }
        return event;
    }

    /**
     * Delegates to {@link #intercept(Event)} in a loop.
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // no-op
    }

    /**
     * Builder which builds new instances of the TimestampInterceptor.
     */
    public static class Builder implements Interceptor.Builder {

        private boolean preserveExisting = Constants.PRESERVE_DFLT;

        @Override
        public Interceptor build() {
            return new TimestampInterceptor(preserveExisting);
        }

        // configuration parameters arrive through the Context
        @Override
        public void configure(Context context) {
            preserveExisting = context.getBoolean(Constants.PRESERVE, Constants.PRESERVE_DFLT);
        }
    }

    public static class Constants {
        public static String TIMESTAMP = "timestamp";
        public static String PRESERVE = "preserveExisting";
        public static boolean PRESERVE_DFLT = false;
    }
}

Note:

Both intercept() methods must be thread safe, because if the source runs multiple threads, these methods may be called from several threads concurrently.

A custom interceptor is configured by setting the interceptor type to XXXInterceptor$Builder:


# Custom interceptor -- producer is the agent name, src-1 the source name, i1 the interceptor name
producer.sources.src-1.interceptors = i1
producer.sources.src-1.interceptors.i1.type = com.networkbench.browser.flume.interceptor.MyBrowserInterceptor$Builder

Package the custom code and place the jar under the plugins.d/ext-interceptors (any name you like)/lib directory described earlier; Flume will load the jar onto the classpath automatically at startup.

Deserializers

A source reads the files in its monitored directory through an embedded deserializer (taking the Spooling Directory Source as the example here); the default is LineDeserializer. It reads the file line by line and wraps each line into an Event. By default it reads at most 2048 characters per line; that limit can be changed with the following property:


# producer is the agent name, src-1 the source name
producer.sources.src-1.deserializer.maxLineLength = 20480

This means that when using LineDeserializer you need a rough idea of how long the lines in the source files can be; otherwise, when a line exceeds the maximum length, it is cut into two parts and sent to the channel as two events, which in some scenarios effectively turns that line into an invalid message. For example, a file may record the entire content of an HTTP request on each line, and you cannot know the maximum length of a request in advance (in theory you could of course set maxLineLength to a very large value to work around this). The alternative described here is simple: following LineDeserializer, implement a deserializer that does not limit the line length (why Flume was designed with this limit in the first place is an open question). A deserializer is defined in much the same way as the interceptor above:


import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineDeserializer implements EventDeserializer {

    private static final Logger logger = LoggerFactory.getLogger(LineDeserializer.class);

    private final ResettableInputStream in;
    private final Charset outputCharset;
    private final int maxLineLength;
    private volatile boolean isOpen;

    public static final String OUT_CHARSET_KEY = "outputCharset";
    public static final String CHARSET_DFLT = "UTF-8";

    public static final String MAXLINE_KEY = "maxLineLength";
    public static final int MAXLINE_DFLT = 2048;

    LineDeserializer(Context context, ResettableInputStream in) {
        this.in = in;
        this.outputCharset = Charset.forName(context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
        this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT);
        this.isOpen = true;
    }

    @Override
    public Event readEvent() throws IOException {
        ensureOpen();
        String line = readLine();
        if (line == null) {
            return null;
        } else {
            return EventBuilder.withBody(line, outputCharset);
        }
    }

    ...

    // TODO: consider not returning a final character that is a high surrogate
    // when truncating
    private String readLine() throws IOException {
        StringBuilder sb = new StringBuilder();
        int c;
        int readChars = 0;
        while ((c = in.readChar()) != -1) {
            readChars++;

            // FIXME: support \r\n
            if (c == '\n') {
                break;
            }

            sb.append((char) c);

            // enforce the maximum line length
            if (readChars >= maxLineLength) {
                logger.warn("Line length exceeds max ({}), truncating line!", maxLineLength);
                break;
            }
        }

        if (readChars > 0) {
            return sb.toString();
        } else {
            return null;
        }
    }

    // this is very similar to Interceptor$Builder
    public static class Builder implements EventDeserializer.Builder {

        @Override
        public EventDeserializer build(Context context, ResettableInputStream in) {
            return new LineDeserializer(context, in);
        }
    }
}
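To lift the limit, a minimal sketch is to keep everything else (fields, constructor, readEvent(), Builder) identical to LineDeserializer above and only drop the length check in readLine(); the name of the surrounding class (e.g. the MyLineDeserializer referenced in the configuration below) is your own choice:

// Same fields as LineDeserializer above; the only change is that readLine()
// keeps reading until '\n' (or EOF), so an arbitrarily long line becomes one event.
private String readLine() throws IOException {
    StringBuilder sb = new StringBuilder();
    int c;
    int readChars = 0;
    while ((c = in.readChar()) != -1) {
        readChars++;
        if (c == '\n') {
            break;
        }
        sb.append((char) c);
        // no maxLineLength check here
    }
    if (readChars > 0) {
        return sb.toString();
    }
    return null;
}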

The remaining steps are the same as for the interceptor:


# Custom deserializer
producer.sources.src-1.deserializer = com.networkbench.browser.flume.interceptor.MyLineDeserializer$Builder

Sources

Flume provides a rich set of source types, such as Avro Source, Exec Source, and Spooling Directory Source.
Here is a problem we ran into in practice, again in the earlier scenario of recording HTTP request contents. To analyze the HTTP request data promptly, we split the raw file recording the requests every minute and move it into the directory monitored by the Spooling Directory Source (e.g. /tmp-logs). For various reasons, files with duplicate names can end up in the monitored directory:


/tmp-logs/access_2015_10_01_16_30.log.COMPLETED   # a file Flume has finished processing; it is renamed automatically with the .COMPLETED suffix
/tmp-logs/access_2015_10_01_16_30.log             # a newly arrived file

In this situation, after Flume finishes reading the newly arrived access_2015_10_01_16_30.log it tries to rename it, but that name is already taken, so Flume throws the exception below and stops processing the other files in the monitored directory.


25 九月 2015 16:48:59,228 INFO  [pool-22-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:348)  - Preparing to move file /opt/nginx/tmp_logs/access-2015-09-25-13-51.log to /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED
25 九月 2015 16:48:59,229 ERROR [pool-22-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256)  - FATAL: Spool Directory source src-1: { spoolDir: /opt/nginx/tmp_logs }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:378)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:330)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:259)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Tracing the source of the exception: SpoolDirectorySource starts a thread that polls the target files in the monitored directory; once a file has been fully read (readEvents), the thread renames it (rollCurrentFile). If the rename fails, an IllegalStateException is thrown, which SpoolDirectoryRunnable catches and rethrows as a RuntimeException, terminating the current thread. Since the source code shows that SpoolDirectoryRunnable runs on a single-threaded executor, once that thread exits, no other files in the monitored directory are processed:


// SpoolDirectorySource: starting the SpoolDirectoryRunnable

executor = Executors.newSingleThreadScheduledExecutor();

File directory = new File(spoolDirectory);
...

Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
// POLL_DELAY_MS defaults to 500 milliseconds
executor.scheduleWithFixedDelay(runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
...

// SpoolDirectorySource$SpoolDirectoryRunnable.run():

@Override
public void run() {
    int backoffInterval = 250;
    try {
        while (!Thread.interrupted()) {
            // Reads the file content; once the file has nothing left to read, this
            // ends up calling ReliableSpoolingFileEventReader.retireCurrentFile()
            // -> ReliableSpoolingFileEventReader.rollCurrentFile()
            List<Event> events = reader.readEvents(batchSize);
            if (events.isEmpty()) {
                break;
            }
            sourceCounter.addToEventReceivedCount(events.size());
            sourceCounter.incrementAppendBatchReceivedCount();

            try {
                getChannelProcessor().processEventBatch(events);
                reader.commit();
            } catch (ChannelException ex) {
                logger.warn("The channel is full, and cannot write data now. The " +
                    "source will try again after " + String.valueOf(backoffInterval) +
                    " milliseconds");
                hitChannelException = true;
                if (backoff) {
                    TimeUnit.MILLISECONDS.sleep(backoffInterval);
                    backoffInterval = backoffInterval << 1;
                    backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
                        backoffInterval;
                }
                continue;
            }
            backoffInterval = 250;
            sourceCounter.addToEventAcceptedCount(events.size());
            sourceCounter.incrementAppendBatchAcceptedCount();
        }
    } catch (Throwable t) {
        // this is where the exception from the stack trace is logged
        logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
            "Uncaught exception in SpoolDirectorySource thread. " +
            "Restart or reconfigure Flume to continue processing.", t);
        hasFatalError = true;
        // the exception is rewrapped as a RuntimeException here, which makes the
        // current thread exit
        Throwables.propagate(t);
    }
}

// ReliableSpoolingFileEventReader.rollCurrentFile():

private void rollCurrentFile(File fileToRoll) throws IOException {

    File dest = new File(fileToRoll.getPath() + completedSuffix);
    logger.info("Preparing to move file {} to {}", fileToRoll, dest);

    // Before renaming, check whether destination file name exists
    if (dest.exists() && PlatformDetect.isWindows()) {
        /*
         * If we are here, it means the completed file already exists. In almost
         * every case this means the user is violating an assumption of Flume
         * (that log files are placed in the spooling directory with unique
         * names). However, there is a corner case on Windows systems where the
         * file was already rolled but the rename was not atomic. If that seems
         * likely, we let it pass with only a warning.
         */
        if (Files.equal(currentFile.get().getFile(), dest)) {
            logger.warn("Completed file " + dest +
                " already exists, but files match, so continuing.");
            boolean deleted = fileToRoll.delete();
            if (!deleted) {
                logger.error("Unable to delete file " + fileToRoll.getAbsolutePath() +
                    ". It will likely be ingested another time.");
            }
        } else {
            String message = "File name has been re-used with different" +
                " files. Spooling assumptions violated for " + dest;
            throw new IllegalStateException(message);
        }

    // Dest file exists and not on windows
    } else if (dest.exists()) {
        // this is where the "destination file already exists" exception is thrown
        String message = "File name has been re-used with different" +
            " files. Spooling assumptions violated for " + dest;
        throw new IllegalStateException(message);

    // Destination file does not already exist. We are good to go!
    } else {
        boolean renamed = fileToRoll.renameTo(dest);
        if (renamed) {
            logger.debug("Successfully rolled file {} to {}", fileToRoll, dest);

            // now we no longer need the meta file
            deleteMetaFile();
        } else {
            /* If we are here then the file cannot be renamed for a reason other
             * than that the destination file exists (actually, that remains
             * possible w/ small probability due to TOC-TOU conditions). */
            String message = "Unable to move " + fileToRoll + " to " + dest +
                ". This will likely cause duplicate events. Please verify that " +
                "flume has sufficient permissions to perform these operations.";
            throw new FlumeException(message);
        }
    }
}

The call path behind the exception stack is now clear. Just as with the custom deserializer earlier, we can rewrite the relevant parts of ReliableSpoolingFileEventReader and SpoolDirectorySource, that is, build a custom spooling source which takes some remedial action when the rename in rollCurrentFile() fails, for example renaming the file to access_2015_10_01_16_30.log(2).COMPLETED (its content has already been read completely at this point) and carrying on. Note that the name must still end with .COMPLETED, otherwise Flume would read the file again.
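One possible shape for that fallback is sketched below as a standalone helper so it stays self-contained; the helper class, its name, and the "(n)" suffix scheme are illustrative assumptions rather than anything Flume provides. A rewritten rollCurrentFile() would call it from the branch that currently throws IllegalStateException:

import java.io.File;

import org.apache.flume.FlumeException;

public final class RollFallback {

    /**
     * Renames fileToRoll to "<original name>(n)<completedSuffix>", picking the first
     * n >= 2 that is still free. The result keeps the .COMPLETED suffix, so the
     * spooling source will not pick the file up again.
     */
    public static File rollToAlternateName(File fileToRoll, String completedSuffix) {
        int n = 2;
        File alt = new File(fileToRoll.getPath() + "(" + n + ")" + completedSuffix);
        while (alt.exists()) {
            n++;
            alt = new File(fileToRoll.getPath() + "(" + n + ")" + completedSuffix);
        }
        if (!fileToRoll.renameTo(alt)) {
            throw new FlumeException("Unable to move " + fileToRoll + " to " + alt
                + "; the file will likely be ingested again.");
        }
        return alt;
    }
}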

Once the rewrite is done, the remaining steps are the same as for the custom deserializer: package it under plugins.d and configure it:


producer.sources.src-1.type = com.networkbench.flume.source.SpoolDirectoryExtSource

Summary

Essentially every kind of Flume component can be custom-built. I have not been using Flume for very long, and so far I have run into the following problems:

Message redelivery

This one was a hole I dug for myself. I took the configuration parameter producer.sinks.sink-1.requiredAcks (default 1) for granted and set it to 10. We were using the Kafka sink at the time, and because one Kafka node had a problem (I have not verified whether this can also happen while Kafka is healthy), Flume kept resending data from a certain point in time while the newest data stayed blocked (probably buffered in the channel), so the backend kept receiving messages from that same moment. Only later did I remember this change; after setting it back to 1 everything returned to normal. The official documentation describes the parameter as follows:


requiredAcks (default 1): How many replicas must acknowledge a message before it is considered successfully written. Accepted values are 0 (never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure.
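For reference, the corresponding sink configuration, reusing the producer/sink-1 names from the examples above; the type value is the Kafka sink shipped with Flume 1.6, and since requiredAcks already defaults to 1 it normally does not need to be set at all:

producer.sinks.sink-1.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.sink-1.requiredAcks = 1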


Channel overflow

The channel overflow was caused by the message redelivery described above. We were using the File Channel at the time; several of its configuration items are worth noting:

transactionCapacity (default 10000): the maximum number of events that can be written or read in a single transaction.
maxFileSize (default 2146435071): the maximum size of each data file, in bytes; once a file reaches this size (or the next write would exceed it), it is closed and a new data file is created in that directory. Values set higher than the default are still capped at the default.
minimumRequiredSpace (default 524288000): the minimum free space, in bytes, required on each volume for the channel to keep operating; if any volume holding a data directory has only this much space left, the channel stops operating to prevent corruption and avoid writing incomplete data.
capacity (default 1000000): the maximum number of committed events the channel can hold.
keep-alive (default 3): the maximum time, in seconds, each write or read should wait to complete.
The channel overflow described above was most likely caused by hitting the capacity limit.
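A minimal File Channel definition using these properties, as a sketch; the producer/ch-1 names are reused from the examples above and the directories are placeholders:

producer.channels.ch-1.type = file
producer.channels.ch-1.checkpointDir = /data/flume/checkpoint
producer.channels.ch-1.dataDirs = /data/flume/data
producer.channels.ch-1.capacity = 1000000
producer.channels.ch-1.transactionCapacity = 10000
producer.channels.ch-1.keep-alive = 3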