您的位置:首页 > 其它


2015-09-15 22:13 344 查看



# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = ping localhost

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

将上述配置存为:example.conf,然后我们就可以启动 Flume 了:

flume-ng agent -n a1 -f example.conf -Dflume.root.logger=INFO,console

-Dflume.root.logger=INFO,console 仅为 debug 使用,请勿生产环境生搬硬套,否则大量的日志会返回到终端。

* -c/–conf 后跟配置目录;

* -f/–conf-file 后跟具体的配置文件;

* -n/–name 指定agent的名称;

然后我们再开一个 shell 终端窗口,执行 ping localhost,就可以发消息看到效果了;

Flume 终端窗口此时会打印出如下信息,就表示成功了:

INFO ({SinkRunner-PollingRunner-DefaultSinkProcessor} LoggerSink.java[process]:70) [2015-09-15 09:25:08,457] - Event: { headers:{} body: 36 34 20 62 79 74 65 73 20 66 72 6F 6D 20 6C 6F 64 bytes from lo }

INFO ({SinkRunner-PollingRunner-DefaultSinkProcessor} LoggerSink.java[process]:70) [2015-09-15 09:25:08,457] - Event: { headers:{} body: 36 34 20 62 79 74 65 73 20 66 72 6F 6D 20 6C 6F 64 bytes from lo }

至此,咱们的第一个 Flume Agent 算是部署成功了!

单节点 Flume 直接写入 HDFS

# Define a memory channel called ch1 on agent1

agent1.channels.ch1.type = memory

agent1.channels.ch1.capacity = 100000

agent1.channels.ch1.transactionCapacity = 100000

agent1.channels.ch1.keep-alive = 30

#define source monitor a file

agent1.sources.avro-source1.type = exec

agent1.sources.avro-source1.shell = /bin/bash -c

agent1.sources.avro-source1.command = tail -n +0 -F /home/lkm/tmp/id.txt

agent1.sources.avro-source1.channels = ch1

agent1.sources.avro-source1.threads = 5

# Define a logger sink that simply logs all events it receives

# and connect it to the other end of the same channel.

agent1.sinks.log-sink1.channel = ch1

agent1.sinks.log-sink1.type = hdfs

agent1.sinks.log-sink1.hdfs.path = hdfs://nameservice1/flume

agent1.sinks.log-sink1.hdfs.writeFormat = Text

agent1.sinks.log-sink1.hdfs.fileType = DataStream

agent1.sinks.log-sink1.hdfs.rollInterval = 0

agent1.sinks.log-sink1.hdfs.rollSize = 1000000

agent1.sinks.log-sink1.hdfs.rollCount = 0

agent1.sinks.log-sink1.hdfs.batchSize = 1000

agent1.sinks.log-sink1.hdfs.txnEventMax = 1000

agent1.sinks.log-sink1.hdfs.callTimeout = 60000

agent1.sinks.log-sink1.hdfs.appendTimeout = 60000

启动如下命令,就可以在 hdfs 上看到效果了。

flume-ng agent -n agent1 -f agent1.conf -Dflume.root.logger=INFO,console


Flume 实现了两个Trigger,分别为SizeTriger(在调用HDFS输出流写的同时,count该流已经写入的大小总和,若超过一定大小,则创建新的文件和输出流,写入操作指向新的输出流,同时close以前的输出流)和TimeTriger(开启定时器,当到达该点时,自动创建新的文件和输出流,新的写入重定向到该流中,同时close以前的输出流)。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息