使用线程执行框架的一次经历
2016-10-16 00:00
579 查看
场景
一个线程从某个地方接收消息(数据),可以是其他主机或者消息队列,然后转由另外的一个线程池来执行具体处理消息的逻辑,并且消息的处理速度小于接收消息的速度。这种情景很常见,试想一下,你会怎么设计和实现?直观想法
很显然采用JUC的线程框架,可以迅速写出代码。消息接收者:
public class Receiver {
private static volatile boolean inited = false;
private static volatile boolean shutdown = false;
private static volatile int cnt = 0;
private MessageHandler messageHandler;
public void start(){
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
while(!shutdown){
init();
recv();
}
}
});
}
/**
* 模拟消息接收
*/
public void recv(){
Message msg = new Message("Msg" + System.currentTimeMillis());
System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg));
messageHandler.handle(msg);
}
public void init(){
if(!inited){
messageHandler = new MessageHandler();
inited = true;
}
}
public static void main(String[] args) {
new Receiver().start();
}
}
消息处理:
public class MessageHandler {
private static final int THREAD_POOL_SIZE = 4;
private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public void handle(Message msg) {
try {
service.execute(new Runnable() {
@Override
public void run() {
parseMsg(msg);
}
});
} catch (Throwable e) {
System.out.println("消息处理异常" + e);
}
}
/**
* 比较耗时的消息处理流程
*/
public void parseMsg(Message message) {
while (true) {
try {
System.out.println("解析消息:" + message);
Thread.sleep(5000);
System.out.println("============================");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
效果:这种方案导致的现象是接收到的消息会迅速堆积,我们从消息队列(或者其他地方)取出了大量消息,但是处理线程的速度又跟不上,所以导致的问题是大量的Task会堆积在线程池底层维护的一个阻塞队列中,这会极大的耗费存储空间,影响系统的性能。
分析:当execute()一个任务的时候,如果有空闲的worker线程,那么投入运行,否则看设置的最大线程个数,没有达到线程个数限制就创建新线程,接新任务,否则就把任务缓冲到一个阻塞队列中,问题就是这个队列,默认的大小是没有限制的,所以就会大量的堆积任务,必然耗费heap空间。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE); // capacity
}
计数限制
面对上述问题,想到了要限制消息接收的速度,自然就想到了各种线程同步的原语,不过在这里最简单的就是使用一个Volatile的计数器。消息接收者:
public class Receiver {
private static volatile boolean inited = false;
private static volatile boolean shutdown = false;
private static volatile int cnt = 0;
private MessageHandler messageHandler;
public void start(){
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
while(!shutdown){
init();
recv();
}
}
});
}
/**
* 模拟消息接收
*/
public void recv(){
Message msg = new Message("Msg" + System.currentTimeMillis());
System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg));
messageHandler.handle(msg);
}
public void init(){
if(!inited){
messageHandler = new MessageHandler();
inited = true;
}
}
public static void main(String[] args) {
new Receiver().start();
}
}
消息处理:
public class MessageHandler {
private static final int THREAD_POOL_SIZE = 1;
private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public void handle(Message msg){
try {
service.execute(new Runnable() {
@Override
public void run() {
parseMsg(msg);
}
});
} catch (Throwable e) {
System.out.println("消息处理异常" + e);
}
}
/**
* 比较耗时的消息处理流程
*/
public void parseMsg(Message message){
try {
Thread.sleep(10000);
System.out.println("解析消息:" + message);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
Receiver.limit --;
}
}
}
效果:通过控制消息的个数来阻塞消息的接收过程,就不会导致任务的堆积,系统的内存消耗会比较平缓,限制消息的个数本质就和下面限制任务队列大小一样。
使用同步队列 SynchronousQueue
SynchronousQueue 虽名为队列,但是其实不会缓冲任务的对象,只是作为对象传递的控制点,如果有空闲线程或者没有达到最大线程限制,就会交付给worker线程去执行,否则就会拒绝,我们需要自己实现对应的拒绝策略RejectedExecutionHandler,默认的是抛出异常RejectedExecutionException。消息接收者同上。
消息处理:
public class MessageHandler {
private static final int THREAD_POOL_SIZE = 4;
ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义拒绝策略");
try {
executor.getQueue().put(r);
System.out.println("重新放任务回队列");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
public void handle(Message msg) {
try {
System.out.println(service.getTaskCount());
System.out.println(service.getQueue().size());
System.out.println(service.getCompletedTaskCount());
service.execute(new Runnable() {
@Override
public void run() {
parseMsg(msg);
}
});
} catch (Throwable e) {
System.out.println("消息处理异常" + e);
}
}
/**
* 比较耗时的消息处理流程
*/
public void parseMsg(Message message) {
while (true) {
try {
System.out.println("线程名:" + Thread.currentThread().getName());
System.out.println("解析消息:" + message);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
效果:能够控制消息的接收速度,但是我们需要在rejectedExecution中实现某种阻塞的操作,但是选择在发生拒绝的时候把任务重新放回队列,带来的问题就是这个Task会发生饥饿现象。
使用大小限制的阻塞队列
使用LinkedBlockingQueue作为线程框架底层的任务缓冲区,并且设置大小限制,思想上和上述方案一样,都是有一个阻塞的点,但是通过最后的jvm monitor看到这里的CPU消耗更少,内存使用有所降低,并且波动小(具体原因有待探索)。消息接收者同上。
消息处理:
public class MessageHandler {
private static final int THREAD_POOL_SIZE = 4;
private static final int BLOCK_QUEUE_CAP = 500;
ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(BLOCK_QUEUE_CAP), new SimpleThreadFactory(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义拒绝策略");
try {
executor.getQueue().put(r);
System.out.println("重新放任务回队列");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
public void handle(Message msg) {
try {
service.execute(new Runnable() {
@Override
public void run() {
parseMsg(msg);
}
});
} catch (Throwable e) {
System.out.println("消息处理异常" + e);
}
}
/**
* 比较耗时的消息处理流程
*/
public void parseMsg(Message message) {
try {
Thread.sleep(5000);
System.out.println("线程名:" + Thread.currentThread().getName());
System.out.println("解析消息:" + message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class SimpleThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("Thread-" + System.currentTimeMillis());
return thread;
}
}
}
总结
多线程是比较容易出问题的地方,特别当对方法不熟悉的时候相关文章推荐
- 每隔一段时间自动执行一次某个方法(使用线程)[C#]
- pthread_once()使用(某个时间在整个程序中仅执行一次,不确定是那个线程)
- 每隔一段时间自动执行一次某个方法(使用线程)[C#]
- pthread_once()使用(某个时间在整个程序中仅执行一次,不确定是那个线程)
- 每隔一段时间自动执行一次某个方法(使用线程)
- GCD的定义及使用详解(同步异步、并发串行、线程间通信、延时执行、只执行一次代码)
- volatile变量在多线程同步时运用的优点: 1、锁一次只允许一个线程访问值,volatile 允许多个线程执行读操作,因此当使用 volatile 保证读代码路径时,要比使用锁执行全部代码路径获得
- zthread学习 实例三 使用Executor器来执行线程
- 使用Anthem.NET框架的一个调试经历
- 在Global中使用线程隔时执行一项任务
- 在Android中使用Handler和Thread线程执行后台操作
- 超级终端:另一个程序正在使用选定的电话服务设备,请在其他程序执行完后再试一次.
- 在该控件上执行的操作正从错误的线程调用。使用 Control.Invoke 或 Control.BeginInvoke 封送到正确的线程才能执行此操作。(以解决)
- 使用 Anthem.NET 框架的一个调试经历
- 多线程 不使用同步工具, 手动协调线程依次执行
- 在Android中使用Handler和Thread线程执行后台操作
- 【插件式框架探索系列】使用多UI线程提升性能
- 使用Anthem.NET框架的一个调试经历
- 四个线程a,b,c,d. 线程a,b对变量i加一. 线程c,d对变量i减去一.四个线程顺序执行, 每个线程每次只执行一次.i的初始值为0, 打印结果0 1 2 1 0 1 2 1 0 1 2...
- 键盘无法正常使用的一次解决经历