Flume NG Study Notes (2): Single-Node and Cluster Flume Configuration
2017-03-15 15:14
Most of the content below comes from the official user guide: http://flume.apache.org/FlumeUserGuide.html. This article uses Apache Flume 1.5, the latest release at the time of writing. After installing Flume, verify that it works by running the following from the Flume directory:
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
The result looks like this:
![](http://img.blog.csdn.net/20141022140540140?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
OK, let's walk through some common architectures and configuration examples.
一. The simplest single-agent Flume configuration
Here is the configuration file:
# File name: single_case1.conf
# Configuration:
# single_case1.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
A note first: in all of the examples here, the configuration file is placed in the $FLUME_HOME/conf directory; this will not be repeated below.
# Start the agent:
flume-ng agent -c conf -f conf/single_case1.conf -n a1 -Dflume.root.logger=INFO,console
# Parameters:
# -c conf                           use conf as the configuration directory
# -f conf/single_case1.conf         use conf/single_case1.conf as the configuration file
# -n a1                             use agent name a1, which must match the name in single_case1.conf
# -Dflume.root.logger=INFO,console  log INFO-level messages to the console
# Run flume-ng help for the full list of options.
# Then test from another terminal:
telnet 127.0.0.1 44444
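If telnet is not available, the exchange with the netcat source can be simulated in plain Java. The sketch below is self-contained and does not talk to a real agent: a server thread stands in for the netcat source (which, by default, acknowledges each received line with "OK"), and the client stands in for the telnet session. The class name and port 44445 are arbitrary choices for this demo.

```java
import java.io.*;
import java.net.*;

public class NetcatDemo {
    // Plays the role of the netcat source: read one line, answer "OK".
    static void serveOnce(ServerSocket server) throws IOException {
        try (Socket s = server.accept();
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            in.readLine();      // the event body sent by the "telnet" client
            out.println("OK");  // the netcat source acknowledges each event
        }
    }

    // Plays the role of the telnet session: send one event body, return the ack.
    static String sendEvent(int port, String body) throws Exception {
        try (ServerSocket server = new ServerSocket(port)) {
            Thread t = new Thread(() -> {
                try { serveOnce(server); } catch (IOException ignored) {}
            });
            t.start();
            try (Socket s = new Socket("localhost", port);
                 PrintWriter out = new PrintWriter(s.getOutputStream(), true);
                 BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()))) {
                out.println(body);
                return in.readLine();
            } finally {
                t.join();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendEvent(44445, "hello world!")); // prints "OK"
    }
}
```

Against a real agent you would connect to port 44444 instead; the "OK" reply is what telnet shows after each line you type.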
![](http://img.blog.csdn.net/20141022140643375?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
Back in the terminal where the agent was started, the console output looks like this:
![](http://img.blog.csdn.net/20141022141103885?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
Notice that the message "hello world!" is printed in full, while "hello world! hello world! hello world!" gets cut off. This is because we chose a1.sinks.k1.type = logger as the output, i.e. console output, and flume-ng's logger sink only displays the first 16 bytes of the event body; the rest is truncated by the sink. Here is the source code.
In LoggerSink.java:
if (event != null) {
  if (logger.isInfoEnabled()) {
    logger.info("Event: " + EventHelper.dumpEvent(event));
  }
}
Let's look at the dumpEvent method in EventHelper.java:
private static final int DEFAULT_MAX_BYTES = 16;

public static String dumpEvent(Event event) {
  return dumpEvent(event, DEFAULT_MAX_BYTES);
}

public static String dumpEvent(Event event, int maxBytes) {
  StringBuilder buffer = new StringBuilder();
  if (event == null || event.getBody() == null) {
    buffer.append("null");
  } else if (event.getBody().length == 0) {
    // do nothing... in this case, HexDump.dump() will throw an exception
  } else {
    byte[] body = event.getBody();
    byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      HexDump.dump(data, 0, out, 0);
      String hexDump = new String(out.toByteArray());
      // remove offset since it's not relevant for such a small dataset
      if (hexDump.startsWith(HEXDUMP_OFFSET)) {
        hexDump = hexDump.substring(HEXDUMP_OFFSET.length());
      }
      buffer.append(hexDump);
    } catch (Exception e) {
      if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Exception while dumping event", e);
      }
      buffer.append("...Exception while dumping: ").append(e.getMessage());
    }
    String result = buffer.toString();
    if (result.endsWith(EOL) && buffer.length() > EOL.length()) {
      buffer.delete(buffer.length() - EOL.length(), buffer.length());
    }
  }
  return "{ headers:" + event.getHeaders() + " body:" + buffer + " }";
}
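The key line is the Arrays.copyOf call. A small self-contained sketch (class name is mine, not Flume's) mirrors that call and shows exactly what survives a 16-byte limit:

```java
import java.util.Arrays;

// Mirrors the truncation EventHelper.dumpEvent performs on the event body:
// only the first DEFAULT_MAX_BYTES (16) bytes survive.
public class TruncateDemo {
    static final int DEFAULT_MAX_BYTES = 16;

    static byte[] truncate(byte[] body) {
        return Arrays.copyOf(body, Math.min(body.length, DEFAULT_MAX_BYTES));
    }

    public static void main(String[] args) {
        byte[] body = "hello world! hello world! hello world!".getBytes();
        System.out.println(new String(truncate(body))); // prints "hello world! hel"
    }
}
```

"hello world!" alone is 12 bytes and fits; the repeated message is cut after 16 bytes, which matches the console screenshot above.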
It is not hard to see that the event body gets truncated during processing. OK, on to the next part.
![](http://img.blog.csdn.net/20141022141145881?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
二. "Cluster" Flume agent configuration
"Cluster" here simply means coordinating multiple machines. The simplest case is two machines: one agent host collects data from the source and forwards it to another host, which produces the output. The point of this setup is that when one business has multiple data sources, we can attach an agent to each source and aggregate them on a single agent host for output. Below we build the simplest cluster configuration: two agents, where the agent receiving data from the source pushes it to an aggregating agent, and the aggregating agent outputs the data. Accordingly, the two hosts are named push and pull. As the figure above shows, they communicate over Avro RPC, so the pushing sink type and the pulling source type are both avro. For the pushing agent's data source we use the Spool Source described earlier, so we create the directory and the file test.log in advance.
![](http://img.blog.csdn.net/20141022140833562?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
Now set up the Flume configuration file on the push host:
# Configuration file for the pushing agent: push.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = spooldir
a2.sources.r1.spoolDir = /tmp/logs
a2.sources.r1.channels = c1
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.keep-alive = 10
a2.channels.c1.capacity = 100000
a2.channels.c1.transactionCapacity = 100000
# Describe the sink (its port must match the avro source on the pull host)
a2.sinks.k1.type = avro
a2.sinks.k1.channel = c1
a2.sinks.k1.hostname = pull
a2.sinks.k1.port = 44444
Now set up the Flume configuration file on the aggregating host:
# Configuration file for the aggregating agent: pull.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = pull
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.keep-alive = 10
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000
Although the Spool Source is not real-time, the data volume here is small and gets processed almost immediately, so we must start the pull agent first (the avro sink needs a listening peer).
# Start the pull agent:
flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
![](http://img.blog.csdn.net/20141022141334426?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
The screenshot above shows it started successfully. Next, start Flume on the push host:
# Start the push agent:
flume-ng agent -n a2 -c conf -f conf/push.conf -Dflume.root.logger=INFO,console
![](http://img.blog.csdn.net/20141022141247064?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
Check the pull host's console: the data has arrived. Then go back and look at the file on the push host:
![](http://img.blog.csdn.net/20141022141420945?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
The file now carries the suffix .COMPLETED, consistent with what was described earlier. From here on, whenever new data files are placed in /tmp/logs, the push host sends their contents to the pull host for output and renames each processed file.
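The rename step can be illustrated with a small self-contained sketch (the class name and temporary directory are mine; the real source operates on /tmp/logs): after a file is fully ingested, the spooling-directory source marks it done by appending ".COMPLETED" to its name.

```java
import java.io.IOException;
import java.nio.file.*;

// Sketch of the spooldir source's completion behavior: rename an
// ingested file by appending the ".COMPLETED" suffix.
public class SpoolRenameDemo {
    static Path markCompleted(Path file) throws IOException {
        Path done = file.resolveSibling(file.getFileName() + ".COMPLETED");
        return Files.move(file, done);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("spool");          // stands in for /tmp/logs
        Path log = Files.createFile(dir.resolve("test.log"));
        Path done = markCompleted(log);
        System.out.println(done.getFileName()); // prints "test.log.COMPLETED"
    }
}
```

This is also why a file must not be written to after it is dropped into the spool directory: Flume treats it as immutable once ingestion starts.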