您的位置:首页 > 其它

Tornado与Rabbitmq消息服务的集成方法

2016-06-23 11:51 369 查看
#-*- coding:utf-8 -*-

#!/usr/bin/python

import json

import redis

import logging

# pika is treated as optional at import time: if it cannot be imported the
# name is bound to None so PikaClient.__init__ can raise a descriptive error.
# Note that TornadoConnection is left undefined in that case.
try:

    import pika

    from pika.adapters.tornado_connection import TornadoConnection

except ImportError:

    pika = None

# Same None-sentinel guard for tornado; PikaClient.__init__ checks it before
# touching tornado.ioloop.
try:

    import tornado

    import tornado.ioloop

except ImportError:

    tornado = None

# Module-level logger shared by every PikaClient method.
logger = logging.getLogger('main.recieve_tornado')

class PikaClient(object):
    """Asynchronous RabbitMQ consumer driven by the Tornado IOLoop.

    The setup is a chain of callbacks, each one triggered by the broker's
    reply to the previous step:

        connect -> on_connected -> on_exchange_declare -> on_queue_declare
                -> on_queue_bind -> on_consume_bind -> on_response (per message)

    Every message consumed from ``compute_queue`` is parsed as JSON and its
    raw body is stored in Redis under the lower-cased ``userId`` field.

    The callback-first call signatures (``channel(cb)``,
    ``exchange_declare(cb, ...)``, ``type=``, ``no_ack=``,
    ``stop_ioloop_on_close=``) are kept exactly as in the original code;
    they appear to target the pika 0.x API -- confirm against the installed
    pika version before upgrading.
    """

    # Not referenced anywhere in this class; retained for backward
    # compatibility with code that may read PikaClient.callbacks.
    callbacks = {}

    # Broker URL used when the caller does not supply one.
    DEFAULT_AMQP_URL = "amqp://admin:admin@192.167.1.101:5672/todo"

    def __init__(self, amqp_url=None, redis_client=None):
        """Prepare connection parameters; no network I/O happens here.

        :param amqp_url: AMQP URL of the broker. Defaults to
            ``DEFAULT_AMQP_URL`` (the previously hard-coded value).
        :param redis_client: object exposing ``set(key, value)`` used to
            store received messages. Defaults to ``redis.StrictRedis()``.
            NOTE(review): the original code used ``self.con`` without ever
            assigning it; a default local Redis connection is assumed here
            because the module imports ``redis`` -- confirm host/port/db.
        :raises Exception: if tornado or pika could not be imported.
        """
        if tornado is None:
            raise Exception('You must add tornado to your requirements!')
        if pika is None:
            raise Exception('You must add pika to your requirements!')

        self.ioloop = tornado.ioloop.IOLoop.instance()
        self.connection = None
        self.channel = None
        self._delivery_tag = 0
        self.parameters = pika.URLParameters(amqp_url or self.DEFAULT_AMQP_URL)
        # Previously never assigned, which made on_response fail with
        # AttributeError on the first delivered message.
        if redis_client is None:
            redis_client = redis.StrictRedis()
        self.con = redis_client

    def connect(self):
        """Open the connection to RabbitMQ and register the close handler."""
        self.connection = TornadoConnection(
            self.parameters,
            on_open_callback=self.on_connected,
            stop_ioloop_on_close=False,
        )
        self.connection.add_on_close_callback(self.on_closed)

    def on_connected(self, connection):
        """Connection is open: request a channel."""
        logger.info('PikaClient: connected to RabbitMQ')
        self.connection.channel(self.on_exchange_declare)

    def on_exchange_declare(self, channel):
        """Channel is open: remember it and declare the fanout exchange."""
        logger.info('PikaClient: Channel %s open, Declaring exchange', channel)
        self.channel = channel
        self.channel.exchange_declare(
            self.on_queue_declare, exchange='compute', type='fanout')

    def on_queue_declare(self, method_frame):
        """Exchange is declared: declare the durable queue."""
        logger.info('PikaClient: Channel open, Declaring queue')
        self.channel.queue_declare(
            self.on_queue_bind, queue='compute_queue', durable=True)

    def on_queue_bind(self, method_frame):
        """Queue is declared: bind it to the exchange."""
        # The message text is kept as-is although the bind is only being
        # requested at this point.
        logger.info('Queue bound')
        self.channel.queue_bind(
            self.on_consume_bind,
            queue="compute_queue",
            exchange="compute",
            routing_key="compute_queue",
        )

    def on_consume_bind(self, frame):
        """Binding is done: start consuming, one unacknowledged message at a time."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            self.on_response, queue='compute_queue', no_ack=False)

    def on_response(self, channel, method, properties, body):
        """Handle one delivered message.

        Parses ``body`` as JSON, stores the raw body in Redis under the
        lower-cased ``userId`` (``"no user"`` when the field is absent),
        acknowledges the delivery and logs the decoded message.

        NOTE(review): a body that is not a JSON object still raises here
        before the ack, exactly as before; with ``prefetch_count=1`` such a
        message stays unacknowledged. Decide on a reject/dead-letter policy.
        """
        message = json.loads(body)
        user_id = message.get("userId", "No User").lower()
        self.con.set(user_id, body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # Lazy %-formatting: the old ``'...' + message`` raised TypeError
        # because ``message`` is a dict, not a string.
        logger.info('Recieve a new Message: %s', message)

    def on_closed(self, connection, *args):
        """Connection was closed: release what is still open and stop the IOLoop.

        ``*args`` absorbs the additional positional arguments pika passes to
        close callbacks (a reply code and reply text in the 0.x releases);
        the previous one-argument signature could not accept them.
        """
        logger.info('PikaClient: rabbit connection closed')
        # The connection is normally already closed when this runs and the
        # channel may never have been opened, so only close what reports
        # itself as open.
        for resource in (self.channel, self.connection):
            if resource is not None and getattr(resource, 'is_open', False):
                resource.close()
        self.ioloop.stop()

# Typical usage (a separate module that imports PikaClient):

#-*- coding:utf-8 -*-

import os

import tornado.web

from url import urls

from models.recieve import PikaClient

class Application(tornado.web.Application):
    """Tornado web application that also starts the RabbitMQ consumer.

    A PikaClient is created and asked to connect before the base
    tornado.web.Application initializer runs; the client is kept on the
    instance as ``self.recieve``.
    """

    def __init__(self):
        # Create the consumer and request its connection first.
        self.recieve = PikaClient()
        self.recieve.connect()

        # Templates and static files live next to this module.
        base_dir = os.path.dirname(__file__)
        settings = {
            "template_path": os.path.join(base_dir, "templates"),
            "static_path": os.path.join(base_dir, "static"),
            "cookie_secret": "this_is_cookie_code",
            "xsrf_cookies": False,
            "login_url": "/",
            "debug": True,
        }

        # Route table comes from the project's url module.
        tornado.web.Application.__init__(self, urls, **settings)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Tornado MQ