Asynchronous Task Execution: The Queue

2015-11-09 10:32
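<pre name="code" class="java">import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

// Assumed: Logger.getLogger(Class) and log.error(String, Throwable) match log4j 1.x.
import org.apache.log4j.Logger;

// One worker thread takes requests off a blocking queue and runs each request's callback.
public class WorkingQueue<T extends BaseRequest> {

    private static final Logger log = Logger.getLogger(WorkingQueue.class);
    private final BlockingQueue<T> queue;
    private final ThreadFactory threadFactory;
    // volatile so that stop() called from another thread sees the thread created by start()
    private volatile Thread thread;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile boolean shouldContinue = false;

    public WorkingQueue() {
        this(null);
    }

    public WorkingQueue(final ThreadFactory tf) {
        this.queue = new LinkedBlockingQueue<T>();
        this.threadFactory = tf == null ? Executors.defaultThreadFactory() : tf;
        this.thread = null;
    }

    public void start() {
        if (started.getAndSet(true)) {
            // I prefer if we throw a runtime IllegalStateException here,
            // but I want to maintain semantic backward compatibility.
            // So it is returning immediately here
            return;
        }
        shouldContinue = true;
        thread = threadFactory.newThread(new Runnable() {
            @SuppressWarnings("unchecked")
            public void run() {
                while (shouldContinue) {
                    try {
                        T req = queue.take(); // blocks while the queue is empty
                        req.getLazyExecutor().lazyExecute(req);
                    } catch (InterruptedException e) {
                        // stop() interrupts a blocked take(); this is the normal
                        // shutdown path, so restore the flag and leave the loop.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (Exception e) {
                        log.error("Unexpected message caught... Shouldn't be here", e);
                    }
                }
            }
        });
        thread.start();
    }

    public void stop() {
        started.set(false);
        shouldContinue = false;
        Thread t = thread;
        if (t != null) { // stop() may be called before start()
            t.interrupt();
        }
    }

    public void execute(T request) {
        if (!started.get()) {
            start();
        }
        queue.add(request);
    }
}
</pre>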
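The shutdown path of the worker loop above is easiest to see in isolation. The following is a minimal sketch, not the post's own code: `QueueWorkerDemo`, `startWorker`, and `runDemo` are hypothetical names, and plain `Runnable` tasks stand in for requests so that the example compiles without a logging library. It shows one thread blocking on `take()`, running tasks in FIFO order, and exiting when it is interrupted.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original post.
class QueueWorkerDemo {

    /** Starts one thread that runs queued tasks until it is interrupted. */
    static Thread startWorker(final BlockingQueue<Runnable> queue) {
        Thread worker = new Thread(new Runnable() {
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        queue.take().run(); // blocks while the queue is empty
                    } catch (InterruptedException e) {
                        // Interrupted while waiting: restore the flag so the loop exits.
                        Thread.currentThread().interrupt();
                    } catch (RuntimeException e) {
                        // A failing task must not kill the worker.
                        System.err.println("task failed: " + e);
                    }
                }
            }
        });
        worker.start();
        return worker;
    }

    /** Queues n tasks, waits for all of them, stops the worker, returns ids in execution order. */
    static List<Integer> runDemo(int n) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        final List<Integer> results = new CopyOnWriteArrayList<Integer>();
        final CountDownLatch done = new CountDownLatch(n);
        Thread worker = startWorker(queue);
        for (int i = 0; i < n; i++) {
            final int id = i;
            queue.add(new Runnable() {
                public void run() {
                    results.add(id);
                    done.countDown();
                }
            });
        }
        try {
            done.await();       // every task has run
            worker.interrupt(); // wakes the worker out of take()
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3)); // prints [0, 1, 2]
    }
}
```

Because there is only one consumer thread, tasks run in the order they were queued. Waiting on the latch before interrupting matters: an interrupt that arrives while requests are still queued can make `take()` throw and leave those requests unprocessed.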
<pre name="code" class="java">public class WorkingService<T extends BaseRequest> {

    private WorkingQueue<T> workingQueue;

    public WorkingService() {
        workingQueue = new WorkingQueue<T>();
    }

    public void start() {
        workingQueue.start();
    }

    public void execute(final T t, LazyExecutable<T> callback) {
        t.setLazyExecutor(callback);
        workingQueue.execute(t);
    }

    public void stop() {
        if (workingQueue != null) {
            workingQueue.stop();
        }
    }
}

public interface LazyExecutable<T extends BaseRequest> {
    public void lazyExecute(T request);
}

public class BaseRequest {

    @SuppressWarnings("rawtypes")
    private LazyExecutable executor;

    @SuppressWarnings("rawtypes")
    public void setLazyExecutor(LazyExecutable callback) {
        this.executor = callback;
    }

    @SuppressWarnings("rawtypes")
    public LazyExecutable getLazyExecutor() {
        return executor;
    }
}
</pre>
Usage:
<pre name="code" class="java">public class Client {

    public static void main(String[] args) {
        WorkingService<WriteFileRequest> service = new WorkingService<WriteFileRequest>();
        service.start();
        WriteFileRequest request = new WriteFileRequest();
        request.setFilename("file1.txt");
        // This call does not block.
        service.execute(request, new LazyExecutable<WriteFileRequest>() {
            @Override
            public void lazyExecute(WriteFileRequest request) {
                // The work here can be handed to a thread pool for concurrent
                // execution, or simply run on this single worker thread.
                System.out.println("write file:" + request.getFilename());
            }
        });
        System.out.println("over");
        // The worker thread is non-daemon by default, so the JVM stays alive
        // until service.stop() is called once no more requests will be submitted.
    }

    public static class WriteFileRequest extends BaseRequest {

        private String filename;
        private byte[] data;

        public String getFilename() {
            return filename;
        }

        public void setFilename(String filename) {
            this.filename = filename;
        }

        public byte[] getData() {
            return data;
        }

        public void setData(byte[] data) {
            this.data = data;
        }
    }
}
</pre>
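The comment inside the callback says the work can either run on the single worker thread or be handed to a thread pool. One way the second option might look is sketched below. This is an illustration under stated assumptions, not the author's implementation: `PooledCallbackDemo`, `Handler`, `pooled`, and `runDemo` are hypothetical names, and `Handler` is a local stand-in for `LazyExecutable` so that the example compiles on its own. The wrapper submits each request to an `ExecutorService` and returns immediately, so the queue's worker thread only dispatches.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class PooledCallbackDemo {

    /** Local stand-in for the post's LazyExecutable (an assumption made for self-containment). */
    interface Handler<T> {
        void handle(T request);
    }

    /** Wraps a handler so that each call is submitted to the pool and returns at once. */
    static <T> Handler<T> pooled(final ExecutorService pool, final Handler<T> delegate) {
        return new Handler<T>() {
            public void handle(final T request) {
                pool.execute(new Runnable() {
                    public void run() {
                        delegate.handle(request);
                    }
                });
            }
        };
    }

    /** Handles every filename on a 4-thread pool and returns the recorded messages, sorted. */
    static Set<String> runDemo(List<String> filenames) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        final Set<String> written = Collections.synchronizedSet(new TreeSet<String>());
        Handler<String> handler = pooled(pool, new Handler<String>() {
            public void handle(String filename) {
                written.add("write file:" + filename);
            }
        });
        for (String filename : filenames) {
            handler.handle(filename); // returns immediately; the pool does the work
        }
        pool.shutdown(); // already submitted tasks still run to completion
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new TreeSet<String>(written);
    }

    public static void main(String[] args) {
        System.out.println(runDemo(java.util.Arrays.asList("file1.txt", "file2.txt")));
    }
}
```

With a pool, requests may complete in any order, which is why the sketch collects results into a sorted set. If ordering between requests matters, running the callback directly on the single worker thread keeps them sequential.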

                                            