Flume Primer (Part 2): Configuration

2015-09-15 22:13

3. Configuration Examples

Single-Node Flume Setup

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = ping localhost

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Save the above configuration as example.conf, and then we can start Flume:

flume-ng agent -n a1 -f example.conf -Dflume.root.logger=INFO,console

-Dflume.root.logger=INFO,console is for debugging only; do not carry it over to production as-is, or large volumes of log output will flood the terminal.

* -c/--conf specifies the configuration directory (see the fuller launch example below);

* -f/--conf-file specifies the configuration file;

* -n/--name specifies the agent name;
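Putting the three options together, a fuller launch command might look like the following; the conf directory path here is an assumption, so substitute the directory where your installation keeps flume-env.sh and log4j.properties:

# the conf path below is illustrative
flume-ng agent -c /usr/local/flume/conf -n a1 -f example.conf -Dflume.root.logger=INFO,console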

Since the exec source runs ping localhost itself, events begin flowing as soon as the agent starts; there is nothing more to type.

The Flume terminal window will then print messages like the following, indicating success:

INFO ({SinkRunner-PollingRunner-DefaultSinkProcessor} LoggerSink.java[process]:70) [2015-09-15 09:25:08,457] - Event: { headers:{} body: 36 34 20 62 79 74 65 73 20 66 72 6F 6D 20 6C 6F 64 bytes from lo }

With that, our first Flume agent has been deployed successfully!

Single-Node Flume Writing Directly to HDFS

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30

# Define an exec source that tails a file
agent1.sources.avro-source1.type = exec
agent1.sources.avro-source1.shell = /bin/bash -c
agent1.sources.avro-source1.command = tail -n +0 -F /home/lkm/tmp/id.txt
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.threads = 5

# Define an HDFS sink that writes out all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = hdfs
agent1.sinks.log-sink1.hdfs.path = hdfs://nameservice1/flume
agent1.sinks.log-sink1.hdfs.writeFormat = Text
agent1.sinks.log-sink1.hdfs.fileType = DataStream
agent1.sinks.log-sink1.hdfs.rollInterval = 0
agent1.sinks.log-sink1.hdfs.rollSize = 1000000
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.batchSize = 1000
agent1.sinks.log-sink1.hdfs.txnEventMax = 1000
agent1.sinks.log-sink1.hdfs.callTimeout = 60000
agent1.sinks.log-sink1.hdfs.appendTimeout = 60000

Start the agent with the following command, and you can then see the results on HDFS.

flume-ng agent -n agent1 -f agent1.conf -Dflume.root.logger=INFO,console
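To verify, list the sink's target directory from another terminal (this assumes your HDFS client is already configured to resolve the nameservice1 nameservice):

hdfs dfs -ls /flume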

PS: A common real-world requirement looks like this: multiple agents tail logs and send events to a channel; the channel aggregates the data and forwards it to HDFS for storage, and a new HDFS file is rolled whenever the current file exceeds a size threshold or a configured time interval elapses.

Flume implements two such triggers: a SizeTrigger (while writing to the HDFS output stream it keeps a running total of bytes written; once the total exceeds a threshold it creates a new file and output stream, redirects writes to the new stream, and closes the old one) and a TimeTrigger (a timer fires at the configured interval, at which point a new file and output stream are created, new writes are redirected to them, and the old stream is closed).
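In configuration terms, these two triggers map onto the HDFS sink's rollSize and rollInterval parameters. A minimal sketch with illustrative values, rolling at roughly 128 MB or every 10 minutes, whichever fires first (a value of 0 disables that particular trigger):

# illustrative rolling policy: roll on size OR time, whichever fires first
agent1.sinks.log-sink1.hdfs.rollSize = 134217728
agent1.sinks.log-sink1.hdfs.rollInterval = 600
# disable event-count based rolling
agent1.sinks.log-sink1.hdfs.rollCount = 0

Note that rollSize is measured in bytes and rollInterval in seconds; whichever threshold is reached first closes the current file and opens a new one.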