您的位置:首页 > 编程语言 > Java开发

java concurrent 包中并发类的Demo

2015-07-30 17:35 531 查看
并非纯原创,如有雷同纯属巧合。



Atomic(原子操作):

package com.gaode.concurrent;

import java.util.concurrent.atomic.AtomicInteger;

/**
* i++ 或者 i-- 原子操作的问题
*/
public class TestAtomic extends Thread
{

//private static volatile int count = 0;
private static AtomicInteger count = new AtomicInteger(0);
private static final int times = Integer.MAX_VALUE;

public static void main(String[] args) throws InterruptedException
{

long curTime = System.nanoTime();

Thread decThread = new DecThread();
decThread.start();

// 使用run()来运行结果为0,原因是单线程执行不会有线程安全问题
// new DecThread().run();

System.out.println("Start thread: " + Thread.currentThread() + " i++");

for (int i = 0; i < times; i++)
{
//count++;
count.incrementAndGet();
}

System.out.println("End thread: " + Thread.currentThread() + " i--");

decThread.join();

long duration = System.nanoTime() - curTime;
/**
* 如果 count 是 int 就没法保证 count 的结果为 0
*/
System.out.println("Result: " + count);
System.out.format("Duration: %.2fs\n", duration / 1.0e9);
}

private static class DecThread extends Thread
{

@Override
public void run()
{
System.out.println("Start thread: " + Thread.currentThread() + " i--");
for (int i = 0; i < times; i++)
{
//count--;
count.decrementAndGet();
}
System.out.println("End thread: " + Thread.currentThread() + " i--");
}
}
}


tools(并发工具):

CountDownLatch:

package com.gaode.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

/**
* CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。
* 比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch
* 来实现这种功能了。
*/
public class TestCountDownLatch
{
static int i = 6;
static CountDownLatch countDownLatch = new CountDownLatch(i);

public static void main(String[] args)
{
S s = new S();
s.start();

for (int i = 0; i < 8; i++)
{
T t = new T();
t.start();
}

}

static class T extends Thread
{
@Override
public void run()
{
try
{
Thread.sleep(1000);
/**
* 将计数器减为0
*/
countDownLatch.countDown();
long count = countDownLatch.getCount();
System.out.println(count);
} catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("所有线程处理完毕,继续处理其他任务...");
}
}

static class S extends Thread
{
@Override
public void run()
{
try
{
Thread.sleep(1000);
/**
* 当countDownLatch计数减为0时才会执行
*/
countDownLatch.await();
} catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("必须等待count为0的时候才会执行");
}
}
}


CyclicBarrier:

package com.gaode.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。
* 在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。
* 因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。CyclicBarrier 支持
* 一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),
* 该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。
*/
public class TestCyclicBarrier
{
/**
* 没有当前个数的线程不会往下执行
*/
static int i = 5;
static CyclicBarrier barrier = new CyclicBarrier(i);

public static void main(String[] args)
{
for (int i = 0; i < 5; i++)
{
T t = new T();
t.start();
}
}

static class T extends Thread
{
@Override
public void run()
{
try
{
Thread.sleep(1000);
barrier.await();
} catch (InterruptedException e)
{
e.printStackTrace();
} catch (BrokenBarrierException e)
{
e.printStackTrace();
}
System.out.println("所有线程处理完毕,继续处理其他任务...");
}
}
}


Exchange:

package com.gaode.concurrent;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 两个线程之间交互数据,多了没用只会有前两个线程交换其他线程执行都不会执行
*/
public class TestExchange
{
public static void main(String[] args)
{
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
service.execute(new Runnable()
{

@Override
public void run()
{
try
{
String a = "A";
String b = (String) exchanger.exchange(a);
System.out.println("线程 1 交换数据 a => ? :" + a + "=>" + b);
} catch (Exception e)
{
e.printStackTrace();
}

}
});

service.execute(new Runnable()
{

@Override
public void run()
{
try
{
String c = "C";
String b = (String) exchanger.exchange(c);
System.out.println("线程 2 交换数据 c => ? :" + c + "=>" + b);
} catch (Exception e)
{
e.printStackTrace();
}

}
});

service.execute(new Runnable()
{

@Override
public void run()
{
try
{
String d = "D";
String b = (String) exchanger.exchange(d);
System.out.println("线程 3 交换数据 d => ? :" + d + "=>" + b);
} catch (Exception e)
{
e.printStackTrace();
}

}
});
}
}


Semaphore:

package com.gaode.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
* 使用一个计数器,用来限制线程的同时访问数量 PS:抢购神器
*/
public class TestSemaphore
{
static Semaphore semaphore = new Semaphore(10);

public static void main(String[] args)
{
ExecutorService service = Executors.newCachedThreadPool();

for (int i = 0; i < 100; i++)
{
Thread thread = new Thread()
{

@Override
public void run()
{
try
{
System.out.println("获得一个计数器");
semaphore.acquire();

Thread.sleep(1000);
} catch (InterruptedException e)
{
e.printStackTrace();
}

semaphore.release();
System.out.println("释放一个计数器");
}
};
service.execute(thread);
}
}
}


Executor(线程池):

package com.gaode.concurrent;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;

/**
* 传说中的线程池而已,使用线程最好用他
*/
public class TestExecutors extends Thread
{
ExecutorService service = Executors.newFixedThreadPool(1000);

class Cl
{
public String getClassString()
{
return "抗压类啊";
}
}

public void execute() throws Exception
{
List<Callable<String>> callables = new LinkedList<>();

final int rnd = (int) (Math.random() * 100);
for (int i = 0; i < 1000; i++)
{
Callable callable = new Callable()
{
@Override
public String call() throws Exception
{
Cl cl = new Cl();
return cl.getClassString();
}
};
callables.add(callable);
}
try
{
List<Future<String>> results = service.invokeAll(callables);
String name = null;
for (Future<String> future : results)
{
String cname = future.get();
if (name == null)
{
name = cname.intern();
}
if (name != cname)
{
throw new Exception("报错啦!怪我咯!");
}
}
service.shutdown();
} catch (InterruptedException e)
{
e.printStackTrace();
} catch (ExecutionException e)
{
e.printStackTrace();
}
}

@Override
public void run()
{
try
{
this.execute();
} catch (Exception e)
{
e.printStackTrace();
}
}

public static void main(String[] args) throws InterruptedException
{
int i = 10000;
do
{
Thread thread = new TestExecutors();
thread.start();
thread.join();
i--;
} while (i < 0);
System.out.println("太狠了 1w个线程");
}
}


Queue(线程安全队列):

线程不安全队列就是 LinkedList

package com.gaode.concurrent;

import java.util.*;
import java.util.concurrent.*;

public class TestQueue
{
int count = 0;
static Queue queue = new LinkedBlockingQueue<>();
static final int times = 100000;

public void push()
{
for (int i = 0; i < times; i++)
{
queue.add(count++);
}
}

public void get()
{
for (int i = 0; i < times; i++)
{
int c = (int) queue.remove();
System.out.println("Next:" + c);
}
}

public static void main(String args[]) throws InterruptedException
{

ExecutorService service = Executors.newFixedThreadPool(1000);

List<Callable<String>> callables = new LinkedList<>();

for (int i = 0; i < 1000; i++)
{
Callable callable = new Callable()
{
@Override
public String call() throws Exception
{
TestQueue testQueue = new TestQueue();
int var = (int) (Math.random() * 10);
if (var >= 5)
{
testQueue.push();
} else
{
testQueue.get();
}
return null;
}
};
callables.add(callable);
}
service.invokeAll(callables);

System.out.println("完成了");
}
}


Lock(线程锁):

package com.gaode.concurrent;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 生产者和消费者的问题,线程锁问题
*/
public class TestReentrantLock
{
int count = 10;

LinkedList<String> list = new LinkedList<>();

ReentrantLock reentrantLock = new ReentrantLock();

/**
* 生产锁
*/
Condition pushLock = reentrantLock.newCondition();

/**
* 消费锁
*/
Condition getLock = reentrantLock.newCondition();

public void push(String stg)
{
reentrantLock.lock();

/**
* 最好用try 避免Exception 死锁
*/
try
{
while (list.size() >= count)
{
System.out.println("List 已满!");
pushLock.await();
}

list.add(stg);

getLock.signal();
} catch (InterruptedException e)
{
e.printStackTrace();
} finally
{
reentrantLock.unlock();
}
}

public String get()
{
reentrantLock.lock();
String stg = null;
try
{
while (list.size() <= 0)
{
System.out.println("List 为空!");
getLock.await();
}

stg = list.removeFirst();
pushLock.signal();

} catch (InterruptedException e)
{
e.printStackTrace();
} finally
{
reentrantLock.unlock();
}
return stg;
}

public static void main(String args[])
{
ExecutorService service = Executors.newFixedThreadPool(1000);

List<Callable<String>> callables = new LinkedList<>();

for (int i = 0; i < 1000; i++)
{
Callable callable = new Callable()
{
@Override
public String call() throws Exception
{
TestReentrantLock lock = new TestReentrantLock();
int var = (int) (Math.random() * 10);
if (var >= 5)
{
for (int i = 0; i < 100; i++)
{
lock.push(i + "");
}
} else
{
for (int i = 0; i < 100; i++)
{
String is = lock.get();
System.out.println(is);
}
}
if (lock.list.size() < 0 || lock.list.size() > 100)
{
throw new Exception("报错说明线程锁没有控制住!");
}
return null;
}
};
callables.add(callable);
}
try
{
List<Future<String>> results = service.invokeAll(callables);
service.shutdown();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: