Java – How to publish MQTT messages to Kafka in Spring

I am developing an MQTT connector for Kafka using Spring.

Using the MQTT library provided by Spring, messages are received as follows.

message handler

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
                
                if(topic.equals("myTopic")) {
                    System.out.println("Mqtt data pub");    
                }
                System.out.println(message.getPayload());

                if(topic==null) {
                    topic = "mqttdata";
                }
                String tag = "test/vib";
                String name = null;
                if(name==null) {
                    name = KafkaMessageService.MQTT_PRODUCER;
                }
                HashMap<String, Object> datalist = new HashMap<String, Object>();
                        try {
                    datalist =convertJSONstringToMap(message.getPayload().toString());
                    System.out.println(datalist.get("mac"));
                    counts = kafkaMessageService.publish(topic, name, tag, (HashMap<String,Object>[] datalist);
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

    /**
     * Deserializes a JSON string into a {@code HashMap<String, Object>} using Jackson.
     *
     * @param json the JSON text to parse
     * @return the map produced by {@code ObjectMapper.readValue} for the given text
     * @throws Exception if Jackson cannot read {@code json} as the requested map type
     */
    public static HashMap<String,Object> convertJSONstringToMap(String json) throws Exception {
        final ObjectMapper jsonMapper = new ObjectMapper();
        // The anonymous TypeReference subclass carries the full generic target type.
        final TypeReference<HashMap<String, Object>> mapType =
                new TypeReference<HashMap<String, Object>>() {};
        return jsonMapper.readValue(json, mapType);
    }

publish method

    /**
     * Publishes every map in {@code datalist} to Kafka as a separate record.
     *
     * <p>Each record value is a JSON tree with three entries: {@code "tag"},
     * {@code "data"} (the map itself) and {@code "receivedtime"}. The timestamp is
     * taken once per call from {@code LocalDateTime.now()}, so all records of one
     * call share it. The record key is the name of the resolved producer adaptor.
     * Records are sent one at a time; {@code get()} is called on each send result
     * before the next record is sent.
     *
     * @param topic        Kafka topic to send to
     * @param producerName name used to look up the producer adaptor
     * @param tag          value stored under {@code "tag"} in every record
     * @param datalist     payload maps, one record per element
     * @return the number of records sent
     * @throws NotMatchedProducerException if no adaptor is found for {@code producerName}
     * @throws KafkaPubFailureException    if sending a record fails; records sent before
     *                                     the failure are not rolled back
     */
    public int publish(String topic,String producerName,String tag,HashMap<String,Object>[] datalist) throws NotMatchedProducerException,KafkaPubFailureException{
        KafkaProducerAdaptor adaptor = searchProducerAdaptor(producerName);
        if(adaptor==null) {
            throw new NotMatchedProducerException();
        }

        KafkaTemplate<String,Object> kafkaTemplate = adaptor.getKafkaTemplate();

        // One timestamp for the whole batch.
        String receivedTime = LocalDateTime.now().toString();

        ObjectMapper objectMapper = new ObjectMapper();

        String key = adaptor.getName();

        int counts = 0;
        for(HashMap<String,Object> data : datalist) {
            Map<String,Object> messagePacket = new HashMap<String,Object>();
            messagePacket.put("tag", tag);
            messagePacket.put("data", data);
            messagePacket.put("receivedtime", receivedTime);

            try {
                kafkaTemplate.send(topic,key,objectMapper.valueToTree(messagePacket)).get();
                logger.info("Sent message : topic=["+topic+"],key=["+key+"] value=["+messagePacket+"]");
            } catch(Exception e) {
                // get() can be interrupted; catching Exception would otherwise clear
                // the interrupt silently, so restore the flag for callers up the stack.
                if(e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                logger.info("Unable to send message : topic=["+topic+"],key=["+key+"] message=["+messagePacket+"] / due to : "+e.getMessage());
                throw new KafkaPubFailureException(e);
            }
            counts++;
        }

        return counts;
    }

I don’t know how to create an instance of HashMap<String, Object>[] (an array of maps) or how to pass it to the publish method.

The source above was taken as-is from Spring support, with some modifications.

Read more here: Source link