基于redis的消息队列
2017-03-06 19:10
351 查看
前面一篇文章介绍了redis对list的操作,为什么要特别介绍list呢?没错,本篇文章就是要用redis的list做消息存储的队列
问题缘由:
最近在开发中,遇到了一个问题,客户端和后台交互,其中有一个步骤是后台发送网络请求,调用一个外部系统,日志里面打印出每个请求耗时:客户端和内部后台一次请求完成时间是在100ms以内,和外部系统交互的时间是几千ms甚至更多,也就是说,绝大多数的处理时间几乎都浪费在了与外部交互的过程中。客户端有几个会频繁发生请求超时的点也几乎都是在这些与外部交互的接口上。
分析:
为了解决这个问题,决定将这几个外部交互的点改成异步模式,也就是说,服务发出请求之后,不再等待对方相应,而是转而去处理其他问题。
方案:
1.多线程方式请求,在发送请求时,另开一个线程去处理,当前线程继续执行剩余工作。这是我第一想到的解决方案,也是我最开始决定使用的方案。但是仔细一想,随即否决。请求的返回会有更改数据的操作,这样的话极容易出现同步问题,可能有人会说用锁来解决,但是简单的方法锁解决不了集群问题,zk锁又稍显复杂。所以在这采用了第二种方案
2.使用消息队列
废话不多说了,上代码吧:
1)定义一个MessageQueue接口:
package MessageQueueBasedRedis;
/**
* <p>创建人:刘星 创建日期:2017-3-3 下午2:15:57</p>
* <p>功能描述:(消息队列接口类)</p>
* @version V1.0
* @param <T>
*/
public interface MessageQueue {
/**
* <p>创建人:刘星 创建日期:2017-3-3下午5:19:30</p>
* <p>功能描述:(submit a task on the specify queue by queue name)</p>
* @param 对方法中某参数的说明
* @return 对方法返回值的说明
* @exception 对方法可能抛出的异常进行说明
* @version 1.0
*/
public void submitTask(String jsonData,String queueName);
/**
* <p>创建人:刘星 创建日期:2017-3-3下午5:19:30</p>
* <p>功能描述:(get a task on the specify queue by queue name)</p>
* @param 对方法中某参数的说明
* @return 对方法返回值的说明
* @exception 对方法可能抛出的异常进行说明
* @version 1.0
*/
public String getTask(String queueName);
}
2)实现类:
package MessageQueueBasedRedis;
import JedisPool.JedisPoolInstance;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* <p>创建人:刘星 创建日期:2017-3-3 下午5:21:55</p>
* <p>功能描述:(手填)</p>
* @version V1.0
* @param <T>
*/
public class MessageQueueImpl implements MessageQueue {
public static JedisPool jedisPool = new JedisPoolInstance().getPool();
/**
* <p>创建人:刘星 创建日期:2017-3-3下午5:19:30</p>
* <p>功能描述:(submit a task on the specify queue by queue name)</p>
* @param 对方法中某参数的说明
* @return 对方法返回值的说明
* @exception 对方法可能抛出的异常进行说明
* @version 1.0
*/
public void submitTask(String jsonData,String queueName){
Jedis jedis = null;
try{
jedis = jedisPool.getResource();
jedis.lpush(queueName, jsonData);
}catch(Exception e){
//异常处理及日志输出
System.out.println(String.format("向redis队列提交任务时发生异常,打印异常信息:%s", e));
}
}
/**
* <p>创建人:刘星 创建日期:2017-3-3下午5:19:30</p>
* <p>功能描述:(get a task on the specify queue by queue name)</p>
* @param 对方法中某参数的说明
* @return 对方法返回值的说明
* @exception 对方法可能抛出的异常进行说明
* @version 1.0
*/
public String getTask(String queueName){
Jedis jedis = null;
String jsonData = null;
try{
jedis = jedisPool.getResource();
jsonData = jedis.lpop(queueName);
}catch(Exception e){
//异常处理及日志输出
System.out.println(String.format("从redis队列获取任务时发生异常,打印异常信息:%s", e));
}
return jsonData;
}
}
3.redispool实例对象
package JedisPool;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* <p>创建人:刘星 创建日期:2017-3-3 下午6:00:16</p>
* <p>功能描述:(手填)</p>
* @version V1.0
*/
public class JedisPoolInstance {
public static JedisPool jedisPool = null;
/**
* 构建redis连接池
* @param ip
* @param port
* @return JedisPool
*/
public JedisPool getPool() {
if (jedisPool == null) {
JedisPoolConfig config = new JedisPoolConfig();
//控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;
//如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted(耗尽)。
config.setMaxTotal(100);
//控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
config.setMaxIdle(5);
//表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
config.setMaxWaitMillis(1000 * 100);
//在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
config.setTestOnBorrow(true);
jedisPool = new JedisPool(config, "10.10.38.39", 6379);
}
return jedisPool;
}
/**
* 返还到连接池
*
* @param pool
* @param redis
*/
public static void returnResource(JedisPool pool, Jedis redis) {
if (redis != null) {
pool.returnResource(redis);
}
}
}
数据存储类型为String,琢磨了一下,本来想写成对象的,在实现类内部实现json和对象的互相转化,但是犹豫了一下,还是把转换方法放到调用处了。
个人的牛刀小试,下一篇准备介绍activeMQ
问题缘由:
最近在开发中,遇到了一个问题,客户端和后台交互,其中有一个步骤是后台发送网络请求,调用一个外部系统,日志里面打印出每个请求耗时:客户端和内部后台一次请求完成时间是在100ms以内,和外部系统交互的时间是几千ms甚至更多,也就是说,绝大多数的处理时间几乎都浪费在了与外部交互的过程中。客户端有几个会频繁发生请求超时的点也几乎都是在这些与外部交互的接口上。
分析:
为了解决这个问题,决定将这几个外部交互的点改成异步模式,也就是说,服务发出请求之后,不再等待对方相应,而是转而去处理其他问题。
方案:
1.多线程方式请求,在发送请求时,另开一个线程去处理,当前线程继续执行剩余工作。这是我第一想到的解决方案,也是我最开始决定使用的方案。但是仔细一想,随即否决。请求的返回会有更改数据的操作,这样的话极容易出现同步问题,可能有人会说用锁来解决,但是简单的方法锁解决不了集群问题,zk锁又稍显复杂。所以在这采用了第二种方案
2.使用消息队列
废话不多说了,上代码吧:
1)定义一个MessageQueue接口:
package MessageQueueBasedRedis;
/**
* <p>创建人:刘星 创建日期:2017-3-3 下午2:15:57</p>
* <p>功能描述:(消息队列接口类)</p>
* @version V1.0
* @param <T>
*/
public interface MessageQueue {
/**
* <p>创建人:刘星 创建日期:2017-3-3下午5:19:30</p>
* <p>功能描述:(submit a task on the specify queue by queue name)</p>
* @param 对方法中某参数的说明
* @return 对方法返回值的说明
* @exception 对方法可能抛出的异常进行说明
* @version 1.0
*/
public void submitTask(String jsonData,String queueName);
/**
* <p>创建人:刘星 创建日期:2017-3-3下午5:19:30</p>
* <p>功能描述:(get a task on the specify queue by queue name)</p>
* @param 对方法中某参数的说明
* @return 对方法返回值的说明
* @exception 对方法可能抛出的异常进行说明
* @version 1.0
*/
public String getTask(String queueName);
}
2)实现类:
package MessageQueueBasedRedis;
import JedisPool.JedisPoolInstance;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* <p>创建人:刘星 创建日期:2017-3-3 下午5:21:55</p>
* <p>功能描述:(手填)</p>
* @version V1.0
* @param <T>
*/
public class MessageQueueImpl implements MessageQueue {
public static JedisPool jedisPool = new JedisPoolInstance().getPool();
/**
* <p>创建人:刘星 创建日期:2017-3-3下午5:19:30</p>
* <p>功能描述:(submit a task on the specify queue by queue name)</p>
* @param 对方法中某参数的说明
* @return 对方法返回值的说明
* @exception 对方法可能抛出的异常进行说明
* @version 1.0
*/
public void submitTask(String jsonData,String queueName){
Jedis jedis = null;
try{
jedis = jedisPool.getResource();
jedis.lpush(queueName, jsonData);
}catch(Exception e){
//异常处理及日志输出
System.out.println(String.format("向redis队列提交任务时发生异常,打印异常信息:%s", e));
}
}
/**
* <p>创建人:刘星 创建日期:2017-3-3下午5:19:30</p>
* <p>功能描述:(get a task on the specify queue by queue name)</p>
* @param 对方法中某参数的说明
* @return 对方法返回值的说明
* @exception 对方法可能抛出的异常进行说明
* @version 1.0
*/
public String getTask(String queueName){
Jedis jedis = null;
String jsonData = null;
try{
jedis = jedisPool.getResource();
jsonData = jedis.lpop(queueName);
}catch(Exception e){
//异常处理及日志输出
System.out.println(String.format("从redis队列获取任务时发生异常,打印异常信息:%s", e));
}
return jsonData;
}
}
3.redispool实例对象
package JedisPool;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* <p>创建人:刘星 创建日期:2017-3-3 下午6:00:16</p>
* <p>功能描述:(手填)</p>
* @version V1.0
*/
public class JedisPoolInstance {
public static JedisPool jedisPool = null;
/**
* 构建redis连接池
* @param ip
* @param port
* @return JedisPool
*/
public JedisPool getPool() {
if (jedisPool == null) {
JedisPoolConfig config = new JedisPoolConfig();
//控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;
//如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted(耗尽)。
config.setMaxTotal(100);
//控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
config.setMaxIdle(5);
//表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
config.setMaxWaitMillis(1000 * 100);
//在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
config.setTestOnBorrow(true);
jedisPool = new JedisPool(config, "10.10.38.39", 6379);
}
return jedisPool;
}
/**
* 返还到连接池
*
* @param pool
* @param redis
*/
public static void returnResource(JedisPool pool, Jedis redis) {
if (redis != null) {
pool.returnResource(redis);
}
}
}
数据存储类型为String,琢磨了一下,本来想写成对象的,在实现类内部实现json和对象的互相转化,但是犹豫了一下,还是把转换方法放到调用处了。
个人的牛刀小试,下一篇准备介绍activeMQ
相关文章推荐
- 基于Redis实现分布式消息队列(2)
- 项目分布式部署那些事(1):ONS消息队列、基于Redis的Session共享,开源共享
- 基于Redis的简单消息队列模块(Node.js)
- 基于Redis实现分布式消息队列(3)
- 基于redis的延迟消息队列设计
- 基于Redis实现的延迟消息队列
- KMQueue 基于Redis的分布式消息队列
- 基于Redis实现分布式消息队列(汇总目录)
- 基于redis的延迟消息队列设计
- 基于redis构建消息队列
- 基于Redis实现分布式消息队列
- 基于Redis的消息队列封装和测试
- [转载] 基于Redis实现分布式消息队列
- [置顶] Redis应用3-基于Redis消息队列实现的异步操作
- 基于redis的延迟消息队列设计
- 基于redis的延迟消息队列设计
- 基于redis的延迟消息队列设计
- PHP基于Redis消息队列实现发布微博的方法
- 基于Redis的消息队列php-resque
- 基于Redis实现分布式消息队列(4)