java – How to publish mqtt message to kafka in spring

Develop mqtt connector for Kafka using spring.

Using the mqtt library provided by spring, messages are collected as follows.

message handler

@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
        return new MessageHandler() {

            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
                if(topic.equals("myTopic")) {
                    System.out.println("Mqtt data pub");    

                if(topic==null) {
                    topic = "mqttdata";
                String tag = "test/vib";
                String name = null;
                if(name==null) {
                    name = KafkaMessageService.MQTT_PRODUCER;
                HashMap<String, Object> datalist = new HashMap<String, Object>();
                        try {
                    datalist =convertJSONstringToMap(message.getPayload().toString());
                    counts = kafkaMessageService.publish(topic, name, tag, (HashMap<String,Object>[] datalist);
                } catch (Exception e) {
                    // TODO Auto-generated catch block

    public static HashMap<String,Object> convertJSONstringToMap(String json) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        HashMap<String, Object> map = new HashMap<String, Object>();
        map = mapper.readValue(json, new TypeReference<HashMap<String, Object>>() {});
        return map;

publish method

    public int publish(String topic,String producerName,String tag,HashMap<String,Object>[] datalist) throws NotMatchedProducerException,KafkaPubFailureException{
        KafkaProducerAdaptor adaptor = searchProducerAdaptor(producerName);
        if(adaptor==null) {
            throw new NotMatchedProducerException();
        KafkaTemplate<String,Object> kafkaTemplate = adaptor.getKafkaTemplate();
        LocalDateTime currentDateTime =;
        String receivedTime = currentDateTime.toString();
        ObjectMapper objectMapper = new ObjectMapper();
        String key = adaptor.getName();
        int counts = 0;
        for(HashMap<String,Object> data : datalist) {
            Map<String,Object> messagePacket = new HashMap<String,Object>();
            messagePacket.put("tag", tag);
            messagePacket.put("data", data);
            messagePacket.put("receivedtime", receivedTime);
            try {
      "Sent message : topic=["+topic+"],key=["+key+"] value=["+messagePacket+"]");
            } catch(Exception e) {
      "Unable to send message : topic=["+topic+"],key=["+key+"] message=["+messagePacket+"] / due to : "+e.getMessage());
                throw new KafkaPubFailureException(e);
        return counts;

I don’t know how to declare a hashmap <String, object> [] as an instance and how to use it.

The above source was taken from spring support as it is, and some modifications were made.

Read more here: Source link