Kafka Streams error: “PolicyViolationException: Topic replication factor must be 3”

I’m creating a Streams app to consume a topic and do a count, with the results in a KTable, and I got this error:

2020-05-03 15:23:26,373 [streamsapp1-e7955018-aca6-43ce-8967-abd6c6d238d9-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.InternalTopicManager [] - stream-thread [main] Unexpected error during topic creation for streamsapp1-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.
Error message was: org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3
2020-05-03 15:23:26,374 [streamsapp1-e7955018-aca6-43ce-8967-abd6c6d238d9-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread [] - stream-thread [streamsapp1-e7955018-aca6-43ce-8967-abd6c6d238d9-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Could not create topic streamsapp1-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.

I think the clue here is that the app is trying to create an internal topic, streamsapp1-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog, and is failing because the replication factor Streams uses for its internal topics (which defaults to 1) doesn’t match the replication factor of the source topic (the cluster’s topic creation policy requires 3). This is described in the Streams config options docs here.

Adding the config property replication.factor=3 to match the replication factor of the source topic fixes the issue.
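For reference, here’s a minimal sketch of setting this in the Streams config (the application id is taken from the log above; the bootstrap address is an assumption for a local setup):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamsapp1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
// Match the replication factor required for the internal changelog/repartition topics
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);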

Using Avro Serializer with Kafka Consumers and Producers

Some of the Avro serializer/deserializer and Schema Registry classes are not available in jars from the usual Maven Central repository. Confluent manages its own repository, which you can add to your pom.xml with:

<repositories>
  <!-- For io.confluent jars not in Maven Central -->
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>

And then you can add the dependency:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>5.4.1</version>
</dependency>

This dependency will allow you to use the KafkaAvroSerializer in your properties:

value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

To generate Avro specific record classes from an .avsc file, following the Avro developer guide here, add the Avro dependency and the code generator plugin:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.9.2</version>
</dependency>

and the plugin:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

The plugin configuration looks for .avsc schema files in the src/main/avro folder. An example schema file looks like this:

{
  "namespace": "kh.kafkaexamples.avro",
  "type": "record",
  "name": "TestMessage",
  "fields": [
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"}
  ]
}

The plugin will generate the Avro class for any .avsc file it finds in the configured folder.
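Since the plugin’s schema goal is bound to the generate-sources phase, the classes are created by the normal Maven lifecycle, e.g.:

mvn generate-sources

(a plain mvn compile or mvn package also triggers it). For the example schema above, this produces kh/kafkaexamples/avro/TestMessage.java under src/main/java.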

To use Avro messages with Confluent Platform (or Confluent Cloud), you also need to specify a URL for the Schema Registry, otherwise you’ll see this error:

Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:251)

You also need to prefix the URL with http:// or https://, otherwise you’ll see this exception:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: unknown protocol: localhost

Assuming you’re running Confluent Platform locally, the Schema Registry property is:

schema.registry.url=http://localhost:8081
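Pulling the producer configuration together, a minimal sketch (the bootstrap address is assumed for a local cluster; StringSerializer for the key matches the String key used in the send below):

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");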

To publish a message using the generated TestMessage class from the above schema:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Producer<String, TestMessage> producer = new KafkaProducer<>(props);
TestMessage message = new TestMessage();
message.setFirstName("firstname");
message.setLastName("lastname");
producer.send(new ProducerRecord<>("test-avro-topic", "1", message));
producer.flush();
producer.close();

Done!

Using confluent cli to start/stop a single node Kafka cluster

Install steps for Confluent Platform are here.

Using confluent cli:

$ confluent local start
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.9Uym9FYU
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Check the running services with confluent local status, and shut everything down with confluent local stop.

Kafka producer: ip addresses and host names

Scenario:

  • Kafka client 2.4.0
  • Java 1.8.0_151
  • Kafka cluster is running on a machine with hostname ‘ubuntu-confluent’
  • Producer has bootstrap.servers=10.0.0.x (ip of same host as ubuntu-confluent)
  • At runtime, the broker’s advertised hostname (‘ubuntu-confluent’) is passed back to the client in the metadata response
  • Subsequent network calls from the client back to the cluster then use the hostname instead of the ip, and name resolution fails (see the note after this list)
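The hostname the client receives comes from the broker’s advertised listener: if advertised.listeners is not set in the broker’s server.properties, Kafka falls back to the machine’s canonical hostname. A sketch of what the broker is effectively advertising here (assumed, not taken from the actual setup):

advertised.listeners=PLAINTEXT://ubuntu-confluent:9092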

Exception on client:

2020-03-08 21:26:06,144 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-1] Error connecting to node ubuntu-confluent:9092 (id: 0 rack: null)
java.net.UnknownHostException: ubuntu-confluent: nodename nor servname provided, or not known
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[?:1.8.0_151]
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) ~[?:1.8.0_151]
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) ~[?:1.8.0_151]
at java.net.InetAddress.getAllByName0(InetAddress.java:1276) ~[?:1.8.0_151]
at java.net.InetAddress.getAllByName(InetAddress.java:1192) ~[?:1.8.0_151]
at java.net.InetAddress.getAllByName(InetAddress.java:1126) ~[?:1.8.0_151]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) ~[kafka-clients-2.4.0.jar:?]

The easiest fix is to add an entry for the broker’s hostname to /etc/hosts on the client machine.
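For example (the broker’s actual ip was elided as 10.0.0.x above, so substitute your own):

# /etc/hosts entry on the client, mapping the broker's advertised hostname to its ip
10.0.0.x   ubuntu-confluent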