java – How to publish mqtt message to kafka in spring
I am developing an MQTT connector for Kafka using Spring.
Using the MQTT support provided by Spring, incoming messages are collected as follows.
message handler
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
if(topic.equals("myTopic")) {
System.out.println("Mqtt data pub");
}
System.out.println(message.getPayload());
if(topic==null) {
topic = "mqttdata";
}
String tag = "test/vib";
String name = null;
if(name==null) {
name = KafkaMessageService.MQTT_PRODUCER;
}
HashMap<String, Object> datalist = new HashMap<String, Object>();
try {
datalist =convertJSONstringToMap(message.getPayload().toString());
System.out.println(datalist.get("mac"));
counts = kafkaMessageService.publish(topic, name, tag, (HashMap<String,Object>[] datalist);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
/**
 * Deserializes a JSON object string into a {@code HashMap<String, Object>}.
 *
 * @param json the JSON text to parse
 * @return the map produced by the object mapper for {@code json}
 * @throws Exception if the object mapper fails to read {@code json}
 */
public static HashMap<String,Object> convertJSONstringToMap(String json) throws Exception {
    TypeReference<HashMap<String, Object>> mapType = new TypeReference<HashMap<String, Object>>() {};
    return new ObjectMapper().readValue(json, mapType);
}
publish method
/**
 * Publishes every map in {@code datalist} to Kafka as its own message and waits
 * for each send to complete before moving on to the next one.
 *
 * <p>Each map is wrapped in a packet with the keys {@code "tag"}, {@code "data"}
 * and {@code "receivedtime"}; the receive time is captured once per call, so all
 * packets of one call share it. The record key is the name of the resolved
 * producer adaptor.
 *
 * @param topic        Kafka topic to send to
 * @param producerName name used to look up the producer adaptor
 * @param tag          value stored under {@code "tag"} in every packet
 * @param datalist     payload maps, one Kafka message per element
 * @return the number of messages sent
 * @throws NotMatchedProducerException if no adaptor is found for {@code producerName}
 * @throws KafkaPubFailureException    if sending any message fails; messages sent
 *                                     before the failure are not rolled back here
 */
public int publish(String topic,String producerName,String tag,HashMap<String,Object>[] datalist) throws NotMatchedProducerException,KafkaPubFailureException{
    KafkaProducerAdaptor adaptor = searchProducerAdaptor(producerName);
    if (adaptor == null) {
        throw new NotMatchedProducerException();
    }
    KafkaTemplate<String,Object> kafkaTemplate = adaptor.getKafkaTemplate();
    String receivedTime = LocalDateTime.now().toString();
    ObjectMapper objectMapper = new ObjectMapper();
    String key = adaptor.getName();
    int counts = 0;
    for (HashMap<String,Object> data : datalist) {
        Map<String,Object> messagePacket = new HashMap<String,Object>();
        messagePacket.put("tag", tag);
        messagePacket.put("data", data);
        messagePacket.put("receivedtime", receivedTime);
        try {
            // get() blocks until the send result is available, making the send synchronous.
            kafkaTemplate.send(topic,key,objectMapper.valueToTree(messagePacket)).get();
            logger.info("Sent message : topic=["+topic+"],key=["+key+"] value=["+messagePacket+"]");
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Catching InterruptedException clears the interrupt flag; restore it
                // so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            logger.info("Unable to send message : topic=["+topic+"],key=["+key+"] message=["+messagePacket+"] / due to : "+e.getMessage());
            throw new KafkaPubFailureException(e);
        }
        counts++;
    }
    return counts;
}
I don’t know how to create an instance of a HashMap<String, Object>[] array and pass it to the publish method.
The source above was taken as-is from the Spring support code, with some modifications.
Read more here: Source link