您的位置:首页 > 编程语言 > Java开发

[笔记][Java7并发编程实战手册]6.并发集合

2015-09-20 18:06 513 查看
[笔记][Java7并发编程实战手册]系列目录

并发集合

本章内容包括:

使用非阻塞式线程安全列表

使用阻塞式线程安全列表

使用按优先级排序的阻塞式线程安全列表

使用带有延迟元素的线程安全列表 推荐:该队列马上解决了我在工作中的一个问题

使用线程安全可遍历映射

生成并发随机数

使用原子变量

使用原子数组

6.1.简介

  说道了并发集合,当然就有不适合在并发情况下的集合,例如:ArrayList就是这样一个集合数据结构

Java提供了两类适用于并发场景下的集合:

阻塞式集合(Blocking Collection):这类集合包括添加和移除数据的方法,当集合已满或为空时,被调用的添加或者移除方法就不能立即被执行,那么调用这个方法的线程将被阻塞,一直到该方法可以被成功执行。

非阻塞式集合(Non-Blocking Collection):这类集合也包括添加和移除数据的方法,只是如果方法不能立即被执行,则返回null或抛出异常,但是调用这个方法的线程不会被阻塞。

Java并发集合:

非阻塞式列表:ConcurrentLinkedDeque

阻塞式列表:LinkedBlockingDeque

用于数据生成活消费的阻塞式列表:LinkedTransferQueue

按优先级排序列表元素的阻塞式列表:PriorityBlockingQueue

带有延迟列表元素的阻塞式列表:DelayQueue

非阻塞式可遍历映射:ConcurrentSkipListMap

随机数字:ThreadLocalRandom

原子变量:AtomicLong 和 AtomicIntegerArray

6.2.使用非阻塞式线程安全列表ConcurrentLinkedDeque

一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。

本小节将学习:如何在并发程序中使用非阻塞式列表。非阻塞式列表提供了一些操作,如果被执行的操作不能够立即运行(例如:在列表为空时,从列表取出一个元素)方法会抛出异常活返回null。

要注意 size 在并发情况下。是不准确的。

getFirst 和 getLast : 分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这两个方法抛出NoSuchElementExcpetion

peek、peekFirst、peekLast:分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这些方法返回null

remove、removeFirst、removeLast:分别返回列表中第一个和最后一个元素,反回的元素将从列表中移除。如果列表为空,这两个方法抛出NoSuchElementExcpetion

示例

场景描述:将用:添加大量的数据到一个列表中、从同一个列表中移除大量的数据。来实现示例

/**
* Created by zhuqiang on 2015/9/17 0017.
*/
public class Clinet {
public static void main(String[] args) throws InterruptedException {
ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<String>();
Thread[] addTh = new Thread[100];

for (int i = 0; i < 100; i++) {
AddTask task = new AddTask(deque);
addTh[i] = new Thread(task);
addTh[i].start();
}
System.out.println("Main:addTask共有线程:" + addTh.length);

for (int i = 0; i < addTh.length; i++) {
addTh[i].join();
}
System.out.println("Main:deque有元素:" + deque.size());

for (int i = 0; i < 100; i++) {
PollTask task = new PollTask(deque);
addTh[i] = new Thread(task);
addTh[i].start();
}
System.out.println("Main:PollTask共有线程:" + addTh.length);

for (int i = 0; i < addTh.length; i++) {
addTh[i].join();  //等待线程执行完成
}
System.out.println("Main:deque有元素:" + deque.size());
}
}

/**
* 添加10000元素的线程类
*/
class AddTask implements Runnable {
private ConcurrentLinkedDeque<String> deque;

public AddTask(ConcurrentLinkedDeque<String> deque) {
this.deque = deque;
}

@Override
public void run() {
String name = Thread.currentThread().getName();
for (int i = 0; i < 10000; i++) {
deque.add(name + ":element " + i);
}
}
}

/**
* 移除10000元素的线程类
*/
class PollTask implements Runnable {
private ConcurrentLinkedDeque<String> deque;

public PollTask(ConcurrentLinkedDeque<String> deque) {
this.deque = deque;
}

@Override
public void run() {
for (int i = 0; i < 5000; i++) {  //每次移除两个元素。共5000次
deque.pollFirst();
deque.pollLast();
}
}
}


运行结果:

Main:addTask共有线程:100
Main:deque有元素:1000000
Main:PollTask共有线程:100
Main:deque有元素:0


结果说明:

先用100个线程往deque中添加100 0000 个元素。等待线程完成,并打印出元素的个数,随后再用100个线程移除deque中的元素。 值得注意的是 获取列表的大小 在并发情况下是不准确的。因为实时有线程在修改列表

6.3.使用阻塞式线程安全列表LinkedBlockingDeque

一个基于已链接节点的、任选范围的阻塞双端队列。

可选的容量范围构造方法参数是一种防止过度膨胀的方式。如果未指定容量,那么容量将等于 Integer.MAX_VALUE。只要插入元素不会使双端队列超出容量,每次插入后都将动态地创建链接节点。

  并发列表允许不同的线程在同一时间添加活移除列表中的元素,而不会造成数据不一致。

  在本节,将学会如何在并发程序中使用阻塞式列表。

阻塞式列表与非阻塞式列表的主要差别是:

阻塞式:在插入时,如果列表已满,在删除时,如果列表为空,操作不会被立即执行,而是将调用这个操作的线程阻塞队列直到可以执行成功。(这个说法是范范的。因为此类中有一些方法是可以被阻塞,可以抛出异常的)

非阻塞式:在取出一个元素时,如果列表为空,则会返回null或则抛出异常。

该类其他api:

takeFirst() 和 takeLast() : 分别返回列表中的第一个和最后ige元素,返回的元素不会从列表中移除。如果列表为空,调用方法的线程将被阻塞直到列表中有可用的元素出现。

getFitst() 和 getLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,则抛出NoSuchElementExcpetion

peek() 、peekFirst() 和 peekLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中删除。如果列表为空,返回null

poll()、pollFirst() 和 pollLast():分别返回列表中第一个和最后一个元素,返回的元素将会从列表中移除。如果列表为空,返回null

add()、addFirst() 和 addLast():分别将元素添加到列表中第一位和最后一位。如果列表已满(指定了列表容量),这些方法将抛出IllegalStateException

示例

场景描述:声明了一个有大小的队列列表,在一个线程中不断的写入,每次写入5个,重复3次,在主线程中不断的读取,每次读取3个,重复5次。 就可以看到 阻塞的效果了

/**
* Created by zhuqiang on 2015/9/20 0020.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingDeque deque = new LinkedBlockingDeque(3);  //创建一个只能装3个元素的队列列表
Thread task = new Thread(new Task(deque));
task.start();

//每次取3个元素。重复5次,和 添加进的元素个数一致,内外循环错开,方便测试 有无可用元素的时候阻塞
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 3; j++) {
//take获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),必要时将一直等待可用元素。此方法等效于 takeFirst。
System.out.printf("main:deque的size=%s,当前取出的元素=%s\n",deque.size(),deque.take());
}
TimeUnit.MILLISECONDS.sleep(300);
}
System.out.println("main----------结束了");

}
}

class Task implements Runnable{
private LinkedBlockingDeque deque;

public Task(LinkedBlockingDeque deque) {**
this.deque = deque;**
}

@Override
public void run() {
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 5; j++) {
try {
deque.put(i+ ":" + j);  //队列如果满了。则阻塞到可用
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(5);  //休眠两秒再执行下一轮
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}


某一次的运行结果:

main:deque的size=0,当前取出的元素=0:0
main:deque的size=3,当前取出的元素=0:1
main:deque的size=3,当前取出的元素=0:2
main:deque的size=2,当前取出的元素=0:3
main:deque的size=1,当前取出的元素=0:4
main:deque的size=0,当前取出的元素=1:0
main:deque的size=3,当前取出的元素=1:1
main:deque的size=3,当前取出的元素=1:2
main:deque的size=2,当前取出的元素=1:3
main:deque的size=1,当前取出的元素=1:4
main:deque的size=0,当前取出的元素=2:0
main:deque的size=3,当前取出的元素=2:1
main:deque的size=3,当前取出的元素=2:2
main:deque的size=2,当前取出的元素=2:3
main:deque的size=1,当前取出的元素=2:4
main----------结束了


结果说明:

size不准确,看jdk文档,没有说不是准确的,那么这里出现了不准确的size,那么造成这结果的可能性只能是:main中读,另外一个线程在写,而读size的方法和读元素的方法不是一个原子操作。读的一端和写的一端也不是同步的。所以这里造成了size的错误显

6.4.使用按优先级排序的阻塞式线程安全列表PriorityBlockingQueue

一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。此类不允许使用 null 元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。

存入的队列元素 最终结果不是有序的

取出的时候才会取出最小的。

示例

场景描述:好把,下面的示例其实我觉得没有什么必要。无非就是说,这个PriorityBlockingQueue是一个可以自动排序的无界队列。把元素放进去,然后取出来看看是否排序了。

/**
* Created by zhuqiang on 2015/9/20 0020.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<Event>();
Thread[] tasks = new Thread[3];
for (int i = 0; i < 3; i++) {
Task task = new Task(i, queue);
Thread thread = new Thread(task);
thread.start();
tasks[i] = thread;
}

for (int i = 0; i < tasks.length; i++) {
tasks[i].join();
}

System.out.println("**********  queue.seze = " + queue.size());
Iterator<Event> iterator = queue.iterator();
while (iterator.hasNext()){ //队列中的元素,不是有序的。但是poll取出是有序的,每次取出 都会通过Comparable取得最小的,(这里是按从小到大排序的)
Event next = iterator.next();
System.out.println(next.getThreadId() + " : 优先级:" + next.getPriority());
}
System.out.println("*******************  poll 取出");
for (int i = 0; i < 15; i++) {
Event e = queue.poll();
System.out.printf("%s,优先级:%s\n",e.getThreadId(),e.getPriority());
}
}
}

class Event implements Comparable<Event>{
int priority;
int threadId;

@Override
public int compareTo(Event o) {
//当前优先级高,返回1,相等返回0,小于返回-1
return this.priority > o.getPriority() ? 1 : this.getPriority() == o.getPriority() ? 0 : -1;
}

public int getPriority() {
return priority;
}

public void setPriority(int priority) {
this.priority = priority;
}

public int getThreadId() {
return threadId;
}

public Event(int priority, int threadId) {
this.priority = priority;
this.threadId = threadId;
}

public void setThreadId(int threadId) {
this.threadId = threadId;
}
}

class Task implements Runnable{
private int threadId;
private PriorityBlockingQueue<Event> queue;

public Task(int threadId, PriorityBlockingQueue<Event> queue) {
this.threadId = threadId;
this.queue = queue;
}

@Override
public void run() {
for (int i = 0; i < 5; i++) {
queue.add(new Event(threadId,i));
}
}
}


某一次的运行结果:

**********  queue.seze = 15
0 : 优先级:0
2 : 优先级:0
1 : 优先级:0
3 : 优先级:0
4 : 优先级:0
2 : 优先级:1
0 : 优先级:1
3 : 优先级:1
1 : 优先级:1
4 : 优先级:1
0 : 优先级:2
1 : 优先级:2
2 : 优先级:2
3 : 优先级:2
4 : 优先级:2
*******************  poll 取出
0,优先级:0
2,优先级:0
3,优先级:0
4,优先级:0
1,优先级:0
3,优先级:1
1,优先级:1
4,优先级:1
2,优先级:1
0,优先级:1
1,优先级:2
3,优先级:2
0,优先级:2
4,优先级:2
2,优先级:2


结果说明:

存入的队列元素 最终结果不是有序的

取出的时候才会取出最小的。

6.5.使用带有延迟元素的线程安全列表DelayQueue

Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。

本小节将学会一种非常有趣的数据队列,DelayQueue 延迟效果的队列;根据jdk中的描述。是否延迟 是根据compareTo 返回的结果来判定的。 有这样一个场景:在微信公众号开发中,基础支持的accessToken是2个小时超时,那么我就可以把这个accessToken 设置一个超时的时间,放到这个队列中。然后用一个线程去遍历take()出这个元素然后将其更新即可,因为,take() 会一直阻塞直到有可用元素出现在头部。

要达到这个效果,必须让进入队列的 元素 实现Delayed并覆盖以下两个方法:

getDelay() : 返回与此对象相关的剩余延迟时间,以给定的时间单位表示。该方法是在 compareTo中调用的。 所以要以 compareTo 中调用的时候传入的时间单位 来计算剩余的延迟时间。

compareTo() : 队列会根据此方法来获取超时的元素

DelayQueue的其他Api

clear() 自动移除此延迟队列的所有元素。

offer(E e) 将指定元素插入此延迟队列。

peek() 获取但不移除此队列的头部;如果此队列为空,则返回 null。

take() 获取并移除此队列的头部,在可从此队列获得到期延迟的元素之前一直等待(如有必要)。

示例

场景描述:以下示例,在几个线程中,添加未来超时的event对象到DelayQueue队列中,然后在主任务取出到期或则超时的元素。 非常有趣的一个队列:我马上就想到了一个使用场景:在微信公众号开发中,有一个token超时时间是2个小时。就可以使用该队列来处理这类似的场景了。而不需要再一直循环的调用判断是否超时了

/**
* Created by zhuqiang on 2015/9/20 0020.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
/*
//测试  getDelay 中的转换是什么意思。
Date now = new Date();
TimeUnit.SECONDS.sleep(2);
long diff = new Date().getTime() - now.getTime();  //毫秒
System.out.println(diff);
TimeUnit unit = TimeUnit.valueOf(TimeUnit.NANOSECONDS.name());  //获取纳秒的timeUnit对象
System.out.println(unit.convert(diff, TimeUnit.MILLISECONDS)); //把 号码转换为 纳秒*/

DelayQueue<Event> queue = new DelayQueue<Event>();
Thread[] tasks = new Thread[3];
for (int i = 0; i < 3; i++) {
Task task = new Task(i, queue);
Thread thread = new Thread(task);
thread.start();
tasks[i] = thread;
}

for (int i = 0; i < tasks.length; i++) {
tasks[i].join();
}

while (queue.size() > 0){
Event event = queue.take();//获取并移除一个头部元素(到期元素),该方法阻塞
System.out.printf("现在时间:%s,开始时间:%s\n", new Date(),event.getStartDate());
}
}
}

class Event implements Delayed {
private Date startDate;  //开始时间

public Event(Date startDate) {
this.startDate = startDate;
}

@Override
public long getDelay(TimeUnit unit) { //返回延迟时间(开始时间 - 当前时间 就是延迟时间,要配合 compareTo来计算)
Date now = new Date();
long diff = startDate.getTime() - now.getTime();
return unit.convert(diff, TimeUnit.MILLISECONDS); //排序
}

@Override
public int compareTo(Delayed o) {
long time = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return time < 0 ? -1 : time > 0 ? 1 : 0; // 返回-1.表示 已经超时了,返回1:表示 还没有超时,返回0 表示开始时间和当前时间相等
}

public Date getStartDate() {
return startDate;
}
}

class Task implements Runnable{
private int id; //task 编号
private DelayQueue queue;

public Task(int id, DelayQueue queue) {
this.id = id;
this.queue = queue;
}

@Override
public void run() {
Date now = new Date();
Date delay = new Date();
delay.setTime(now.getTime() + (id * 1000 * 2)); //当前任务的超时时间设置在,当前时间的毫秒数 + id * 1000 * 2
System.out.printf("Task id:%s:超时时间:%s\n",id,delay);
for (int i = 0; i < 2; i++) {
queue.add(new Event(delay));
}
}
}


某一次的运行结果:

Task id:2:超时时间:Sun Sep 20 16:12:48 CST 2015
Task id:0:超时时间:Sun Sep 20 16:12:44 CST 2015
Task id:1:超时时间:Sun Sep 20 16:12:46 CST 2015
现在时间:Sun Sep 20 16:12:44 CST 2015,开始时间:Sun Sep 20 16:12:44 CST 2015
现在时间:Sun Sep 20 16:12:44 CST 2015,开始时间:Sun Sep 20 16:12:44 CST 2015
现在时间:Sun Sep 20 16:12:46 CST 2015,开始时间:Sun Sep 20 16:12:46 CST 2015
现在时间:Sun Sep 20 16:12:46 CST 2015,开始时间:Sun Sep 20 16:12:46 CST 2015
现在时间:Sun Sep 20 16:12:48 CST 2015,开始时间:Sun Sep 20 16:12:48 CST 2015
现在时间:Sun Sep 20 16:12:48 CST 2015,开始时间:Sun Sep 20 16:12:48 CST 2015


结果说明:

1. 先看任务的超时时间。

2. 再看现在的时间,和 开始时间,你会发现现在的时间和开始时间一样的,这个是正确的,因为你设定的该对象取出的开始时间(也就是超时时间 和 现在时间一致,或则大于现在时间),该元素才能被取出来

3. 我故意在设置开始时间的时候 把每次任务的间隔时间拉得很开,就是为了让你看到 在控制台上 有一个 阻塞停顿等待超时的效果。

6.6.使用线程安全可遍历映射
ConcurrentSkipListMap<K,V>

ConcurrentSkipListMap<K,V>
可缩放的并发 ConcurrentNavigableMap 实现。映射可以根据键的自然顺序进行排序,也可以根据创建映射时所提供的 Comparator 进行排序,具体取决于使用的构造方法。

此类实现 SkipLists 的并发变体,为 containsKey、get、put、remove 操作及其变体提供预期平均 log(n) 时间开销。多个线程可以安全地并发执行插入、移除、更新和访问操作。迭代器是弱一致 的,返回的元素将反映迭代器创建时或创建后某一时刻的映射状态。它们不 抛出 ConcurrentModificationException,可以并发处理其他操作。升序键排序视图及其迭代器比降序键排序视图及其迭代器更快。

  SkipList: 是基于并发列表的数据结构,效率与二叉树相近。

  插入数据的时候,会根据键的自然顺序进行排序

本章节将学会如何使用 
ConcurrentSkipListMap<K,V>
 实现对联系人对象的映射。

ConcurrentSkipListMap的其他API:

headMap(K toKey):返回映射中所有键值小于参数k的子集

tailMap(K fromKey):返回映射中所有键值大于fromkey的子集

putIfAbsent(K key,V value):如果映射中不存在键key,那么就将key 和 value添加到map中。

pollLastEntry():返回并移除最后一个Map.Entry对象。

replace(K key,V value):如果映射中已经存在key,则用参数中的value替换掉。

示例

场景描述:以下示例做的事情就是,在一个线程中不断的写入联系人信息。在主线程中异步的读取自然顺序最大和最小的元素,还有根据key的规律取一个子集。

/**
* Created by zhuqiang on 2015/9/20 0020.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
ConcurrentSkipListMap<String, Contact> map = new ConcurrentSkipListMap<String, Contact>();
Thread[] tasks = new Thread[3];
int index = 0;
for (char i = 'A'; i < 'D'; i++) {
Task task = new Task(map, i + "");
Thread thread = new Thread(task);
tasks[index] = thread;
thread.start();
index++;
}
//        for (int i = 0; i < tasks.length; i++) {
//            tasks[i].join();
//        }
TimeUnit.MILLISECONDS.sleep(1); //休眠一毫秒,方便看到异步存取的现象
//获取自然顺序最小的元素
System.out.println("Map.seize() = " + map.size());
Map.Entry<String, Contact> firstEntry = map.firstEntry();
Contact value = firstEntry.getValue();
System.out.printf("最小的firstEntry:key=%s,电话=%s,姓名=%s\n", firstEntry.getKey(), value.getPhone(), value.getName());

//获取自然顺序最大的元素
System.out.println("Map.seize() = " + map.size());
Map.Entry<String, Contact> lastEntry = map.lastEntry();
Contact lastEntryValue = lastEntry.getValue();
System.out.printf("最大的lastEntry:key=%s,电话=%s,姓名=%s\n", lastEntry.getKey(), lastEntryValue.getPhone(), lastEntryValue.getName());

//取得一个子集
System.out.println("************获取A1008-B1002 的子集,包含头不包含尾巴");
ConcurrentNavigableMap<String, Contact> subMap = map.subMap("A1008", "B1002");
Map.Entry<String, Contact> e;
do {
e = subMap.pollFirstEntry();
if(e != null){
Contact ev = e.getValue();
System.out.printf("key=%s,电话=%s,姓名=%s\n", e.getKey(), ev.getPhone(), ev.getName());
}
} while (e != null);
System.out.println("Map.seize() = " + map.size());

}
}

// 联系人类
class Contact{
private String name;
private String phone;

public Contact(String name, String phone) {
this.name = name;
this.phone = phone;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getPhone() {
return phone;
}

public void setPhone(String phone) {
this.phone = phone;
}
}

class  Task implements  Runnable{
private ConcurrentSkipListMap<String,Contact> map;
private String id;  //任务id

public Task(ConcurrentSkipListMap<String, Contact> map, String id) {
this.map = map;
this.id = id;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
Contact contact = new Contact(id, i + 1000 + "");
map.put(id+contact.getPhone(),contact); //创建联系人信息并添加到 map中
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


某一次的运行结果:

Map.seize() = 3
最小的firstEntry:key=A1000,电话=1000,姓名=A
Map.seize() = 30
最大的lastEntry:key=C1009,电话=1009,姓名=C
************获取A1008-B1002 的子集,包含头不包含尾巴
key=A1008,电话=1008,姓名=A
key=A1009,电话=1009,姓名=A
key=B1000,电话=1000,姓名=B
key=B1001,电话=1001,姓名=B
Map.seize() = 26


结果说明:

休眠一号码后,map中已经存在有3个元素了,取出了最小的一个元素信息

取最大元素的时候,任务已经执行完成了。我们只添加了30个信息。

取子集的时候。使用了pollFirstEntry来遍历子集,获得并删除元素。所以最后map的size比遍历前少了。

6.7.生成并发随机数ThreadLocalRandom

jdk中说,这种新的多线程随机数生成器,在多线程环境下比random拥有更好的性能。但从示例看出来的好处是:

感觉就像是threadLocal使用一样,使用静态方法就能取得实例。

在主线程中启动的多个线程也算共享一个变量咯?

ps:源码看不太懂。也没有去深究。所以不知道怎么说。

示例

/**
* Created by zhuqiang on 2015/9/21 0021.
*/
public class Client {
public static void main(String[] args) {
System.out.println("MAIN:start***** " + ThreadLocalRandom.current());
for (int i = 0; i < 3 ; i++) {
new Thread(new Task()).start();
}
System.out.println("MAIN:end***** " + ThreadLocalRandom.current());
}
}

class Task implements Runnable{
private ThreadLocalRandom random = ThreadLocalRandom.current();
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println( name + ":start***** " + random);
for (int i = 0; i < 5; i++) {

System.out.printf("%s,num:%s\n", name,random.nextInt(10));
}
System.out.println( name + ":end***** " + ThreadLocalRandom.current());
}
}


运行结果

MAIN:start***** java.util.concurrent.ThreadLocalRandom@677327b6
MAIN:end***** java.util.concurrent.ThreadLocalRandom@677327b6
Thread-0:start***** java.util.concurrent.ThreadLocalRandom@677327b6
Thread-1:start***** java.util.concurrent.ThreadLocalRandom@677327b6
Thread-0,num:4
Thread-2:start***** java.util.concurrent.ThreadLocalRandom@677327b6
Thread-2,num:4
Thread-2,num:9
Thread-2,num:8
Thread-0,num:9
Thread-0,num:8
Thread-0,num:5
Thread-0,num:6
Thread-1,num:4
Thread-0:end***** java.util.concurrent.ThreadLocalRandom@677327b6
Thread-2,num:5
Thread-1,num:9
Thread-1,num:8
Thread-1,num:5
Thread-1,num:6
Thread-1:end***** java.util.concurrent.ThreadLocalRandom@677327b6
Thread-2,num:6
Thread-2:end***** java.util.concurrent.ThreadLocalRandom@677327b6


结果说明:

  可以看到。几个线程全部共享的一个生成器实例。也不用构造去注入共享随机数生成器了

6.8.使用原子变量 atomic variable

书上一大段的描述总结下来如下:

什么是原子变量?

  在java中声明一个普通变量 int i=1; i 就是一个变量,在多线程中对这个i并发操作,那么就会出现数据竞争,而原子变量就是jdk5引进的一种机制,在多线程中对这个原子变量进行并发操作。不会出现错误的数据。

  在java中,原子变量使用CAS(CAS:Compare and Swap, 翻译成比较并交换。)来进行原子操作。

什么是CAS?

  cas就是借助C来调用CPU底层指令实现的,使用禁止重排序,锁定总线事务,使用缓存锁保证原子性等来让 一个线程在操作这个原子变量的时候,其他线程不能访问主内存,等待上一个线程操作完成之后把改变的值刷新到主内存中为止,可以想象成 一个锁,不过是cpu指令级别的。

参考cas说明:/article/8182138.html ,我感觉说得很好,和 java内存模型中提到的一些符合。

原子变量还有其他的:AtomicInteger、AtomicBoolean、AtomicReference

示例

场景描述:以下示例就是演示,普通变量 和 原子变量,不使用任何同步手段,看他们在并发下的一个累加值。

至于其他的 示例 银行加减款,和这个类似,当然该类还有其他的一些api。jdk6有中文,使用比较简单。就不写了

/**
* Created by zhuqiang on 2015/9/21 0021.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
Integer[] num = {0}; //使用数组类型,不使用 Integer 是因为 Integer 和String 类似,是不可改变的。

Thread[] ts = new Thread[3];
for (int i = 0; i < ts.length; i++) {
Thread t = new Thread(new Task(num));
ts[i] = t;
t.start();
}

for (int i = 0; i < ts.length; i++) {
ts[i].join();
}

System.out.println("************ 普通变量结果:" + num[0]);

AtomicInteger aci = new AtomicInteger(0);
for (int i = 0; i < ts.length; i++) {
Thread t = new Thread(new TaskAtomic(aci));
ts[i] = t;
t.start();
}

for (int i = 0; i < ts.length; i++) {
ts[i].join();
}
System.out.println("************ 原子变量结果:" + aci);
}
}

class Task implements Runnable{
private Integer[] i;

public Task(Integer[] i) {
this.i = i;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);  //加休眠,让结果更能明显一点
} catch (InterruptedException e) {
e.printStackTrace();
}
i[0] = ++i[0];
}
}

class TaskAtomic implements  Runnable{
private AtomicInteger i;

public TaskAtomic(AtomicInteger i) {
this.i = i;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
i.addAndGet(1);
}
}


运行结果:

************ 普通变量结果:1
************ 原子变量结果:3


结果说明:

都使用了3个线程累加,期望的值应该为3,但是普通变量明显的是错误的

6.9.使用原子数组

顾名思义,原子数组语义上应该和原子变量类似,只是变成了数组。同样也是使用的cas操作来保证内存一致性;

CAS三步:

取得变量值,即旧值

在本地内存中修改这个变量的值。即新值

如果旧值与本地变量相等,则用新值替换掉旧值,然后刷新到主内存中。

要记得cas 会使用锁住总线事务等来保证原子操作。

示例

场景描述:以下示例就是讲:两个线程,一个线程把线程把每个元素都增加1,一个线程把每个线程都减少1.然后查看是否有 没有被剪掉的值;

对于下面的操作,我其实很好奇,这个类 怎么知道我要在加1的基础上剪掉1的呢? 换个说法就是。在并发运行的类。他们两个不会向普通变量哪样会读取到同一个内存副本,然后就造成数据错误了。 能说得过去是因为cas操作。但是具体cas 是怎么来实现的呢,cas 通过内存屏障,cpu指令等来实现,这些也都是只是 干货。对于自己的理解还是不知道。 好烦。

/**
* Created by zhuqiang on 2015/9/21 0021.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
AtomicIntegerArray array = new AtomicIntegerArray(100);
Thread t1 = new Thread(new TaskAdd(array));
Thread t2 = new Thread(new TaskSub(array));
t1.start();
t2.start();  //可以把 t2 的启动关闭 查看 数组中的数据

t1.join();
t2.join();

//打印不为0的数据
for (int i = 0; i < array.length(); i++) {
int i1 = array.get(i);
if(i1 != 0){
System.out.println(i +" = " + i1);
}
}
}
}

class TaskAdd implements Runnable{
private AtomicIntegerArray array;

public TaskAdd(AtomicIntegerArray array) {
this.array = array;
}

@Override
public void run() {
for (int i = 0; i < array.length(); i++) {
array.incrementAndGet(i); //以原子方式将索引 i 的元素加 1。
}
}
}

class TaskSub implements Runnable{
private AtomicIntegerArray array;

public TaskSub(AtomicIntegerArray array) {
this.array = array;
}

@Override
public void run() {
for (int i = 0; i < array.length(); i++) {
array.decrementAndGet(i); //以原子方式将索引 i 的元素-1。
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: