Tornado与RabbitMQ消息服务的集成方法
2016-06-23 11:51
369 查看
#-*- coding:utf-8 -*-
#!/usr/bin/python
import json
import redis
import logging
try:
import pika
from pika.adapters.tornado_connection import TornadoConnection
except ImportError:
pika = None
try:
import tornado
import tornado.ioloop
except ImportError:
tornado = None
logger = logging.getLogger('main.recieve_tornado')
class PikaClient(object):
    """RabbitMQ consumer that runs on the Tornado IOLoop via pika.

    Calling :meth:`connect` starts a chain of asynchronous callbacks:
    connection open -> channel open -> exchange declare -> queue declare
    -> queue bind -> consume. Each consumed message is parsed as JSON,
    its raw body is stored in Redis under the lower-cased ``userId``
    field, and the message is then acknowledged.
    """

    # Unused by this class; kept so existing references keep working.
    callbacks = {}

    def __init__(self,
                 amqp_url="amqp://admin:admin@192.167.1.101:5672/todo",
                 redis_client=None):
        """Prepare connection parameters without opening any connection.

        Args:
            amqp_url: AMQP URL of the broker. The default is the value that
                used to be hard-coded here.
            redis_client: Object exposing ``set(key, value)`` used to store
                message bodies. When omitted a ``redis.StrictRedis()`` with
                library defaults is created.
                NOTE(review): the original code never created this client
                (``self.con`` was undefined); confirm the Redis host/port
                that the deployment expects.

        Raises:
            Exception: if tornado or pika could not be imported.
        """
        if tornado is None:
            raise Exception('You must add tornado to your requirements!')
        if pika is None:
            raise Exception('You must add pika to your requirements!')
        self.ioloop = tornado.ioloop.IOLoop.instance()
        self.connection = None
        self.channel = None
        self._delivery_tag = 0
        self.parameters = pika.URLParameters(amqp_url)
        # on_response() writes through self.con, so it must exist before
        # the first delivery arrives.
        self.con = redis_client if redis_client is not None else redis.StrictRedis()

    def connect(self):
        """Open the broker connection and register the close callback."""
        self.connection = TornadoConnection(
            self.parameters,
            on_open_callback=self.on_connected,
            stop_ioloop_on_close=False,
        )
        self.connection.add_on_close_callback(self.on_closed)

    def on_connected(self, connection):
        """Connection is open: request a channel."""
        logger.info('PikaClient: connected to RabbitMQ')
        self.connection.channel(self.on_exchange_declare)

    def on_exchange_declare(self, channel):
        """Channel is open: remember it and declare the fanout exchange."""
        logger.info('PikaClient: Channel %s open, Declaring exchange' % channel)
        self.channel = channel
        # ``exchange_type`` replaces the deprecated ``type`` keyword.
        self.channel.exchange_declare(
            self.on_queue_declare, exchange='compute', exchange_type='fanout'
        )

    def on_queue_declare(self, method_frame):
        """Exchange is declared: declare the durable queue."""
        logger.info('PikaClient: Channel open, Declaring queue')
        self.channel.queue_declare(
            self.on_queue_bind, queue='compute_queue', durable=True
        )

    def on_queue_bind(self, method_frame):
        """Queue is declared: bind it to the exchange."""
        logger.info('Queue bound')
        self.channel.queue_bind(
            self.on_consume_bind,
            queue="compute_queue",
            exchange="compute",
            routing_key="compute_queue",
        )

    def on_consume_bind(self, frame):
        """Queue is bound: limit prefetch to one message and start consuming."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            self.on_response, queue='compute_queue', no_ack=False
        )

    def on_response(self, channel, method, properties, body):
        """Store one delivered message in Redis and acknowledge it.

        Args:
            channel: Channel the message was delivered on.
            method: Delivery frame; its ``delivery_tag`` is used for the ack.
            properties: Message properties (unused).
            body: Raw message payload, expected to be a JSON object.
        """
        message = json.loads(body)
        userID = message.get("userId", "No User")
        userID = userID.lower()
        self.con.set(userID, body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # Lazy %-formatting: ``message`` is a dict, so the previous
        # string concatenation raised TypeError on every delivery.
        logger.info('Recieve a new Message: %s', message)

    def on_closed(self, connection, reply_code=None, reply_text=None):
        """Handle connection closure and stop the IOLoop.

        ``reply_code`` and ``reply_text`` are accepted because pika passes
        them to close callbacks; they default to ``None`` so one-argument
        callers keep working.
        """
        logger.info('PikaClient: rabbit connection closed')
        # The channel may never have been opened, and both objects are
        # normally closed already by the time this runs; only close what
        # is still open so the IOLoop is always stopped.
        if self.channel is not None and self.channel.is_open:
            self.channel.close()
        if self.connection is not None and self.connection.is_open:
            self.connection.close()
        self.ioloop.stop()
#常用的调用方式:
#-*- coding:utf-8 -*-
import os
import tornado.web
from url import urls
from models.recieve import PikaClient
class Application(tornado.web.Application):
    """Tornado application that starts the RabbitMQ consumer on creation."""

    def __init__(self):
        """Create and connect the PikaClient, then initialise the web app."""
        # The consumer is attached and connected before the base class is
        # initialised, matching the original statement order.
        self.recieve = PikaClient()
        self.recieve.connect()

        # Templates and static files live next to this module.
        base_dir = os.path.dirname(__file__)
        settings = {
            "template_path": os.path.join(base_dir, "templates"),
            "static_path": os.path.join(base_dir, "static"),
            "cookie_secret": "this_is_cookie_code",
            "xsrf_cookies": False,
            "login_url": "/",
            "debug": True,
        }
        super(Application, self).__init__(urls, **settings)
#!/usr/bin/python
import json
import redis
import logging
try:
import pika
from pika.adapters.tornado_connection import TornadoConnection
except ImportError:
pika = None
try:
import tornado
import tornado.ioloop
except ImportError:
tornado = None
logger = logging.getLogger('main.recieve_tornado')
class PikaClient(object):
    """RabbitMQ consumer that runs on the Tornado IOLoop via pika.

    Calling :meth:`connect` starts a chain of asynchronous callbacks:
    connection open -> channel open -> exchange declare -> queue declare
    -> queue bind -> consume. Each consumed message is parsed as JSON,
    its raw body is stored in Redis under the lower-cased ``userId``
    field, and the message is then acknowledged.
    """

    # Unused by this class; kept so existing references keep working.
    callbacks = {}

    def __init__(self,
                 amqp_url="amqp://admin:admin@192.167.1.101:5672/todo",
                 redis_client=None):
        """Prepare connection parameters without opening any connection.

        Args:
            amqp_url: AMQP URL of the broker. The default is the value that
                used to be hard-coded here.
            redis_client: Object exposing ``set(key, value)`` used to store
                message bodies. When omitted a ``redis.StrictRedis()`` with
                library defaults is created.
                NOTE(review): the original code never created this client
                (``self.con`` was undefined); confirm the Redis host/port
                that the deployment expects.

        Raises:
            Exception: if tornado or pika could not be imported.
        """
        if tornado is None:
            raise Exception('You must add tornado to your requirements!')
        if pika is None:
            raise Exception('You must add pika to your requirements!')
        self.ioloop = tornado.ioloop.IOLoop.instance()
        self.connection = None
        self.channel = None
        self._delivery_tag = 0
        self.parameters = pika.URLParameters(amqp_url)
        # on_response() writes through self.con, so it must exist before
        # the first delivery arrives.
        self.con = redis_client if redis_client is not None else redis.StrictRedis()

    def connect(self):
        """Open the broker connection and register the close callback."""
        self.connection = TornadoConnection(
            self.parameters,
            on_open_callback=self.on_connected,
            stop_ioloop_on_close=False,
        )
        self.connection.add_on_close_callback(self.on_closed)

    def on_connected(self, connection):
        """Connection is open: request a channel."""
        logger.info('PikaClient: connected to RabbitMQ')
        self.connection.channel(self.on_exchange_declare)

    def on_exchange_declare(self, channel):
        """Channel is open: remember it and declare the fanout exchange."""
        logger.info('PikaClient: Channel %s open, Declaring exchange' % channel)
        self.channel = channel
        # ``exchange_type`` replaces the deprecated ``type`` keyword.
        self.channel.exchange_declare(
            self.on_queue_declare, exchange='compute', exchange_type='fanout'
        )

    def on_queue_declare(self, method_frame):
        """Exchange is declared: declare the durable queue."""
        logger.info('PikaClient: Channel open, Declaring queue')
        self.channel.queue_declare(
            self.on_queue_bind, queue='compute_queue', durable=True
        )

    def on_queue_bind(self, method_frame):
        """Queue is declared: bind it to the exchange."""
        logger.info('Queue bound')
        self.channel.queue_bind(
            self.on_consume_bind,
            queue="compute_queue",
            exchange="compute",
            routing_key="compute_queue",
        )

    def on_consume_bind(self, frame):
        """Queue is bound: limit prefetch to one message and start consuming."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            self.on_response, queue='compute_queue', no_ack=False
        )

    def on_response(self, channel, method, properties, body):
        """Store one delivered message in Redis and acknowledge it.

        Args:
            channel: Channel the message was delivered on.
            method: Delivery frame; its ``delivery_tag`` is used for the ack.
            properties: Message properties (unused).
            body: Raw message payload, expected to be a JSON object.
        """
        message = json.loads(body)
        userID = message.get("userId", "No User")
        userID = userID.lower()
        self.con.set(userID, body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # Lazy %-formatting: ``message`` is a dict, so the previous
        # string concatenation raised TypeError on every delivery.
        logger.info('Recieve a new Message: %s', message)

    def on_closed(self, connection, reply_code=None, reply_text=None):
        """Handle connection closure and stop the IOLoop.

        ``reply_code`` and ``reply_text`` are accepted because pika passes
        them to close callbacks; they default to ``None`` so one-argument
        callers keep working.
        """
        logger.info('PikaClient: rabbit connection closed')
        # The channel may never have been opened, and both objects are
        # normally closed already by the time this runs; only close what
        # is still open so the IOLoop is always stopped.
        if self.channel is not None and self.channel.is_open:
            self.channel.close()
        if self.connection is not None and self.connection.is_open:
            self.connection.close()
        self.ioloop.stop()
#常用的调用方式:
#-*- coding:utf-8 -*-
import os
import tornado.web
from url import urls
from models.recieve import PikaClient
class Application(tornado.web.Application):
    """Tornado application that starts the RabbitMQ consumer on creation."""

    def __init__(self):
        """Create and connect the PikaClient, then initialise the web app."""
        # The consumer is attached and connected before the base class is
        # initialised, matching the original statement order.
        self.recieve = PikaClient()
        self.recieve.connect()

        # Templates and static files live next to this module.
        base_dir = os.path.dirname(__file__)
        settings = {
            "template_path": os.path.join(base_dir, "templates"),
            "static_path": os.path.join(base_dir, "static"),
            "cookie_secret": "this_is_cookie_code",
            "xsrf_cookies": False,
            "login_url": "/",
            "debug": True,
        }
        super(Application, self).__init__(urls, **settings)
相关文章推荐
- java结合WebSphere MQ实现接收队列文件功能
- Tornado Web服务器中处理空白字符的解决方案
- python为tornado添加recaptcha验证码功能
- 剖析Python的Tornado框架中session支持的实现代码
- Tornado Web服务器多进程启动的2个方法
- 高性能web服务器框架Tornado简单实现restful接口及开发实例
- Tornado服务器中绑定域名、虚拟主机的方法
- Python Web框架Tornado运行和部署
- Python Web服务器Tornado使用小结
- tornado捕获和处理404错误的方法
- Web服务器框架 Tornado简介
- tornado把static_path指向到七牛
- 在Mopaas上部署WSGI类型的(Django, Tornado, Flask)Python应用
- tornado 源码初识
- tornado 源码分析 之 异步io的实现方式
- 实践,用tornado实现自定义协议server
- redis集群搭建
- RocketMQ client客户端模块源码分析一(生产者)
- JMS-使用消息队列优化网站性能
- 架构优化 - 应用,MQ Broker,业务处理分层