beanstalkd消息队列
2016-05-26 10:55
204 查看
简介
Beanstalkd,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。
beanstalkd安装启动
安装命令:yum install beanstalkd
后台启动:beanstalkd -l 192.168.2.231 -p 11300 &
(beanstalkd -l 地址 -p 端口号 -z 最大的任务大小(byte) -c &)
启动选项
-b DIR wal directory
-f MS fsync at most once every MS milliseconds (use -f0 for “always fsync”)
-F never fsync (default)
-l ADDR listen on address (default is 0.0.0.0)
-p PORT listen on port (default is 11300)
-u USER become user and group
-z BYTES set the maximum job size in bytes (default is 65535)
-s BYTES set the size of each wal file (default is 10485760)
(will be rounded up to a multiple of 512 bytes)
-c compact the binlog (default)
-n do not compact the binlog
-v show version information
-V increase verbosity
-h show this help
项目配置
文件shop/hnguanli/config/params.default.php重命名为shop/hnguanli/config/params.php
1、增加发送队列(管道)
按照实体分组,将管道组置于subscribers数组元素里面,要通知的对象作为一个管道添加到对应的管道组下面,如goods为一个管道组别,通知对象为solr。
2、配置beanstalkd
注意ip地址和端口号要与beanstalkd进程相对应
发送消息队列
业务完成后通知对应管道组中对象更新,可使用lib_message_queue.php的product方法
监听消息队列脚本
消息已发出了,当然也需要一个来消费消息并执行相应业务的脚本,目前放置监听脚本的目录为/data/shop/hnguanli/scripts/
具体例子可参照/data/shop/hnguanli/scripts/goods_watch.php
先了解一下MessageQueue::watch()的定义:
该方法第一次会创建一个到消息服务器的长连接,并监听订阅的队列,只要队列中有消息(无论是新的还是未处理成功的),就会阻塞当前获取的消息,调用消息处理函数,待处理完毕后继续监听队列。
注意:消息处理函数处理完更新业务后,一定要显式返回true,否则下次接收到的消息永远是上一条消息。
另外变量$queue为脚本命令中发送的第一个参数,如果通知对象不同但使用同一个监听脚本时,需要在执行脚本命令中带上这个参数。
发送和接收消息都已经实现了,接收就需要启动对应的监听脚本。下列以后台运行的方式启动。
nohup /usr/local/php/bin/php /data/shop/hnguanli/scripts/goods_watch.php &
或者
(/usr/local/php/bin/php /data/shop/hnguanli/scripts/goods_watch.php &)
准备工作都做好了,接下来就可以进行测试,以下的log文件会有助于你分析和定位问题。
生产及消费消息log:/data/shop/hnguanli/hnlog_/mqlog/管道组名.管道名.log
脚本执行错误log:/data/shop/hnguanli/hnlog_/mqlog/脚本名.管道名.log
Beanstalkd,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。
beanstalkd安装启动
安装命令:yum install beanstalkd
后台启动:beanstalkd -l 192.168.2.231 -p 11300 &
(beanstalkd -l 地址 -p 端口号 -z 最大的任务大小(byte) -c &)
启动选项
-b DIR wal directory
-f MS fsync at most once every MS milliseconds (use -f0 for “always fsync”)
-F never fsync (default)
-l ADDR listen on address (default is 0.0.0.0)
-p PORT listen on port (default is 11300)
-u USER become user and group
-z BYTES set the maximum job size in bytes (default is 65535)
-s BYTES set the size of each wal file (default is 10485760)
(will be rounded up to a multiple of 512 bytes)
-c compact the binlog (default)
-n do not compact the binlog
-v show version information
-V increase verbosity
-h show this help
项目配置
文件shop/hnguanli/config/params.default.php重命名为shop/hnguanli/config/params.php
1、增加发送队列(管道)
return array( 'subscribers' => array( // 消息订阅者清单 'goods' => array( // 商品更新 -- 管道组 'solr', // 通知对象 -- 管道 ) ), );
按照实体分组,将管道组置于subscribers数组元素里面,要通知的对象作为一个管道添加到对应的管道组下面,如goods为一个管道组别,通知对象为solr。
2、配置beanstalkd
'beanstalkd' => array( // beanstalkd服务配置 'persistent' => true, // 是否保持长连接 'host' => '192.168.2.231', // ip地址 'port' => 11300, // 端口号 'timeout' => 3, // 连接超时时间 ),
注意ip地址和端口号要与beanstalkd进程相对应
发送消息队列
业务完成后通知对应管道组中对象更新,可使用lib_message_queue.php的product方法
require_once '/data/shop/hnguanli/includes/lib_message_queue.php'; $mq = new MessageQueue(); $mq->product('goods', $goods_id); //goods为通知的管道组,$goods_id可为int或array
监听消息队列脚本
消息已发出了,当然也需要一个来消费消息并执行相应业务的脚本,目前放置监听脚本的目录为/data/shop/hnguanli/scripts/
具体例子可参照/data/shop/hnguanli/scripts/goods_watch.php
先了解一下MessageQueue::watch()的定义:
该方法第一次会创建一个到消息服务器的长连接,并监听订阅的队列,只要队列中有消息(无论是新的还是未处理成功的),就会阻塞当前获取的消息,调用消息处理函数,待处理完毕后继续监听队列。
注意:消息处理函数处理完更新业务后,一定要显式返回true,否则下次接收到的消息永远是上一条消息。
//solr--管道, goods--管道组 $queue = isset($_SERVER["argv"][1]) ? $_SERVER["argv"][1] : 'goods'; $mq = new MessageQueue('solr'); try { $mq->watch($queue, function($message) { $goods_id = intval($message); if ($goods_id) update_solr_goods($goods_id); //执行相应的业务 return true; }); } catch (Exception $e) { $mq->_log('error', 'goods_watch', $queue, $e->getMessage(), "scripts"); }
另外变量$queue为脚本命令中发送的第一个参数,如果通知对象不同但使用同一个监听脚本时,需要在执行脚本命令中带上这个参数。
发送和接收消息都已经实现了,接收就需要启动对应的监听脚本。下列以后台运行的方式启动。
nohup /usr/local/php/bin/php /data/shop/hnguanli/scripts/goods_watch.php &
或者
(/usr/local/php/bin/php /data/shop/hnguanli/scripts/goods_watch.php &)
准备工作都做好了,接下来就可以进行测试,以下的log文件会有助于你分析和定位问题。
生产及消费消息log:/data/shop/hnguanli/hnlog_/mqlog/管道组名.管道名.log
脚本执行错误log:/data/shop/hnguanli/hnlog_/mqlog/脚本名.管道名.log
相关文章推荐
- beanstalkd消息队列在生产环境的应用
- Beanstalkd的使用(Golang)
- Beanstalkd队列安装使用心得
- Beanstalkd , zeromq,rabbitmq对比
- php消息队列beanstalkd使用
- 分布式队列Beanstalkd小记
- beanstalkd实现延迟任务
- 基于netty4的beanstalkd的java客户端实现
- 消息队列 Beanstalkd 源码解析(一)
- 消息队列-beanstalkd
- beanstalkd 的安装和使用
- Beanstalkd 分布式内存队列系统
- laravel通过supervisor管理beanstalkd任务队列
- beanstalkd消息队列在生产环境的应用
- Beanstalkd一个高性能分布式内存队列系统
- beanstalkd介绍
- beanstalkd基础使用(C/C++语言)
- beanstalkd实现延迟任务
- 消息队列_Beanstalkd-0001.Beanstalkd之轻量级分布式内存队列部署?
- 消息队列的使用场景和使用技巧