您的位置:首页 > 数据库 > Redis

redis cluster模式key的模糊删除-java操作

2017-04-27 10:45 1576 查看
不管是redis单节点还是redis cluster模式都是不支持模糊删除的。当然你也可以自己去写shell脚本去删除。这里我自己想的一种方式,采用多线程操作集群下redis单节点,countdownlatch统计汇总删除。

上代码:

线程池:

public class ThreadSinleton {
private final static Logger LOGGER = LoggerFactory
.getLogger(ThreadSinleton.class);

private static class SingletonHolder {
static ExecutorService es = null;
static {
es = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors());
LOGGER.error("线程池初始化完成!!!!");

}
private static final ThreadSinleton sinleton = new ThreadSinleton();
}

private ThreadSinleton() {
}

public static final ThreadSinleton getSinleton() {
return SingletonHolder.sinleton;
}

public static final ExecutorService getExecutorService() {
return SingletonHolder.es;
}
}


执行调用:

public long getKeys(String key,boolean isDel) throws Exception {
Long startTime = System.currentTimeMillis();
long resSum=0;
List<Jedis> jedisList = new ArrayList<Jedis>();// jedis 操作集合
for (HostAndPort hostAndport : jedisCluster.getHaps()) {
jedisList.add(new Jedis(hostAndport.getHost(), hostAndport
.getPort()));
}
int size =jedisList.size();

if (null != jedisList && size > 0) {
ScanParams params = new ScanParams();
params.match(key+"*");/**模糊匹配**/
//首选计算主node数量
List<Jedis> masterList = new ArrayList<Jedis>();
for (int i = 0; i < size; i++) {
Jedis jedis = null;
jedis = jedisList.get(i);
if (JedisUtils.isMaster(jedis)) {
masterList.add(jedis);
}else{
/**关掉slave连接 **/
jedis.close();
}
}
int masterSize =masterList.size();
LOGGER.info("jedis操作实例创建完毕,master数量:" +masterSize);

if(null!=masterList&&masterSize>0){
CountDownLatch countDownLatch = new CountDownLatch(masterSize);
@SuppressWarnings("rawtypes")
Future[] future_Arr = new Future[masterSize] ;
ExecutorService es =ThreadSinleton.getExecutorService();
for (int j =0; j < masterSize; j++) {
Jedis jedis = null;
Pipeline pipeline=null;
jedis = masterList.get(j);
future_Arr[j] = es.submit(new CacheBodyThread(
"子线程"+j, jedis, pipeline, isDel, params, countDownLatch));
}
try {
// LOGGER.info("*******主线程正在汇总************");
countDownLatch.await();
try {
if(null!=future_Arr&&future_Arr.length>0){
for (int i = 0; i < future_Arr.length; i++) {
resSum+=(long)future_Arr[i].get();
}
}

} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
} catch (ExecutionException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
}
//es.shutdown(); 线程池不需关闭
LOGGER.info("*******线程池关闭,主线程正在汇总完毕==========,"+resSum);
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
}
}
}
Long endTime = System.currentTimeMillis();
LOGGER.error((true==isDel?"清理":"统计")+"缓存,[执行模糊查找所有键]end,待处理集合数据长度:"+resSum+",using time is<耗时>:" + (endTime - startTime)+"ms");
return resSum;
}

线程(因为这里我们不仅仅要删除,还要知道删除的量,所有用callable):
public class CacheBodyThread implements Callable<Long> {
private final static Logger LOGGER = LoggerFactory
.getLogger(CacheBodyThread.class);

String threadName;
Jedis jedis;
Pipeline pipeline;
boolean isDel;// isDel 清除 TRUE 统计FALSE
ScanParams params;
CountDownLatch countDownLatch;

public CacheBodyThread(String threadName, Jedis jedis, Pipeline pipeline,
boolean isDel, ScanParams params, CountDownLatch countDownLatch) {
super();
this.threadName = threadName;
this.jedis = jedis;
this.pipeline = pipeline;
this.isDel = isDel;
this.params = params;
this.countDownLatch = countDownLatch;
}

@SuppressWarnings("deprecation")
@Override
public Long call() throws Exception {
long sum =0;
LOGGER.info("【" + threadName + "】线程,统计缓存thread正在执行中========");

// List<String> keys = null;
// keys = new ArrayList<String>(); // 待处理key集合

String scanRet = "0";// 起始游标
ScanResult<String> scanResult = null;

do {
scanResult = jedis.scan(scanRet, params);
scanRet = scanResult.getStringCursor();
sum += scanResult.getResult().size();
if (isDel) {
pipeline = jedis.pipelined();// 批量操作
for (String key_i : scanResult.getResult()) {
pipeline.del(key_i);
}
pipeline.sync();// 执行
}
}
while (0 != scanResult.getCursor());/** 未来版本此方法会删除 **/
if (null != pipeline) {
try {
pipeline.close();
LOGGER.info("pipeline管道连接已关闭===============");
} catch (IOException e) {
e.printStackTrace();
}
}
if (null != jedis) {
jedis.close();// 关闭连接
LOGGER.info("jedis连接已关闭===============");
}
countDownLatch.countDown();
LOGGER.info("【" + threadName + "】线程执行完毕========");
return sum;
}

}


thread 配合countdownlatch 有点fork/join的意思,只是这里的fork,我们直接定位到单node上了,也可以直接使用jdk1.7之后的forkjointask配合forkjoinpool处理,原理差不多。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: