selector示例
2016-04-13 15:37
316 查看
11、复制:同一份数据,发送给两个sink
1个source、2个channel、2个sink
case12_replicate_sink.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1 c2
a1.sinks = k1 k2
#describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host= localhost
a1.sources.r1.port = 5140
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
#decribe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname= 192.168.0.106
a1.sinks.k1.port = 4443
a1.sinks.k1.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname= 192.168.0.107
a1.sinks.k2.port = 4443
a1.sinks.k2.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
case12_replicate_s1.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1
a1.sinks = k1
#describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.105
a1.sources.r1.port = 4443
a1.sources.r1.channels = c1
#decribe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
case12_replicate_s2.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1
a1.sinks = k1
#describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.105
a1.sources.r1.port = 4443
a1.sources.r1.channels = c1
#decribe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
12、复用:
case13_multi_sink.conf、case13_multi_s1.conf、case13_multi_s2.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1 c2
a1.sinks = k1 k2
#describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.host= localhost
a1.sources.r1.port = 5140
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.header= state
a1.sources.r1.selector.mapping.CZ= c1
a1.sources.r1.selector.mapping.US= c2
a1.sources.r1.selector.default= c1
#decribe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname= 192.168.0.106
a1.sinks.k1.port = 4443
a1.sinks.k1.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname= 192.168.0.107
a1.sinks.k2.port = 4443
a1.sinks.k2.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
case13_multi_s1.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1
a1.sinks = k1
#describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.105
a1.sources.r1.port = 4443
a1.sources.r1.channels = c1
#decribe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
case13_multi_s2.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1
a1.sinks = k1
#describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.105
a1.sources.r1.port = 4443
a1.sources.r1.channels = c1
#decribe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
测试 在source端
curl -X POST -d '[{"headers": {"state":"CZ","datanode":"xx2"},"body":"really_random_body"}]' http://localhost:5140
curl -X POST -d '[{"headers": {"state":"US","datanode":"xx1"},"body":"really_random_body"}]' http://localhost:5140 curl -X POST -d '[{"headers": {"state":"SH","datanode":"xx3"},"body":"really_random_body"}]' http://localhost:5140
1个source、2个channel、2个sink
case12_replicate_sink.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1 c2
a1.sinks = k1 k2
#describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host= localhost
a1.sources.r1.port = 5140
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
#decribe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname= 192.168.0.106
a1.sinks.k1.port = 4443
a1.sinks.k1.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname= 192.168.0.107
a1.sinks.k2.port = 4443
a1.sinks.k2.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
case12_replicate_s1.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1
a1.sinks = k1
#describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.105
a1.sources.r1.port = 4443
a1.sources.r1.channels = c1
#decribe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
case12_replicate_s2.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1
a1.sinks = k1
#describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.105
a1.sources.r1.port = 4443
a1.sources.r1.channels = c1
#decribe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
12、复用:
case13_multi_sink.conf、case13_multi_s1.conf、case13_multi_s2.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1 c2
a1.sinks = k1 k2
#describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.host= localhost
a1.sources.r1.port = 5140
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.header= state
a1.sources.r1.selector.mapping.CZ= c1
a1.sources.r1.selector.mapping.US= c2
a1.sources.r1.selector.default= c1
#decribe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname= 192.168.0.106
a1.sinks.k1.port = 4443
a1.sinks.k1.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname= 192.168.0.107
a1.sinks.k2.port = 4443
a1.sinks.k2.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
case13_multi_s1.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1
a1.sinks = k1
#describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.105
a1.sources.r1.port = 4443
a1.sources.r1.channels = c1
#decribe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
case13_multi_s2.conf
#配置内容
#name the component on this agent
a1.sources = r1
a1.channels= c1
a1.sinks = k1
#describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.105
a1.sources.r1.port = 4443
a1.sources.r1.channels = c1
#decribe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
测试 在source端
curl -X POST -d '[{"headers": {"state":"CZ","datanode":"xx2"},"body":"really_random_body"}]' http://localhost:5140
curl -X POST -d '[{"headers": {"state":"US","datanode":"xx1"},"body":"really_random_body"}]' http://localhost:5140 curl -X POST -d '[{"headers": {"state":"SH","datanode":"xx3"},"body":"really_random_body"}]' http://localhost:5140
相关文章推荐
- processor示例
- NSNotificationCenter 的使用详解
- sql 关于dblink和多条update、insert事务回滚写法
- 053(十二)
- hibernate.cfg.xml的参数详解
- ToolBar
- 手把手教会popupWindow从下往上弹出效果的实现
- java模拟PHP的pack和unpack类
- Java 输入
- unity 得到js的Json,类似\u624b\u8868这种转换为汉字的方法
- RabbitMQ入门(五) —— vhost
- 服务端开发团队人员构成
- 大型网站架构学习笔记
- JSP关于Frameset的简单用法
- 4.1 最大子数组问题(分治法)-NlogN
- 在Ubuntu下安装Python
- BPFP系列:优化电池使用时间:根据需要操作广播接收器
- 175.Which two statements are true regarding savepoints? (Choose two.)
- sink示例
- 第06篇 MyEclipse 2016 安装/破解