Kafka快速上手教程 3
2017-05-18 10:09
906 查看
Flume 与 Kafka 集成
一、实验介绍
1.1 实验内容
分别使用 Kafka 作为数据源,通道,沉漕与 Flume 集成 ,最后简要分析 Kafka, Flume 二者的区别联系。
1.2 先学课程
Flume 介绍与安装:https://www.shiyanlou.com/courses/237
1.3 实验知识点
flume 原理kafka 消息队列
hdfs 文件系统
Zookeeper 单机安装
1.4 实验环境
Hadoop 2.6.1Kafka_2.10-0.10.0.0
Flume-1.6.0
Zookeeper-3.4.6
Xfce 终端
1.5 适合人群
本课程属于中等难度级别,适合具有大数据 hadoop 基础的用户,如果对数据采集 了解能够更好的上手本课程。
二、实验步骤
我们已经在实验楼环境里下载并配置启动 hadoop-2.6.1 所需的文件,免除您配置文件的麻烦,您可以在 /opt找到,只需格式化并启动
hadoop 进程即可。
2.1 准备工作
双击打开桌面上的 Xfce 终端,用 sudo命令切换到 hadoop 用户,hadoop 用户密码为 hadoop,用
cd命令进入
/opt目录。
$ su hadoop $ cd /opt/
在
/opt目录下格式化 hadoop。
$ hadoop-2.6.1/bin/hdfs namenode -format
在
/opt目录下启动 hadoop 进程。
$ hadoop-2.6.1/sbin/start-all.sh
用
jps查看 hadoop 进程是否启动。
2.2 下载所需安装包
你可以通过下面命令将 Flume, Kafka, Zookeeper 下载到实验楼环境中,进行学习。在
/opt目录下,用 hadoop 用户通过
wget命令下载,并用
tar解压。
$ hadoop@945f39ae074b:/opt$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/apache-flume-1.6.0-bin.tar.gz $ hadoop@945f39ae074b:/opt$ sudo tar -zxvf apache-flume-1.6.0-bin.tar.gz $ hadoop@945f39ae074b:/opt$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/kafka_2.10-0.10.0.0.tgz $ hadoop@945f39ae074b:/opt$ sudo tar -zxvf kafka_2.10-0.10.0.0.tgz $ hadoop@945f39ae074b:/opt$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/zookeeper-3.4.6.tar.gz $ hadoop@945f39ae074b:/opt$ sudo tar -zxvf zookeeper-3.4.6.tar.gz
修改 flume-env.sh
hadoop@945f39ae074b:/opt$ cd apache-flume-1.6.0-bin/conf/ hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo cp flume-env.sh.template flume-env.sh hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo vi flume-env.sh
flume-env.sh文件需修改内容:
export JAVA_HOME=/usr/lib/jvm/java-8-oracle # Give Flume more memory and pre-allocate, enable remote monitoring via JMX export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
修改 flume-conf.properties
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$sudo cp flume-conf.properties.template flume-conf.properties
启动 Zookeeper 及 Kafka 服务,这里我们使用外部的 Zookeeper,需要进行一些配置修改才行。
hadoop@945f39ae074b:/opt$ sudo chmod 777 -R zookeeper-3.4.6 hadoop@945f39ae074b:/opt$ cd zookeeper-3.4.6/ hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo cp conf/zoo_sample.cfg conf/zoo.cfg
修改
zoo.cfg的配置。
hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo vi conf/zoo.cfg
添加修改以下内容。
dataDir=/opt/zookeeper-3.4.6/data dataLogDir=/opt/zookeeper-3.4.6/logs
并在末尾添加。
server.1=localhost:2888:3888
执行命令创建相应的文件。
hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo mkdir /opt/zookeeper-3.4.6/data hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo mkdir /opt/zookeeper-3.4.6/logs hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo chmod 777 -R ../zookeeper-3.4.6 hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo echo 1 >/opt/zookeeper-3.4.6/data/myid
启动 Zookeeper 并查看状态。
hadoop@945f39ae074b:/opt/zookeeper-3.4.6/bin$ ./zkServer.sh start hadoop@945f39ae074b:/opt/zookeeper-3.4.6/bin$ ./zkServer.sh status
启动kafka
#权限不足,授权 hadoop@945f39ae074b:/opt$ sudo chmod 777 -R kafka_2.10-0.10.0.0 hadoop@945f39ae074b:/opt$ cd kafka_2.10-0.10.0.0 #启动kafka hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-server-start.sh config/server.properties &
2.3 Kafka Source
前提开启 Zookeeper 服务,Kafka 服务,这里用 Kafka 作为 Source。#添加 kafkaSource.properties,配置如下 hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo vi kafkaSource.properties
# Name the components on this agent a1.sources=r1 a1.channels=c1 a1.sinks=k1 # Describe/configure the source a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource a1.sources.r1.zookeeperConnect=localhost:2181 a1.sources.r1.topic=testkafkasource a1.sources.r1.groupId=flume a1.sources.r1.kafka.consumer.timeout.ms=100 a1.sources.source1.interceptors.i1.type = timestamp # Use a channel which buffers events in memory a1.channels.c1.type=memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000 # Describe the sink a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.path=hdfs://localhost:9000/flume-kafa/%{topic}/%y-%m-%d a1.sinks.k1.hdfs.fileType=DataStream # Bind the source and sink to the channel a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
根据
kafkaSource.properties的配置创建主题
testkafkaSource。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkafkasource
用一个 producer 向 testkafkasource 中写入消息。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testkafkasource
flume-ng 启动 agent:
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin$ bin/flume-ng agent --conf conf --conf-file conf/kafkaSource.properties --name a1 -Dflume.root.logger=INFO,console
在刚才启动的
agent输出的信息显示存储到
hdfs文件系统上。
使用
hadoop fs命令查看
hdfs文件系统的上的文件内容。
$ hadoop fs -cat /flume-kafa/testkafkasource/17-04-12/FlumeData.1491986407870
双击桌面上的浏览器输入:localhost:50070 ,然后点击
Utilities->
Browse the file system
输入
/flume-kafa/testkafkasource/17-04-12,当然您也可以用鼠标点击查找。
通过以上实验得知,我们成功的使用 Kafka 作为数据源向 HDFS 接收器发送数据。
2.4 Kafka Sink
前提是开启 Zookeeper 服务,Kafka 服务,这里用 Kafka 作为 Sink。#添加 kafkaSink.properties,配置如下 hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo vi conf/kafkaSink.properties
添加如下代码:
# Name the components on this agent a1.sources=r1 a1.channels=c1 a1.sinks=k1 # Describe/configure the source a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 # Use a channel which buffers events in memory a1.channels.c1.type=memory # Describe the sink a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic=testkafkasink a1.sinks.k1.brokerList=localhost:9092 a1.sinks.k1.requiredAcks=1 a1.sinks.k1.batchSize=20 # Bind the source and sink to the channel a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
根据
kafkaSink.properties的配置创建主题
testkafkasink。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkafkasink
flume-ng 启动 agent:
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin$ bin/flume-ng agent --conf conf --conf-file conf/kafkaSink.properties --name a1 -Dflume.root.logger=INFO,console
另外开启 Xfce 终端输入:
$ nc localhost 8888
再次开启 Xfce 终端,启动 consumer。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkafkasink --from-beginning
查看是否消费了消息。注意这里需要关掉 flume-ng 启动的 agent。
通过以上实验得知,我们成功的使用 Kafka 作为 Sink 来消费送数据。
2.5 Kafka Channel
前提开启 Zookeeper 服务,Kafka 服务,这里用 Kafka 作为 Channel。#添加 kafkaChannel.properties,配置如下 hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin/conf$ sudo vi conf/kafkaChannel.properties
# Name the components on this agent a1.sources=r1 a1.channels=c1 a1.sinks=k1 # Describe/configure the source a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 # Describe the channel a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 a1.channels.c1.brokerList=localhost:9092 a1.channels.c1.topic=testkafkachannel a1.channels.c1.zookeeperConnect=localhost:2181 # Describe the sink a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.path=hdfs://localhost:9000/kafka-channel/logs/ a1.sinks.k1.hdfs.filePrefix=log a1.sinks.k1.hdfs.rollInterval=0 a1.sinks.k1.hdfs.rollCount=10000 a1.sinks.k1.hdfs.fileType=DataStream # Bind the source and sink to the channel a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
根据
kafkaChannel.properties的配置创建主题 testkafkachannel。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkafkachannel
flume-ng 启动 agent:
hadoop@945f39ae074b:/opt/apache-flume-1.6.0-bin$ bin/flume-ng agent --conf conf --conf-file conf/kafkaChannel.properties --name a1 -Dflume.root.logger=INFO,console
另外开启终端输入:
$ nc localhost 8888
再次开启 kafka 终端,启动 consumer。
hadoop@945f39ae074b:/opt/kafka_2.10-0.10.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkafkachannel --from-beginning
刚才用
nc命令输入内容,在此处就可以打印输出。
另外通过使用
hadoop fs命令查看
hdfs文件系统的上的文件内容也是可以看到。
$ hadoop fs -lsr hdfs://localhost:9000/kafka-channel/
通过以上实验得知,我们成功的使用 Kafka 作为 Channel 来发送数据到 HDFS。
三、区别与联系
Kafka 是一个可持久化的分布式的消息队列,你可以有许多生产者和很多的消费者共享多个主题 Topics。相比之下,Flume 是一个专用工具被设计为旨在往 HDFS,HBase 发送数据。它对 HDFS 有特殊的优化,并且集成了 Hadoop 的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用 kafka;如果数据被设计给 Hadoop 使用,使用 Flume。Flume 和 Kafka 可以很好地结合起来使用。如果你的设计需要从 Kafka 到 Hadoop 的流数据,使用 Flume 代理并配置 Kafka 的 Source 读取数据也是可行的,你没有必要实现自己的消费者。你可以直接利用 Flume 与 HDFS 及 HBase 的结合的所有好处。你可以使用 Cloudera Manager 对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。
Flume 和 Kafka 可以结合起来使用。通常会使用 Flume + Kafka 的方式。其实如果为了利用 Flume 已有的写 HDFS 功能,也可以使用 Kafka + Flume 的方式。
四、实验总结
正如你们所知 Flume 内置很多的 source 和sink 组件。然而,Kafka 明显有一个更小的生产消费者生态系统,并且 Kafka 的社区支持不好,希望将来这种情况会得到改善,使用 Kafka 意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的 Flume Sources 和 Sinks 满足你的需求,并且你更喜欢不需要任何开发的系统,请使用 Flume。
五、扩展阅读
https://cwiki.apache.org//confluence/display/FLUME/Getting+Startedhttp://kafka.apache.org/quickstart
相关文章推荐
- Kafka快速上手教程 2
- Kafka快速上手教程 5
- Kafka快速上手教程 1
- Kafka快速上手教程 4
- 快速上手java.util.Formatter 类,(实例教程)
- Jmock快速上手教程
- [教程]正则快速上手指南:1 - 初次相识
- [教程]正则快速上手指南:2 – 神奇的分组
- Spring MVC快速上手教程
- smarty半小时快速上手教程
- smarty半小时快速上手教程(4).
- Castle.ActiveRecord 使用快速上手教程
- Python程序语言快速上手教程
- Spring MVC快速上手教程
- makefiles快速上手教程 原文地址:http://mrbook.org/tutorials/make/
- samrty学习快速上手 教程+实例
- PGP快速上手教程
- 循序渐进学Ajax【01】--目前最好的学习Ajax的教程--浅显易懂,快速是上手
- Spring.net 基础教程快速上手指南
- Git教程 快速上手