Serialization in Spark
2016-05-18 10:34
1. Serialization is used for network transfer and for persisting data. Spark creates two serializers, each selected by its own configuration key
    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    // This serializer is currently not used by BlockManager
    val closureSerializer = instantiateClassFromConf[Serializer](
      "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
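The lookup pattern above (read a class name from configuration, fall back to a default, instantiate it reflectively) can be sketched in plain Java. This is a simplified illustration, not Spark's implementation: the class ConfInstantiator, the method instantiateFromConf, and the key demo.list.impl are hypothetical names, and the sketch assumes the target class has a public no-argument constructor.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ConfInstantiator {
    private ConfInstantiator() {}

    // Reads a class name from `conf` under `key`, falling back to
    // `defaultClassName`, and instantiates it via its no-argument constructor.
    static <T> T instantiateFromConf(Map<String, String> conf, String key,
                                     String defaultClassName, Class<T> expectedType) {
        String className = conf.getOrDefault(key, defaultClassName);
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            return expectedType.cast(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Key is absent, so the default class name is used.
        List<?> byDefault =
            instantiateFromConf(conf, "demo.list.impl", "java.util.ArrayList", List.class);
        // Key is present, so the configured class name wins.
        conf.put("demo.list.impl", "java.util.LinkedList");
        List<?> overridden =
            instantiateFromConf(conf, "demo.list.impl", "java.util.ArrayList", List.class);
        System.out.println(byDefault.getClass().getName());
        System.out.println(overridden.getClass().getName());
    }
}
```

The same shape explains why changing a single configuration value is enough to swap the serializer implementation.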
2. Two typical serialization scenarios in Spark
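Scenario A: when an RDD operation such as map is executed, the function is first cleaned (cleanF); internally this analyzes the closure F and then serializes it
    private def ensureSerializable(func: AnyRef) {
      try {
        if (SparkEnv.get != null) {
          // Serialize the closure once to verify that it can be shipped to executors
          SparkEnv.get.closureSerializer.newInstance().serialize(func)
        }
      } catch {
        case ex: Exception => throw new SparkException("Task not serializable", ex)
      }
    }

    val closureSerializer = instantiateClassFromConf[Serializer](
      "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
Conclusion: the spark.closure.serializer setting determines how functions are serialized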
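The idea behind ensureSerializable is a trial run: serialize the function once and fail early if that is impossible. A minimal sketch with plain Java serialization follows. ClosureCheck, SerFunction, and isSerializable are hypothetical names introduced for illustration, and the sketch throws IllegalStateException where Spark throws SparkException.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

final class ClosureCheck {
    private ClosureCheck() {}

    // A function type that is also Serializable, so lambdas of this type can be serialized.
    interface SerFunction<A, B> extends Function<A, B>, Serializable {}

    // Serializes `func` into a throwaway buffer; any failure is rethrown with a clearer message.
    static void ensureSerializable(Object func) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(func);
        } catch (IOException e) {
            throw new IllegalStateException("Task not serializable", e);
        }
    }

    static boolean isSerializable(Object func) {
        try {
            ensureSerializable(func);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        SerFunction<Integer, Integer> ok = x -> x + 1;
        Object lock = new Object(); // java.lang.Object is not Serializable
        // This lambda captures `lock`, so serializing the lambda must serialize `lock` too.
        SerFunction<Integer, Integer> bad = x -> x + lock.hashCode();
        System.out.println(isSerializable(ok));  // true
        System.out.println(isSerializable(bad)); // false
    }
}
```

The second lambda fails only because of what it captures, which is the usual cause of a "Task not serializable" error.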
Scenario B: inside BlockManager
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)
The serializer passed in is created as follows
    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
Conclusion: spark.serializer determines how BlockManager serializes data
3. Spark uses the Java serializer by default. I recommend switching to Kryo for better performance. The two mechanisms are compared below
A. First, the Java serialization mechanism
    override def serialize[T: ClassTag](t: T): ByteBuffer = {
      val bos = new ByteArrayOutputStream()
      val out = serializeStream(bos)
      out.writeObject(t)
      out.close()
      ByteBuffer.wrap(bos.toByteArray)
    }
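The Java path above is a thin wrapper around the JDK's object streams. A self-contained sketch of the same round trip, using only java.io and java.nio, is shown here. JavaSerDemo and its method names are hypothetical, and checked exceptions are wrapped as unchecked ones to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class JavaSerDemo {
    private JavaSerDemo() {}

    // Writes `value` with an ObjectOutputStream and wraps the resulting bytes in a ByteBuffer.
    static ByteBuffer serialize(Object value) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and therefore flushed) before the bytes are read.
        return ByteBuffer.wrap(bos.toByteArray());
    }

    // Reads one object back from the remaining bytes without moving the caller's position.
    static Object deserialize(ByteBuffer bytes) {
        byte[] raw = new byte[bytes.remaining()];
        bytes.duplicate().get(raw);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> original = new ArrayList<>(Arrays.asList(1, 2, 3));
        ByteBuffer bytes = serialize(original);
        Object copy = deserialize(bytes);
        System.out.println(original.equals(copy)); // true
    }
}
```

Java serialization writes class descriptors along with the data, which is one reason its output tends to be larger than a format that writes compact class identifiers.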
B. Next, the Kryo serialization mechanism
    // Two lazily initialized streams (Kryo output and input)
    private lazy val output = ks.newKryoOutput()
    private lazy val input = new KryoInput()

    override def serialize[T: ClassTag](t: T): ByteBuffer = {
      // Reset the buffer: position = 0 and total = 0
      output.clear()
      // Obtain a Kryo instance
      val kryo = borrowKryo()
      try {
        // Serialize t (here, the function)
        kryo.writeClassAndObject(output, t)
      } catch {
        ....
      } finally {
        // Put the Kryo instance back into the cache
        releaseKryo(kryo)
      }
      ByteBuffer.wrap(output.toBytes)
    }
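The borrowKryo() / releaseKryo() pair in the code above is a borrow-and-return reuse pattern: an expensive object is handed out, used, and put back so the next call can skip construction. A minimal sketch, assuming a single cached slot and no thread safety, could look like this. SingleSlotCache and Codec are hypothetical stand-ins, not Spark or Kryo classes.

```java
final class SingleSlotCache {
    // Stand-in for an expensive, stateful object such as a Kryo instance.
    static final class Codec {
        final int id;
        int writes; // mutable state that must be cleared before reuse

        Codec(int id) {
            this.id = id;
        }

        void reset() {
            writes = 0;
        }
    }

    private Codec cached; // holds at most one instance
    private int created;  // how many instances have been constructed

    // Returns the cached instance (after resetting it) or constructs a new one.
    Codec borrow() {
        if (cached != null) {
            Codec reused = cached;
            reused.reset();
            cached = null; // the slot stays empty while the instance is in use
            return reused;
        }
        created++;
        return new Codec(created);
    }

    // Puts the instance back so the next borrow() can reuse it.
    void release(Codec codec) {
        cached = codec;
    }

    int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        SingleSlotCache cache = new SingleSlotCache();
        Codec first = cache.borrow();
        first.writes = 5;
        cache.release(first);
        Codec second = cache.borrow();
        System.out.println(first == second);      // true: the same instance is reused
        System.out.println(second.writes);        // 0: state was reset on borrow
        System.out.println(cache.createdCount()); // 1: only one construction happened
    }
}
```

Releasing in a finally block, as the Scala code does, ensures the instance returns to the cache even when serialization throws.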
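=================================== Kryo analysis ============================================
A: Obtaining a Kryo object and initializing KryoSerializer
First, check whether a cached Kryo object exists. If it does, reset it to clear its state, set the cache to null, and return it.
If the cache is empty, create a new Kryo. This is an involved process that begins with constructing the Kryo object,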
    public Kryo () {
        this(new DefaultClassResolver(), new MapReferenceResolver());
    }

    public Kryo (ClassResolver classResolver, ReferenceResolver referenceResolver) {
        if (classResolver == null) throw new IllegalArgumentException("classResolver cannot be null.");

        this.classResolver = classResolver;
        classResolver.setKryo(this);

        this.referenceResolver = referenceResolver;
        if (referenceResolver != null) {
            referenceResolver.setKryo(this);
            references = true;
        }

        addDefaultSerializer(byte[].class, ByteArraySerializer.class);
        addDefaultSerializer(char[].class, CharArraySerializer.class);
        addDefaultSerializer(short[].class, ShortArraySerializer.class);
        addDefaultSerializer(int[].class, IntArraySerializer.class);
        addDefaultSerializer(long[].class, LongArraySerializer.class);
        addDefaultSerializer(float[].class, FloatArraySerializer.class);
        addDefaultSerializer(double[].class, DoubleArraySerializer.class);
        addDefaultSerializer(boolean[].class, BooleanArraySerializer.class);
        addDefaultSerializer(String[].class, StringArraySerializer.class);
        addDefaultSerializer(Object[].class, ObjectArraySerializer.class);
        addDefaultSerializer(BigInteger.class, BigIntegerSerializer.class);
        addDefaultSerializer(BigDecimal.class, BigDecimalSerializer.class);
        addDefaultSerializer(Class.class, ClassSerializer.class);
        addDefaultSerializer(Date.class, DateSerializer.class);
        addDefaultSerializer(Enum.class, EnumSerializer.class);
        addDefaultSerializer(EnumSet.class, EnumSetSerializer.class);
        addDefaultSerializer(Currency.class, CurrencySerializer.class);
        addDefaultSerializer(StringBuffer.class, StringBufferSerializer.class);
        addDefaultSerializer(StringBuilder.class, StringBuilderSerializer.class);
        addDefaultSerializer(Collections.EMPTY_LIST.getClass(), CollectionsEmptyListSerializer.class);
        addDefaultSerializer(Collections.EMPTY_MAP.getClass(), CollectionsEmptyMapSerializer.class);
        addDefaultSerializer(Collections.EMPTY_SET.getClass(), CollectionsEmptySetSerializer.class);
        addDefaultSerializer(Collections.singletonList(null).getClass(), CollectionsSingletonListSerializer.class);
        addDefaultSerializer(Collections.singletonMap(null, null).getClass(), CollectionsSingletonMapSerializer.class);
        addDefaultSerializer(Collections.singleton(null).getClass(), CollectionsSingletonSetSerializer.class);
        addDefaultSerializer(Collection.class, CollectionSerializer.class);
        addDefaultSerializer(TreeMap.class, TreeMapSerializer.class);
        addDefaultSerializer(Map.class, MapSerializer.class);
        addDefaultSerializer(KryoSerializable.class, KryoSerializableSerializer.class);
        addDefaultSerializer(TimeZone.class, TimeZoneSerializer.class);
        addDefaultSerializer(Calendar.class, CalendarSerializer.class);
        lowPriorityDefaultSerializerCount = defaultSerializers.size();

        // Primitives and string. Primitive wrappers automatically use the same registration as primitives.
        register(int.class, new IntSerializer());
        register(String.class, new StringSerializer());
        register(float.class, new FloatSerializer());
        register(boolean.class, new BooleanSerializer());
        register(byte.class, new ByteSerializer());
        register(char.class, new CharSerializer());
        register(short.class, new ShortSerializer());
        register(long.class, new LongSerializer());
        register(double.class, new DoubleSerializer());
    }
and then registering the types that a Spark application must use, for example
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
kryo.register(classOf[Array[Tuple2[Any, Any]]])
kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
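Conclusion: register has many overloads and lets you register a different serializer for the same class. The serializer determines how serialization (reading and writing) is performed. Registration works by looking up the Registration for the Class in ObjectMap<Class, Registration> and updating the Serializer held by that Registration. If no entry exists, a new Registration is created from the class and the Serializer and is then stored in both IntMap<Registration> and ObjectMap<Class, Registration>.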
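The update-or-create behavior described in this conclusion can be sketched with two ordinary maps. This is an illustration of the described logic only, not Kryo's code: RegistryDemo is a hypothetical class, HashMap stands in for Kryo's ObjectMap and IntMap, and a serializer is represented by a plain name string.

```java
import java.util.HashMap;
import java.util.Map;

final class RegistryDemo {
    // Pairs a class with a numeric id and the (named) serializer used for it.
    static final class Registration {
        final Class<?> type;
        final int id;
        String serializerName;

        Registration(Class<?> type, int id, String serializerName) {
            this.type = type;
            this.id = id;
            this.serializerName = serializerName;
        }
    }

    private final Map<Class<?>, Registration> classToRegistration = new HashMap<>();
    private final Map<Integer, Registration> idToRegistration = new HashMap<>();
    private int nextId;

    // Updates the serializer of an existing registration, or creates a new
    // registration and stores it in both maps.
    Registration register(Class<?> type, String serializerName) {
        Registration existing = classToRegistration.get(type);
        if (existing != null) {
            existing.serializerName = serializerName;
            return existing;
        }
        Registration created = new Registration(type, nextId++, serializerName);
        classToRegistration.put(type, created);
        idToRegistration.put(created.id, created);
        return created;
    }

    Registration byClass(Class<?> type) {
        return classToRegistration.get(type);
    }

    Registration byId(int id) {
        return idToRegistration.get(id);
    }

    public static void main(String[] args) {
        RegistryDemo registry = new RegistryDemo();
        Registration first = registry.register(String.class, "string-v1");
        Registration second = registry.register(String.class, "string-v2");
        System.out.println(first == second);                              // true: same entry
        System.out.println(registry.byClass(String.class).serializerName); // string-v2
        System.out.println(registry.byId(first.id).type.getSimpleName());  // String
    }
}
```

Keeping an id-indexed map alongside the class-indexed one lets the writer emit a small integer and the reader turn it back into a class.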
B: Serializing the function F
    Registration registration = writeClass(output, object.getClass());
    // Once the Registration is obtained, the object is written with Kryo's class-specific Serializer
    registration.getSerializer().write(this, output, object);
writeClass delegates to the writeClass method of classResolver. This classResolver is the DefaultClassResolver supplied when Kryo was instantiated, so the call ends up in DefaultClassResolver's writeClass method
    classResolver.writeClass(output, type);
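The two steps above (write something that identifies the class, then let the class-specific serializer write the payload) can be sketched with JDK data streams. This is a deliberately tiny illustration that supports only Integer and String. TaggedWriteDemo, the one-byte ids, and the method names are hypothetical, and the byte layout is not Kryo's wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class TaggedWriteDemo {
    private TaggedWriteDemo() {}

    private static final int INT_ID = 0;
    private static final int STRING_ID = 1;

    // Step 1: write a one-byte class id. Step 2: write the payload for that class.
    static byte[] writeClassAndObject(Object value) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            if (value instanceof Integer) {
                out.writeByte(INT_ID);
                out.writeInt((Integer) value);
            } else if (value instanceof String) {
                out.writeByte(STRING_ID);
                out.writeUTF((String) value);
            } else {
                String name = (value == null) ? "null" : value.getClass().getName();
                throw new IllegalArgumentException("Class is not registered: " + name);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Reads the class id first, then dispatches to the matching payload reader.
    static Object readClassAndObject(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int id = in.readUnsignedByte();
            switch (id) {
                case INT_ID:
                    return in.readInt();
                case STRING_ID:
                    return in.readUTF();
                default:
                    throw new IllegalArgumentException("Unknown class id: " + id);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] encoded = writeClassAndObject(42);
        System.out.println(encoded.length);              // 5: one id byte plus a four-byte int
        System.out.println(readClassAndObject(encoded)); // 42
    }
}
```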
The writeClass method of DefaultClassResolver
    Registration registration = kryo.getRegistration(type);
The Registration is looked up by type. If ObjectMap<Class, Registration> holds no Registration for the type, the lookup falls back to a related type
    if (registration == null) {
        if (Proxy.isProxyClass(type)) {
            // If a Proxy class, treat it like an InvocationHandler because the concrete class for a proxy is generated.
            registration = getRegistration(InvocationHandler.class);
        } else if (!type.isEnum() && Enum.class.isAssignableFrom(type)) {
            // This handles an enum value that is an inner class. Eg: enum A {b{}};
            registration = getRegistration(type.getEnclosingClass());
        } else if (EnumSet.class.isAssignableFrom(type)) {
            registration = classResolver.getRegistration(EnumSet.class);
        }
        if (registration == null) {
            if (registrationRequired) {
                throw new IllegalArgumentException("Class is not registered: " + className(type)
                    + "\nNote: To register this class use: kryo.register(" + className(type) + ".class);");
            }
            registration = classResolver.registerImplicit(type);
        }
    }
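Two branches of the fallback above are easy to reproduce with the JDK alone: the enum-constant-with-a-body case, and the choice between failing and registering implicitly. The sketch below is an illustration under simplifying assumptions, not Kryo's code: ResolveDemo and Shape are hypothetical, serializers are represented by name strings, and the proxy and EnumSet branches are omitted.

```java
import java.util.HashMap;
import java.util.Map;

final class ResolveDemo {
    // ROUND has a class body, so its runtime class is an anonymous subclass of Shape.
    enum Shape { ROUND { }, SQUARE }

    private final Map<Class<?>, String> registrations = new HashMap<>();
    private final boolean registrationRequired;

    ResolveDemo(boolean registrationRequired) {
        this.registrationRequired = registrationRequired;
    }

    void register(Class<?> type, String serializerName) {
        registrations.put(type, serializerName);
    }

    // Looks up the serializer name for `type`, applying the enum fallback and
    // then either failing or registering the type implicitly.
    String resolve(Class<?> type) {
        String found = registrations.get(type);
        if (found == null && !type.isEnum() && Enum.class.isAssignableFrom(type)) {
            // Enum constant with a body: use the registration of the enclosing enum.
            found = registrations.get(type.getEnclosingClass());
        }
        if (found == null) {
            if (registrationRequired) {
                throw new IllegalArgumentException("Class is not registered: " + type.getName());
            }
            found = "implicit:" + type.getName();
            registrations.put(type, found);
        }
        return found;
    }

    public static void main(String[] args) {
        Class<?> roundClass = Shape.ROUND.getClass();
        System.out.println(roundClass.isEnum());                          // false
        System.out.println(roundClass.getEnclosingClass() == Shape.class); // true

        ResolveDemo strict = new ResolveDemo(true);
        strict.register(Shape.class, "enum-serializer");
        System.out.println(strict.resolve(roundClass));                   // enum-serializer

        ResolveDemo lenient = new ResolveDemo(false);
        System.out.println(lenient.resolve(StringBuilder.class));         // implicit:java.lang.StringBuilder
    }
}
```

With registrationRequired set, an unregistered class fails fast with a clear message instead of being registered silently.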
======================== Spark serialization: key takeaways ========================
Having walked through the process above, the ins and outs of Spark's serialization mechanism should now be clear.