您的位置:首页 > 其它

高并发二级缓存的简单实现

2016-01-30 11:12 337 查看
高并发二级缓存的简单实现

我们的应用系统使用了两台Redis做缓存,一台持久化存储重要数据,另一台就是纯粹的缓存Mysql的数据。Redis是很强大,不过也有性能瓶颈的时候。官方公平的吞吐量是10W/S,从我在内网测试环境开多线程同时使用了JedisPool测出来的结果来看,使用管道Pipeline批量提交能执行写入指令数10+W/S,不使用管道最多也就1W/S,并不可观(当然,比Mysql的1K~8K的写速度还是强多了)。正式环境效果更好些,毕竟CPU和内存都不是一个级别的。网络上测试文章的结论参差不齐,高的能测出40W/S,我估计是批量读吧,低的有测出1K/S写,我估计瓶颈不在Redis上吧。

作为单线程的Redis,在现在多核CPU的时代,确实格格不入。毫无疑问单台机器上部署多个实例然后做cluster可以提升很多性能。不过更多的数量也意味着更高的维护时间成本。当然需要处理的数据量实在太大的时候这个还是有必要的,我这里要写的是二级缓存,实现方式是在应用内搭建一层一级缓存,减少对Redis的并发访问。

应用内缓存基础类InnerAppCache.java

public class InnerAppCache
{
private static final Logger log = LoggerFactory.getLogger(InnerAppCache.class);
private static final int EXPIRE_INTERVAL = 30000; // 删除过期数据间隔,单位毫秒
private static final int TTL = 60000; // 数据缓存时间,单位毫秒
private static final int MAX_OBJECT_COUNT = 100000; // 最多缓存的对象数量
private static final ConcurrentHashMap<String, CacheObject> cacheMap = new ConcurrentHashMap<String, CacheObject>(); // 缓存数据存放在此map

private InnerAppCache()
{
}

/**
* 将数据放入缓存
*/
public static boolean put(String key, Object value)
{
if (cacheMap.size() > MAX_OBJECT_COUNT)
{
return false;
}

CacheObject cacheObj = new CacheObject();
cacheObj.setObj(value);
cacheObj.setExpireTime(System.currentTimeMillis() + TTL);
cacheMap.put(key, cacheObj);
return true;
}

/**
* 获取数据
*/
public static Object get(String key)
{
CacheObject cacheObj = cacheMap.get(key);
if (cacheObj != null)
{
return cacheObj.getObj();
}
return null;
}

private static class ExpireChecker implements Runnable
{
@Override
public void run()
{
while (true)
{
try
{
check();
} catch (Exception e)
{
log.error("ExpireChecker error", e);
}

try
{
Thread.sleep(EXPIRE_INTERVAL);
} catch (InterruptedException e)
{
log.error("ExpireChecker error2", e);
}
}
}

/**
* 删除过期的数据
*/
private void check()
{
long now = System.currentTimeMillis();
for (Map.Entry<String, CacheObject> e : cacheMap.entrySet())
{
if (e.getValue().getExpireTime() <= now)
{
cacheMap.remove(e.getKey());
}
}
}
}

static
{
// 启动过期线程
new Thread(new ExpireChecker()).start();
}
}


可以看到,内部新开了一个线程,用于定时清空缓存数据。

缓存服务类CacheService.java

public class CacheService
{
// 二级缓存用户信息(一定能匹配到的数据)
private static ReentrantLock getUserDto_lock = new ReentrantLock();
public UserDto getUserDto(long userId)
{
// 构建key,就是自定义的字符串
String key = CacheKeyPrefix.USER + userId;

// 获取应用内缓存(一级缓存)
UserDto dto = (UserDto) InnerAppCache.get(key);
if (dto == null)
{
getUserDto_lock.lock();
try
{
// redis缓存(二级缓存)
dto = RedisService.getUserDto(userId);
if (dto == null)
{
// 数据库(底层存储)
dto = UserDao.getById(userId);
if (dto != null)
{
RedisService.setUserDto(dto);// 填充二级缓存(注意redis里面的key一定要设置合适的时效时间,比如1分钟)
InnerAppCache.put(key, dto);// 填充一级缓存
}
} else
{
InnerAppCache.put(key, dto);// 填充一级缓存
}
}finally
{
getUserDto_lock.unlock();
}
}
return dto;
}

// 二级缓存关键字(不一定能匹配到的数据)
private static ReentrantLock getKeywords_lock = new ReentrantLock();
public List<String> getKeywords()
{
// 构建key,就是自定义的字符串
String key = CacheKeyPrefix.KEYWORDS;

// 获取应用内缓存(一级缓存)
List<String> dto = (List<String>) InnerAppCache.get(key);
if (dto == null)
{
getKeywords_lock.lock();
try
{
// redis缓存(二级缓存)
dto = RedisService.getKeywords();
if (dto == null)
{
// 数据库(底层存储)
dto = KeywordDao.getKeywords();
if (dto == null)
{
dto = new ArrayList<String>();// 注意这里和上面的区别,关键字是不一定存在的,如果不存在创建一个空的列表,防止每次都会请求到底层存储
}
RedisService.setKeywords(dto);// 填充二级缓存(注意redis里面的key一定要设置合适的时效时间,比如1分钟)
InnerAppCache.put(key, dto);// 填充一级缓存
} else
{
InnerAppCache.put(key, dto);// 填充一级缓存
}
}finally
{
getKeywords_lock.unlock();
}
}
return dto;
}
}

要注意锁的位置,要在一级缓存和二级缓存之间(或者二级缓存和底层存储之间,看你想限制住哪部分的并发)。同时,锁是static的类型的,否则将没有任何效果,具体原因可以查阅我前之前的博文《锁》。

有一点相信每个人都会有疑惑,如何在下层数据改变的时候去同步上层数据。缓存既是牺牲了实时性来提高性能,所谓鱼和熊掌不可兼得,很多数据有1分钟缓存还是能接受的,所以无需纠结。对一些实时性要求十分严格的数据,比如钱,就不适合用上面的方式来实现了。当然,不是做不到,想办法总能实现,代码量就不只这些,高手可以留个思路。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: