您的位置:首页 > 大数据

大数据核心技术源码分析之-Avro篇-3

2013-09-16 22:19 471 查看
由于篇幅限制,本篇继续分析 Avro 的 org.apache.avro 包。

涉及的子包包括 generic、io、ipc、reflect、specific、tool、util。

1:package generic

GenericContainer

/**
 * Contract for objects that carry an associated {@link Schema}.
 */
public interface GenericContainer {
  /**
   * Returns the schema of this instance.
   *
   * @return the schema of this instance
   */
  Schema getSchema();
}

子类接口GenericArray

/**
 * A {@link List} that is also a {@link GenericContainer}, with two extra
 * operations declared on top of the list contract.
 *
 * @param <T> element type
 */
public interface GenericArray<T> extends List<T>, GenericContainer {
  /**
   * Declared without documentation in this excerpt.
   * NOTE(review): presumably exposes a reusable element without removing it; confirm against the implementation.
   *
   * @return an element of type {@code T}
   */
  T peek();

  /**
   * Declared without documentation in this excerpt.
   * NOTE(review): presumably reverses the element order in place; confirm against the implementation.
   */
  void reverse();
}

GenericFixed

/**
 * A {@link GenericContainer} whose payload is exposed as a byte array.
 */
public interface GenericFixed extends GenericContainer {
  /**
   * Returns the data.
   *
   * @return the data as a byte array
   */
  byte[] bytes();
}

GenericEnumSymbol

/**
 * A {@link GenericContainer} representing an enum symbol; instances are
 * comparable with one another.
 */
public interface GenericEnumSymbol
    extends GenericContainer, Comparable<GenericEnumSymbol> {
  /**
   * Returns the symbol.
   *
   * @return the symbol as a string
   */
  String toString();
}

另外一个接口

IndexedRecord

/**
 * A {@link GenericContainer} whose values are read and written by integer index.
 */
public interface IndexedRecord extends GenericContainer {
  /**
   * Returns the value stored at the given index.
   *
   * @param i index of the value to read
   * @return the value at index {@code i}
   */
  Object get(int i);

  /**
   * Stores a value at the given index.
   *
   * @param i index to write to
   * @param v value to store
   */
  void put(int i, Object v);
}

GenericRecord

/**
 * An {@link IndexedRecord} that additionally supports access by string key.
 */
public interface GenericRecord extends IndexedRecord {
  /**
   * Returns the value stored under the given key.
   *
   * @param key key of the value to read
   * @return the value for {@code key}
   */
  Object get(String key);

  /**
   * Stores a value under the given key.
   *
   * @param key key to write to
   * @param v value to store
   */
  void put(String key, Object v);
}

看一下GenericRecordBuilder该Builder类

/** Builder producing {@code Record} instances; extends {@code RecordBuilderBase<Record>}. Shown here as a declaration only. */
public class GenericRecordBuilder extends RecordBuilderBase<Record> {}

核心方法

/**
 * Creates a new {@code GenericData.Record} for this builder's schema and
 * fills it field by field.
 *
 * <p>Each field's value is obtained through {@code getWithDefault(field)};
 * fields for which that call yields {@code null} are left unset on the new record.
 *
 * @return the newly built record
 * @throws AvroRuntimeException if the record cannot be instantiated, or if
 *         obtaining a field value raises an {@link IOException}
 */
@Override
public Record build() {
  Record built;
  try {
    built = new GenericData.Record(schema());
  } catch (Exception e) {
    throw new AvroRuntimeException(e);
  }

  for (Field f : fields()) {
    Object fieldValue;
    try {
      fieldValue = getWithDefault(f);
    } catch (IOException e) {
      throw new AvroRuntimeException(e);
    }
    if (fieldValue == null) {
      // Nothing to store for this field; leave it untouched.
      continue;
    }
    built.put(f.pos(), fieldValue);
  }

  return built;
}

/**
 * Combines the superclass hash with the hash of the {@code record} field
 * (0 when that field is {@code null}) using the multiplier 31.
 *
 * @return the hash code of this builder
 */
@Override
public int hashCode() {
  int recordHash = (record != null) ? record.hashCode() : 0;
  return 31 * super.hashCode() + recordHash;
}

/**
 * Two builders are equal when the superclass considers them equal, they have
 * the same runtime class, and their {@code record} fields are both
 * {@code null} or equal to each other.
 *
 * @param obj the object to compare with
 * @return {@code true} if {@code obj} is equal to this builder
 */
@Override
public boolean equals(Object obj) {
  if (this == obj) {
    return true;
  }
  // super.equals is consulted first, exactly as before, so it sees obj before getClass() is called on it.
  if (!super.equals(obj) || getClass() != obj.getClass()) {
    return false;
  }
  GenericRecordBuilder that = (GenericRecordBuilder) obj;
  return (record == null) ? (that.record == null) : record.equals(that.record);
}

另外两个类Reader和Writer

/** {@code DatumReader} implementation of the generic package. Shown here as a declaration only. */
public class GenericDatumReader<D> implements DatumReader<D> {}

/** {@code DatumWriter} implementation of the generic package. Shown here as a declaration only. */
public class GenericDatumWriter<D> implements DatumWriter<D> {}

关注的方法如下(GenericDatumReader 的 read 方法)

/**
 * Reads one datum from the decoder, dispatching on the type of the expected schema.
 *
 * <p>LONG, FLOAT, DOUBLE, BOOLEAN and NULL are read straight from the decoder;
 * every other type is delegated to the matching {@code readXxx} method. For a
 * UNION, the branch index is read from the decoder and this method recurses
 * with the schema of that branch.
 *
 * @param old an existing object handed on to the {@code readXxx} methods that accept one
 * @param expected the schema describing the datum to read
 * @param in the decoder to read from
 * @return the datum that was read; {@code null} for a NULL schema
 * @throws IOException if reading from the decoder fails
 * @throws AvroRuntimeException if the schema type is not one of the handled types
 */
protected Object read(Object old, Schema expected,
    ResolvingDecoder in) throws IOException {
  switch (expected.getType()) {
    // Values read directly from the decoder.
    case NULL:
      in.readNull();
      return null;
    case BOOLEAN:
      return Boolean.valueOf(in.readBoolean());
    case LONG:
      return Long.valueOf(in.readLong());
    case FLOAT:
      return Float.valueOf(in.readFloat());
    case DOUBLE:
      return Double.valueOf(in.readDouble());

    // Values read through dedicated methods.
    case INT:
      return readInt(old, expected, in);
    case STRING:
      return readString(old, expected, in);
    case BYTES:
      return readBytes(old, expected, in);
    case FIXED:
      return readFixed(old, expected, in);
    case ENUM:
      return readEnum(expected, in);
    case ARRAY:
      return readArray(old, expected, in);
    case MAP:
      return readMap(old, expected, in);
    case RECORD:
      return readRecord(old, expected, in);

    case UNION: {
      // The decoder supplies the index of the branch that was written.
      Schema branch = expected.getTypes().get(in.readIndex());
      return read(old, branch, in);
    }

    default:
      throw new AvroRuntimeException("Unknown type: " + expected);
  }
}

另外一个方法(GenericDatumWriter 的 write 方法)

/**
 * Writes one datum to the encoder, dispatching on the schema type.
 *
 * <p>For a UNION the branch index is resolved with {@code resolveUnion},
 * written to the encoder, and the datum is then written with the schema of
 * that branch. Unhandled schema types are passed to {@code error(schema, datum)}.
 *
 * <p>A {@link NullPointerException} raised while writing is handed to
 * {@code npe(...)} together with the text {@code " of " + schema.getFullName()},
 * and the result of that call is thrown.
 *
 * @param schema the schema describing {@code datum}
 * @param datum the value to write
 * @param out the encoder to write to
 * @throws IOException if writing to the encoder fails
 */
protected void write(Schema schema, Object datum, Encoder out)
    throws IOException {
  try {
    switch (schema.getType()) {
      case RECORD:
        writeRecord(schema, datum, out);
        break;
      case ENUM:
        writeEnum(schema, datum, out);
        break;
      case ARRAY:
        writeArray(schema, datum, out);
        break;
      case MAP:
        writeMap(schema, datum, out);
        break;
      case UNION: {
        int branch = resolveUnion(schema, datum);
        // The index goes out first, then the datum under the branch's schema.
        out.writeIndex(branch);
        write(schema.getTypes().get(branch), datum, out);
        break;
      }
      case FIXED:
        writeFixed(schema, datum, out);
        break;
      case STRING:
        writeString(schema, datum, out);
        break;
      case BYTES:
        writeBytes(datum, out);
        break;
      case INT:
        out.writeInt(((Number) datum).intValue());
        break;
      case LONG:
        out.writeLong(((Long) datum).longValue());
        break;
      case FLOAT:
        out.writeFloat(((Float) datum).floatValue());
        break;
      case DOUBLE:
        out.writeDouble(((Double) datum).doubleValue());
        break;
      case BOOLEAN:
        out.writeBoolean(((Boolean) datum).booleanValue());
        break;
      case NULL:
        out.writeNull();
        break;
      default:
        error(schema, datum);
    }
  } catch (NullPointerException e) {
    throw npe(e, " of "+schema.getFullName());
  }
}

另外一个类为GenericData.java

关注方法

/**
 * Creates an instance bound to a class loader.
 *
 * @param classLoader the class loader to use; when {@code null}, the class
 *        loader of this object's runtime class is used instead
 */
public GenericData(ClassLoader classLoader) {
  if (classLoader == null) {
    this.classLoader = getClass().getClassLoader();
  } else {
    this.classLoader = classLoader;
  }
}

一个深度复制

/**
 * Makes a deep copy of a value according to its schema.
 *
 * <ul>
 *   <li>ARRAY, MAP and RECORD are copied element by element, recursing with
 *       the element, value or field schema.</li>
 *   <li>BYTES yields a new {@link ByteBuffer} backed by a fresh array holding
 *       the bytes between the source's position and limit. The source buffer
 *       is read through a duplicate, so its position is never moved.</li>
 *   <li>BOOLEAN, INT, LONG, FLOAT and DOUBLE values are immutable boxes and are
 *       returned as-is. The cast to the expected box type is kept, so a value
 *       of the wrong runtime type still fails with {@link ClassCastException}.</li>
 *   <li>ENUM values and {@link String}s are returned as-is; a {@code Utf8} is
 *       copied, and any other value under a STRING schema is converted to a
 *       new {@code Utf8} from its {@code toString()}.</li>
 *   <li>FIXED is copied through {@code createFixed}.</li>
 *   <li>UNION resolves the branch with {@code resolveUnion} and copies with
 *       that branch's schema.</li>
 * </ul>
 *
 * @param schema the schema describing {@code value}
 * @param value the value to copy; may be {@code null}
 * @param <T> static type of the value
 * @return the copy, or {@code null} if {@code value} is {@code null} or the schema type is NULL
 * @throws AvroRuntimeException if the schema type is not one of the handled types
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> T deepCopy(Schema schema, T value) {
  if (value == null) {
    return null;
  }
  switch (schema.getType()) {
    case ARRAY:
      List<Object> arrayValue = (List) value;
      List<Object> arrayCopy = new GenericData.Array<Object>(
          arrayValue.size(), schema);
      for (Object obj : arrayValue) {
        arrayCopy.add(deepCopy(schema.getElementType(), obj));
      }
      return (T)arrayCopy;
    case BOOLEAN:
      // Boxed primitives are immutable: no allocation needed, cast kept as a type check.
      return (T)(Boolean)value;
    case BYTES:
      ByteBuffer byteBufferValue = (ByteBuffer) value;
      // A duplicate has its own position/limit, so the caller's buffer is left untouched.
      ByteBuffer view = byteBufferValue.duplicate();
      byte[] bytesCopy = new byte[view.remaining()];
      view.get(bytesCopy);
      return (T)ByteBuffer.wrap(bytesCopy, 0, bytesCopy.length);
    case DOUBLE:
      return (T)(Double)value;
    case ENUM:
      // Enums are immutable; shallow copy will suffice
      return value;
    case FIXED:
      return (T)createFixed(null, ((GenericFixed) value).bytes(), schema);
    case FLOAT:
      return (T)(Float)value;
    case INT:
      return (T)(Integer)value;
    case LONG:
      return (T)(Long)value;
    case MAP:
      Map<CharSequence, Object> mapValue = (Map) value;
      Map<CharSequence, Object> mapCopy =
          new HashMap<CharSequence, Object>(mapValue.size());
      for (Map.Entry<CharSequence, Object> entry : mapValue.entrySet()) {
        // Keys are copied under the STRINGS schema, values under the map's value schema.
        mapCopy.put((CharSequence)(deepCopy(STRINGS, entry.getKey())),
            deepCopy(schema.getValueType(), entry.getValue()));
      }
      return (T)mapCopy;
    case NULL:
      return null;
    case RECORD:
      Object oldState = getRecordState(value, schema);
      Object newRecord = newRecord(null, schema);
      Object newState = getRecordState(newRecord, schema);
      for (Field f : schema.getFields()) {
        int pos = f.pos();
        String name = f.name();
        Object newValue = deepCopy(f.schema(),
            getField(value, name, pos, oldState));
        setField(newRecord, name, pos, newValue, newState);
      }
      return (T)newRecord;
    case STRING:
      // Strings are immutable
      if (value instanceof String) {
        return (T)value;
      }
      // Some CharSequence subclasses are mutable, so we still need to make
      // a copy
      else if (value instanceof Utf8) {
        // Utf8 copy constructor is more efficient than converting
        // to string and then back to Utf8
        return (T)new Utf8((Utf8)value);
      }
      return (T)new Utf8(value.toString());
    case UNION:
      return deepCopy(
          schema.getTypes().get(resolveUnion(schema, value)), value);
    default:
      throw new AvroRuntimeException(
          "Deep copy failed for schema \"" + schema + "\" and value \"" +
          value + "\"");
  }
}

在另外一个package中的specific下,亦存在类似的接口和类

包括如下几个

接口类

SpecificRecord.java

/** Record interface of the specific package; extends {@code IndexedRecord}. Shown here as a declaration only. */
public interface SpecificRecord extends IndexedRecord {}

抽象子类

/** Abstract base that is a {@code SpecificRecord} and a {@code GenericRecord}, comparable against {@code SpecificRecord}. Shown here as a declaration only. */
public abstract class SpecificRecordBase implements SpecificRecord, Comparable<SpecificRecord>, GenericRecord {}

另外一个为:

public abstract class SpecificFixed extends GenericData.Fixed

抽象Builder基类

/** Abstract builder base for {@code SpecificRecord} subtypes; extends {@code RecordBuilderBase<T>}. Shown here as a declaration only. */
abstract public class SpecificRecordBuilderBase<T extends SpecificRecord> extends RecordBuilderBase<T> {}

抽象SpecificExceptionBase基类

/** Abstract exception base that extends {@code AvroRemoteException} and is also a {@code SpecificRecord}. Shown here as a declaration only. */
public abstract class SpecificExceptionBase extends AvroRemoteException implements SpecificRecord {}

抽象错误Builder基类

/** Abstract builder base for {@code SpecificExceptionBase} subtypes; implements {@code ErrorBuilder<T>}. Shown here as a declaration only. */
abstract public class SpecificErrorBuilderBase<T extends SpecificExceptionBase> extends RecordBuilderBase<T> implements ErrorBuilder<T> {}

对应的Reader和Writer

/** Reader of the specific package; extends {@code GenericDatumReader<T>}. Shown here as a declaration only. */
public class SpecificDatumReader<T> extends GenericDatumReader<T> {}

/** Writer of the specific package; extends {@code GenericDatumWriter<T>}. Shown here as a declaration only. */
public class SpecificDatumWriter<T> extends GenericDatumWriter<T> {}

另外

/** Specific-package counterpart of {@code GenericData}, which it extends. Shown here as a declaration only. */
public class SpecificData extends GenericData {}

两个注解类型(@interface)

/**
 * Member-less marker annotation that can be placed on types and is retained
 * at runtime.
 * NOTE(review): presumably used to tag classes produced by Avro code generation; confirm against its usages.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface AvroGenerated {}

/**
 * Documented, runtime-retained annotation for types that carries a single
 * {@code int} value.
 * NOTE(review): presumably the size of a fixed type, as the name suggests; confirm against its usages.
 */
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface FixedSize {
  /**
   * The value carried by this annotation.
   *
   * @return the declared value
   */
  int value();
}

接下来分析io包
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐