C++ – RabbitMQ queue receives a message that does not belong to it

Hi, I have tried many times and many solutions, but I still have this issue: my client sends one message to Queue A, but Queue B receives it. Sometimes Queue A receives the message, and sometimes it does not.

  1. Is my implementation correct?
  2. When both queues are bound to the same exchange and a message is sent with a routing key, will only the queue bound with that routing key receive it?
  3. How does amqp_consume_message behave?

//Consumer function for Queue A
void consumeQueueA(amqp_connection_state_t conn) {
//amqp_channel_open(conn, 1);
binding_mutex.lock();


amqp_queue_declare_ok_t* res = amqp_queue_declare(conn, 1, amqp_cstring_bytes("Queue_A"), 0, 1, 0, 1, amqp_empty_table);

if (!res) {
    amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Queue declaration failed A: " << amqp_error_string2(reply.library_error) << std::endl;
    }
}

amqp_queue_bind_ok_t* bind_result = amqp_queue_bind(
    conn,
    1, // channel
    amqp_cstring_bytes("Queue_A"), // queue name
    amqp_cstring_bytes("topic_logs"), // exchange name
    amqp_cstring_bytes("app.queueA.AA"), // routing key
    amqp_empty_table // arguments
);

if (!bind_result) {
    amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Queue binding failed A: " << amqp_error_string2(reply.library_error) << std::endl;
    }
}


amqp_basic_consume_ok_t* consume_result =  amqp_basic_consume(conn, 1, amqp_cstring_bytes("Queue_A"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
//std::cout << "Bound queue " << queueName << " to exchange " << exchangeName << " with routing key " << routingKey << std::endl;

if (!consume_result) {
    amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Basic consume failed A : " << amqp_error_string2(reply.library_error) << std::endl;
    }
}

binding_mutex.unlock();
while (true) {
    amqp_rpc_reply_t res;
    amqp_envelope_t envelope;
    amqp_maybe_release_buffers(conn);
    res = amqp_consume_message(conn, &envelope, nullptr, 0);

    if (res.reply_type == AMQP_RESPONSE_NORMAL) {
        std::string message((char*)envelope.message.body.bytes, envelope.message.body.len);
        std::string queueName = "Queue_A"; // Since you're consuming from Queue_B


        std::string routingKey((char*)envelope.routing_key.bytes, envelope.routing_key.len);
        if (routingKey == "app.queueA.AA") {
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Received message on " << queueName
                << ": " << message
                << " with routing key: " << routingKey << std::endl;
            // Process message for Queue_B
            // E.g., Data processing, notifications
            amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);
        }
        else {
            // Handle unexpected message, do not acknowledge
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Unexpected message for Queue_A, skipping: " << message << std::endl;
        }
    }
    else
    {
        std::cout << "Error Consuming message A: " << res.library_error << std::endl;
    }

    amqp_destroy_envelope(&envelope);

    /*amqp_rpc_reply_t reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Channel close error A: " << amqp_error_string2(reply.library_error) << std::endl;
    }*/

}
}






//Consumer function for Queue B
void ConnectionMgr::consumeQueueB(amqp_connection_state_t conn) {
amqp_channel_open(conn, 2);
binding_mutex.lock();
amqp_queue_declare_ok_t* res =  amqp_queue_declare(conn, 2, 
amqp_cstring_bytes("Queue_B"), 0, 1, 0, 1, amqp_empty_table);

if (!res) {
    amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Queue declaration failed B: " << amqp_error_string2(reply.library_error) << std::endl;
    }
}

amqp_queue_bind_ok_t* bind_result = amqp_queue_bind(
    conn,
    2, // channel
    amqp_cstring_bytes("Queue_B"), // queue name
    amqp_cstring_bytes("topic_logs"), // exchange name
    amqp_cstring_bytes("app.queueB.BB"), // routing key
    amqp_empty_table // arguments
);

if (!bind_result) {
    amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Queue binding failed B: " << amqp_error_string2(reply.library_error) << std::endl;
    }
}


amqp_basic_consume_ok_t* consume_result =  amqp_basic_consume(conn, 2, amqp_cstring_bytes("Queue_B"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

if (!consume_result) {
    amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Basic consume failed B: " << amqp_error_string2(reply.library_error) << std::endl;
    }
}

binding_mutex.unlock();
while (true) {
    amqp_rpc_reply_t res;
    amqp_envelope_t envelope;
    amqp_maybe_release_buffers(conn);
    res = amqp_consume_message(conn, &envelope, nullptr, 0);

    if (res.reply_type == AMQP_RESPONSE_NORMAL) {
        std::string message((char*)envelope.message.body.bytes, envelope.message.body.len);
        std::string queueName = "Queue_B"; // Since you're consuming from Queue_B

        std::string routingKey((char*)envelope.routing_key.bytes, envelope.routing_key.len);
        if (routingKey == "app.queueB.BB") {
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Received message on " << queueName
                << ": " << message
                << " with routing key: " << routingKey << std::endl;
            // Process message for Queue_B
            // E.g., Data processing, notifications
            amqp_basic_ack(conn, 2, envelope.delivery_tag, 0);
        }
        else {
            // Handle unexpected message, do not acknowledge
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Unexpected message for Queue_B, skipping: " << message << std::endl;
        }
    
    }
    else
    {
        std::cout << "Error Consuming message B: " << res.library_error << std::endl;
    }

    amqp_destroy_envelope(&envelope);

    /*amqp_rpc_reply_t reply = amqp_channel_close(conn, 2, AMQP_REPLY_SUCCESS);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Channel close error B: " << amqp_error_string2(reply.library_error) << std::endl;
    }*/

}
}




// Function to send a message to a specific queue
void sendMessage(const std::string& exchange, const std::string& routingKey, const std::string& message, const int channel) {

//amqp_exchange_declare(conn, 1, amqp_cstring_bytes("topic_logss"), amqp_cstring_bytes("topic"), 0, 1, 0, 0, amqp_empty_table);

amqp_bytes_t message_bytes;
message_bytes.len = message.size();
message_bytes.bytes = (void*)message.c_str();

// Publish the message to the exchange with the specified routing key
amqp_basic_publish(
    conn,
    channel, // channel
    amqp_cstring_bytes(exchange.c_str()), // exchange name
    amqp_cstring_bytes(routingKey.c_str()), // routing key
    0, // mandatory
    0, // immediate
    nullptr, // no properties
    message_bytes // message body
);

std::cout << " [x] Sent '" << routingKey << "':'" << message << "'" << std::endl;
}

Above is my implementation of how I declare the queues and the exchange, bind them, and consume from the queues. The result below shows that the first message I sent to Queue A was received successfully, but the second message I sent to Queue A was received by Queue B.

Result

Read more here: Source link