Hadoop: Reading Multiple Lines at a Time
2016-05-06 20:17
1. Requirement:
Before map() runs, input is read in by the InputFormat configured via setInputFormatClass. By default, one line is read per record and handed to the computation. The requirement is to read two lines at a time and emit them merged into a single record.
2. Approach and solution:
It helps to read the source code of these classes first to understand the approach.
I use the TextInputFormat input format. Its key is the byte offset of each line within the file, and its value is the content of that line. Internally it creates a LineRecordReader, the class that actually reads the data, and that is the class we need to rewrite.
In LineRecordReader, the nextKeyValue() method does the reading through readLine(). We introduce a boolean flag before the readLine() call to control whether the data already read gets cleared, wrap the read in a for loop so it executes twice, and pass the flag into an overloaded readLine().
This in turn requires changes to the parent class LineReader. LineRecordReader actually instantiates SplitLineReader, which extends LineReader, so two further classes have to be copied as well: UncompressedSplitLineReader and CompressedSplitLineReader. They handle reading across split boundaries for uncompressed and compressed input respectively; they need no changes and can be copied verbatim.
Back in LineReader, we overload its readLine() method with an extra boolean parameter and pass it down to overloaded versions of readCustomLine() and readDefaultLine(). In those two methods the boolean merely decides whether the Text buffer is cleared before reading; the rest of the code is copied unchanged.
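Stripped of the surrounding boilerplate, the core of the change to nextKeyValue() is the loop below. This is extracted from the full LineRecordReader listed in section 3; newSize, value, pos, end, maxLineLength, and in are fields of that class:

// Core of the modified nextKeyValue(): read two physical lines into one value.
boolean flag = true;                      // first line: clear value before reading
for (int i = 1; i <= 2; i++) {
  if (i == 2) {
    flag = false;                         // second line: append instead of clearing
  }
  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
    newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos), flag);
    pos += newSize;
    if (newSize == 0 || newSize < maxLineLength) {
      break;                              // got a complete (or empty) line
    }
  }
}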
The process is illustrated below:
1. Input data: (original figure not available)
2. Result: (original figure not available)
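Since the original figures are unavailable, here is a hypothetical example of the behavior (the file contents are invented for illustration). With the default TextOutputFormat, each output record is the key, a tab, then the value; the key is the byte offset of the first line of each pair, and the value is the two lines concatenated. Note that readLine() strips the line terminator before appending, so nothing separates the two merged lines:

Input file (four lines):
hello world
hadoop mapreduce
foo bar
baz qux

Output of the identity-mapper job:
0	hello worldhadoop mapreduce
29	foo barbaz qux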
Source code:
1. Test class
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestUserInputFormat {

  // Identity mapper: writes out the (offset, merged-lines) pairs produced
  // by the custom record reader unchanged.
  public static class UserMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, LongWritable, Text>.Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) {
    try {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "Test lineRecordReader");
      job.setJarByClass(TestUserInputFormat.class);
      // This refers to the copied TextInputFormat below (same package),
      // not org.apache.hadoop.mapreduce.lib.input.TextInputFormat.
      job.setInputFormatClass(TextInputFormat.class);
      job.setMapperClass(UserMapper.class);
      job.setOutputKeyClass(LongWritable.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path("hdfs://192.168.61.128:9000/inline/"));
      FileOutputFormat.setOutputPath(job, new Path(
          "hdfs://192.168.61.128:9000/outline/" + System.currentTimeMillis() + "/"));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (IllegalStateException | IllegalArgumentException | ClassNotFoundException
        | IOException | InterruptedException e) {
      e.printStackTrace();
    }
  }
}
2. Input format class
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.google.common.base.Charsets;

/** An {@link InputFormat} for plain text files. Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line. Keys are
 * the position in the file, and values are the line of text. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    // Returns our rewritten LineRecordReader (section 3), not Hadoop's.
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }
}
3. Record-reading class:
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
 * Treats keys as offset in file and value as line.
 */
@InterfaceAudience.LimitedPrivate({"MapReduce", "Pig"})
@InterfaceStability.Evolving
public class LineRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
  public static final String MAX_LINE_LENGTH =
      "mapreduce.input.linerecordreader.line.maxlength";

  private long start;
  private long pos;
  private long end;
  private SplitLineReader in;
  private FSDataInputStream fileIn;
  private Seekable filePosition;
  private int maxLineLength;
  private LongWritable key;
  private Text value;
  private boolean isCompressedInput;
  private Decompressor decompressor;
  private byte[] recordDelimiterBytes;

  public LineRecordReader() {
  }

  public LineRecordReader(byte[] recordDelimiter) {
    this.recordDelimiterBytes = recordDelimiter;
  }

  public void initialize(InputSplit genericSplit,
      TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
      isCompressedInput = true;
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
            ((SplittableCompressionCodec) codec).createInputStream(
                fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, this.recordDelimiterBytes);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new UncompressedSplitLineReader(
          fileIn, job, this.recordDelimiterBytes, split.getLength());
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

  private int maxBytesToConsume(long pos) {
    return isCompressedInput
        ? Integer.MAX_VALUE
        : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
  }

  private long getFilePosition() throws IOException {
    long retVal;
    if (isCompressedInput && null != filePosition) {
      retVal = filePosition.getPos();
    } else {
      retVal = pos;
    }
    return retVal;
  }

  private int skipUtfByteOrderMark() throws IOException {
    // Strip BOM(Byte Order Mark)
    // Text only support UTF-8, we only need to check UTF-8 BOM
    // (0xEF,0xBB,0xBF) at the start of the text stream.
    int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
        Integer.MAX_VALUE);
    int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
    // Even we read 3 extra bytes for the first line,
    // we won't alter existing behavior (no backwards incompat issue).
    // Because the newSize is less than maxLineLength and
    // the number of bytes copied to Text is always no more than newSize.
    // If the return size from readLine is not less than maxLineLength,
    // we will discard the current line and read the next line.
    pos += newSize;
    int textLength = value.getLength();
    byte[] textBytes = value.getBytes();
    if ((textLength >= 3) && (textBytes[0] == (byte) 0xEF) &&
        (textBytes[1] == (byte) 0xBB) && (textBytes[2] == (byte) 0xBF)) {
      // find UTF-8 BOM, strip it.
      LOG.info("Found UTF-8 BOM and skipped it");
      textLength -= 3;
      newSize -= 3;
      if (textLength > 0) {
        // It may work to use the same buffer and not do the copyBytes
        textBytes = value.copyBytes();
        value.set(textBytes, 3, textLength);
      } else {
        value.clear();
      }
    }
    return newSize;
  }

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    // MODIFIED: run the read twice per call. On the first pass flag is
    // true, so readLine() clears value before filling it; on the second
    // pass flag is false, so the second line is appended to value.
    boolean flag = true;
    for (int i = 1; i <= 2; i++) {
      if (i == 2) {
        flag = false;
      }
      while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
        if (pos == 0) {
          newSize = skipUtfByteOrderMark();
        } else {
          newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos), flag);
          pos += newSize;
        }
        if ((newSize == 0) || (newSize < maxLineLength)) {
          break;
        }
        // line too long. try again
        LOG.info("Skipped line of size " + newSize + " at pos " +
            (pos - newSize));
      }
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split
   */
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
    }
  }

  public synchronized void close() throws IOException {
    try {
      if (in != null) {
        in.close();
      }
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
      }
    }
  }
}
4. LineReader, the parent class that does the reading:
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

/**
 * A class that provides a line reader from an input stream.
 * Depending on the constructor used, lines will either be terminated by:
 * <ul>
 * <li>one of the following: '\n' (LF) , '\r' (CR),
 * or '\r\n' (CR+LF).</li>
 * <li><em>or</em>, a custom byte sequence delimiter</li>
 * </ul>
 * In both cases, EOF also terminates an otherwise unterminated
 * line.
 */
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public class LineReader implements Closeable {
  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
  private int bufferSize = DEFAULT_BUFFER_SIZE;
  private InputStream in;
  private byte[] buffer;
  // the number of bytes of real data in the buffer
  private int bufferLength = 0;
  // the current position in the buffer
  private int bufferPosn = 0;

  private static final byte CR = '\r';
  private static final byte LF = '\n';

  // The line delimiter
  private final byte[] recordDelimiterBytes;

  /**
   * Create a line reader that reads from the given stream using the
   * default buffer-size (64k).
   * @param in The input stream
   */
  public LineReader(InputStream in) {
    this(in, DEFAULT_BUFFER_SIZE);
  }

  /**
   * Create a line reader that reads from the given stream using the
   * given buffer-size.
   * @param in The input stream
   * @param bufferSize Size of the read buffer
   */
  public LineReader(InputStream in, int bufferSize) {
    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = null;
  }

  /**
   * Create a line reader that reads from the given stream using the
   * <code>io.file.buffer.size</code> specified in the given
   * <code>Configuration</code>.
   * @param in input stream
   * @param conf configuration
   * @throws IOException
   */
  public LineReader(InputStream in, Configuration conf) throws IOException {
    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
  }

  /**
   * Create a line reader that reads from the given stream using the
   * default buffer-size, and using a custom delimiter of array of
   * bytes.
   * @param in The input stream
   * @param recordDelimiterBytes The delimiter
   */
  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
    this.in = in;
    this.bufferSize = DEFAULT_BUFFER_SIZE;
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = recordDelimiterBytes;
  }

  /**
   * Create a line reader that reads from the given stream using the
   * given buffer-size, and using a custom delimiter of array of
   * bytes.
   * @param in The input stream
   * @param bufferSize Size of the read buffer
   * @param recordDelimiterBytes The delimiter
   */
  public LineReader(InputStream in, int bufferSize,
      byte[] recordDelimiterBytes) {
    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = recordDelimiterBytes;
  }

  /**
   * Create a line reader that reads from the given stream using the
   * <code>io.file.buffer.size</code> specified in the given
   * <code>Configuration</code>, and using a custom delimiter of array of
   * bytes.
   * @param in input stream
   * @param conf configuration
   * @param recordDelimiterBytes The delimiter
   * @throws IOException
   */
  public LineReader(InputStream in, Configuration conf,
      byte[] recordDelimiterBytes) throws IOException {
    this.in = in;
    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = recordDelimiterBytes;
  }

  /**
   * Close the underlying stream.
   * @throws IOException
   */
  public void close() throws IOException {
    in.close();
  }

  /**
   * Read one line from the InputStream into the given Text.
   *
   * @param str the object to store the given line (without newline)
   * @param maxLineLength the maximum number of bytes to store into str;
   *  the rest of the line is silently discarded.
   * @param maxBytesToConsume the maximum number of bytes to consume
   *  in this call. This is only a hint, because if the line cross
   *  this threshold, we allow it to happen. It can overshoot
   *  potentially by as much as one buffer length.
   *
   * @return the number of bytes read including the (longest) newline
   * found.
   *
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str, int maxLineLength,
      int maxBytesToConsume) throws IOException {
    if (this.recordDelimiterBytes != null) {
      return readCustomLine(str, maxLineLength, maxBytesToConsume);
    } else {
      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
    }
  }

  // ADDED: overload with a boolean flag. When flag is false the Text
  // buffer is not cleared first, so the newly read line is appended to
  // whatever str already holds.
  public int readLine(Text str, int maxLineLength,
      int maxBytesToConsume, boolean flag) throws IOException {
    if (this.recordDelimiterBytes != null) {
      return readCustomLine(str, maxLineLength, maxBytesToConsume, flag);
    } else {
      return readDefaultLine(str, maxLineLength, maxBytesToConsume, flag);
    }
  }

  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
      throws IOException {
    return in.read(buffer);
  }

  /**
   * Read a line terminated by one of CR, LF, or CRLF.
   */
  private int readDefaultLine(Text str, int maxLineLength,
      int maxBytesToConsume) throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR. In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int) bytesConsumed;
  }

  // ADDED: identical to readDefaultLine(Text, int, int) above, except
  // that str is only cleared when flag is true; with flag == false the
  // next line is appended to the previous contents of str.
  private int readDefaultLine(Text str, int maxLineLength,
      int maxBytesToConsume, boolean flag) throws IOException {
    // (see the case analysis in readDefaultLine above)
    if (flag) {
      str.clear();
    }
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int) bytesConsumed;
  }

  // ADDED: flag-aware variant of readCustomLine(Text, int, int) below;
  // str is only cleared when flag is true.
  private int readCustomLine(Text str, int maxLineLength,
      int maxBytesToConsume, boolean flag) throws IOException {
    /* We're reading data from inputStream, but the head of the stream may be
     * already captured in the previous buffer, so we have several cases:
     *
     * 1. The buffer tail does not contain any character sequence which
     *    matches with the head of delimiter. We count it as a
     *    ambiguous byte count = 0
     *
     * 2. The buffer tail contains a X number of characters,
     *    that forms a sequence, which matches with the
     *    head of delimiter. We count ambiguous byte count = X
     *
     *    // *** eg: A segment of input file is as follows
     *
     *    " record 1792: I found this bug very interesting and
     *    I have completely read about it. record 1793: This bug
     *    can be solved easily record 1794: This ."
     *
     *    delimiter = "record";
     *
     *    supposing:- String at the end of buffer =
     *    "I found this bug very interesting and I have completely re"
     *    There for next buffer = "ad about it. record 179 ...."
     *
     *    The matching characters in the input
     *    buffer tail and delimiter head = "re"
     *    Therefore, ambiguous byte count = 2 **** //
     *
     * 2.1 If the following bytes are the remaining characters of
     *     the delimiter, then we have to capture only up to the starting
     *     position of delimiter. That means, we need not include the
     *     ambiguous characters in str.
     *
     * 2.2 If the following bytes are not the remaining characters of
     *     the delimiter ( as mentioned in the example ),
     *     then we have to include the ambiguous characters in str.
     */
    if (flag) {
      str.clear();
    }
    int txtLength = 0; // tracks str.getLength(), as an optimization
    long bytesConsumed = 0;
    int delPosn = 0;
    int ambiguousByteCount = 0; // To capture the ambiguous characters count
    do {
      int startPosn = bufferPosn; // Start from previous end position
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
        if (bufferLength <= 0) {
          if (ambiguousByteCount > 0) {
            str.append(recordDelimiterBytes, 0, ambiguousByteCount);
            bytesConsumed += ambiguousByteCount;
          }
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) {
        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
          delPosn++;
          if (delPosn >= recordDelimiterBytes.length) {
            bufferPosn++;
            break;
          }
        } else if (delPosn != 0) {
          bufferPosn--;
          delPosn = 0;
        }
      }
      int readLength = bufferPosn - startPosn;
      bytesConsumed += readLength;
      int appendLength = readLength - delPosn;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      bytesConsumed += ambiguousByteCount;
      if (appendLength >= 0 && ambiguousByteCount > 0) {
        //appending the ambiguous characters (refer case 2.2)
        str.append(recordDelimiterBytes, 0, ambiguousByteCount);
        ambiguousByteCount = 0;
        // since it is now certain that the split did not split a delimiter we
        // should not read the next record: clear the flag otherwise duplicate
        // records could be generated
        unsetNeedAdditionalRecordAfterSplit();
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
      if (bufferPosn >= bufferLength) {
        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
          ambiguousByteCount = delPosn;
          bytesConsumed -= ambiguousByteCount; //to be consumed in next
        }
      }
    } while (delPosn < recordDelimiterBytes.length
        && bytesConsumed < maxBytesToConsume);
    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
    }
    return (int) bytesConsumed;
  }

  /**
   * Read a line terminated by a custom delimiter.
   */
  private int readCustomLine(Text str, int maxLineLength,
      int maxBytesToConsume) throws IOException {
    // (see the case analysis in readCustomLine above)
    str.clear();
    int txtLength = 0; // tracks str.getLength(), as an optimization
    long bytesConsumed = 0;
    int delPosn = 0;
    int ambiguousByteCount = 0; // To capture the ambiguous characters count
    do {
      int startPosn = bufferPosn; // Start from previous end position
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
        if (bufferLength <= 0) {
          if (ambiguousByteCount > 0) {
            str.append(recordDelimiterBytes, 0, ambiguousByteCount);
            bytesConsumed += ambiguousByteCount;
          }
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) {
        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
          delPosn++;
          if (delPosn >= recordDelimiterBytes.length) {
            bufferPosn++;
            break;
          }
        } else if (delPosn != 0) {
          bufferPosn--;
          delPosn = 0;
        }
      }
      int readLength = bufferPosn - startPosn;
      bytesConsumed += readLength;
      int appendLength = readLength - delPosn;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      bytesConsumed += ambiguousByteCount;
      if (appendLength >= 0 && ambiguousByteCount > 0) {
        //appending the ambiguous characters (refer case 2.2)
        str.append(recordDelimiterBytes, 0, ambiguousByteCount);
        ambiguousByteCount = 0;
        // since it is now certain that the split did not split a delimiter we
        // should not read the next record: clear the flag otherwise duplicate
        // records could be generated
        unsetNeedAdditionalRecordAfterSplit();
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
      if (bufferPosn >= bufferLength) {
        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
          ambiguousByteCount = delPosn;
          bytesConsumed -= ambiguousByteCount; //to be consumed in next
        }
      }
    } while (delPosn < recordDelimiterBytes.length
        && bytesConsumed < maxBytesToConsume);
    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
    }
    return (int) bytesConsumed;
  }

  /**
   * Read from the InputStream into the given Text.
   * @param str the object to store the given line
   * @param maxLineLength the maximum number of bytes to store into str.
   * @return the number of bytes read including the newline
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str, int maxLineLength) throws IOException {
    return readLine(str, maxLineLength, Integer.MAX_VALUE);
  }

  /**
   * Read from the InputStream into the given Text.
   * @param str the object to store the given line
   * @return the number of bytes read including the newline
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str) throws IOException {
    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
  }

  protected int getBufferPosn() {
    return bufferPosn;
  }

  protected int getBufferSize() {
    return bufferSize;
  }

  protected void unsetNeedAdditionalRecordAfterSplit() {
    // needed for custom multi byte line delimiters only
    // see MAPREDUCE-6549 for details
  }
}
5. A few supporting classes, copied with no functional changes: SplitLineReader, CompressedSplitLineReader, and UncompressedSplitLineReader.
SplitLineReader:

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SplitLineReader extends LineReader {
  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
    super(in, recordDelimiterBytes);
  }

  public SplitLineReader(InputStream in, Configuration conf,
      byte[] recordDelimiterBytes) throws IOException {
    super(in, conf, recordDelimiterBytes);
  }

  public boolean needAdditionalRecordAfterSplit() {
    return false;
  }
}
CompressedSplitLineReader:

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;

/**
 * Line reader for compressed splits
 *
 * Reading records from a compressed split is tricky, as the
 * LineRecordReader is using the reported compressed input stream
 * position directly to determine when a split has ended. In addition the
 * compressed input stream is usually faking the actual byte position, often
 * updating it only after the first compressed block after the split is
 * accessed.
 *
 * Depending upon where the last compressed block of the split ends relative
 * to the record delimiters it can be easy to accidentally drop the last
 * record or duplicate the last record between this split and the next.
 *
 * Split end scenarios:
 *
 * 1) Last block of split ends in the middle of a record
 *      Nothing special that needs to be done here, since the compressed input
 *      stream will report a position after the split end once the record
 *      is fully read. The consumer of the next split will discard the
 *      partial record at the start of the split normally, and no data is lost
 *      or duplicated between the splits.
 *
 * 2) Last block of split ends in the middle of a delimiter
 *      The line reader will continue to consume bytes into the next block to
 *      locate the end of the delimiter. If a custom delimiter is being used
 *      then the next record must be read by this split or it will be dropped.
 *      The consumer of the next split will not recognize the partial
 *      delimiter at the beginning of its split and will discard it along with
 *      the next record.
 *
 *      However for the default delimiter processing there is a special case
 *      because CR, LF, and CRLF are all valid record delimiters. If the
 *      block ends with a CR then the reader must peek at the next byte to see
 *      if it is an LF and therefore part of the same record delimiter.
 *      Peeking at the next byte is an access to the next block and triggers
 *      the stream to report the end of the split. There are two cases based
 *      on the next byte:
 *
 *      A) The next byte is LF
 *           The split needs to end after the current record is returned. The
 *           consumer of the next split will discard the first record, which
 *           is degenerate since LF is itself a delimiter, and start consuming
 *           records after that byte. If the current split tries to read
 *           another record then the record will be duplicated between splits.
 *
 *      B) The next byte is not LF
 *           The current record will be returned but the stream will report
 *           the split has ended due to the peek into the next block. If the
 *           next record is not read then it will be lost, as the consumer of
 *           the next split will discard it before processing subsequent
 *           records. Therefore the next record beyond the reported split end
 *           must be consumed by this split to avoid data loss.
 *
 * 3) Last block of split ends at the beginning of a delimiter
 *      This is equivalent to case 1, as the reader will consume bytes into
 *      the next block and trigger the end of the split. No further records
 *      should be read as the consumer of the next split will discard the
 *      (degenerate) record at the beginning of its split.
 *
 * 4) Last block of split ends at the end of a delimiter
 *      Nothing special needs to be done here. The reader will not start
 *      examining the bytes into the next block until the next record is read,
 *      so the stream will not report the end of the split just yet. Once the
 *      next record is read then the next block will be accessed and the
 *      stream will indicate the end of the split. The consumer of the next
 *      split will correctly discard the first record of its split, and no
 *      data is lost or duplicated.
 *
 *      If the default delimiter is used and the block ends at a CR then this
 *      is treated as case 2 since the reader does not yet know without
 *      looking at subsequent bytes whether the delimiter has ended.
 *
 * NOTE: It is assumed that compressed input streams *never* return bytes from
 *       multiple compressed blocks from a single read. Failure to do so will
 *       violate the buffering performed by this class, as it will access
 *       bytes into the next block after the split before returning all of the
 *       records from the previous block.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CompressedSplitLineReader extends SplitLineReader {
  SplitCompressionInputStream scin;
  private boolean usingCRLF;
  private boolean needAdditionalRecord = false;
  private boolean finished = false;

  public CompressedSplitLineReader(SplitCompressionInputStream in,
      Configuration conf, byte[] recordDelimiterBytes) throws IOException {
    super(in, conf, recordDelimiterBytes);
    scin = in;
    usingCRLF = (recordDelimiterBytes == null);
  }

  @Override
  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
      throws IOException {
    int bytesRead = in.read(buffer);

    // If the split ended in the middle of a record delimiter then we need
    // to read one additional record, as the consumer of the next split will
    // not recognize the partial delimiter as a record.
    // However if using the default delimiter and the next character is a
    // linefeed then next split will treat it as a delimiter all by itself
    // and the additional record read should not be performed.
    if (inDelimiter && bytesRead > 0) {
      if (usingCRLF) {
        needAdditionalRecord = (buffer[0] != '\n');
      } else {
        needAdditionalRecord = true;
      }
    }
    return bytesRead;
  }

  @Override
  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
    int bytesRead = 0;
    if (!finished) {
      // only allow at most one more record to be read after the stream
      // reports the split ended
      if (scin.getPos() > scin.getAdjustedEnd()) {
        finished = true;
      }
      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
    }
    return bytesRead;
  }

  @Override
  public boolean needAdditionalRecordAfterSplit() {
    return !finished && needAdditionalRecord;
  }
}
UncompressedSplitLineReader:

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;

/**
 * SplitLineReader for uncompressed files.
 * This class can split the file correctly even if the delimiter is multi-bytes.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class UncompressedSplitLineReader extends SplitLineReader {
  private boolean needAdditionalRecord = false;
  private long splitLength;
  /** Total bytes read from the input stream. */
  private long totalBytesRead = 0;
  private boolean finished = false;
  private boolean usingCRLF;

  public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
      byte[] recordDelimiterBytes, long splitLength) throws IOException {
    super(in, conf, recordDelimiterBytes);
    this.splitLength = splitLength;
    usingCRLF = (recordDelimiterBytes == null);
  }

  @Override
  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
      throws IOException {
    int maxBytesToRead = buffer.length;
    if (totalBytesRead < splitLength) {
      maxBytesToRead = Math.min(maxBytesToRead,
          (int) (splitLength - totalBytesRead));
    }
    int bytesRead = in.read(buffer, 0, maxBytesToRead);

    // If the split ended in the middle of a record delimiter then we need
    // to read one additional record, as the consumer of the next split will
    // not recognize the partial delimiter as a record.
    // However if using the default delimiter and the next character is a
    // linefeed then next split will treat it as a delimiter all by itself
    // and the additional record read should not be performed.
    if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
      if (usingCRLF) {
        needAdditionalRecord = (buffer[0] != '\n');
      } else {
        needAdditionalRecord = true;
      }
    }
    if (bytesRead > 0) {
      totalBytesRead += bytesRead;
    }
    return bytesRead;
  }

  @Override
  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
    int bytesRead = 0;
    if (!finished) {
      // only allow at most one more record to be read after the stream
      // reports the split ended
      if (totalBytesRead > splitLength) {
        finished = true;
      }
      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
    }
    return bytesRead;
  }

  @Override
  public boolean needAdditionalRecordAfterSplit() {
    return !finished && needAdditionalRecord;
  }

  @Override
  protected void unsetNeedAdditionalRecordAfterSplit() {
    needAdditionalRecord = false;
  }
}