I have a class that implements a custom Kryo serializer by implementing the read() and write() methods from com.esotericsoftware.kryo.Serializer (see example below). How can I register this custom serializer with Spark?

Here is a pseudo-code example of what I have:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

class A

class CustomASerializer extends Serializer[A] {
    override def write(kryo: Kryo, output: Output, a: A): Unit = ???
    override def read(kryo: Kryo, input: Input, t: Class[A]): A = ???
}

val kryo: Kryo = ... 
kryo.register(classOf[A], new CustomASerializer()) // I can register my serializer

Now in Spark:

val sparkConf = new SparkConf()
sparkConf.registerKryoClasses(Array(classOf[A]))

Unfortunately, Spark doesn't give me the option to register my custom serializer. Any idea if there is a way to do this?

Alejandro Alcalde
marios
  • Look at spark.kryo.classesToRegister – Sohaib Mar 22 '16 at 06:41
  • [This answer](http://stackoverflow.com/questions/32667068/save-spark-dataframe-into-elasticsearch-can-t-handle-type-exception) is not a direct answer to your question but the explanation provided will give you more details about custom serializer registration in spark. – eliasah Mar 22 '16 at 07:28

1 Answer

Create your own KryoRegistrator with this custom serializer registered:

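package com.acme

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  // Spark calls this for each Kryo instance it creates
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[A], new CustomASerializer())
  }
}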

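Then set spark.kryo.registrator to your registrator's fully-qualified class name, e.g. com.acme.MyRegistrator. Spark uses Java serialization by default, so also set spark.serializer to the Kryo serializer:

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "com.acme.MyRegistrator")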
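For a quick local check that the registrator is actually picked up, the pieces can be put together roughly as follows. This is only a sketch under a few assumptions: the fields of A (id, name) and the object name KryoRegistratorSketch are invented for illustration, the classes are compiled as top-level classes so Spark can instantiate the registrator reflectively, and the read signature matches the Kryo 4.x that Spark bundles (Kryo 5 uses a different type parameter). Passing classOf[MyRegistrator].getName instead of a hand-typed string avoids mismatched class names.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}

// Hypothetical payload: the fields are invented for this example.
case class A(id: Int, name: String)

// Writes the two fields in a fixed order and reads them back in the same order.
class CustomASerializer extends Serializer[A] {
  override def write(kryo: Kryo, output: Output, a: A): Unit = {
    output.writeInt(a.id)
    output.writeString(a.name)
  }

  override def read(kryo: Kryo, input: Input, t: Class[A]): A =
    A(input.readInt(), input.readString())
}

// Spark instantiates this class by name, so it needs a no-arg constructor.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[A], new CustomASerializer())
  }
}

object KryoRegistratorSketch {
  // Builds a conf that enables Kryo and points it at the registrator.
  def buildConf(): SparkConf =
    new SparkConf(false) // false: do not load spark.* system properties
      .set("spark.serializer", classOf[KryoSerializer].getName)
      .set("spark.kryo.registrator", classOf[MyRegistrator].getName)

  // Serializes and deserializes a value through Spark's Kryo serializer,
  // without starting a SparkContext.
  def roundTrip(a: A): A = {
    val instance = new KryoSerializer(buildConf()).newInstance()
    instance.deserialize[A](instance.serialize(a))
  }

  def main(args: Array[String]): Unit = {
    val original = A(1, "example")
    assert(roundTrip(original) == original)
  }
}
```

In a real job the same SparkConf would be passed to the SparkContext or SparkSession builder; the round trip here is only meant as a cheap way to exercise the custom serializer before running on a cluster.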
Tzach Zohar
  • This isn't very clear in the spark documentation, but this absolutely works. If you run into issues with Kryo not being able to serialize a class with a no-arg constructor in Spark (for me it was org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering), then this can solve the issue by using kryo.register(LazilyGeneratedOrdering.class, new JavaSerializer()); Thank you! – jhnclvr Nov 23 '16 at 15:09