基于 RabbitMQ 构建一个类似 Resque 的作业处理系统
2018-08-28 16:12
931 查看
RabbitMQ的是一个复杂的野兽。
它灵活,强大,但也很难完全把控和掌握。
许多不同的使用情况和使用模式都可以建立在这个强大的软件之上,但在第一次尝试为一个特定的解决方案编写代码时,差错和设计错误也是司空见惯的事情。
在本文中,我将讨论 Palermo 的设计和实现,它是一个实现了可以将RabbitMQ用作底层排队机制构建的那些可能的使用模式其中之一:批处理作业处理系统。
通过批处理作业处理系统,我们引用了一种机制来由 作业程序从不同队列中提取作业的自动执行。客户可以排入新的作业到这些队列,它们最终将被传递给将执行作业的程序。如果在执行一个任务失败了,作业程序线程将会把失败的作业放入一个特殊的队列,它可以重新执行,或者检查出来进行错误调试。 首先,工作者采用显式确认的方式处理完消息后可以通知RabbitMQ.消息只有确认后才会从队列删除.如果工作者停止运行而没有确认消息,RabbitMQ会自动将消息重新排队.同时,服务质量(qos)配置可以告知RabbitMQ,发送给特定工作者的最大未确认消息数.如果设置了这个值,即从预取数量到1,只有最后一条消息被工作者进程确认,其他消息才能从队列发送出去.
通过这种方式,可以实现在工作者之间合理的分配作业.
我们仍然需要找到一种方式实现写内存的操作.RabbitMQ不同于其他队列排队系统的一个重要的方面,是强烈的订阅者和消费者分离的考量.在RabbitMQ体系结构中,这点是通过队列和订阅者交换的介绍来实现的.订阅者不知道消息的最终目的地,他们只知道一个交换的名字和路由关键字,这个关键字可以看作是发送消息的地址.针对不同的交换类型,RabbitMQ支持不同的语义,这将影响一个带有地址的消息匹配一个交换的方式.它将传递到实际的信箱(队列),消费者将从这里接收消息.在我们的例子中,写方法的第一个参数是队列名称。 队列名称也是工作者预测下一个内存中待处理的任务时使用的参数。Palermo中该方法下一个参数是一种直接交换类型。 在这种交换类型中,路由关键字必须和传输消息的队列名称匹配。
如果一条消息被发往 RabbitMQ 进行交换,并且没有队列连接到交换,那么这条消息会被丢弃或者被发往相关的备用交换。 为了避免这种情况, Palermo中一个发布者每次把一个新的任务加入到队列中时,在发送任务的数据之前,发布者会声明匹配路由关键字的队列。 通过这种方式,我们可以确定当没有工作者在等待处理任务时消息也不会被丢弃。 在 RabbitMQ中,使用相同的参数重新声明一个已存在的队列是一个合法的操作,且没有任何影响。使用之前为RabbitMQ所描述的设置,我们可以建立工作处理系统的核心。然而,某些特性,像失败事务的管理或者消息的序列化,它们不能被编址放到RabbitMQ的特性中。在Palermo中,这个特性已经被实现,并被作为一个附加的应用软件层,这个层可以被分配作为一个Java库。首要的问题是如何处理失败的事务。我们来看它是怎么(工作的),过去我们使用显式的人工确认,在人工处理或超时时,RabbitMQ将会处理致命的错误。这套机制也可以用于处理在事务处理逻辑中(产生的)任何类型的异常条件,除了我们期望的功能需求外,失败信息必定会发送给一个特别的失败事务队列,他们可以被查阅,移除和重入队。这个功能已经在Palermo中被完成,被包装为一个通用的Java事务用来处理try/catch块,并通过管道将失败信息添加到队列中,并添加一些关于事务的错误信息,(例如)重试次数和在消息元数据的头中的原始队列一并发送消息给RabbitMQ。序列化的问题,已经通过定义一个作业消息来陈述了,这个消息包含作业Java类的头信息,作业序列化参数和序列化类型.有关参数形象化的一小部分内容,通过一个人眼可识别的字符串,也随消息一起发送了. Palermo已经通过支持插件和去插件的方式,对系统进行不同的序列化,同时,包含一个基于JSON的序列化转换器.但是,默认的序列化技术是使用JBoss Serialization.只有作业的参数被序列化,并发送给工作者,作业类的字节码必须在工作者的class path中可用,以便正确执行作业.
Palermo工作者只是执行实际作业逻辑的一种通用方式,其中作业逻辑是封装在作业类的定义之中.Palermo的整个逻辑已经在Clojure编程语言中实现,但是,多亏Clojure Java inter-op特性,它可以在java代码或其他任意基于JVM的语言中使用.由于Palermo工作者线程在JVM中运行,它们从底层的操作系统中被隔离出来.像Resque工作者那样,集成在OS之中,很难实现,但是某种程度的集成,在可能的情况下,已经尝试过,如,为关联到Palermo工作者的RabbitMQ消费者的身份标识,使用进程标识符.一个命令行的接口也已经实现了,所以,新的工作者,通过脚本和使用者,可以很简单的启动.解决方案的最后一个组件是Palermo 系统中运行的一个web接口。 这只是一个使用了Palermo 类库自检特性的简单的web应用,它让人们更容易理解Palermo 系统的运行机制。 这个接口是Resque web接口的复制,在管理交换、队列、工作者方面,它可以作为RabbitMQ 通用接口的替代者。通过Palermo 类库,web接口提供的所有功能可以在任意java代码中进行使用。
考虑到所有之前我们能想到的, Palermo 只是薄薄的一层,它建立在一个特定的 RabbitMQ设置的顶部,并被封装成一个可重用的作业处理使用模式的实现类库。 相同的方法可能适用于不同的使用模式,这些模式使用 RabbitMQ作为底层引擎,这样可以在写入非特定应用程序的代码、处理设置和由 RabbitMQ特定配置而引入其余的复杂性方面节省时间。如果你现在在JAVA这条路上挣扎,也想在IT行业拿高薪,可以参加我们的训练营课程,选择最适合自己的课程学习,技术大牛亲授,7个月后,进入名企拿高薪。我们的课程内容有:Java工程化、高性能及分布式、高性能、深入浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点。如果你想拿高薪的,想学习的,想就业前景好的,想跟别人竞争能取得优势的,想进阿里面试但担心面试不过的,你都可以来,q群号为:779792048注:加群要求1、具有1-5工作经验的,面对目前流行的技术不知从何下手,需要突破技术瓶颈的可以加。2、在公司待久了,过得很安逸,但跳槽时面试碰壁。需要在短时间内进修、跳槽拿高薪的可以加。3、如果没有工作经验,但基础非常扎实,对java工作机制,常用设计思想,常用java开发框架掌握熟练的,可以加。4、觉得自己很牛B,一般需求都能搞定。但是所学的知识点没有系统化,很难在技术领域继续突破的可以加。5.阿里Java高级大牛直播讲解知识点,分享知识,多年工作经验的梳理和总结,带着大家全面、科学地建立自己的技术体系和技术认知!
它灵活,强大,但也很难完全把控和掌握。
许多不同的使用情况和使用模式都可以建立在这个强大的软件之上,但在第一次尝试为一个特定的解决方案编写代码时,差错和设计错误也是司空见惯的事情。
在本文中,我将讨论 Palermo 的设计和实现,它是一个实现了可以将RabbitMQ用作底层排队机制构建的那些可能的使用模式其中之一:批处理作业处理系统。
通过批处理作业处理系统,我们引用了一种机制来由 作业程序从不同队列中提取作业的自动执行。客户可以排入新的作业到这些队列,它们最终将被传递给将执行作业的程序。如果在执行一个任务失败了,作业程序线程将会把失败的作业放入一个特殊的队列,它可以重新执行,或者检查出来进行错误调试。 首先,工作者采用显式确认的方式处理完消息后可以通知RabbitMQ.消息只有确认后才会从队列删除.如果工作者停止运行而没有确认消息,RabbitMQ会自动将消息重新排队.同时,服务质量(qos)配置可以告知RabbitMQ,发送给特定工作者的最大未确认消息数.如果设置了这个值,即从预取数量到1,只有最后一条消息被工作者进程确认,其他消息才能从队列发送出去.
通过这种方式,可以实现在工作者之间合理的分配作业.
我们仍然需要找到一种方式实现写内存的操作.RabbitMQ不同于其他队列排队系统的一个重要的方面,是强烈的订阅者和消费者分离的考量.在RabbitMQ体系结构中,这点是通过队列和订阅者交换的介绍来实现的.订阅者不知道消息的最终目的地,他们只知道一个交换的名字和路由关键字,这个关键字可以看作是发送消息的地址.针对不同的交换类型,RabbitMQ支持不同的语义,这将影响一个带有地址的消息匹配一个交换的方式.它将传递到实际的信箱(队列),消费者将从这里接收消息.在我们的例子中,写方法的第一个参数是队列名称。 队列名称也是工作者预测下一个内存中待处理的任务时使用的参数。Palermo中该方法下一个参数是一种直接交换类型。 在这种交换类型中,路由关键字必须和传输消息的队列名称匹配。
如果一条消息被发往 RabbitMQ 进行交换,并且没有队列连接到交换,那么这条消息会被丢弃或者被发往相关的备用交换。 为了避免这种情况, Palermo中一个发布者每次把一个新的任务加入到队列中时,在发送任务的数据之前,发布者会声明匹配路由关键字的队列。 通过这种方式,我们可以确定当没有工作者在等待处理任务时消息也不会被丢弃。 在 RabbitMQ中,使用相同的参数重新声明一个已存在的队列是一个合法的操作,且没有任何影响。使用之前为RabbitMQ所描述的设置,我们可以建立工作处理系统的核心。然而,某些特性,像失败事务的管理或者消息的序列化,它们不能被编址放到RabbitMQ的特性中。在Palermo中,这个特性已经被实现,并被作为一个附加的应用软件层,这个层可以被分配作为一个Java库。首要的问题是如何处理失败的事务。我们来看它是怎么(工作的),过去我们使用显式的人工确认,在人工处理或超时时,RabbitMQ将会处理致命的错误。这套机制也可以用于处理在事务处理逻辑中(产生的)任何类型的异常条件,除了我们期望的功能需求外,失败信息必定会发送给一个特别的失败事务队列,他们可以被查阅,移除和重入队。这个功能已经在Palermo中被完成,被包装为一个通用的Java事务用来处理try/catch块,并通过管道将失败信息添加到队列中,并添加一些关于事务的错误信息,(例如)重试次数和在消息元数据的头中的原始队列一并发送消息给RabbitMQ。序列化的问题,已经通过定义一个作业消息来陈述了,这个消息包含作业Java类的头信息,作业序列化参数和序列化类型.有关参数形象化的一小部分内容,通过一个人眼可识别的字符串,也随消息一起发送了. Palermo已经通过支持插件和去插件的方式,对系统进行不同的序列化,同时,包含一个基于JSON的序列化转换器.但是,默认的序列化技术是使用JBoss Serialization.只有作业的参数被序列化,并发送给工作者,作业类的字节码必须在工作者的class path中可用,以便正确执行作业.
Palermo工作者只是执行实际作业逻辑的一种通用方式,其中作业逻辑是封装在作业类的定义之中.Palermo的整个逻辑已经在Clojure编程语言中实现,但是,多亏Clojure Java inter-op特性,它可以在java代码或其他任意基于JVM的语言中使用.由于Palermo工作者线程在JVM中运行,它们从底层的操作系统中被隔离出来.像Resque工作者那样,集成在OS之中,很难实现,但是某种程度的集成,在可能的情况下,已经尝试过,如,为关联到Palermo工作者的RabbitMQ消费者的身份标识,使用进程标识符.一个命令行的接口也已经实现了,所以,新的工作者,通过脚本和使用者,可以很简单的启动.解决方案的最后一个组件是Palermo 系统中运行的一个web接口。 这只是一个使用了Palermo 类库自检特性的简单的web应用,它让人们更容易理解Palermo 系统的运行机制。 这个接口是Resque web接口的复制,在管理交换、队列、工作者方面,它可以作为RabbitMQ 通用接口的替代者。通过Palermo 类库,web接口提供的所有功能可以在任意java代码中进行使用。
考虑到所有之前我们能想到的, Palermo 只是薄薄的一层,它建立在一个特定的 RabbitMQ设置的顶部,并被封装成一个可重用的作业处理使用模式的实现类库。 相同的方法可能适用于不同的使用模式,这些模式使用 RabbitMQ作为底层引擎,这样可以在写入非特定应用程序的代码、处理设置和由 RabbitMQ特定配置而引入其余的复杂性方面节省时间。如果你现在在JAVA这条路上挣扎,也想在IT行业拿高薪,可以参加我们的训练营课程,选择最适合自己的课程学习,技术大牛亲授,7个月后,进入名企拿高薪。我们的课程内容有:Java工程化、高性能及分布式、高性能、深入浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点。如果你想拿高薪的,想学习的,想就业前景好的,想跟别人竞争能取得优势的,想进阿里面试但担心面试不过的,你都可以来,q群号为:779792048注:加群要求1、具有1-5工作经验的,面对目前流行的技术不知从何下手,需要突破技术瓶颈的可以加。2、在公司待久了,过得很安逸,但跳槽时面试碰壁。需要在短时间内进修、跳槽拿高薪的可以加。3、如果没有工作经验,但基础非常扎实,对java工作机制,常用设计思想,常用java开发框架掌握熟练的,可以加。4、觉得自己很牛B,一般需求都能搞定。但是所学的知识点没有系统化,很难在技术领域继续突破的可以加。5.阿里Java高级大牛直播讲解知识点,分享知识,多年工作经验的梳理和总结,带着大家全面、科学地建立自己的技术体系和技术认知!
相关文章推荐
- 【Spark深入学习 -10】基于spark构建企业级流处理系统
- 使用 Ansible 在树莓派上构建一个基于 Linux 的高性能计算系统 | Linux 中国
- 【转】基于Trident构建大规模实时流数据处理系统
- 深度学习与神经网络实战:快速构建一个基于神经网络的手写数字识别系统
- logz.io一个企业级的ELK日志分析器 内部集成了机器学习识别威胁——核心:利用用户对于特定日志事件的反馈处理动作来学习判断日志威胁 + 类似语音识别的专家系统从各方收集日志威胁信息
- 基于Express React AntD和MongoDB构建一个CMS系统
- 一个基于qt的桌面聊天系统的学习笔记(1)-邮件模块(2)收发信处理流程
- 基于ERP系统评价特点构建一个成功模型
- 使用 Ansible 在树莓派上构建一个基于 Linux 的高性能计算系统
- 使用 Ansible 在树莓派上构建一个基于 Linux 的高性能计算系统
- spring security 一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架
- 基于Maven + SSM (Spring、SpringMVC、Mybatis)构建一个简单的
- 关于构建一个使用EJB组件的新系统
- RabbitMQ学习之(五)_一个基于PHP的RabbitMQ操作类
- [Unity通信]一个基于socket的3DARPG网络游戏(二):消息分类处理和json的使用
- 分享一个基于ligerui的系统应用案例ligerRM V2(权限管理系统)(提供下载)
- 一个基于D-BUS的中间件系统
- rhel5.8的系统上安装nginx+php(FastCGI)+mysql 来构建一个高效的web服务器
- 分享一个基于ligerui的系统应用案例ligerRM V2(权限管理系统)(提供下载)
- PLUTO平台是由美林数据技术股份有限公司下属西安交大美林数据挖掘研究中心自主研发的一款基于云计算技术架构的数据挖掘产品,产品设计严格遵循国际数据挖掘标准CRISP-DM(跨行业数据挖掘过程标准),具备完备的数据准备、模型构建、模型评估、模型管理、海量数据处理和高纬数据可视化分析能力。