
Passing data between flume agents with avro

2016-02-24 22:06
For passing data between flume agents, avro is the only way I currently know of. I'm hoping someone more experienced can advise on whether it's possible to send udp directly to a flume syslogudp source instead, and then relay it through an agent to hdfs. The configuration and startup command are below:

# source name
agent.sources = execSource

# channel name
agent.channels = memoryChannel

# sink names
agent.sinks = k1 k2

#source

# source type: exec runs an external command
agent.sources.execSource.type = exec

# the source's input is the nginx stat log, tailed continuously
agent.sources.execSource.command = tail -F /opt/logs/nginx/stat.log

# events from this source flow into the memoryChannel channel
agent.sources.execSource.channels = memoryChannel

# interceptors stamp extra headers onto each event as it enters the channel
agent.sources.execSource.interceptors = hostInterceptor staticInterceptor

# the host interceptor adds this agent's host as a header
agent.sources.execSource.interceptors.hostInterceptor.type = host

# the static interceptor adds a fixed key/value header, here port=8080
agent.sources.execSource.interceptors.staticInterceptor.type = static

agent.sources.execSource.interceptors.staticInterceptor.key = port

agent.sources.execSource.interceptors.staticInterceptor.value = 8080
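
With those two interceptors in place, each event leaves the source carrying extra headers, roughly as sketched below (the host value is illustrative; the host interceptor fills in whatever this machine resolves to):

# illustrative headers on an intercepted event
#   host = 10.10.10.11   (added by the host interceptor)
#   port = 8080          (added by the static interceptor)
# The receiving agent's HDFS sink can then reference %{host} in its path.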

#sink

# sink type: avro, i.e. avro RPC to the next agent
agent.sinks.k1.type = avro

# the sink drains events from memoryChannel
agent.sinks.k1.channel = memoryChannel

# destination agent's ip
agent.sinks.k1.hostname = 10.10.10.10

# destination agent's port
agent.sinks.k1.port = 10000

# connection timeout in ms (the default is 20000)
agent.sinks.k1.connect-timeout = 200000

agent.sinks.k2.type = avro

agent.sinks.k2.channel = memoryChannel

agent.sinks.k2.hostname = 10.10.10.10

agent.sinks.k2.port = 10001

agent.sinks.k2.connect-timeout = 200000
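
Since it's the avro sinks that time out under load (see the question at the end), the sinks' request-timeout (how long a sent batch may wait for an acknowledgement, default 20000 ms like connect-timeout) and batch-size (default 100) may also be worth tuning. A sketch; the values are untested guesses:

agent.sinks.k1.request-timeout = 200000
agent.sinks.k1.batch-size = 500
agent.sinks.k2.request-timeout = 200000
agent.sinks.k2.batch-size = 500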

#sink group

# define a sink group so k1 and k2 share the load
agent.sinkgroups = g1

agent.sinkgroups.g1.sinks = k1 k2

# load balancing across the group's sinks
agent.sinkgroups.g1.processor.type = load_balance

# sinks are picked in round-robin order
agent.sinkgroups.g1.processor.selector = round_robin
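
The load_balance processor can also back off a failing sink for a while instead of retrying it on every round, which may soften the FAIL bursts described at the end. One extra documented option:

agent.sinkgroups.g1.processor.backoff = true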

#channel

# channel type
agent.channels.memoryChannel.type = memory

# maximum number of events the channel can hold
agent.channels.memoryChannel.capacity = 100000

# events per channel transaction; keep this at least as large as any
# sink's batch size drawing from the channel
agent.channels.memoryChannel.transactionCapacity = 10000

# seconds a put/take waits for space or events; too small and
# operations time out easily
agent.channels.memoryChannel.keep-alive=60

agent.channels.memoryChannel.write-timeout=20
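
A memory channel also loses whatever it's buffering if the agent dies. If durability matters more than raw speed, a file channel is the stock alternative; a minimal sketch with assumed local paths:

# durable alternative to the memory channel (paths are assumptions)
agent.channels.fileChannel.type = file
agent.channels.fileChannel.checkpointDir = /opt/flume/checkpoint
agent.channels.fileChannel.dataDirs = /opt/flume/data
agent.channels.fileChannel.capacity = 1000000
agent.channels.fileChannel.transactionCapacity = 10000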

The receiving agent's configuration is much the same, except that the source listens on the given ip and port for avro data, and the sinks write to hdfs.

agent.sources = collection-nginx-stat

agent.channels = mem-nginx-stat

agent.sinks = k1 k2

# source & sink channel

agent.sources.collection-nginx-stat.channels = mem-nginx-stat

agent.sinks.k1.channel = mem-nginx-stat

agent.sinks.k2.channel = mem-nginx-stat

# source ip port binding

agent.sources.collection-nginx-stat.type = avro

agent.sources.collection-nginx-stat.bind = 10.10.10.10

agent.sources.collection-nginx-stat.port = 10000

agent.sources.collection-nginx-stat.interceptors = host-interceptor

agent.sources.collection-nginx-stat.interceptors.host-interceptor.type = host

agent.sources.collection-nginx-stat.interceptors.host-interceptor.preserveExisting = true

agent.sources.collection-nginx-stat.interceptors.host-interceptor.useIP = true

agent.sources.collection-nginx-stat.interceptors.host-interceptor.hostHeader = host
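
# preserveExisting = true keeps the host header already stamped by the
# sending agent, so %{host} in the HDFS paths below reflects the origin
# machine rather than this collector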

# channel property

agent.channels.mem-nginx-stat.type = memory

agent.channels.mem-nginx-stat.capacity = 1000000

agent.channels.mem-nginx-stat.transactionCapacity=10000

agent.channels.mem-nginx-stat.keep-alive=60

# sink property

agent.sinks.k1.type = hdfs

agent.sinks.k1.serializer = text

agent.sinks.k1.hdfs.path = hdfs://namenode:9000/nginx/stat/%y%m%d/%H/%{host}

agent.sinks.k1.hdfs.filePrefix = logData.sink1

agent.sinks.k1.hdfs.useLocalTimeStamp = true

agent.sinks.k1.hdfs.rollSize = 128000000

agent.sinks.k1.hdfs.rollInterval = 600

agent.sinks.k1.hdfs.rollCount = 3000000

agent.sinks.k1.hdfs.batchSize = 5000

agent.sinks.k1.hdfs.callTimeout = 300000

agent.sinks.k1.hdfs.writeFormat = Text

agent.sinks.k1.hdfs.fileType = DataStream
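
# fileType = DataStream with writeFormat = Text (here and in k2 below)
# writes events as plain text lines rather than Hadoop SequenceFiles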

agent.sinks.k2.type = hdfs

agent.sinks.k2.serializer = text

agent.sinks.k2.hdfs.path = hdfs://namenode:9000/nginx/stat/%y%m%d/%H/%{host}

agent.sinks.k2.hdfs.filePrefix = logData.sink2

agent.sinks.k2.hdfs.useLocalTimeStamp = true

agent.sinks.k2.hdfs.rollSize = 128000000

agent.sinks.k2.hdfs.rollInterval = 600

agent.sinks.k2.hdfs.rollCount = 3000000

agent.sinks.k2.hdfs.batchSize = 5000

agent.sinks.k2.hdfs.callTimeout = 300000

agent.sinks.k2.hdfs.writeFormat = Text

agent.sinks.k2.hdfs.fileType = DataStream

#sink group

agent.sinkgroups = g1

agent.sinkgroups.g1.sinks = k1 k2

agent.sinkgroups.g1.processor.type = failover
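
# with failover, the sink with the higher priority value is preferred:
# per the settings below, k2 (10) takes all traffic while healthy and
# k1 (5) is the standby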

agent.sinkgroups.g1.processor.priority.k1 = 5

agent.sinkgroups.g1.processor.priority.k2 = 10

agent.sinkgroups.g1.processor.maxpenalty = 10000

And the log4j.properties configuration:

#flume.root.logger=DEBUG,console

flume.root.logger=INFO,LOGFILE


flume.log.dir=/opt/logs/flume-nginx-stat

flume.log.file=flume.log

log4j.logger.org.apache.flume.lifecycle = INFO

log4j.logger.org.jboss = WARN

log4j.logger.org.mortbay = INFO

log4j.logger.org.apache.avro.ipc.NettyTransceiver = WARN

log4j.logger.org.apache.hadoop = INFO

# Define the root logger to the system property "flume.root.logger".

log4j.rootLogger=${flume.root.logger}

# Stock log4j rolling file appender

# Default log rotation configuration

log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender

log4j.appender.LOGFILE.MaxFileSize=100MB

log4j.appender.LOGFILE.MaxBackupIndex=10

log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}

log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout

log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n

# Warning: If you enable the following appender it will fill up your disk if you don't have a cleanup job!

# This uses the updated rolling file appender from log4j-extras that supports a reliable time-based rolling policy.

# See http://logging.apache.org/log4j/companions/extras/apidocs/org/apache/log4j/rolling/TimeBasedRollingPolicy.html
# Add "DAILY" to flume.root.logger above if you want to use this

log4j.appender.DAILY=org.apache.log4j.rolling.RollingFileAppender

log4j.appender.DAILY.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy

log4j.appender.DAILY.rollingPolicy.ActiveFileName=${flume.log.dir}/${flume.log.file}

log4j.appender.DAILY.rollingPolicy.FileNamePattern=${flume.log.dir}/${flume.log.file}.%d{yyyy-MM-dd}

log4j.appender.DAILY.layout=org.apache.log4j.PatternLayout

log4j.appender.DAILY.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n

# console

# Add "console" to flume.root.logger above if you want to use this

# log4j.appender.console=org.apache.log4j.ConsoleAppender

# log4j.appender.console.target=System.err

# log4j.appender.console.layout=org.apache.log4j.PatternLayout

# log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n

And flume-env.sh:

# Environment variables can be set here.

JAVA_HOME=/opt/apps/jdk

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX

JAVA_OPTS="-Xms4g -Xmx4g -Dcom.sun.management.jmxremote"

# Note that the Flume conf directory is always included in the classpath.

FLUME_CLASSPATH="/opt/conf/flume-nginx-stat"

Startup command:

nohup /opt/apps/flume/bin/flume-ng agent -n agent --conf /opt/conf/flume-nginx-stat --conf-file /opt/conf/flume-nginx-stat/flume-conf.properties -Dflume.monitoring.type=http -Dflume.monitoring.port=23403 >> /opt/logs/flume-nginx-stat/nohup.out 2>&1 &
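
With -Dflume.monitoring.type=http enabled as above, the agent exposes its component counters as JSON over that port, which helps when chasing the timeouts described below. For example, on the agent host:

curl http://localhost:23403/metrics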

Asking for help: recently the log volume has grown, and after sending for a while the data transfer starts timing out, sometimes with FAIL and sometimes with connect time out. Does anyone know a fix? I'd like to send directly over udp and have the receiver listen for udp data with a syslogudp source, but I don't know how to configure the sending sink.
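
From what I can tell, the receiving half of that idea would be the easy part, since flume ships a syslogudp source; a minimal, untested sketch reusing the ip and port from above:

agent.sources.collection-nginx-stat.type = syslogudp
agent.sources.collection-nginx-stat.host = 10.10.10.10
agent.sources.collection-nginx-stat.port = 10000
agent.sources.collection-nginx-stat.channels = mem-nginx-stat

The sending side is the harder part: to my knowledge flume has no built-in udp or syslog sink, so the datagrams would have to come from the application itself (for example nginx logging straight to syslog) or from a custom sink, not from a stock flume sink.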