001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import java.io.IOException;
020
021import org.apache.activemq.RedeliveryPolicy;
022import org.apache.activemq.ScheduledMessage;
023import org.apache.activemq.broker.Broker;
024import org.apache.activemq.broker.BrokerPluginSupport;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.ProducerBrokerExchange;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ActiveMQQueue;
033import org.apache.activemq.command.ActiveMQTopic;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.ProducerInfo;
036import org.apache.activemq.filter.AnyDestination;
037import org.apache.activemq.state.ProducerState;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import static org.apache.activemq.broker.region.BaseDestination.DUPLICATE_FROM_STORE_MSG_PREFIX;
042
043/**
044 * Replace regular DLQ handling with redelivery via a resend to the original destination
045 * after a delay
046 * A destination matching RedeliveryPolicy controls the quantity and delay for re-sends
047 * If there is no matching policy or an existing policy limit is exceeded by default
048 * regular DLQ processing resumes. This is controlled via sendToDlqIfMaxRetriesExceeded
049 * and fallbackToDeadLetter
050 *
051 * @org.apache.xbean.XBean element="redeliveryPlugin"
052 */
053public class RedeliveryPlugin extends BrokerPluginSupport {
054    private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPlugin.class);
055    public static final String REDELIVERY_DELAY = "redeliveryDelay";
056
057    RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
058    boolean sendToDlqIfMaxRetriesExceeded = true;
059    private boolean fallbackToDeadLetter = true;
060
061    @Override
062    public Broker installPlugin(Broker broker) throws Exception {
063        if (!broker.getBrokerService().isSchedulerSupport()) {
064            throw new IllegalStateException("RedeliveryPlugin requires schedulerSupport=true on the broker");
065        }
066        validatePolicyDelay(1000);
067        return super.installPlugin(broker);
068    }
069
070    /*
071     * sending to dlq is called as part of a poison ack processing, before the message is acknowledged  and removed
072     * by the destination so a delay is vital to avoid resending before it has been consumed
073     */
074    private void validatePolicyDelay(long limit) {
075        final ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
076        for (Object entry : redeliveryPolicyMap.get(matchAll)) {
077            RedeliveryPolicy redeliveryPolicy = (RedeliveryPolicy) entry;
078            validateLimit(limit, redeliveryPolicy);
079        }
080        RedeliveryPolicy defaultEntry = redeliveryPolicyMap.getDefaultEntry();
081        if (defaultEntry != null) {
082            validateLimit(limit, defaultEntry);
083        }
084    }
085
086    private void validateLimit(long limit, RedeliveryPolicy redeliveryPolicy) {
087        if (redeliveryPolicy.getInitialRedeliveryDelay() < limit) {
088            throw new IllegalStateException("RedeliveryPolicy initialRedeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
089        }
090        if (redeliveryPolicy.getRedeliveryDelay() < limit) {
091            throw new IllegalStateException("RedeliveryPolicy redeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
092        }
093    }
094
095    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
096        return redeliveryPolicyMap;
097    }
098
099    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
100        this.redeliveryPolicyMap = redeliveryPolicyMap;
101    }
102
103    public boolean isSendToDlqIfMaxRetriesExceeded() {
104        return sendToDlqIfMaxRetriesExceeded;
105    }
106
107    /**
108     * What to do if the maxretries on a matching redelivery policy is exceeded.
109     * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
110     * when false, there is no action
111     * @param sendToDlqIfMaxRetriesExceeded
112     */
113    public void setSendToDlqIfMaxRetriesExceeded(boolean sendToDlqIfMaxRetriesExceeded) {
114        this.sendToDlqIfMaxRetriesExceeded = sendToDlqIfMaxRetriesExceeded;
115    }
116
117    public boolean isFallbackToDeadLetter() {
118        return fallbackToDeadLetter;
119    }
120
121    /**
122     * What to do if there is no matching redelivery policy for a destination.
123     * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
124     * when false, there is no action
125     * @param fallbackToDeadLetter
126     */
127    public void setFallbackToDeadLetter(boolean fallbackToDeadLetter) {
128        this.fallbackToDeadLetter = fallbackToDeadLetter;
129    }
130
131    @Override
132    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
133        if (messageReference.isExpired() || (poisonCause != null && poisonCause.getMessage() != null && poisonCause.getMessage().contains(DUPLICATE_FROM_STORE_MSG_PREFIX))) {
134            // there are three uses of  sendToDeadLetterQueue, we are only interested in valid messages
135            return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
136        } else {
137            try {
138                Destination regionDestination = (Destination) messageReference.getRegionDestination();
139                final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(regionDestination.getActiveMQDestination());
140                if (redeliveryPolicy != null) {
141                    final int maximumRedeliveries = redeliveryPolicy.getMaximumRedeliveries();
142                    int redeliveryCount = messageReference.getRedeliveryCounter();
143                    if (RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES == maximumRedeliveries || redeliveryCount < maximumRedeliveries) {
144
145                        long delay = redeliveryPolicy.getInitialRedeliveryDelay();
146                        for (int i = 0; i < redeliveryCount; i++) {
147                            delay = redeliveryPolicy.getNextRedeliveryDelay(delay);
148                        }
149
150                        scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
151                    } else if (isSendToDlqIfMaxRetriesExceeded()) {
152                        return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
153                    } else {
154                        LOG.debug("Discarding message that exceeds max redelivery count({}), {}", maximumRedeliveries, messageReference.getMessageId());
155                    }
156                } else if (isFallbackToDeadLetter()) {
157                    return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
158                } else {
159                    LOG.debug("Ignoring dlq request for: {}, RedeliveryPolicy not found (and no fallback) for: {}", messageReference.getMessageId(), regionDestination.getActiveMQDestination());
160                }
161
162                return false;
163            } catch (Exception exception) {
164                // abort the ack, will be effective if client use transactions or individual ack with sync send
165                RuntimeException toThrow =  new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception);
166                LOG.error(toThrow.toString(), exception);
167                throw toThrow;
168            }
169        }
170    }
171
172    private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception {
173        if (LOG.isTraceEnabled()) {
174            Destination regionDestination = (Destination) messageReference.getRegionDestination();
175            LOG.trace("redelivery #{} of: {} with delay: {}, dest: {}", new Object[]{
176                    redeliveryCount, messageReference.getMessageId(), delay, regionDestination.getActiveMQDestination()
177            });
178        }
179        final Message old = messageReference.getMessage();
180        Message message = old.copy();
181
182        message.setTransactionId(null);
183        message.setMemoryUsage(null);
184        message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
185
186        message.setProperty(REDELIVERY_DELAY, delay);
187        message.setProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
188        message.setRedeliveryCounter(redeliveryCount);
189
190        boolean originalFlowControl = context.isProducerFlowControl();
191        try {
192            context.setProducerFlowControl(false);
193            ProducerInfo info = new ProducerInfo();
194            ProducerState state = new ProducerState(info);
195            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
196            producerExchange.setProducerState(state);
197            producerExchange.setMutable(true);
198            producerExchange.setConnectionContext(context);
199            context.getBroker().send(producerExchange, message);
200        } finally {
201            context.setProducerFlowControl(originalFlowControl);
202        }
203    }
204
205}