2

I am getting the following error in my Spark application when it tries to serialize a protobuf field that is a map with String keys and float values. Kryo serialization is being used in the Spark app.

Caused by: java.lang.NullPointerException
    at com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
    at java.util.AbstractList.add(AbstractList.java:108)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 71 more

Has anyone faced this issue before? Is there a way to resolve it?

Pooja Mazumdar
  • Where are you using Spark from, and what are you trying to run? – Mikhail Berlinkov Nov 01 '18 at 20:57
  • I am using Spark Streaming, v1.6.1 to be specific. The app takes an input stream from Kafka, which is written by another system that stores protobuf objects in Kafka; we read those in our Spark Streaming app. We started seeing this issue when a new "map" field was added to the object, so I am guessing the issue arises from there? – Pooja Mazumdar Nov 01 '18 at 21:46
  • I see a similar thread with the same exception but there are no solutions - https://stackoverflow.com/questions/38535325/error-using-sparks-kryo-serializer-with-java-protocol-buffers-that-have-arrays – Pooja Mazumdar Nov 01 '18 at 21:48
  • I experienced similar issues while using pyspark and believe the cause is the same. Usually, it is caused by passing objects to spark which cannot be serialized properly (like weakrefs, etc.). As you mention that it happened when you added this map field, this is very likely the cause. What does this map field represent? – Mikhail Berlinkov Nov 01 '18 at 21:53
  • I encountered the same problem and the answer here works for me. https://stackoverflow.com/questions/36144618/spark-kryo-register-a-custom-serializer – K_Augus Nov 14 '19 at 02:53

4 Answers

1

You can register a ProtobufSerializer with Kryo to serialize protobufs:

  • first: include the dep:
"de.javakaffee" % "kryo-serializers" % "0.43" // in sbt
  • second: extend the Kryo registrator
package com.my.serializer

import com.esotericsoftware.kryo.Kryo
import de.javakaffee.kryoserializers.protobuf.ProtobufSerializer
import org.apache.spark.serializer.KryoRegistrator

class ExtendedKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[YourProtoMessageClass], new ProtobufSerializer())
  }
}
  • third: set the spark conf with ExtendedKryoRegistrator
val conf = new SparkConf()
  .setAppName("appName")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.my.serializer.ExtendedKryoRegistrator")

val spark = SparkSession.builder()
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()
Eric
0

You have to register ProtobufSerializer with Kryo to serialize protobufs. (Note: the snippet below uses Apache Flink's API; in a Spark app the same registration is done through a KryoRegistrator, as in the answer above.)

StreamExecutionEnvironment.getExecutionEnvironment()
                          .registerTypeWithKryoSerializer(YourProtobufClass.class, 
                                                          ProtobufSerializer.class); 

Add the dependency below to access the ProtobufSerializer class:

<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.45</version>
</dependency>
marcolz
0

When Kryo encounters an object of a class with no registered serializer, it falls back to its default reflection-based serializer (FieldSerializer), which chokes on Protobuf's internal collection classes like UnmodifiableLazyStringList.

But it's possible to make Kryo throw an exception instead:

final Kryo kryo = new Kryo();
kryo.setRegistrationRequired(true);

I've decided to keep registration required because it helps avoid slow fallback serialization for some classes, which could impact performance negatively.
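To see what the fail-fast behaviour buys you, here is a toy registry (an illustration only, not Kryo's actual implementation; all names are made up) that throws for unregistered classes instead of silently falling back to a default serializer:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy illustration of setRegistrationRequired(true): unknown classes
// raise an error immediately instead of hitting a slow/broken fallback.
class StrictRegistry {
    private final Map<Class<?>, Function<Object, byte[]>> serializers = new HashMap<>();
    private final boolean registrationRequired;

    StrictRegistry(boolean registrationRequired) {
        this.registrationRequired = registrationRequired;
    }

    <T> void register(Class<T> type, Function<Object, byte[]> serializer) {
        serializers.put(type, serializer);
    }

    byte[] serialize(Object value) {
        Function<Object, byte[]> s = serializers.get(value.getClass());
        if (s == null) {
            if (registrationRequired) {
                // Fail fast, like Kryo with registration required
                throw new IllegalArgumentException(
                        "Class is not registered: " + value.getClass().getName());
            }
            // Fallback path (Kryo would use its default FieldSerializer here)
            return value.toString().getBytes();
        }
        return s.apply(value);
    }
}
```

The point is that a class you forgot to register surfaces as an immediate, named error rather than a confusing NullPointerException deep inside a fallback serializer.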

To handle serialization of Protobuf-generated classes, I used the following class:

package com.juarezr.serialization;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.AbstractMessage;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ProtobufSerializer<T extends AbstractMessage> extends Serializer<T> implements Serializable {
    
    static final long serialVersionUID = 1667386898559074449L;
    protected final Method parser;

    public ProtobufSerializer(final Class<T> protoMessageClass) {
        try {
            this.parser = protoMessageClass.getDeclaredMethod("parseFrom", byte[].class);
            this.parser.setAccessible(true);
        } catch (SecurityException | NoSuchMethodException ex) {
            throw new IllegalArgumentException(protoMessageClass.toString() + " doesn't have a protobuf parser", ex);
        }
    }

    @Override
    public void write(final Kryo kryo, final Output output, final T protobufMessage) {
        if (protobufMessage == null) {
            output.writeByte(Kryo.NULL);
            output.flush();
            return;
        }
        final byte[] bytes = protobufMessage.toByteArray();
        output.writeInt(bytes.length + 1, true);
        output.writeBytes(bytes);
        output.flush();
    }

    @SuppressWarnings({"unchecked", "JavaReflectionInvocation"})
    @Override
    public T read(final Kryo kryo, final Input input, final Class<T> protoMessageClass) {
        final int length = input.readInt(true);
        if (length == Kryo.NULL) {
            return null;
        }
        final Object bytesRead = input.readBytes(length - 1);
        try {
            final Object parsed = this.parser.invoke(protoMessageClass, bytesRead);
            return (T) parsed;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Unable to deserialize protobuf for class: " + protoMessageClass.getName(), e);
        }
    }

    @Override
    public boolean getAcceptsNull() {
        return true;
    }

    @SuppressWarnings("unchecked")
    public static <M extends AbstractMessage> void registerMessagesFrom(final M rootMessage, final Kryo kryo) {

        final Class<M> messageClass = (Class<M>) rootMessage.getClass();
        final ProtobufSerializer<M> serializer = new ProtobufSerializer<>(messageClass);
        kryo.register(messageClass, serializer);

        final Class<?>[] nestedClasses = messageClass.getDeclaredClasses();
        for (final Class<?> innerClass : nestedClasses) {
            if ((AbstractMessage.class).isAssignableFrom(innerClass)) {
                final Class<M> typedClass = (Class<M>) innerClass;
                final ProtobufSerializer<M> serializer2 = new ProtobufSerializer<>(typedClass);
                kryo.register(typedClass, serializer2);
            }
        }
    }
}

You can configure the serialization with something like:

// ...
final Kryo kryo = new Kryo();
kryo.setRegistrationRequired(true);

// Add a registration for each generated file and top level class ...
ProtobufSerializer.registerMessagesFrom(MyProtoEnclosingClass.MyProtoTopLevelClass.getDefaultInstance(), kryo);

// Add a registration for each other Java/Scala class you would need...
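The wire format used by write()/read() above can be sketched without Kryo. Null is encoded as length 0 and an N-byte message as length N + 1, so the reader can tell a null message apart from an empty one. (Kryo writes the length as a varint; this sketch uses a fixed 4-byte int to stay dependency-free.)

```java
import java.nio.ByteBuffer;

// Simplified, Kryo-free version of the length-prefixed encoding above.
class LengthPrefixCodec {
    static byte[] encode(byte[] payload) {
        if (payload == null) {
            return ByteBuffer.allocate(4).putInt(0).array(); // null sentinel
        }
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length + 1) // lengths shifted by one
                .put(payload)
                .array();
    }

    static byte[] decode(byte[] encoded) {
        ByteBuffer in = ByteBuffer.wrap(encoded);
        int length = in.getInt();
        if (length == 0) {
            return null;                    // null sentinel
        }
        byte[] payload = new byte[length - 1];
        in.get(payload);
        return payload;                     // this is what parseFrom() would receive
    }
}
```

Round-tripping the message as raw bytes this way sidesteps Kryo's field-by-field reflection entirely, which is why it avoids the lazy-collection NullPointerException from the question.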
Juarez Rudsatz
0

Setting this in the config fixed the error for me. It switches Spark from Kryo back to plain Java serialization, which sidesteps the Kryo issue at the cost of slower serialization:

spark.serializer=org.apache.spark.serializer.JavaSerializer
jcyan