
I'm building an integration test for our Kafka system using the Spring embedded Kafka broker, with a MockSchemaRegistryClient. The test covers one of our Streams topologies, built using the Streams API (KStreamBuilder). This particular topology has a KStream (stream1) feeding into a KTable (table1).

When I feed input into stream1, I encounter an error originating from table1's KTableProcessor:

Exception in thread "mortgage-kafka-consumers-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streaming.mortgage.application_party, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6
Caused by: java.io.IOException: Cannot get schema from schema registry!
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
    at com.sofi.kafka.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:35)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:151)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:135)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:62)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)

The KTableProcessor is attempting to deserialize an entry from the RocksDB state store; however, the schema does not exist in the mock schema registry. The topic whose schema is being requested is: **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog**

As the exception states, the schema has not been registered. However, the topic **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog-key** does have a registered schema (registered when the entry's key is serialized for the query). 

Since this is an internal topic, I don't expect to have to register this schema myself; however, the test fails because the schema is absent from the registry. Is there a way to have changelog schemas registered prior to data ingestion? Is there a way to disable state store changelogging with the KStreamBuilder?

Thanks in advance!


1 Answer


Solved the problem; I shall now sheepishly recount. When using a KTable (via the Streams API) with an embedded Kafka broker, you'll want to configure the KafkaStreams object with a state store directory unique to each run of the embedded Kafka broker (in my case, each run of the test).

You control the state store directory via the StreamsConfig.STATE_DIR_CONFIG configuration. I made it unique by appending a timestamp to the default state store directory:

properties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kraken-streams/" + LocalDateTime.now().toString());

The problem was that an old state store existed in the same location each time the embedded Kafka broker was initialized. When the very first record was entered into the KTable's topic, the state store was able to return a previous value. This resulted in an attempt to deserialize a state store record that had not yet (from the perspective of the current schema-registry instance) been serialized. Schemas are only registered on serialization, so the deserialization failed due to a missing registered schema.
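The same idea can be sketched with a JDK temporary directory instead of a hard-coded `/tmp` path (which also works on Windows). This is a minimal sketch: the class name and directory prefix are placeholders, and the literal `"state.dir"` is used in place of `StreamsConfig.STATE_DIR_CONFIG` (which resolves to that string) to keep the sketch free of Kafka dependencies.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class UniqueStateDir {
    public static void main(String[] args) throws Exception {
        // Create a fresh, unique state directory for each test run so no
        // stale RocksDB state store from a previous run can be picked up.
        Path stateDir = Files.createTempDirectory("kafka-streams-it-");

        Properties properties = new Properties();
        // "state.dir" is the literal value of StreamsConfig.STATE_DIR_CONFIG.
        properties.put("state.dir", stateDir.toString());

        System.out.println(properties.getProperty("state.dir"));
    }
}
```

The resulting properties would then be passed to the KafkaStreams object as usual; the OS is free to clean the temp directory up between runs.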

  • I did as you suggested; however, I still get the same exception. I checked my STATE_DIR_CONFIG; it works dynamically (with the timestamp). I even deleted all the previous state, but it didn't help. – Dave Apr 14 '20 at 14:36
  • This doesn't work on Windows... `/tmp` doesn't exist there https://stackoverflow.com/questions/617414/how-to-create-a-temporary-directory-folder-in-java – OneCricketeer May 06 '20 at 20:17