
I am trying to use the Kryo serializer in Spark Streaming. The Spark tuning docs say:

Finally, if you don’t register your custom classes, Kryo will still work, but it will have to store the full class name with each object, which is wasteful.

So I am trying to register all my classes. My case classes are:

trait Message extends java.io.Serializable

object MutableTypes {
  type Childs = scala.collection.mutable.Map[Int, (Long, Boolean)]
  type Parents = scala.collection.mutable.Map[Int, Childs]
}

case class IncomingRecord(id_1: String, id_raw: String, parents_to_add: MutableTypes.Parents, parents_to_delete: MutableTypes.Parents) extends Message

And I am registering the classes like this:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired","true")
sparkConf.registerKryoClasses(Array(classOf[Tuple2[Long,Boolean]],classOf[IncomingRecord]))

I got this exception:

com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: scala.Tuple2$mcJZ$sp
Note: To register this class use: kryo.register(scala.Tuple2$mcJZ$sp.class);
Serialization trace:
parents_to_add (com.test.IncomingRecord)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185)
    at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
    at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
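The trace hints at why the registration above does not match. A minimal check, assuming Scala 2.x (the versions Spark used at the time): `classOf[Tuple2[Long,Boolean]]` is erased to plain `scala.Tuple2`, but `Tuple2` is `@specialized` on primitive types, so a `(Long, Boolean)` value is actually an instance of the subclass `scala.Tuple2$mcJZ$sp` (`J` and `Z` are the JVM descriptors for long and boolean). `SpecializedTupleCheck` is a hypothetical name used only for this illustration.

```scala
object SpecializedTupleCheck {
  // What the question registers: type arguments are erased, so this is plain scala.Tuple2
  val registered: Class[_] = classOf[Tuple2[Long, Boolean]]

  // What the map values really are: the compiler instantiates the
  // specialized subclass for a (Long, Boolean) tuple literal
  val actual: Class[_] = (1L, true).getClass

  def main(args: Array[String]): Unit = {
    println(registered.getName)                  // scala.Tuple2
    println(actual.getName)                      // scala.Tuple2$mcJZ$sp
    // It is a subclass of the registered class, but not the same class,
    // and Kryo looks registrations up by exact runtime class
    println(registered.isAssignableFrom(actual)) // true
    println(registered == actual)                // false
  }
}
```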

How can I register my classes, and how do I fix this exception?

Update:

I know that setting registration to false removes the exception, but then Kryo has to write the full class name with each object, which costs performance. I want to know how to register my classes.

Nishant Kumar

2 Answers


"Finally, if you don’t register your custom classes, Kryo will still work, but it will have to store the full class name with each object, which is wasteful." This statement is true only when spark.kryo.registrationRequired keeps its default value, which is false.

The following setting should remove the exception (as should leaving the parameter unset, since its default value is false):

.set("spark.kryo.registrationRequired","false")

More info can be found here: http://spark.apache.org/docs/latest/configuration.html

spark.kryo.registrationRequired (default: false): Whether to require registration with Kryo. If set to 'true', Kryo will throw an exception if an unregistered class is serialized. If set to false (the default), Kryo will write unregistered class names along with each object. Writing class names can cause significant performance overhead, so enabling this option can enforce strictly that a user has not omitted classes from registration.
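If strict registration is kept on, the class list can also be supplied purely through configuration: Spark reads a spark.kryo.classesToRegister property holding a comma-separated list of fully qualified class names. A sketch under stated assumptions: the case class is com.test.IncomingRecord (as in the stack trace), mutable.Map is backed by scala.collection.mutable.HashMap, and KryoConf is a hypothetical helper name.

```scala
object KryoConf {
  // Fully qualified names exactly as Class.getName reports them
  val classNames: Seq[String] = Seq(
    "com.test.IncomingRecord",         // package taken from the stack trace
    "scala.Tuple2$mcJZ$sp",            // specialized tuple named in the exception
    "scala.collection.mutable.HashMap" // assumed concrete class behind mutable.Map
  )

  // Key/value pairs to apply via SparkConf.setAll or spark-submit --conf
  val settings: Map[String, String] = Map(
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.registrationRequired" -> "true",
    "spark.kryo.classesToRegister" -> classNames.mkString(",")
  )
}
```

The map can be applied with sparkConf.setAll(KryoConf.settings). When passing the same value to spark-submit with --conf, quote it in the shell, because the `$` characters in the tuple class name would otherwise be expanded.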

Some pointers on how to register classes for Kryo serialization:
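A minimal sketch of registering what the exception asks for while keeping spark.kryo.registrationRequired=true. The idea is to register the runtime classes Kryo actually meets, not the declared types: the record itself, the specialized tuple named in the exception, and the concrete class behind mutable.Map (taken from an instance rather than hard-coded). KryoClasses is a hypothetical name, and the model is copied from the question so the block compiles on its own.

```scala
import scala.collection.mutable

trait Message extends java.io.Serializable

object MutableTypes {
  type Childs = mutable.Map[Int, (Long, Boolean)]
  type Parents = mutable.Map[Int, Childs]
}

case class IncomingRecord(
    id_1: String,
    id_raw: String,
    parents_to_add: MutableTypes.Parents,
    parents_to_delete: MutableTypes.Parents
) extends Message

object KryoClasses {
  // Runtime classes Kryo writes for an IncomingRecord (assumed list; see note below)
  val all: Array[Class[_]] = Array(
    classOf[IncomingRecord],
    // classOf[Tuple2[Long, Boolean]] is plain Tuple2; the values are this subclass
    Class.forName("scala.Tuple2$mcJZ$sp"),
    // mutable.Map(...) / mutable.Map.empty build a concrete map implementation;
    // read its class from an instance instead of naming it
    mutable.Map.empty[Int, (Long, Boolean)].getClass
  )
}
```

The array is then passed as sparkConf.registerKryoClasses(KryoClasses.all) in place of the original Array(classOf[Tuple2[Long,Boolean]], classOf[IncomingRecord]). Depending on the Spark and Scala versions, Kryo may name further classes (for example internal entry classes of the map) in the next exception; each one can be added the same way, using Class.forName with the exact name from the message.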

Yaron
  • I want to improve performance, so I wanted to register all my classes. – Nishant Kumar Jan 01 '17 at 08:49
  • @NishantKumar - setting the flag to "false" will remove the exception. I've added a few links which might help with Kryo serialization. – Yaron Jan 01 '17 at 09:01
  • I have added the quote from the Spark documentation to my post. I know that false will remove the exception, but according to the official docs that is wasteful and does not improve performance because of the overhead. I want to know how I can register all my classes. – Nishant Kumar Jan 01 '17 at 15:25

In another Stack Overflow answer I describe a method for quickly getting the names of all the classes that need to be registered.

see: https://stackoverflow.com/a/55644422/5981256
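The linked answer is not reproduced here. As a rough, hypothetical alternative (not necessarily the method described there), the concrete classes reachable from a sample object can be collected by walking its instance fields and array elements with Java reflection. The result is only a starting list for registerKryoClasses: Spark pre-registers some classes and uses custom serializers for others, so Kryo's own exception messages remain the final word. ReachableClasses and its collect method are illustrative names.

```scala
import java.lang.reflect.{Array => JArray, Field, Modifier}
import java.util.{ArrayDeque, Collections, IdentityHashMap}
import scala.collection.mutable
import scala.util.Try

// Minimal copy of the question's model, only to have something to walk
case class IncomingRecord(
    id_1: String,
    id_raw: String,
    parents_to_add: mutable.Map[Int, mutable.Map[Int, (Long, Boolean)]],
    parents_to_delete: mutable.Map[Int, mutable.Map[Int, (Long, Boolean)]])

object ReachableClasses {
  // JDK packages are not descended into: newer JDKs refuse reflective access
  private val skipped = Seq("java.", "javax.", "jdk.", "sun.")

  // Runtime classes of every object reachable from root (root must be non-null)
  def collect(root: AnyRef): Set[Class[_]] = {
    // Identity-based visited set, so shared or cyclic references terminate
    val seen = Collections.newSetFromMap(new IdentityHashMap[AnyRef, java.lang.Boolean]())
    val found = mutable.LinkedHashSet.empty[Class[_]]
    val pending = new ArrayDeque[AnyRef]()
    pending.push(root)
    while (!pending.isEmpty) {
      val obj = pending.pop()
      if (seen.add(obj)) {
        val cls = obj.getClass
        found += cls
        if (cls.isArray) {
          // Only object arrays can hold further references
          if (!cls.getComponentType.isPrimitive)
            for (i <- 0 until JArray.getLength(obj)) {
              val elem = JArray.get(obj, i)
              if (elem != null) pending.push(elem)
            }
        } else if (!skipped.exists(p => cls.getName.startsWith(p))) {
          for (f <- instanceFields(cls) if !f.getType.isPrimitive) {
            // Fields the JVM refuses to open are skipped rather than failing the walk
            Try { f.setAccessible(true); f.get(obj) }.toOption
              .filter(_ != null)
              .foreach(v => pending.push(v))
          }
        }
      }
    }
    found.toSet
  }

  // Non-static fields declared on the class and all of its superclasses
  private def instanceFields(cls: Class[_]): List[Field] =
    Iterator.iterate[Class[_]](cls)(c => c.getSuperclass)
      .takeWhile(_ != null)
      .flatMap(c => c.getDeclaredFields.toList)
      .filterNot(f => Modifier.isStatic(f.getModifiers))
      .toList

  def main(args: Array[String]): Unit = {
    val childs = mutable.Map(7 -> (42L, true))
    val record = IncomingRecord("a", "raw", mutable.Map(1 -> childs), mutable.Map.empty)
    // Prints one class name per line, sorted
    collect(record).map(_.getName).toSeq.sorted.foreach(println)
  }
}
```

The printed names can be fed to Class.forName and added to the registration array; JDK types such as String and boxed Integer also appear, and Kryo registers the common ones by default.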

Russell Bie