您的位置:首页 > 编程语言

分享一段线程队列--生产者和消费者代码

2017-11-10 08:53 495 查看
分享一段线程队列--生产者和消费者代码,如有问题,请不吝赐教

import com.alibaba.fastjson.JSON;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentLinkedQueue;

@Component
@Scope("singleton")
public class SystemLogMqSingleton {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SystemLogMqSingleton.class);

private final Object notifyer = new Object();
private Thread work;
private static ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
private volatile boolean cancelled = true;

private SystemLogMqSingleton() throws InterruptedException {
long timeStart = System.currentTimeMillis();
work = new Thread(new Poll());
work.setName("cardbag-MqSingleton");
work.start();
if(logger.isDebugEnabled()) {
logger.debug("##耗时##=======cost time " + (System.currentTimeMillis() - timeStart) + "ms");
}
}

/**
* 生产消息
*/
public void write(Object log) {
queue.offer(log);
}

public void stop(){
cancelled=false;
}

/**
* 消费消息
*/
class Poll implements Runnable {
public void run() {
while (cancelled) {
if (!queue.isEmpty()) {
try {
Object msg = queue.poll();
long startMili = System.currentTimeMillis();// 当前时间对应的毫秒数

//这里可写针对从队列里取出来的消息的处理逻辑

long endMili = System.currentTimeMillis();
if(logger.isDebugEnabled()) {
logger.debug("总耗时为:" + (endMili - startMili) + "毫秒");
}
} catch (Exception ex) {
logger.error("================#记录数据异常#=================" + ex.getMessage() + ex.getStackTrace());
}
} else {
try {
//如何没有消息可以等待2s,节约cpu资源 或 Thread.currentThread().sleep(2000)
synchronized (notifyer) {
notifyer.wait(2000);
}
} catch (InterruptedException e) {
logger.error("##消费异常##======>"+e.getMessage()+e.getStackTrace());
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐