基于Redis的incr、发布订阅防并发和setnx、轮询防并发的理解
2018-02-11 21:00
781 查看
先上代码:
1、incr、发布订阅防并发
2、setnx、轮询防并发
先说明:笔者对于redis的中层、底层还没有开始阅读理解。这里先表达运用层的理解。
1、incr和setnx都能实现锁的功能
2、incr额外提供计数功能,老大一直不明白我为啥要用incr实现这个锁,我自己也不清楚。后面对redis开撸之后可能会理解。
3、incr没有超时时间的概念,发布订阅也不存在。incr只能删key,发布订阅只能退订频道。发布是多链接,订阅是单链接。
4、第一种方案有一个bug,如果第一个线程发布意外失败,其余等待线程全部阻塞在subscribe,那么其余线程全部会死掉。等待的线程需要时间控制他们。但是还没想到什么方案来对付这种情况。
5、对于发布订阅来说,20笔请求进来,19笔请求必须等待第一笔处理结果publish后,自己subscribe后才能进行其他操作。也就是说如果业务5S超时,那么5秒内堆积的线程都要等第一笔处理结束。大量并发的情况下就会出现拥堵。
对于这一点,第二种防并发方案可以通过,轮询的超时时间和锁的超时时间来避免,即在业务5s的处理过程中,假如等待线程如果轮询了2s,我们可以选择不让他们接着等待,而是自己再去发起业务请求。
6、注意轮询时间最好不要小于锁key的时间。可以选择相等。key的时间和业务处理的时间需要衡量对比。以及上游系统调用本系统的超时时间结合起来 来决定是否必要重试,以及重试次数。
7、注意轮询拿不到结果如果选择重试一定要设置重试次数。否则可能会无限轮询。
中低层了解的太少不敢妄言,希望大家能指点指点,说错的地方谅解。
对于轮询的又一种编写:
1、incr、发布订阅防并发
package com.xxx.epps.sfeicuss.common.anticoncurrency; import com.xxx.epps.sfeicuss.common.enums.ResponseCode; import com.xxx.epps.sfeicuss.common.exception.AppException; import com.xxx.epps.sfeicuss.common.util.RedisClientUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; /** * 〈一句话功能简述〉<br> * 〈功能详细描述〉 * * @author wangzha * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ @Service public class PublishAndSubscribe { private static final Logger LOGGER = LoggerFactory.getLogger(PublishAndSubscribe.class); /** * 频道key */ private static final String PUBANDSUB_KEY = "PUBANDSUB_KEY_"; /** * 结果key */ private static final String PUBANDSUB_RES_KEY = "PUB_RES_KEY_"; /** * 发布成功 */ private static final String MSG_TRUE = "true"; /** * 发布失败 */ private static final String MSG_FALSE = "false"; /** * 默认重试次数 */ private static final int RETRY_TIME = 3; /** * 线程全局变量,用于统计重试次数 */ private ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return 1; } }; /** * 入口 * * @param key * @param clazz * @param retryTime * @param callBack * @param <T> * @return */ public <T> T execute(String key, Class<T> clazz, int retryTime, CallBack callBack) { T result; String pasKey = PUBANDSUB_KEY + key; String pasResKey = PUBANDSUB_RES_KEY + key; if (retryTime < 1) { //重试次数不合法,默认为3 retryTime = RETRY_TIME; } //抢占入口 long incr = RedisClientUtils.incr(pasKey); LOGGER.debug("抢占到资源"); boolean flag; if (incr == 1) { String msg = MSG_FALSE; //处理业务逻辑 try { result = callBack.process(); flag = callBack.handlerIsSuccessOrNot(result); LOGGER.debug("业务结果:{}", flag); } catch (Exception e) { LOGGER.error("回调业务处理异常", e); throw new AppException("回调业务处理异常", ResponseCode.ERROR.getCode()); } if (flag) { //业务处理成功,则推送缓存,且发布 RedisClientUtils.putBeanObj(pasResKey, result); msg = MSG_TRUE; } RedisClientUtils.del(pasKey); RedisClientUtils.publish(pasKey, msg); return result; } else { String subMsg = RedisClientUtils.subscribe(pasKey); LOGGER.debug("subMsg", subMsg); if (MSG_TRUE.equals(subMsg)) { result = RedisClientUtils.getBeanObj(pasResKey, clazz); LOGGER.debug("收到订阅消息:{}", result); return callBack.handlerResult(result); } else { //重试 while (true) { int time = threadLocal.get(); if (time < retryTime) { time++; return execute(key, clazz, retryTime, callBack); } else { throw new AppException("超过重试次数"); } } } } } }
package com.xxx.epps.sfeicuss.common.anticoncurrency; /** * 〈一句话功能简述〉<br> * 〈功能详细描述〉 * * @author wangzha * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public interface CallBack{ <T> T process(); <T> boolean handlerIsSuccessOrNot(T result); <T> T handlerResult(T result); }
2、setnx、轮询防并发
package com.xxx.epps.sfeicuss.common.anticoncurrency; import com.xxx.epps.sfeicuss.common.enums.ResponseCode; import com.xxx.epps.sfeicuss.common.exception.AppException; import com.xxx.epps.sfeicuss.common.util.RedisClientUtils; import com.xxx.epps.sfeicuss.common.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 〈一句话功能简述〉<br> * 〈功能详细描述〉 * * @author wangzha * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class OneAfterOne { private static final Logger LOGGER = LoggerFactory.getLogger(OneAfterOne.class); /** * 锁key */ private static final String OAO_LOCK_KEY = "OAO_LOCK_KEY_"; /** * */ private static final String OAO_LOCK_VALUE = "OAO_LOCK_VALUE"; /** * 返回结果缓存key */ private static final String OAO_RESULT_KEY = "OAO_RESULT_KEY_"; /** * 轮询超时时间 */ private static final long TIME_OUT = 2000; /** * 锁超时时间 */ private static final long LOCK_KEY_TIME_OUT = 5; /** * 重试次数 */ private static final int RETRY_TIME = 3; /** * 线程全局变量,用于统计重试次数 */ private ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return 1; } }; /** * 订制入口 * * @param cachekey * @param timeOut * @param lockTimeOut * @param retryTime * @param callBack * @return */ public String execute(String cachekey, long timeOut, long lockTimeOut, int retryTime, OneAfterOneCallBack callBack) { String result; /************缓存key初始化**************/ //构造锁key String key = OAO_LOCK_KEY + cachekey; //设置轮询超时时间 if (timeOut <= 0) { timeOut = TIME_OUT; } //设置锁超时时间 if (lockTimeOut <= 0) { lockTimeOut = LOCK_KEY_TIME_OUT; } /****************并发处理****************/ try { //获取锁 boolean lock = RedisClientUtils.setnx(key, OAO_LOCK_VALUE, lockTimeOut); //构造结果key String resultKey = OAO_RESULT_KEY + cachekey; if (lock) { //获取锁,回调业务处理 result = callBack.process(); //判断业务是否成功,成功将结果放入缓存,否则回调异常处理 boolean resultFlag = callBack.handlerIsSuccessOrNot(result); if (resultFlag) { RedisClientUtils.setExpire(resultKey, result, 1); } else { callBack.handlerSerivceException(result); } RedisClientUtils.del(key); return result; } else { int time = threadLocal.get(); while (true) { //轮询 result = this.pollingReuslt(resultKey, timeOut); LOGGER.info("轮询获取结果:{}", result); if (StringUtils.isNotEmpty(result)) { //处理轮询结果 return callBack.handlerResult(result); } if (time > retryTime) { //超过重试次数 throw new AppException("超过重试次数"); } else { time++; //轮训次数已满,尝试重新获取锁 return execute(cachekey, timeOut, lockTimeOut, retryTime, callBack); } } } } catch (AppException e) { throw new AppException(ResponseCode.ERROR.getCode(), "OneAfterOne异常"); } } /** * 通用入口 * * @param cachekey * @param callBack * @return */ public String execute(String cachekey, OneAfterOneCallBack callBack) { return execute(cachekey, TIME_OUT, LOCK_KEY_TIME_OUT, RETRY_TIME, callBack); } /** * 轮询等待结果 * * @param resultKey * @return */ private String pollingReuslt(String resultKey, long timeOut) { long startTime = System.currentTimeMillis(); while (true) { try { //休眠 Thread.sleep(50); } catch (InterruptedException e) { LOGGER.error("系统异常", e); throw new AppException("系统异常", ResponseCode.ERROR.getCode()); } //尝试获取上一笔的处理结果 String result = RedisClientUtils.get(resultKey); if (!(StringUtils.isEmpty(result) && (System.currentTimeMillis() - startTime) / 1000 < timeOut)) { return result; } } } }
package com.xxx.epps.sfeicuss.common.anticoncurrency; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import com.xxx.epps.sfeicuss.common.enums.ResponseCode; import com.xxx.epps.sfeicuss.common.lang.ObjectResponse; import java.util.Map; /** * 〈一句话功能简述〉<br> * 〈功能详细描述〉 * * @author wangzha * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public abstract class OneAfterOneCallBack { /** * 业务需实现的防并发方法 * * @return */ protected abstract String process(); /** * 判断第一笔是否处理成功 * 默认0000为成功,可以根据业务场景重写 */ protected boolean handlerIsSuccessOrNot(String result) { boolean flag = false; ObjectResponse<Map<String, Object>> response = JSON.parseObject(result, new TypeReference<ObjectResponse<Map<String, Object>>>() { }); String reponseCode = response.getResponseCode(); if (ResponseCode.SUCCESS.getCode().equals(reponseCode)) { flag = true; } return flag; } /** * 轮询到的结果,处理.业务可以根据需要重写 * * @return */ protected String handlerResult(String result) { return result; } /** * 业务异常情况 * * @return */ protected String handlerSerivceException(String result) { return result; } }
先说明:笔者对于redis的中层、底层还没有开始阅读理解。这里先表达运用层的理解。
1、incr和setnx都能实现锁的功能
2、incr额外提供计数功能,老大一直不明白我为啥要用incr实现这个锁,我自己也不清楚。后面对redis开撸之后可能会理解。
3、incr没有超时时间的概念,发布订阅也不存在。incr只能删key,发布订阅只能退订频道。发布是多链接,订阅是单链接。
4、第一种方案有一个bug,如果第一个线程发布意外失败,其余等待线程全部阻塞在subscribe,那么其余线程全部会死掉。等待的线程需要时间控制他们。但是还没想到什么方案来对付这种情况。
5、对于发布订阅来说,20笔请求进来,19笔请求必须等待第一笔处理结果publish后,自己subscribe后才能进行其他操作。也就是说如果业务5S超时,那么5秒内堆积的线程都要等第一笔处理结束。大量并发的情况下就会出现拥堵。
对于这一点,第二种防并发方案可以通过,轮询的超时时间和锁的超时时间来避免,即在业务5s的处理过程中,假如等待线程如果轮询了2s,我们可以选择不让他们接着等待,而是自己再去发起业务请求。
6、注意轮询时间最好不要小于锁key的时间。可以选择相等。key的时间和业务处理的时间需要衡量对比。以及上游系统调用本系统的超时时间结合起来 来决定是否必要重试,以及重试次数。
7、注意轮询拿不到结果如果选择重试一定要设置重试次数。否则可能会无限轮询。
中低层了解的太少不敢妄言,希望大家能指点指点,说错的地方谅解。
对于轮询的又一种编写:
/** * 〈一句话功能简述〉<br> * 〈功能详细描述〉 * * @author wangzha * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ @Component public class OneAfterOne<T extends GenericResponse> { private static final Logger LOGGER = LoggerFactory.getLogger(OneAfterOne.class); /** * 锁key */ private static final String BIZTYPE_DEFAULT = "DEFAULT_"; /** * 锁key */ private static final String ONEAFTERONE_LOCK_KEY = "ONEAFTERONE_LOCK_"; /** * 锁value */ private static final String ONEAFTERONE_LOCK_VALUE = "ONEAFTERONE_LOCK_VALUE"; /** * 返回结果缓存key */ private static final String ONEAFTERONE_RESULT_FLAG_KEY = "ONEAFTERONE_RESULT_FLAG_"; /** * 业务处理有结果标记 */ private static final String ONEAFTERONE_OK = "OK"; /** * 服务超时时间 */ private static final String SERVICE_TIME_OUT = "serviceTimeOut"; /** * 轮询间隔时间 */ private static final String POLLING_INTERVAL = "pollingInterval"; /** * 功能描述: 防止一段时间内,相同的入参多比重复请求<br> * 〈功能详细描述〉 1、多笔请求进来,用setnx去获取锁. 2、拿到锁的进行业务处理回调,并将完整的业务处理结果返回. * 3、组件OAO将第一笔处理的是否成功的标记(OK)放入结果缓存.(此处只要业务系统不抛异常,都是为业务处理有结果) 回调缓存处理 * 4、未拿到锁的请求,进行轮询获取结果缓存来判断第一笔是否处理有结果,如果轮询有结果,回调获取缓存;无结果,抛出业务处理异常 * <p> * 注意:serviceTimeOut与rsf接口超时时间一致,标记缓存时间和锁的缓存时间一致 * * @param cachekey 业务缓存key * @param serviceTimeOut 服务超时时间 * @param pollingInterval 轮询间隔时间 * @param callBack 回调 * @return T * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ @SuppressWarnings("unchecked") public T execute(String cachekey, long serviceTimeOut, long pollingInterval, OneAfterOneCallBack<T> callBack) { LOGGER.debug("uniqueKeys:{},进入oneAfterOne组件,服务超时时间:{},轮询间隔时间:{}", cachekey, serviceTimeOut, pollingInterval); T result; String key = ONEAFTERONE_LOCK_KEY + cachekey; // 获取锁 boolean lock = RedisClientUtils.setnx(key, ONEAFTERONE_LOCK_VALUE, serviceTimeOut / 1000); // 构造结果标记key String resultKey = ONEAFTERONE_RESULT_FLAG_KEY + cachekey; if (lock) { // 获取锁,回调业务处理 try { result = callBack.process(); } catch (AppException e) { LOGGER.error("该笔业务处理抛出异常" + key, e); GenericResponse response = new GenericResponse(); response.response(e.getErrorCode(), e.getMessage()); result = (T) response; } catch (RuntimeException e) { LOGGER.error("该笔业务处理抛出异常" + key, e); GenericResponse response = new GenericResponse(ResponseCode.ERROR); result = (T) response; } // 缓存结果 callBack.cache(cachekey, result); // 回调业务处理有结果,将OK标记放入缓存 RedisClientUtils.setExpire(resultKey, ONEAFTERONE_OK, 1); return result; } else { // 轮询 String pollingResult; long startTime = System.currentTimeMillis(); LOGGER.info("{},该笔进入轮训", cachekey); while (true) { LOGGER.debug("{}该笔进入轮训", cachekey); // 尝试获取上一笔的处理结果 pollingResult = RedisClientUtils.get(resultKey); // 轮询结果不为空或者超出轮询时间,跳出循环,否则继续循环 if (pollingResult != null || (System.currentTimeMillis() - startTime) > serviceTimeOut) { break; } // 休眠 sleep(pollingInterval); } LOGGER.info("{},该笔退出轮训,轮询获取结果:{}", cachekey, pollingResult); if (ONEAFTERONE_OK.equals(pollingResult)) { // 处理轮询结果 result = callBack.getCache(); if (result == null) { LOGGER.error(ResponseCode.ERROR.getCode(), "轮训到OK,无结果明细!"); throw new AppException(ResponseCode.ERROR); } else { return callBack.getCache(); } } else if (StringUtils.isEmpty(pollingResult)) { throw new AppException(ResponseCode.POLLING_TIME_OUT); } else { throw new AppException(ResponseCode.ERROR); } } } /** * 通用入口 * * @param prefix * @param cachekey * @param callBack * @param <T> * @return */ public T execute(String bizType, String[] uniqueKeys, OneAfterOneCallBack<T> callBack) { Assert.notEmpty(uniqueKeys, "业务入参用于拼装唯一key,不可以为空!"); String cachekey = bizType + StringUtils.join(Symbols.UNDERLINE, uniqueKeys); return execute(cachekey, getServiceTimeOut(bizType), getPoolingInterval(bizType), callBack); } /** * 功能描述: <br> * 〈功能详细描述〉 * * @param string * @return * @see [相关类/方法](可选) * @since [产品/模块版本](可选) */ private long getServiceTimeOut(String bizType) { long serviceTimeOut = SCMServiceCfg.getNode(bizType + SERVICE_TIME_OUT); return serviceTimeOut > 0 ? serviceTimeOut : SCMServiceCfg.getNode(BIZTYPE_DEFAULT + SERVICE_TIME_OUT); } /** * 功能描述: <br> * 〈功能详细描述〉 * * @param string * @return * @see [相关类/方法](可选) * @since [产品/模块版本](可选) */ private long getPoolingInterval(String bizType) { long poolingInterval = SCMServiceCfg.getNode(bizType + POLLING_INTERVAL); return poolingInterval > 0 ? poolingInterval : SCMServiceCfg.getNode(BIZTYPE_DEFAULT + POLLING_INTERVAL); } /** * 休眠 * * @param pollingInterval */ private void sleep(long pollingInterval) { try { // 休眠 Thread.sleep(pollingInterval); } catch (Exception e) { LOGGER.error("系统异常", e); } } // public static void main(String[] args) { // long startTime = System.currentTimeMillis(); // while (true) { // long time = System.currentTimeMillis() - startTime; // long time2 = (System.currentTimeMillis() - startTime)/1000; // System.out.println("time:" + time); // System.out.println("time2:" + time2); // System.out.println("time3:" + System.currentTimeMillis()); // if (time / 1000 > 5000) { // System.out.println("in"); // break; // } // try { // // 休眠 // Thread.sleep(50); // } catch (Exception e) { // LOGGER.error("系统异常", e); // } // } // System.out.println("out"); // } }
相关文章推荐
- 基于spring-redis发布订阅模式的实现
- 基于订阅/发布模式的简易聊天室实现(java+redis)
- 基于Redis的消息订阅/发布
- Redis发布/订阅模式
- 【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,上传了源码
- Redis发布订阅模式
- Redis——发布订阅模式&虚拟内存
- redis 消息订阅和发布
- redis发布与订阅
- .net core如何使用Redis发布订阅
- Spring基于事件驱动模型的订阅发布模式代码实例详解
- Redis 学习笔记(一):安装配置、发布订阅与事务
- (一)redis发布订阅
- Redis--发布订阅模式
- Redis实现消息的发布/订阅
- Redis 的发布与订阅
- 六 redis学习笔记之发布订阅
- 15天玩转redis —— 第九篇 发布/订阅模式
- redis发布、订阅
- Python操作redis的订阅发布功能