ActiveMQ消息重试与死信队列
activeMQ中的消息重发,指的是消息可以被broker重新分派给消费者,不一定的之前的消费者。重发消息之后,消费者可以重新消费。
消息重发
消息重发的情况有以下几种:
- 事务会话中,当还未进行session.commit()时,进行session.rollback(),那么所有还没commit的消息都会进行重发
- 使用客户端手动确认的方式时,还未进行确认并且执行Session.recover(),那么所有还没acknowledge的消息都会进行重发
- 所有未ack的消息,当进行session.closed()关闭事务,那么所有还没ack的消息broker端都会进行重发,而且是马上重发
- 消息被消费者拉取之后,超时没有响应ack,消息会被broker重发
重发指的是消息经过broker重新进行转发给消费者,经过测试,1和2的情况消息重发会发送给原来的消费者,3和4可以转发消息给别的消费者。累计次数超过设置的maximumRedeliveries时消息都会都会进入死信队列。
有毒消息
当一个消息被接收的次数超过maximumRedeliveries(默认为6次)次数时,会给broker发送一个poison _ack,这种ack类型告诉broker这个消息“有毒”,尝试多次依然失败,这时broker会将这个消息发送到DLQ,以便后续处理。activeMQ默认的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信消息都会被发送到这个队列。
默认情况下持久消息过期都会被送到DLQ,非持久消息过期默认不会送到DLQ。
可以通过配置文件为指定队列创建死信队列。
重试策略配置
下面是模拟重试的消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public class JMSClientAckConsumer {
public static void main(String[] args) throws JMSException {
//根据broker URL建立连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.15:61616");
//创建连接
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
//创建会话
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
//重发策略
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);
//创建队列(有则不创建)
Destination destination = session.createQueue("garine-queue");
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put((ActiveMQDestination) destination, queuePolicy);
session.createConsumer(destination).setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
session.rollback();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}上面模拟的是在事务会话中调用rollback进行重试,经过模拟,发现消息每次重试都是直接在原来的消费者进行重试,即使在重试次数内该消费者挂了,消息依然不会马上分发给别的消费者重试。
url地址配置
1
tcp://192.168.0.15:61616?jms.redeliveryPolicy.initialRedeliveryDelay=0&jms.redeliveryPolicy.redeliveryDelay=1000
在activeMq broker中配置
重发策略配置可以在官方文档找到重发策略配置
死信队列配置
1 | <destinationPolicy> |
死信队列也是队列,消息当然可以被重新消费。