您的位置:首页 > 产品设计 > UI/UE

RabbitMQ 学习笔记(二):work queues

2017-07-13 13:52 375 查看

Work Queues



在第一个教程中,我们编写了一些程序来发送和接收来自一个已命名队列(queue)的消息。在这个教程中,我们将创建一个工作队列(work queue),用于在多个worker之间分配耗时的任务。

work queue(即任务队列)的主要思想是:避免立即执行资源密集型任务,不得不等待它完成。相反,我们计划稍后完成任务。我们将任务封装为消息,并将其发送到queue。在后台运行的工作进程将弹出任务并最终执行任务。当你运行许多worker时,任务将在他们之间共享

这个概念在web应用程序中尤其有用,因为在一个简短的HTTP请求窗口中无法处理复杂的任务。

准备工作

在本教程的前一部分中,我们发送了一个包含“Hello World”的消息。现在我们将发送用于复杂任务的字符串。我们没有一个真实的任务,比如图像被调整大小,或者pdf文件被渲染,所以让我们假装我们很忙——通过使用thread . sleep()函数来进行伪装。我们将把字符串中圆点“.”的数量作为它的复杂性;每一个点将占据“工作”的1秒。例如,Hello…需要三秒钟。

我们将稍微修改我们之前的示例中的Send.java,允许从命令行发送任意消息。这个程序将任务调度到我们的工作队列,所以让我们把它命名为newtask . java:

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");


以下代码帮助我们从命令行参数获取消息:

private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!"<
4000
/span>;
return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings,                                                             String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}


同样地,我们上一个教程中的Recv.java程序也需要一些更改:它需要在消息正文中为每个圆点“.”伪造一秒钟的工作。因为它将处理传递的消息并执行任务,所以让我们把它命名为worker . java:

final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope   envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);


我们的伪装任务是模拟执行时间:

private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}


最后,像笔记一中一样将它们编译(同样使用工作目录中的jar文件和环境变量CP):

javac -cp $CP NewTask.java Worker.java

循环调度

使用任务队列的优点之一是能够轻松地并行工作。如果我们积累了大量的工作,我们可以增加更多的worker,这样就可以轻易地扩大规模

首先,让我们尝试同时运行两个worker。他们将从queue中获取消息,但具体如何实现?让我们来看看。

你需要有三个控制台(linux下可打开3个终端)。两个将用于运行worker程序。这两个控制台将是我们的两个消费者——C1和C2。

# shell 1

java -cp $CP Worker

# => [*] Waiting for messages. To exit press CTRL+C

# shell 2

java -cp $CP Worker

# => [*] Waiting for messages. To exit press CTRL+C

然后,我们用生产者发布新的任务(这就用到我们的第3个控制台)。一旦你启动了消费者,你就可以发布一些信息,例如这样:

# shell 3

java -cp $CP NewTask

# => First message.

java -cp $CP NewTask

# => Second message..

java -cp $CP NewTask

# => Third message…

java -cp $CP NewTask

# => Fourth message….

java -cp $CP NewTask

# => Fifth message…..

让我们看看我们两个worker都收到了什么:

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'


java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'


默认情况下,RabbitMQ将依次向下一个用户发送每个消息,所以平均每个消费者将得到相同数量的消息。这种分发消息的方式称为round-robin(循环调度)。试着用三个或更多的worker来试一试。

消息确认

完成某些任务可能需要几秒钟。你可能会想,如果其中一个消费者开始了一项很长的任务,并且只完成了部分任务,会发生什么。使用当前的代码,一旦RabbitMQ将消息传递给客户,它将立即从内存中删除它。在这种情况下,如果你kill了一个worker,我们不仅将失去它处理的信息,还将丢失发送给了这个特定worker,但还没有被处理的所有消息

但我们不想失去任何任务。如果一个worker终止,我们希望把它未完成的任务交给另一个worker

为了确保消息不会丢失,RabbitMQ支持消息确认。使用者将发送一个ack(nowledgement)给RabbitMQ,告诉RabbitMQ某一任务已被接收、处理,然后RabbitMQ可以自由地从内存中删除该任务内容

如果一个worker死亡(它的通道关闭,连接关闭,或者TCP连接丢失)而不发送ack,RabbitMQ将理解其为有一个任务没有被完全处理,并且将该任务重新排队。如果同时有其他消费者在线,它会很快把它转送给另一个消费者。这样你就能确保没有任务丢失,即使有些worker偶尔会死亡。

即使处理消息需要很长时间,RabbitMQ将在消费者死亡时重新传递消息。因此,没有任何消息会超时。

消息确认在默认情况下开启。在前面的例子中,我们明确地将它们通过令autoAck = true关闭。现在是时候把这个flag设置为false了,一旦我们完成了任务,就向worker发出适当的消息确认

channel.basicQos(1); //同一时间只接受一个任务,接下来会讲。

final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false; //消息确认的flag,下边是设置。
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);


使用此代码,我们可以确信即使您在处理消息时使用CTRL + C杀死了一个worker,也不会丢失任何东西。在这个worker死后不久,所有未确认的信息将被重新发送

消息的耐久性

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止了,我们的任务仍然会丢失。

当RabbitMQ**退出或崩溃时,它将会丢失所有队列和消息,除非你告诉它不要这样做。需要做两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久的**。

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了做到这一点,我们需要声明它是持久的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
//第二个参数即关乎队列的耐久与否性质。


虽然这个命令本身是正确的,但是在我们现在的设置中它是无效的。这是因为我们之前已经定义了一个名为hello的队列,而之前定义时,它不是持久的。RabbitMQ不允许你重新定义具有不同参数的现有队列,并将错误返回任何试图执行此操作的程序。但是这里有一个快速解决方案——让我们用不同的名称声明一个新队列,例如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);


这个队列声明( queueDeclare)更改需要同时应用到生产者和消费者的代码中

此时,我们确保了即使RabbitMQ**重新启动,task_queue队列也不会丢失。现在,我们需要将消息标记为持久性——通过将MessageProperties(实现了之前的BasicProperties设置)成PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
// 第三个参数设置消息的持久性。


注意,关于消息的持久性:

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但是仍然存在一个短暂的时间窗口,RabbitMQ接受消息并没有保存它。而且,RabbitMQ并不是对每条消息执行保存到磁盘——它可能只保存到缓存,而不是真正写入磁盘。持久性保证不强,但是对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用 publisher confirms(发布者确认)。

平均分配

您可能已经注意到,分派任务仍然不像我们想的那样工作。例如,在两个worker的情况下,当所有排在奇数位置的消息都是沉重的,排在偶数位置的消息都是简单的时候,一个worker将会一直处于忙碌状态,而另一个worker几乎不做任何工作。嗯,但RabbitMQ对此一无所知,依旧会均匀地分派任务

这是因为,在某一任务进入队列时,RabbitMQ只是会发送该任务。它不考虑消费者未确认的任务的数量。它只是盲目地向n个消费者发送n个任务。



为了防止这样,我们可以使用basicQos方法和设置prefetchCount = 1。这将告诉RabbitMQ不要一次给worker发送一个以上的任务。换句话说,就是在处理并确认前一个任务之前,不要向worker发送新任务。相反,RabbitMQ会把它分派给下一个空闲的worker

int prefetchCount = 1;
channel.basicQos(prefetchCount);


注意队列大小

如果所有的worker都很忙,你的队列就会排满。你会想要关注这个问题,可能会增加更多的worker,或者有其他的策略。

最后,将以上代码和上次教程的jar包放在同一目录在一起,进行编译运行。

NewTask.java 最终代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

String message = getMessage(argv);

channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
me
c52c
ssage.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}

private static String getMessage(String[] strings) {
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}


Worker.java最终代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}

private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}


通过使用消息确认和预取计数,你可以设置工作队列。即使RabbitMQ重新启动,持久性选项仍然允许任务存活并被处理

相关链接

rabbitmq-c++(SimpleAmqpClient) 笔记代码系列:

rabbitmq-c++(SimpleAmqpClient) 笔记代码一

rabbitmq-c++(SimpleAmqpClient) 笔记代码二

rabbitmq-c++(SimpleAmqpClient) 笔记代码三

rabbitmq-c++(SimpleAmqpClient) 笔记代码四

rabbitmq-c++(SimpleAmqpClient) 笔记代码五

rabbitmq-c++(SimpleAmqpClient) 笔记代码六

RabbitMQ 学习笔记系列:

RabbitMQ 学习笔记(一):简单介绍及”Hello World”

RabbitMQ 学习笔记(二):work queues

RabbitMQ 学习笔记(三):Publish/Subscribe

RabbitMQ 学习笔记(四):Routing

RabbitMQ 学习笔记(五):Topics

RabbitMQ 学习笔记(六):RPC
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: