您的位置:首页 > 其它

Kafka官方文档翻译(二)快速入门

2016-12-09 17:52 465 查看
本文假定你是尝鲜而且系统里没有任何kafka或者zookeeper的数据。kafka控制台脚本在基于Unix的系统和windows平台中是不同的,在windows平台中使用“bin\windows\”来代替“bin/”,还要把脚本扩展名改为“.bat”。

第一步:下载代码

下载0.10.1.0release版并解压:

> tar -xzf kafka_2.11-0.10.1.0.tgz

> cd kafka_2.11-0.10.1.0

第二步:启动服务

kafka用到了zookeeper,所以你需要先启动一个zookeeper服务(如果你没有启动的话)。你可以用kafka的简易脚本包来快速干净的启动一个单节点的zookeeper实例。

> bin/zookeeper-server-start.sh config/zookeeper.properties

然后启动kafka服务:

> bin/kafka-server-start.sh config/server.properties

第三步:创建一个主题

让我们创建一个名为“test”的主题,只有一个分区和一个备份:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

我们可以用“list”指令查看刚才创建的主题:

> bin/kafka-topics.sh --list --zookeeper localhost:2181

或者,你可以配置用代理自动在不存在的主题发布时创建它,来代替上面的手动操作。

第四步:发送一些消息

从命令行客户端登录kafka,之后可以从一个文件或者一个标准输入进行数据输入,然后把他们当做消息发送到kafka集群。默认情况下,每一行发送一条独立消息。

尝试运行生产者,然后在控制台定义一些消息发往服务器:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

第五步:启动一个消费者

kafka也有一个命令行的消费者,可以从标准输出打印消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message

This is another message

如果你在不同终端执行以上的每一个命令,那么你可以在生产者终端定义消息,在消费者终端看到它们。

所有的命令行工具都有附加选项;直接运行命令而不带参数,会显示详细的命令功能信息。

第六步:设置一个多代理的集群

截至目前,我们已经运行了一个代理,但这还不够。在kafka中,一个单独的代理,仅仅代表大小为一的集群,所以多启动几个代理实例也不会有什么改变。不过同情同情就算了,让我们把集群扩展到三个节点(都在我们本机上)

首先,我们为每个代理创建一个配置文件(在windows下用 copy 命令代替):

> cp config/server.properties config/server-1.properties

> cp config/server.properties config/server-2.properties

现在编辑这两个新文件中的属性如下:

config/server-1.properties:

    broker.id=1

    listeners=PLAINTEXT://:9093

    log.dir=/tmp/kafka-logs-1

config/server-2.properties:

    broker.id=2

    listeners=PLAINTEXT://:9094

    log.dir=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的唯一永久ID。因为我们在同一台电脑上运行这些节点,所以我们需要重写了端口和log目录,尽可能不让所有代理注册在同一个端口上或者覆写每个代理的数据。

我们已经启动了Zookeeper和一个单独的节点,所以我们现在只需要启动两个新节点:

> bin/kafka-server-start.sh config/server-1.properties &

...

> bin/kafka-server-start.sh config/server-2.properties &

...

现在创建一个备份因子为3的新主题:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

一切正常,但是现在我们有一个集群了,我们怎么知道每个代理都在干什么?试试执行命令“describe topics”:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1ReplicationFactor:3
Configs:
Topic: my-replicated-topicPartition: 0
Leader: 1Replicas: 1,2,0
Isr: 1,2,0

下面解释一下输出。第一行显示了所有分区的概要,每个附加行描述了一个分区。这里我们的主题只有一个分区,所以上面只有一行。

*“leader”负责指定分区的所有读写操作。每个节点是分区随机选择的一部分的leader。

*“replicas”是分区日志备份节点列表,不论他们是leader或者他们只是普通节点。

*“isr"是一个“in-sync(同步中)”备份组。他们是备份列表的子集,即当前运行并向leader报告的备份节点。

注意,在上面的示例中,节点1是主题唯一分区的leader。

我们可以在最开始的测试主题运行同样的命令,看看它储存在哪儿:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test PartitionCount:1ReplicationFactor:1
Configs:
Topic: test
Partition: 0 Leader: 0Replicas: 0
Isr: 0

看看结果,没什么意外的————最初的主题没有备份,储存在编号0的服务器中,也是我们创建它时集群中唯一的服务器。

让我们给新主题发布一些消息:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...

my test message 1

my test message 2

^C

现在消费这些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C

现在我们测试一下容错。1号代理现在是leader,我们关闭它:

> ps aux | grep server-1.properties

7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

> kill -9 7564

在windows上使用:

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"

java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar"  kafka.Kafka config\server-1.properties    644

> taskkill /pid 644 /f

领导关系被转移到了子节点之一,节点1号不再存在于同步中备份组。

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1ReplicationFactor:3
Configs:
Topic: my-replicated-topicPartition: 0
Leader: 2Replicas: 1,2,0
Isr: 2,0

但是消息仍然可以消费,即使最初写入数据的leader节点挂掉了:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C

第七步:使用kafka Connect导入/导出数据

从控制台写入数据或者在控制台后台写入数据都是很便捷的开始,但是你也可能想从其他源输入数据或者导出数据到其他系统。在大部分系统中,通过编写消费者代码,你可以用kafka Connect导入/导出数据。

kafka Connect(连接)是kafka的一个工具,提供导入/导出数据到/从kafka的功能。它是一个运行connectors实例的扩展工具,该实例实现了和外部系统交互的消费者逻辑。在本文中,我们将看到如何使用简单的connectors对象运行Kafka Connect,来导入数据到一个kafka主题的文件中,或者导出kafka主题到一个文件。

首先,我们创建一些测试数据:

> echo -e "foo\nbar" > test.txt

接着,我们在standalone模式下启动两个connectors,也就是他们运行在一个本地独立专享进程中。我们输入了三个配置文件作为参数,第一个总是kafka Connect进程的配置,包括常用配置如要连接的kafka代理或数据的序列号格式。剩下的配置文件每一个都指定了一个connector实例的创建方式。这些文件包含了一个唯一的connector名称,connector类的示例,还有其他connector需要的配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

这些样板配置文件,可以在kafka中找到,用你早前启动的默认本地集群配置来创建两个connectors实例:第一个是源connector实例用于从输入文件中按行读取数据并把每行生产到kafka的一个主题,第二个是一个槽connector用于从一个kafka主题读取消息并按行输出到文件。

在启动过程中,你会看到一些日志消息,包括正在实例化的connectors信息。一旦kafka Connect过程启动完成,源connector就开始从文件“test.txt”读取各行数据,发布它们到主题“connect-test”,而槽connector则开始从主题“connect-test”读取消息并把他们写入文件“test.sink.txt”。我们可以通过检查输出文件的文本内容来检验通过这个管道传输的数据:

> cat test.sink.txt

foo

bar

注意,数据是储存在kafka主题“connect-test”中,所以我们可以开一个控制台消费者看看这个主题中的数据(或者用客户端的消费者代码处理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

{"schema":{"type":"string","optional":false},"payload":"foo"}

{"schema":{"type":"string","optional":false},"payload":"bar"}

让connectors继续处理数据,所以我们可以向文件添加数据,然后看数据通过这个通路进行传递:

> echo "Another line" >> test.txt

你现在应该可以在控制台消费者输出看到一行新数据,也可以在槽文件中看到它。

第八步:使用kafka Streams(流)处理数据

kafka流(Streams)是一个kafka客户端库,用于kafka代理中的实时流数据处理和分析。下面的快速示例会向你演示如何运行一个用该库写成的流处理应用程序。下面是示例“WordCountDemo”的主要代码(为便于阅读已经用java8的lambda表达式改写):

KTable wordCounts = textLines

    // Split each text line, by whitespace, into words.

    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Ensure the words are available as record keys for the next aggregate operation.

    .map((key, value) -> new KeyValue<>(value, value))

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".

    .countByKey("Counts")

WordCount实现的算法,是计算每个单词在输入文件中出现的次数。不同于以往你见过的WordCount例子操作有界(有限)数据,这里的WordCount例子算法行为有一点儿不同,它操作的是无穷的、无边界的数据流。类似于动态边界,这是一个稳定的追踪和更新单词计数的算法。不过,因为前提是假定输入数据是无界的,它将定期输出它的当前状态,并在处理更多数据的同时输出结果,因为当它处理“所有”已输入数据时并不知道还有更多数据。

我们现在可以为kafka主题准备输入数据了,它们稍后将被kafka流应用处理:

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

或者windows下:

> echo all streams lead to kafka> file-input.txt

> echo hello kafka streams>> file-input.txt

> echo|set /p=join kafka summit>> file-input.txt

接下来,我们把输入数据发送到输入主题(名字是“streams-file-input),用控制台生产者进行该操作(作为练习,应用使用和运行的数据流会像一个kafka中的连续流):

> bin/kafka-topics.sh --create \

            --zookeeper localhost:2181 \

            --replication-factor 1 \

            --partitions 1 \

            --topic streams-file-input

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

现在我们可以运行WordCount示例处理输入数据了:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

只有当log记录也作为结果在持续写入另一个被称为“streams-wordcount-output"时,STDOUT输出才会运行。示例将会运行几秒钟,然后不同于通常的流处理应用,会自动终止。

我们现在可以读取WordCount示例的输出主题,检查执行结果:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \

            --topic streams-wordcount-output \

            --from-beginning \

            --formatter kafka.tools.DefaultMessageFormatter \

            --property print.key=true \

            --property print.value=true \

            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \

            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

下面的输出数据会被打印到控制台:

all     1

lead    1

to      1

hello   1

streams 2

join    1

kafka   3

summit  1

如上,第一列在kafka中称为消息主键,第二列是消息的值,都是java.lang.String类型。注意,实际上输出是一个持续更新的流,每一条数据记录(这里是上面原始输出中的每一行)是一个单词计数的更新,比如主键“kafka”的记录。对于相同主键的多个记录,每一个稍晚的记录都是更新自稍早的记录。

现在你可以写更多的输入消息到“streams-file-input”主题,然后观察新增加的消息被添加到“streams-wordcount-output”主题,再看看单词计数的更新(可以用上面讲过的控制台生产者和消费者)。

然后你可以用ctrl-C关闭控制台消费者了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: