您的位置:首页 > 编程语言 > Java开发

Java 并发编程实战之 基础构建模块

2017-03-16 21:28 731 查看

同步容器类

包括 Vector与HashTable

注意与同步容器的区别

内部通过Synchronize同步代码块实现

同步容器类的问题

首先明确所谓的问题并不是线程安全问题

多个线程修改同一个同步容器,可能会出现失效数据问题。ArrayIndexOutOfBoundsException 例子如下:

public static Object getLast(Vector list){
//问题所在,可能是一个失效数据
int lastIndex = list.size();
return list.get(lastIndex);
}
public static void deleteLast(Vector list){
int lastIndex = list.size();
list.remove(lastIndex);
}


可以通过额外的客户端加锁 即使用同一个锁 守护 getLast() 与 deleteLast() 方法。这无疑进一步降低了并发性

迭代器与ConcurrentModificationException

当我们使用Iterator对容器进行迭代的时候,如果有其他线程修改了该容器,将会抛出ConcurrentModificationException。对该异常的检查是通过计数器实现的。

private class Itr implements Iterator<E> {
int cursor;       // index of next element to return
int lastRet = -1; // index of last element returned; -1 if no such
int expectedModCount = modCount;

public boolean hasNext() {
return cursor != size;
}

@SuppressWarnings("unchecked")
public E next() {
checkForComodification();
int i = cursor;
if (i >= size)
throw new NoSuchElementException();
Object[] elementData = ArrayLi
4000
st.this.elementData;
if (i >= elementData.length)
throw new ConcurrentModificationException();
cursor = i + 1;
return (E) elementData[lastRet = i];
}

public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
checkForComodification();

try {
ArrayList.this.remove(lastRet);
cursor = lastRet;
lastRet = -1;
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}

@Override
@SuppressWarnings("unchecked")
public void forEachRemaining(Consumer<? super E> consumer) {
Objects.requireNonNull(consumer);
final int size = ArrayList.this.size;
int i = cursor;
if (i >= size) {
return;
}
final Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length) {
throw new ConcurrentModificationException();
}
while (i != size && modCount == expectedModCount) {
consumer.accept((E) elementData[i++]);
}
// update once at end of iteration to reduce heap write traffic
cursor = i;
lastRet = i - 1;
checkForComodification();
}

final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
}


如果不想出现该问题,除了在客户端代码中加锁外,还可以考虑迭代容器的拷贝对象,而不是原始对象。

隐藏的迭代器

编译器会将字符串连接操作 println(“String”+set ) 转化成StringBuilder.append(set) ,而这个操作又会调用toString()方法,容器的toString()方法会调用迭代器。

同理还有 hashCode、equals、containsAll、removeAll和retainAll等。

package net.jcip.examples;

import java.util.*;

import net.jcip.annotations.*;

/**
* HiddenIterator
* <p/>
* Iteration hidden within string concatenation
*
* @author Brian Goetz and Tim Peierls
*/
public class HiddenIterator {
@GuardedBy("this") private final Set<Integer> set = new HashSet<Integer>();

public synchronized void add(Integer i) {
set.add(i);
}

public synchronized void remove(Integer i) {
set.remove(i);
}

public void addTenThings() {
Random r = new Random();
for (int i = 0; i < 10; i++)
add(r.nextInt());
System.out.println("DEBUG: added ten elements to " + set);
}
}


并发容器

Map

ConcurrentHashMap

ConcurrentSkipListMap(LinkedList)

ConcurrentSkipListSet(HashSet)

List

CopyOnWriteArrayList

Queue

BlockingQueue(FIFO)

LinkedBlockingQueue

ArrayBlockingQueue

PriorityQueue(优先级队列)

ConcurrentHashMap

总结

更细粒度机制的锁 分段锁

可以在迭代过程中修改迭代器结构。

弱一致性

size()

isEmpty()

原子操作

若没有则添加

若相等则删除

如相等则替换

适用于

事件通知系统

扩展

红黑树

ConcurrentHashMap扩容

生产者消费者问题

阻塞的put、take方法,非阻塞的offer和poll方法

生产者与消费者之间的解耦和

需要考虑有界队列(对于生产者来说),避免生产过剩,使机器负荷过载。

SynchronousQueue实现

它不会为队列中的元素维护存储空间,它维护的是一组线程,这些线程负责把数据加入或移除队列,消除了数据从生产者到存储队列再到消费者的中间过程,即生产者直接将数据交付给消费者,put、take双向阻塞,互相等待,成对工作。

SynchronousQueue实现原理

串行线程封闭

一个对象从一个线程(生产者)传递到下一个线程(消费者),在传递给下一个线程后,第一个线程不会在访问该对象。

双端队列与工作取密模式

Deque

ArrayDeque

BlockingDeque

LinkedBlockingDeque

生产者消费者模式中,所有的消费者共享一个队列,而取密模式中,所有消费者各自操作一个双端队列,当自己的队列消费完毕后,可以秘密的消费其他消费者的队列,但是,是从相反的端取数据,从而降低了竞争,确保每个线程都保持忙碌状态。

阻塞方法与中断方法

阻塞的原因

等待IO

主线程等待子线程的计算结果

等待一个锁

sleep()

InterruptedException抛出该异常的方法都是阻塞方法

Thread的interrupt方法,// Just to set the interrupt flag,只是设置了该线程的状态位为中断状态,如果该线程的实现中 有检查中断状态的代码 比如
if(this.isInterrupted)
此时该线程才可能在运行到该处时执行相应的操作,否则,该线程不会因为中断状态被改变而主动停下来。

当调用一个阻塞方法,而不能抛出InterruptException时,比如重写 自父类或借口的方法,在方法中调用了阻塞方法,而父类的方法签名中却没有抛出异常的声明,这时候必须使用tyr-catch捕获该异常,此时需要恢复被中断的状态
Thread.currentThread.interrupt()
,这样在更高层的代码中可以检查到该中断状态。

同步工具类

同步工具类可以是任何对象,只要根据其自身状态来协调线程的控制流。

常见同步工具类

Semaphore(信号量,可以理解为多个线程共享多个锁)

Barrier(栅栏 达到指定数目后一起放行)

Latch(闭锁)

package net.jcip.examples;

import java.util.concurrent.*;

/**
* TestHarness
* <p/>
* Using CountDownLatch for starting and stopping threads in timing tests
*
* @author Brian Goetz and Tim Peierls
*/
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);

for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}

long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}


Future Task

通过Callable来实现的

Waiting to run 等待运行

Running 正在运行

Completed 运行完成

类似于Thread 和 Runable的关系

future.get()的行为取决于任务的执行状态,如果已完成,会立返回,否则会阻塞知道运行完成,返回结果或抛出异常。

package net.jcip.examples;

import java.util.concurrent.*;

/**
* Preloader
*
* Using FutureTask to preload data that is needed later
*
* @author Brian Goetz and Tim Peierls
*/

public class Preloader {
ProductInfo loadProductInfo() throws DataLoadException {
return null;
}

private final FutureTask<ProductInfo> future =
new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future);

public void start() { thread.start(); }

public ProductInfo get()
throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw LaunderThrowable.launderThrowable(cause);
}
}

interface ProductInfo {
}
}

class DataLoadException extends Exception { }


信号量

可以理解为共享的一组锁

常用于资源池的实现(有界的资源池)

使用信号量为容器设置边界。

package net.jcip.examples;

import java.util.*;
import java.util.concurrent.*;

/**
* BoundedHashSet
* <p/>
* Using Semaphore to bound a collection
*
* @author Brian Goetz and Tim Peierls
*/
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;

public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}

ddfa
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();
}
}

public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}


栅栏

类似于闭锁

闭锁用于等待事件,且是一次性的(latch.countDown(),latch.await())

栅栏用于等待所有线程到齐

CyclicBarrier 可重复使用的栅栏

new CyclicBarrier(int count,Runnable run)
注意第二个参数,他的作用是,等所有线程通过栅栏后执行的一个操作。

package net.jcip.examples;

import java.util.concurrent.*;

/**
* CellularAutomata
*
* Coordinating computation in a cellular automaton with CyclicBarrier
*
* @author Brian Goetz and Tim Peierls
*/
public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;

public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}});
this.workers = new Worker[count];
for (int i = 0; i < count; i++)
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}

private class Worker implements Runnable {
private final Board board;

public Worker(Board board) { this.board = board; }
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}

private int computeValue(int x, int y) {
// Compute the new value that goes in (x,y)
return 0;
}
}

public void start() {
for (int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard.waitForConvergence();
}

interface Board {
int getMaxX();
int getMaxY();
int getValue(int x, int y);
int setNewValue(int x, int y, int value);
void commitNewValues();
boolean hasConverged();
void waitForConvergence();
Board getSubBoard(int numPartitions, int index);
}
}


构建高效且可伸缩的结果缓存

问题原型

存在一个方法
V C(A){...}


C执行一次消耗时间很长

C执行一次消耗很多资源(IO,数据库等)

解决思路

毋庸置疑想到了缓存计算结果。

解决方案1

使用HashMap缓存计算结果,并使用客户端加锁的方式保证线程安全。

弊端明显,并行效率奇差,尤其 计算过程耗时很长,阻塞的时间也很长。

package net.jcip.examples;

import java.math.BigInteger;
import java.util.*;

import net.jcip.annotations.*;

/**
* Memoizer1
*
* Initial cache attempt using HashMap and synchronization
*
* @author Brian Goetz and Tim Peierls
*/
public class Memoizer1 <A, V> implements Computable<A, V> {
@GuardedBy("this") private final Map<A, V> cache = new HashMap<A, V>();
private final Computable<A, V> c;

public Memoizer1(Computable<A, V> c) {
this.c = c;
}

public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}

interface Computable <A, V> {
V compute(A arg) throws InterruptedException;
}

class ExpensiveFunction
implements Computable<String, BigInteger> {
public BigInteger compute(String arg) {
// after deep thought...
return new BigInteger(arg);
}
}


解决方案2

使用并行容器 ConcurrentHashMap

问题 虽然解决了并发问题,但是考虑一个场景,当同时有多个线程计算一个从未被缓存的运算时,由于结果未被缓存,所以这些线程都会执行,并且将该结果缓存多次。

package net.jcip.examples;

import java.util.*;
import java.util.concurrent.*;

/**
* Memoizer2
* <p/>
* Replacing HashMap with ConcurrentHashMap
*
* @author Brian Goetz and Tim Peierls
*/
public class Memoizer2 <A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
private final Computable<A, V> c;

public Memoizer2(Computable<A, V> c) {
this.c = c;
}

public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}


解决方案3

如果理解了Future,那么很自然的想到,我们不缓存结果了,因为运算一个结果不是立刻就能得到数据的,因此我们缓存一个“运算”,而不缓存一个结果。

问题 :
if(){concurrent.put()}
的方式明显不是一个原子操作,因此还会出现同一个结果多次运算的情况(但是几率已经大大降低,是一个可接受的方案),但是ConcurrentHashMap又不能通过加锁的方式保证原子性。

package net.jcip.examples;

import java.util.*;
import java.util.concurrent.*;

/**
* Memoizer3
* <p/>
* Memoizing wrapper using FutureTask
*
* @author Brian Goetz and Tim Peierls
*/
public class Memoizer3 <A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;

public Memoizer3(Computable<A, V> c) {
this.c = c;
}

public V compute(final A arg) throws InterruptedException {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = ft;
cache.put(arg, ft);
ft.run(); // call to c.compute happens here
}
try {
return f.get();
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}


完美解决方案

使用ConcurrenHashMap提供的原子操作putIfAbsent()

package net.jcip.examples;

import java.util.concurrent.*;

/**
* Memoizer
* <p/>
* Final implementation of Memoizer
*
* @author Brian Goetz and Tim Peierls
*/
public class Memoizer <A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;

public Memoizer(Computable<A, V> c) {
this.c = c;
}

public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: