您的位置:首页 > 其它

Kafka快速上手教程 3

2017-05-18 10:09 906 查看


Flume 与 Kafka 集成


一、实验介绍


1.1 实验内容

分别使用 Kafka 作为数据源,通道,沉漕与 Flume 集成 ,最后简要分析 Kafka, Flume 二者的区别联系。


1.2 先学课程

Flume 介绍与安装:https://www.shiyanlou.com/courses/237


1.3 实验知识点

flume 原理
kafka 消息队列
hdfs 文件系统
Zookeeper 单机安装


1.4 实验环境

Hadoop 2.6.1
Kafka_2.10-0.10.0.0
Flume-1.6.0
Zookeeper-3.4.6
Xfce 终端


1.5 适合人群

本课程属于中等难度级别,适合具有大数据 hadoop 基础的用户,如果对数据采集 了解能够更好的上手本课程。


二、实验步骤

我们已经在实验楼环境里下载并配置启动 hadoop-2.6.1 所需的文件,免除您配置文件的麻烦,您可以在 
/opt
 找到,只需格式化并启动
hadoop 进程即可。


2.1 准备工作

双击打开桌面上的 Xfce 终端,用 
sudo
 命令切换到 hadoop 用户,hadoop 用户密码为 hadoop,用 
cd
 命令进入 
/opt
目录。
$ su hadoop
$ cd /opt/




在 
/opt
 目录下格式化 hadoop。
$ hadoop-2.6.1/bin/hdfs namenode -format




在 
/opt
 目录下启动 hadoop 进程。
$ hadoop-2.6.1/sbin/start-all.sh




用 
jps
 查看 hadoop 进程是否启动。




2.2 下载所需安装包

你可以通过下面命令将 Flume, Kafka, Zookeeper 下载到实验楼环境中,进行学习。

在 
/opt
 目录下,用 hadoop 用户通过 
wget
 命令下载,并用 
tar
 解压。
$ hadoop@945f39ae074b:/opt$ sudo wget  http://labfile.oss.aliyuncs.com/courses/785/apache-flume-1.6.0-bin.tar.gz $ hadoop@945f39ae074b:/opt$ sudo tar -zxvf apache-flume-1.6.0-bin.tar.gz
$ hadoop@945f39ae074b:/opt$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/kafka_2.10-0.10.0.0.tgz $ hadoop@945f39ae074b:/opt$ sudo tar -zxvf kafka_2.10-0.10.0.0.tgz
$ hadoop@945f39ae074b:/opt$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/zookeeper-3.4.6.tar.gz $ hadoop@945f39ae074b:/opt$ sudo tar -zxvf zookeeper-3.4.6.tar.gz


修改 flume-env.sh
hadoop@945f39ae074b:/opt$ cd apache-flume-1.6.0-bin/conf/
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo cp flume-env.sh.template  flume-env.sh
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo vi flume-env.sh




flume-env.sh文件需修改内容:
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"




修改 flume-conf.properties
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$sudo cp flume-conf.properties.template  flume-conf.properties


启动 Zookeeper 及 Kafka 服务,这里我们使用外部的 Zookeeper,需要进行一些配置修改才行。
hadoop@945f39ae074b:/opt$ sudo chmod 777 -R zookeeper-3.4.6
hadoop@945f39ae074b:/opt$ cd zookeeper-3.4.6/
hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo cp conf/zoo_sample.cfg conf/zoo.cfg


修改
zoo.cfg
的配置。
hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo vi conf/zoo.cfg


添加修改以下内容。
dataDir=/opt/zookeeper-3.4.6/data
dataLogDir=/opt/zookeeper-3.4.6/logs


并在末尾添加。
server.1=localhost:2888:3888




执行命令创建相应的文件。
hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo mkdir /opt/zookeeper-3.4.6/data
hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo mkdir /opt/zookeeper-3.4.6/logs
hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo chmod 777 -R ../zookeeper-3.4.6
hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo echo 1 >/opt/zookeeper-3.4.6/data/myid


启动 Zookeeper 并查看状态。
hadoop@945f39ae074b:/opt/zookeeper-3.4.6/bin$ ./zkServer.sh start
hadoop@945f39ae074b:/opt/zookeeper-3.4.6/bin$ ./zkServer.sh status




启动kafka
#权限不足,授权
hadoop@945f39ae074b:/opt$ sudo chmod 777 -R kafka_2.10-0.10.0.0
hadoop@945f39ae074b:/opt$ cd  kafka_2.10-0.10.0.0
#启动kafka
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-server-start.sh  config/server.properties  &





2.3 Kafka Source

前提开启 Zookeeper 服务,Kafka 服务,这里用 Kafka 作为 Source。
#添加 kafkaSource.properties,配置如下
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo vi kafkaSource.properties

# Name the components on this agent
a1.sources=r1
a1.channels=c1
a1.sinks=k1

# Describe/configure the source
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.zookeeperConnect=localhost:2181
a1.sources.r1.topic=testkafkasource
a1.sources.r1.groupId=flume
a1.sources.r1.kafka.consumer.timeout.ms=100
a1.sources.source1.interceptors.i1.type = timestamp
# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Describe the sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://localhost:9000/flume-kafa/%{topic}/%y-%m-%d
a1.sinks.k1.hdfs.fileType=DataStream

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1


根据 
kafkaSource.properties
 的配置创建主题 
testkafkaSource

hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-topics.sh  --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic testkafkasource




用一个 producer 向 testkafkasource 中写入消息。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testkafkasource


flume-ng 启动 agent:
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin$ bin/flume-ng agent --conf conf --conf-file conf/kafkaSource.properties --name a1 -Dflume.root.logger=INFO,console




在刚才启动的 
agent
 输出的信息显示存储到 
hdfs
 文件系统上。



使用 
hadoop fs
 命令查看 
hdfs
 文件系统的上的文件内容。
$ hadoop fs -cat /flume-kafa/testkafkasource/17-04-12/FlumeData.1491986407870


双击桌面上的浏览器输入:localhost:50070 ,然后点击 
Utilities
->
Browse
the file system


输入 
/flume-kafa/testkafkasource/17-04-12
,当然您也可以用鼠标点击查找。



通过以上实验得知,我们成功的使用 Kafka 作为数据源向 HDFS 接收器发送数据。


2.4 Kafka Sink

前提是开启 Zookeeper 服务,Kafka 服务,这里用 Kafka 作为 Sink。
#添加 kafkaSink.properties,配置如下
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo vi conf/kafkaSink.properties


添加如下代码:
# Name the components on this agent
a1.sources=r1
a1.channels=c1
a1.sinks=k1

# Describe/configure the source
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

# Use a channel which buffers events in memory
a1.channels.c1.type=memory

# Describe the sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic=testkafkasink
a1.sinks.k1.brokerList=localhost:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1


根据 
kafkaSink.properties
 的配置创建主题 
testkafkasink

hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-topics.sh  --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic testkafkasink


flume-ng 启动 agent:
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin$ bin/flume-ng agent --conf conf --conf-file conf/kafkaSink.properties --name a1 -Dflume.root.logger=INFO,console


另外开启 Xfce 终端输入:
$ nc localhost 8888




再次开启 Xfce 终端,启动 consumer。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$  bin/kafka-console-consumer.sh --zookeeper  localhost:2181 --topic testkafkasink --from-beginning


查看是否消费了消息。注意这里需要关掉 flume-ng 启动的 agent。



通过以上实验得知,我们成功的使用 Kafka 作为 Sink 来消费送数据。


2.5 Kafka Channel

前提开启 Zookeeper 服务,Kafka 服务,这里用 Kafka 作为 Channel。
#添加 kafkaChannel.properties,配置如下
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo vi conf/kafkaChannel.properties

# Name the components on this agent
a1.sources=r1
a1.channels=c1
a1.sinks=k1

# Describe/configure the source
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

# Describe the channel
a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
a1.channels.c1.brokerList=localhost:9092
a1.channels.c1.topic=testkafkachannel
a1.channels.c1.zookeeperConnect=localhost:2181

# Describe the sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://localhost:9000/kafka-channel/logs/
a1.sinks.k1.hdfs.filePrefix=log
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollCount=10000
a1.sinks.k1.hdfs.fileType=DataStream

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1


根据 
kafkaChannel.properties
 的配置创建主题 testkafkachannel。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-topics.sh  --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic testkafkachannel


flume-ng 启动 agent:
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin$ bin/flume-ng agent --conf conf --conf-file conf/kafkaChannel.properties --name a1 -Dflume.root.logger=INFO,console


另外开启终端输入:
$ nc localhost 8888




再次开启 kafka 终端,启动 consumer。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$  bin/kafka-console-consumer.sh --zookeeper  localhost:2181 --topic testkafkachannel --from-beginning


刚才用 
nc
 命令输入内容,在此处就可以打印输出。



另外通过使用 
hadoop fs
 命令查看 
hdfs
 文件系统的上的文件内容也是可以看到。
$ hadoop fs -lsr hdfs://localhost:9000/kafka-channel/




通过以上实验得知,我们成功的使用 Kafka 作为 Channel 来发送数据到 HDFS。


三、区别与联系

Kafka 是一个可持久化的分布式的消息队列,你可以有许多生产者和很多的消费者共享多个主题 Topics。相比之下,Flume 是一个专用工具被设计为旨在往 HDFS,HBase 发送数据。它对 HDFS 有特殊的优化,并且集成了 Hadoop 的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用 kafka;如果数据被设计给 Hadoop 使用,使用 Flume。

Flume 和 Kafka 可以很好地结合起来使用。如果你的设计需要从 Kafka 到 Hadoop 的流数据,使用 Flume 代理并配置 Kafka 的 Source 读取数据也是可行的,你没有必要实现自己的消费者。你可以直接利用 Flume 与 HDFS 及 HBase 的结合的所有好处。你可以使用 Cloudera Manager 对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。

Flume 和 Kafka 可以结合起来使用。通常会使用 Flume + Kafka 的方式。其实如果为了利用 Flume 已有的写 HDFS 功能,也可以使用 Kafka + Flume 的方式。


四、实验总结

正如你们所知 Flume 内置很多的 source 和sink 组件。然而,Kafka 明显有一个更小的生产消费者生态系统,并且 Kafka 的社区支持不好,希望将来这种情况会得到改善,使用 Kafka 意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的 Flume Sources 和 Sinks 满足你的需求,并且你更喜欢不需要任何开发的系统,请使用 Flume。


五、扩展阅读

https://cwiki.apache.org//confluence/display/FLUME/Getting+Started
http://kafka.apache.org/quickstart
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka flume