您的位置:首页 > 其它

beanstalkd消息队列

2016-05-26 10:55 204 查看
简介

Beanstalkd,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。

beanstalkd安装启动

安装命令:yum install beanstalkd

后台启动:beanstalkd -l 192.168.2.231 -p 11300 &

(beanstalkd -l 地址 -p 端口号 -z 最大的任务大小(byte) -c &)

启动选项

-b DIR wal directory

-f MS fsync at most once every MS milliseconds (use -f0 for “always fsync”)

-F never fsync (default)

-l ADDR listen on address (default is 0.0.0.0)

-p PORT listen on port (default is 11300)

-u USER become user and group

-z BYTES set the maximum job size in bytes (default is 65535)

-s BYTES set the size of each wal file (default is 10485760)

(will be rounded up to a multiple of 512 bytes)

-c compact the binlog (default)

-n do not compact the binlog

-v show version information

-V increase verbosity

-h show this help

项目配置

文件shop/hnguanli/config/params.default.php重命名为shop/hnguanli/config/params.php

1、增加发送队列(管道)

return array(
'subscribers' => array(    // 消息订阅者清单
'goods' => array(      // 商品更新 -- 管道组
'solr',            // 通知对象 -- 管道
)
),
);


按照实体分组,将管道组置于subscribers数组元素里面,要通知的对象作为一个管道添加到对应的管道组下面,如goods为一个管道组别,通知对象为solr。

2、配置beanstalkd

'beanstalkd' => array(     // beanstalkd服务配置
'persistent' => true,  // 是否保持长连接
'host' => '192.168.2.231', // ip地址
'port' => 11300,       // 端口号
'timeout' => 3,        // 连接超时时间
),


注意ip地址和端口号要与beanstalkd进程相对应

发送消息队列

业务完成后通知对应管道组中对象更新,可使用lib_message_queue.php的product方法

require_once  '/data/shop/hnguanli/includes/lib_message_queue.php';
$mq = new MessageQueue();
$mq->product('goods', $goods_id);  //goods为通知的管道组,$goods_id可为int或array


监听消息队列脚本

消息已发出了,当然也需要一个来消费消息并执行相应业务的脚本,目前放置监听脚本的目录为/data/shop/hnguanli/scripts/

具体例子可参照/data/shop/hnguanli/scripts/goods_watch.php

先了解一下MessageQueue::watch()的定义:

该方法第一次会创建一个到消息服务器的长连接,并监听订阅的队列,只要队列中有消息(无论是新的还是未处理成功的),就会阻塞当前获取的消息,调用消息处理函数,待处理完毕后继续监听队列。

注意:消息处理函数处理完更新业务后,一定要显式返回true,否则下次接收到的消息永远是上一条消息。

//solr--管道, goods--管道组
$queue  = isset($_SERVER["argv"][1]) ? $_SERVER["argv"][1] : 'goods';
$mq    = new MessageQueue('solr');

try {
$mq->watch($queue, function($message) {

$goods_id = intval($message);

if ($goods_id)
update_solr_goods($goods_id); //执行相应的业务

return true;
});
} catch (Exception $e) {
$mq->_log('error', 'goods_watch', $queue, $e->getMessage(), "scripts");
}


另外变量$queue为脚本命令中发送的第一个参数,如果通知对象不同但使用同一个监听脚本时,需要在执行脚本命令中带上这个参数。

发送和接收消息都已经实现了,接收就需要启动对应的监听脚本。下列以后台运行的方式启动。

nohup /usr/local/php/bin/php /data/shop/hnguanli/scripts/goods_watch.php &

或者

(/usr/local/php/bin/php /data/shop/hnguanli/scripts/goods_watch.php &)

准备工作都做好了,接下来就可以进行测试,以下的log文件会有助于你分析和定位问题。

生产及消费消息log:/data/shop/hnguanli/hnlog_/mqlog/管道组名.管道名.log

脚本执行错误log:/data/shop/hnguanli/hnlog_/mqlog/脚本名.管道名.log
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  beanstalkd