Learning Java 8 Streams

I’ve never had a good opportunity to use Java 8 Streams since traditional iteration approaches always seemed to be appropriate for what I needed. Working on my Sudoku related projects (in particular an implementation of a Human Solver) I needed an approach to count occurrences of numbers in Lists and also find where pairs existed in Lists.

To get started, I started with a number of simpler examples to gradually work up to what I was looking for. I’ve collected a number of my examples in this project here.

Kafka Streams error: “PolicyViolationException: Topic replication factor must be 3”

I’m creating a Streams app to consume a Topic and do a count with results in a KTable, and I’ve got this error:

2020-05-03 15:23:26,373 [streamsapp1-e7955018-aca6-43ce-8967-abd6c6d238d9-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.InternalTopicManager [] - stream-thread [main] Unexpected error during topic creation for streamsapp1-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.
Error message was: org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3
2020-05-03 15:23:26,374 [streamsapp1-e7955018-aca6-43ce-8967-abd6c6d238d9-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread [] - stream-thread [streamsapp1-e7955018-aca6-43ce-8967-abd6c6d238d9-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Could not create topic streamsapp1-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.

I think the clue here is that the app is trying to create an intermediate topic, streamsapp1-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog, and is failing because the Replication Factor for the intermediate topic does not match the rf of the source topic. This is described in the Streams docs for option config options here.

Adding config property replication.factor=3 to match the rf of the source topic fixes the issue.