您的位置:首页 > 职场人生

黑马程序员——Java5中的线程并发库(一)---概述、线程池、Callable和Future、Lock和Condition

2014-10-17 16:05 591 查看
 -------  android培训java培训、期待与您交流! ----------

第一部分 多线程与数据共享

一.ThreadLocal实现线程范围的共享变量

1.见下面的示意图和辅助代码解释ThreadLocal的作用和目的,用于实现线程内数据共享,即对于相同的程序代码,多个模块在同一个线程中运行时要共享一份数据,而在另外线程中运行时共享另外一份数据。

注1:线程范围内共享数据的示意图:

关于线程范围内变量共享的举例,直接用程序代码进行说明,创建三个线程,它们都访问三个对象,第一个对象设置值,第二、三个对象取值,同一个线程设置的值,只能被相同的线程获取。

首先,用如下代码来说明如何实现全局共享:

public class ThreadDataShare {
static Map<Thread,Integer> threadData=new HashMap<Thread, Integer>();
static class A {
public void get() {
int data = threadData.get(Thread.currentThread());
System.out.println("A:"+Thread.currentThread()
.getName()+":"+data);
}
}
static class B {
public void get() {
int data = threadData.get(Thread.currentThread());
System.out.println("B:"+Thread.currentThread()
.getName()+":"+data);
}
}
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
new Thread(new Runnable() {
public void run() {
int data = new Random().nextInt();
System.out.println(Thread.currentThread()
.getName()+":"+ data);
threadData.put(Thread.currentThread(), data);
new A().get();
new B().get();
}
}).start();
}
}
}


2.每个线程调用全局的ThreadLocal对象的set方法,就相当往其内部的map中增加一条记录,key分别是各自的线程,value是各自的set方法传进去的值。在线程结束时可以调用ThreadLocal.clear()方法,这样会更快释放内存,不调用也可以,因为线程结束后也可以释放相关的ThreadLocal变量。

3.ThreadLocal的应用场景:

>订单处理包含一系列操作:减去存量、增加一条流水账、修改总账,这几个操作要在同一个事物中完成,通常也即同一个线程中进行处理,如果累加公司应收款的操作失败了,则应该把前面的操作回滚,否则提交所有操作,这要求这些操作使用相同的数据库连接对象,而这些操作的代码分别位于不同的模块类中

>银行转账包含一系列操作:把转出账户的余额减少,把状如账户的余额增加,这两个操作要在同一个事物中完成,它们必须使用相同的数据库连接对象,转入和转出操作的代码分别是两个不同账户的方法。

>例如:Struts2的ActionContext,同一段代码被不同的线程调用运行时,该代码操作的数据时每个线程各自状态和数据,对不同的线程来说,getContext方法拿到的对象都不相同,对于同一个线程来说,不管调用getContext方法多少次和在那个模块中getContext,拿到的都是同一个。

4.实验案例:定义一个全局共享的ThreadLocal变量,然后启动多个线程向该ThreadLocal变量中存储一个随机值,接着线程调用其他多个类的方法,这多个类的方法中读取这个ThreadLocal变量的值,就可以看到多个类在同一个线程中共享同一份数据。

注4:改为用ThreadLocal实现

public class ThreadDataShare {
static ThreadLocal<Integer> x=new ThreadLocal<Integer>();
static class A {
public void get() {
int data = x.get();
System.out.println("A:"+Thread.currentThread()
.getName() + ":"+ data);
}
}
static class B {
public void get() {
int data = x.get();
System.out.println("B:" + 		Thread.currentThread().getName() + ":"
+ data);
}
}
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
new Thread(new Runnable() {
public void run() {
int data = new Random().nextInt();
System.out.println(Thread.currentThread().getName() + ":"
+ data);
x.set(data);
new A().get();
new B().get();
}
}).start();
}
}
}


5.实现对ThreadLocal变量的封装,让外界不要直接操作ThreadLocal变量

>对基本类型数据的封装,这种应用相对很少见

>对对象类型数据的封装,比较常见,即让某个类针对不同线程分别创建一个独立的实例对象。

注5:

class MyThreadScopeData {
private MyThreadScopeData() {
}
private static ThreadLocal<MyThreadScopeData> map = new ThreadLocal<MyThreadScopeData>();
public static MyThreadScopeData getInstance() {
MyThreadScopeData instance = map.get();
if (instance == null) {
instance = new MyThreadScopeData();
map.set(instance);
}
return instance;
}
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
public class ThreadDataShare {
static class A {
public void get() {
System.out.println("A:" + Thread.currentThread().getName()
+
4000
"  getMyData: " + MyThreadScopeData.getInstance().getName()
+ "," + MyThreadScopeData.getInstance().getAge());
}
}
static class B {
public void get() {
System.out.println("B:" + Thread.currentThread().getName()
+ "  getMyData: " + MyThreadScopeData.getInstance().getName()
+ "," + MyThreadScopeData.getInstance().getAge());
}
}
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
new Thread(new Runnable() {
public void run() {
int data = new Random().nextInt();
System.out.println(Thread.currentThread().getName() + ":"
+ data);
MyThreadScopeData.getInstance().setName("name:" + data);
MyThreadScopeData.getInstance().setAge(data);
new A().get();
new B().get();
}
}).start();
}
}
}


6.总结:一个ThreadLocal代表一个变量,故其中只能放一个数据,你有连个变量都要线程范围内共享,则要定义两个ThreadLocal对象,如果有100个变量要共享数据呢?那请先定义一个对象类装100个变量,然后在ThreadLocal中存储这样一个对象

二.多个线程访问共享对象和数据的方式

1.如果每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有个共享数据,例如,买票系统就可以这么做。

2.如果每个线程执行的代码不同,这时候要用不同的Runnable对象,有如下两种方式来实现这些Runnable对象之间的数据共享。

>将共享数据封装在另外一个对象中,然后将这个对象传递给各个Runnable对象,每个线程共享数据的操作方法也分配到那个对象身上去完成,这样容易实现针对该数据进行的各个操作的互斥和通信

>将这些Runnable对象作为某一个类中的内部类,共享数据作为这个外部类中的成员变量,每个线程对共享数据的操作方法也分配给外部类,以便实现共享数据进行的各个操作的互斥和通信。作为内部类的各Runnable对象调用外部类的这些方法。

>上面两种方式的组合,将共享数据封装在另外一个对象中,每个线程对共享数据的操作方法也分配到那个对象身上去完成。对象作为这个外部类中的成员变量或方法中的局部变量,每个线程的Runnable对象作为外部类中的成员内部类或局部内部类

>总之,要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样比较容易实现他们之间的同步和通信

3.极端且简单的方式:即在任意一个类中定义一个static变量,这将被所有线程共享

第二部分 Java5中的线程并发库(一)

一.概述

1.看java.util.concurrent包中的API帮助文档

>看concurrent包下的帮助文档页面,对并发库中涉及的内容有一个总体上的介绍

2.了解java.util.concurrent.atomic包

>查看atomic包文档页面下面的介绍,可以对基本数据,对数组中的基本数据,对类中的基本数据等进行操作。

AtomicInteger,AtomicBoolean,AtomicLong

AtomicIntegerArray,AtomicIntegerArray,AtomicIntegerFieldUpdater

>通过如下两个方法快速理解atomic包的意义

AtomicInteger类的boolean compareAndSet(expactedValue,updateValue)

AtomicIntegerArray类的int addAndGet(int i,int delta)

>顺带解释volatile类型的作用,需要查看java语言规范

3.了解java.util.concurrent.lock包

>下面通过案例详细讲解

二.线程池

1.线程池的概念与Executor类的作用

>创建固定大小的线程池

Executor.newFixedThreadPool(3);//创建有3个固定数量线程的线程池

>创建缓存线程池

Executor.new CachedThreadPool();

//缓存线程池中的线程的数量不固定,之后的任务需要多少线程就创建多少不会让任务处于等待中

>创建单一线程池

Executor.newSingleThreadExecutor();//始终保持池中只有一个线程,若这个线程死了,会重新启动一个线程

注1:线程池的概念,首先创建一些线程,他们的集合称为线程池,当服务器接收到一个请求后,就从线程池中取出一个空闲的线程位置服务,服务完后不关闭线程,而是将线程换回到线程池中。

在线程池的编程模式下,任务是提交给整个线程池,而不是直接交给摸某个线程,线程池在拿到任务后,它就在内部找有无空闲的线程,再把再把任务交给某个空闲的线程,这就是封装。记住任务是提交给整个线程池,一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。

2.关闭线程池

>shutdown()与shutdownNow的比较

注2:shutdownNow()会立即结束所有线程不管线程池中的线程有没有正在执行任务,而shutdown()会在线程池没有任务全部空闲时关闭线程池

源代码案例:

public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService threadpool = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 10; i++) {
final int task = i;
threadpool.execute(new Runnable() {
public void run() {
for (int j = 1; j <= 10; j++) {
System.out.println(Thread.currentThread().getName()
+ " is looping of " + j + "for task of" + task);
}
}
});
}
threadpool.shutdown();
}
}


3.用线程池启动定时器

>调用ScheduleExecutorService的schedule方法,返回的scheduleFuture对象可以取消任务

>支持间隔重复任务的定时方式,不直接支持绝对定时方式,需要转换成相对时间方式

注3:案例源码:

//1.10秒后执行
Executors.newScheduledThreadPool(3).schedule(//
new Runnable() {
public void run() {
System.out.println("bombing!");
}
},//
10,//
TimeUnit.SECONDS);


// 2.10秒后执行,之后每隔两秒执行一次
Executors.newScheduledThreadPool(3).scheduleAtFixedRate(//
new Runnable() {
public void run() {
System.out.println("bombing!");
}
},//
10,//
2, //
TimeUnit.SECONDS);


三.Callable和Future(用来获取线程执行后的结果)

1.Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型实现的

2.Callable要采用ExecuteService的submit方法提交,返回的Future对象可以取消任务

注2:案例代码:

ExecutorService threadpool=Executors.newSingleThreadExecutor();
Future<String> fure=threadpool.submit(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(2000);
return "hello";
}
});
System.out.println(fure.get());

3.CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象

>好比我们同时种了几块地的麦子,然后就等待收割。收割时则是那块先成熟,就先收割哪块麦子。

注3:1~10号线程每个线程睡眠时长是随机的,哪个先醒来,先显示哪个。

public static void main(String[] args) throws Exception {
ExecutorService threadpool = Executors.newFixedThreadPool(10);
CompletionService<Integer> cs=new ExecutorCompletionService<Integer>(
threadpool);
for (int i = 1; i <= 10; i++) {
final int seq = i;
cs.submit(new Callable<Integer>() {
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(5000));
return seq;
}
});
}
for(int i=0;i<10;i++){
System.out.println(cs.take().get());
}
}


四.Lock和Condition

1.Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象,两个线程执行的代码片段要实现同步互斥的效果,他们必须用同一个Lock对象。锁是上在代表要操作的资源类的内部方法中,而不是线程代码中。

注1:Lock的例子,

class Outputor {
Lock lock = new ReentrantLock();
public void output(String name) {
int len = name.length();
lock.lock();
try {
for (int i = 0; i < len; i++)
System.out.print(name.charAt(i));
System.out.println();
} finally {
lock.unlock();
}
}
}


2.读写锁:分为读锁和和写锁,多个读锁不互斥,读锁与写锁互斥,写锁与写锁互斥,这是由JVM控制的,你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人写,且不能同时读取,那就上写锁,终止读的时候上读锁,写的时候上写锁;

注2:读写锁的例子,

例1,

class Queue3 {
private Object data = null;
ReadWriteLock rwl = new ReentrantReadWriteLock();
public void get() {
rwl.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()
+ "be ready to read data");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " read data:"
+ data);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwl.readLock().unlock();
}
}
public void put(Object data) {
rwl.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()
+ "be ready to write data");
Thread.sleep((long) Math.random() * 1000);
this.data = data;
System.out.println(Thread.currentThread().getName()
+ "had write data:" + data);
} catch (Exception e) {
} finally {
rwl.writeLock().unlock();
}
}
}


例2,缓存代理:先用Hibernate的load()方法,用的是饿汉模式来讲解读写锁,延伸至缓存代理模式,

class CachedData {
Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 必须在获取写锁之前释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
// 另一个线程有可能访问,所以要重新检查状态标志
// 进入写锁之后在写数据之前改变缓存状态标志的值
if (!cacheValid) {
data = ...;//此处通过查询从数据库中取数据
cacheValid = true;
}
//在释放写锁之前通过获取读锁来降级写锁
rwl.readLock().lock();
rwl.writeLock().unlock(); //释放写锁,但仍然持有读锁
}
use(data);//此处使用数据
rwl.readLock().unlock();
}
}


例3,设计缓存系统

public class CacheDemo {
private Map<String,Object> cache=new HashMap<String, Object>();
private ReadWriteLock rwl=new ReentrantReadWriteLock();
public Object getData(String key){
rwl.readLock().lock();
Object value=null;
try{
value=cache.get(key);
if(value==null){
rwl.readLock().unlock();
rwl.writeLock().lock();
try{
if(value==null)
value="aaa";//实际执行查询
}finally{
rwl.writeLock().unlock();
}
}
rwl.readLock().lock();
}finally{
rwl.readLock().unlock();
}
rwl.readLock().unlock();
return value;
}
}


3.Condition的功能类似在传统线程技术中的object.wait和object.notify的功能。在等待Condition时允许发生“虚假唤醒”,这通常作为对基础平台语义的让步。对于大多数程序这样带来的实际影响很小,因为Condition应该总是在一个循环中被等待,并测试正被等待的状态声明。某个实现可以随意移除可能的虚假唤醒,但建议应用程序员总是假定这些虚假唤醒可能发生,因此总在一个循环中等待。

注3:

案例1,创建两个线程通过Condition的阻塞和唤醒功能来进行线程通讯,从 而实现两个线程交替打印10次

public class ConditionCommunication {
public static void main(String[] args) {
final Business business=new Business();
new Thread(new Runnable() {
public void run() {
for(int i=1;i<=10;i++)
business.sub(i);
}
}).start();
for(int i=1;i<=10;i++)
business.main(i);
}
static class Business{
Lock lock=new ReentrantLock();
Condition condition=lock.newCondition();
private boolean sShouldSub=true;
public void sub(int i){
lock.lock();
try{
while(!sShouldSub)
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int j=1;j<=10;j++)
System.out.println("sub thread sequence of "
+j+",loop of "+i);
sShouldSub=false;
condition.signal();
}finally{
lock.unlock();
}
}
public void main(int i){
lock.lock();
try{
while(sShouldSub)
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int j=1;j<=10;j++)
System.out.println("main thread sequence of "
+j+",loop of "+i);
sShouldSub=true;
condition.signal();
}finally{
lock.unlock();
}
}
}
}


案例2:实现线程安全的阻塞队列

class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}


4.一个锁内部可以有多个Condition,即有多路等待和通知,可以参看jdk1.5提供的Lock与Condition实现的叫阻塞队列的应用案例,从中除了要体味算法还要体味面向对象的封装。在传统的线程机制中一个监视器对象上只能有一路等待和通知,要想实现多路等待和通知,必须嵌套使用多个同步监视器对象。(如果只有一个Condition,两个放的都在等,一旦一个方的进去了,那么它的通知可能导致另一个放的接着走)

注4.三个Condition通信

1,2,3号线程每个分别循环10,20,30次,一次循环显示

示例代码如下:

class Business{
Lock lock=new ReentrantLock();
Condition condition1=lock.newCondition();
Condition condition2=lock.newCondition();
Condition condition3=lock.newCondition();
private int shouldShow=1;
public void show1(int i){
lock.lock();
try{
while(shouldShow!=1)
try {
condition1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int j=1;j<=10;j++)
System.out.println("show1 thread sequence of "
+j+" ,loop of "+i);
shouldShow=2;
condition2.signal();
}finally{
lock.unlock();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: