您的位置:首页 > 其它

第三章线程同步辅助类

2015-09-01 06:15 253 查看

Java 7 并发编程实战手册目录

代码下载(https://github.com/Wang-Jun-Chao/java-concurrency)

第三章线程同步辅助类

3.1简介

  ♦信号量(Semaphore):是一种计数器,用来保护一个或者多个共享资源的访问。它是并发编程的一种基础工具,大多数编程语言都提供了这个机制。

  ♦ CountDownLatch:是Java语言提供的同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许线程一直等待。

  ♦ CyclicBarrier也是Java语言提供的同步辅助类,它允许多个线程在某个集合点 (common point)处进行相互等待。

  

  ♦ Phaser:也是Java语言提供的同步辅助类。它把并发任务分成多个阶段运行,在开始下一阶段之前,当前阶段中的所有线程都必须执行完成,这是API中的新特性。

  

  ♦ Exchanger:也是Java语言撤的同步辅助类。它提供了两个线程之间的数据交换点。

  

  在应用程序中,任何时候都可以使用Semaphore来保护临界区,因为它是一个基础的同步机制。而其他的同步机制,则需要根据各自的上述特性来对其选择使用.所以我们需 要根据应用程序的特性来选择合适的同步机制。

  

3.2资源的并发访问控制

  在本节中,我们将学习如何使用Java语言(Semaphore) 机制。信号量是一种计数器,用来保护一个或者多个共享资源的访问。

如果线程要访问一个共享资源,它必须先获得信号量。如果信号量的内部计数器大于0 ,信号量将减1 ,然后允许访问这个共享资源。计数器大于0意味着有可以使用的资源, 因此线程将被允许使用其中一个资源。

  否则,如果信号量的计数器等于0 ,信号量将会把线程置入休眠直至计数器大于0。计数器等于0的时候意味着所有的共享资源已经被其他线程使用了,所以需要访问这个共享资源的线程必须等待。

  当线程使用完某个共享资源时,信号量必须被释放,以便其他线程能够访问共享资源。释放操作将使信号量的内部计数器增加1。

本节中,我们将学习如何使用信号量类Semaphore来实现二进制信号量(Binary Semaphore)。二进制信号量是一种比较特殊的信号量,用来保护对唯一共享资源的访问, 因而它的内部计数器只有0和1两个值。为了演示它的使用方式,我们将实现一个打印队列,并发任务将使用它来完成打印。这个打印队列受二进制信号量保护,因而同时只有一个线程可以执行打印。

package com.concurrency.task;

import com.concurrency.core.Main;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * 打印队列类,使用信号量来控制打钱作业的访问
 */
public class PrintQueue {
    /**
     * 信号量,控制队列的访问
     */
    private final Semaphore semaphore;

    /**
     * 构造函数,初始化信号量
     */
    public PrintQueue() {
        semaphore = new Semaphore(1);
    }

    /**
     * 模拟文档打印的方法
     *
     * @param document 需要打印的对象
     */
    public void printJob(Object document) {
        try {
            // 请求信号量,如果已经被其它线程请求过,则当前请求的线程会休眠,直到获得这个信号量
            semaphore.acquire();

            long duration = (long) (Math.random() * 10);
            System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(), duration);
            Thread.sleep(duration);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放信号量,如果有其它的线程在请求这个信号量,JVN会选择其中的某一个程获取信号量,让其运行
            semaphore.release();
        }
    }
}


package com.concurrency.task;

public class Job implements Runnable {
    /**
     * 打印队列对象
     */
    private PrintQueue printQueue;

    /**
     * 构造函数,初始化打印队列对象
     *
     * @param printQueue 打印队列对象
     */
    public Job(PrintQueue printQueue) {
        this.printQueue = printQueue;
    }

    /**
     * 核心方法,向打印队列中发送打印任务,并且等待它完成
     */
    @Override
    public void run() {
        System.out.printf("%s: Going to print a job\n", Thread.currentThread().getName());
        printQueue.printJob(new Object());
        System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
    }
}


package com.concurrency.core;

import com.concurrency.task.Job;
import com.concurrency.task.PrintQueue;

public class Main {
    public static void main(String[] args) {
        // 创建一个打印队列对象
        PrintQueue printQueue = new PrintQueue();

        // 创建10个线程
        Thread thread[] = new Thread[10];
        for (int i = 0; i < 10; i++) {
            thread[i] = new Thread(new Job(printQueue), "Thread " + i);
        }

        // 启动10个线程
        for (int i = 0; i < 10; i++) {
            thread[i].start();
        }
    }
}




图3-2.1 运行结果

  这个范例的核心部分是打印队列类PrintQueue的printJob()方法。它指出了使用信号量实现临界区必须遵循的三个步骤,从而保护对共享资源的访问:

  首先,必须通过acquire()方法获得信号量;

  其次,使用共享资源执行必要的操作;

  最后,须通过release()方法释放信号量。

  这个范例的另一个要点是打印队列类PrintQueue的构造器,它初始化了信号量对象。范例中将1作为传入参数,所以创建的就是二进制信号量。信号量的内部计数器初始值是 1,所以它只能保护一个共享资源的访问,如本例中的打印队列。

当启动10个线程时,第一个获得信号量的线程将能够访问临界区,其余的线程将被信号量阻塞,直到信号量被释放。一旦信号量被释放,信号量将选择一个正在等待的线程并 且允许它访问临界区,从而所有的工作都将一个接一个地打印它们的文档。

  Semaphore类还有其他两种acquire()方法。

  ♦ acquireUninterruptibly():它其实就是 acquire()方法。当信号量的内部计数器变成0的时候,信号量将阻塞线程直到其被释放。线程在被阻塞的这段时间中,可能会被中断,从而导致 acquire()方法抛出InterruptedException 异常。而 acquireUninterruptibly()方法会忽略线程的中断并且不会抛出任何异常-

  ♦ tryAcquire():这个方法试图获得信号量。如果能获得就返回true;如果不能,就返回false,从而避开线程的阻塞和等待信号量的释放。我们可以根据返回值是true还是ftase来做出恰当的处理。

  信号量的公平性

  在Java语言中,只要一个类可能出现多个线程被阻塞并且等待同步资源的释放(例如 信号量),就会涉及公平性概念.默认的模式是非公平模式。在这种模式中,被同步的资源被释放后,所有等待的线程中会有一个被选中来使用共享资源,而这个选择是没有任何条件的。公平模式则不然,它选择的是等待共享资源时间最长的那个线程。

  跟其他的类一样,Semaphore类的构造器也提供了第二个传入参数。这个参数是 boolean型的。如果传入false值,那么创建的信号量就是非公平模式的,与不使用这个参数的效果一样。如果传入true值,那么创建的信号量是公平模式的。

3.3资源的多副本的并发访问控制

  信号量也可以用来保护一个资源的多个副本,或者被多个线程同时执行的临界区。

  在本节中,我们将学习如何使用信号量来保护一个资源的多个副本。我们将实现这样 的范例:一个打印队列,它将被三个不同的打印机使用。

package com.concurrency.task;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 打印队列对象,它可以访问三台打印机,使用信号量来控制打印机的访问,当有一个打印作业,
 * 如果有空闲的打印杨就将作业分配到某个打印机上,否则就等待打印机空闲,再分配打印机
 */
public class PrintQueue {
    /**
     * 资源信号量,控制打印机的访问
     */
    private Semaphore semaphore;
    /**
     * 标记打印机是否空闲的数组
     */
    private boolean[] freePrinters;
    /**
     * 锁,控制打印机是否空闲的数组的访问
     */
    private Lock lockPrinters;

    /**
     * 构造函数,初始化变量
     */
    public PrintQueue() {
        semaphore = new Semaphore(3); // 资源信号量的个数为3,说明有3个打印机
        freePrinters = new boolean[3];
        for (int i = 0; i < freePrinters.length; i++) {
            freePrinters[i] = true;
        }

        lockPrinters = new ReentrantLock();
    }

    /**
     * 模拟文档打印的方法
     *
     * @param document 需要打印的对象
     */
    public void printJob(Object document) {
        try {
            // 请求信号量,如果有一个打印机是空闲的,就会访问其中一个空闲的打印机
            semaphore.acquire();

            // 获取分配的打印机编号
            int assignedPrinter = getPrinter();

            Long duration = (long) (Math.random() * 10);
            System.out.printf("%s: PrintQueue: Printing a Job in Printer %d during %d seconds\n", Thread.currentThread().getName(), assignedPrinter, duration);
            TimeUnit.SECONDS.sleep(duration);

            // 释放打印机
            freePrinters[assignedPrinter] = true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放信号量
            semaphore.release();
        }
    }

    /**
     * 获取分配的打印机编号
     * @return   分配的打印机编号
     */
    private int getPrinter() {
        int ret = -1;
        try {
            // 获取打印机状态数组的访问能力
            lockPrinters.lock();
            // 查找第一个空闲的打印机
            for (int i = 0; i < freePrinters.length; i++) {
                if (freePrinters[i]) {
                    ret = i;
                    freePrinters[i] = false;
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放打印机状态数组的访问能力
            lockPrinters.unlock();
        }
        return ret;
    }
}


package com.concurrency.task;

public class Job implements Runnable {
    /**
     * 打印队列对象
     */
    private PrintQueue printQueue;

    /**
     * 构造函数,初始化打印队列对象
     *
     * @param printQueue 打印队列对象
     */
    public Job(PrintQueue printQueue) {
        this.printQueue = printQueue;
    }

    /**
     * 核心方法,向打印队列中发送打印任务,并且等待它完成
     */
    @Override
    public void run() {
        System.out.printf("%s: Going to print a job\n", Thread.currentThread().getName());
        printQueue.printJob(new Object());
        System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
    }
}


package com.concurrency.core;

import com.concurrency.task.Job;
import com.concurrency.task.PrintQueue;

public class Main {
    public static void main(String[] args) {
        // 创建一个打印对队对象
        PrintQueue printQueue = new PrintQueue();

        // 创建12个线程,运行作业任务,这些任务都向同一个打印队列对象发出打印请求
        Thread thread[] = new Thread[12];
        for (int i = 0; i < 12; i++) {
            thread[i] = new Thread(new Job(printQueue), "Thread " + i);
        }

        // 启动12个线程
        for (int i = 0; i < 12; i++) {
            thread[i].start();
        }
    }
}




图3.3-1 运行结果

  范例的核心是打印队列类PrintQueue,在它的构造器中使用3作为传入参数来创建信号量对象。在本例中,最开始调用acquire()方法的3个线程将获得对临界区的访问, 其余的线程将被阻塞。当一个线程完成了对临界区的访问并且释放了信号量,另一个线程将获得这个信号量。

  在临界区代码中,线程获得可以分配打印工作的打印机编号。范例中的这部分代码只是为了给出更完整的实现,并没有使用信号量相关的代码。

  Acquire()、acquireUninterruptibly()、tryAcquire()和 release()方法都有另一种实现方式,即提供了一个int型的传入参数,这个参数声明了线程试图获取或者释放的共享资源数目,也就是这个线程想要在信号量内部计数器上删除或增加的数目。对于acquire(). acquireUninterruptibly(). tryAcquire方法来讲,如果计数器的值少于参数对应的值,那么线程将被阻塞直到计数器重新累加到或者超过这个值。

3.4等待多个并发事件的完成

  Java并发API提供了CountDownLatch类,它是一个同步辅助类。在完成一组正在其他线程中执行的操作之前,它允许线程一直等待。这个类使用一个整数进行初始化,这个整数就是线程要等待完成的操作的数目。当一个线程要等待某些操作先执行完时,需要调用await()方法,这个方法让线程进入体眠直到等待的所有操作都完成。当某一个操作完成后,它将调用countDown()方法将CountDownLatch类的内部计数器减1。当计数器变成0的时候,CountDownLatch类将唤醒所有调用await()方法而进入休眠的线程。

  在本节中,我们将学习如何使用CountDomiLatch类实现视频会议系统。这个视频会议系统将等待所有的参会者都到齐才开始。

package com.concurrency.task;

import java.util.concurrent.CountDownLatch;

/**
 * 视频会类
 * 使用倒计时闩来控制所有参与者都到达后才发生事件
 */
public class VideoConference implements Runnable {
    /**
     * 倒计时闩对象
     */
    private final CountDownLatch controller;

    /**
     * 构造函数,初始化倒计时闩
     * @param number 参与者人数
     */
    public VideoConference(int number) {
        controller = new CountDownLatch(number);
    }

    /**
     * 每个参与到视频会议的都会调用此方法
     * @param name 参与者
     */
    public void arrive(String name) {
        System.out.printf("%s has arrived.\n", name);
        controller.countDown();
        System.out.printf("VideoConference: Waiting for %d participants.\n", controller.getCount());
    }

    /**
     * 核心方法,当所有参与者都到达了,就开始视频仁义
     */
    @Override
    public void run() {
        System.out.printf("VideoConference: Initialization: %d participants.\n", controller.getCount());
        try {
            // Wait for all the participants
            controller.await();
            // Starts the conference
            System.out.printf("VideoConference: All the participants have come\n");
            System.out.printf("VideoConference: Let's start...\n");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package com.concurrency.task;

import java.util.concurrent.TimeUnit;

/**
 * 参与者类
 */
public class Participant implements Runnable {
    /**
     * 视频会议对象
     */
    private VideoConference conference;
    /**
     * 参与者的名称(仅仅是为了记录使用)
     */
    private String name;

    /**
     * 构造函数
     *
     * @param conference 视频会议对象
     * @param name       参与者的名称
     */
    public Participant(VideoConference conference, String name) {
        this.conference = conference;
        this.name = name;
    }

    /**
     * 核心方法,等待一个随机时间就进入视频会议
     */
    @Override
    public void run() {
        long duration = (long) (Math.random() * 10);
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        conference.arrive(name);
    }
}


package com.concurrency.core;

import com.concurrency.task.Participant;
import com.concurrency.task.VideoConference;

public class Main {
    public static void main(String[] args) {
        // 创建一个视频会议对象,它有10个参与者
        VideoConference conference = new VideoConference(10);
        // 创建一个线程去运行视频会议
        Thread threadConference = new Thread(conference);
        threadConference.start();

        // 创建十个参与者,每个参与者在一个单独的线程中运行
        for (int i = 0; i < 10; i++) {
            Participant p = new Participant(conference, "Participant " + i);
            Thread t = new Thread(p);
            t.start();
        }
    }
}




图3.4-1 运行结果

  CountDownLatch类有三个基本元素;

  ♦ 一个初始值,

  ♦ await() 方法,

  ♦ countdown()方法,

  当创建CountDownLatch对象时,使用构造器来初始化内部计数器。当countDown()方法被调用后,计数器将减1。当计数器到达0的时候,CountDownLatch对象将唤起所有在await()方法上等待的线程。

  CouirtDownlatch对象的内部计数器被初始化之后就不能被再次初始化或者修改。一旦计数器被初始化后,唯一能改变参数值的方法是countDown()方法。当计数器到达0时,所有因调用await()方法而等待的线程立刻被唤醒,再执countDown()将不起任何作用。

  和其他同步方法相比,CountDownLatch机制有下述不同。

  ♦ CountDownLatch机制不是用来保护共享资源或者临界区的,它是用来同步执行多个任务的一个或者多个线程;

  ♦ CountDownLatch准许进入一次。如同刚刚解释的,一旦CountDownLatch的内部计数器到达0,再调用这个方法将不起作用。如果要做类似的同步,就必须创建一个新的 CountDownLatch 对象。

  CountDownLatch 类提供了另外一种 await()方法,即 await(long time, TimeUnit unit)。这个方法被调用后,线程将休眠直到被中断,或者CountDownLatch的内部计数器达到0, 或者指定的时间已经过期。第二个参数是TimeUnit类型,TimeUnit类是以下常量的枚举: DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS、SECONDS。

3.5在集合点的同步

  Java并发API提供了 CycleBarrie类,它也是一个同步辅助类。它允许两个或者多个线程在某个点上进行同步。这个类与上一节所讲述的CountDownLatch类类似,但也有不同之处,使之成为更强大的类。

  CycleBarrie类使用一个整型数进行初始化,这个数是需要在某个点上同步的线程数。当一个线程到达指定的点后,它将调用await()方法等待其他的线程。当线程调用mvaitO 方法后,CycleBarrie类将阻塞这个线程并使之休眠直到所有其他线程到达。当最后一个 线程调用CycleBarrie类的await()方法时,CycleBarrie对象将唤醒所有在等待的线程,然后这些线程将继续执行。

  CycleBarrie类有一个很有意义的改进,即它可以传入另一个Runnable对象作为初始化参数。当所有的线程都到达集合点后,CycleBarrie类将这个Runnable对象作为线程执行。这个特性使得这个类在并行任务上可以媲美分治编程技术(Divide and Conquer Programming Technigue)。

  在本节中,我们将学习如何使用CyclicBarrier类使一组线程在集合点上同步。在所有线程都到达集合点后,我们将使用Rumiable对象并且运行它。在这个范例中,我们将在数字矩阵中寻找一个数字(使用分治编程技术)。这个矩阵会被分成几个子集,然后每个线程在一个子集中査找。一旦所有线程都完成查找,最终的任务将统一这些结果。

package com.concurrency.utils;

import java.util.Random;

/**
 * 矩阵模拟类,随机生成0-9之间数字二维矩
 */
public class MatrixMock {
    /**
     * 0-9之间数字二维矩阵
     */
    private int[][] data;

    /**
     * 构造函数
     *
     * @param size   矩阵的行数
     * @param length 每行的长度
     * @param number 要查找的数字
     */
    public MatrixMock(int size, int length, int number) {
        int counter = 0;
        data = new int[size][length];
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < length; j++) {
                data[i][j] = random.nextInt(10);
                if (data[i][j] == number) {
                    counter++;
                }
            }
        }

        System.out.printf("Mock: There are %d ocurrences of number in generated data.\n", counter, number);
    }

    /**
     * 获取行row行数据
     *
     * @param row 行数
     * @return 第row行数据,没有就返回null
     */
    public int[] getRow(int row) {
        if (row >= 0 && row < data.length) {
            return data[row];
        }

        return null;
    }
}


package com.concurrency.utils;

/**
 * 结果类,保存矩阵中每行找到指定数字的次数
 */
public class Results {
    /**
     * 保存矩阵中每行找到指定数字的次数
     */
    private int[] data;

    /**
     * 构造函数
     *
     * @param size 数组长度
     */
    public Results(int size) {
        this.data = new int[size];
    }

    /**
     * 设置数组的值
     *
     * @param position 位置
     * @param value    要设置的值
     */
    public void setData(int position, int value) {
        data[position] = value;
    }

    /**
     * 获取保存的数据
     *
     * @return 保存的数据
     */
    public int[] getData() {
        return data;
    }
}


package com.concurrency.task;

import com.concurrency.utils.MatrixMock;
import com.concurrency.utils.Results;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 查找类
 */
public class Searcher implements Runnable {
    /**
     * 开始查找的行数
     */
    private int firstRow;
    /**
     * 最后查找的行数(不包含)
     */
    private int lastRow;
    /**
     * 矩阵模拟对象
     */
    private MatrixMock mock;
    /**
     * 结果对象
     */
    private Results results;
    /**
     * 要查找的数字
     */
    private int number;
    /**
     * 同步栅
     */
    private final CyclicBarrier barrier;

    /**
     * 构造函数
     *
     * @param barrier  同步栅
     * @param firstRow 开始查找的行数
     * @param lastRow  最后查找的行数(不包含)
     * @param mock     矩阵模拟对象
     * @param results  结果对象
     * @param number   要查找的数字
     */
    public Searcher(CyclicBarrier barrier, int firstRow, int lastRow, MatrixMock mock, Results results, int number) {
        this.barrier = barrier;
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.mock = mock;
        this.results = results;
        this.number = number;
    }

    /**
     * 核心方法,查找指定行数范围内的指定数字,将结果保存在结果数组对应的位置
     */
    @Override
    public void run() {
        int counter;
        System.out.printf("%s: Processing lines from %d to %d.\n", Thread.currentThread().getName(), firstRow, lastRow);
        for (int i = firstRow; i < lastRow; i++) {
            int row[] = mock.getRow(i);
            counter = 0;
            for (int j = 0; j < row.length; j++) {
                if (row[j] == number) {
                    counter++;
                }
            }

            results.setData(i, counter);
        }

        System.out.printf("%s: Lines processed.\n", Thread.currentThread().getName());
        try {
            barrier.await(); // 等待所有查找完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}


package com.concurrency.task;

import com.concurrency.utils.Results;

/**
 * 组合类,汇总查找的结果
 */
public class Grouper implements Runnable {
    /**
     * 结果对象
     */
    private Results results;

    /**
     * 构造函数
     *
     * @param results 结果对象
     */
    public Grouper(Results results) {
        this.results = results;
    }

    /**
     * 核心方法,对查找的结果进行汇总
     */
    @Override
    public void run() {
        int finalResult = 0;
        System.out.printf("Grouper: Processing results...\n");
        int data[] = results.getData();
        for (int number : data) {
            finalResult += number;
        }
        System.out.printf("Grouper: Total result: %d.\n", finalResult);
    }
}


package com.concurrency.core;

import com.concurrency.task.Grouper;
import com.concurrency.task.Searcher;
import com.concurrency.utils.MatrixMock;
import com.concurrency.utils.Results;

import java.util.concurrent.CyclicBarrier;

public class Main {
    public static void main(String[] args) {

        final int ROWS = 10000; // 矩阵的行数
        final int NUMBERS = 1000; // 矩阵的列数
        final int SEARCH = 5; // 要查询的数字
        final int PARTICIPANTS = 5; // 查询线程的个数
        final int LINES_PARTICIPANT = 2000; // 每个查询线程处理的行数
        MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH); // 创建矩阵模拟对象
        Results results = new Results(ROWS); // 创建结果对象
        Grouper grouper = new Grouper(results); // 创建组合对象

        // 创建一个同步栅对象,它有5个参与者, 5个参与者线程完成后,会调用grouper中的run方法
        CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);

        // 创建5个参与者对象,并且让它们各自在单独的线程中运行
        Searcher[] searchers = new Searcher[PARTICIPANTS];
        for (int i = 0; i < searchers.length; i++) {
            searchers[i] = new Searcher(barrier, i * LINES_PARTICIPANT, (i * LINES_PARTICIPANT) + LINES_PARTICIPANT,
                    mock, results, PARTICIPANTS);

            Thread thread = new Thread(searchers[i]);
            thread.start();
        }
        System.out.printf("Main: The main thread has finished.\n");
    }
}





图3.5-1 运行结果

  这个范例解决的问题很简单。我们有一个很大的随机数矩阵,想要知道这个矩阵中包含了指定数据的个数。为了获得更好的性能,我们使用了分治编程技术——将矩阵分离成5个子集,并且在每个子集中使用线程进行査找,这些线程是查找类Searcher对象。

  我们使用CyclkBarrier对象同步5个线程,执行Grouper查找任务处理结果,并且计算最终的结果。

  如之前提到的,CyclicBarrieir类有一个内部计数器,可以控制指定数目的几个线程必 须都到达集合点。每一个线程到达集合点后就会调用await()方法通知CyclicBarrier对象, CyclicBarrier对象会让这个线程休眠直到其他所有的线程都到达集合点。

  当所有线程都到达集合点之后,CyclicBarrier对象就唤醒所有在await()方法里等待的线程,同时,还可以以构造器传入的Runnable对象(范例中的Grouper对象)创建一个新的线程,以执行其他任务。

  CyclicBarrier类还提供了另一种方法:

  await(long time,TimeUnit unit)。这个方法被调用后,线程将一直休眠到被中断,或者CyclicBarrier的内部计数器到达0,或者指定的时间已经过期.第二个参数是TimeUnit类型,它是一个常量枚举类型,它的值包含:DAYS、HOURS、MICROSECONDS、 MILLISECONDS、MINUTES、 NANOSECONDS 和SECONDS.

  CyclicBarrier类还提供了 getNumberWaiting()方法和getParties()方法,前者将返回在await()上阻塞的线程的数目,后者返回被CyclicBarrier对象同步的任务数。

  重置 CyclicBarrier对象

  虽然CyclicBarrier类和CountDowmLatch类有很多共性,但是它们也有一定的差异。 其中最重要的不同是,CyclicBarrier对象可以被重置回初始状态,并把它的内部计数器重置成初始化时的值。

  CyclicBarrier对象的重置,是通过CyclicBarrier类提供的reset()方法完成的。当重 置发生后,在await()方法中等待的线程将收到一个BrokenBarrierException异常。本例是将这个异常打印出来,但是在更复杂的应用程序中,它可以用来执行其他的操作,比如重新执行或者将操作复原回被中断时的状态。

损坏的CyclicBarrier对象

  CyclicBarrier对象有一种特殊的状态即损坏状态(Broken)。当很多线程在await()方法上等待的时候,如果其中一个线程被中断,这个线程将抛出InterruptedException异常, 其他的等待线程将抛出BrokenBarrierException异常,于是CyclicBarrier对象就处于损坏状态了。

  CyclicBarrier类提供了 isBroken()方法,如果处于损坏状态就返回true,否则返回false。

3.6并发阶段任务的运行

  Java并发API还提供了一个更复杂、更强大的同步辅助类,即Phaser,它允许执行并发多阶段任务。当我们有并发任务并且需要分解成几步执行时,这种机制就非常适用。 Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。

  跟其他同步工具-样,必对Phaser类中参与同步操作的任务数进行初始化,不同的是,我们可以动态地增加或者减少任务数。

  在本节中,我们将学习如何使用Phaser类同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展名为.log的文件。这个任务分成 以下三个步骤:

  1. 在指定的文件夹及其子文件夹中获得扩展名为.log的文件:

  2. 对第一步的结果进行过滤,删除修改时间超过24小时的文件;

  3. 将结果打印到控制台。

  在第一步和第二步结束的时候,都会检查所查找到的结果列表是不是有元素存在。如 果结果列表是空的,对应的线程将结束执行,并且从phaser中删除。

package com.concurrency.task;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * 文件查找类,它将在一个文件夹及子文件夹中查找过去24小时内修改过的指定扩展名的文件
 */
public class FileSearch implements Runnable {
    /**
     * 用于查找的文件夹
     */
    private String initPath;
    /**
     * 要查找的文件的扩展名
     */
    private String end;
    /**
     * 查找到的文件的完整路径
     */
    private List<String> results;
    /**
     * 阶段对象,用来控制任务不同阶段的同步
     */
    private Phaser phaser;

    /**
     * 构造函数,初始化声明的属性
     *
     * @param initPath 用于查找的文件夹
     * @param end      要查找的文件的扩展名
     * @param phaser   阶段对象
     */
    public FileSearch(String initPath, String end, Phaser phaser) {
        this.initPath = initPath;
        this.end = end;
        this.phaser = phaser;
        this.results = new ArrayList<>();
    }

    /**
     * 核心方法
     */
    @Override
    public void run() {
        // 等待所有文件对象被创建
        this.phaser.arriveAndAwaitAdvance();
        System.out.printf("%s: Starting.\n", Thread.currentThread().getName());

        // 第一个阶段:查找所有的文件
        File file = new File(initPath);
        if (file.isDirectory()) {
            directoryProcess(file);
        }

        // 如果没有找到结果,就从这个阶段对象中注销,并且退出程序
        if (!checkResults()) {
            return;
        }

        // 第二阶段:过滤查找到的结果
        filterResults();

        // 如果过滤后没有结果,就从这个阶段对象中注销,并且退出程序
        if (!checkResults()) {
            return;
        }

        // 第三阶段:显示查找信息
        showInfo();
        // 通知Phaser对象,当前线程已经结束这个阶段,并且将不再参与接下来的阶段操作
        phaser.arriveAndDeregister();
        System.out.printf("%s: Work completed.\n", Thread.currentThread().getName());

    }

    /**
     * 将结果集中的元素打印到控制台
     */
    private void showInfo() {
        for (String result : this.results) {
            File file = new File(result);
            System.out.printf("%s: %s\n", Thread.currentThread().getName(), file.getAbsolutePath());
        }

        // 通知Phaser对象,当前线程已经完成了当前阶段,需要被阻塞直到其它线程都完成当前阶段
        this.phaser.arriveAndAwaitAdvance();
    }

    /**
     * 检查结果集是否为空,如果结果集为空就从阶段对象中注销,否则等待其它的线程完成同样的的任务阶段
     *
     * @return false结果集为空,true结果集不为空
     */
    private boolean checkResults() {
        if (this.results.isEmpty()) {
            System.out.printf("%s: Phase %d: 0 results.\n", Thread.currentThread().getName(), phaser.getPhase());
            System.out.printf("%s: Phase %d: End.\n", Thread.currentThread().getName(), phaser.getPhase());
            // 通知Phaser对象,当前线程已经结束这个阶段,并且将不再参与接下来的阶段操作
            this.phaser.arriveAndDeregister();
            return false;
        } else {
            System.out.printf("%s: Phase %d: %d results.\n", Thread.currentThread().getName(), phaser.getPhase(), results.size());
            // 通知Phaser对象,当前线程已经完成了当前阶段,需要被阻塞直到其它线程都完成当前阶段
            this.phaser.arriveAndAwaitAdvance();
            return true;
        }
    }

    /**
     * 文件过滤方法,将不是24小时前修改过的文件删除
     */
    private void filterResults() {
        List<String> newResults = new ArrayList<>();
        long actualDate = new Date().getTime();
        for (String result : results) {
            File file = new File(result);
            long fileDate = file.lastModified();
            // 表明修改是在24小时前发生的
            if (actualDate - fileDate < TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)) {
                newResults.add(result);
            }
        }
        results = newResults;
    }

    /**
     * 目录处理方法,处理file目录下的所有文件夹和文件
     *
     * @param file 开始处理的文件
     */
    private void directoryProcess(File file) {

        // 获取file目录下的所有文件和目录
        File list[] = file.listFiles();
        if (list != null) {
            for (File aFile : list) {
                if (aFile.isDirectory()) {
                    // 如果是目录就递归处理它
                    directoryProcess(aFile);
                } else {
                    // 如果是一个文件,就调用文件处理方法
                    fileProcess(aFile);
                }
            }
        }
    }

    /**
     * 文件处理方法
     *
     * @param file 文件对象
     */
    private void fileProcess(File file) {
        // 如果文件以指定的后缀点结束,就将文件的绝对路径保存到结果集合中
        if (file.getName().endsWith(end)) {
            results.add(file.getAbsolutePath());
        }
    }
}


package com.concurrency.core;

import com.concurrency.task.FileSearch;

import java.util.concurrent.Phaser;

public class Main {
    public static void main(String[] args) {
        // 创建一个阶段对象,它有三个参与者
        Phaser phaser = new Phaser(3);

        // 创建一个文件搜索对象,每一个搜索不同的目录
        FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
        FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
        FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);

        // 创建一个线程运行文件搜索对象,并且启动线程
        Thread systemThread = new Thread(system, "System");
        systemThread.start();

        // 创建一个线程运行文件搜索对象,并且启动线程
        Thread appsThread = new Thread(apps, "Apps");
        appsThread.start();

        // 创建一个线程运行文件搜索对象,并且启动线程
        Thread documentsThread = new Thread(documents, "Documents");
        documentsThread.start();

        // 等待所有的线程都结束
        try {
            systemThread.join();
            appsThread.join();
            documentsThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.printf("Terminated: %s\n", phaser.isTerminated());
    }
}




图3.6-1 运行结果

  本范例开始的时候创建了 Phaser对象,用于在每个阶段结束时对线程同步进行控制。 Phaser构造器传入了参与阶段同步的线程的个数。在这个例子中,Phaser有三个参与线程。这个数字通知Phaser在唤醒所有休眠线程以进行下一个阶段之前,必须执行 arriveAndAwaitAdvance() 方法的线程数。

在Phaser创建后,我们使用三个不同的文件査找对象创建了三个线程并且启动了它们。

  备注:这个例子使用了 Windows操作系统的路径。如果是其他操作系统,需要根据相应环境修改文件路径。

  在文件査找类FileSearch的run()方法中,第一个指令是调用Phaser对象的 arriveAndAwaitAdvance()方法。如前所述,Phaser知道我们要同步的线程数,当一个线程调用这个方法时,Phaser对象将减1,并且把这个线程置于休眠状态,直到所有其他线程完成这个阶段。在run()方法的开头调用这个方法可以保障在所有线程创建好之前没有线程开始执行任务。(即所有线程都在同一个起跑线上。)

  在第一阶段和第二阶段结束的时候,检査在这个阶段中是不是生成了结果集以及结果集中是不是有元素。在第一个阶段,checkResults()方法调用了 arriveAndAwaitAdvance()方法。在第二个阶段,如果结果集是空的,对应的线程没有理由继续执行,所以返回:但是必须通知Phaser对象参与同步的线程少了一个。为了达到这个目地,我们使用了 arriveAndDeregister()方法。这就实现了对phaser对象的通知,即这个线程已经完成了当前语句,并且不会在下一个阶段中参与,因而phaser对象在开始下一个阶段时不会等待这 个线程了。

  在第三阶段结束的时候,在showInfo()方法中调用了 phaser对象的 arriveAndAwaitAdvance()方法。通过这个调用,确保三个线程都已完成。当showInfo()方法执行完成之后,还调用了 Phaser对象的arriveAndDeregister()方法。通过这个调用,撤销了 phaser中线程的注册,所以当所有线程运行结束的时候,phaser对象就没有参与同步的线程了。

  最后,main()方法等待所有三个线程完成后,调用了 Phaser对象的isTerminated()方法。当phaser对象不存在参与同步的线程时,phaser是终止状态的.isTerminated()方法 将返回ture。当取消所有线程的注册时,phaser对象会变成终止状态,所以,这个调用将打印到控制台的是true信息。

  一个Phaser对象有两种状态。

  ♦活跃态(Active):当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。在这种状态中,Phaser的执行如前文所述。但在Java并发 API中并没有提到这种状态。

  ♦终止态(Termination):当所有参与同步的线程都取消注册的时候,Phaser就处于终止状态,在这种状态下,Phaser没有任何参与者。更具体地说,当Phaser对象的 onAdvance()方法返回true的时候,Phaser对象就处于了终止态。通过覆盖这个方法可以 改变默认的行为。当Phaser是终止态的时候,同步方法arriveAndAwaitAdvance()会立即返回,而且不会做任何同步的操作。

  Phaser类的一个重大特性就是不必对它的方法进行异常处理。不像其他的同步辅助类,被Phaser类置于休眠的线程不会响应中断事件,也不会抛出InterruptedException异常(只有一种特例会抛出异常,见下文)。

  Phaser类提供了一些其他改变Phaser对象的方法,这些方法如下。

  ♦ arrive()这个方法通知Phaser对象一个参与者已经完成了当前阶段,但是它不应该等待其他参与者都完成当前阶段。必须小心使用这个方法,因为它不会与其他线程同步。

  ♦ awaitAdvance(int phase):如果传入的阶段参数与当前阶段一致,这个方法会将当前线程置于休眠,直到这个阶段的所有参与者都运行完成。如果传入的阶段参数与当前 段不一致,这个方法将立即返回。

  ♦ awaitAdvanceInterruptibly(int phaser):这个方法跟 awaitAdvance(int phase)一样,不同之处是,如果在这个方法中休眠的线程被中断,它将抛出InterruirtedException 异常。

将参与者注册到Phaser中

创建一个Phaser对象时,需要指出有多少个参与者。Phaser类提供了两种方法增加注册者的数量,这些方法如下。

  ♦ register():这个方法将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程。

  ♦ bulkRegister(intParties>:这个方法将指定数目的参与者注册到Phaser中,所有这些新的参与者都将被当成没有执完本阶段的线程。

Phaser类只提供了一种方法减少注册者的数目,即airiveAndDeregisterO方法。它通知Phaser对象对应的线程己经完成了当前阶段,并且它不会参与到下一个阶段的操作中。

  强制终止Phaser

  当一个phaser对象没有参与线程的时候,它就处于终止状态。Phaser类提供了 forceTermination()方法来强制phaser进入终止态,这个方法不管phaser中是否存在注册的参与线程。当一个参与线程产生错误的时候,强制phaser终止是很有意义的。

当一个phaser 处于终止态的时候,awaitAdvance()和 arriveAndAwaitAdvanceG方法立即返回一个负数,而不再是一个正值了。如果知道Phaser可能会被终止,就需要验证这些方法的返回值,以确定phaser是不是被终止了。

3.7并发阶段任务中的阶段切换

  Phaser类提供了 onAdvance()方法,它在phaser阶段改变的时候会被自动执行。 onAdvance()方法需要两个int型的传入参数:当前的阶段数以及注册的参与者数量。它返回的是boolean值,如果返回false表示phaser在继续执行,返回true表示phaser已经经完成执行并且进入了终止态。

  这个方法默认实现如下:如果注册的参与者数量是0就返回true,否则就返回false。 但是我们可以通过继承Phaser类覆盖这个方法。一般来说,当必须在从一个阶段到另一个 阶段过渡的时候执行一些操作,那么我们就得这么做。

  在本节中,我们将通过范例学习如何控制Phaser中的阶段改变。这个范例将实现自己的Phaser类,并且覆盖onAdvance()方法在每个阶段改变的时候执行一些操作。这个范例将模拟考试,考生必须做三道试题,只有当所有学生都完成一道试题的时候,才能继续下一个。

package com.concurrency.task;

import java.util.concurrent.Phaser;

/**
 * 线程阶段类,控制线程阶段的改变
 */
public class MyPhaser extends Phaser {
    /**
     * @param phase             实际的阶段
     * @param registeredParties 注册的线程数
     * @return false表明要进一步执行,true表明已经完成
     */
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        switch (phase) {
            case 0:
                return studentArrived();
            case 1:
                return finishFirstExercise();
            case 2:
                return finishSecondExercise();
            case 3:
                return finishExam();
            default:
                return true;
        }
    }

    /**
     * 从0->1阶段改变时调用这个方法
     *
     * @return 总是返回false,表明还要断续执行
     */
    private boolean studentArrived() {
        System.out.printf("Phaser: The exam are going to start. The students are ready.\n");
        System.out.printf("Phaser: We have %d students.\n", getRegisteredParties());
        return false;
    }

    /**
     * 从1->2阶段改变时调用这个方法
     *
     * @return 总是返回false,表明还要断续执行
     */
    private boolean finishFirstExercise() {
        System.out.printf("Phaser: All the students has finished the first exercise.\n");
        System.out.printf("Phaser: It's turn for the second one.\n");
        return false;
    }

    /**
     * 从2->3阶段改变时调用这个方法
     *
     * @return 总是返回false,表明还要断续执行
     */
    private boolean finishSecondExercise() {
        System.out.printf("Phaser: All the students has finished the second exercise.\n");
        System.out.printf("Phaser: It's turn for the third one.\n");
        return false;
    }

    /**
     * 从3->4阶段改变时调用这个方法
     *
     * @return 总是返回false,表明还要断续执行
     */
    private boolean finishExam() {
        System.out.printf("Phaser: All the students has finished the exam.\n");
        System.out.printf("Phaser: Thank you for your time.\n");
        return true;
    }
}


package com.concurrency.task;

import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * 学生类
 */
public class Student implements Runnable {
    /**
     * 控制线程执行的阶段对象
     */
    private Phaser phaser;

    /**
     * 构造函数
     * @param phaser  控制线程招手执行的阶段对象
     */
    public Student(Phaser phaser) {
        this.phaser = phaser;
    }

    /**
     * 核心方法,进入考试状态,做三个测试题,每做完一个测试题,它调用阶段对象等待所有其的学生都完成同样的测试题
     */
    @Override
    public void run() {
        System.out.printf("%s: Has arrived to do the exam. %s\n", Thread.currentThread().getName(), new Date());
        phaser.arriveAndAwaitAdvance();

        System.out.printf("%s: Is going to do the first exercise. %s\n", Thread.currentThread().getName(), new Date());
        doExercise1();
        System.out.printf("%s: Has done the first exercise. %s\n", Thread.currentThread().getName(), new Date());
        phaser.arriveAndAwaitAdvance();

        System.out.printf("%s: Is going to do the second exercise. %s\n", Thread.currentThread().getName(), new Date());
        doExercise2();
        System.out.printf("%s: Has done the second exercise. %s\n", Thread.currentThread().getName(), new Date());
        phaser.arriveAndAwaitAdvance();

        System.out.printf("%s: Is going to do the third exercise. %s\n", Thread.currentThread().getName(), new Date());
        doExercise3();
        System.out.printf("%s: Has finished the exam. %s\n", Thread.currentThread().getName(), new Date());
        phaser.arriveAndAwaitAdvance();
    }

    /**
     * 做一个测试题,并且休眠[0, 10)秒
     */
    private void doExercise1() {
        try {
            Long duration = (long) (Math.random() * 10);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 做一个测试题,并且休眠[0, 10)秒
     */
    private void doExercise2() {
        try {
            Long duration = (long) (Math.random() * 10);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 做一个测试题,并且休眠[0, 10)秒
     */
    private void doExercise3() {
        try {
            Long duration = (long) (Math.random() * 10);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


package com.concurrency.core;

import com.concurrency.task.MyPhaser;
import com.concurrency.task.Student;

public class Main {
    public static void main(String[] args) {
        // 创建一个阶段对象
        MyPhaser phaser = new MyPhaser();
        // 创建一个学生对象,将它们注册到同一个阶段对象中
        Student[] students = new Student[5];
        for (int i = 0; i < students.length; i++) {
            students[i] = new Student(phaser);
            phaser.register();
        }

        // 创建五个线程来运行五个学生对象,并且启动线程
        Thread[] threads = new Thread[5];
        for (int i = 0; i < students.length; i++) {
            threads[i] = new Thread(students[i]);
            threads[i].start();
        }

        // 等待所有线程完成执行
        try {
            for (int i = 0; i < threads.length; i++) {
                threads[i].join();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 检查阶段是否已经到达完成状态
        System.out.printf("Main: The phaser has finished: %s.\n", phaser.isTerminated());
    }
}




图3.7-1 运行结果

  这个范例模拟了有三道试题的考试过程。所有的学生必须做完第一道题才可以开始做第二道。为了实现同步,我们使用了 Phaser类,并且通过继承Phaser类和覆盖onAdvance()方法,实现了自己的Phaser。

  phaser对象进行阶段切换的时候,在所有在arriveAndAwaitAdvance()方法里休眠的 线程被唤醒之前,onAdrance()方法将被自动调用。这个方法的传入参数是当前阶段序号, 其中0表示第一个阶段,另一个传入参数是注册的参与者。其中当前阶段序号最有用。如果要根据阶段序号执行不同的操作,那么就必须使用一个可选择的结构(if-else或者 switch)来选择要执行的操作。在本范例中,我们使用switch结构为每个阶段来选择不同的方法。

  onAdvance()方法返回布尔值以表明phaser终止与否,false表示没有终止,因而线程可以继续执行其他的阶段。如果返回值是true,则Phaser仍然唤醒等待的线程,但是状态已经改变成终止状态,所以继续调用Phaser的方法将立即返回,并且isTerminated()方法 也将返回true。

  在主类中,创建MyPhaser对象时,并没有指定Phaser的参与者数目,但是每个学生对象都调用了 phaser的register()方法,这将在phaser中注册。这个调用并没有建立学生对象或者它对应的执行线程与phasei()之间的关联。实际上,phasei()中的参与者数目只是一 个数字,Phaser与参与者不存丰任何关联。

3.8并发任务间的数据交换

  Java并发API还提供了一个同步辅助类,它就是Exchanger,它允许在并发任务之间交换数据。具体来说,Exchanger类允许在两个线程之间定义同步点(Synchronization Point)。当两个线程都到达同步点时,它们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,同时第二个线程的数据结构进入到第一个线程中。

  Exchanger类在生产者-消费者问题情境中很有用。这是一个经典的并发场景,包含一个数据缓冲区,一个或者多个数据生产者,一个或者多个数据消费者。Exchanger类只能同步两个线程,如果有类似的只有一个生产者和消费者的问题,就可以使用Exchanger类。

  在本节中,我们将学习如何使用Exchanger类来解决一对一的生产者-消费者问题。

package com.concurrency.task;

import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * 生产者类
 */
public class Producer implements Runnable {
    /**
     * 生产者生产数后存储的地方,也是与消费者交换数据的地方
     */
    private List<String> buffer;
    /**
     * 同步生产者与消息者交换数据的交换对象
     */
    private final Exchanger<List<String>> exchanger;

    /**
     * 构造函数,初始化属性
     *
     * @param exchanger 数据的交换对象
     * @param buffer    数据存储对象
     */
    public Producer(Exchanger<List<String>> exchanger, List<String> buffer) {
        this.exchanger = exchanger;
        this.buffer = buffer;
    }

    /**
     * 核心方法,产生100个事件,分10次产生,每次产生10个事件,每个产生10个事件后,调用数据交换对象去同步消费者。
     * 生产者将存放10个事件的缓存对象发送给消费者,并且从消费者那里接收到一个空的缓存对象
     */
    @Override
    public void run() {
        int cycle = 1;
        for (int i = 0; i < 10; i++) {
            System.out.printf("Producer: Cycle %d\n", cycle);
            // 生产10个事件
            for (int j = 0; j < 10; j++) {
                String message = "Event " + ((i * 10) + j);
                System.out.printf("Producer: %s\n", message);
                buffer.add(message);
            }

            try {
                // 与消费者交换数据
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.printf("Producer: %d\n", buffer.size());

            cycle++;
        }
    }
}


package com.concurrency.task;

import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * 消费者类
 */
public class Consumer implements Runnable {

    /**
     * 消费者消费数据的地方,也是与消费者交换数据的地方
     */
    private List<String> buffer;
    /**
     * 同步生产者与消息者交换数据的交换对象
     */
    private final Exchanger<List<String>> exchanger;

    /**
     * 构造函数,初始化属性
     *
     * @param exchanger 数据的交换对象
     * @param buffer    数据存储对象
     */
    public Consumer(Exchanger<List<String>> exchanger, List<String> buffer) {
        this.exchanger = exchanger;
        this.buffer = buffer;
    }

    /**
     * 核心方法,它消费生产者产生的事件,每消费10个事件后,它使用交换对象去同步生产者。
     * 它将已经消费完的空缓存对象发送给生产者,同时获取生产者生产的装有10个事件的缓存对象
     */
    @Override
    public void run() {
        int cycle = 1;
        for (int i = 0; i < 10; i++) {
            System.out.printf("Consumer: Cycle %d\n", cycle);

            try {
                // 等待生产的数据,并且将空的缓存对象发送给生产者
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.printf("Consumer: %d\n", buffer.size());

            for (int j = 0; j < 10; j++) {
                String message = buffer.get(0);
                System.out.printf("Consumer: %s\n", message);
                buffer.remove(0);
            }

            cycle++;
        }
    }
}


package com.concurrency.core;

import com.concurrency.task.Consumer;
import com.concurrency.task.Producer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class Main {
    public static void main(String[] args) {
        // 创建两个缓存对象
        List<String> buffer1 = new ArrayList<>();
        List<String> buffer2 = new ArrayList<>();

        // 创建一个交换器对象
        Exchanger<List<String>> exchanger = new Exchanger<>();

        // 创建生产者对象
        Producer producer = new Producer(exchanger, buffer1);
        // 创建消费者对象
        Consumer consumer = new Consumer(exchanger, buffer2);

        // 创建消费者对象和生产者对象放置在不同的线程中
        Thread threadProducer = new Thread(producer);
        Thread threadConsumer = new Thread(consumer);

        // 启动两个线程
        threadProducer.start();
        threadConsumer.start();
    }
}





图3.8-1 部分运行结果

  消费者先创建一个空的缓存列表,然后通过调用Exchanger与生产者同步来获得可以消费的数据。生产者从一个空的缓存列表开始执行,它创建了10个字符串,然后存储在这个缓存中,并且使用exchanger对象与消费者同步。

  在这个同步点上,两个线程(生产者和消费者)都在Exchanger里,它们交换数据结构,当消费者从exchange()方法返回的时候,它的缓存列表有10个字符串。当生产者从 Consumer返回的时候,它的缓存列表是空的。这个操作将循环执行10次。

运行这个范例,我们可以看到生产者和消费者是怎样并发工作的以及两个对象是怎样在每一步中交换数据的。如果使用其他的同步辅助类,第一个线程调用exchange()后会被置于休眠,直到其他的线程到达。

  Exchanger 类还提供了另外的 exchange 方法,即 exchange(V data, long time, TimeUnitimtt)方法。其中第一个传入参数的类型是V,即要交换的数据结构(本例中是 List)。这个方法被调用后,线程将休眠直到被中断,或者其他的线程到达,或者已耗费了指定的time值。第三个传入参数的类型是TimeUnit,它是枚举类型,其值包含以下常最:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS 和 SECONDS。

欢迎转载,转载请注明出处/article/1365092.html

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: