
RabbitMQ Study Notes

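2016-02-03 18:41
A while ago, when I was writing a SEDA asynchronous framework, I used RabbitMQ as the distributed message queue (see the previous post). These are brief notes from working through the official tutorials, kept here for reference. All examples come from the official site.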

 

Part 2 Work Queues

 

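1. Round-robin dispatching
Drawback: load is not balanced accurately, because messages are handed out in turn regardless of how long each task takes.
2. Fair dispatch  --> set on the channel
Use channel.basicQos(prefetchCount); drawback: if all workers are busy, the queue can fill up.
3. Message acknowledgment  --> set on the consumer
boolean autoAck = false; // false = manual acks; the earlier Hello World example passed true, so don't mix the two up
channel.basicConsume("hello", autoAck, consumer);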
while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  //...     
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // remember to send the ack manually; otherwise the server assumes the client never finished the message
}
4. Message durability  --> set on the queue
Use boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
5. Persistent messages  --> set on the publisher
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
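
To make the difference between items 1 and 2 concrete, here is a toy simulation in plain Java with no broker involved. The class name DispatchDemo, both method names and the task costs are made up for illustration. "fair" only approximates what basicQos(1) with manual acks achieves, by always giving the next task to the worker with the least accumulated work.

```java
import java.util.Arrays;

/** Toy model only: no RabbitMQ involved, all names are illustrative. */
final class DispatchDemo {

    /** Round-robin: task i goes to worker i % n, no matter how busy that worker is. */
    static long[] roundRobin(int[] taskCosts, int workers) {
        long[] load = new long[workers];
        for (int i = 0; i < taskCosts.length; i++) {
            load[i % workers] += taskCosts[i];
        }
        return load;
    }

    /** Prefetch-1 style: the next task goes to the worker with the least work so far. */
    static long[] fair(int[] taskCosts, int workers) {
        long[] load = new long[workers];
        for (int cost : taskCosts) {
            int idle = 0;
            for (int w = 1; w < workers; w++) {
                if (load[w] < load[idle]) {
                    idle = w;
                }
            }
            load[idle] += cost;
        }
        return load;
    }

    public static void main(String[] args) {
        int[] costs = {9, 1, 9, 1, 9, 1}; // heavy and light tasks alternate
        System.out.println("round-robin: " + Arrays.toString(roundRobin(costs, 2)));
        System.out.println("fair:        " + Arrays.toString(fair(costs, 2)));
    }
}
```

With these costs, round-robin leaves the two workers with loads [27, 3], while the fair variant ends at [19, 11].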
 
 
example:
 
 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv)
            throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // durable = true: the queue definition survives a broker restart
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = getMessage(argv);

        // PERSISTENT_TEXT_PLAIN marks the message itself as persistent
        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv)
            throws java.io.IOException,
            java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // fair dispatch: at most one unacknowledged message per worker
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // autoAck = false: acks are sent manually below
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
            doWork(message);
            System.out.println(" [x] Done");

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
    //...
}
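
Why the manual ack in Worker matters can be sketched with a small in-memory model. This is not how the broker is implemented; AckDemo and all of its methods are hypothetical names. A delivered message stays in an "unacked" set until basicAck would arrive, and goes back to the queue if the consumer dies first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of manual acknowledgments; names and behavior are illustrative. */
final class AckDemo {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    /** Hands out the next message and tracks it until acked. Returns its tag, or -1 if empty. */
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    /** The counterpart of channel.basicAck(tag, false) in this model. */
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    /** Consumer connection lost: every unacked message is put back at the head of the queue. */
    void consumerDied() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    int readyCount() { return ready.size(); }
    int unackedCount() { return unacked.size(); }
    String peekReady() { return ready.peekFirst(); }

    public static void main(String[] args) {
        AckDemo queue = new AckDemo();
        queue.publish("task-1");
        queue.publish("task-2");
        long first = queue.deliver();
        queue.ack(first);       // task-1 is done and forgotten
        queue.deliver();        // task-2 is delivered but never acked
        queue.consumerDied();   // so task-2 becomes ready again
        System.out.println("ready=" + queue.readyCount() + ", unacked=" + queue.unackedCount());
    }
}
```

In this model a forgotten ack means the message is never dropped from the unacked set, which matches the warning in the notes above.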

 
Part 3 Publish/Subscribe
 

1. Exchanges (a block that sits between producers and queues)
Types: direct, topic, headers, fanout (similar to broadcast)
Declaration:  channel.exchangeDeclare("logs", "fanout");
 
2. Temporary queues
Non-durable, exclusive, auto-delete, with a server-generated name:
String queueName = channel.queueDeclare().getQueue();
 
3. Bindings
channel.queueBind(queueName, "logs", "");
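
How a fanout exchange and its bindings fit together can be sketched as a tiny in-memory model. FanoutDemo and its methods are hypothetical names and nothing here talks to a broker. Every bound queue gets its own copy, the routing key is ignored, and a message published while nothing is bound is simply lost.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a fanout exchange; names are illustrative. */
final class FanoutDemo {
    private final List<String> boundQueues = new ArrayList<>();
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Counterpart of channel.queueBind(queueName, "logs", "") in this model. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        if (!boundQueues.contains(queueName)) {
            boundQueues.add(queueName);
        }
    }

    /** The routing key is accepted but ignored. Returns the number of copies made. */
    int publish(String routingKey, String message) {
        for (String queueName : boundQueues) {
            queues.get(queueName).add(message);
        }
        return boundQueues.size();
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutDemo logs = new FanoutDemo();
        System.out.println("copies with no bindings: " + logs.publish("", "lost"));
        logs.bind("q1");
        logs.bind("q2");
        System.out.println("copies with two bindings: " + logs.publish("ignored", "hello"));
        System.out.println("q1 = " + logs.messagesIn("q1") + ", q2 = " + logs.messagesIn("q2"));
    }
}
```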
 
example:
 
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
            throws java.io.IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);
        // publish to the exchange; a fanout exchange ignores the routing key
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
            throws java.io.IOException,
            java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // temporary queue with a server-generated name, bound to the exchange
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

 
 

Part 4 Routing


1. Direct exchange
A message goes to the queues whose binding key exactly matches the message's routing key.
Declaration:  channel.exchangeDeclare("direct_logs", "direct");
 
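2. Multiple bindings
Several queues can be bound with the same binding key, and one queue can be bound with several keys.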
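
Direct routing with multiple bindings can be sketched as a map from binding key to queues. DirectDemo and the queue names "console" and "disk_log" are made up for this illustration; the severities mirror the info/warning/error keys used in the receiver example of this part.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of a direct exchange; names are illustrative. */
final class DirectDemo {
    // binding key -> queues bound with that key (sorted for stable printing)
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** Counterpart of channel.queueBind(queue, exchange, bindingKey) in this model. */
    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new TreeSet<>()).add(queue);
    }

    /** Exact match only: a routing key with no binding reaches no queue. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectDemo exchange = new DirectDemo();
        exchange.bind("disk_log", "error");   // one key for this queue
        exchange.bind("console", "info");     // three keys for this one
        exchange.bind("console", "warning");
        exchange.bind("console", "error");

        System.out.println("error -> " + exchange.route("error"));
        System.out.println("info  -> " + exchange.route("info"));
        System.out.println("debug -> " + exchange.route("debug"));
    }
}
```

Here "error" reaches both queues, "info" reaches only console, and "debug" reaches nothing because no queue is bound with that key.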
 
example:
 
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
            throws java.io.IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);
        // the severity is used as the routing key
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
            throws java.io.IOException,
            java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }
        // one binding per severity given on the command line
        for (String severity : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

 
 

Part 5 Topic

1. Topic exchange
Its advantage over a direct exchange is that binding keys can contain wildcards:
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
 
Routing keys are lists of words delimited by dots.
 
A routing key can be up to 255 bytes long.
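
The two wildcard rules above can be checked with a small matcher in plain Java. This is only a sketch of the matching rules as stated in these notes, not the broker's actual algorithm; TopicMatcher and its methods are hypothetical names.

```java
/** Sketch of topic-style matching: '*' = exactly one word, '#' = zero or more words. */
final class TopicMatcher {

    static boolean matches(String bindingKey, String routingKey) {
        // Words are delimited by dots; -1 keeps trailing empty words.
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern used up: all words must be used up too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word, but none is left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit"));      // true
        System.out.println(matches("lazy.#", "lazy"));                         // true
        System.out.println(matches("lazy.#", "lazy.pink.rabbit"));             // true
        System.out.println(matches("*.*.rabbit", "quick.orange.male.rabbit")); // false
    }
}
```

A binding key of just "#" matches every routing key in this sketch, and a key with no wildcards at all only matches itself, which is the direct-exchange behavior from Part 4.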
 
example:
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
            throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
            throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }
        // binding keys may contain the * and # wildcards
        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

 
Clustering:
The usual approach is mirrored queues, with RAM nodes as the primary servers plus disk nodes for redundancy.

 