Using Avro Serializer with Kafka Consumers and Producers

Some of the Avro Serializer/Deserializer and Schema Registry classes are not published to Maven Central. Confluent hosts them in its own Maven repository, which you can add to your pom.xml with:

<repositories>
  <!-- For io.confluent jars not in Maven Central -->
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

Then add the dependency:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>5.4.1</version>
</dependency>

This dependency provides KafkaAvroSerializer, which you can then set as the value serializer in your producer properties:

value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

To generate Avro specific classes from an .avsc file, as described in the Avro developer guide, add the Avro dependency and the code generator plugin:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.9.2</version>
</dependency>

and the plugin:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

The plugin configuration looks for .avsc schema files in the src/main/avro folder of your project. An example schema file looks like this:

{
  "namespace": "kh.kafkaexamples.avro",
  "type": "record",
  "name": "TestMessage",
  "fields": [
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"}
  ]
}

The plugin generates a Java class for every .avsc file it finds in the configured folder when the generate-sources phase runs (for example, as part of mvn compile).
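As a rough illustration of where the output ends up: the schema's namespace becomes the Java package, so the generated source is written under outputDirectory in a folder path matching that package. GeneratedClassLocator is a hypothetical helper, a sketch that only computes the expected class name and file path; it does not invoke the plugin.

```java
import java.nio.file.Path;

// Hypothetical helper: predicts where avro-maven-plugin writes a generated record class.
class GeneratedClassLocator {

    // Fully qualified class name = schema namespace + "." + record name
    static String className(String namespace, String name) {
        return namespace.isEmpty() ? name : namespace + "." + name;
    }

    // Source file = outputDirectory + namespace segments as folders + Name.java
    static Path sourceFile(Path outputDirectory, String namespace, String name) {
        Path dir = outputDirectory;
        if (!namespace.isEmpty()) {
            for (String part : namespace.split("\\.")) {
                dir = dir.resolve(part);
            }
        }
        return dir.resolve(name + ".java");
    }
}
```

For the TestMessage schema above and the plugin configuration shown, this gives the class kh.kafkaexamples.avro.TestMessage in src/main/java/kh/kafkaexamples/avro/TestMessage.java, relative to the project directory.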

To use Avro messages with Confluent Platform (or Confluent Cloud), you also need to set the Schema Registry URL; otherwise you’ll see this error:

Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:251)

The URL must also start with http:// or https://; otherwise you’ll see this exception:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: unknown protocol: localhost
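Both errors surface at runtime from inside the Kafka client. A fail-fast check on your own Properties can catch them before any client is created. The sketch below uses only java.net.URI; RegistryUrlCheck is a hypothetical helper, not part of Confluent's API, and it assumes schema.registry.url holds a single URL.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;

// Hypothetical fail-fast check; not part of the Confluent serializer API.
class RegistryUrlCheck {

    // Throws IllegalArgumentException if schema.registry.url is missing or lacks http(s)://
    static void validate(Properties props) {
        String url = props.getProperty("schema.registry.url");
        if (url == null || url.trim().isEmpty()) {
            // Same root cause as the "Missing required configuration" error
            throw new IllegalArgumentException("schema.registry.url is not set");
        }
        String scheme;
        try {
            scheme = new URI(url.trim()).getScheme();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed schema.registry.url: " + url, e);
        }
        // "localhost:8081" parses with scheme "localhost", which is why the
        // serializer reports "unknown protocol: localhost"
        if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException(
                    "schema.registry.url must start with http:// or https://, got: " + url);
        }
    }
}
```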

Assuming you’re running Confluent Platform locally, the Schema Registry property is:

schema.registry.url=http://localhost:8081
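Putting the settings together, a minimal sketch of the producer configuration might look like this. It assumes a broker at localhost:9092 and String keys (the producer example uses the key "1"), neither of which is specified above; AvroProducerProps is a hypothetical helper name. Property names are passed as plain strings so the snippet needs only the JDK.

```java
import java.util.Properties;

// Hypothetical helper collecting the producer settings used in this post.
class AvroProducerProps {

    static Properties build(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        // Assumption: a broker reachable at bootstrapServers, e.g. localhost:9092
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Keys in the example are plain Strings such as "1"
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are Avro records, registered with and serialized via the Schema Registry
        props.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Must include the http:// or https:// scheme
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        return props;
    }
}
```

The returned Properties can be passed directly to new KafkaProducer<>(props).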

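To publish a message using the TestMessage class generated from the schema above:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import kh.kafkaexamples.avro.TestMessage;

// props: bootstrap.servers, key.serializer (String), value.serializer and schema.registry.url
Producer<String, TestMessage> producer = new KafkaProducer<>(props);
TestMessage message = new TestMessage();
message.setFirstName("firstname");
message.setLastName("lastname");
producer.send(new ProducerRecord<>("test-avro-topic", "1", message));
producer.flush();
producer.close();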

Done!
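The title also mentions consumers: reading these messages back uses Confluent's KafkaAvroDeserializer. By default it returns GenericRecord values; setting specific.avro.reader to true asks it to return the generated TestMessage class instead. A sketch of the consumer properties, assuming a broker at localhost:9092; the helper name AvroConsumerProps and the use of auto.offset.reset=earliest are illustrative choices, not taken from the original.

```java
import java.util.Properties;

// Hypothetical helper mirroring the producer settings for a consumer.
class AvroConsumerProps {

    static Properties build(String bootstrapServers, String schemaRegistryUrl, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Keys were written with StringSerializer, so read them back as Strings
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        // Deserialize into the generated TestMessage class instead of GenericRecord
        props.setProperty("specific.avro.reader", "true");
        // Assumption: a new consumer group starts from the beginning of the topic
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

A KafkaConsumer<String, TestMessage> created from these properties and subscribed to test-avro-topic should then return TestMessage values from poll().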

Using the Confluent CLI to start/stop a single-node Kafka cluster

Install steps for Confluent Platform are here.

Using the Confluent CLI, check the status of the local services:

$ confluent local status

Start all services:

$ confluent local start
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.9Uym9FYU
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Stop all services:

$ confluent local stop
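Once confluent local start reports every service as [UP], one way to confirm the processes are actually listening is to probe their ports. The sketch below assumes the default ports of a single-node Confluent Platform 5.x install (ZooKeeper 2181, Kafka 9092, Schema Registry 8081, REST Proxy 8082, Connect 8083, KSQL server 8088, Control Center 9021); adjust them if your configuration differs. PortCheck is a hypothetical helper, not part of the Confluent CLI.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: checks whether local services accept TCP connections.
class PortCheck {

    // Assumed default ports for a single-node Confluent Platform install
    static final Map<String, Integer> DEFAULT_PORTS = new LinkedHashMap<>();
    static {
        DEFAULT_PORTS.put("zookeeper", 2181);
        DEFAULT_PORTS.put("kafka", 9092);
        DEFAULT_PORTS.put("schema-registry", 8081);
        DEFAULT_PORTS.put("kafka-rest", 8082);
        DEFAULT_PORTS.put("connect", 8083);
        DEFAULT_PORTS.put("ksql-server", 8088);
        DEFAULT_PORTS.put("control-center", 9021);
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Prints one line per service, e.g. "kafka (9092): listening"
    static void report(String host) {
        for (Map.Entry<String, Integer> entry : DEFAULT_PORTS.entrySet()) {
            boolean up = isListening(host, entry.getValue(), 500);
            System.out.println(entry.getKey() + " (" + entry.getValue() + "): "
                    + (up ? "listening" : "not reachable"));
        }
    }
}
```

Calling PortCheck.report("localhost") after confluent local start should list each service as listening; after confluent local stop they should show as not reachable.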