您的位置:首页 > 其它

第六章并发集合

2015-09-04 06:38 435 查看

Java 7 并发编程实战手册目录

代码下载(https://github.com/Wang-Jun-Chao/java-concurrency)

第六章并发集合

6.1简介

  数据结构(DataStnreture)是编程中的基本元素,几乎每个程序都使用一种或多种数据结构来存储和管理数据。Java API提供了包含接口、类和算法的Java集合框架(Java Collection Framework),它实现了可用在程序中的大量数据结构。

  当需要在并发程序中使用数据集合时,必须要谨慎地选择相应的实现方式。大多数集合类不能直接用于并发应用,因为它们没有对本身数据的并发访问进行控制。如果一些并发任务共享了一个不适用于并发任务的数据结构,将会遇到数据不一致的错误,并将影响程序的准确运行。这类数据结构的一个例子是Arraylist类。

  Java提供了一些可以用于并发程序中的数据集合,它们不会引起任何问题。—般来说,Java提供了两类适用于并发应用的集合。

  ♦阻塞式集合(Blocking Collection):这类集合包括添加和移除数据的方法。当集合已满或为空时,被调用的添加或者移除方法就不能立即被执行,那么调用这个方法的线程将被阻塞,一直到该方法可以被成功执行。

  ♦非阻塞式集合(Non-Blocking Collection):这类集合也包括添加和移除数据的方法。如果方法不能立即被执行,则返回null或抛出异常,但是调用这个方法的线程不会被阻塞。

通过本章的各个小节,你将学会如何在并发应用中使用一些Java集合。

  ♦非阻塞式列表对应的实现类:ConcurrentLinkedDeque类;

  ♦阻塞式列表对应的实现类:LinkedBlockingDeque类:

  ♦用于数据生成或消费的阻塞式列表对应的实现类:LinkedTransferQueire类:

  ♦按优先级排序列表元素的阻塞式列表对应的实现类:PriorityBlockingQueiK类;

  ♦带有延迟列表元素的阻塞式列表对应的实现类:DelayQueue类;

  ♦非阻塞式可遍历映射对应的实现类:ConcurrentSkipListMap类;

  ♦随机数字对应的实现类:ThreadLocalRandom类;

  ♦原子变量对应的实现类:AtomicLong和AtomicIntegerArray类。

6.2使用非阻塞式线程安全列表

  最基本的集合类型是列表(List)。一个列表包含的元素数量不定,可以在任何位置添加-读取或移除元素。并发列表允许不同的线程在同一时间添加或移除列表中的元素,而不会造成数据不一致。

  在本节,将会学到如何在并发程序中使用非阻塞式列表。非阻塞式列表提供了一些操作,如果被执行的操作不能够立即运行(例如,在列表为空时,从列表取出一个元素),方法会抛出异常或返回null。Java 7引入了 ConcurrentLinkedDeque类来实现非阻塞式并发。

  将要实现的范例包括以下两个不同的任务:

  ♦添加大量的数据到一个列表中;

  ♦从同一个列表中移除大量的数据。

package com.concurrency.task;

import java.util.concurrent.ConcurrentLinkedDeque;

/**
 * 添加数据任务类,向并发队列中添加10000个数据
 */
public class AddTask implements Runnable {

    /**
     * 等待添加数组的队列
     */
    private ConcurrentLinkedDeque<String> list;

    /**
     * 构造函数
     *
     * @param list 等待添加数组的队列
     */
    public AddTask(ConcurrentLinkedDeque<String> list) {
        this.list = list;
    }

    /**
     * 核心方法,向并发队列中添加10000个数据
     */
    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        for (int i = 0; i < 10000; i++) {
            list.add(name + ": Element " + i);
        }
    }

}


package com.concurrency.task;

import java.util.concurrent.ConcurrentLinkedDeque;

/**
 * 取数据任务类,从并发队列中删除10000个数据
 */
public class PollTask implements Runnable {

    /**
     * 待删除元素的队列
     */
    private ConcurrentLinkedDeque<String> list;

    /**
     * 构造函数
     *
     * @param list 待删除元素的队列
     */
    public PollTask(ConcurrentLinkedDeque<String> list) {
        this.list = list;
    }

    /**
     * 核心方法,在并发队列的头部和尾部各删除5000个元素
     */
    @Override
    public void run() {
        for (int i = 0; i < 5000; i++) {
            list.pollFirst();
            list.pollLast();
        }
    }
}


package com.concurrency.core;

import com.concurrency.task.AddTask;
import com.concurrency.task.PollTask;

import java.util.concurrent.ConcurrentLinkedDeque;

public class Main {
    public static void main(String[] args) throws Exception {
        // 创建一个并发双向队列对象
        ConcurrentLinkedDeque<String> list = new ConcurrentLinkedDeque<>();
        // 创建长度为100的线程数组
        Thread threads[] = new Thread[100];

        // 创建100个AddTask对象,并且让他们在各自的线程中运行
        for (int i = 0; i < threads.length; i++) {
            AddTask task = new AddTask(list);
            threads[i] = new Thread(task);
            threads[i].start();
        }
        System.out.printf("Main: %d AddTask threads have been launched\n", threads.length);

        // 等待所有的线程执行完
        for (Thread thread : threads) {
            thread.join();
        }

        // 输出队列长度信息
        System.out.printf("Main: Size of the List: %d\n", list.size());

        // 创建100个PollTask对象,并且让他们在各自的线程中运行
        for (int i = 0; i < threads.length; i++) {
            PollTask task = new PollTask(list);
            threads[i] = new Thread(task);
            threads[i].start();
        }
        System.out.printf("Main: %d PollTask threads have been launched\n", threads.length);

        // 等待线程执行完
        for (Thread thread : threads) {
            thread.join();
        }

        // 输出队列长度信息
        System.out.printf("Main: Size of the List: %d\n", list.size());
    }
}




图6.2-1 运行结果

  本节使用的泛型参数是String类的ConcurrentLinkedDeque对象,用来实现一个非阻塞式并发数据列表。

  首先,执行100个AddTask任务将元素添加到ConcurrentLinkedDeque对象list中。每个任务使用add()方法向这个列表中插入10,000个元素。add()方法将新元素添加到列表 尾部.当所有任务运行完毕,列表中的元素数量将被打印到控制台,在这一刻,列表中有 1,000,000 个元素。

  接下来,执行100个PollTask任务将元素从列表中移除。每个任务使用pollFirst()和 pollLast()方法从列表中移除10,000个元素。putFirst()方法返回并移除列表中的第一个元 素,pollLast()方法返回并移除列表中的最后一个元素。如果列表为空,这些方法返回null。 当所有任务运行完毕,列表中的元素数量将被打印到控制台。在这一刻,列表中有0个元素。

使用size()方法输出列表中的元素数量。需要注意的是,这个方法返回的值可能不是真实的,尤其当有线程在添加数据或移除数据时,这个方法需要遍历整个列表来计算元素数量,而遍历过的数据可能已经改变。仅当没有任何线程修改列表时,才能保证返回的结果是准确的。

  ConcurrentLinkedDeque类提供了其他从列表中读取数据的方法。

  ♦ getFirsl()和getlast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这两个方法抛出NoSuchElementExcpetion异常。

  ♦ peek()、peekFirst()和peekLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这些方法返回null。

  ♦ remove()、removeFirst()和removeLast():分别返回列表中第一个和最后一元素,返回的元素将会从列表中移除。如果列表为空,这些方法抛出NoSuchElementExcprtion 异常。

6.3使用阻塞式线程安全列表

  最基本的集合类型是列表。一个列表包含的元素数量不定,可以在任何位置添加、读取或移除元素。并发列表允许不同的线程在同一时间添加或移除列表中的元素,而不会造成数据不一致。

  在本节,你会学到如何在并发辦中賴隨式列表。阻塞式列表与非阻塞式列表的主要差别是:阻塞式列表在插入和刪除操作时,如果列表已满或为空,操作不会被立即执行, 而是将调用这个操作的线程阻塞队列直到操作可以执行成功。Java引入了 LinkedBlocldngDeqne类来实现阻塞式列表。

  将要实现的范例包括以下两个不同的任务:

  ♦添加大量的数据到一个列表中:

  ♦从同一个列表中移除大量的数据。

package com.concurrency.task;

import java.util.Date;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
 * 向阻塞队列中添加数据的类
 */
public class Client implements Runnable {

    private LinkedBlockingDeque<String> requestList;

    public Client(LinkedBlockingDeque<String> requestList) {
        this.requestList = requestList;
    }

    /**
     * 核心方法,添加15个对象
     */
    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 5; j++) {
                StringBuilder request = new StringBuilder();
                request.append(i);
                request.append(":");
                request.append(j);
                try {
                    requestList.put(request.toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.printf("Client: %s at %s.\n", request, new Date());
            }
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("Client: End.\n");
    }
}


package com.concurrency.core;

import com.concurrency.task.Client;

import java.util.Date;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws Exception {
        // 创建一个并发链式双向队列
        LinkedBlockingDeque<String> list = new LinkedBlockingDeque<>(3);

        Client client = new Client(list);
        Thread thread = new Thread(client);
        thread.start();

        for (int i = 0; i < 5; i++) {
            for (int j = 0; j < 3; j++) {
                String request = list.take();
                System.out.printf("Main: Request: %s at %s. Size: %d\n", request, new Date(), list.size());
            }
            TimeUnit.MILLISECONDS.sleep(300);
        }

        System.out.printf("Main: End of the program.\n");
    }
}




图6.3-1 运行结果

  本节使用的泛型参数是String的LinkedBlockingDeque对象,用来实现一个阻塞式并发数据列表。

Client类使用put()方法将字符串插入到列表中。如果列表已满(列表生成时指定了固定的容量),调用这个方法的线程将被阻塞直到列表中有了可用的空间。

Main类使用take()方法从列表中取字符串。如果列表为空,调用这个方法的线程将被阻塞直到列表不为空(即有可用的元素)。

  这个例子中使用了 LinkedBlockingDeque对象的两个方法,调用它们的线程可能会被阻塞,在阻塞时如果线程被中断,方法会抛出InterruptedException异常,所以必须捕获和处理这个异常。

LinkedBlockingDeqire类也提供了其他存取元素的方法,这些方法不会引起阻塞,而是抛出异常或返回null。

  ♦ takeFirst()和takeLast():分别返回列表中第一个和最后一个元素,返回的元素会从列表中移除。如果列表为空,调用方法的线程将被阻塞直到列表中有可用的元素出现.

  ♦ getFirst()和getLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,则抛出NoSuchElementExcpetinon异常。

  ♦ peek()、peekFirst()和peekLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,返回null。

  ♦ poll()、pollFirst()和pollLast():分别返回列表中第一个和最后一个元素,返回的元素将会从列表中移除。如果列表为空,返回null。

  ♦ add()、addFirst()和addLast():分别将元素添加到列表中第一位和最后一位。如果列表已满(列表生成时指定了固定的容量),这些方法将抛出IllegalStateException异常。

6.4使用按优先级排序的阻塞式线程安全列表

  数据结构应用中的一个经典需求是实现一个有序列表。Java引入了 PriorityBlockingQueue类来满足这类需求。

  所有添加进PriorityBlockingQueue的元素必须实现Comparable接口。这个接口提供了 compareTo()方法,它的传入参数是一个同类型的对象。这样就有了两个同类型的对象并且相互比较:其中一个是执行这个方法的对象,另一个是参数传入的对象。这个方法必须返回一个数字值,如果当前对象小于参数传入的对象,那么返回一个小于0的值;如果当前对象大于参数传入的对象,那么返回一个大于0的值;如果两个对象相等就返回0。

  当插入元素时.PriortyBlockingQueue使用compareTo()方法来决定插入元素的位置。元素越大越靠后。

  PriorityBlockingQueue的另一个重要的特性是:它是阻塞式数据结构(BlockingData Structure)。当它的方法被调用并且不能立即执行时,调用这个方法的线程将被阻塞直到方法执行成功。

  在本节,你将学习如何使用PriorityBlockingQueue类。在范例中我们将大量不同优先级的事件存放到同一个列表中,并且检査队列是否按预期排序。

package com.concurrency.task;

import com.sun.istack.internal.NotNull;

/**
 * 事件类,存储一个件事的属性,它包括一个事件的优先级,这个类实现了Comparable接口的方法,
 * 用来确定哪个事件对象的优先级更高
 */
public class Event implements Comparable<Event> {

    /**
     * 线程编号
     */
    private int thread;
    /**
     * 线程优先级
     */
    private int priority;

    /**
     * 构造构造,用于初始化属性
     *
     * @param thread   产生事件的线程编号
     * @param priority 事件的优先级
     */
    public Event(int thread, int priority) {
        this.thread = thread;
        this.priority = priority;
    }

    /**
     * 获取线程编号
     *
     * @return 线程编号
     */
    public int getThread() {
        return thread;
    }

    /**
     * 获取线程优先级
     *
     * @return 线程优先级
     */
    public int getPriority() {
        return priority;
    }

    /**
     * 比较那个线程的优先级更高
     */
    @Override
    public int compareTo(Event e) {
        if (this.priority > e.getPriority()) {
            return -1;
        } else if (this.priority < e.getPriority()) {
            return 1;
        } else {
            return 0;
        }
    }
}


package com.concurrency.task;

import java.util.concurrent.PriorityBlockingQueue;

/**
 * 任务对象,生成1000个事件,并且将其存放在一个优先队列中
 */
public class Task implements Runnable {

    /**
     * 任务编号
     */
    private int id;

    /**
     * 存储事件的优先队列
     */
    private PriorityBlockingQueue<Event> queue;

    /**
     * 构造函数,初始化属性
     *
     * @param id    任务编号
     * @param queue 存储事件的优先队列
     */
    public Task(int id, PriorityBlockingQueue<Event> queue) {
        this.id = id;
        this.queue = queue;
    }

    /**
     * 核心方法,生成1000个事件,并且将其存放在一个优先队列中
     */
    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            Event event = new Event(id, i);
            queue.add(event);
        }
    }
}


package com.concurrency.core;

import com.concurrency.task.Event;
import com.concurrency.task.Task;

import java.util.concurrent.PriorityBlockingQueue;

public class Main {
    public static void main(String[] args) {

        // 存储事件的优先级队列
        PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<>();

        // 存储5个线程对象的数组
        Thread taskThreads[] = new Thread[5];

        // 创建5个线程运行5个任务,每个任务创建1000事件对象
        for (int i = 0; i < taskThreads.length; i++) {
            Task task = new Task(i, queue);
            taskThreads[i] = new Thread(task);
        }

        //  启动5个线程
        for (Thread taskThread : taskThreads) {
            taskThread.start();
        }

        // 等待5个线程完成
        for (Thread taskThread : taskThreads) {
            try {
                taskThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 输出事件信息
        System.out.printf("Main: Queue Size: %d\n", queue.size());
        for (int i = 0; i < taskThreads.length * 1000; i++) {
            Event event = queue.poll();
            System.out.printf("Thread %s: Priority %d\n", event.getThread(), event.getPriority());
        }
        System.out.printf("Main: Queue Size: %d\n", queue.size());
        System.out.printf("Main: End of the program\n");
    }
}




图6.4-1 部分运行结果

  本节使用了PriorityBlockingQueue类实现了一个含有Event对象的优先级队列。在本节开篇时我们提到过,PriorttyBlockingQueue中存放的所有元素都必须实现 Comparable接口,所以Event类也实现了 compareTo()方法。

  所有event都有优先级属性。带有最高优先级值的元素是队列中第一个元素。当实现 compareTo()方法时,如果event本身的优先级值高于作为参数的event的优先级值,结果返回-1。另一方面,如果event本身的优先级值低于作为参数的event的优先级值,结果返回1。如果两个对象的优先级值相等,结果返回0。在返回值为0的情况下, PriorityBlockingQueue类不保证元素的次序。

  Task类添加Event对象到优先级队列中。每个task对象使用add()方法添加1,000个 event到队列中,优先级值在0到999之间。

  Main类的maln()方法创建了 5个Task对象并在对应线程中执行。当所有线程执行完时。将所有的元素输出到了控制台。为了从队列中取元素,线程使用了put()方法。它返回队列中的第一个元素并将其移除。

  PriorityBlockingQueue类还提供了其他方法。

  ♦ clear():移除队列中的所有元素。

  ♦ take():返回队列中的第一个元素并将其移除。如果队列为空,线程阻塞直到队列中有可用的元素。

  ♦ put(E e): E是PriorityBlockingQueue的泛型参数,表示传入参数的类型。这个方法把参数对应的元素插入到队列中。

  ♦ peek():返回队列中的第一个元素,但不将其移除。

6.5使用带有延迟元素的线程安全列表

  Java API提供了一种用于并发应用的有趣的数据结构,即DelayQueue类。这个类可以存放带有激活日期的元素。当调用方法从队列中返回或提取元素时,未来的元素日期将被忽略。这些元素对于这些方法是不可见的。

  为了具有调用行为,存放到DelayQueue类中的元素必须继承Delayed接口。Delayed接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期,即到激活日期的时间。该接口强制执行下列两个方法。

  ♦ compareTo(Delayed o): Delayed 接口继承了Comparable 接口,因此有了这个方法。如果当前对象的延迟值小于参数对象的值,将返回一个小于0的值:如果当前对象的延迟值大于参数对象的值,将返回一个大于0的值:如果两者的延迟值相等则返回0。

  ♦ getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,单位由单位参数指定。TimeUint类是一个由下列常量组成的枚举类型:DAYS、HOURS、MICROSECONDS、 MIIXISECONDS、MINUTES、NANOSECONDS 和 SECONDS。

  本例中,将会学习如何使用DelayQueue类来存放具有不同激活日期的event。

package com.concurrency.task;

import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 事件类,实现了延迟队列接口
 */
public class Event implements Delayed {

    /**
     * 激活事件的时间
     */
    private Date startDate;

    /**
     * 构造函数
     *
     * @param startDate 激活事件的时间
     */
    public Event(Date startDate) {
        this.startDate = startDate;
    }

    /**
     * 比较两个事件
     */
    @Override
    public int compareTo(Delayed o) {
        long result = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        if (result < 0) {
            return -1;
        } else if (result > 0) {
            return 1;
        }
        return 0;
    }

    /**
     * 返回离激活时间还剩余的毫秒数
     */
    @Override
    public long getDelay(TimeUnit unit) {
        Date now = new Date();
        long diff = startDate.getTime() - now.getTime();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

}


package com.concurrency.task;

import java.util.Date;
import java.util.concurrent.DelayQueue;

/**
 * 任务类,其事件存储在一个延迟队列中
 */
public class Task implements Runnable {

    /**
     * 任务编号
     */
    private int id;

    /**
     * 存储事件的任务队列
     */
    private DelayQueue<Event> queue;

    /**
     * 构造函数,初始化属性
     *
     * @param id    任务编号
     * @param queue 存储事件的任务队列
     */
    public Task(int id, DelayQueue<Event> queue) {
        this.id = id;
        this.queue = queue;
    }

    /**
     * 核心方法,产生1000事件,每个事件有相同的激活时间,将这些事件存储在延迟队列中
     */
    @Override
    public void run() {

        Date now = new Date();
        Date delay = new Date();
        delay.setTime(now.getTime() + (id * 1000));

        System.out.printf("Thread %s: %s\n", id, delay);

        for (int i = 0; i < 100; i++) {
            Event event = new Event(delay);
            queue.add(event);
        }
    }

}




图6.5-1 运行结果

  在本节中我们实现了Event类。它只有一个属性,即对象的激活日期,因为继承了 Delayed接口,所以Event对象可以存放到DelayQueue队列中,

  getDelay()方法用来计算激活日期和实际日期之间的毫秒数。这两个日期都是Date 类的对象,并使用日期对象的getTime()方法将日期转化为毫秒数后进行比较,然后通 过getDelay()方法的传入参数TimeUnit的comvert()方法,将时间间隔转化为event激活时间的剩余这毫秒数□DelayQueue类本身是使用纳秒工作的,但是对于使用者来讲,是透明的。

  如果当前对象的延迟值小于参数对象的值,compareTo()方法将返回一个小于0的值; 如果当前对象的延迟值大于参数对象的值,则返回一个大于0的值:如果两者的延迟值相等则返回0。

Task类已被实现,这个类有一个名为id的整型属性。当Task对象执行时,它添加与task id相同数量的秒数到

  实际日期,作为DelayQueiie类中当前task存放的event的激活日期。每个Task对象使用add()方法在队列中存放100个event。

  最后,在Main类的main()方法中,创建5个Task对象,并在对应线程中执行。当执行完时,使用poll()方法输出所有到控制台,poll()方法提取并移除队列中的第一个元素,如果队列中没有活动的元素,此方法返回null值。每调用一次put()方法,如果返回 一个Event对象,计数器加1。当调用poll()返回null值时,输出计数器中的值到控制台, 使线程体眠半秒钟以等待更多活动的当队列中存放了 500个event时,程序执行完毕。

  DelayQueue类还提供了其他一些方法。

  ♦ clear():移除队列中的所有元素。

  ♦ offer(E e): E是DelayQueue的泛型参数,表示传入参数的类型。这个方法把参数对应的元素插入到队列中。

  ♦ put():返回队列中的第一个元素,但不将其移除。

  ♦ take():返回队列中的第一个元素,并将其移除。如果队列为空,线程将被阻塞直到队列中有可用的元素。

6.6使用线程安全可遍历映射

  Java API还提供了一种用于并发应用程序中的有趣数据结构,即ConcurrentNavigableMap 接口及其实现类。实现这个接口的类以如下两部分存放元素:

  ♦ 一个键值(Key),它是元素的标识并且是唯一的;

  ♦元素其他部分数据。

  每一个组成部分都必须在不同的类中实现。

  Java API 也提供了一个实现 ConcurrentSkipListMap 接口的类,ConcurrentSkipListMap 接口实现了与ConcurrentNavigableMap接口有相同行为的一个非阻塞式列表。从内部实现机制来讲,它使用了一个Skip List来存放数据。Skip List是基于并发列表的数据结构, 效率与二叉树相近。有了它,就有了一个数据结构,比有序列表在添加、搜索或删除元素 时耗费更少的访问时间。

  备注:Skip List 由 William Pugh 在 1990 年引入,详见

http://www.cs.umd.edu/~pugh/

http://opendatastructures.org/versions/edition-0.1e/ods-java/4_Skiplists.html

  当你插入元素到映射中时,ConcurrentSkipListMap接口类使用键值来排序所有元素。 除了提供返回一个具体元素的方法之外,这个类也提供获取子映射的方法。

  本节将要学习如何使用ConcurrentSkipListMap类实现对联系人对象的映射。

package com.concurrency.utils;

/**
 * 联系人类
 */
public class Contact {

    /**
     * 联系人的名称
     */
    private String name;

    /**
     * 联系人的电话
     */
    private String phone;

    /**
     * 构造函数
     *
     * @param name  联系人的名称
     * @param phone 联系人的电话
     */
    public Contact(String name, String phone) {
        this.name = name;
        this.phone = phone;
    }

    /**
     * 获取 联系人的名称
     *
     * @return 联系人的名称
     */
    public String getName() {
        return name;
    }

    /**
     * 获取联系人的电话
     *
     * @return 联系人的电话
     */
    public String getPhone() {
        return phone;
    }
}


package com.concurrency.task;

import com.concurrency.utils.Contact;

import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 任务类,将联系人存储在一个可遍历的图中
 */
public class Task implements Runnable {

    /**
     * 存储联系人的可遍历的映射
     */
    private ConcurrentSkipListMap<String, Contact> map;

    /**
     * 任务编号
     */
    private String id;

    /**
     * 构造函数,初始化属性
     *
     * @param map 存储联系人的可遍历的映射
     * @param id  任务编号
     */
    public Task(ConcurrentSkipListMap<String, Contact> map, String id) {
        this.id = id;
        this.map = map;
    }

    /**
     * 核心方法,产生1000个联系人,并且交它们存储在一个可遍历的映射中
     */
    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            Contact contact = new Contact(id, String.valueOf(i + 1000));
            map.put(id + contact.getPhone(), contact);
        }
    }
}


package com.concurrency.core;

import com.concurrency.task.Task;
import com.concurrency.utils.Contact;

import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class Main {
    public static void main(String[] args) {
        // 创建一个可遍历的映射对象
        ConcurrentSkipListMap<String, Contact> map;
        map = new ConcurrentSkipListMap<>();

        // 创建长度为25的线程数组
        Thread threads[] = new Thread[25];
        int counter = 0;

        // 在25个不同的线程中执行25个任务
        for (char i = 'A'; i < 'Z'; i++) {
            Task task = new Task(map, String.valueOf(i));
            threads[counter] = new Thread(task);
            threads[counter].start();
            counter++;
        }

        // 等待任务执行完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 输出映射的大小
        System.out.printf("Main: Size of the map: %d\n", map.size());

        // 保存映射条目的对象
        Map.Entry<String, Contact> element;
        // 保存联系人的对象
        Contact contact;

        element = map.firstEntry();
        contact = element.getValue();
        System.out.printf("Main: First Entry: %s: %s\n", contact.getName(), contact.getPhone());

        // 输出最后一个映射条目
        element = map.lastEntry();
        contact = element.getValue();
        System.out.printf("Main: Last Entry: %s: %s\n", contact.getName(), contact.getPhone());

        // 输出映射的字集
        System.out.printf("Main: Submap from A1996 to B1002: \n");
        ConcurrentNavigableMap<String, Contact> submap = map.subMap("A1996", "B1002");
        do {
            element = submap.pollFirstEntry();
            if (element != null) {
                contact = element.getValue();
                System.out.printf("%s: %s\n", contact.getName(), contact.getPhone());
            }
        } while (element != null);
    }
}




图6.6-1 运行结果

  本节中,我们实现了用Task类在一个可遍历的映射中存放Contect对象。每一个contact对象都有一个名称,即创建的task对象的标识ID,和一个介于1,000到2,000的电话号码.拼接这两个值作为contact对象的键。每个Task对象将生成1,000个contact对 象,并使用put()方法将它们存放在可遍历的映射中。

备注:如果插入的元素的键值已经存在,就用新插入的值覆盖已有的值。

  Main类的main()方法创建了 25个Task对象,使用介于A到Z之间的字符作为ID, 接下来,使用其提供的方法从映射中获取数据,firstEntry()方法返回一个Map.Entry对象, 含有映射中的第一个元素。这个方法不会从映射中移除元素。Map.Entry对象包含键值和元素。使用getValue()方法就能够获取元素。使用getKey()就能够获取元素的键值。

  lastEntry()方法返回一个Map.Entry对象,含有映射中的最后一个元素。subMap()方法返回含有映射中部分元素的ConcurrentNavigableMap对象。在本例中,元素的键值介于A1996到B1002之间。可以使用pollFirst()方法来处理subMap()方法获取的元素。这个方法会返回并移除子映射中的第一个Map.Entry对象。

  ConcurrentSkipListMap类还提供了其他的方法。

  ♦ headMap(K toKey): K是在ConcurrentSkipListMap对象的泛型参数里用到的

键。这个方法返回映射中所有键值小于参数值toKey的子映射。

  ♦ tailMap(K fromKey): K 是在 ConcurreirtSkipListMap 对象的泛型参数里用到的键。这个方法返回映射中所有键值大于参数值fromKey的子映射。

  ♦ putIfAbsent(K key, V value):如果映射中不存在键key,那么就将key和value保存到映射中。

6.7生成并发随机数

  Java并发API提供了一个特殊类用以在并发程序中生成伪随机数(Pseudo-Random Number),即Java 7新引入的ThreadLocalRandom类。它是线程本地变量。每个生成随机数的线程都有一个不同的生成器,但是都在同一个类中被管理,对程序员来讲是透明的。相比于使用共享的Random对象为所有线程生成随机数,这种机制具有更好的性能。

  在本节,你将学习如何使用ThreadlocalRaiidom类在并发应用中生成随机数。

package com.concurrency.task;

import java.util.concurrent.ThreadLocalRandom;

/**
 * 产生随机数的任务类
 */
public class TaskLocalRandom implements Runnable {

    /**
     * 构造函数,初始化当前类的随机数生成对象
     */
    public TaskLocalRandom() {
        ThreadLocalRandom.current();
    }

    /**
     * 核心方法,生成一个[0, 10)的随机数
     */
    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        for (int i = 0; i < 10; i++) {
            System.out.printf("%s: %d\n", name, ThreadLocalRandom.current().nextInt(10));
        }
    }
}


package com.concurrency.core;

import com.concurrency.task.TaskLocalRandom;

public class Main {
    public static void main(String[] args) {
        // 长度为3的线程数组
        Thread threads[] = new Thread[3];

        // 创建线程并且运行任务
        for (int i = 0; i < threads.length; i++) {
            TaskLocalRandom task = new TaskLocalRandom();
            threads[i] = new Thread(task);
            threads[i].start();
        }
    }
}




图6.7-1 部分运行结果

  本例的核心是TaskLocalRandom类。在类的构造器中,调用了 TaskLocalRandom类的current()方法。current()方法是一个静态方法,返回与当前线程关联的 TaskLocalRandom对象,所以可以使用这个对象生成随机数。如果调用这个方法的线程还没有关联随机数对象,就会生成一个新的。在本例中,使用这个方法初始化与本任务关联的随机数生成器,所以它将在下一次调用这个方法时被创建。

  在TasklocalRandom类的run()方法中,调用current()获取与本线程关联的随机数生成器,同时也调用了nextInt()方法并以数字10作为传入参数。这个方法返回一个介于0 到10之间的伪随机数。每个任务生成10个随机数。

  TaskLocalRandom类也提供了方法来生成long、float和double数字和Boolean值;还可以为方法指定一个数字作为输入参数,来生成介于0与该数字之间的随机数;还可以为方法指定两个数字作为输入参数,来生成介于两个参数之间的随机数。

6.8使用原子变量

  原子变量(Atomic Variable)是从Java 5开始引入的,它提供了单个变量上的原子操作。在编译程序时,Java代码中的每个变量、每个操作都将被转换成机器可以理解的指令。 例如,当给一个变量賦值时,在Java代码中只使用一个指令,但是编译这个程序时,指令被转换成JVM语言中的不同指令。当多个线程共享同一个变量时,就会发生数据不一致的错误。

  为了避免这类错误,Java引入了原子变量。当一个线程在对原子变量操作时,如果其他线程也试图对同一原子变量执行操作,原子变量的实现类提供了一套机制来检查操作是否在一步内完成。一般来说,这个操作先获取变量值,然后在本地改变变量的值,然后试图用这个改变的值去替换之前的值。如果之前的值没有被其他线程改变,就可以执行这个替换操作。否则,方法将再执行这个操作。这种操作称为CAS原子操作(Compare and Set)。

  原子变量不使用锁或其他同步机制来保护对其值的并发访问。所有操作都是基于CAS 原子操作的。它保证了多线程在同一时间操作一个原子变量而不会产生数据不一致的错误,并且它的性能优于使用同步机制保护的普通变量。

package com.concurrency.task;

import java.util.concurrent.atomic.AtomicLong;

/**
 * 帐户类
 */
public class Account {

    /**
     * 帐户余额
     */
    private AtomicLong balance;

    public Account() {
        balance = new AtomicLong();
    }

    /**
     */
    /**
     * 获取帐户余额
     *
     * @return 帐户余额
     */
    public long getBalance() {
        return balance.get();
    }

    /**
     * 设置帐户余额
     *
     * @param balance 帐户余额
     */
    public void setBalance(long balance) {
        this.balance.set(balance);
    }

    /**
     * 增加余额
     *
     * @param amount 增加的数目
     */
    public void addAmount(long amount) {
        this.balance.getAndAdd(amount);
    }

    /**
     * 减少余额
     *
     * @param amount 减少的数目
     */
    public void subtractAmount(long amount) {
        this.balance.getAndAdd(-amount);
    }

}


package com.concurrency.task;

/**
 * 公司类,模拟向一个帐户存钱
 */
public class Company implements Runnable {

    /**
     * 帐户对象
     */
    private Account account;

    /**
     * 构造函数,初始化帐户属性
     *
     * @param account 帐户对象
     */
    public Company(Account account) {
        this.account = account;
    }

    /**
     * 核心方法,存钱
     */
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            account.addAmount(1000);
        }
    }

}


package com.concurrency.core;

import com.concurrency.task.Account;
import com.concurrency.task.Bank;
import com.concurrency.task.Company;

public class Main {
    public static void main(String[] args) {
        // 创建一个帐户对象
        Account account = new Account();
        // 初始化帐户余额
        account.setBalance(1000);

        // 创建一个公司对象,并且将公司对象放到线程中去运行
        Company company = new Company(account);
        Thread companyThread = new Thread(company);
        // 创建一个银行对象,并且将银行对象放到线程中去运行
        Bank bank = new Bank(account);
        Thread bankThread = new Thread(bank);

        // 输出帐户对象最初的信息
        System.out.printf("Account : Initial Balance: %d\n", account.getBalance());

        // 启动线程
        companyThread.start();
        bankThread.start();

        try {
            // 等待线程完成
            companyThread.join();
            bankThread.join();
            // 输出余额
            System.out.printf("Account : Final Balance: %d\n", account.getBalance());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}




图6.8-1 运行结果

  本例的核心在Account类中。该类声明了名为balance的AtomicLong变量来存放帐户余额,然后实现对余额的存和取操作。为了获得余额,我们实现了 getBalance()方法, 它使用了 AtomicLong类的get()方法。为了设置余额,我们实现了 setBalance()方法,它使用了 Atomiclong类的set()方法。为了增加余额,我们实现了 addAmount()方法,它使用了 AtomicLong类的getAndAdd()方法,这个方法返回增加指定参数值后的余额值。最后,为了减少余额,我们实现了 subtractAmount()方法,它也使用了 AtomicLong类的 getAndAdd() 方法。

  接下来,实现两个不同的任务类。

  ♦ Company类模拟一家公司,它将增加账户余额。这个类将执行10次,每次将余 额增加1,000。

  ♦ Bank类模拟一家银行,它将从账户中取钱,这个类执行10次,每次将余额减少1,000。

在Main类中,创建Account对象余额为1,000。然后执行一个bank线程和一个company线程,执行完成后,最终账户余额与初始余额应该相同。

  执行完程序,我们会发现最终账户余额与初始值相同。

  Java还提供了其它的原子类,AtomicBoolean,AtomicInteger和AtomicReference是原子类的其它实现类。

6.9使用原子数组

  当实现一个并发应用时,将不可避免地会有多线程共享一个或多个对象的现象,为了避免数据不一致错误,需要使用同步机制(如锁或synchronized关键字)来保护对这些共享属性的访问。但是,这些同步机制存在下列问题。

  ♦死锁: 一个线程被阻塞,并且试图获得的锁正被其他线程使用,但其他线程永远不会释放这个锁。这种情况使得应用不会继续执行,并且永远不会结束。

  ♦即使只有一个线程访问共享对象,它仍然需要执行必须的代码来获取和释放锁。

针对这种情况,为了提供更优的性能,Java于是引入了比较和交换操作 (Compare-and-Swap Operation)。这个操作使用以下三步修改变量的值。

  1. 取得变量值,即变量的旧值。

  2. 在一个临时变量中修改变量值,即变量的新值。

  3. 如果上面获得的变量旧值与当前变量值相等,就用新值替换旧值。如果已有其他线程修改了这个变量的值,上面获得的变量的旧值就可能与当前变量值不同。

  采用比较和交换机制不需要使用同步机制,不仅可以避免死锁并且性能更好。

  Java在原子变量(Atomic Variable)中实现了这种机制。这些变量提供了实现比较和交换操作的compareAndSet()方法,其他方法也基于它展开。

  Java也引入了原子数组(Atomic Array)提供对integer或long数字数组的原子操作本节将学习如何使用AtomicIntegerArray类的原子数组。

package com.concurrency.task;

import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * 加法器,将数组中的每元素增加指定个单位
 */
public class Incrementer implements Runnable {

    /**
     * 要执行加法的数组
     */
    private AtomicIntegerArray vector;

    /**
     * 构造函数
     *
     * @param vector 要执行加法的数组
     */
    public Incrementer(AtomicIntegerArray vector) {
        this.vector = vector;
    }

    /**
     * 核心方法,将数组中的每元素增加指定个单位
     */
    @Override
    public void run() {

        for (int i = 0; i < vector.length(); i++) {
            vector.getAndIncrement(i);
        }

    }

}


package com.concurrency.task;

import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * 减法器,将数组中的每元素减少指定个单位
 */
public class Decrementer implements Runnable {

    /**
     * 要执行减法的数组
     */
    private AtomicIntegerArray vector;

    /**
     * 构造函数
     *
     * @param vector 要执行减法的数组
     */
    public Decrementer(AtomicIntegerArray vector) {
        this.vector = vector;
    }

    /**
     * 核心方法, 将数组中的每元素减少指定个单位
     */
    @Override
    public void run() {
        for (int i = 0; i < vector.length(); i++) {
            vector.getAndDecrement(i);
        }
    }

}


package com.concurrency.core;

import com.concurrency.task.Decrementer;
import com.concurrency.task.Incrementer;

import java.util.concurrent.atomic.AtomicIntegerArray;

public class Main {
    public static void main(String[] args) {
        // 线程个数
        final int THREADS = 100;
        // 原子数组对象,它有1000个元素
        AtomicIntegerArray vector = new AtomicIntegerArray(1000);
        // 创建一个加法器对象
        Incrementer incrementer = new Incrementer(vector);
        // 创建一个减法器对象
        Decrementer decrementer = new Decrementer(vector);

        // 创建并且执行100个加法线程和100个减法线程
        Thread threadIncrementer[] = new Thread[THREADS];
        Thread threadDecrementer[] = new Thread[THREADS];
        for (int i = 0; i < THREADS; i++) {
            threadIncrementer[i] = new Thread(incrementer);
            threadDecrementer[i] = new Thread(decrementer);

            threadIncrementer[i].start();
            threadDecrementer[i].start();
        }

        // 等待所有的任务完成
        for (int i = 0; i < THREADS; i++) {
            try {
                threadIncrementer[i].join();
                threadDecrementer[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 输出不为0的元素
        for (int i = 0; i < vector.length(); i++) {
            if (vector.get(i) != 0) {
                System.out.println("Vector[" + i + "] : " + vector.get(i));
            }
        }

        System.out.println("Main: End of the example");
    }
}




图6.9-1 运行结果

  本例使用AtomicIntegerArray对象实现了下面两个不同的任务。

  ♦ Incrementer任务:这个类使用getAndIncrement()方法增加数组中所有元素的值。

  ♦ Decrementer任务:这个类使用getAndDecrcment()方法减少数组中所有元素的值。

  在 Main 类中,创建了 1,000 个元素的 AtomiclirtegerArray 数组,执行了 100 个 Incrementer任务和100个Decrementei•任务。在任务的结尾,如果没有不一致的错误,数组中的所有元素值都必须是0。执行程序后将会看到,程序只将最后的消息打印到控制台,因为所有元素值为0。

  现今,Java还提供了另一个原子数组类,即AtomicLongArray类,它的方法与 AtomicIntegerArray 类相同。

欢迎转载,转载请注明出处http://blog.csdn.net/DERRANTCM/article/details/48206123

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: