Using Avro Serializer with Kafka Consumers and Producers

Some of the Avro Serializer/Deserializer and Schema Registry classes are not available in jars from the usual maven-central repo. Confluent manage their own repository which you can add to your pom.xml with:

<repositories>
    <!-- For io.confluent Jars not in maven central -->
    <repository>
      <id>confluent</id>
      <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>

And then you can add dependency:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>5.4.1</version>
</dependency>

This dependency will allow you to use the AvroSerializer in your properties:

value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

To generate Avro Specific classes from an .avsc file following the Avro developer guide here, add the Avro dependency and generator plugin:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.9.2</version>
</dependency>

and the plugin:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals> <goal>schema</goal> </goals>
      <configuration>.     <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
      <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

The plugin configuration is looking for .avsc schema files in the /srv/main/avro folder. An example schema file looks like this:

{
  "namespace": "kh.kafkaexamples.avro",
  "type": "record",
  "name": "TestMessage",
  "fields": [
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"}
  ]
}

The plugin will generate the Avro class for any .avsc file it finds in the configured folder.

To use Avro messages with Confluent Platform (or Confluent Cloud), you also need to specify a url to the Schema Registry, otherwise you’ll see this error:

Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:251)

You also need to prefix the url with http/https, otherwise you’ll see this exception:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: unknown protocol: localhost

Assuming you’re running Confluent Platform locally, the Schema Registry property is:

schema.registry.url=http://localhost:8081

To publish a message using the generated TestMessage class from the above schema:

Producer producer = new KafkaProducer<>(props);
TestMessage message = new TestMessage();
message.setFirstName("firstname");
message.setLastName("lastname");
producer.send(new ProducerRecord("test-avro-topic", "1", message));
producer.flush();
producer.close();

Done!

Leave a Reply

Your email address will not be published.

This site uses Akismet to reduce spam. Learn how your comment data is processed.