
Nutch Study Notes 9 --- Fetch Optimization: protocol-http vs. httpclient

2014-07-07 00:00
Recently the logs showed that Nutch's fetch and parse phases take the most time.
Reading the code, I found that every page fetch creates a new socket, sends the request, reads the response, and then closes the connection.
One socket per request? That is terrifying overhead.

We all know that HTTP-level persistent connections, which avoid tearing sockets down and re-creating them, are extremely effective!

If the HTTP page fetcher plugin you use is protocol-http, then this line in Fetcher
ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
calls the getProtocolOutput method of the
org.apache.nutch.protocol.http.api.HttpBase class.

And what is inside that method?

public ProtocolOutput getProtocolOutput(Text url, CrawlDatum datum) {
  String urlString = url.toString();
  try {
    URL u = new URL(urlString);
    Response response = getResponse(u, datum, false); // make a request

    int code = response.getCode();
    byte[] content = response.getContent();
    Content c = new Content(u.toString(), u.toString(),
        (content == null ? EMPTY_CONTENT : content),
        response.getHeader("Content-Type"),
        response.getHeaders(), this.conf);
    // ... remaining code omitted


Our focus is this line:

Response response = getResponse(u, datum, false); // make a request

The getResponse call actually lands in the following method of the org.apache.nutch.protocol.http.Http class:

protected Response getResponse(URL url, CrawlDatum datum, boolean redirect)
    throws ProtocolException, IOException {
  return new HttpResponse(this, url, datum);
}

This returns a new HttpResponse(this, url, datum) object.

So let's look at how that object is constructed.

~~~~~~~~

I won't paste the full code here, but reading it turns up statements like these:

try {
  socket = new Socket();                    // create the socket
  ......
finally {
  if (socket != null)
    socket.close();
}

Seriously, building a brand-new socket and tearing it down for every single request is murder on performance!
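To see why this matters, here is a toy sketch (not Nutch code; Connection and its counter are made-up stand-ins for real sockets) contrasting per-request connections with a pooled one:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: per-request connections vs. a reused pooled connection.
// "Connection" stands in for a real Socket; every construction models
// an expensive TCP handshake.
public class ConnectionChurnDemo {

    static class Connection {
        static int created = 0;          // counts expensive connection setups
        Connection() { created++; }
        String fetch(String url) { return "response for " + url; }
        void close() { /* tear-down */ }
    }

    // protocol-http style: one new connection per request
    static void fetchPerRequest(String[] urls) {
        for (String url : urls) {
            Connection c = new Connection();
            c.fetch(url);
            c.close();
        }
    }

    // pooled style: reuse an idle connection instead of closing it
    static void fetchPooled(String[] urls) {
        Deque<Connection> pool = new ArrayDeque<Connection>();
        for (String url : urls) {
            Connection c = pool.isEmpty() ? new Connection() : pool.poll();
            c.fetch(url);
            pool.push(c);                // return to pool instead of closing
        }
    }

    public static void main(String[] args) {
        String[] urls = { "/a", "/b", "/c", "/d", "/e" };
        fetchPerRequest(urls);
        int perRequest = Connection.created;
        Connection.created = 0;
        fetchPooled(urls);
        System.out.println(perRequest + " vs " + Connection.created); // 5 vs 1
    }
}
```

Five fetches against one host cost five handshakes without pooling and only one with it; at crawler scale that difference dominates fetch time.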

So I promptly switched from protocol-http to protocol-httpclient.

I assumed that would fix things, but the logs kept showing frequent connection timeouts.

Open-source software is full of pitfalls!

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

So what now? My plan was to keep the protocol-http module as the page fetcher.

The advantage is direct access to the underlying sockets, which gives a lot of control.

Then I added my own code to provide a connection pool.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Straight to the code: [I later updated this code but never posted the update to the blog; it passed in my test environment and the connection pool works. The code below is for reference only, to convey the idea, and is not production-ready. If you need the latest version, email me: 837500869]

1 HttpResponse.java

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.protocol.http;

// JDK imports
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;

import java.util.logging.Level;

// Nutch imports
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.SpellCheckedMetadata;
import org.apache.nutch.net.protocols.HttpDateFormat;
import org.apache.nutch.net.protocols.Response;
import org.apache.nutch.protocol.ProtocolException;
import org.apache.nutch.protocol.http.api.HttpBase;
import org.apache.nutch.protocol.http.api.HttpException;

//Commons Logging imports
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** An HTTP response. */
public class HttpResponse implements Response {

public static final Logger LOG = LoggerFactory
.getLogger(HttpResponse.class);
private HttpBase http;
private URL url;
private String orig;
private String base;
private byte[] content;
private int code;
private Metadata headers = new SpellCheckedMetadata();

public HttpResponse(HttpBase http, URL url, CrawlDatum datum)
throws ProtocolException, IOException {
LOG.info("new HttpResponse object now...---------------------------");
this.http = http;
this.url = url;
this.orig = url.toString();
this.base = url.toString();

if (!"http".equals(url.getProtocol()))
throw new HttpException("Not an HTTP url:" + url);

if (Http.LOG.isTraceEnabled()) {
Http.LOG.trace("fetching " + url);
}

String path = "".equals(url.getFile()) ? "/" : url.getFile();

// some servers will redirect a request with a host line like
// "Host: <hostname>:80" to "http://<hostname>/<orig_path>"- they
// don't want the :80...

String host = url.getHost();
int port;
String portString;
if (url.getPort() == -1) {
port = 80;
portString = "";
} else {
port = url.getPort();
portString = ":" + port;
}
Socket socket = null;
LOG.info("now ,the socket is null...");
try {
LOG.info("try to get a socket from pool......");
socket = HttpPool.getSocket(host);
if (null != socket) {
LOG.info("get a socket ,let us check the validity...");
// NOTE: isConnected() stays true even after the peer has closed the
// connection, so this check cannot catch every stale pooled socket
if (!socket.isConnected() || socket.isInputShutdown() || socket.isOutputShutdown() ) {
LOG.info("It is not connected......");
((HttpSocket) socket).release();
socket = null;
} else {
// yes, ok!!!
LOG.info("succeed to get a socket from pool...");
}
} else {
// no idle socket...
LOG.info("sorry,no Idle socket...");
}

if (null == socket) {
LOG.info("fail to get socket from pool,so create it now...");
socket = new HttpSocket(host); // create the socket
//socket = new Socket();
socket.setSoTimeout(http.getTimeout());
socket.setKeepAlive(true);
socket.setReuseAddress(true);
socket.setTcpNoDelay(true);

// connect
String sockHost = http.useProxy() ? http.getProxyHost() : host;
int sockPort = http.useProxy() ? http.getProxyPort() : port;
LOG.info("sockHost:sockPort --- " + sockHost + " : " + sockPort);
InetSocketAddress sockAddr = new InetSocketAddress(sockHost,
sockPort);
socket.connect(sockAddr, http.getTimeout());
LOG.info("create a socket succeed!!!");
}

// make request
LOG.info("get the output stream handle...");
OutputStream req = socket.getOutputStream();

StringBuffer reqStr = new StringBuffer("GET ");
if (http.useProxy()) {
reqStr.append(url.getProtocol() + "://" + host + portString
+ path);
} else {
reqStr.append(path);
}

reqStr.append(" HTTP/1.1\r\n");

reqStr.append("Host: ");
reqStr.append(host);
reqStr.append(portString);
reqStr.append("\r\n");

reqStr.append("Accept-Encoding: x-gzip, gzip, deflate\r\n");

String userAgent = http.getUserAgent();
if ((userAgent == null) || (userAgent.length() == 0)) {
if (Http.LOG.isErrorEnabled()) {
Http.LOG.error("User-agent is not set!");
}
} else {
// NOTE: deliberately discard the configured agent string and
// masquerade as a desktop browser instead
userAgent = "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1";
reqStr.append("User-Agent: ");
reqStr.append(userAgent);
reqStr.append("\r\n");
}

reqStr.append("Accept-Language: ");
reqStr.append(this.http.getAcceptLanguage());
reqStr.append("\r\n");

reqStr.append("Accept: ");
reqStr.append(this.http.getAccept());
reqStr.append("\r\n");

reqStr.append("Connection: ");
reqStr.append("keep-alive");
reqStr.append("\r\n");

if (datum.getModifiedTime() > 0) {
reqStr.append("If-Modified-Since: "
+ HttpDateFormat.toString(datum.getModifiedTime()));
reqStr.append("\r\n");
}
reqStr.append("\r\n");
LOG.info("sent as follows ");
LOG.info("" + reqStr);
byte[] reqBytes = reqStr.toString().getBytes();

req.write(reqBytes);
req.flush();
LOG.info("try to read Response ...");
LOG.info("hello,baby...");
PushbackInputStream in = // process response
new PushbackInputStream(new BufferedInputStream(
socket.getInputStream(), Http.BUFFER_SIZE),
Http.BUFFER_SIZE);

StringBuffer line = new StringBuffer();

boolean haveSeenNonContinueStatus = false;
while (!haveSeenNonContinueStatus) {
// parse status code line
this.code = parseStatusLine(in, line);
// parse headers
parseHeaders(in, line);
haveSeenNonContinueStatus = code != 100; // 100 is "Continue"
}

String transferEncodingString = headers.get("Transfer-Encoding");
if (null != transferEncodingString
&& transferEncodingString.toLowerCase().equals("chunked")) {
// read Chunked Content..
readChunkedContent(in, new StringBuffer());
LOG.info("read all chunked content succeed");
} else {
readPlainContent(in);
}

String contentEncoding = getHeader(Response.CONTENT_ENCODING);

if ("gzip".equals(contentEncoding)
|| "x-gzip".equals(contentEncoding)) {
content = http.processGzipEncoded(content, url);
} else if ("deflate".equals(contentEncoding)) {
content = http.processDeflateEncoded(content, url);
} else {
if (Http.LOG.isTraceEnabled()) {
Http.LOG.trace("fetched " + content.length + " bytes from "
+ url);
}
}
LOG.info("content length---" + content.length);

} catch (Exception e) {
// log at error level with the stack trace instead of quietly noting it
LOG.error("fetch failed for " + url, e);

} finally {
if (socket != null)
socket.close();
}

}

/*
* ------------------------- * <implementation:Response> *
* -------------------------
*/

public URL getUrl() {
return url;
}

public int getCode() {
return code;
}

public String getHeader(String name) {
return headers.get(name);
}

public Metadata getHeaders() {
return headers;
}

public byte[] getContent() {
return content;
}

/*
* ------------------------- * <implementation:Response> *
* -------------------------
*/
/*
* private void readContentByLength(InputStream in, int contentLen,
* ByteArrayOutputStream out) { byte[] bytes = new byte[Http.BUFFER_SIZE];
* int leftTotal = contentLen; int readCurrent; int readReal; while
* (leftTotal > 0) { readCurrent = leftTotal > Http.BUFFER_SIZE ?
* Http.BUFFER_SIZE : leftTotal; readReal = in.read(bytes, 0, readCurrent);
* if (-1 == readReal) break; // re-calculate... leftTotal -= readReal;
* out.write(bytes, 0, readReal); } return; }
*
* private int getContentLength(StringBuffer strBuffer) { int len = 0; int
* index = 0; for (; index < strBuffer.length(); index++) { char c =
* strBuffer.charAt(index); if (c >= '0' && c <= '9') { len = len * 16 + (c
* - '0'); } else if (c >= 'a' && c <= 'z') { len = len * 16 + (c - 'a'); }
* else if (c >= 'A' && c <= 'Z') { len = len * 16 + (c - 'A'); } else {
* break; } } return len; }
*
* private void readChunkedContent(InputStream in) throws HttpException,
* IOException { LOG.info("enter readChunkedContent...");
* ByteArrayOutputStream out = new ByteArrayOutputStream(Http.BUFFER_SIZE);
* byte[] bytes = new byte[Http.BUFFER_SIZE]; byte b[] = new byte[1];
* StringBuffer strBuffer = null; int contentLength = -1; while (true) {
* strBuffer = new StringBuffer(); in.read(b, 0, 1); strBuffer.append(b[0]);
* if (b[0] == '\n' && strBuffer.length() - 2 >= 0 &&
* strBuffer.charAt(strBuffer.length() - 2) == '\r') { contentLength =
* getContentLength(strBuffer); } if (-1 == contentLength) { // do nothing,
* continue to read next byte...
*
* } else if (0 == contentLength) { // consume next 2 bytes,then exit...
* in.read(b, 0, 1); // \r in.read(b, 0, 1); // \n break; } else { // should
* read some bytes... readContentByLength(in, contentLength, out);
* in.read(b, 0, 1); // \r in.read(b, 0, 1); // \n }
*
* // continue to do sth... }
*
* }
*/

private void readPlainContent(InputStream in) throws HttpException,
IOException {
LOG.info("enter readPlainContent...");
int contentLength = Integer.MAX_VALUE; // get content length
String contentLengthString = headers.get(Response.CONTENT_LENGTH);
if (contentLengthString != null) {
contentLengthString = contentLengthString.trim();
try {
if (!contentLengthString.isEmpty())
contentLength = Integer.parseInt(contentLengthString);
} catch (NumberFormatException e) {
throw new HttpException("bad content length: "
+ contentLengthString);
}
}

// limit download size
if (http.getMaxContent() >= 0 && contentLength > http.getMaxContent())
contentLength = http.getMaxContent();

ByteArrayOutputStream out = new ByteArrayOutputStream(Http.BUFFER_SIZE);
byte[] bytes = new byte[Http.BUFFER_SIZE];
int length = 0; // read content
LOG.info("the length we will read is " + contentLength);
for (int i = in.read(bytes); i != -1 && length + i <= contentLength; i = in
.read(bytes)) {

out.write(bytes, 0, i);
length += i;
}
content = out.toByteArray();
}

private void readChunkedContent(PushbackInputStream in, StringBuffer line)
throws HttpException, IOException {
boolean doneChunks = false;
int contentBytesRead = 0;
byte[] bytes = new byte[Http.BUFFER_SIZE];
ByteArrayOutputStream out = new ByteArrayOutputStream(Http.BUFFER_SIZE);

while (!doneChunks) {
if (Http.LOG.isTraceEnabled()) {
Http.LOG.trace("Http: starting chunk");
}

readLine(in, line, false);

String chunkLenStr;
// if (LOG.isTraceEnabled()) { LOG.trace("chunk-header: '" + line +
// "'"); }

int pos = line.indexOf(";");
if (pos < 0) {
chunkLenStr = line.toString();
} else {
chunkLenStr = line.substring(0, pos);
// if (LOG.isTraceEnabled()) { LOG.trace("got chunk-ext: " +
// line.substring(pos+1)); }
}
chunkLenStr = chunkLenStr.trim();
int chunkLen;
try {
chunkLen = Integer.parseInt(chunkLenStr, 16);
} catch (NumberFormatException e) {
throw new HttpException("bad chunk length: " + line.toString());
}

if (chunkLen == 0) {
doneChunks = true;
break;
}

if ((contentBytesRead + chunkLen) > http.getMaxContent())
chunkLen = http.getMaxContent() - contentBytesRead;

// read one chunk
int chunkBytesRead = 0;
while (chunkBytesRead < chunkLen) {

int toRead = (chunkLen - chunkBytesRead) < Http.BUFFER_SIZE ? (chunkLen - chunkBytesRead)
: Http.BUFFER_SIZE;
int len = in.read(bytes, 0, toRead);

if (len == -1)
throw new HttpException("chunk eof after "
+ contentBytesRead + " bytes in successful chunks"
+ " and " + chunkBytesRead + " in current chunk");

// DANGER!!! This would print gzipped bytes right to your
// terminal!
// if (LOG.isTraceEnabled()) { LOG.trace("read: " + new
// String(bytes, 0, len)); }

out.write(bytes, 0, len);
chunkBytesRead += len;
}

readLine(in, line, false);

}

if (!doneChunks) {
if (contentBytesRead != http.getMaxContent())
throw new HttpException(
"chunk eof: !doneChunk && didn't max out");
return;
}

content = out.toByteArray();
parseHeaders(in, line);

}

private int parseStatusLine(PushbackInputStream in, StringBuffer line)
throws IOException, HttpException {
readLine(in, line, false);
LOG.info("leave readLine...");
int codeStart = line.indexOf(" ");
int codeEnd = line.indexOf(" ", codeStart + 1);

// handle lines with no plaintext result code, ie:
// "HTTP/1.1 200" vs "HTTP/1.1 200 OK"
if (codeEnd == -1)
codeEnd = line.length();

int code;
try {
code = Integer.parseInt(line.substring(codeStart + 1, codeEnd));
} catch (NumberFormatException e) {
throw new HttpException("bad status line '" + line + "': "
+ e.getMessage(), e);
}
LOG.info("code =" + code);
return code;
}

private void processHeaderLine(StringBuffer line) throws IOException,
HttpException {

int colonIndex = line.indexOf(":"); // key is up to colon
if (colonIndex == -1) {
int i;
for (i = 0; i < line.length(); i++)
if (!Character.isWhitespace(line.charAt(i)))
break;
if (i == line.length())
return;
throw new HttpException("No colon in header:" + line);
}
String key = line.substring(0, colonIndex);

int valueStart = colonIndex + 1; // skip whitespace
while (valueStart < line.length()) {
int c = line.charAt(valueStart);
if (c != ' ' && c != '\t')
break;
valueStart++;
}
String value = line.substring(valueStart);
headers.set(key, value);
}

// Adds headers to our headers Metadata
private void parseHeaders(PushbackInputStream in, StringBuffer line)
throws IOException, HttpException {

while (readLine(in, line, true) != 0) {

// handle HTTP responses with missing blank line after headers
int pos;
if (((pos = line.indexOf("<!DOCTYPE")) != -1)
|| ((pos = line.indexOf("<HTML")) != -1)
|| ((pos = line.indexOf("<html")) != -1)) {

in.unread(line.substring(pos).getBytes("UTF-8"));
line.setLength(pos);

try {
// TODO: (CM) We don't know the header names here
// since we're just handling them generically. It would
// be nice to provide some sort of mapping function here
// for the returned header names to the standard metadata
// names in the ParseData class
processHeaderLine(line);
} catch (Exception e) {
// fixme:
Http.LOG.warn("Error: ", e);
}
return;
}

processHeaderLine(line);
}
}

private static int readLine(PushbackInputStream in, StringBuffer line,
boolean allowContinuedLine) throws IOException {
LOG.info("enter readLine...");
line.setLength(0);
for (int c = in.read(); c != -1; c = in.read()) {
switch (c) {
case '\r':
if (peek(in) == '\n') {
in.read();
}
case '\n':
if (line.length() > 0) {
// at EOL -- check for continued line if the current
// (possibly continued) line wasn't blank
if (allowContinuedLine)
switch (peek(in)) {
case ' ':
case '\t': // line is continued
in.read();
continue;
}
}
return line.length(); // else complete
default:
line.append((char) c);
}
}
throw new EOFException();
}

private static int peek(PushbackInputStream in) throws IOException {
int value = in.read();
in.unread(value);
return value;
}

}
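As a side note, the chunk-size line handled in readChunkedContent above carries a hexadecimal length, optionally followed by ";" and chunk extensions, which are ignored. A standalone sketch of just that parsing step (parseChunkLen is my name for it, not Nutch's):

```java
// Minimal sketch of the chunk-size-line parsing done in readChunkedContent:
// the line is a hexadecimal length, optionally followed by ';' and chunk
// extensions that a client may ignore.
public class ChunkLenDemo {

    static int parseChunkLen(String line) {
        int pos = line.indexOf(';');                 // strip chunk extensions
        String lenStr = (pos < 0 ? line : line.substring(0, pos)).trim();
        return Integer.parseInt(lenStr, 16);         // length is hexadecimal
    }

    public static void main(String[] args) {
        System.out.println(parseChunkLen("1a3"));          // 419
        System.out.println(parseChunkLen("1A3; name=v"));  // 419, extension ignored
        System.out.println(parseChunkLen("0"));            // 0 = terminating chunk
    }
}
```

A chunk length of zero marks the last chunk, which is why readChunkedContent sets doneChunks and then reads the trailing headers.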

2 Http.java

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.protocol.http;

// JDK imports
import java.io.IOException;
import java.net.URL;

import java.util.logging.Level;

// Commons Logging imports
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hadoop imports
import org.apache.hadoop.conf.Configuration;

// Nutch imports
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.net.protocols.Response;
import org.apache.nutch.protocol.ProtocolException;
import org.apache.nutch.protocol.http.api.HttpBase;
import org.apache.nutch.util.NutchConfiguration;

public class Http extends HttpBase {

public static final Logger LOG  = LoggerFactory.getLogger(Http.class);

public Http() {

super(LOG);
}

public void setConf(Configuration conf) {
super.setConf(conf);
//    Level logLevel = Level.WARNING;
//    if (conf.getBoolean("http.verbose", false)) {
//      logLevel = Level.FINE;
//    }
//    LOG.setLevel(logLevel);
}

public static void main(String[] args) throws Exception {
Http http = new Http();
http.setConf(NutchConfiguration.create());
main(http, args);
}

protected Response getResponse(URL url, CrawlDatum datum, boolean redirect)
throws ProtocolException, IOException {
LOG.info("will execute return new HttpResponse(this, url, datum);");
return new HttpResponse(this, url, datum);
}

}


3 HttpSocket.java

package org.apache.nutch.protocol.http;

import java.io.IOException;
import java.net.Socket;
//Commons Logging imports
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.nutch.protocol.http.HttpResponse;
public class HttpSocket extends Socket {
public static final Logger LOG  = LoggerFactory.getLogger(HttpSocket.class);
private String host;

public HttpSocket(String h) {
this.host = h;
}

public void release() {
LOG.info("HttpSocket release function invoked...");
try {
super.close(); // really close the underlying socket
} catch (IOException e) {
LOG.warn("failed to close the underlying socket", e);
}
}

@Override
public void close() {
LOG.info("HttpSocket close function invoked...");
HttpPool.addSocket(host, this);
}
}
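The trick in HttpSocket is that close(), which HttpResponse calls in its finally block, only parks the socket in the pool, while release() performs the real close. The same pattern in miniature, with an illustrative PooledResource class of my own (not Nutch code):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the HttpSocket trick: close() returns the resource to a pool,
// release() really destroys it. PooledResource and its pool are made-up
// names used only to illustrate the pattern.
public class CloseToPoolDemo {

    static Deque<PooledResource> pool = new ArrayDeque<PooledResource>();

    static class PooledResource {
        boolean destroyed = false;

        public void close() {            // what callers invoke...
            pool.push(this);             // ...but it only parks the resource
        }

        public void release() {          // the real tear-down
            destroyed = true;
        }
    }

    public static void main(String[] args) {
        PooledResource r = new PooledResource();
        r.close();                       // "closed" by the caller...
        PooledResource again = pool.poll();
        System.out.println(r == again && !r.destroyed);  // true: same live object
    }
}
```

The upside is that existing caller code needs no change; the downside is that anyone reading the caller must know that close() no longer means close.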

4 HttpPool.java

package org.apache.nutch.protocol.http;

import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;

public class HttpPool {

private static HashMap<String, ArrayList<Socket>> httpPool = new HashMap<String, ArrayList<Socket>>();
// maximum number of idle sockets kept per host
private static final int SOCKET_VALVE = 50;

private static ArrayList<Socket> getSocketList(String host) {
// HashMap is not safe for concurrent read/write, so every access
// to the outer map must be synchronized
synchronized (httpPool) {
ArrayList<Socket> list = httpPool.get(host);
if (null == list) {
list = new ArrayList<Socket>();
httpPool.put(host, list);
}
return list;
}
}

public static Socket getSocket(String host) {
Socket result = null;
ArrayList<Socket> list = getSocketList(host);
synchronized (list) {
if (list.size() > 0) {
result = list.remove(0);
}
}
return result;
}

public static void addSocket(String host, Socket s) {
boolean release;
ArrayList<Socket> list = getSocketList(host);

synchronized (list) {
if (list.size() < SOCKET_VALVE) {
list.add(s);
release = false;
} else {
release = true;
}
}

// pool for this host is full: really close the socket
if (release && s instanceof HttpSocket) {
((HttpSocket) s).release();
}
}
}
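HttpPool keeps at most SOCKET_VALVE idle sockets per host; a socket returned beyond that cap gets really closed. A simplified, single-threaded sketch of that bounded behavior (BoundedPoolDemo and CAP are illustrative names, not Nutch code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of HttpPool's bounded per-host behavior: up to CAP idle entries
// are kept per host; beyond that, a returned entry is rejected so the
// caller can really close it. Strings stand in for sockets.
public class BoundedPoolDemo {

    static final int CAP = 2;  // stands in for SOCKET_VALVE (50 in the post)
    static Map<String, List<String>> pool = new HashMap<String, List<String>>();

    static List<String> listFor(String host) {
        List<String> l = pool.get(host);
        if (l == null) { l = new ArrayList<String>(); pool.put(host, l); }
        return l;
    }

    /** Returns true if the entry was kept, false if the pool was full. */
    static boolean addEntry(String host, String entry) {
        List<String> l = listFor(host);
        if (l.size() < CAP) { l.add(entry); return true; }
        return false;  // caller should really close the socket here
    }

    static String getEntry(String host) {
        List<String> l = listFor(host);
        return l.isEmpty() ? null : l.remove(0);
    }

    public static void main(String[] args) {
        System.out.println(addEntry("example.com", "s1"));  // true
        System.out.println(addEntry("example.com", "s2"));  // true
        System.out.println(addEntry("example.com", "s3"));  // false: over CAP
        System.out.println(getEntry("example.com"));        // s1 (FIFO order)
        System.out.println(getEntry("other.com"));          // null: no idle socket
    }
}
```

The per-host cap matters for a crawler: without it, a large crawl of many hosts would accumulate unbounded idle sockets and exhaust file descriptors.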

A rough test showed that a single machine can fetch about six million pages per day.