您的位置:首页 > 其它

第二章 Basic Thread Synchronization (基础线程同步) 【下】

2017-08-12 18:25 344 查看
涉及的内容
同步一个方法
同步类中分配一个独立属性
在同步代码中使用条件
使用Lock锁定代码块
同步数据的读写锁
修改Lock公平模式
在Lock中使用多条件

1、同步数据的读写锁

ReadWriteLock接口 和 ReentrantReadWriteLock类

例子:使用ReadWriteLock接口获取一个存储两个产品的价格

package com.jack;

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PricesInfo {

private double price1;
private double price2;
private ReadWriteLock lock;
public PricesInfo() {
super();
this.price1 = 1.0;
this.price2 = 2.0;
lock = new ReentrantReadWriteLock();
}

public double getPrice1(){
lock.readLock().lock();
double value = price1;
lock.readLock().unlock();
return value;
}

public double getPrice2(){
lock.readLock().lock();
double value = price2;
lock.readLock().unlock();
return value;
}

public void setPrices(double price1, double price2){
lock.writeLock().lock();
this.price1 = price1;
this.price2= price2;
lock.writeLock().unlock();
}
}

package com.jack;

public class Reader implements Runnable {

private PricesInfo pricesInfo;

public Reader(PricesInfo pricesInfo) {
super();
this.pricesInfo = pricesInfo;
}

@Override
public void run() {
for (int i=0; i< 10; i++){
System.out.printf("%s:价格1 : %f\n", Thread.currentThread().getName()
,pricesInfo.getPrice1());
System.out.printf("%s:价格2:%f\n", Thread.currentThread().getName(),
pricesInfo.getPrice2());
}
}

}

package com.jack;

public class Writer implements Runnable{

private PricesInfo pricesInfo;

public Writer(PricesInfo pricesInfo) {
super();
this.pricesInfo = pricesInfo;
}

@Override
public void run() {
for(int i=0; i<3; i++){
System.out.printf("写:尝试修改价格.\n");
pricesInfo.setPrices(Math.random()*10, Math.random()*8);
System.out.printf("写:价格已经修改了.\n");
try{
Thread.sleep(2);
} catch(InterruptedException e){
e.printStackTrace();
}

}

}

}

package com.jack;

public class Main {
public static void main(String[] args){
PricesInfo pricesInfo = new PricesInfo();
Reader readers[] = new Reader[5];
Thread threadsReader[] = new Thread[5];
for (int i=0; i<5; i++){
readers[i] = new Reader(pricesInfo);
threadsReader[i] = new Thread(readers[i]);
}

Writer writer = new Writer(pricesInfo);
Thread threadWriter = new Thread(writer);
for(int i=0; i<5; i++){
threadsReader[i].start();
}
threadWriter.start();
}
}


日志:



总结:

1、创建5个读线程和一个写线程,
2、创建一个new ReentrantReadWriteLock() 读写锁。
3、对于需要锁定部分进行锁住,这时候只有一个线程可以执行。最后释放锁
4、写锁并不会限制读,所以出现脏读。

2、修改Lock公平模式

non-fair mode(非公平模式) : false  (选择执行线程没有规则)

fair mode(公平模式) :true  (选择规则等待时间最长)

例子:了解公平模式和非公平模式

package com.jack;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PrintQueue {
private final Lock queueLock = new ReentrantLock(true);

public void printJob(Object document){
queueLock.lock();
try {
Long duration = (long) (Math.random()*10000);
System.out.printf(Thread.currentThread().getName()
+ ":打印队列:打印一个工作持续时间 %s ",(duration/1000)
+ " seconds \n");
Thread.sleep(duration);
} catch (InterruptedException e){
e.printStackTrace();
}finally{
queueLock.unlock();
}
queueLock.lock();
try {
Long duration = (long) (Math.random()*10000);
System.out.printf(Thread.currentThread().getName()
+ ":打印队列:打印一个工作持续时间 %s ",(duration/1000)
+ " seconds \n");
Thread.sleep(duration);
} catch (InterruptedException e){
e.printStackTrace();
}finally{
queueLock.unlock();
}
}
}

package com.jack;

public class Main {
public static void main(String[] args){
PrintQueue printQueue = new PrintQueue();
Thread thread[] = new Thread[10];
for (int i=0; i<10; i++){
thread[i]= new Thread(new Job(printQueue), "线程  " + i);
}

for(int i=0; i<10; i++){
thread[i].start();
try{
Thread.sleep(100);
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
}


日志:
线程 0:去打印一个文档
线程 0:打印队列:打印一个工作持续时间 7 seconds
线程 1:去打印一个文档
线程 2:去打印一个文档
线程 3:去打印一个文档
线程 4:去打印一个文档
线程 5:去打印一个文档
线程 6:去打印一个文档
线程 7:去打印一个文档
线程 8:去打印一个文档
线程 9:去打印一个文档
线程 1:打印队列:打印一个工作持续时间 9 seconds
线程 2:打印队列:打印一个工作持续时间 4 seconds
线程 3:打印队列:打印一个工作持续时间 9 seconds
线程 4:打印队列:打印一个工作持续时间 8 seconds
线程 5:打印队列:打印一个工作持续时间 2 seconds
线程 6:打印队列:打印一个工作持续时间 9 seconds
线程 7:打印队列:打印一个工作持续时间 4 seconds
线程 8:打印队列:打印一个工作持续时间 5 seconds
线程 9:打印队列:打印一个工作持续时间 3 seconds
线程 0:打印队列:打印一个工作持续时间 4 seconds
线程 0: 这个文档已经打印了
线程 1:打印队列:打印一个工作持续时间 6 seconds
线程 1: 这个文档已经打印了
线程 2:打印队列:打印一个工作持续时间 3 seconds
线程 2: 这个文档已经打印了
线程 3:打印队列:打印一个工作持续时间 6 seconds
线程 3: 这个文档已经打印了
线程 4:打印队列:打印一个工作持续时间 5 seconds
线程 4: 这个文档已经打印了
线程 5:打印队列:打印一个工作持续时间 7 seconds
线程 5: 这个文档已经打印了
线程 6:打印队列:打印一个工作持续时间 4 seconds
线程 6: 这个文档已经打印了
线程 7:打印队列:打印一个工作持续时间 3 seconds
线程 7: 这个文档已经打印了
线程 8:打印队列:打印一个工作持续时间 4 seconds
线程 8: 这个文档已经打印了
线程 9:打印队列:打印一个工作持续时间 7 seconds
线程 9: 这个文档已经打印了


总结:

1、公平模式下,当线程0放弃一个锁的时候,线程1获取线程0放弃的锁,线程0继续等待(这个类似,打牌一样,轮着出牌,线程0要等到下一轮)
2、非公平模式就是全凭CPU大人指派了。
3、简单来说,公平:民主国家,非公平:独裁国家

3、在Lock中使用多条件(Condtion接口)

唤醒线程的boolean值

生产者-消费者

package com.jack;

public class FileMock {

private String content[];
private int index;

public FileMock(int size, int length){
content = new String[size];
for(int i=0; i<size; i++){
StringBuilder buffer = new StringBuilder(length);
for(int j=0; j<length; j++){
int indice = (int)Math.random()*255;
buffer.append((char)indice);
}
content[i] = buffer.toString();
}
index = 0;
}

public boolean hashMoreLines(){
return index < content.length;
}

public String getLine(){
if(this.hashMoreLines()) {
System.out.printf("Mock: ", (content.length-index));
return content[index++];
}
return null;
}
}
package com.jack;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Buffer {

private LinkedList<String> buffer;
private int maxSize;
private ReentrantLock lock;
private Condition lines;
private Condition space;
//判断当前行是否在buffer中
private boolean pendingLines;

public Buffer(int maxSize) {
this.maxSize = maxSize;
buffer = new LinkedList<>();
lock = new ReentrantLock();
lines = lock.newCondition();
space = lock.newCondition();
pendingLines = true;
}

/**
* 插入一行
* @param line
*/
public void insert(String line){
lock.lock();
try{
while(buffer.size() == maxSize){
space.await();
}
buffer.offer(line);
System.out.printf("%s: 插入行: %d\n", Thread.currentThread().getName(), buffer.size());
//唤醒所有线程
lines.signalAll();
} catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}

/**
* 获取一行
* @return
*/
public String get(){
String line = null;
lock.lock();
try{
while ((buffer.size()==0) && (hasPendingLines())){
lines.await();
}
if(hasPendingLines()){
line = buffer.poll();
System.out.printf("%s :行数已经读取:%d\n", Thread.currentThread().getName(), buffer.size());
space.signalAll();
}
}catch (InterruptedException e){
e.printStackTrace();
}finally{
lock.unlock();
}
return line;
}

public void setPendingLines(boolean pendingLines){
this.pendingLines = pendingLines;
}

public boolean hasPendingLines(){
return pendingLines||buffer.size() >0;
}
}

package com.jack;

public class Producer implements Runnable{

private  FileMock mock;
private Buffer buffer;

public Producer(FileMock mock, Buffer buffer) {
super();
this.mock = mock;
this.buffer = buffer;
}

@Override
public void run() {
buffer.setPendingLines(true);
while (mock.hashMoreLines()){
String line = mock.getLine();
buffer.insert(line);
}
buffer.setPendingLines(false);
}

}

package com.jack;

import java.util.Random;

public class Consumer implements Runnable{
private Buffer buffer;

public Consumer(Buffer buffer) {
super();
this.buffer = buffer;
}
@Override
public void run() {
while (buffer.hasPendingLines()){
String line = buffer.get();
processLine(line);
}
}
private void processLine(String line) {
try{
Random random = new Random();
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e){
e.printStackTrace();
}

}

}
package com.jack;

public class Main {
public static void main(String[] args){
FileMock mock = new FileMock(100, 10);
Buffer buffer = new Buffer(20);
Producer producer = new Producer(mock, buffer);
Thread threadProducer = new Thread(producer, "Producer");

Consumer consumers[] = new Consumer[3];
Thread threadConsumers[] = new Thread[3];

for (int i=0; i<3; i++){
consumers[i] = new Consumer(buffer);
threadConsumers[i] = new Thread(consumers[i], "Consumer "+i);
}
threadProducer.start();
for(int i=0; i<3; i++){
threadConsumers[i].start();
}
}
}


日志:部分日志
MOck: Producer: 插入行: 1
MOck: Producer: 插入行: 2
MOck: Consumer 0 :行数已经读取:1
Consumer 1 :行数已经读取:0
Producer: 插入行: 1
MOck: Producer: 插入行: 2
MOck: Consumer 2 :行数已经读取:1
Producer: 插入行: 2
MOck: Producer: 插入行: 3
MOck: Producer: 插入行: 4
MOck: Producer: 插入行: 5
MOck: Producer: 插入行: 6
MOck: Producer: 插入行: 7
MOck: Producer: 插入行: 8
MOck: Producer: 插入行: 9
MOck: Producer: 插入行: 10
MOck: Producer: 插入行: 11
MOck: Producer: 插入行: 12
MOck: Producer: 插入行: 13
MOck: Producer: 插入行: 14
MOck: Producer: 插入行: 15
MOck: Producer: 插入行: 16
MOck: Producer: 插入行: 17
MOck: Producer: 插入行: 18
MOck: Producer: 插入行: 19
MOck: Consumer 1 :行数已经读取:18
Producer: 插入行: 19
MOck: Producer: 插入行: 20
MOck: Consumer 1 :行数已经读取:19
Producer: 插入行: 20
MOck: Consumer 1 :行数已经读取:19
Producer: 插入行: 20
MOck: Consumer 0 :行数已经读取:19
Producer: 插入行: 20
MOck: Consumer 0 :行数已经读取:19


总结:

1、采用space和lines都是两个条件来影响生产还是消费。
2、这里用了Condition的sign()和signAll()方法来唤醒对方
3、FileMock模拟生产文件。
4、pendingLines判断buffer还有未读取的行。true表示有未读行。(表示还在生产)

第二章完。。。。。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: