您的位置:首页 > 其它

实现一个可以在队列满时,将溢出的部分缓存入文件,待空闲时取出的队列

2016-07-10 14:17 519 查看
其中exeQueue是运行时缓存,当exeQueue满时先存入cacheQueue队列,待cacheQueue满时,将cacheQueue整个写入文件。

从队列中取值时,则是先从exeQueue里面取,再从文件里面取,最后是cacheQueue。

上代码:

import java.util.Collection;

import java.util.concurrent.TimeUnit;

public interface ICacheQueue<E> {

    /**

     * 将指定元素插入到此队列的尾部,在成功时返回 true,如果插入失败,则返回 false。

     * @param e 元素

     * @return boolean

     */

    boolean offer(E e);

    

    /**

     * 将指定元素插入到此队列的尾部,如果队列为空,则等待一段时间

     * @param e 元素

     * @param timeout 超时时长

     * @param unit 时长单位

     * @return boolean

     */

    boolean offer(E e, long timeout, TimeUnit unit);

    

    /**

     * 从队列取出并移除第一个元素,如果队列为空,则返回null

     * @return 元素

     */

    E poll();

    

    /**

     * 从队列取出并移除第一个元素,如果队列为空,则等待一段时间再取

     * @param timeout 超时时长

     * @param unit 时长单位

     * @return 元素

     */

    E poll(long timeout, TimeUnit unit);

    

    /**

     * 一次性取出队列中的所有元素

     * @param c 返回的容器

     * @return 取出的元素数量

     */

    int drainTo(Collection<? super E> c);

    

    /**

     * 一次性取出队列中的指定数量的元素

     * @param c 返回的容器

     * @param maxElements 指定的数量

     * @return 取出的元素数量

     */

    int drainTo(Collection<? super E> c, int maxElements);

    

}

import java.io.File;

import java.io.FileInputStream;

import java.io.FileOutputStream;

import java.io.ObjectInputStream;

import java.io.ObjectOutputStream;

import java.io.Serializable;

import java.util.Arrays;

import java.util.Collection;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.TimeUnit;

public class CacheQueue<E extends Serializable> implements ICacheQueue<E>, Serializable {

    /** 默认容量 */

    public static final int DEFAULTCAPACITY = 1000;

    

    /**

     * 持久化序列

     */

    private static final long serialVersionUID = -4556382039886130859L;

    private static final String BASE_FILE_NAME = "D:" + File.separator + "cache" + File.separator;

    

    /** 容量 */

    private int capacity;

    

    /** 运行队列,此队列满时往缓存队列存数据 */

    private LinkedBlockingQueue<E> exeQueue;

    

    /** 缓存队列,此队列满时,批量存入文件 */

    private LinkedBlockingQueue<E> cacheQueue;

    

    /** 运行队列溢出标志 */

    private boolean exeQueueOverflow = false;

    

    public CacheQueue(int capacity) {

        this.capacity = capacity;

        exeQueue = new LinkedBlockingQueue<E>(capacity);

        cacheQueue = new LinkedBlockingQueue<E>(capacity);

    }

    

    public CacheQueue() {

        this.capacity = DEFAULTCAPACITY;

        exeQueue = new LinkedBlockingQueue<E>(capacity);

        cacheQueue = new LinkedBlockingQueue<E>(capacity);

    }

    

    @Override

    public boolean offer(E element) {

        if (element == null)

        {

            return true;

        }

        if (!exeQueueOverflow)

        {

            if (exeQueue.offer(element))

            {

                return true;

            } else

            {

                exeQueueOverflow = true;

            }

        }

        if (cacheQueue.offer(element))

        {

            return true;

        } else

        {

            writeToFile(cacheQueue);

            cacheQueue.clear();

            return cacheQueue.offer(element);

        }

    }

    @Override

    public boolean offer(E element, long timeout, TimeUnit unit) {

        if (offer(element))

        {

            return true;

        } else

        {

            try {

                Thread.sleep(unit.toMillis(timeout));

                return offer(element);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

        return false;

    }

    @Override

    public E poll() {

        flashCache();

        return exeQueue.poll();

    }

    @Override

    public E poll(long timeout, TimeUnit unit) {

        E element = poll();

        if (element == null)

        {

            try {

                Thread.sleep(unit.toMillis(timeout));

                return poll();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

        return element;

    }

    @Override

    public int drainTo(Collection<? super E> c) {

        if (!exeQueueOverflow)

        {

            return exeQueue.drainTo(c);

        } else

        {

            exeQueue.drainTo(c);

            flashCache();

            return exeQueue.drainTo(c, capacity - c.size());

        }

    }

    @Override

    public int drainTo(Collection<? super E> c, int maxElements) {

        if (!exeQueueOverflow)

        {

            if (maxElements < capacity)

            {

                return exeQueue.drainTo(c, maxElements);

            }

            return exeQueue.drainTo(c);

        } else

        {

            do

            {

                exeQueue.drainTo(c);

                flashCache();

            } while (maxElements >= c.size() + capacity);

            

            return exeQueue.drainTo(c, maxElements - c.size());

        }

    }

    

    private synchronized boolean writeToFile(LinkedBlockingQueue<E> queue)

    {

        FileOutputStream fos = null;

        ObjectOutputStream oos = null;

        try

        {

            File file = new File(BASE_FILE_NAME + System.currentTimeMillis());

            if(!file.exists())

            {

                file.createNewFile();

            }

            fos = new FileOutputStream(file, true);

            oos = new ObjectOutputStream(fos);

            

            oos.writeObject(queue);

            oos.flush();

            

        } catch (Exception e)

        {

            e.printStackTrace();

            return false;

        } finally

        {

            try

            {

                if(oos != null)

                {

                    oos.close();

                }

                if (fos != null)

                {

                    fos.close();

                }

            } catch (Exception e)

            {

                e.printStackTrace();

                return false;

            }

            

        }

        return true;

    }

    

    @SuppressWarnings("unchecked")

    private synchronized void readObjectFormFile()

    {

        FileInputStream fis = null;

        ObjectInputStream ois = null;

        File file = null;

        try

        {

            String fileName = getFile();

            if (fileName == null)

            {

                return ;

            }

            file = new File(fileName);

            fis = new FileInputStream(file);

            ois = new ObjectInputStream(fis);

            

            exeQueue = (LinkedBlockingQueue<E>)ois.readObject();

        } catch (Exception e)

        {

            e.printStackTrace();

        } finally

        {

            try

            {

                if (ois != null)

                {

                    ois.close();

                }

                if (fis != null)

                {

                    fis.close();

                }

            } catch (Exception e)

            {

                e.printStackTrace();

            } finally

            {

                if (file != null)

                {

                    //必须关闭所有的文件流,才能删除文件

                    file.delete();

                }

            }

        }

    }

    

    private void flashCache()

    {

        if (exeQueue.size() == 0)

        {

            String fileName = getFile();

            //缓存文件不为空,读取文件到运行队列中

            if (fileName != null)

            {

                readObjectFormFile();

            } else

            {

                //缓存文件为空,读取缓存队列到运行队列中

                exeQueue.addAll(cacheQueue);

                cacheQueue.clear();

                exeQueueOverflow = false;

            }

        }

    }

    

    /**

     * 获取最先保存的文件名

     */

    private String getFile()

    {

        File file = new File(BASE_FILE_NAME);

        String[] files = file.list();

        if (files.length > 0)

        {

            Arrays.sort(files);

            System.out.println(files[0]);

            return BASE_FILE_NAME + files[0];

        }

        return null;

    }

    @Override

    public String toString() {

        return "CacheQueue [exeQueue=" + exeQueue + ", exeQueueOverflow="

                + exeQueueOverflow + "]";

    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: