Apache Kafka – KafkaStreams metrics: records processed per stream / extracting the offset for each stream in a topology

I’m using org.apache.kafka.streams.KafkaStreams and, for example, my topology looks like this:

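    StreamsBuilder builder = new StreamsBuilder();

    // Explicit <String, String> types so that value.toUpperCase() compiles;
    // this assumes String serdes are configured as the defaults.
    builder.<String, String>stream("input-topic1")
            .mapValues((readOnlyKey, value) -> value.toUpperCase())
            .to("output-topic1");

    builder.<String, String>stream("input-topic2")
            .mapValues((readOnlyKey, value) -> value.toUpperCase())
            .to("output-topic2");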

By default, Kafka Streams logs the following every two minutes:

Processed 14 total records, ran 0 punctuators, and committed 11 total tasks since the last update

I would like a better overview of whether messages are arriving on each input topic. I want to see more metrics for each stream rather than just the total number of records processed. Is it possible to name a stream and then extract, for example, the offset for builder.stream("input-topic1") and builder.stream("input-topic2") separately? Or is there a way to find out how many records each stream processed within a given time frame?

  • Maybe I could use .peek and keep some kind of static variable outside the stream, but I consider that approach very bad practice.
  • I also looked into the KafkaStreams metrics, but I haven’t found the functionality I’m looking for.
  • Logging every time my streams process a message is not an option either, because it would generate too many logs.
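
To make the .peek idea from the first bullet concrete without a static variable, here is a minimal sketch of a per-stream counter that is created once and passed to the topology as an ordinary object. StreamCounters and its method names are hypothetical (they are not part of Kafka Streams), and the sketch uses only the JDK, so it counts records seen rather than reading offsets.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not a Kafka Streams class): thread-safe record counts per named stream. */
final class StreamCounters {
    private final ConcurrentHashMap<String, AtomicLong> counts = new ConcurrentHashMap<>();

    /** Call once per record; the name is whatever label you choose for the stream. */
    void record(String streamName) {
        counts.computeIfAbsent(streamName, name -> new AtomicLong()).incrementAndGet();
    }

    /** Returns the counts accumulated since the previous call and resets each counter to zero. */
    Map<String, Long> snapshotAndReset() {
        Map<String, Long> snapshot = new TreeMap<>();
        counts.forEach((name, counter) -> snapshot.put(name, counter.getAndSet(0)));
        return snapshot;
    }

    public static void main(String[] args) {
        StreamCounters counters = new StreamCounters();
        counters.record("input-topic1");
        counters.record("input-topic1");
        counters.record("input-topic2");
        // prints {input-topic1=2, input-topic2=1}
        System.out.println(counters.snapshotAndReset());
    }
}
```

In the topology above, the counter could be wired in with something like .peek((key, value) -> counters.record("input-topic1")) before .to(...), and a scheduled task could log snapshotAndReset() every few minutes. That would give one summary line per interval instead of one log line per message.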
