您的位置:首页 > 大数据

大数据协作框架之flume详解

2016-11-06 18:49 381 查看
数据源

    1、关系型数据库中的业务数据(sqoop)

    2、日志文件(业务系统的操作数据)(flume)

数据收集的类型

    1、hadoop 执行命令put上传(exec)

        bin/hdfs dfs -put /path/....

    2、hive加载本地数据

        load data local inpath /path/...

    3、flume实时收集数据到指定的位置(ip,hdfs,hbase)

执行类型

    1、人为执行命令

    2、shell+命令》》周期性的上传文件数据

flume简介

     flume.apache.org提供,用jave语言实现,一般用于收集、汇集web server的日志文件,仅仅支持linux操作,以数据流的形式读取写入数据,

    常见的数据源(log):apache(html),tomcat(servlet容器),iis, nginx

    及时(实时)架构:Flume、kafka实时进行数据收集,spark、storm实时去处理,impala实时去查询

flume组成部分(场景:安检)

    source:从数据源元读取数据,用于采集数据,产生数据流的地方,同时source会将产生的数据流传输到channel

    channel:在source和sinks之间传递数,类似于队列

        evens:是flume数据传输的基本单元(header(key),数据体(value))

                      header:是容纳了key-value字符串的无序集合吧,key在集合内是唯一的

           

    sinks:从channel收集数据并向目标写入数据

 

    channel/events/sinks 流程:

     source监控某个文件,将数据拿到,封装在一个event当中,并put/commit到channel当中,channel是一个队列,

队列的优点是先进先出,放好后尾部一个个event出来,sink主动去从chennel当中去拉数据,sink再把数据写到某个

地方,比如HDFS上面去。

            下载CDHhttp://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6.tar.gz

flume机制

    web server=inputdata=》source=put=》channel【event1....eventn】=pull=>sinks=outputdata=》hdfs、hbase

    source监控某个文件,将数据拿到,封装在一个event当中,并put/commit到chennel当中,chennel是一个队列,队列的优点是先进先出,放好后尾部一个个event出来,sink主动去从chennel当中去拉数据,sink再把数据写到某个地方,比如HDFS上面去

常用的sources:

exec source:可以将命令产生的输出作为源

spooling directory source:这个Source允许你将文件将要收集的数据放置到"自动搜集"目录中。这个Source将监视该目录,并将解析新文件的出现。事件处理逻辑是可插拔的,当一个文件被完全读入信道,它会被重命名或可选的直接删除。

要注意的是,放置到自动搜集目录下的文件不能修改,如果修改,则flume会报错。

另外,也不能产生重名的文件,如果有重名的文件被放置进来,则flume会报错
kafka source

syslog source

HTTP source :

此Source接受HTTP的GET和POST请求作为Flume的事件。
其中GET方式应该只用于试验。
需要提供一个可插拔的"处理器"来将请求转换为事件对象,这个处理器必须实现HTTPSourceHandler接口
这个处理器接受一个 HttpServletRequest对象,并返回一个Flume Envent对象集合。
从一个HTTP请求中得到的事件将在一个事务中提交到通道中。thus allowing for increased efficiency on channels like the file channel。
如果处理器抛出一个异常,Source将会返回一个400的HTTP状态码。
如果通道已满,无法再将Event加入Channel,则Source返回503的HTTP状态码,表示暂时不可用。

常用的channel

kafka channel

file channel

常用的sink

hdfs sink

hive sink

hbase sink

flume的安装配置

    1、下载

    2、加压

        $tar zxf /sourcepath/ -C /copypath

    3、配置flumu-env.sh文件

        exprt JAVA_HOME=/jdkpath

    4、启动

        $bin/flume-ng help/version

flume的使用

    常用命令

        一般使用的命令

        $bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/flume-telnet.conf -Dflume.root.logger=INFO,console

        **-c或--conf 后面跟配置目录

        **-f或—-conf-file 后面跟具体的配置文件

        **-n或—-name 指定Agent的名称

        案例1、

        使用flume监控某个端口,把端口写入的数据输出为logger

        配置文件名称:flum-telnet.conf

        ==================agent a1=======================

        # Name the components on this agent

        #定义一个source

        a1.sources = r1

        #定义一个sinks

        a1.sinks = k1

        #定义一个channel

        a1.channels = c1

        # Describe/configure the source

        #指定source类型

        a1.sources.r1.type = netcat

        #指定监控主机ip

        a1.sources.r1.bind = 192.168.242.128

        #指定监控主机端口

        a1.sources.r1.port = 44444

        # Describe the sink

        #sinks通过logger输出

        a1.sinks.k1.type = logger

        # Use a channel which buffers events in memory

        #设置channel的类型:memory

        a1.channels.c1.type = memory

        #设置channel中evens最大有个数

        a1.channels.c1.capacity = 1000

        #设置sink从channel获取的个数

        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel

        #把source和channl建立链接

        a1.sources.r1.channels = c1

        #把channel和sink建立链接

        a1.sinks.k1.channel = c1

    准备工作:

        telnet:是基于tcp协议的一个登陆访问远程机器的服务

        yum -y install telnet

        

        查看端口是否占用

        netstat -an|grep 44444

        

        telnet使用端口

        telnet 192.168.242.128 44444

    启动flum的agent

        $bin/flume-ng

        **flume的agent

        agent

        **flumed配置文件目录

        --conf conf/

        **配置文件的agent的名称

        --name a1

        **flume中agent的具体配置文件

        --conf-file conf/flume-telnet.conf

        **在控制台输出info级别的日志

        -Dflume.root.logger=INFO,console

    案例2、

    ** 企业常用

    ** 日志文件 -->  新添加[追加]

    使用flume去监控某个文件,将新添加进文件的内容抽取到其他地方[HDFS]

    =======================agent(flume-apache.conf)=========================

    # Name the components on this agent

    a2.sources = r2

    a2.channels = c2

    a2.sinks = k2

    # define sources

    #设置source为命令类型(exec)

    a2.sources.r2.type = exec

    #执行的命令

    a2.sources.r2.command = tail -F /var/log/httpd/access_log

    #执行方式(shell)

    a2.sources.r2.shell = /bin/bash -c

    # define channels

    #设置channel的缓存类型为memory

    a2.channels.c2.type = memory

    a2.channels.c2.capacity = 1000

    a2.channels.c2.transactionCapacity = 100

    # define sinks

    #设置sink为写入目标为hdfs

    a2.sinks.k2.type = hdfs

    #设置写入的ip,并且定义文件为日期格式的二级目录结构

    a2.sinks.k2.hdfs.path=hdfs://192.168.242.128:8020/flume/%Y%m%d/%H%M

    #定义存放在hdfs上的文件名称前缀

    a2.sinks.k2.hdfs.filePrefix = accesslog

    #启用日期文件命名格式

    a2.sinks.k2.hdfs.round=true

    #设置创建文件目录结构的时间间隔以及单位

    a2.sinks.k2.hdfs.roundValue=5

    a2.sinks.k2.hdfs.roundUnit=minute

    #设置使用本地时间戳

    a2.sinks.k2.hdfs.useLocalTimeStamp=true

    

    #设置批处理的大小

    a2.sinks.k2.hdfs.batchSize=1000

    #设置文件类型

    a2.sinks.k2.hdfs.fileType=DataStream

    #设置文件格式

    a2.sinks.k2.hdfs.writeFormat=Text

    # bind the sources and sinks to the channels

    a2.sources.r2.channels = c2

    a2.sinks.k2.channel = c2

    准备工作

        安装apache,并且启动服务

        $su - root

        $yum -y install httpd

        $service httpd start

        在apache中创建可以访问的html页面

        $vi /var/www/html/index.html

        查看日志

        $tail -f /var/log/httpd/access_log

        

        

    [beifeng@bigdata ~]$ tail -f /var/log/httpd/access_log

    tail: 无法打开"/var/log/httpd/access_log" 读取数据: 权限不够

    一般用户可以读取/var/log/httpd下文件

        $su - root

        $chmod 755 /var/log/httpd

    启动

        bin/flume-ng agent  --conf conf/ --name a2 --conf-file conf/flume-apache.conf -Dflume.root.logger=INFO,console

    java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType

        at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:251)

        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)

        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)

        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)

        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)

        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)

        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)

        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

        at java.lang.Thread.run(Thread.java:745)

    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionType

        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

        at java.security.AccessController.doPrivileged(Native Method)

        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)

        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

        ... 12 more

    

    flume往目标hdfs上写文件,flume相当于hadoop hdfs的一个客户端,也就需要在flumelib下导入hadoop jar,如下四个:

        hadoop-hdfs-2.5.0-cdh5.3.6.jar

        hadoop-common-2.5.0-cdh5.3.6.jar

        hadoop-auth-2.5.0-cdh5.3.6.jar

        commons-configuration-1.6.jar

    解决flume文件过多过小的问题

    #设置解决文件过多过小问题

    #每600秒生成一个文件

    a2.sinks.k2.hdfs.rollInterval=600

    #当达到128000000bytes时,创建新文件 127*1024*1024

    #实际环境中如果按照128M回顾文件,那么这里设置一般设置成127M

    a2.sinks.k2.hdfs.rollSize=128000000

    #设置文件的生成不和events数相关

    a2.sinks.k2.hdfs.rollCount=0

    #设置成1,否则当有副本复制时就重新生成文件,上面三条则没有效果

    a2.sinks.k2.hdfs.minBlockReplicas=1

    案例3:                

    利用flume监控某个目录[/var/log/httpd],把里面回滚好的文件

    实时抽取到HDFS平台。

    # Name the components on this agent

    a3.sources = r3

    a3.channels = c3

    a3.sinks = k3

    # define sources

    #设置监控的类型,文件目录

    a3.sources.r3.type = spooldir

    #设置目录

    a3.sources.r3.spoolDir = /home/beifeng/logs

    #设置忽略目录中的文件

    a3.sources.r3.ignorePattern = ^.*\_log$

    # define channels

    #设置channel缓存数据的类型(file)

    a3.channels.c3.type = file

    #设置检测点目录

    a3.channels.c3.checkpointDir = /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint

    #文件缓存位置

    a3.channels.c3.dataDirs = /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/checkdata

    # define sinks

    a3.sinks.k3.type = hdfs

    a3.sinks.k3.hdfs.path=hdfs://192.168.17.129:8020/flume2/%Y%m%d/%H

    a3.sinks.k3.hdfs.filePrefix = accesslog

    a3.sinks.k3.hdfs.round=true

    a3.sinks.k3.hdfs.roundValue=1

    a3.sinks.k3.hdfs.roundUnit=hour

    a3.sinks.k3.hdfs.useLocalTimeStamp=true

    a3.sinks.k3.hdfs.batchSize=1000

    a3.sinks.k3.hdfs.fileType=DataStream

    a3.sinks.k3.hdfs.writeFormat=Text

    #设置解决文件过多过小问题

    #每600秒生成一个文件

    a3.sinks.k3.hdfs.rollInterval=600

    #当达到128000000bytes时,创建新文件 127*1024*1024

    #实际环境中如果按照128M回顾文件,那么这里设置一般设置成127M

    a3.sinks.k3.hdfs.rollSize=128000000

    #设置文件的生成不和events数相关

    a3.sinks.k3.hdfs.rollCount=0

    #设置成1,否则当有副本复制时就重新生成文件,上面三条则没有效果

    a3.sinks.k3.hdfs.minBlockReplicas=1

    # bind the sources and sinks to the channels

    a3.sources.r3.channels = c3

    a3.sinks.k3.channel = c3

       
   实时收集文件框架有很多,但是其中Flume使用最广泛,主要由于其架构设计和使用简单清晰明了,又支持Hadoop存储。
  Flume是针对日志文件数据进行实时收集的框架,一个程序其实就是一个FlumeAgent,包含三个部分Source、Channel及Sink。
  在企业中针对实时分析统计要求高,通常使用Flume进行实时收集文件数据,再给Kafka类似的消息队列框架进行可靠性存储,最后又实时分布式框架进行计算;当然也是用Flume将数据收集以后放入HDFS中或检索框架Solr等中。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息