您的位置:首页 > 数据库 > Redis

基于Redis的incr、发布订阅防并发和setnx、轮询防并发的理解

2018-02-11 21:00 781 查看
先上代码:

1、incr、发布订阅防并发

package com.xxx.epps.sfeicuss.common.anticoncurrency;

import com.xxx.epps.sfeicuss.common.enums.ResponseCode;
import com.xxx.epps.sfeicuss.common.exception.AppException;
import com.xxx.epps.sfeicuss.common.util.RedisClientUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
* 〈一句话功能简述〉<br>
* 〈功能详细描述〉
*
* @author wangzha
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
@Service
public class PublishAndSubscribe {

private static final Logger LOGGER = LoggerFactory.getLogger(PublishAndSubscribe.class);

/**
* 频道key
*/
private static final String PUBANDSUB_KEY = "PUBANDSUB_KEY_";

/**
* 结果key
*/
private static final String PUBANDSUB_RES_KEY = "PUB_RES_KEY_";

/**
* 发布成功
*/
private static final String MSG_TRUE = "true";

/**
* 发布失败
*/
private static final String MSG_FALSE = "false";

/**
* 默认重试次数
*/
private static final int RETRY_TIME = 3;

/**
* 线程全局变量,用于统计重试次数
*/
private ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 1;
}
};

/**
* 入口
*
* @param key
* @param clazz
* @param retryTime
* @param callBack
* @param <T>
* @return
*/
public <T> T execute(String key, Class<T> clazz, int retryTime, CallBack callBack) {
T result;
String pasKey = PUBANDSUB_KEY + key;
String pasResKey = PUBANDSUB_RES_KEY + key;
if (retryTime < 1) {
//重试次数不合法,默认为3
retryTime = RETRY_TIME;
}
//抢占入口
long incr = RedisClientUtils.incr(pasKey);
LOGGER.debug("抢占到资源");
boolean flag;
if (incr == 1) {
String msg = MSG_FALSE;
//处理业务逻辑
try {
result = callBack.process();
flag = callBack.handlerIsSuccessOrNot(result);
LOGGER.debug("业务结果:{}", flag);
} catch (Exception e) {
LOGGER.error("回调业务处理异常", e);
throw new AppException("回调业务处理异常", ResponseCode.ERROR.getCode());
}
if (flag) {
//业务处理成功,则推送缓存,且发布
RedisClientUtils.putBeanObj(pasResKey, result);
msg = MSG_TRUE;
}
RedisClientUtils.del(pasKey);
RedisClientUtils.publish(pasKey, msg);
return result;
} else {
String subMsg = RedisClientUtils.subscribe(pasKey);
LOGGER.debug("subMsg", subMsg);
if (MSG_TRUE.equals(subMsg)) {
result = RedisClientUtils.getBeanObj(pasResKey, clazz);
LOGGER.debug("收到订阅消息:{}", result);
return callBack.handlerResult(result);
} else {
//重试
while (true) {
int time = threadLocal.get();
if (time < retryTime) {
time++;
return execute(key, clazz, retryTime, callBack);
} else {
throw new AppException("超过重试次数");
}
}
}
}
}
}


package com.xxx.epps.sfeicuss.common.anticoncurrency;

/**
* 〈一句话功能简述〉<br>
* 〈功能详细描述〉
*
* @author wangzha
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
public interface CallBack{
<T> T process();

<T> boolean handlerIsSuccessOrNot(T result);

<T> T handlerResult(T result);
}


2、setnx、轮询防并发

package com.xxx.epps.sfeicuss.common.anticoncurrency;

import com.xxx.epps.sfeicuss.common.enums.ResponseCode;
import com.xxx.epps.sfeicuss.common.exception.AppException;
import com.xxx.epps.sfeicuss.common.util.RedisClientUtils;
import com.xxx.epps.sfeicuss.common.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 〈一句话功能简述〉<br>
* 〈功能详细描述〉
*
* @author wangzha
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
public class OneAfterOne {

private static final Logger LOGGER = LoggerFactory.getLogger(OneAfterOne.class);

/**
* 锁key
*/
private static final String OAO_LOCK_KEY = "OAO_LOCK_KEY_";

/**
*
*/
private static final String OAO_LOCK_VALUE = "OAO_LOCK_VALUE";

/**
* 返回结果缓存key
*/
private static final String OAO_RESULT_KEY = "OAO_RESULT_KEY_";

/**
* 轮询超时时间
*/
private static final long TIME_OUT = 2000;

/**
* 锁超时时间
*/
private static final long LOCK_KEY_TIME_OUT = 5;

/**
* 重试次数
*/
private static final int RETRY_TIME = 3;

/**
* 线程全局变量,用于统计重试次数
*/
private ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 1;
}
};

/**
* 订制入口
*
* @param cachekey
* @param timeOut
* @param lockTimeOut
* @param retryTime
* @param callBack
* @return
*/
public String execute(String cachekey, long timeOut, long lockTimeOut, int retryTime, OneAfterOneCallBack callBack) {
String result;
/************缓存key初始化**************/
//构造锁key
String key = OAO_LOCK_KEY + cachekey;
//设置轮询超时时间
if (timeOut <= 0) {
timeOut = TIME_OUT;
}
//设置锁超时时间
if (lockTimeOut <= 0) {
lockTimeOut = LOCK_KEY_TIME_OUT;
}

/****************并发处理****************/
try {
//获取锁
boolean lock = RedisClientUtils.setnx(key, OAO_LOCK_VALUE, lockTimeOut);
//构造结果key
String resultKey = OAO_RESULT_KEY + cachekey;
if (lock) {
//获取锁,回调业务处理
result = callBack.process();
//判断业务是否成功,成功将结果放入缓存,否则回调异常处理
boolean resultFlag = callBack.handlerIsSuccessOrNot(result);
if (resultFlag) {
RedisClientUtils.setExpire(resultKey, result, 1);
} else {
callBack.handlerSerivceException(result);
}
RedisClientUtils.del(key);
return result;
} else {
int time = threadLocal.get();
while (true) {
//轮询
result = this.pollingReuslt(resultKey, timeOut);
LOGGER.info("轮询获取结果:{}", result);
if (StringUtils.isNotEmpty(result)) {
//处理轮询结果
return callBack.handlerResult(result);
}
if (time > retryTime) {
//超过重试次数
throw new AppException("超过重试次数");
} else {
time++;
//轮训次数已满,尝试重新获取锁
return execute(cachekey, timeOut, lockTimeOut, retryTime, callBack);
}
}
}
} catch (AppException e) {
throw new AppException(ResponseCode.ERROR.getCode(), "OneAfterOne异常");
}

}

/**
* 通用入口
*
* @param cachekey
* @param callBack
* @return
*/
public String execute(String cachekey, OneAfterOneCallBack callBack) {
return execute(cachekey, TIME_OUT, LOCK_KEY_TIME_OUT, RETRY_TIME, callBack);
}

/**
* 轮询等待结果
*
* @param resultKey
* @return
*/
private String pollingReuslt(String resultKey, long timeOut) {
long startTime = System.currentTimeMillis();
while (true) {
try {
//休眠
Thread.sleep(50);
} catch (InterruptedException e) {
LOGGER.error("系统异常", e);
throw new AppException("系统异常", ResponseCode.ERROR.getCode());
}
//尝试获取上一笔的处理结果
String result = RedisClientUtils.get(resultKey);
if (!(StringUtils.isEmpty(result) && (System.currentTimeMillis() - startTime) / 1000 < timeOut)) {
return result;
}
}
}
}


package com.xxx.epps.sfeicuss.common.anticoncurrency;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.xxx.epps.sfeicuss.common.enums.ResponseCode;
import com.xxx.epps.sfeicuss.common.lang.ObjectResponse;

import java.util.Map;

/**
* 〈一句话功能简述〉<br>
* 〈功能详细描述〉
*
* @author wangzha
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
public abstract class OneAfterOneCallBack {

/**
* 业务需实现的防并发方法
*
* @return
*/
protected abstract String process();

/**
* 判断第一笔是否处理成功
* 默认0000为成功,可以根据业务场景重写
*/
protected boolean handlerIsSuccessOrNot(String result) {
boolean flag = false;
ObjectResponse<Map<String, Object>> response = JSON.parseObject(result, new TypeReference<ObjectResponse<Map<String, Object>>>() {
});
String reponseCode = response.getResponseCode();
if (ResponseCode.SUCCESS.getCode().equals(reponseCode)) {
flag = true;
}
return flag;
}

/**
* 轮询到的结果,处理.业务可以根据需要重写
*
* @return
*/
protected String handlerResult(String result) {
return result;
}

/**
* 业务异常情况
*
* @return
*/
protected String handlerSerivceException(String result) {
return result;
}
}


先说明:笔者对于redis的中层、底层还没有开始阅读理解。这里先表达运用层的理解。

1、incr和setnx都能实现锁的功能

2、incr额外提供计数功能,老大一直不明白我为啥要用incr实现这个锁,我自己也不清楚。后面对redis开撸之后可能会理解。

3、incr没有超时时间的概念,发布订阅也不存在。incr只能删key,发布订阅只能退订频道。发布是多链接,订阅是单链接。

4、第一种方案有一个bug,如果第一个线程发布意外失败,其余等待线程全部阻塞在subscribe,那么其余线程全部会死掉。等待的线程需要时间控制他们。但是还没想到什么方案来对付这种情况。

5、对于发布订阅来说,20笔请求进来,19笔请求必须等待第一笔处理结果publish后,自己subscribe后才能进行其他操作。也就是说如果业务5S超时,那么5秒内堆积的线程都要等第一笔处理结束。大量并发的情况下就会出现拥堵。

对于这一点,第二种防并发方案可以通过,轮询的超时时间和锁的超时时间来避免,即在业务5s的处理过程中,假如等待线程如果轮询了2s,我们可以选择不让他们接着等待,而是自己再去发起业务请求。

6、注意轮询时间最好不要小于锁key的时间。可以选择相等。key的时间和业务处理的时间需要衡量对比。以及上游系统调用本系统的超时时间结合起来 来决定是否必要重试,以及重试次数。

7、注意轮询拿不到结果如果选择重试一定要设置重试次数。否则可能会无限轮询。

中低层了解的太少不敢妄言,希望大家能指点指点,说错的地方谅解。

对于轮询的又一种编写:

/**
* 〈一句话功能简述〉<br>
* 〈功能详细描述〉
*
* @author wangzha
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
@Component
public class OneAfterOne<T extends GenericResponse> {

private static final Logger LOGGER = LoggerFactory.getLogger(OneAfterOne.class);

/**
* 锁key
*/
private static final String BIZTYPE_DEFAULT = "DEFAULT_";
/**
* 锁key
*/
private static final String ONEAFTERONE_LOCK_KEY = "ONEAFTERONE_LOCK_";

/**
* 锁value
*/
private static final String ONEAFTERONE_LOCK_VALUE = "ONEAFTERONE_LOCK_VALUE";

/**
* 返回结果缓存key
*/
private static final String ONEAFTERONE_RESULT_FLAG_KEY = "ONEAFTERONE_RESULT_FLAG_";

/**
* 业务处理有结果标记
*/
private static final String ONEAFTERONE_OK = "OK";

/**
* 服务超时时间
*/
private static final String SERVICE_TIME_OUT = "serviceTimeOut";

/**
* 轮询间隔时间
*/
private static final String POLLING_INTERVAL = "pollingInterval";

/**
* 功能描述: 防止一段时间内,相同的入参多比重复请求<br>
* 〈功能详细描述〉 1、多笔请求进来,用setnx去获取锁. 2、拿到锁的进行业务处理回调,并将完整的业务处理结果返回.
* 3、组件OAO将第一笔处理的是否成功的标记(OK)放入结果缓存.(此处只要业务系统不抛异常,都是为业务处理有结果) 回调缓存处理
* 4、未拿到锁的请求,进行轮询获取结果缓存来判断第一笔是否处理有结果,如果轮询有结果,回调获取缓存;无结果,抛出业务处理异常
* <p>
* 注意:serviceTimeOut与rsf接口超时时间一致,标记缓存时间和锁的缓存时间一致
*
* @param cachekey        业务缓存key
* @param serviceTimeOut  服务超时时间
* @param pollingInterval 轮询间隔时间
* @param callBack        回调
* @return T
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
@SuppressWarnings("unchecked")
public T execute(String cachekey, long serviceTimeOut, long pollingInterval, OneAfterOneCallBack<T> callBack) {
LOGGER.debug("uniqueKeys:{},进入oneAfterOne组件,服务超时时间:{},轮询间隔时间:{}", cachekey, serviceTimeOut, pollingInterval);
T result;
String key = ONEAFTERONE_LOCK_KEY + cachekey;
// 获取锁
boolean lock = RedisClientUtils.setnx(key, ONEAFTERONE_LOCK_VALUE, serviceTimeOut / 1000);
// 构造结果标记key
String resultKey = ONEAFTERONE_RESULT_FLAG_KEY + cachekey;
if (lock) {
// 获取锁,回调业务处理
try {
result = callBack.process();
} catch (AppException e) {
LOGGER.error("该笔业务处理抛出异常" + key, e);
GenericResponse response = new GenericResponse();
response.response(e.getErrorCode(), e.getMessage());
result = (T) response;
} catch (RuntimeException e) {
LOGGER.error("该笔业务处理抛出异常" + key, e);
GenericResponse response = new GenericResponse(ResponseCode.ERROR);
result = (T) response;
}
// 缓存结果
callBack.cache(cachekey, result);
// 回调业务处理有结果,将OK标记放入缓存
RedisClientUtils.setExpire(resultKey, ONEAFTERONE_OK, 1);
return result;
} else {
// 轮询
String pollingResult;
long startTime = System.currentTimeMillis();
LOGGER.info("{},该笔进入轮训", cachekey);
while (true) {
LOGGER.debug("{}该笔进入轮训", cachekey);
// 尝试获取上一笔的处理结果
pollingResult = RedisClientUtils.get(resultKey);
// 轮询结果不为空或者超出轮询时间,跳出循环,否则继续循环
if (pollingResult != null || (System.currentTimeMillis() - startTime) > serviceTimeOut) {
break;
}
// 休眠
sleep(pollingInterval);
}
LOGGER.info("{},该笔退出轮训,轮询获取结果:{}", cachekey, pollingResult);
if (ONEAFTERONE_OK.equals(pollingResult)) {
// 处理轮询结果
result = callBack.getCache();
if (result == null) {
LOGGER.error(ResponseCode.ERROR.getCode(), "轮训到OK,无结果明细!");
throw new AppException(ResponseCode.ERROR);
} else {
return callBack.getCache();
}
} else if (StringUtils.isEmpty(pollingResult)) {
throw new AppException(ResponseCode.POLLING_TIME_OUT);
} else {
throw new AppException(ResponseCode.ERROR);
}
}
}

/**
* 通用入口
*
* @param prefix
* @param cachekey
* @param callBack
* @param <T>
* @return
*/
public T execute(String bizType, String[] uniqueKeys, OneAfterOneCallBack<T> callBack) {
Assert.notEmpty(uniqueKeys, "业务入参用于拼装唯一key,不可以为空!");
String cachekey = bizType + StringUtils.join(Symbols.UNDERLINE, uniqueKeys);
return execute(cachekey, getServiceTimeOut(bizType), getPoolingInterval(bizType), callBack);
}

/**
* 功能描述: <br>
* 〈功能详细描述〉
*
* @param string
* @return
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
private long getServiceTimeOut(String bizType) {
long serviceTimeOut = SCMServiceCfg.getNode(bizType + SERVICE_TIME_OUT);
return serviceTimeOut > 0 ? serviceTimeOut : SCMServiceCfg.getNode(BIZTYPE_DEFAULT + SERVICE_TIME_OUT);
}

/**
* 功能描述: <br>
* 〈功能详细描述〉
*
* @param string
* @return
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
private long getPoolingInterval(String bizType) {
long poolingInterval = SCMServiceCfg.getNode(bizType + POLLING_INTERVAL);
return poolingInterval > 0 ? poolingInterval : SCMServiceCfg.getNode(BIZTYPE_DEFAULT + POLLING_INTERVAL);
}

/**
* 休眠
*
* @param pollingInterval
*/
private void sleep(long pollingInterval) {
try {
// 休眠
Thread.sleep(pollingInterval);
} catch (Exception e) {
LOGGER.error("系统异常", e);
}
}

//    public static void main(String[] args) {
//        long startTime = System.currentTimeMillis();
//        while (true) {
//            long time = System.currentTimeMillis() - startTime;
//            long time2 = (System.currentTimeMillis() - startTime)/1000;
//            System.out.println("time:" + time);
//            System.out.println("time2:" + time2);
//            System.out.println("time3:" + System.currentTimeMillis());
//            if (time / 1000 > 5000) {
//                System.out.println("in");
//                break;
//            }
//            try {
//                // 休眠
//                Thread.sleep(50);
//            } catch (Exception e) {
//                LOGGER.error("系统异常", e);
//            }
//        }
//        System.out.println("out");
//    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: