您的位置:首页 > 编程语言 > Java开发

译 -- Java 并发编程(多线程)三 | Semaphore | ThreadLocal | synchronized

2017-01-18 14:57 543 查看
接着上一篇译 – Java 并发编程(多线程)二

原文地址:http://stackoverflow.com/documentation/java/121/concurrent-programming-threads#t=201701170653119627647

Synchronization

在Java中, 有一个内置语言级别的锁机制:synchronized 块, 它使用任何的java对象作为一个固有锁(i.e. 每个java对象都有一个监视器和它相关联)

固有锁原子的提供了一组表述。为了理解它对我们有什么意义, 让我们来看一个
`synchronized
` 非常有用的例子 :

private static int t = 0;
private static Object mutex = new Object();

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
synchronized (mutex) {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
}
});
}
executorService.shutdown();
}


这种情况下, 如果没有
`synchronized
同步块, 将会有很多的并发问题出现, 第一个是计算增量的操作(本身就不是原子的), 第二个是我们所观察到的t值是其它任意数量的线程修改之后的。然而, 既然我们获取了固有锁之后, 便没有了竞争条件, 输出也会以正常的顺序包含1到100.

固有锁在java 中是互斥的(i.e. 相互执行的锁)。 相互执行意味着一个线程占有了锁, 第二个线程获取锁之前被强制等待第一个线程释放它。注意: 一个操作可能会把线程放到等待(睡眠)队列状态,被称之为阻塞操作,因此获取一个锁是一个阻塞操作。

固有锁在Java 中是可重入的。意味着如果一个已经拥有锁的线程再次去获取锁, 它将不会阻塞,且会成功的获取它。 例如,下面的代码在调用时将不会阻塞:

public void bar(){
synchronized(this){
...
}
}
public void foo(){
synchronized(this){
bar();
}
}


synchronized
同步块旁边, 有
`synchronized
方法。

下面的代码块理论上是等价的(即使字节码看起来不同)

1、
synchronized
块在
this


public void foo() {
synchronized(this) {
doStuff();
}
}


2、
`synchronized
方法:

public synchronized void foo() {
doStuff();
}


3、同样的对于
static
方法:

class MyClass {
...
public static void bar() {
synchronized(MyClass.class) {
doSomeOtherStuff();
}
}
}


和下面有相同的作用:

class MyClass {
...
public static synchronized void bar() {
doSomeOtherStuff();
}
}


使用一个线程池完成两个
int
数组相加

一个线程池有一个任务队列, 每个任务都将会在某个线程中执行。下面的例子展示怎么使用线程池完成两个
int
数组相加。

Java SE 8
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };

ExecutorService pool = Executors.newCachedThreadPool();

// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
final int worker = i;
pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}

// Wait for all Workers to finish:
try {
// execute all submitted tasks
pool.shutdown();
// waits until all workers finish, or the timeout ends
pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
pool.shutdownNow(); //kill thread
}

System.out.println(Arrays.toString(result));


注意:

1、 这个例子仅仅是为了做解释说明, 实际上,对于这个小任务,使用线程并不会有任何的加速,更可能减慢, 因为任务创建和调度的花费将会消耗很多时间。

2、如果你使用java 7 或着更早的版本, 你将会看到匿名类而不是lambda表达式来实现这个任务。

原子操作

一个原子操作是一个被称之为“all at once”的操作, 在原子操作执行期间, 没有任何其它线程有查看或修改状态的机会。

看一个糟糕的示例:

private static int t = 0;

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
});
}
executorService.shutdown();
}


在这种情况下, 有两个问题, 第一个问题是增量操作不是原子的, 它有多个操作组成: 获取值, 值加1, 写回值。这也就是为什么我们运行这个示例,很可能我们输出不会看到
t:100
两个线程可能并行的获取这个值,修改,然后写回。假设值为10, 两个线程同时加1, 都将会把值设为11, 因为第二个线程看到的值t是在第一个线程还没有完成修改之前。

第二个问题是当我们在当前线程的进行的加值操作之后打印值t, 值t可能已经被 另一个线程修改了。

解决这个问题, 我们使用
java.util.concurrent.atomic.AtomicInteger
有很多原子操作供我们使用。

private static AtomicInteger t = new AtomicInteger(0);

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
int currentT = t.incrementAndGet();
System.out.println(MessageFormat.format("t: {0}", currentT));
});
}
executorService.shutdown();
}


AtomicInteger
`incrementAndGet
方法原子的增加且返回一个新值,因此消除了之前的竞争条件。 注意在这个示例中, 输出行仍然是无序的, 因为我们没有努力去串行化
println
调用, 这也不在本示例的讨论范围内,既然它要求同步, 示例的目标就是展示如何使用
AtomicInteger
来消除有关状态的竞争条件。

排它写/并行读访问

有些时候要求一些进程并发的读写一些数据

ReadWriteLock
接口, 和它的实现
`ReentrantReadWriteLock
允许描述如下的访问模式:

1、有任意数量的读者读数据, 如果至少一个读者被授权读, 那么没有任何写者有访问数据的可能

2、最多有一个写者写数据, 如果有一个写着写数据, 没有读者可以访问数据。

一个实现像下面这样:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {

// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();

// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;

/** This will write to the data, in an exclusive access */
public static void writeToData() {
RW_LOCK.writeLock().lock();
try {
data++;
} finally {
RW_LOCK.writeLock().unlock();
}
}

public static int readData() {
RW_LOCK.readLock().lock();
try {
return data;
} finally {
RW_LOCK.readLock().unlock();
}
}

}


终止执行

Thread.sleep
可以造成当前线程延迟特定期限执行, 这是一种有效的方式, 使处理器有更多可利用的时间给其它应用程序的线程, 在Tread类中有两个重载的
sleep
方法。

一个明确了毫秒级的睡眠时间

public static void sleep(long millis) throws InterruptedException


一个明确了纳米级的睡眠时间

public static void sleep(long millis, int nanos)


终止执行一秒

Thread.sleep(1000);


注意到这只是操作系统内核调度的一个示例, 它并不精确, 并且一些实现根本不考虑纳秒(很可能精确到毫秒)。

强烈推荐用try / catch 并且捕获
InterruptedException
来闭合
Thread.sleep
的调用.

信号量

基本上, 一个信号量是一个计数器, 它约束了能够访问临界区的线程数量。信号量维护了一组许可, 当被授权访问临界区,便会acquire进入, 当临界区可用便会release。

一个信号量的初始化:

Semaphore semaphore = new Semaphore(1); //the int value being the number of permits


Semaphore构造函数接受一个额外的boolean参数fairness,表示是否是公平模式, 当设置为false, 类就不会保证获取acquire许可的的线程的进入临界区顺序。 当设置为ture, 信号量保证了调用acquire方法的线程顺序的被选择获取许可, 在临界区内它们的方法调用被执行, 按以下方式声明:

Semaphore semaphore = new Semaphore(1, true);


让我们看一个来自javadocs的例子, Semaphore被用来控制一个items池的访问:

class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}

public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}

private Object getNextAvailableItem(){
//implementation
}

private void markAsUnused(Object o){
//implementation
}
}


线程终止/恢复函数

这个代码片段是在当你一段时间内不需要它的时候持有这个线程。之后恢复它的运行。

private volatile boolean killThread = false;
private Thread refreshThread;

refreshThread = new Thread(new Runnable() {
@Override
public void run() {
while (!killThread) {
try {
synchronized (refreshThread) {
while (!runningFlag) refreshThread.wait();
}
// do stuff e.g. refresh a RSS feed or smth.
Thread.sleep(2500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
refreshThread.start();

public void pauseAutoRefresh() {
runningFlag = false;
}

public void resumeAutoRefresh() {
runningFlag = true;
synchronized (refreshThread) {
refreshThread.notify();
}

public void killThread(){
killThread = true;
refreshThread.interrupt();
}


使用ThreadLocal

在Java 并发中一个有用的工具是
`ThreadLocal
, 它允许你有对于给定的线程独一无二的变量, 因此, 如果一些代码运行在不同的线程中, 这些执行体将不会共享这个值, 相反每个线程都有一个自己的本地变量。

例如, 这经常被频繁的使用在一个servlet的一个处理请求中来建立上下文(譬如授权信息),你可能会这样做:

private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();

public static MyUserContext getContext() {
return contexts.get(); // get returns the variable unique to this thread
}

public void doGet(...) {
MyUserContext context = magicGetContextFromRequest(request);
contexts.put(context); // save that context to our thread-local - other threads
// making this call don't overwrite ours
try {
// business logic
} finally {
contexts.remove(); // 'ensure' removal of thread-local variable
}
}


现在, 你能够在你需要的地方使用
MyServlet.getContext()
, 而不是通过传递
`MyUserContext
到每个方法中, 当然, 这的确引入了一个需要维护的变量, 但是它是线程安全的, 使用这个如此高全局作用域的变量消除了很多缺点。

这里主要的优点是每个线程在它的上下文容器中有一个本地变量. 只要从一个已定义的切入点(就像要求每个servlet维护它的上下文, 或添加一个servlet过滤器)使用它。当你需要的时候能够依赖这个上下文.

可视化的读写屏障当使用synchronized / volatile

我们都知道应该使用
`synchronized
原语去使一个方法或代码块的执行具有排他性。但是我们很少有人注意到一个使用
synchronized
volatile
很重要的方面.关键点:除了使一个单元代码具有原子性, 它也提供了读写屏障, 什么是读写屏障? 让我们使用下面的样例来讨论:

class Counter {

private Integer count = 10;

public synchronized void incrementCount() {
count++;
}

public Integer getCount() {
return count;
}
}


我们假设线程A首先调用了
`incrementCount()
接着另一个线程调用了
getCount()
, 在这个场景中并不能保证B会看到更新后的
count
值。 它可能看到
count
值还是10 ,甚至B可能永远都看不到更新后的
count
值。

要理解这个行为, 我们应该理解Java 内存模型是怎么和硬件结构结合的。 在Java中, 每个线程有自己的线程栈, 栈包含方法调用栈、线程创建的本地变量栈。 在多核系统中, 很可能两个线程并行的在独立的核中运行。 在这样的场景中,很可能线程栈的一部分位于一个核的寄存器/缓存中。如果在一个线程中, 一个线程访问被
synchronized
(或
volatile
)原语修饰的对象。
synchronized
块代码执行之后, 线程同步变量的本地拷贝到主存中。这就创建了一个读写屏障,确保线程看到最新的对象的值。

但是在我们的示例中, 因为线程B没有使用同步访问
count
, 它可能引用存储在寄存器中的
count
值, 且从来不会看到线程A的更新。 要确保线程B看到A最新值, 我们需要使
getCount()
也进行同步。

public synchronized Integer getCount() {
return count;
}


现在当线程A完成更新
count
的工作, 释放了
Counter
实例的锁, 同时创建了一个写屏障, 刷新代码块中进行的所有改变到主存中, 类似的, 当线程B在相同的
Counter
实例上获取锁时, 它进入了读屏障并且从主存中读取
count
值, 并能看到所有的更新。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息