190715-ActiveMQ消息重试与死信队列

ActiveMQ消息重试与死信队列

activeMQ中的消息重发,指的是消息可以被broker重新分派给消费者,不一定的之前的消费者。重发消息之后,消费者可以重新消费。

消息重发

消息重发的情况有以下几种:

  1. 事务会话中,当还未进行session.commit()时,进行session.rollback(),那么所有还没commit的消息都会进行重发
  2. 使用客户端手动确认的方式时,还未进行确认并且执行Session.recover(),那么所有还没acknowledge的消息都会进行重发
  3. 所有未ack的消息,当进行session.closed()关闭事务,那么所有还没ack的消息broker端都会进行重发,而且是马上重发
  4. 消息被消费者拉取之后,超时没有响应ack,消息会被broker重发

重发指的是消息经过broker重新进行转发给消费者,经过测试,1和2的情况消息重发会发送给原来的消费者,3和4可以转发消息给别的消费者。累计次数超过设置的maximumRedeliveries时消息都会都会进入死信队列。

有毒消息

当一个消息被接收的次数超过maximumRedeliveries(默认为6次)次数时,会给broker发送一个poison _ack,这种ack类型告诉broker这个消息“有毒”,尝试多次依然失败,这时broker会将这个消息发送到DLQ,以便后续处理。activeMQ默认的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信消息都会被发送到这个队列。

默认情况下持久消息过期都会被送到DLQ,非持久消息过期默认不会送到DLQ。

可以通过配置文件为指定队列创建死信队列。

重试策略配置

  1. 下面是模拟重试的消费者代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public class JMSClientAckConsumer {
    public static void main(String[] args) throws JMSException {
    //根据broker URL建立连接工厂

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.15:61616");
    //创建连接
    ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
    connection.start();
    //创建会话
    Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
    //重发策略
    RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
    queuePolicy.setInitialRedeliveryDelay(0);
    queuePolicy.setRedeliveryDelay(1000);
    queuePolicy.setUseExponentialBackOff(false);
    queuePolicy.setMaximumRedeliveries(2);

    //创建队列(有则不创建)
    Destination destination = session.createQueue("garine-queue");

    RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
    map.put((ActiveMQDestination) destination, queuePolicy);


    session.createConsumer(destination).setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
    TextMessage textMessage = (TextMessage) message;
    try {
    System.out.println(textMessage.getText());
    session.rollback();
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    });

    }
    }

    上面模拟的是在事务会话中调用rollback进行重试,经过模拟,发现消息每次重试都是直接在原来的消费者进行重试,即使在重试次数内该消费者挂了,消息依然不会马上分发给别的消费者重试。

  2. url地址配置

    1
    tcp://192.168.0.15:61616?jms.redeliveryPolicy.initialRedeliveryDelay=0&jms.redeliveryPolicy.redeliveryDelay=1000
  3. 在activeMq broker中配置

    重发策略配置可以在官方文档找到重发策略配置

死信队列配置

dlq_detail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ."
useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

死信队列也是队列,消息当然可以被重新消费。

参考资料

  1. spring ActiveMQ 死信队列
  2. activeMQ中的消息重试与死信队列
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×