java – Apache Kafka Confluent Stream mutiple source topic and join master topic then produce to final topic
I have trouble when create kstream from topic source A and B. topic A and B is different schema.
so i build different stream in one project.
here snip my code :
public KStream<String, SchemaA> KstreamA(StreamsBuilder builder) {
KStream<String, SchemaA> kstreamA = builder.stream(sourceTopicA, Consumed.with(Serdes.String(), serdeA))
.selectKey("specific-key");
}
KStream<String, AccountStatement> schemaCompleteKstream=
KstreamA.join(ktableMaster, (k, v) -> k, new SchemaCompleteA());
schemaCompleteKstream
.map(new KeyValueMapper<String, SchemaA, KeyValue<? extends String, ? extends SchemaA>>() {
@Override
public KeyValue<? extends String, ? extends SchemaA> apply(String s, SchemaA SchemaCompleteA) {
return new KeyValue<>(null, SchemaCompleteA);
}
}).to(destTopicSchemaA, Produced.with(null, schemaASerdeComplete));
then i build code for schema B like that , then i get eror like this
Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic ktableMaster has already been registered by another source.
anyone can help with my issue? I know i can’t use one topology in join same topic but i got stuck when research any particular other way to solve this.
many thanks.
Read more here: Source link