您的位置:首页 > 其它

Kafka快速上手教程 5

2017-05-18 10:13 1626 查看


Kafka 监控之 KafkaOffsetMonitor 安装部署及使用


一、实验介绍


1.1 实验内容

KafkaOffsetMonitor 是一个可以用于监控 Kafka 的 Topic 及 Consumer 消费状况的工具,其配置和使用特别的方便。

本节课将介绍 KafkaOffsetMonitor 的安装及使用。


1.2 先学课程

Hadoop 介绍与安装 https://www.shiyanlou.com/courses/237

为了保证可以在实验楼环境中完成本次实验,我们在原书内容基础上补充了一系列的实验指导,比如实验截图,代码注释,帮助您更好得实战。

如果您对于实验有疑惑或者建议可以随时在讨论区中提问,与同学们一起探讨。


1.3 实验知识点

KafkaOffsetMonitor 介绍
KafkaOffsetMonitor 的安装部署
KafkaOffsetMonitor 使用


1.4 实验环境

Hadoop 2.6.1
Zookeeper-3.4.6
Kafka_2.10-0.10.0.0
Xfce 终端


1.5 适合人群

本课程属于中等难度级别,适合具有大数据 Kafka 基础的用户,如果对数据采集了解能够更好的上手本课程。


二、实验步骤


2.1 准备工作

注意:本节实验内容为独立内容,请重新使用新环境进行学习,避免前面的一些设置配置导致课程运行报错。


1) 格式化并启动 hadoop

我们已经在实验楼环境里下载并配置启动 hadoop-2.6.1 所需的文件,免除您配置文件的麻烦,您可以在 
/opt
 找到,只需格式化并启动
hadoop 进程即可。

双击打开桌面上的 Xfce 终端,用 
sudo
 命令切换到 hadoop 用户,hadoop 用户密码为 hadoop,用 
cd
 命令进入 
/opt
目录。
$ su hadoop
$ cd /opt/




在 
/opt
 目录下格式化 hadoop。
$ hdfs namenode -format




在 
/opt
 目录下启动 hadoop 进程。
$ hadoop-2.6.1/sbin/start-all.sh




用 
jps
 查看 hadoop 进程是否启动。



2) 在 
/opt
 目录下下载并解压 Zookeeper
$  sudo wget http://labfile.oss.aliyuncs.com/courses/785/zookeeper-3.4.6.tar.gz $  sudo tar -zxvf zookeeper-3.4.6.tar.gz


启动 Zookeeper ,需要进行一些配置修改才行。
$ sudo chmod 777 -R zookeeper-3.4.6
$ cd zookeeper-3.4.6/
$ sudo cp conf/zoo_sample.cfg conf/zoo.cfg


修改
zoo.cfg
的配置。
hadoop@945f39ae074b:/opt/zookeeper-3.4.6$ sudo vi conf/zoo.cfg


添加修改以下内容。
dataDir=/opt/zookeeper-3.4.6/data
dataLogDir=/opt/zookeeper-3.4.6/logs


并在末尾添加。
server.1=localhost:2888:3888




执行命令创建相应的文件。
hadoop@e59fbc967b89:/opt/zookeeper-3.4.6$ sudo mkdir /opt/zookeeper-3.4.6/data
hadoop@e59fbc967b89:/opt/zookeeper-3.4.6$ sudo mkdir /opt/zookeeper-3.4.6/logs
hadoop@e59fbc967b89:/opt/zookeeper-3.4.6$ sudo chmod 777 -R ../zookeeper-3.4.6
hadoop@e59fbc967b89:/opt/zookeeper-3.4.6$ sudo echo 1 >/opt/zookeeper-3.4.6/data/myid


启动 Zookeeper 并查看状态。
hadoop@e59fbc967b89:/opt/zookeeper-3.4.6/bin$ ./zkServer.sh start
hadoop@e59fbc967b89:/opt/zookeeper-3.4.6/bin$ ./zkServer.sh status




3) 安装并启动 Kafka

在 /opt 目录下下载 Kafka 并解压
$ hadoop@e59fbc967b89:/opt$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/kafka_2.10-0.10.0.0.tgz $ hadoop@e59fbc967b89:/opt$ sudo tar -zxvf kafka_2.10-0.10.0.0.tgz


启动 Kafka
#权限不足,授权
hadoop@e59fbc967b89:/opt$ sudo chmod 777 -R kafka_2.10-0.10.0.0
hadoop@e59fbc967b89:/opt$ cd  kafka_2.10-0.10.0.0
#启动kafka
hadoop@e59fbc967b89:/opt/kafka_2.10-0.10.0.0$ bin/kafka-server-start.sh  config/server.properties  &


用 
jps
 查看进程是否启动。




2.2 KafkaOffsetMonitor 介绍

KafkaOffsetMonitor 是有由 Kafka 开源社区提供的一款Web管理界面,这个应用程序用来实时监控 Kafka 服务的 Consumer 以及它们所在的 Partition 中的 Offset,你可以通过浏览当前的消费者组,并且每个 Topic 的所有 Partition 的消费情况都可以观看的一清二楚。它让我们很直观的知道,每个 Partition 的 Message 是否消费掉,有木有阻塞等等。这个 Web 管理平台保留的 Partitio,Offset 和它的 Consumer 的相关历史数据,我们可以通过浏览
Web 管理的相关模块,清楚的知道最近一段时间的消费情况。

该 Web 管理平台有以下功能:
保护消费者组列表信息。
对 Consumer 的消费监控,并列出每个 Consumer 的 Offset 数据。
浏览查阅 Topic 的历史消费信息。
每个 Topic 的所有 Partition 列表包含:Topic、Pid、Offset、LogSize、Lag 以及 Owner 等等。


2.3 安装 KafkaOffsetMonitor

在安装 KafkaOffsetMonitor 管理平台时,我们需要先下载其安装包,其资源可以在 Github 上找到,但是 KafkaOffsetMonitor 中有些资源文件 (CSS,JS) 是访问外网的,特别是有访问 Google 资源,经常不能访问需要翻墙。

源码地址:

https://github.com/quantifind/KafkaOffsetMonitor

jar包地址:

https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.1

为方便大家学习,实验楼已将安装包放在内部云服务了。通过以下链接下载:

http://labfile.oss.aliyuncs.com/courses/785/KafkaOffsetMonitor-assembly-0.2.0.jar

在 kafka 的 bin 目录下创建并进入 kafka-monitor 目录,下载 KafkaOffsetMonitor。
$hadoop@e59fbc967b89:/opt/kafka_2.10-0.10.0.0/bin$ sudo mkdir kafka-monitor
$hadoop@e59fbc967b89:/opt/kafka_2.10-0.10.0.0/bin$ cd kafka-monitor
$hadoop@e59fbc967b89:/opt/kafka_2.10-0.10.0.0/bin/kafka-monitor$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/KafkaOffsetMonitor-assembly-0.2.0.jar




并在 kafka-monitor 文件夹下用 
vi
 新建脚本文件 a.sh
$ sudo vi a.sh


添加以下内容
java -Xms128M -Xmx256M -Xss1024K -XX:PermSize=128m -XX:MaxPermSize=256m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGette
rWeb --zk localhost:2181 --port 8086 --refresh 10.seconds --retain 7.days 1> kakfa.offset.monitor.logs.stdout.log 2>kafka.offset.monitor.logs.stderr.log &




#授予 a.sh 脚本权限
hadoop@e59fbc967b89:/opt$ sudo chmod 777 -R kafka_2.10-0.10.0.0


a.sh 的含义:

首先我们需要指明运行 Web 监控的类,然后需要用到 ZooKeeper,所有要填写 ZK 集群信息,接着是 Web 运行端口,页面数据刷新的时间以及保留数据的时间值,输入输出日志重定向。

启动 
a.sh
,并查看进程:
hadoop@e59fbc967b89:/opt/kafka_2.10-0.10.0.0/bin/kafka-monitor/$ sh a.sh


 



打开浏览器输入 Ip 地址: http://localhost:8086



创建主题 test
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test




创建主题 tes2
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tes2




点击 
Visualizations
 -> 
Cluster
Overview
,因为我们只有一个 kafka broker,所以只显示一个,如果是 kafka 集群,则会显示多个 broker。



点击 
Topic List
 查看刚才创建的主题。




2.4 编写代码

双击打开 eclipse -> file -> New -> Java Project





输入 Project name -> 选择 javaSe-1.7 -> Finish



右键 src -> New -> Package



输入包名 -> Finish



添加 kafka jar 包



点击 Libraries -> Add External JARs...-> 选中已经下载的 kafka libs 目录下的 jar -> 确定



点击 OK



右键 test -> New -> Class



输入类名 -> 确认



MyTest.java
如下
package test;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MyTest {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
//zk
props.put("zk.connect", "localhost:2181");
//kafka broker
props.put("metadata.broker.list","localhost:9092");

//serialize
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);

// read socket
for (int i = 1; i <= 100000; i++) {
Thread.sleep(50);
producer.send(new KeyedMessage<String, String>("test",
"message: " + i ));
}

}
}


用一个 comsumer Xfce 终端从 test 中读取信息。
hadoop@54fd4c55166a:/opt/kafka_2.10-0.10.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test


运行 main



点击 
Consumer Groups
 可以看到当前的 Consumer Groups。



点击 
Consumer Groups
 ->
console-consumer-70129
 ,可以看到当前消费的主题,分区,偏移量等。



点击 
kafkaMonitor
 可以查看你想监控的消费组。



点击 
Visualizations
-> 
Active
Topic Consumers
 可以看到活的主题及对应的消费者



一些参数的含义如下:
Topic:创建 Topic 名称。
Partition:分区编号。
Offset:表示该 Parition 已经消费了多少 Message。
LogSize:表示该 Partition 生产了多少 Message。
Lag:表示有多少条 Message 未被消费。
Owner:表示消费者。
Created:表示该 Partition 创建时间。
Last Seen:表示消费状态刷新最新时间。

观察到 consume Xfce 终端不断输出信息。



消费消息,通过监控 Web 查看更直观。

点击 
Topic List
 ->
test
->
console-consumer-70129





三、实验总结

本节课主要介绍了 KafkaOffsetMonitor 的安装并通过使用 Java 语言进行验证,很直观的显示了消费的状态。


四、扩展阅读

http://kafka.apache.org/quickstart
https://my.oschina.net/cjun/blog/514956
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息