RabbitMQ消息通信,生产者发送消息给指定的消费者的消息队列
2017-11-10 21:37
561 查看
上一篇文章描述了,通过使用广播式的通信方式,让生产者把消息广播给每一个消费者,本节我们介绍另外一个方式,生产者可以指定消费者,把消息发送给它:
client.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
#这里指定exchange的名字为direct_log,类型为direct(直接的,指定的)
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
#在这里我们创建一个变量用于介绍routing_key的名字,
#这样我们在发送消息的时候可以指定把消息发送给某一个消费者对应的消息队列
#默认的消费者绑定的消息队列为info
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
server.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
#这里的exchange转换类型为direct(直接的,也可以理解为指定的)
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
#这里设置需要从哪个消息队列里面获取消息,任意字符串即可
#这里我们加入了提示,可以是info warning error也可以是其他的
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
#这里指定exchange的名字为direct_log,与client相对应
#指定routint_key为运行时指定的消息队列的名称
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()最后我们通过现象更加深入的了解这种通信方式:
在一个终端1运行python server.py info
在一个终端2运行python server.py warning
在一个终端3运行python server.py error
这样就会有三个消息队列,分别是info warning error,
然后执行python client.py info test_message 或python client.py test_message,这样终端1就会收到test_message消息
然后执行python client.py warning test_message,这样终端2就会收到test_message消息
然后执行python client.py error test_message,这样终端3就会收到test_message消息
更多信息请查询RabbitMQ官网:http://www.rabbitmq.com
client.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
#这里指定exchange的名字为direct_log,类型为direct(直接的,指定的)
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
#在这里我们创建一个变量用于介绍routing_key的名字,
#这样我们在发送消息的时候可以指定把消息发送给某一个消费者对应的消息队列
#默认的消费者绑定的消息队列为info
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
server.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
#这里的exchange转换类型为direct(直接的,也可以理解为指定的)
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
#这里设置需要从哪个消息队列里面获取消息,任意字符串即可
#这里我们加入了提示,可以是info warning error也可以是其他的
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
#这里指定exchange的名字为direct_log,与client相对应
#指定routint_key为运行时指定的消息队列的名称
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()最后我们通过现象更加深入的了解这种通信方式:
在一个终端1运行python server.py info
在一个终端2运行python server.py warning
在一个终端3运行python server.py error
这样就会有三个消息队列,分别是info warning error,
然后执行python client.py info test_message 或python client.py test_message,这样终端1就会收到test_message消息
然后执行python client.py warning test_message,这样终端2就会收到test_message消息
然后执行python client.py error test_message,这样终端3就会收到test_message消息
更多信息请查询RabbitMQ官网:http://www.rabbitmq.com
相关文章推荐
- RabbitMQ-理解消息通信-消费者和生产者
- RabbitMQ消息通信,一个生产者和多个消费者
- RabbitMQ消息通信,一个生产者和多个消费者,广播式消息通信
- RabbitMQ 消息发送和消息获取 之 rabbitMQ消息生产者和消费者
- 互斥锁和条件变量(2)——生产者和消费者(发送消息,循环队列执行)
- java线程间通信[实现不同线程之间的消息传递(通信),生产者和消费者模型]
- Disruptor多个消费者不重复处理生产者发送的消息的demo
- spring JMS、activemq中消费者收不到生产者发送的消息的原因解析
- metaq生产者发送消息找不到指定partition调查
- kafka(java客户端)消费者取不到消息,生产者消息也没发送成功
- RabbitMQ消息队列之二:消费者和生产者 Demo
- RabbitMQ消息队列之二:消费者和生产者
- Disruptor多个消费者不重复处理生产者发送过来的消息
- Java——定时请求后端接口数据发送RabbitMQ消息队列到指定MQ服务器
- 消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?
- RabbitMQ消息队列生产者和消费者
- Java Socket TCP 通信,实现聊天室,服务器端指定客户端发送消息
- Java Smack SDK 结合 Openfire服务器,建立IM通信,发送聊天消息
- delphi 发送鼠标点击消息到指定窗口