您的位置:首页 > 其它

flume整合kafka

2016-02-27 11:09 357 查看
# Please paste flume.conf here. Example:
# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

# For each source, channel, and sink, set
# standard properties.
tier1.sources.source1.type     = syslogtcp
tier1.sources.source1.bind     = 127.0.0.1
tier1.sources.source1.port     = 9999
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type   = memory

tier1.sinks.sink1.channel      = channel1
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = ggz
tier1.sinks.sink1.brokerList = ha1:9092
tier1.sinks.sink1.requiredAcks = 1
tier1.sinks.sink1.batchSize = 20

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.

tier1.channels.channel1.capacity = 100


测试:

生产者:nc ha1 9999

消费者:kafka-console-consumer –zookeeper ha0 –topic ggz –from-beginning
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: