C++ – RabbitMQ queue receives a message that does not belong to it
Hi, I have tried many times and many solutions, but I still have this issue: my client sends one message to Queue A, but Queue B receives the message. Sometimes Queue A receives the message, but sometimes it does not.
- Is my implementation correct?
- When both queues are bound to the same exchange and a message is sent with a routing key, will only the queue bound with that routing key receive it?
- How does amqp_consume_message work?
…
// Consumer loop for Queue_A.
//
// Declares the durable Queue_A on channel 1, binds it to the "topic_logs"
// exchange with routing key "app.queueA.AA", registers a consumer, and then
// blocks handling deliveries. Returns only when setup or consuming fails.
//
// conn: an open, logged-in connection. Channel 1 is not opened here (the call
//       is commented out below), so it is presumably opened by the caller —
//       TODO confirm.
//
// NOTE(review): amqp_consume_message() hands back the next delivery that
// arrives on the *connection*, whatever channel it was delivered on. If another
// thread consumes from a different queue over the same connection, each loop
// can pick up the other's messages. rabbitmq-c connections are also not safe
// for concurrent use from several threads. The real fix is one connection per
// consumer thread; the channel check below only mitigates the symptom by
// handing foreign deliveries back to the broker.
void consumeQueueA(amqp_connection_state_t conn) {
    const int kChannel = 1;
    const std::string queueName = "Queue_A";
    const std::string expectedRoutingKey = "app.queueA.AA";

    // Prints the library error recorded for the most recent RPC on conn.
    const auto reportFailure = [conn](const char* what) {
        const amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
        std::cerr << what << amqp_error_string2(reply.library_error) << std::endl;
    };

    //amqp_channel_open(conn, 1);

    // Topology setup is serialised with the other users of binding_mutex.
    // Each step is skipped once an earlier one has failed.
    binding_mutex.lock();
    bool ready = true;
    if (!amqp_queue_declare(conn, kChannel, amqp_cstring_bytes("Queue_A"), 0, 1, 0, 1, amqp_empty_table)) {
        reportFailure("Queue declaration failed A: ");
        ready = false;
    }
    if (ready && !amqp_queue_bind(conn,
                                  kChannel,
                                  amqp_cstring_bytes("Queue_A"),        // queue name
                                  amqp_cstring_bytes("topic_logs"),     // exchange name
                                  amqp_cstring_bytes("app.queueA.AA"),  // routing key
                                  amqp_empty_table)) {
        reportFailure("Queue binding failed A: ");
        ready = false;
    }
    // no_ack = 0: every delivery must be acknowledged or rejected explicitly.
    if (ready && !amqp_basic_consume(conn, kChannel, amqp_cstring_bytes("Queue_A"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table)) {
        reportFailure("Basic consume failed A : ");
        ready = false;
    }
    binding_mutex.unlock();
    if (!ready) {
        return;  // Consuming from a queue that was never set up cannot succeed.
    }

    while (true) {
        amqp_envelope_t envelope{};
        amqp_maybe_release_buffers(conn);
        const amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, nullptr, 0);
        if (res.reply_type != AMQP_RESPONSE_NORMAL) {
            // The envelope is not valid on failure, so it must not be destroyed.
            // With no timeout set, a failure means a broken connection or an
            // unexpected frame; retrying in a tight loop would only spin.
            std::cerr << "Error Consuming message A: " << amqp_error_string2(res.library_error) << std::endl;
            return;
        }

        const std::string message(static_cast<const char*>(envelope.message.body.bytes), envelope.message.body.len);
        const std::string routingKey(static_cast<const char*>(envelope.routing_key.bytes), envelope.routing_key.len);

        if (envelope.channel != kChannel) {
            // Delivery belongs to a consumer on another channel of this shared
            // connection. Requeue it on its own channel so it is redelivered
            // instead of staying unacknowledged forever.
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Message for channel " << envelope.channel
                      << " reached the Queue_A consumer, requeueing: " << message << std::endl;
            const int status = amqp_basic_reject(conn, envelope.channel, envelope.delivery_tag, 1);
            if (status < 0) {
                std::cerr << "Requeue failed A: " << amqp_error_string2(status) << std::endl;
            }
        }
        else if (routingKey == expectedRoutingKey) {
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Received message on " << queueName
                      << ": " << message
                      << " with routing key: " << routingKey << std::endl;
            // Process message for Queue_A
            // E.g., Data processing, notifications
            const int status = amqp_basic_ack(conn, kChannel, envelope.delivery_tag, 0);
            if (status < 0) {
                std::cerr << "Ack failed A: " << amqp_error_string2(status) << std::endl;
            }
        }
        else {
            // Delivered on our channel but with an unexpected routing key:
            // log it and leave it unacknowledged.
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Unexpected message for Queue_A, skipping: " << message << std::endl;
        }

        amqp_destroy_envelope(&envelope);
    }
}
// Consumer loop for Queue_B.
//
// Opens channel 2, declares the durable Queue_B on it, binds it to the
// "topic_logs" exchange with routing key "app.queueB.BB", registers a consumer,
// and then blocks handling deliveries. Returns only when setup or consuming
// fails.
//
// conn: an open, logged-in connection.
//
// NOTE(review): amqp_consume_message() hands back the next delivery that
// arrives on the *connection*, whatever channel it was delivered on. If another
// thread consumes from a different queue over the same connection, each loop
// can pick up the other's messages. rabbitmq-c connections are also not safe
// for concurrent use from several threads. The real fix is one connection per
// consumer thread; the channel check below only mitigates the symptom by
// handing foreign deliveries back to the broker.
void ConnectionMgr::consumeQueueB(amqp_connection_state_t conn) {
    const int kChannel = 2;
    const std::string queueName = "Queue_B";
    const std::string expectedRoutingKey = "app.queueB.BB";

    // Prints the library error recorded for the most recent RPC on conn.
    const auto reportFailure = [conn](const char* what) {
        const amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
        std::cerr << what << amqp_error_string2(reply.library_error) << std::endl;
    };

    amqp_channel_open(conn, kChannel);
    if (amqp_get_rpc_reply(conn).reply_type != AMQP_RESPONSE_NORMAL) {
        reportFailure("Channel open failed B: ");
        return;  // Nothing below can work without the channel.
    }

    // Topology setup is serialised with the other users of binding_mutex.
    // Each step is skipped once an earlier one has failed.
    binding_mutex.lock();
    bool ready = true;
    if (!amqp_queue_declare(conn, kChannel, amqp_cstring_bytes("Queue_B"), 0, 1, 0, 1, amqp_empty_table)) {
        reportFailure("Queue declaration failed B: ");
        ready = false;
    }
    if (ready && !amqp_queue_bind(conn,
                                  kChannel,
                                  amqp_cstring_bytes("Queue_B"),        // queue name
                                  amqp_cstring_bytes("topic_logs"),     // exchange name
                                  amqp_cstring_bytes("app.queueB.BB"),  // routing key
                                  amqp_empty_table)) {
        reportFailure("Queue binding failed B: ");
        ready = false;
    }
    // no_ack = 0: every delivery must be acknowledged or rejected explicitly.
    if (ready && !amqp_basic_consume(conn, kChannel, amqp_cstring_bytes("Queue_B"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table)) {
        reportFailure("Basic consume failed B: ");
        ready = false;
    }
    binding_mutex.unlock();
    if (!ready) {
        return;  // Consuming from a queue that was never set up cannot succeed.
    }

    while (true) {
        amqp_envelope_t envelope{};
        amqp_maybe_release_buffers(conn);
        const amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, nullptr, 0);
        if (res.reply_type != AMQP_RESPONSE_NORMAL) {
            // The envelope is not valid on failure, so it must not be destroyed.
            // With no timeout set, a failure means a broken connection or an
            // unexpected frame; retrying in a tight loop would only spin.
            std::cerr << "Error Consuming message B: " << amqp_error_string2(res.library_error) << std::endl;
            return;
        }

        const std::string message(static_cast<const char*>(envelope.message.body.bytes), envelope.message.body.len);
        const std::string routingKey(static_cast<const char*>(envelope.routing_key.bytes), envelope.routing_key.len);

        if (envelope.channel != kChannel) {
            // Delivery belongs to a consumer on another channel of this shared
            // connection. Requeue it on its own channel so it is redelivered
            // instead of staying unacknowledged forever.
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Message for channel " << envelope.channel
                      << " reached the Queue_B consumer, requeueing: " << message << std::endl;
            const int status = amqp_basic_reject(conn, envelope.channel, envelope.delivery_tag, 1);
            if (status < 0) {
                std::cerr << "Requeue failed B: " << amqp_error_string2(status) << std::endl;
            }
        }
        else if (routingKey == expectedRoutingKey) {
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Received message on " << queueName
                      << ": " << message
                      << " with routing key: " << routingKey << std::endl;
            // Process message for Queue_B
            // E.g., Data processing, notifications
            const int status = amqp_basic_ack(conn, kChannel, envelope.delivery_tag, 0);
            if (status < 0) {
                std::cerr << "Ack failed B: " << amqp_error_string2(status) << std::endl;
            }
        }
        else {
            // Delivered on our channel but with an unexpected routing key:
            // log it and leave it unacknowledged.
            STAMP_LOG("Received message on: " + queueName + " with routing key: " + routingKey);
            std::cout << "Unexpected message for Queue_B, skipping: " << message << std::endl;
        }

        amqp_destroy_envelope(&envelope);
    }
}
// Function to send a message to a specific queue
void sendMessage(const std::string& exchange, const std::string& routingKey, const std::string& message, const int channel) {
//amqp_exchange_declare(conn, 1, amqp_cstring_bytes("topic_logss"), amqp_cstring_bytes("topic"), 0, 1, 0, 0, amqp_empty_table);
amqp_bytes_t message_bytes;
message_bytes.len = message.size();
message_bytes.bytes = (void*)message.c_str();
// Publish the message to the exchange with the specified routing key
amqp_basic_publish(
conn,
channel, // channel
amqp_cstring_bytes(exchange.c_str()), // exchange name
amqp_cstring_bytes(routingKey.c_str()), // routing key
0, // mandatory
0, // immediate
nullptr, // no properties
message_bytes // message body
);
std::cout << " [x] Sent '" << routingKey << "':'" << message << "'" << std::endl;
}
…
Above is my implementation, showing how I declare the queues, bind them to the exchange, and consume from them. The result is as follows: the first message I sent to Queue A was received correctly, but the second message I sent to Queue A was received by Queue B instead.
Read more here: Source link