您的位置:首页 > 运维架构 > Apache

RocketMQ消息队列还没入门就想放弃

2018-10-19 21:46 281 查看

钉钉、微博极速扩容黑科技,点击观看阿里云弹性计算年度发布会!>>>

20170712182011089.gif

题外话

什么情况下的异步操作需要使用消息队列而不是多线程?

  • 消息队列和多线程两者并不冲突,多线程可以作为队列的生产者和消费者。
    使用外部的消息队列时,第一是可以提高应用的稳定性,当程序fail后,已经写入外部消息队列的数据依旧是保存的,如果使用两步commit的队列的话,可以更加提高这个项目。

  • 用线程的话,会占用主服务器资源, 消息队列的话, 可以放到其他机器上运行, 让主服务器尽量多的服务其他请求。我个人认为, 如果用户不急着知道结果的操作, 用消息队列, 否则再考虑用不用线程。

  • 解耦更充分,架构更合理

  • 多线程是在编程语言层面解决问题

  • 消息队列是在架构层面解决问题
    我认为架构层面解决问题是“觉悟比较高的方式“,理想情况下应该限制语言层面滥用多线程,能不用就不用

  • 不关心执行结果的都可以放到消息队列,不需要及时到达,放到消息队列中慢慢消化

  • 批量发送邮件时,数据量庞大,如果使用多线程对系统不安全

  • 消息队列能解决什么问题

    • 异步处理

    • 应用解耦

    • 流量削锋

    • 日志处理

    • 消息通讯

    # 环境介绍

    注意尽量将rocketmq的1.应用版本,2.jar包依赖,3.recketmq-console-ng的jar包依赖版本保持一致,不然可能会出现非常诡异的问题
    此项目所使用版本: rocketmq: 4.3.0,OS: win10

    1. jar包依赖

    compile group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.3.0'
    1. 下载 rocketmq应用
      http://rocketmq.apache.org/release_notes/release-notes-4.3.0/

    2. windows下rocketmq环境配置与启动
      参考 https://www.jianshu.com/p/4a275e779afa

      • 在rocketmq的bin目录下启动NAMESERVER(相当于服务注册中心)  
        start mqnamesrv.cmd

      • 启动 broker(真正工作的服务器,存储消息的服务器)
        start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

    3. 可视化rocketmq管理项目下载
      https://github.com/apache/rocketmq-externals.git

      • 将这个项目里面rocketmq-console-ng里的rocketmq依赖修改成与你项目依赖的版本一致,次项目是4.3.0

        image.png
    4. 第三步已经把rocketmq的nameServer与broker启动起来

    5. 启动rocket-console-ng可视化管理项目,该项目是基于springboot的

    6. 访问rocket-console-ng的服务地址

      image.png

    到此环境搭建完成!!!
    回到自己的程序↓↓↓

    # 配置信息

    ###producer
    #该应用是否启用生产者
    rocketmq:
      producer:
        isOnOff: on
        #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
        groupName: ${spring.application.name}
        #mq的nameserver地址
        namesrvAddr: 127.0.0.1:9876
        #消息最大长度 默认1024*4(4M)
        maxMessageSize: 4096
        #发送消息超时时间,默认3000
        sendMsgTimeout: 3000
        #发送消息失败重试次数,默认2
        retryTimesWhenSendFailed: 2

      ###consumer
      ##该应用是否启用消费者
      consumer:
        isOnOff: on
        groupName: ${spring.application.name}
        #mq的nameserver地址
        namesrvAddr: 127.0.0.1:9876
        #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
        topics: futaotopic~*;
        consumeThreadMin: 20
        consumeThreadMax: 64
        #设置一次消费消息的条数,默认为1条
        consumeMessageBatchMaxSize: 1

    reConsumerTimes: 3

    # 生产者Bean

    package com.futao.springmvcdemo.service.impl

    import com.futao.springmvcdemo.foundation.LogicException
    import com.futao.springmvcdemo.model.entity.constvar.ErrorMessage
    import com.futao.springmvcdemo.model.system.SystemConfig
    import com.futao.springmvcdemo.service.RocketMqService
    import org.apache.commons.lang3.StringUtils
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
    import org.apache.rocketmq.client.producer.DefaultMQProducer
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel
    import org.slf4j.LoggerFactory
    import org.springframework.beans.factory.annotation.Value
    import org.springframework.context.annotation.Bean
    import org.springframework.stereotype.Service
    import java.nio.charset.Charset
    /**
     * @author futao
     * Created on 2018/10/18.
     */
    @Service
    open class RocketMqServiceImpl : RocketMqService {
        private val logger = LoggerFactory.getLogger(RocketMqServiceImpl::class.java)
    /**
         * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
         */
        @Value("\${rocketmq.producer.groupName}")
        private lateinit var producerGroupName: String

        @Value("\${rocketmq.producer.namesrvAddr}")
        private lateinit var producerNamesrvAddr: String
        /**
         * 消息最大大小,默认4M
         */
        @Value("\${rocketmq.producer.maxMessageSize}")
        private var maxMessageSize: Int = 0
        /**
         * 消息发送超时时间,默认3秒
         */
        @Value("\${rocketmq.producer.sendMsgTimeout}")
        private var sendMsgTimeout: Int = 0
        /**
         * 消息发送失败重试次数,默认2次
         */
        @Value("\${rocketmq.producer.retryTimesWhenSendFailed}")
        private var retryTimesWhenSendFailed: Int = 0


        /**
         * 生产者Bean
         */
        @Bean
        override fun producer(): DefaultMQProducer {
            if (this.producerGroupName.isEmpty()) {
                throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_GROUP_NAME_EMPTY)
            }
            if (this.producerNamesrvAddr.isEmpty()) {
                throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY)
            }
            val defaultMQProducer = DefaultMQProducer(producerGroupName)
            defaultMQProducer.namesrvAddr = producerNamesrvAddr
            defaultMQProducer.maxMessageSize = maxMessageSize
            defaultMQProducer.sendMsgTimeout = sendMsgTimeout
            defaultMQProducer.isVipChannelEnabled = false
            //消息发送到mq服务器失败重试次数
            defaultMQProducer.retryTimesWhenSendFailed = retryTimesWhenSendFailed
            try {
                defaultMQProducer.start()
                logger.info("rocketMq Producer start success; nameServer:{},producerGroupName:{}", producerNamesrvAddr, producerGroupName)
            } catch (e: Exception) {
                logger.error("rocketMq Producer start fail;{}", e.message, e)
            }
            return defaultMQProducer
        }
    }

    # 消费者

    @Value("\${rocketmq.consumer.namesrvAddr}")
        private lateinit var consumerNamesrvAddr: String

        @Value("\${rocketmq.consumer.groupName}")
        private lateinit var consumerGroupName: String

        @Value("\${rocketmq.consumer.consumeThreadMin}")
        private var consumeThreadMin: Int = 0

        @Value("\${rocketmq.consumer.consumeThreadMax}")
        private var consumeThreadMax: Int = 0

        @Value("\${rocketmq.consumer.topics}")
        private lateinit var topics: String

        @Value("\${rocketmq.consumer.consumeMessageBatchMaxSize}")
        private var consumeMessageBatchMaxSize: Int = 0

    //    @Resource
    //    private lateinit var mqMessageListenerProcessor: MQConsumeMsgListenerProcessor


        @Value("\${reConsumerTimes}")
        private var reConsumerTimes: Int = 0


        /**
         * 消费者Bean
         */
        @Bean
        override fun consumer(): DefaultMQPushConsumer {
            val topic = SystemConfig.ROCKET_MQ_TOPIC_MAIL
            val tag = SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER
            if (this.consumerGroupName.isEmpty()) {
                throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_GROUP_NAME_EMPTY)
            }
            if (this.consumerNamesrvAddr.isEmpty()) {
               1ff8   throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY)
            }
            if (this.topics.isEmpty()) {
                throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_TOPICS_EMPTY)
            }
            try {
                //DefaultMQPushConsumer DefaultMQPullConsumer
                val defaultMQPushConsumer = DefaultMQPushConsumer(consumerGroupName)
                defaultMQPushConsumer.namesrvAddr = consumerNamesrvAddr
                defaultMQPushConsumer.consumeThreadMin = consumeThreadMin
                defaultMQPushConsumer.isVipChannelEnabled = false
    //        defaultMQPushConsumer.createTopic()
                defaultMQPushConsumer.consumeThreadMax = consumeThreadMax
                //消费模式 集群还是广播,默认为集群(自动负载均衡)
                //广播消费: 消息会发给Consume Group中的每一个消费者进行消费,如果设置为广播消息会导致NOT_ONLINE异常,https://github.com/apache/rocketmq/issues/296
                defaultMQPushConsumer.messageModel = MessageModel.CLUSTERING
                // 设置消费模型,
                //consumer.setMessageModel(MessageModel.CLUSTERING);

                // * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
                // * 如果非第一次启动,那么按照上次消费的位置继续消费
                defaultMQPushConsumer.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
                //设置一次消费消息的条数,默认为1条
                defaultMQPushConsumer.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize
                //订阅topic
                defaultMQPushConsumer.subscribe(topic, tag)

                //        defaultMQPushConsumer.registerMessageListener(mqMessageListenerProcessor)
                defaultMQPushConsumer.registerMessageListener(
                        MessageListenerConcurrently { msgs, _ ->
                            if (msgs == null || msgs.isEmpty()) {
                                logger.info("接受到的消息为空,不处理,直接返回成功")
                                return@MessageListenerConcurrently ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                            }
                            val msg = msgs[0]
                            logger.info("接收到的消息为:" + msg.toString())
                            if (msg.topic == topic && msg.tags == tag) {
                                //判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
                                //获取该消息重试次数
                                if (msg.reconsumeTimes >= reConsumerTimes) {
                                    //消息已经重试了3次,如果不需要再次消费,则返回成功
                                    //TODO("如果重试了三次还是失败则执行对于失败的业务逻辑")
                                    logger.error("消息重试消费失败:", msg)
                                    return@MessageListenerConcurrently ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                                } else {
                                    //如果失败重试次数还没到三次则继续重试
                                    ConsumeConcurrentlyStatus.RECONSUME_LATER
                                }
                                //TODO("开始正常的业务逻辑")
                                println(StringUtils.repeat(":", 30) + String(msg.body, Charset.forName(SystemConfig.UTF8_ENCODE)))
                            }
                            return@MessageListenerConcurrently ConsumeConcurrentlyStatus.CONSUME_SUCCESS    //消费成功
          2000 ;               }
                )
                defaultMQPushConsumer.start()
                logger.info("rocketMq Consumer start success; namesrvAddr:{},groupName:{},topics:{}", consumerNamesrvAddr, consumerGroupName, topics)
                return defaultMQPushConsumer
            } catch (e: Exception) {
                logger.error("rocketMq Consumer start fail;{}", e.message, e)
                return DefaultMQPushConsumer()
            }
        }

    # 简单测试

    • 发送注册邮件的topic与tag配置

    • 个人理解的topic: 一类业务可以归为一个topic,比如所有的发邮件功能

    • 个人理解的tag: 某类业务下的细分,比如发送邮件业务下的发送注册邮件可以使用一个tag,发送忘记密码邮件可以再使用一个tag

        /**
         * rocket mq 发送邮件的 topic
         */
        public static final String ROCKET_MQ_TOPIC_MAIL = "topic_mail";

        /**
         * rocket mq 发送邮件-注册邮件的tag
         */
        public static final String ROCKET_MQ_TAG_MAIL_REGISTER = "tag_mail_register";
    • 发送邮件消息队列Service

        @Resource
        lateinit var producer: DefaultMQProducer
    /**
         * 通过消息队列发送邮件
         */
        override fun sendMq(mailM: MailM) {
            val message = Message(SystemConfig.ROCKET_MQ_TOPIC_MAIL, SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER, JSON.toJSONString(mailM).toByteArray(Charset.forName(SystemConfig.UTF8_ENCODE)))
            try {
                producer.send(message)
            } catch (e: Exception) {
                logger.error(e.message, e)
            }
        }
    • 请求controller

    @GetMapping("sendMailMq")
        open fun sendMailMq() {
            val mailM = MailM().apply {
                to = arrayOf("1185172056@qq.com")
                cc = arrayOf("taof@wicrenet.com")
                subject = "消息队列"
                content = "<h1>您好,RocketMq</h1>"
            }
            mailService.sendMq(mailM)
        }
    • 在请求了controller之后可以在rocketmq-console-ng控制台查看到相应的topic与消息信息

    • topic

    image.png
    • 已发送到rocketmq服务器上的消息


    image.png
    • 查看消息状态


    image.png
    • 查看控制台

    image.png

    # 坑:

    image.png
    • 消息不能被消费使用RocketMq控制台resend提示NOT_CONSUME_YET:检查rocketmq应用版本,rocketmq-console-ng依赖版本,自己的项目依赖jar包版本是否一致

    • Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException:检查rocketmq应用版本,rocketmq-console-ng依赖版本,自己的项目依赖jar包版本是否一致

    • Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message:尝试把消费者的消费模式改成集群模式

    • NOT_CONSUME_YET:如果还是不能解决请不要使用公司的网络,公司的网络可能会有很多的限制,用自己的手机进行测试(我被这个网络给坑惨了)

    # 资源:

    Windows下安装RocketMq:https://www.jianshu.com/p/4a275e779afa
    RocketMq名词解释: https://my.oschina.net/javamaster/blog/2051703
    解释Push与Pull区别: https://www.jianshu.com/p/f071d5069059?utm_source=oschina-app
    官网:http://rocketmq.apache.org/
    windows下rocketmq的消息信息存储在 C:\Users\user\store文件夹下,删除该文件夹即可删除所有的消息

    项目完整代码:地址 https://gitee.com/FutaoSmile/springboot_framework

    FutaoSmile_springboot框架2.png


    周末愉快~

    本文分享自微信公众号 - 喜欢天文(AllUnderControl)。
    如有侵权,请联系 support@oschina.cn 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息