RabbitMQ_消息队列基本使用_1
什么叫消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。
消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
为何用消息队列
从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ(Message Queue)呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。
在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。
这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。
详见: 官方文档
RabbitMQ安装
for Linux: 安装配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang $ yum -y install erlang 安装RabbitMQ $ yum -y install rabbitmq-server 注意:service rabbitmq-server start/stop
for Mac: bogon:~ yuan$ brew install rabbitmq bogon:~ yuan$ export PATH=$PATH:/usr/local/sbin bogon:~ yuan$ rabbitmq-server
(windows版本直接官网下载即可)
rabbitMQ工作模型
简单模式
# ######################### 生产者 ######################### #!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
# ########################## 消费者 ########################## connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume( callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
相关参数
(1)no-ack = False
如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
- 回调函数中的
ch.basic_ack(delivery_tag=method.delivery_tag)
- basic_comsume中的
no_ack=False
消息接收端应该这么写:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.211.55.4')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
(2) durable :消息不丢失
#!/usr/bin/env python import pika import uuid class FibonacciRpcClient(object): def __init__(self): ”“” 客户端启动时,创建回调队列,会开启会话用于发送RPC请求以及接受响应 “”“ # 建立连接,指定服务器的ip地址 self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) # 建立一个会话,每个channel代表一个会话任务 self.channel = self.connection.channel() # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次 result = self.channel.queue_declare(exclusive=True) # 将次队列指定为当前客户端的回调队列 self.callback_queue = result.method.queue # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理; self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) # 对回调队列中的响应进行处理的函数 def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body # 发出RPC请求 def call(self, n): # 初始化 response self.response = None #生成correlation_id self.corr_id = str(uuid.uuid4()) # 发送RPC请求内容到RPC请求队列`rpc_queue`,同时发送的还有`reply_to`和`correlation_id` self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) # 建立客户端 fibonacci_rpc = FibonacciRpcClient() # 发送RPC请求 print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)客户端
推荐阅读: RabbitMQ系列
- [官网翻译]RabbitMQ基本消息队列使用
- RabbitMQ_消息队列基本使用_2
- 使用rabbitmq消息队列
- 消息队列 RabbitMQ 与 Spring 整合使用的实例代码
- 基于Python语言使用RabbitMQ消息队列(三)
- RabbitMQ消息队列(六):使用主题进行消息分发
- 使用专业的消息队列产品rabbitmq之centos7环境安装
- 使用rabbitmq消息队列
- 基于PHP使用rabbitmq实现消息队列
- 在C#中使用消息队列RabbitMQ
- 基于PHP使用rabbitmq实现消息队列
- 消息队列 RabbitMQ 与 Spring 整合使用
- RabbitMq六种使用模式(1)_直接指定消息接收队列
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- C#.NET使用消息队列RabbitMQ
- RabbitMQ消息队列(六):使用主题进行消息分发
- RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列
- 消息队列 RabbitMQ 与 Spring 整合使用