大数据核心技术源码分析之-Avro篇-3
2013-09-16 22:19
471 查看
由于篇幅限制,本篇继续分析 Avro 的 org.apache.avro 包。
该包下的分析点包括以下子包:generic、io、ipc、reflect、specific、tool、util
1:package generic
GenericContainer
/** Contract for any object that can report the {@link Schema} describing it. */
public interface GenericContainer {
/** Returns the schema of this instance. */
Schema getSchema();
}
子类接口GenericArray
/**
 * A {@link List} that is also a {@link GenericContainer}, i.e. a list that exposes its schema.
 *
 * @param <T> the element type
 */
public interface GenericArray<T> extends List<T>, GenericContainer {
// NOTE(review): presumably returns an element without removing it; the exact
// semantics are not visible in this excerpt -- confirm against the implementation.
T peek();
// NOTE(review): presumably reverses the element order in place (it returns nothing) -- confirm.
void reverse();
}
GenericFixed
/** A {@link GenericContainer} whose content is exposed as a byte array. */
public interface GenericFixed extends GenericContainer {
/**
 * Returns the data.
 * NOTE(review): whether this is the backing array or a copy is not visible here -- confirm.
 */
byte[] bytes();
}
GenericEnumSymbol
/** A schema-carrying enum symbol that is comparable with other enum symbols. */
public interface GenericEnumSymbol
extends GenericContainer, Comparable<GenericEnumSymbol> {
/** Returns the symbol; redeclares {@link Object#toString()} as part of this interface. */
String toString();
}
另外一个接口
IndexedRecord
/** A schema-carrying record whose values are written and read by integer index. */
public interface IndexedRecord extends GenericContainer {
/**
 * Stores {@code v} at index {@code i}.
 * NOTE(review): presumably {@code i} is the field position within the schema -- confirm.
 */
void put(int i, Object v);
/** Returns the value stored at index {@code i}. */
Object get(int i);
}
GenericRecord
/** An {@link IndexedRecord} that additionally allows access to values by string key. */
public interface GenericRecord extends IndexedRecord {
/**
 * Stores {@code v} under {@code key}.
 * NOTE(review): presumably {@code key} is a field name -- confirm.
 */
void put(String key, Object v);
/** Returns the value stored under {@code key}. */
Object get(String key);
}
看一下GenericRecordBuilder该Builder类
/** Builder for {@code Record} instances; only the class header is quoted here, the body is elided. */
public class GenericRecordBuilder extends RecordBuilderBase<Record> {}
核心方法
/**
 * Creates a new record for this builder's schema and fills it field by field.
 *
 * <p>The value of every field is obtained from {@code getWithDefault(field)}; a {@code null}
 * result is not written into the record.
 *
 * @return the populated record
 * @throws AvroRuntimeException if instantiating the record fails or resolving a field value
 *     raises an {@link IOException}
 */
@Override
public Record build() {
  Record built;
  try {
    built = new GenericData.Record(schema());
  } catch (Exception cause) {
    // Any failure while instantiating the record is rethrown as an unchecked Avro error.
    throw new AvroRuntimeException(cause);
  }

  for (Field current : fields()) {
    Object resolved;
    try {
      resolved = getWithDefault(current);
    } catch (IOException cause) {
      throw new AvroRuntimeException(cause);
    }
    if (resolved == null) {
      // Nothing to store for this field.
      continue;
    }
    built.put(current.pos(), resolved);
  }
  return built;
}
/**
 * Combines the superclass hash with the hash of the {@code record} field, using 31 as the
 * multiplier; a {@code null} record contributes 0.
 */
@Override
public int hashCode() {
  int inherited = super.hashCode();
  int recordHash = (record != null) ? record.hashCode() : 0;
  return 31 * inherited + recordHash;
}
/**
 * Two builders are equal when the superclass considers them equal, they have the same runtime
 * class, and their {@code record} fields are both {@code null} or equal to each other.
 */
@Override
public boolean equals(Object obj) {
  if (this == obj) {
    return true;
  }
  // The superclass check runs first; the class comparison is only reached when it passes.
  if (!super.equals(obj) || getClass() != obj.getClass()) {
    return false;
  }
  GenericRecordBuilder that = (GenericRecordBuilder) obj;
  return (record == null) ? that.record == null : record.equals(that.record);
}
另外两个类Reader和Writer
/** {@code DatumReader} implementation for type {@code D}; only the class header is quoted, the body is elided. */
public class GenericDatumReader<D> implements DatumReader<D> {}
/** {@code DatumWriter} implementation for type {@code D}; only the class header is quoted, the body is elided. */
public class GenericDatumWriter<D> implements DatumWriter<D> {}
GenericDatumReader 中值得关注的方法如下(read:按 schema 类型分派读取)
/**
 * Reads one datum described by {@code expected} from {@code in}, dispatching on the schema type.
 *
 * <p>RECORD, ENUM, ARRAY, MAP, FIXED, STRING, BYTES and INT are delegated to the matching
 * {@code readXxx} helper. LONG, FLOAT, DOUBLE and BOOLEAN are read directly from the decoder.
 * NULL consumes a null from the decoder and yields {@code null}. For UNION the branch index is
 * read from the decoder and this method recurses with that branch's schema.
 *
 * @param old passed through to the helpers; NOTE(review): presumably an instance to reuse -- confirm
 * @param expected the schema describing the datum to read
 * @param in the decoder to read from
 * @return the datum that was read (primitives are boxed)
 * @throws IOException if the decoder or a helper fails
 * @throws AvroRuntimeException if the schema type is not one of the handled types
 */
protected Object read(Object old, Schema expected,
    ResolvingDecoder in) throws IOException {
  Object datum;
  switch (expected.getType()) {
    case RECORD:
      datum = readRecord(old, expected, in);
      break;
    case ENUM:
      datum = readEnum(expected, in);
      break;
    case ARRAY:
      datum = readArray(old, expected, in);
      break;
    case MAP:
      datum = readMap(old, expected, in);
      break;
    case UNION: {
      // The decoder supplies the index of the union branch to read.
      Schema branchSchema = expected.getTypes().get(in.readIndex());
      datum = read(old, branchSchema, in);
      break;
    }
    case FIXED:
      datum = readFixed(old, expected, in);
      break;
    case STRING:
      datum = readString(old, expected, in);
      break;
    case BYTES:
      datum = readBytes(old, expected, in);
      break;
    case INT:
      datum = readInt(old, expected, in);
      break;
    case LONG:
      datum = in.readLong();
      break;
    case FLOAT:
      datum = in.readFloat();
      break;
    case DOUBLE:
      datum = in.readDouble();
      break;
    case BOOLEAN:
      datum = in.readBoolean();
      break;
    case NULL:
      in.readNull();
      datum = null;
      break;
    default:
      throw new AvroRuntimeException("Unknown type: " + expected);
  }
  return datum;
}
GenericDatumWriter 中对应的 write 方法(按 schema 类型分派写出)
/**
 * Writes {@code datum} to {@code out} as described by {@code schema}, dispatching on the schema
 * type.
 *
 * <p>RECORD, ENUM, ARRAY, MAP, FIXED, STRING and BYTES are delegated to the matching
 * {@code writeXxx} helper. INT accepts any {@link Number} and writes its {@code intValue()};
 * LONG, FLOAT, DOUBLE and BOOLEAN cast the datum to the corresponding boxed type. For UNION the
 * branch index from {@code resolveUnion} is written first, then the datum is written with that
 * branch's schema. Any other type is handed to {@code error(schema, datum)}.
 *
 * <p>A {@link NullPointerException} raised while writing is passed to {@code npe(...)} together
 * with {@code " of "} plus the schema's full name, and the result of that call is thrown.
 *
 * @param schema the schema describing {@code datum}
 * @param datum the value to write
 * @param out the encoder to write to
 * @throws IOException if the encoder or a helper fails
 */
protected void write(Schema schema, Object datum, Encoder out)
    throws IOException {
  try {
    switch (schema.getType()) {
      case RECORD:
        writeRecord(schema, datum, out);
        break;
      case ENUM:
        writeEnum(schema, datum, out);
        break;
      case ARRAY:
        writeArray(schema, datum, out);
        break;
      case MAP:
        writeMap(schema, datum, out);
        break;
      case UNION: {
        // Emit the branch index first, then the datum using that branch's schema.
        int branch = resolveUnion(schema, datum);
        out.writeIndex(branch);
        write(schema.getTypes().get(branch), datum, out);
        break;
      }
      case FIXED:
        writeFixed(schema, datum, out);
        break;
      case STRING:
        writeString(schema, datum, out);
        break;
      case BYTES:
        writeBytes(datum, out);
        break;
      case INT:
        out.writeInt(((Number)datum).intValue());
        break;
      case LONG:
        out.writeLong((Long)datum);
        break;
      case FLOAT:
        out.writeFloat((Float)datum);
        break;
      case DOUBLE:
        out.writeDouble((Double)datum);
        break;
      case BOOLEAN:
        out.writeBoolean((Boolean)datum);
        break;
      case NULL:
        out.writeNull();
        break;
      default:
        error(schema,datum);
    }
  } catch (NullPointerException cause) {
    throw npe(cause, " of "+schema.getFullName());
  }
}
另外一个类为GenericData.java
关注方法
/**
 * Creates an instance that uses the given class loader.
 *
 * @param classLoader the class loader to store; when {@code null}, the loader of this
 *     object's runtime class is used instead
 */
public GenericData(ClassLoader classLoader) {
  if (classLoader == null) {
    this.classLoader = getClass().getClassLoader();
  } else {
    this.classLoader = classLoader;
  }
}
一个深度复制
/**
 * Makes a deep copy of {@code value}, interpreting it according to {@code schema}.
 *
 * <p>ARRAY, MAP and RECORD values are rebuilt with recursively copied contents. BYTES values
 * are copied into a new buffer backed by a fresh array. FIXED values are rebuilt through
 * {@code createFixed}. Mutable character sequences are copied into a new {@code Utf8}.
 * Immutable values (enum symbols, {@code String}, boxed primitives) are returned as the same
 * instance. UNION values are copied using the branch chosen by {@code resolveUnion}.
 *
 * @param schema the schema describing {@code value}
 * @param value the value to copy; may be {@code null}
 * @param <T> the static type of the value
 * @return the copy, or {@code null} when {@code value} is {@code null} or the schema type is NULL
 * @throws AvroRuntimeException if the schema type is not one of the handled types
 * @throws ClassCastException if the runtime type of {@code value} does not match the schema type
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> T deepCopy(Schema schema, T value) {
  if (value == null) {
    return null;
  }
  switch (schema.getType()) {
    case ARRAY:
      List<Object> arrayValue = (List) value;
      List<Object> arrayCopy = new GenericData.Array<Object>(
          arrayValue.size(), schema);
      for (Object obj : arrayValue) {
        arrayCopy.add(deepCopy(schema.getElementType(), obj));
      }
      return (T)arrayCopy;
    case BOOLEAN:
      // Boxed primitives are immutable, so the instance is shared instead of going through
      // the deprecated boxing constructors. The checked cast still rejects a wrong type.
      return (T)(Boolean)value;
    case BYTES:
      // Read through a duplicate so the caller's buffer position is never touched.
      ByteBuffer source = ((ByteBuffer) value).duplicate();
      byte[] bytesCopy = new byte[source.remaining()];
      source.get(bytesCopy);
      return (T)ByteBuffer.wrap(bytesCopy);
    case DOUBLE:
      return (T)(Double)value;
    case ENUM:
      // Enums are immutable; shallow copy will suffice
      return value;
    case FIXED:
      return (T)createFixed(null, ((GenericFixed) value).bytes(), schema);
    case FLOAT:
      return (T)(Float)value;
    case INT:
      return (T)(Integer)value;
    case LONG:
      return (T)(Long)value;
    case MAP:
      Map<CharSequence, Object> mapValue = (Map) value;
      Map<CharSequence, Object> mapCopy =
          new HashMap<CharSequence, Object>(mapValue.size());
      for (Map.Entry<CharSequence, Object> entry : mapValue.entrySet()) {
        mapCopy.put((CharSequence)(deepCopy(STRINGS, entry.getKey())),
            deepCopy(schema.getValueType(), entry.getValue()));
      }
      return (T)mapCopy;
    case NULL:
      return null;
    case RECORD:
      Object oldState = getRecordState(value, schema);
      Object newRecord = newRecord(null, schema);
      Object newState = getRecordState(newRecord, schema);
      for (Field f : schema.getFields()) {
        int pos = f.pos();
        String name = f.name();
        Object newValue = deepCopy(f.schema(),
            getField(value, name, pos, oldState));
        setField(newRecord, name, pos, newValue, newState);
      }
      return (T)newRecord;
    case STRING:
      // Strings are immutable
      if (value instanceof String) {
        return (T)value;
      }
      // Some CharSequence subclasses are mutable, so we still need to make
      // a copy
      else if (value instanceof Utf8) {
        // Utf8 copy constructor is more efficient than converting
        // to string and then back to Utf8
        return (T)new Utf8((Utf8)value);
      }
      return (T)new Utf8(value.toString());
    case UNION:
      return deepCopy(
          schema.getTypes().get(resolveUnion(schema, value)), value);
    default:
      throw new AvroRuntimeException(
          "Deep copy failed for schema \"" + schema + "\" and value \"" +
          value + "\"");
  }
}
在另外一个package中的specific下,亦存在类似的接口和类
包括如下几个
接口类
SpecificRecord.java
/** Interface in the specific package that extends {@code IndexedRecord}; shown with an empty body in this excerpt. */
public interface SpecificRecord extends IndexedRecord {}
抽象子类
/** Abstract base implementing {@code SpecificRecord}, {@code Comparable<SpecificRecord>} and {@code GenericRecord}; body elided in this excerpt. */
public abstract class SpecificRecordBase implements SpecificRecord, Comparable<SpecificRecord>, GenericRecord {}
另外一个为:
/** Abstract subclass of {@code GenericData.Fixed}; only the class header is quoted here (no body shown). */
public abstract class SpecificFixed extends GenericData.Fixed
抽象Builder基类
/** Abstract builder base for {@code SpecificRecord} subtypes, layered on {@code RecordBuilderBase}; body elided in this excerpt. */
abstract public class SpecificRecordBuilderBase<T extends SpecificRecord> extends RecordBuilderBase<T> {}
抽象SpecificExceptionBase基类
/** Abstract exception base that extends {@code AvroRemoteException} and implements {@code SpecificRecord}; body elided in this excerpt. */
public abstract class SpecificExceptionBase extends AvroRemoteException implements SpecificRecord {}
抽象错误Builder基类
/** Abstract builder base for {@code SpecificExceptionBase} subtypes that also implements {@code ErrorBuilder}; body elided in this excerpt. */
abstract public class SpecificErrorBuilderBase<T extends SpecificExceptionBase> extends RecordBuilderBase<T> implements ErrorBuilder<T> {}
对应的Reader和Writer
/** Reader for the specific package, derived from {@code GenericDatumReader}; body elided in this excerpt. */
public class SpecificDatumReader<T> extends GenericDatumReader<T> {}
/** Writer for the specific package, derived from {@code GenericDatumWriter}; body elided in this excerpt. */
public class SpecificDatumWriter<T> extends GenericDatumWriter<T> {}
另外
/** Specific-package counterpart of {@code GenericData}, which it extends; body elided in this excerpt. */
public class SpecificData extends GenericData {}
两个注解类型(使用 @interface 声明)
/**
 * Marker annotation (no members) that can be placed on types and is retained at runtime.
 * NOTE(review): presumably marks classes produced by Avro code generation -- confirm.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface AvroGenerated {
}
/**
 * Type-level annotation, retained at runtime and included in generated documentation, that
 * carries a single {@code int} value.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
public @interface FixedSize {
// NOTE(review): presumably the size in bytes of the annotated fixed type -- confirm.
int value();
}
接下来分析io包
package分析点包括generic,io,ipc,reflect,specific,tool,util
1:package generic
GenericContainer
/** Contract for any object that can report the {@link Schema} describing it. */
public interface GenericContainer {
/** Returns the schema of this instance. */
Schema getSchema();
}
子类接口GenericArray
/**
 * A {@link List} that is also a {@link GenericContainer}, i.e. a list that exposes its schema.
 *
 * @param <T> the element type
 */
public interface GenericArray<T> extends List<T>, GenericContainer {
// NOTE(review): presumably returns an element without removing it; the exact
// semantics are not visible in this excerpt -- confirm against the implementation.
T peek();
// NOTE(review): presumably reverses the element order in place (it returns nothing) -- confirm.
void reverse();
}
GenericFixed
/** A {@link GenericContainer} whose content is exposed as a byte array. */
public interface GenericFixed extends GenericContainer {
/**
 * Returns the data.
 * NOTE(review): whether this is the backing array or a copy is not visible here -- confirm.
 */
byte[] bytes();
}
GenericEnumSymbol
/** A schema-carrying enum symbol that is comparable with other enum symbols. */
public interface GenericEnumSymbol
extends GenericContainer, Comparable<GenericEnumSymbol> {
/** Returns the symbol; redeclares {@link Object#toString()} as part of this interface. */
String toString();
}
另外一个接口
IndexedRecord
/** A schema-carrying record whose values are written and read by integer index. */
public interface IndexedRecord extends GenericContainer {
/**
 * Stores {@code v} at index {@code i}.
 * NOTE(review): presumably {@code i} is the field position within the schema -- confirm.
 */
void put(int i, Object v);
/** Returns the value stored at index {@code i}. */
Object get(int i);
}
GenericRecord
/** An {@link IndexedRecord} that additionally allows access to values by string key. */
public interface GenericRecord extends IndexedRecord {
/**
 * Stores {@code v} under {@code key}.
 * NOTE(review): presumably {@code key} is a field name -- confirm.
 */
void put(String key, Object v);
/** Returns the value stored under {@code key}. */
Object get(String key);
}
看一下GenericRecordBuilder该Builder类
/** Builder for {@code Record} instances; only the class header is quoted here, the body is elided. */
public class GenericRecordBuilder extends RecordBuilderBase<Record> {}
核心方法
/**
 * Creates a new record for this builder's schema and fills it field by field.
 *
 * <p>The value of every field is obtained from {@code getWithDefault(field)}; a {@code null}
 * result is not written into the record.
 *
 * @return the populated record
 * @throws AvroRuntimeException if instantiating the record fails or resolving a field value
 *     raises an {@link IOException}
 */
@Override
public Record build() {
  Record built;
  try {
    built = new GenericData.Record(schema());
  } catch (Exception cause) {
    // Any failure while instantiating the record is rethrown as an unchecked Avro error.
    throw new AvroRuntimeException(cause);
  }

  for (Field current : fields()) {
    Object resolved;
    try {
      resolved = getWithDefault(current);
    } catch (IOException cause) {
      throw new AvroRuntimeException(cause);
    }
    if (resolved == null) {
      // Nothing to store for this field.
      continue;
    }
    built.put(current.pos(), resolved);
  }
  return built;
}
/**
 * Combines the superclass hash with the hash of the {@code record} field, using 31 as the
 * multiplier; a {@code null} record contributes 0.
 */
@Override
public int hashCode() {
  int inherited = super.hashCode();
  int recordHash = (record != null) ? record.hashCode() : 0;
  return 31 * inherited + recordHash;
}
/**
 * Two builders are equal when the superclass considers them equal, they have the same runtime
 * class, and their {@code record} fields are both {@code null} or equal to each other.
 */
@Override
public boolean equals(Object obj) {
  if (this == obj) {
    return true;
  }
  // The superclass check runs first; the class comparison is only reached when it passes.
  if (!super.equals(obj) || getClass() != obj.getClass()) {
    return false;
  }
  GenericRecordBuilder that = (GenericRecordBuilder) obj;
  return (record == null) ? that.record == null : record.equals(that.record);
}
另外两个类Reader和Writer
/** {@code DatumReader} implementation for type {@code D}; only the class header is quoted, the body is elided. */
public class GenericDatumReader<D> implements DatumReader<D> {}
/** {@code DatumWriter} implementation for type {@code D}; only the class header is quoted, the body is elided. */
public class GenericDatumWriter<D> implements DatumWriter<D> {}
GenericDatumReader 中值得关注的方法如下(read:按 schema 类型分派读取)
/**
 * Reads one datum described by {@code expected} from {@code in}, dispatching on the schema type.
 *
 * <p>RECORD, ENUM, ARRAY, MAP, FIXED, STRING, BYTES and INT are delegated to the matching
 * {@code readXxx} helper. LONG, FLOAT, DOUBLE and BOOLEAN are read directly from the decoder.
 * NULL consumes a null from the decoder and yields {@code null}. For UNION the branch index is
 * read from the decoder and this method recurses with that branch's schema.
 *
 * @param old passed through to the helpers; NOTE(review): presumably an instance to reuse -- confirm
 * @param expected the schema describing the datum to read
 * @param in the decoder to read from
 * @return the datum that was read (primitives are boxed)
 * @throws IOException if the decoder or a helper fails
 * @throws AvroRuntimeException if the schema type is not one of the handled types
 */
protected Object read(Object old, Schema expected,
    ResolvingDecoder in) throws IOException {
  Object datum;
  switch (expected.getType()) {
    case RECORD:
      datum = readRecord(old, expected, in);
      break;
    case ENUM:
      datum = readEnum(expected, in);
      break;
    case ARRAY:
      datum = readArray(old, expected, in);
      break;
    case MAP:
      datum = readMap(old, expected, in);
      break;
    case UNION: {
      // The decoder supplies the index of the union branch to read.
      Schema branchSchema = expected.getTypes().get(in.readIndex());
      datum = read(old, branchSchema, in);
      break;
    }
    case FIXED:
      datum = readFixed(old, expected, in);
      break;
    case STRING:
      datum = readString(old, expected, in);
      break;
    case BYTES:
      datum = readBytes(old, expected, in);
      break;
    case INT:
      datum = readInt(old, expected, in);
      break;
    case LONG:
      datum = in.readLong();
      break;
    case FLOAT:
      datum = in.readFloat();
      break;
    case DOUBLE:
      datum = in.readDouble();
      break;
    case BOOLEAN:
      datum = in.readBoolean();
      break;
    case NULL:
      in.readNull();
      datum = null;
      break;
    default:
      throw new AvroRuntimeException("Unknown type: " + expected);
  }
  return datum;
}
GenericDatumWriter 中对应的 write 方法(按 schema 类型分派写出)
/**
 * Writes {@code datum} to {@code out} as described by {@code schema}, dispatching on the schema
 * type.
 *
 * <p>RECORD, ENUM, ARRAY, MAP, FIXED, STRING and BYTES are delegated to the matching
 * {@code writeXxx} helper. INT accepts any {@link Number} and writes its {@code intValue()};
 * LONG, FLOAT, DOUBLE and BOOLEAN cast the datum to the corresponding boxed type. For UNION the
 * branch index from {@code resolveUnion} is written first, then the datum is written with that
 * branch's schema. Any other type is handed to {@code error(schema, datum)}.
 *
 * <p>A {@link NullPointerException} raised while writing is passed to {@code npe(...)} together
 * with {@code " of "} plus the schema's full name, and the result of that call is thrown.
 *
 * @param schema the schema describing {@code datum}
 * @param datum the value to write
 * @param out the encoder to write to
 * @throws IOException if the encoder or a helper fails
 */
protected void write(Schema schema, Object datum, Encoder out)
    throws IOException {
  try {
    switch (schema.getType()) {
      case RECORD:
        writeRecord(schema, datum, out);
        break;
      case ENUM:
        writeEnum(schema, datum, out);
        break;
      case ARRAY:
        writeArray(schema, datum, out);
        break;
      case MAP:
        writeMap(schema, datum, out);
        break;
      case UNION: {
        // Emit the branch index first, then the datum using that branch's schema.
        int branch = resolveUnion(schema, datum);
        out.writeIndex(branch);
        write(schema.getTypes().get(branch), datum, out);
        break;
      }
      case FIXED:
        writeFixed(schema, datum, out);
        break;
      case STRING:
        writeString(schema, datum, out);
        break;
      case BYTES:
        writeBytes(datum, out);
        break;
      case INT:
        out.writeInt(((Number)datum).intValue());
        break;
      case LONG:
        out.writeLong((Long)datum);
        break;
      case FLOAT:
        out.writeFloat((Float)datum);
        break;
      case DOUBLE:
        out.writeDouble((Double)datum);
        break;
      case BOOLEAN:
        out.writeBoolean((Boolean)datum);
        break;
      case NULL:
        out.writeNull();
        break;
      default:
        error(schema,datum);
    }
  } catch (NullPointerException cause) {
    throw npe(cause, " of "+schema.getFullName());
  }
}
另外一个类为GenericData.java
关注方法
/**
 * Creates an instance that uses the given class loader.
 *
 * @param classLoader the class loader to store; when {@code null}, the loader of this
 *     object's runtime class is used instead
 */
public GenericData(ClassLoader classLoader) {
  if (classLoader == null) {
    this.classLoader = getClass().getClassLoader();
  } else {
    this.classLoader = classLoader;
  }
}
一个深度复制
/**
 * Makes a deep copy of {@code value}, interpreting it according to {@code schema}.
 *
 * <p>ARRAY, MAP and RECORD values are rebuilt with recursively copied contents. BYTES values
 * are copied into a new buffer backed by a fresh array. FIXED values are rebuilt through
 * {@code createFixed}. Mutable character sequences are copied into a new {@code Utf8}.
 * Immutable values (enum symbols, {@code String}, boxed primitives) are returned as the same
 * instance. UNION values are copied using the branch chosen by {@code resolveUnion}.
 *
 * @param schema the schema describing {@code value}
 * @param value the value to copy; may be {@code null}
 * @param <T> the static type of the value
 * @return the copy, or {@code null} when {@code value} is {@code null} or the schema type is NULL
 * @throws AvroRuntimeException if the schema type is not one of the handled types
 * @throws ClassCastException if the runtime type of {@code value} does not match the schema type
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> T deepCopy(Schema schema, T value) {
  if (value == null) {
    return null;
  }
  switch (schema.getType()) {
    case ARRAY:
      List<Object> arrayValue = (List) value;
      List<Object> arrayCopy = new GenericData.Array<Object>(
          arrayValue.size(), schema);
      for (Object obj : arrayValue) {
        arrayCopy.add(deepCopy(schema.getElementType(), obj));
      }
      return (T)arrayCopy;
    case BOOLEAN:
      // Boxed primitives are immutable, so the instance is shared instead of going through
      // the deprecated boxing constructors. The checked cast still rejects a wrong type.
      return (T)(Boolean)value;
    case BYTES:
      // Read through a duplicate so the caller's buffer position is never touched.
      ByteBuffer source = ((ByteBuffer) value).duplicate();
      byte[] bytesCopy = new byte[source.remaining()];
      source.get(bytesCopy);
      return (T)ByteBuffer.wrap(bytesCopy);
    case DOUBLE:
      return (T)(Double)value;
    case ENUM:
      // Enums are immutable; shallow copy will suffice
      return value;
    case FIXED:
      return (T)createFixed(null, ((GenericFixed) value).bytes(), schema);
    case FLOAT:
      return (T)(Float)value;
    case INT:
      return (T)(Integer)value;
    case LONG:
      return (T)(Long)value;
    case MAP:
      Map<CharSequence, Object> mapValue = (Map) value;
      Map<CharSequence, Object> mapCopy =
          new HashMap<CharSequence, Object>(mapValue.size());
      for (Map.Entry<CharSequence, Object> entry : mapValue.entrySet()) {
        mapCopy.put((CharSequence)(deepCopy(STRINGS, entry.getKey())),
            deepCopy(schema.getValueType(), entry.getValue()));
      }
      return (T)mapCopy;
    case NULL:
      return null;
    case RECORD:
      Object oldState = getRecordState(value, schema);
      Object newRecord = newRecord(null, schema);
      Object newState = getRecordState(newRecord, schema);
      for (Field f : schema.getFields()) {
        int pos = f.pos();
        String name = f.name();
        Object newValue = deepCopy(f.schema(),
            getField(value, name, pos, oldState));
        setField(newRecord, name, pos, newValue, newState);
      }
      return (T)newRecord;
    case STRING:
      // Strings are immutable
      if (value instanceof String) {
        return (T)value;
      }
      // Some CharSequence subclasses are mutable, so we still need to make
      // a copy
      else if (value instanceof Utf8) {
        // Utf8 copy constructor is more efficient than converting
        // to string and then back to Utf8
        return (T)new Utf8((Utf8)value);
      }
      return (T)new Utf8(value.toString());
    case UNION:
      return deepCopy(
          schema.getTypes().get(resolveUnion(schema, value)), value);
    default:
      throw new AvroRuntimeException(
          "Deep copy failed for schema \"" + schema + "\" and value \"" +
          value + "\"");
  }
}
在另外一个package中的specific下,亦存在类似的接口和类
包括如下几个
接口类
SpecificRecord.java
/** Interface in the specific package that extends {@code IndexedRecord}; shown with an empty body in this excerpt. */
public interface SpecificRecord extends IndexedRecord {}
抽象子类
/** Abstract base implementing {@code SpecificRecord}, {@code Comparable<SpecificRecord>} and {@code GenericRecord}; body elided in this excerpt. */
public abstract class SpecificRecordBase implements SpecificRecord, Comparable<SpecificRecord>, GenericRecord {}
另外一个为:
/** Abstract subclass of {@code GenericData.Fixed}; only the class header is quoted here (no body shown). */
public abstract class SpecificFixed extends GenericData.Fixed
抽象Builder基类
/** Abstract builder base for {@code SpecificRecord} subtypes, layered on {@code RecordBuilderBase}; body elided in this excerpt. */
abstract public class SpecificRecordBuilderBase<T extends SpecificRecord> extends RecordBuilderBase<T> {}
抽象SpecificExceptionBase基类
/** Abstract exception base that extends {@code AvroRemoteException} and implements {@code SpecificRecord}; body elided in this excerpt. */
public abstract class SpecificExceptionBase extends AvroRemoteException implements SpecificRecord {}
抽象错误Builder基类
/** Abstract builder base for {@code SpecificExceptionBase} subtypes that also implements {@code ErrorBuilder}; body elided in this excerpt. */
abstract public class SpecificErrorBuilderBase<T extends SpecificExceptionBase> extends RecordBuilderBase<T> implements ErrorBuilder<T> {}
对应的Reader和Writer
/** Reader for the specific package, derived from {@code GenericDatumReader}; body elided in this excerpt. */
public class SpecificDatumReader<T> extends GenericDatumReader<T> {}
/** Writer for the specific package, derived from {@code GenericDatumWriter}; body elided in this excerpt. */
public class SpecificDatumWriter<T> extends GenericDatumWriter<T> {}
另外
/** Specific-package counterpart of {@code GenericData}, which it extends; body elided in this excerpt. */
public class SpecificData extends GenericData {}
两个注解类型(使用 @interface 声明)
/**
 * Marker annotation (no members) that can be placed on types and is retained at runtime.
 * NOTE(review): presumably marks classes produced by Avro code generation -- confirm.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface AvroGenerated {
}
/**
 * Type-level annotation, retained at runtime and included in generated documentation, that
 * carries a single {@code int} value.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
public @interface FixedSize {
// NOTE(review): presumably the size in bytes of the annotated fixed type -- confirm.
int value();
}
接下来分析io包
相关文章推荐
- 大数据核心技术源码分析之-Avro篇
- 大数据核心技术源码分析之-Avro篇-2
- java核心基础--jdk源码分析学习--基本数据类型
- 大数据分析你不能不懂的6个核心技术
- web数据采集核心技术分享系列(三)如何破解验证码?图像分析?特征匹配?人工智能?第三方集成?...哪个最强大?
- mosquitto 源码分析 (一)核心数据结构
- Android项目-智慧北京:02(三种技术设计主页面及源码分析点击事件传递的机制及Json数据传递的使用)
- web数据采集核心技术分享系列(三)如何破解验证码?图像分析?特征匹配?人工智能?第三方集成?...哪个最强大?
- 数据分析——以斗鱼为实例解析requests库与scrapy框架爬虫技术
- Kafka源码深度解析-序列12 -Server核心组件之2-ReplicaManager核心数据结构与Replica同步原理
- ”微软开放技术、聚合数据、阿里 JStorm、开源社 – 大数据实时分析编程黑客松“圆满收官
- Android 源码分析Application的生命周期及共享数据详解
- IDS中的数据分析技术简述
- IBM 技术文档:Spark, 快速数据分析的又一选择
- CI框架源码完全分析之核心文件(模型)Model.php
- Launcher3源码分析 — 数据加载过程
- Android系统原理与源码分析(1):利用Java反射技术阻止通过按钮关闭对话框
- 数据挖掘 多维分析技术理论基础知识
- Android系统原理与源码分析(1):利用Java反射技术阻止通过按钮关闭对话框