您的位置:首页 > 其它

rabbitmq 学习笔记

2012-07-11 21:54 337 查看
1。消息会处于阻塞状态,可以通过(man rabbitmqctl 可以获得更多使用方法,常用的有list_queues,list_consumers.list_connections,close_connection,add_vhost,...)

list_channels pid connection ; close_connection [connection] "" ,查看和解决阻塞

rabbitmqctl list_queues -p [vhost] name messages_ready messages_unacknowledged


解决阻塞的办法,可以在subscribe消息队列是设置autoAck=true,这样会避免消息队列中消息阻塞,这种情况是worker接到消息后,就会把消息从消息队列删除,不管消息是否被正确处理,另一种是设置autoAck=false,这样worker在接受消息后,必须给予服务端一个ack响应,该消息才会从消息队列中删除,这样会防止消息的意外丢失,但要注意的是,消息队列如果没有接收到ack响应,该消息对了的消息就会一直阻塞,对于rabbitmq-server来说,他是没有超时存在的,即除非重启rabbitmq,否则该消息队列会一直阻塞,直到收到响应,但如果与该消息队列的subscirbe断开的话,则表明过期,即该消息队列中消息会尝试重新发消息给一个订阅者进行处理。

2。关于健壮的消息处理
当rabbitmq server重启,或意外当掉的话,所用消息的订阅都会跟着坏掉(当然也可以设置持久化的消息队列设置),解决办法是捕获ShutdownSignalException异常(对rabbitmq)出现该异常说明消息服务无法连到,故可以进行相应的处理,另每次消息发送,消息订阅之前都要进行一次消息队列,exchange,绑定的重定义,防止消息对了重启后改消息队列/exchang已消失。

组合命令

list_consumers -p [vhost]
list_channels -p [vhost] pid connection
list_connections
close_connection


3。关于消息处理的管理
每个worker都应该有一个可标识的tag,尽量不使用系统生成的,这样便于以后的debug

Connection conn = null;
Channel channel = null;
QueueingConsumer consumer = null;
try {
conn = qFactory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(queueName, false, false, false, null);

/**
* This tells RabbitMQ not to give more than one message to a
* worker at a time. Or, in other words, don't dispatch a new
* message to a worker until it has processed and acknowledged
* the previous one. Instead, it will dispatch it to the next
* worker that is not still busy
*/
channel.basicQos(1);

consumer = new QueueingConsumer(channel);
//Use hostname as consume tagname , So that We can monitor who consume this Queue
channel.basicConsume(this.queueName, false, hostName, consumer);
} catch (IOException e) {
e.printStackTrace();
}

while (true) {
try {
//Get next message
delivery = consumer.nextDelivery();
} catch (ShutdownSignalException e) {
//If rabbitmq-server has closed , out of loop
e.printStackTrace();
isSignalBroken = true ;
break;
} catch (ConsumerCancelledException e) {
e.printStackTrace();
log.warn("The consumer has cancelled , Try to re-consume");
//If the channel and conn have closed .
try{
//Sleep 1s and reconnect to rabbitmq-server
Thread.sleep(1000);
conn = qFactory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(queueName, false, false, false, null);

channel.basicQos(1);

consumer = new  QueueingConsumer(channel);
channel.basicConsume(this.queueName, false, consumer);
continue;
}catch (IOException e1) {
e1.printStackTrace();
} catch (InterruptedException e2) {
e2.printStackTrace();
}

} catch (InterruptedException e) {
e.printStackTrace();
}

try{

//process message

}catch (Exception e) {
//If throw exception when process message , close channel and conn , make sure this message not block . then re-work
e.printStackTrace();
try {
channel.basicCancel(hostName);
channel.close();
conn.close();
} catch (IOException e1) {
e1.printStackTrace();
}
continue;
}

/**
* If a consumer dies without sending an ack, RabbitMQ will
* understand that a message wasn't processed fully and will
* redeliver it to another consumer
* There aren't any message timeouts;
* RabbitMQ will redeliver the message only when
* the worker connection dies. It's fine even if processing
* a message takes a very, very long time
*/
try {
channel.basicAck(delivery.getEnvelope()
.getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
//If because of the rabbitmq-server stop ,We will re-try connect to rabbtimq-server after 60s
if(isSignalBroken){
log.warn("The rabbitmq Server have broken , We Try to re-connect again After 60 seconds");
try {
Thread.sleep(1000*60);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.run();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: