go – RabbitMQ cluster outage results in no consumer

I have a RabbitMQ cluster of 3 instances running on different servers behind an HAProxy, all created from a docker-compose file. I also have a producer that sends data to the defined exchange using a simple PublishWithContext call.

When I run the programs, the consumer consumes messages with no issue. However, if I simulate an outage by stopping the docker container on the server where the queues are currently located, the UI shows that no consumer is connected anymore. The queue continues to fill up, but because no consumer is attached it is never drained.

My question is: why is this happening?

My call to the consume function:

        // Start consuming the test product-store queue. Every message body is
        // handed to handleProduct; its boolean result is what Consume uses to
        // decide between acknowledging and rejecting the message.
        err := centralMq.Consume(centralmq.ToProductStoreQueueNameTest, func(body []byte) bool {
            return handleProduct(body, store, count)
        })

Followed by my consume function:

// Consume subscribes to queueName on the wrapped channel and passes the body
// of every delivery to fn.
//
// When fn returns true the delivery is acknowledged (single message); when it
// returns false the delivery is rejected with requeue enabled.
//
// The call blocks until the delivery stream is closed, in which case it
// returns nil, or until an Ack/Reject call fails, in which case that error is
// returned and no further deliveries are read.
//
// NOTE(review): an Ack/Reject failure ends the loop for good; if such a
// failure can happen around a channel re-creation, nothing drains the stream
// afterwards — confirm how the caller reacts to a non-nil return.
func (cmq *CentralMq) Consume(queueName string, fn func(body []byte) bool) error {
    deliveries := cmq.channel.Consume(
        queueName, // queue
        "",        // consumer name
        false, false, false, false, // autoAck, exclusive, noLocal, noWait
        nil,
        func() error { return nil }, // reconnect hook: always allow resubscribe
    )

    // Drain the stream, settling each delivery according to fn's verdict.
    for delivery := range deliveries {
        var settleErr error
        if fn(delivery.Body) {
            settleErr = delivery.Ack(false)
        } else {
            settleErr = delivery.Reject(true)
        }
        if settleErr != nil {
            return settleErr
        }
    }
    return nil
}
// Consume wraps amqp.Channel.Consume and returns a single delivery stream that
// is kept alive across re-subscriptions.
//
// A background goroutine subscribes on the current underlying channel and
// forwards every delivery to the returned channel. Whenever the underlying
// delivery stream ends, it waits `delay` seconds and then:
//   - stops if the wrapper reports IsClosed (closed by the developer);
//   - otherwise calls onReconnectFn (when non-nil) and stops if it returns an
//     error;
//   - otherwise subscribes again.
//
// The returned channel is closed when the goroutine stops. queue, consumer,
// autoAck, exclusive, noLocal, noWait and args are passed through unchanged to
// the underlying Consume call on every (re)subscription.
func (ch *Channel) Consume(
    queue,
    consumer string,
    autoAck, exclusive, noLocal, noWait bool, args amqp.Table,
    onReconnectFn func() error,
) <-chan amqp.Delivery {
    deliveries := make(chan amqp.Delivery)

    go func() {
        // Closing deliveries is how the caller learns that consuming stopped.
        defer close(deliveries)

        for {
            d, err := ch.Channel.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
            if err != nil {
                debugf("consume failed, err: %v", err)
                time.Sleep(time.Duration(delay) * time.Second)

                // Without this check a channel closed on purpose would be
                // retried forever and deliveries would never be closed.
                if ch.IsClosed() {
                    return
                }
                continue
            }

            // Forward until the underlying stream ends.
            for msg := range d {
                deliveries <- msg
            }

            // Sleep before the IsClosed call: the closed flag may not be set
            // yet at the moment the stream ends.
            time.Sleep(time.Duration(delay) * time.Second)

            if ch.IsClosed() {
                return
            }

            // A nil hook means "always resubscribe".
            if onReconnectFn == nil {
                continue
            }
            if err := onReconnectFn(); err != nil {
                // The hook vetoed the resubscribe: stop consuming.
                debugf("closing consumer because of reconnect upon error: %v", err)
                return
            }
        }
    }()
    return deliveries
}

And finally, this is how I define my channel:

// Channel wraps amqp.Connection.Channel and returns a channel that re-creates
// its underlying AMQP channel after an unexpected close.
//
// A background goroutine waits on the underlying channel's close notification.
// If the notification channel was closed without a value, or the wrapper
// reports IsClosed, the goroutine calls Close once more and exits. Otherwise it
// retries c.Connection.Channel() every `delay` seconds until it succeeds and
// swaps the new channel into the wrapper.
func (c *Connection) Channel() (*Channel, error) {
    raw, err := c.Connection.Channel()
    if err != nil {
        return nil, err
    }

    wrapped := &Channel{Channel: raw}

    go func() {
        for {
            closeReason, received := <-wrapped.Channel.NotifyClose(make(chan *amqp.Error))
            // Closed by the developer: nothing to recover, stop watching.
            if !received || wrapped.IsClosed() {
                debug("channel closed")
                wrapped.Close() // close again so the closed flag is set even when the connection closed
                return
            }
            debugf("channel closed, reason: %v", closeReason)

            // Unexpected close: keep trying until a new channel can be opened.
            for {
                // give the connection time to come back before each attempt
                time.Sleep(time.Duration(delay) * time.Second)

                replacement, openErr := c.Connection.Channel()
                if openErr != nil {
                    debugf("channel recreate failed, err: %v", openErr)
                    continue
                }

                wrapped.Channel = replacement
                break
            }
        }
    }()

    return wrapped, nil
}

And how the queue is declared:

    // product test store: declare the queue, then bind it to the exchange.
    _, err = channel.QueueDeclare(
        ToProductStoreQueueNameTest, // name
        true, false, false, false,
        quorumQueue, // arguments
    )
    if err != nil {
        // %w yields the same message text as before but keeps the underlying
        // error reachable through errors.Is / errors.As.
        return fmt.Errorf("Failed to declare a queue: %w", err)
    }
    err = channel.QueueBind(
        ToProductStoreQueueNameTest, // queue name
        ToProductStoreKeyTest,       // routing key
        toStoreExchangeName,         // exchange
        false,
        nil,
    )
    if err != nil {
        return fmt.Errorf("Failed to bind to queue: %w", err)
    }

Read more here: Source link