您的位置:首页 > 运维架构 > Tomcat

Tomcat源码分析之:ServletOutputStream的实现

2014-04-17 16:17 501 查看
貌似很久都没有写博客了,tomcat8的代码已经看了很多,主体部分的代码也都看得差不多了,发现在tomcat8中已经完全支持非阻塞的方式接收以及发送数据了。。。。但是比较遗憾的是,以前遗留下来的太多的老代码都不支持这种新的方式来发送数据。。。木有办法。。。

这里来看看Tomcat中是如何实现ServletOutputStream的吧。。。。

在具体的来看它之前,这里先来一张图来描述一下Tomcat的数据发送时候的流动。。。



这张图形已经比较好的展现成了Tomcat中对IO这一块封装的层次关系吧,首先是最上层的

ServletOutputStream对象,它是用户代码可以接触到的对象,它自带了自己的OutputBuffer,这里写数据其实是通过这个这个buffer完成的,数据是写到这个OutputBuffer里面

接下来的一层就是Tomcat内部的Response类型了,每一个ServletResponse对象都对应着一个唯一的Tomcat内部的response类型,当然了,他也有自己的buffer,InternalNioOutputBuffer类型的对象,这里上层ServletOutputStream写数据将会流动到这里。。。

最下层就是与channel关联的部分了,它也有自己的buffer,这里一般就是java类库中的niobuffer,上层的数据将会流动到这里。。。

最后才是将数据通过channel发送出去。。。

嗯,虽然好像层次稍微多了些,而且刚刚开始看代码就觉得稍微有点繁琐。。不过当将这个层次关系理清楚了之后还是蛮简单的。。。

好啦,接下来来看代码了。。。

这里就通过一次write来跟踪整个代码的执行轨迹吧,首先来看看CoyoteOutputStream类型的write方法: //发送一个byte数组的一部分数据
public void write(byte[] b, int off, int len) throws IOException {
boolean nonBlocking = checkNonBlockingWrite(); //判断是否支持非阻塞的发送数据,这里判断的标准其实是用户代码是否加入了WriteListener
ob.write(b, off, len); //将数据写到outputbuffer里面
if (nonBlocking) {
checkRegisterForWrite(); //如果是非阻塞的发送数据的话,需要确保channel有注册写事件
}
}
这里首先要判断是否支持非阻塞的发送数据,这里就不细讲这部分的类容了,,,来看看这里的outputbuffer的write行为吧://写入一个字节数组的一部分
public void write(byte b[], int off, int len) throws IOException {

if (suspended) {
return;
}

writeBytes(b, off, len);

}

private void writeBytes(byte b[], int off, int len)
throws IOException {

if (closed) { //如果已经关闭了,那么直接返回吧
return;
}
//这里其实是写到buffer里,如果数据过大,也有可能调用realWriteBytes方法真正的调用底层写数据
bb.append(b, off, len);
bytesWritten += len;

// if called from within flush(), then immediately flush
// remaining bytes
if (doFlush) {
bb.flushBuffer();
}

}
这里其实就是将数据直接放到当年的bytechunk就好了。。。但是这里如果数据比较多的话,会将数据直接写到下一层,也就是tomcat内置的response。。。来看看bytechunk的append方法吧: //加入一个字节数组的一部分数据
public void append( byte src[], int off, int len )
throws IOException
{
// will grow, up to limit
makeSpace( len ); //确保有这么多空间可以写

// if we don't have limit: makeSpace can grow as it wants
if( limit < 0 ) { //表示没有空间限制
// assert: makeSpace made enough space
System.arraycopy( src, off, buff, end, len ); //将数据复制过来
end+=len; //将end偏移加上len
return;
}

// Optimize on a common case.
// If the buffer is empty and the source is going to fill up all the
// space in buffer, may as well write it directly to the output,
// and avoid an extra copy
//如果一次就填满了,那么还是写出去好了
if ( len == limit && end == start && out != null ) {
out.realWriteBytes( src, off, len ); //因为要写的数据比较大,所以直接写到更下层去
return;
}
// if we have limit and we're below
if( len <= limit - end ) { //表示还有足够的空间可以写数据
// makeSpace will grow the buffer to the limit,
// so we have space
System.arraycopy( src, off, buff, end, len );
end+=len;
return;
}

// need more space than we can afford, need to flush
// buffer

// the buffer is already at ( or bigger than ) limit

// We chunk the data into slices fitting in the buffer limit, although
// if the data is written directly if it doesn't fit

//代码执行到这里,说明空间不够了
int avail=limit-end; //还剩下多大的空间可以写数据
System.arraycopy(src, off, buff, end, avail);
end += avail;

flushBuffer();

int remain = len - avail;

while (remain > (limit - end)) { //不断的尝试写数据到下面去
out.realWriteBytes( src, (off + len) - remain, limit - end );
remain = remain - (limit - end);
}

System.arraycopy(src, (off + len) - remain, buff, end, remain);
end += remain;

}
这里可以看到realWriteBytes方法,它其实就是外面的outputbuffer定义的方法,来看看吧: //这个才是真正的调用底层发送数据,其实又是调用tomcat的response来写数据
public void realWriteBytes(byte buf[], int off, int cnt)
throws IOException {

if (closed) {
return;
}
if (coyoteResponse == null) {
return;
}

// If we really have something to write
if (cnt > 0) {
// real write to the adapter
outputChunk.setBytes(buf, off, cnt); //设置outputchunk
try {
coyoteResponse.doWrite(outputChunk); //通过tomcat的response来写数据,其实是写到httpprocessor的buffer里面去了
} catch (IOException e) {
// An IOException on a write is almost always due to
// the remote client aborting the request. Wrap this
// so that it can be handled better by the error dispatcher.
throw new ClientAbortException(e);
}
}

}
嗯,这里就与上面的层次对应上了吧,其实就是写到tomcat内置的response。。好了,接下来继续。。 //在servlet的outputStream可能会调用这个方法来写数据
public void doWrite(ByteChunk chunk/*byte buffer[], int pos, int count*/)
throws IOException
{
outputBuffer.doWrite(chunk, this); //调用在httpprocessor里面创建的outputbuffer来写数据,这里会将数据写到niochannel的buffer,然后最终发送出去
contentWritten+=chunk.getLength(); //标记已经发送的数据量的大小
}
嗯,这里其实是写到internalNiobuffer里去。。。。继续看吧: public int doWrite(ByteChunk chunk, Response res) throws IOException {

int len = chunk.getLength();
int start = chunk.getStart();
byte[] b = chunk.getBuffer();
addToBB(b, start, len);
byteCount += chunk.getLength();
return chunk.getLength();
}
嗯,没啥意思,继续: private synchronized void addToBB(byte[] buf, int offset, int length)
throws IOException {

if (length == 0) return;

// Try to flush any data in the socket's write buffer first
//首先尝试先将数据发送出去
boolean dataLeft = flushBuffer(isBlocking());

// Keep writing until all the data is written or a non-blocking write
// leaves data in the buffer
//这里只有在缓冲区里面已经没有数据了才继续发送
while (!dataLeft && length > 0) {
//首先将要发送的数据copy到niochanel的发送buffer里面去
int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
length = length - thisTime; //计算还剩下多少字节没有写到niochannel的buffer里面,其实这里也就当做将数据转移到了niochannel的buffer就算是写出去了
offset = offset + thisTime; //这里用于调整偏移量
//这里调用writeToSocket方法将niochannel的buffer的里面的数据通过socket写出去
int written = writeToSocket(socket.getBufHandler().getWriteBuffer(),
isBlocking(), true); //如果在tomcat的response里面有writelistener的话,可以异步的写
if (written == 0) { //都没有写出去字节
dataLeft = true;
} else {
dataLeft = flushBuffer(isBlocking()); //flush一下,看一下是否还会有数据剩余
}
}

NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
if (ka != null) ka.access();//prevent timeouts for just doing client writes

if (!isBlocking() && length > 0) { //在非阻塞的发送中,如果实在发送不出去,需要保存在额外的buffer里面
// Remaining data must be buffered
addToBuffers(buf, offset, length);
}
}

这里其实主要是调用flushBuffer方法,将数据传给下层的niochannel,而且可以看到对于过多的数据这里还会 做一层缓存。。。。 //这里其实是flush niochannel的buffer
protected boolean flushBuffer(boolean block) throws IOException {

//prevent timeout for async,
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if (key != null) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
attach.access();
}

boolean dataLeft = hasMoreDataToFlush(); //其实这里是判断niochannel的buffer里面是否还有数据要写

//write to the socket, if there is anything to write
if (dataLeft) { //如果niochannel的buffer里面还有数据发送,那么继续写
writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
}

dataLeft = hasMoreDataToFlush();

//这里如果niochannel的buffer里面的数据已经发送完了,那么将将以前缓冲的数据再发送出去
if (!dataLeft && bufferedWrites.size() > 0) {
Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); //遍历待发送的数据
while (!hasMoreDataToFlush() && bufIter.hasNext()) {
ByteBufferHolder buffer = bufIter.next();
buffer.flip();
while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer());
if (buffer.getBuf().remaining() == 0) { //如果当前buffer里面的所有数据都已经转移到了niochannel的buffer里面,那么可以将这个buffer移除了
bufIter.remove();
}
writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
//here we must break if we didn't finish the write
}
}
}

return hasMoreDataToFlush();
}
嗯,这里其实主要是调用writeToSocket方法。。。来看看吧: //这里其实调用socket来写数据
private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
if ( flip ) {
bytebuffer.flip();
flipped = true;
}

int written = 0;
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
if ( att == null ) throw new IOException("Key must be cancelled");
long writeTimeout = att.getWriteTimeout();
Selector selector = null;
try {
selector = pool.get();
} catch ( IOException x ) {
//ignore
}
try {
written = pool.write(bytebuffer, socket, selector, writeTimeout, block);
//make sure we are flushed
do {
//对于niochanel,这个flush方法其实是没用的
if (socket.flush(true,selector,writeTimeout)) break;
}while ( true );
}finally {
if ( selector != null ) {
pool.put(selector);
}
}
if ( block || bytebuffer.remaining()==0) {
//blocking writes must empty the buffer
//and if remaining==0 then we did empty it
bytebuffer.clear();
flipped = false;
}
// If there is data left in the buffer the socket will be registered for
// write further up the stack. This is to ensure the socket is only
// registered for write once as both container and user code can trigger
// write registration.
return written;
}

这里因为涉及到了一些阻塞的或者非阻塞的发送数据。。所以可能会用到selector。。。 public int write(ByteBuffer buf, NioChannel socket, Selector selector,
long writeTimeout, boolean block) throws IOException {
if ( SHARED && block ) { //对于写数据,一般都是这里
return blockingSelector.write(buf,socket,writeTimeout);
}
//但是如果有outputstream的listener的话,可以采用非阻塞的方式来发送大量的数据
SelectionKey key = null;
int written = 0;
boolean timedout = false;
int keycount = 1; //assume we can write //假装刚开始是可以写的
long time = System.currentTimeMillis(); //start the timeout timer
try {
while ( (!timedout) && buf.hasRemaining() ) {
int cnt = 0;
if ( keycount > 0 ) { //only write if we were registered for a write
cnt = socket.write(buf); //write the data
if (cnt == -1) throw new EOFException(); //出错了

written += cnt;
if (cnt > 0) {
time = System.currentTimeMillis(); //reset our timeout timer
continue; //we successfully wrote, try again without a selector
}
if (cnt==0 && (!block)) { //这里对于非阻塞的写,就直接返回了
break; //don't block
}
}
if ( selector != null ) {
//register OP_WRITE to the selector
if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
else key.interestOps(SelectionKey.OP_WRITE);
if (writeTimeout==0) {
timedout = buf.hasRemaining();
} else if (writeTimeout<0) {
keycount = selector.select();
} else {
keycount = selector.select(writeTimeout);
}
}
if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
}//while
if ( timedout ) throw new SocketTimeoutException();
} finally {
if (key != null) {
key.cancel();
if (selector != null) selector.selectNow();//removes the key from this selector
}
}
return written;
}
这里如果就直接调用niochannel来发送数据了。。不过其实这里还会涉及到将数据转移到niochannel的buffer。。然后才发送数据。。。

到这里整个数据的流动层次对照着上面的图形应该就算是 比较明白了吧。。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: