Kafka简介及使用PHP处理Kafka消息
2017-04-07 10:22
501 查看
Kafka简介及使用PHP处理Kafka消息
Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka的特点:
以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。【据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)】
支持Kafka Server间的消息分区,同时保证每个Partition内的消息顺序传输。
分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
同时支持离线数据处理和实时数据处理。
Kafka的架构:
kafka架构图
Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。
Kafka基本概念:
Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
Broker:缓存代理,Kafa集群中的一台或多台服务器统称为broker。
Kafka消息发送的流程:
Kafka消息发送
下面是PHP生产、消费Kafka消息的例子(假设已经配置好Kafka):
1.从zookeeper源码src/c/src安装zookeeper c client
123 | cd zookeeper-3.4.8/src/c./configuremake && make install |
1 2 3 4 5 | git clone https://github.com/Timandes/libzookeeper.git cd libzookeeper phpize ./configure --with-libzookeeper=/usr/local/bin/cli_mt make && make install |
12345 | git clone https://github.com/andreiz/php-zookeeper.gitcd php-zookeeperphpize./configuremake && make install |
1 2 | extension=libzookeeper.so extension=zookeeper.so |
1.启动zookeeper和kafka
12 | kafka_2.11-0.10.0.0/bin/zookeeper-server-start.sh --daemon kafka_2.11-0.10.0.0/config/zookeeper.propertieskafka_2.11-0.10.0.0/bin/kafka-server-start.sh kafka_2.11-0.10.0.0/config/server.properties |
1 | kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic testtopic |
1 | composer require "nmred/kafka-php" |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | <?php require_once('./vendor/autoload.php'); $produce = \Kafka\Produce::getInstance('localhost:2181', 3000); $produce->setRequireAck(-1); $topicName = 'testtopic'; //获取到topic下可用的partitions $partitions = $produce->getAvailablePartitions($topicName); $partitionCount = count($partitions); $count = 1; while(true){ $message = json_encode(array('uid' => $count, 'age' => $count%100, 'datetime' => date('Y-m-d H:i:s'))); //发送消息到不同的partition $partitionId = $count%$partitionCount; $produce->setMessages('testtopic', $partitionId, array($message)); $result = $produce->send(); var_dump($result); $count++; echo "producer sleeping\n"; sleep(1); } |
123456789101112131415161718192021222324252627 | <?php require_once('./vendor/autoload.php'); //获取需要处理的partitionId$partitionId = isset($argv[1]) ? intval($argv[1]) : 0; $consumer = \Kafka\Consumer::getInstance('localhost:2181'); $consumer->setGroup('test-consumer-group');$consumer->setPartition('testtopic', $partitionId);$consumer->setFromOffset(true);$consumer->setMaxBytes(102400); while(true){ $topic = $consumer->fetch(); foreach ($topic as $topicName => $partition) { foreach ($partition as $partId => $messageSet) { foreach ($messageSet as $message) { var_dump($message); } } } echo "consumer sleeping\n"; sleep(1);} |
1 2 3 | php producer.php php consumer.php 0 php consumer.php 1 |
两个consumer脚本依次收到producer发送的消息
php-kafka-consumer-output
Related Posts
使用七牛云快速搭建直播服务使用Let‘s Encrypt为网站添加HTTPS
Kafka简介及使用PHP处理Kafka消息
Github项目使用Travis
CI持续集成之PHP
相关文章推荐
- 消息队列(Message Queue)简介及其使用
- MFC程序中消息以及函数的处理顺序简介!
- 使用PHP的错误处理
- 消息队列(Message Queue)简介及其使用
- 消息队列(Message Queue)简介及其使用
- 使用C#和MSMQ开发消息处理程序
- 消息队列(Message Queue)简介及其使用 [转]
- 在同一窗体中使用PHP来处理多个提交任务
- 使用C#和MSMQ开发消息处理程序(转)
- 消息队列(Message Queue)简介及其使用
- 消息队列(Message Queue)简介及其使用
- Linux下,使用C/C++编写一个简单的消息处理程序
- 在同一窗体中使用PHP来处理多个提交任务
- 使用C#和MSMQ开发消息处理程序
- 通知php使用自定义的session处理函数来操作session,而不使用php预置的方法
- 使用单独的命令处理类来处理命令消息(适用于有很多命令处理函数的对象,以及共享命令处理函数)
- 消息队列(Message Queue)简介及其使用(zz)
- 使用SQL Mail收发和自动处理邮件中的扩展存储过程简介
- 消息队列(Message Queue)简介及其使用