您的位置:首页 > 其它

侧边栏滚动条

2016-01-25 22:59 302 查看
QueuedThreadPool



doStart()方法负责初始化并启动_minThreads个空线程,等待任务的到来。其中,

_theads用来存放当前池中所有线程;

_idle中存放当前空闲线程;

_jobs为存放任务的队列,先来先服务。

protected void doStart() throws Exception
{
if (_maxThreads<_minThreads || _minThreads<=0)
throw new IllegalArgumentException("!0<minThreads<maxThreads");

_threads=new HashSet();
_idle=new ArrayList();
_jobs=new Runnable[_maxThreads];

for (int i=0;i<_minThreads;i++)
{
newThread();
}
}

protected void newThread()
{
synchronized (_threadsLock)
{
if (_threads.size()<_maxThreads)
{
PoolThread thread =new PoolThread();
_threads.add(thread);
thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);
thread.start();
}
else if (!_warned)
{
_warned=true;
Log.debug("Max threads for {}",this);
}
}
}

 

初始化完毕,可调用dispatch(Runnable job)方法给线程池分配任务。

如果_idle.size() > 0,即有空闲线程,直接把job分给最后一个空闲线程,并从_idle中删除之。至于为什么是给最后一个?没有为什么,只要你愿意,随便哪一个都行。

如果_idle.size() = 0,即没有空闲线程,那就只好到_jobs中排队啦。

_jobs的示意图如下图,这是一初始大小为5的队列。



_nextJob指向_jobs队列中当前可取任务地址,每取一次任务,_nextJob自增1;而_nextJobSlot指向当前可以放置任务的地址,也就是下一个任务进队列,要放在哪一个位置,每进一个任务,_nextJobSlot自增1。它们初始位置当然都是0。

一般来说,当_nextJobSlot总是处在_nextJob前面的,即_nextJobSlot >_nextJob。
当_nextJob == _nextJobSlot时,就说明_jobs队列排满了,没位置了,怎么办?增加队列长度呗。QueuedThreadPool中,每调整一次,增加_maxThreads个空位置。 调整的过程中还涉及到数据的复制和两个指针位置的调整。

一旦发现,队列中的任务数_queued大于_spawnOrShrinkAt这个阀值,说明任务太多,当前线程数偏少,我们需要增加更多的工作线程来执行任务,最多_maxThreads个。

public boolean dispatch(Runnable job)
{
if (!isRunning() || job==null)
return false;

PoolThread thread=null;
boolean spawn=false;

synchronized(_lock)
{
// Look for an idle thread
int idle=_idle.size();
if (idle>0) //有空闲线程,直接分配
thread=(PoolThread)_idle.remove(idle-1);
else
{
// queue the job
_queued++;
if (_queued>_maxQueued)
_maxQueued=_queued;
_jobs[_nextJobSlot++]=job;
if (_nextJobSlot==_jobs.length)
_nextJobSlot=0;
if (_nextJobSlot==_nextJob) //环形队列_jobs已满,需要调整大小
{
//在原有基础上增加_maxThreads
Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
int split=_jobs.length-_nextJob;
if (split>0)
System.arraycopy(_jobs,_nextJob,jobs,0,split);
if (_nextJob!=0)
System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);

_jobs=jobs;
_nextJob=0;
_nextJobSlot=_queued;
}
// 队列任务数超过阀值,在可能的情况下,增加工作线程数目
spawn=_queued>_spawnOrShrinkAt;
}
}

if (thread!=null)
{
thread.dispatch(job);
}
else if (spawn)
{
newThread();
}
return true;
}

 

PoolThread

 

上面说了,一旦没有空闲线程,任务就仅仅是加入到_jobs队列中,那么负责从队列中取任务就是PoolThread的事了。

PoolThread在自己的任务完成之后会自觉的到_jobs环形队列中领取任务。有任务,执行之;没有,把自己加入_idle中,等待_maxIdleTimeMs的时间,或QueuedThreadPool的唤醒(调用PoolThread的dispatch(Runnable job)方法)。

当线程处于空闲状态,并发现当前空闲线程数多于_spawnOrShrinkAt阀值时,它就得考虑要不要自我了断了。条件就是距离上次缩减(shrink)空闲线程时间超过_maxIdleTimeMs。

 

public void run()
{
boolean idle=false;
Runnable job=null;
try
{
while (isRunning())
{
// Run any job that we have.
if (job!=null)
{
final Runnable todo=job;
job=null;
idle=false;
todo.run();
}

synchronized(_lock)
{
// is there a queued job?
if (_queued>0)
{
_queued--;
job=_jobs[_nextJob];
_jobs[_nextJob++]=null;
if (_nextJob==_jobs.length)
_nextJob=0;
continue;
}

// Should we shrink?
final int threads=_threads.size();

// 我不知道为什么_threads.size()会有可能大于_maxThreads
// 知道的兄弟请告诉我!!
if (threads>_minThreads &&
(threads>_maxThreads ||
_idle.size()>_spawnOrShrinkAt))
{
long now = System.currentTimeMillis();
if ((now-_lastShrink)>getMaxIdleTimeMs())
{
_lastShrink=now;
_idle.remove(this);
return;
}
}

if (!idle)
{
// Add ourselves to the idle set.
_idle.add(this);
idle=true;
}
}

// We are idle
// wait for a dispatched job
synchronized (this)
{
if (_job==null)
this.wait(getMaxIdleTimeMs());
job=_job;
_job=null;
}
}
}
catch (InterruptedException e)
{
Log.ignore(e);
}
finally
{
synchronized (_lock)
{
_idle.remove(this);
}
synchronized (_threadsLock)
{
_threads.remove(this);
}
synchronized (this)
{
job=_job;
}

// we died with a job! reschedule it
if (job!=null)
{
QueuedThreadPool.this.dispatch(job);
}
}
}

 

 

参考资料:

/article/3844260.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: