java并发——构建高效且可伸缩的结果缓存
2017-04-09 22:58
483 查看
几乎所有的服务器应用都会使用某种形式的缓存。重用之前的计算结果能降低延迟,提高吞吐量,但却要消耗更多内存。看上去简单的缓存,可能会将性能瓶颈转变成伸缩性瓶颈,即使缓存是用来提高单线程性能的。本文将开发一个高效且可伸缩的缓存,用于改进一个高计算开销的计算,我们会从HashMap开始,逐步完善功能,分析它们的并发问题,并讨论如何修改它们。
下面基于一个计算任务开始缓存的设计
第一阶段 HashMap
如上所示,Memorizer1将Computable实现类的计算结果缓存在
第二阶段 ConcurrentHashMap
Memorizer2比Memorizer1拥有更好的并发性,并且具有良好的伸缩性。但它仍然有一些不足——当两个线程同时计算同一个值,它们并不知道有其它线程在做同一的事,存在着资源被浪费的可能。这个不足,对于缓存的对象只提供单次初始化,会带来安全性问题。
第三阶段 ConcurrentHashMap+FutureTask
事实上,第二阶段的功能已经符合大部分情况的功能,但是当计算时间很长导致很多线程进行同一个运算,或者缓存的对象只提供单次初始化,问题就会很棘手,在这里,我们引入FutureTask来让进行运算的线程获知是否已经有其它正在,或已经进行该运算的线程。
Memorizer3缓存的不是计算的结果,而是进行运算的FutureTask。因此Memorizer3首先检查有没有执行该任务的FutureTask。如果有,则直接获得FutureTask,如果计算已经完成,FutureTask.get()方法可以立刻获得结果,如果计算未完成,后进入的线程阻塞直到get()返回结果;如果没有,则创建一个FutureTask进行运算,后续进了的同样的运算可以直接拿到结果或者等待运算完成获得结果。
Memorizer3的实现近乎完美,但是仍然存在一个问题,当A线程判断没有缓存是,进入到
第四阶段 ConcurrentHashMap + FutureTask + Map原子操作
Memorizer4做了两点改进:
1. 插入时会再次检查是否有缓存,并且这是个复合操作
这里考虑到了一种情况,如果正在运行的FutureTask被终止,那进行该运算的所有请求都会出问题,始料未及的遭遇
至此,整个设计过程就结束了。我们得到了一个在极端环境下依然能够保证高效且可伸缩运行的结果缓存。
下面基于一个计算任务开始缓存的设计
public interface Computable <A, R>{ R compute(A a) throws InterruptedException; } public class Function implements Computable <String, BigInteger>{ @Override public BigInteger compute(String a) throws InterruptedException { return new BigInteger(a); } }
第一阶段 HashMap
public class Memorizer1<A, V> implements Computable<A, V>{ private final Computable<A, V> compute; private final Map<A, V> cache; public Memorizer1(Computable<A, V> compute){ this.compute = compute; cache = new HashMap<A, V>(); } @Override public synchronized V compute(A a) throws InterruptedException { V result = cache.get(a); if(result == null){ result = compute.compute(a); cache.put(a, result); } return result; } }
如上所示,Memorizer1将Computable实现类的计算结果缓存在
Map<A, V> cache。因为HashMap不是线程安全的,为了保证并发性,Memorizer1用了个很保守的方法,对整个compute方法进行同步。这导致了Memorizer1会有很明显的可伸缩性问题,当有很多线程调用compute方法,将排一列很长的队,考虑到这么多线程的阻塞,线程状态切换,内存占用,这种方式甚至不如不使用缓存。
第二阶段 ConcurrentHashMap
public class Memorizer2<A, V> implements Computable<A, V>{ private final Computable<A, V> compute; private final Map<A, V> cache; public Memorizer2(Computable<A, V> compute){ this.compute = compute; cache = new ConcurrentHashMap<A, V>(); } @Override public V compute(A a) throws InterruptedException { V result = cache.get(a); if(result == null){ result = compute.compute(a); cache.put(a, result); } return result; } }
Memorizer2比Memorizer1拥有更好的并发性,并且具有良好的伸缩性。但它仍然有一些不足——当两个线程同时计算同一个值,它们并不知道有其它线程在做同一的事,存在着资源被浪费的可能。这个不足,对于缓存的对象只提供单次初始化,会带来安全性问题。
第三阶段 ConcurrentHashMap+FutureTask
事实上,第二阶段的功能已经符合大部分情况的功能,但是当计算时间很长导致很多线程进行同一个运算,或者缓存的对象只提供单次初始化,问题就会很棘手,在这里,我们引入FutureTask来让进行运算的线程获知是否已经有其它正在,或已经进行该运算的线程。
public class Memorizer3<A, V> implements Computable<A, V>{ private final Computable<A, V> compute; private final Map<A, FutureTask<V>> cache; public Memorizer3(Computable<A, V> compute){ this.compute = compute; cache = new ConcurrentHashMap<A, FutureTask<V>>(); } @Override public V compute(A a) throws InterruptedException { V f = cache.get(a); if(f == null){ Callable<V> eval = new Callable<V>(){ public V call() throw InterruptedException{ return c.compute(arg); } } FutureTask<V> ft = new FutureTask<V>(eval); f = ft; cache.put(a, ft); ft.run(); } try{ return f.get(); }cache(ExecutionException e){ throw launderThrowable(e.getCause()); } } }
Memorizer3缓存的不是计算的结果,而是进行运算的FutureTask。因此Memorizer3首先检查有没有执行该任务的FutureTask。如果有,则直接获得FutureTask,如果计算已经完成,FutureTask.get()方法可以立刻获得结果,如果计算未完成,后进入的线程阻塞直到get()返回结果;如果没有,则创建一个FutureTask进行运算,后续进了的同样的运算可以直接拿到结果或者等待运算完成获得结果。
Memorizer3的实现近乎完美,但是仍然存在一个问题,当A线程判断没有缓存是,进入到
cache.put(a, ft);这一步前,B线程恰好判断缓存为空,B线程创建的FutureTask会把A创建的FutureTask覆盖掉。虽然这相比Memorizer2已经是小概率事件,但是问题还是没根本解决。
第四阶段 ConcurrentHashMap + FutureTask + Map原子操作
第三阶段的ConcurrentHashMap + FutureTask由于存在“先检查再执行“的操作,会有并发问题,我们给cache使用复合操作(“若没有则添加“),避免该问题。 public class Memorizer4<A, V> implements Computable<A, V>{ private final Computable<A, V> compute; private final Map<A, FutureTask<V>> cache; public Memorizer4(Computable<A, V> compute){ this.compute = compute; cache = new ConcurrentHashMap<A, FutureTask<V>>(); } @Override public V compute(A a) throws InterruptedException { while(true){ V f = cache.get(a); if(f == null){ Callable<V> eval = new Callable<V>(){ public V call() throw InterruptedException{ return c.compute(arg); } } FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(a, ft); if(f == null){ f = ft; ft.run(); } } try{ return f.get(); }catch(CancellationException e){ cache.remove(arg, f); }catch(ExecutionException e){ throw launderThrowable(e.getCause()); } } } }
Memorizer4做了两点改进:
1. 插入时会再次检查是否有缓存,并且这是个复合操作
f = cache.putIfAbsent(a, ft); if(f == null){ f = ft; ft.run(); }
这里考虑到了一种情况,如果正在运行的FutureTask被终止,那进行该运算的所有请求都会出问题,始料未及的遭遇
CancellationException异常。Memorizer4的compute操作是一个循环,当在
get()阻塞的线程catch到
CancellationException异常,则会再一次申请一个创建FutureTask的机会。
至此,整个设计过程就结束了。我们得到了一个在极端环境下依然能够保证高效且可伸缩运行的结果缓存。
相关文章推荐
- 【JAVA并发编程实战】5、构建高效且可伸缩的结果缓存
- java并发工具类构建高效且可伸缩的结果缓存
- Java并发(具体实例)—— 构建高效且可伸缩的结果缓存
- Java并发(具体实例)——构建高效且可伸缩的结果缓存
- java多线程(十) 之 构建高效且可伸缩的结果缓存
- 构建高效且可伸缩的结果缓存引申的并发测试规范化
- Java并发编程之为计算结果建立高效、可伸缩的高速缓存
- 构建高效且可伸缩的结果缓存
- java并发编程实践学习(5)构建块为计算结果建立高效,可伸缩的高速缓存
- 《Java并发编程实战》 阅读笔记 构建高效且可伸缩的结果缓存
- 构建高效可伸缩的结果缓存
- Java并发编程基础构建模块(06)——高效缓存总结示例
- Java趣谈——如何构建一个高效且可伸缩的缓存
- 构建高效且可伸缩的结果缓存
- java并发编程实战-构建高效且可伸缩的结果缓存
- 一头扎进多线程-构建高效且可伸缩的结果缓存
- 构建高效可申缩的结果缓存
- 构建java高效的缓存
- 构建高效的结果缓存
- 高效可伸缩的结果缓存