您的位置:首页 > 编程语言 > PHP开发

RabbitMQ+PHP 消息队列环境配置

2016-07-01 21:46 691 查看
参考文档:
http://www.cnblogs.com/phpinfo/p/4104551...
http://blog.csdn.net/historyasamirror/ar...

依赖包安装

yuminstallncurses-develunixODBCunixODBC-devel

erlang环境

wgethttp://erlang.org/download/otp_src_18.1.tar.gztar-zxvfotp_src_18.1.tar.gz
cdotp_src_18.1
./configure--prefix=/usr/local/erlang
make
makeinstall

#配置erlang环境变量
vim/etc/profile
#增加内容:
exportPATH="$PATH:/usr/local/erlang/bin"

#保存退出,并刷新变量
source/etc/profile

#测试erlang是否安装成功
#安装完成以后,执行erl看是否能打开eshell,用’halt().’退出,注意后面的点号,那是erlang的结束符。
[root@localhostsrc]#erl
Erlang/OTP17[erts-6.1][source][64-bit][async-threads:10][hipe][kernel-poll:false]

EshellV6.1(abortwith^G)
2>9+3.
12
3>halt().

安装rabbitmq依赖文件,安装rabbitmq

#安装rabbitmq依赖包
yuminstallxmlto

#安装rabbitmq服务端
wgethttp://www.rabbitmq.com/releases/rabbitmq-server/v3.5.7/rabbitmq-server-3.5.7.tar.gztarzxvfrabbitmq-server-3.5.7.tar.gz
cdrabbitmq-server-3.5.7/
make
makeinstallTARGET_DIR=/usr/local/rabbitmqSBIN_DIR=/usr/local/rabbitmq/sbinMAN_DIR=/usr/local/rabbitmq/manDOC_INSTALL_DIR=/usr/local/rabbitmq/doc

#配置hosts
vim/etc/hosts
#增加一行内容
#当前IP地址绑定HOSTNAME名称(vim/etc/sysconfig/network)
192.168.2.208localhost.localdomain

#这种会提示错误(Warning:PIDfilenotwritten;-detachedwaspassed.)
/usr/local/rabbitmq/sbin/rabbitmq-server-detached启动rabbitmq
/usr/local/rabbitmq/sbin/rabbitmqctlstatus查看状态
/usr/local/rabbitmq/sbin/rabbitmqctlstop关闭rabbitmq

#目前我自己使用
/usr/local/rabbitmq/sbin/rabbitmq-serverstart&启动rabbitmq
/usr/local/rabbitmq/sbin/rabbitmqctlstatus查看状态
/usr/local/rabbitmq/sbin/rabbitmqctlstop关闭rabbitmq

启用管理插件

mkdir/etc/rabbitmq
/usr/local/rabbitmq/sbin/rabbitmq-pluginslist查看插件列表
/usr/local/rabbitmq/sbin/rabbitmq-pluginsenablerabbitmq_management(启用插件)
/usr/local/rabbitmq/sbin/rabbitmq-pluginsdisablerabbitmq_management(禁用插件)

#重启rabbitmq
#访问http://127.0.0.1:15672/
#如果有iptables
vim/etc/sysconfig/iptables

#增加一下内容
-AINPUT-mstate--stateNEW-mtcp-ptcp--dport15672-jACCEPT

#重启动iptable
serviceiptablesrestart

开机自启动配置

#!/bin/sh
#startrabbitMq
sudo/usr/local/rabbitmq/sbin/rabbitmq-server&>/usr/local/rabbitmq/logs/rabbitmq.log2>&1

RabbitMQPHP扩展安装

#安装rabbitmq-c依赖包
yuminstalllibtoolautoconf

#安装rabbitmq-c(最好下载0.5的,0.6安装可能会报错)
#版本下载:https://github.com/alanxz/rabbitmq-c/releases/tag/v0.5.0
wgethttps://github.com/alanxz/rabbitmq-c/releases/download/v0.5.0/rabbitmq-c-0.5.0.tar.gztar-zxvfv0.5.0
cdrabbitmq-c-0.5.0/
autoreconf-i
./configure--prefix=/usr/local/rabbitmq-c
make
makeinstall

#安装PHP扩展amqp
wgethttp://pecl.php.net/get/amqp-1.6.1.tgztarzxvfamqp-1.6.1.tgz
cdamqp-1.6.1
/usr/local/php/bin/phpize
./configure--with-php-config=/usr/local/php/bin/php-config--with-amqp--with-librabbitmq-dir=/usr/local/rabbitmq-c
make
makeinstall

#编辑php.ini文件,增加amqp扩展支持
vim/usr/local/php/etc/php.ini

#增加下面内容
;rabbitmq扩展支持
extension=amqp.so

#重启php-fpm
/etc/init.d/php-fpmrestart

验证是否成功phpinfo()查看下是否支持amqp扩展



相关配置

hostnamemq//设置hostname名称
vim/etc/sysconfig/network//设置hostname
vim/etc/hosts//编辑hosts
./rabbitmqctladd_useradminadmin//添加用户
./rabbitmqctlset_user_tagsadminadministrator//添加admin到administrator分组
./rabbitmqctlset_permissions-p/admin"*.""*.""*."//添加权限

创建配置文件

#在/usr/rabbitmq/sbin/rabbitmq-defaults查看config文件路径
#创建配置文件
touch/usr/rabbitmq/sbin
#vm_memory_high_watermark内存低水位线,若低于该水位线,则开启流控机制,阻止所有请求,默认值是0.4,即内存总量的40%,
#vm_memory_high_watermark_paging_ratio内存低水位线的多少百分比开始通过写入磁盘文件来释放内存
vi/usr/rabbitmq/sbin/rabbitmq.config输入
[
{rabbit,[{vm_memory_high_watermark_paging_ratio,0.75},
{vm_memory_high_watermark,0.7}]}
].

创建环境文件

touch/etc/rabbitmq/rabbitmq-env.conf
#输入
RABBITMQ_NODENAME=FZTEC-240088节点名称
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1监听IP
RABBITMQ_NODE_PORT=5672监听端口
RABBITMQ_LOG_BASE=/data/rabbitmq/log日志目录
RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins插件目录
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia后端存储目录

操作命令

查看exchange信息
/usr/rabbitmq/sbin/rabbitmqctllist_exchangesnametypedurableauto_deletearguments

查看队列信息
/usr/rabbitmq/sbin/rabbitmqctllist_queuesnamedurableauto_deletemessagesconsumersme
查看绑定信息
/usr/rabbitmq/sbin/rabbitmqctllist_bindings
查看连接信息
/usr/rabbitmq/sbin/rabbitmqctllist_connections

php的server端脚本

<?php
$routingkey='key';
//设置你的连接
$conn_args=array('host'=>'localhost','port'=>'5672','login'=>'guest','password'=>'guest');
$conn=newAMQPConnection($conn_args);
if($conn->connect()){
echo"Establishedaconnectiontothebroker\n";
}
else{
echo"Cannotconnecttothebroker\n";
}
//你的消息
$message=json_encode(array('HelloWorld3!','php3','c++3:'));
//创建channel
$channel=newAMQPChannel($conn);
//创建exchange
$ex=newAMQPExchange($channel);
$ex->setName('exchange');//创建名字
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
//$ex->setFlags(AMQP_AUTODELETE);
//echo"exchangestatus:".$ex->declare();
echo"exchangestatus:".$ex->declareExchange();
echo"\n";
for($i=0;$i<100;$i++){
if($routingkey=='key2'){
$routingkey='key';
}else{
$routingkey='key2';
}
$ex->publish($message,$routingkey);
}
/*
$ex->publish($message,$routingkey);
创建队列
$q=newAMQPQueue($channel);
设置队列名字如果不存在则添加
$q->setName('queue');
$q->setFlags(AMQP_DURABLE|AMQP_AUTODELETE);
echo"queuestatus:".$q->declare();
echo"\n";
echo'queuebind:'.$q->bind('exchange','route.key');
将你的队列绑定到routingKey
echo"\n";

$channel->startTransaction();
echo"send:".$ex->publish($message,'route.key');//将你的消息通过制定routingKey发送
$channel->commitTransaction();
$conn->disconnect();
*/

php客户端脚本

<?php
$bindingkey='key2';
//连接RabbitMQ
$conn_args=array('host'=>'127.0.0.1','port'=>'5672','login'=>'guest','password'=>'guest','vhost'=>'/');
$conn=newAMQPConnection($conn_args);
$conn->connect();
//设置queue名称,使用exchange,绑定routingkey
$channel=newAMQPChannel($conn);
$q=newAMQPQueue($channel);
$q->setName('queue2');
$q->setFlags(AMQP_DURABLE);
$q->declare();
$q->bind('exchange',$bindingkey);
//消息获取
$messages=$q->get(AMQP_AUTOACK);
if($messages){
var_dump(json_decode($messages->getBody(),true));
}
$conn->disconnect();
?>

翻译了部分mq常量设置,不正确的地方,大家以试验为准

全选复制放进笔记
/**
*Passinginthisconstantasaflagwillforcefullydisableallotherflags.
*Usethisifyouwanttotemporarilydisabletheamqp.auto_ackinisetting.
*传递这个参数作为标志将完全禁用其他标志,如果你想临时禁用amqp.auto_ack设置起效
*/
define('AMQP_NOPARAM',0);

/**
*Durableexchangesandqueueswillsurviveabrokerrestart,completewithalloftheirdata.
*持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据
*/
define('AMQP_DURABLE',2);

/**
*Passiveexchangesandqueueswillnotberedeclared,butthebrokerwillthrowanerroriftheexchangeorqueuedoesnotexist.
*被动模式的交换机和队列不能被重新定义,但是如果交换机和队列不存在,代理将扔出一个错误提示
*/
define('AMQP_PASSIVE',4);

/**
*Validforqueuesonly,thisflagindicatesthatonlyoneclientcanbelisteningtoandconsumingfromthisqueue.
*仅对队列有效,这个人标志定义队列仅允许一个客户端连接并且从其消费消息
*/
define('AMQP_EXCLUSIVE',8);

/**
*Forexchanges,theautodeleteflagindicatesthattheexchangewillbedeletedassoonasnomorequeuesarebound
*toit.Ifnoqueueswereeverboundtheexchange,theexchangewillneverbedeleted.Forqueues,theautodelete
*flagindicatesthatthequeuewillbedeletedassoonastherearenomorelistenerssubscribedtoit.Ifno
*subscriptionhaseverbeenactive,thequeuewillneverbedeleted.Note:Exclusivequeueswillalwaysbe
*automaticallydeletedwiththeclientdisconnects.
*对交换机而言,自动删除标志表示交换机将在没有队列绑定的情况下被自动删除,如果从没有队列和其绑定过,这个交换机将不会被删除.
*对队列而言,自动删除标志表示如果没有消费者和你绑定的话将被自动删除,如果从没有消费者和其绑定,将不被删除,独占队列在客户断
*开连接的时候将总是会被删除
*/
define('AMQP_AUTODELETE',16);

/**
*Clientsarenotallowedtomakespecificqueuebindingstoexchangesdefinedwiththisflag.
*这个标志标识不允许自定义队列绑定到交换机上
*/
define('AMQP_INTERNAL',32);

/**
*Whenpassedtotheconsumemethodforaclusteredenvironment,donotconsumefromthelocalnode.
*在集群环境消费方法中传递这个参数,表示将不会从本地站点消费消息
*/
define('AMQP_NOLOCAL',64);

/**
*Whenpassedtothe{@linkAMQPQueue::get()}and{@linkAMQPQueue::get()}methodsasaflag,
*themessageswillbeimmediatelymarkedasacknowledgedbytheserverupondelivery.
*当在队列get方法中作为标志传递这个参数的时候,消息将在被服务器输出之前标志为acknowledged(已收到)
*/
define('AMQP_AUTOACK',128);

/**
*Passedonqueuecreation,thisflagindicatesthatthequeueshouldbedeletedifitbecomesempty.
*在队列建立时候传递这个参数,这个标志表示队列将在为空的时候被删除
*/
define('AMQP_IFEMPTY',256);

/**
*Passedonqueueorexchangecreation,thisflagindicatesthatthequeueorexchangeshouldbe
*deletedwhennoclientsareconnectedtothegivenqueueorexchange.
*在交换机或者队列建立的时候传递这个参数,这个标志表示没有客户端连接的时候,交换机或者队列将被删除
*/
define('AMQP_IFUNUSED',512);

/**
*Whenpublishingamessage,themessagemustberoutedtoavalidqueue.Ifitisnot,anerrorwillbereturned.
*当发布消息的时候,消息必须被正确路由到一个有效的队列,否则将返回一个错误
*/
define('AMQP_MANDATORY',1024);

/**
*Whenpublishingamessage,markthismessageforimmediateprocessingbythebroker.(Highprioritymessage.)
*当发布消息时候,这个消息将被立即处理.
*/
define('AMQP_IMMEDIATE',2048);

/**
*Ifsetduringacallto{@linkAMQPQueue::ack()},thedeliverytagistreatedas"uptoandincluding",sothatmultiple
*messagescanbeacknowledgedwithasinglemethod.Ifsettozero,thedeliverytagreferstoasinglemessage.
*IftheAMQP_MULTIPLEflagisset,andthedeliverytagiszero,thisindicatesacknowledgementofalloutstanding
*messages.
*当在调用AMQPQueue::ack时候设置这个标志,传递标签将被视为最大包含数量,以便通过单个方法标示多个消息为已收到,如果设置为0
*传递标签指向单个消息,如果设置了AMQP_MULTIPLE,并且传递标签是0,将所有未完成消息标示为已收到
*/
define('AMQP_MULTIPLE',4096);

/**
*Ifsetduringacallto{@linkAMQPExchange::bind()},theserverwillnotrespondtothemethod.Theclientshouldnotwait
*forareplymethod.Iftheservercouldnotcompletethemethoditwillraiseachannelorconnectionexception.
*当在调用AMQPExchange::bind()方法的时候,服务器将不响应请求,客户端将不应该等待响应,如果服务器无法完成该方法,将会抛出一个异常
*/
define('AMQP_NOWAIT',8192);

/**
*Ifsetduringacallto{@linkAMQPQueue::nack()},themessagewillbeplacedbacktothequeue.
*如果在调用AMQPQueue::nack方法时候设置,消息将会被传递回队列
*/
define('AMQP_REQUEUE',16384);

/**
*Adirectexchangetype.
*direct类型交换机
*/
define('AMQP_EX_TYPE_DIRECT','direct');

/**
*Afanoutexchangetype.
*fanout类型交换机
*/
define('AMQP_EX_TYPE_FANOUT','fanout');

/**
*Atopicexchangetype.
*topic类型交换机
*/
define('AMQP_EX_TYPE_TOPIC','topic');

/**
*Aheaderexchangetype.
*header类型交换机
*/
define('AMQP_EX_TYPE_HEADERS','headers');

/**
*socket连接超时设置
*/
define('AMQP_OS_SOCKET_TIMEOUT_ERRNO',536870947);


https://segmentfault.com/a/1190000004627137
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: