001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.ConnectionContext;
021import org.apache.activemq.broker.ProducerBrokerExchange;
022import org.apache.activemq.command.ActiveMQDestination;
023import org.apache.activemq.command.Message;
024import org.apache.activemq.command.ProducerInfo;
025import org.apache.activemq.security.SecurityContext;
026import org.apache.activemq.state.ProducerState;
027
028/**
029 * Utility class for broker operations
030 *
031 */
032public final class BrokerSupport {
033
034    private BrokerSupport() {
035    }
036
037    public static void resendNoCopy(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
038        doResend(context, originalMessage, deadLetterDestination, false);
039    }
040
041    /**
042     * @param context
043     * @param originalMessage
044     * @param deadLetterDestination
045     * @throws Exception
046     */
047    public static void resend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
048        doResend(context, originalMessage, deadLetterDestination, true);
049    }
050
051    public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception {
052        Message message = copy ? originalMessage.copy() : originalMessage;
053        message.setOriginalDestination(message.getDestination());
054        message.setOriginalTransactionId(message.getTransactionId());
055        message.setDestination(deadLetterDestination);
056        message.setTransactionId(null);
057        message.setMemoryUsage(null);
058        message.setRedeliveryCounter(0);
059        message.getMessageId().setDataLocator(null);
060        boolean originalFlowControl = context.isProducerFlowControl();
061        try {
062            context.setProducerFlowControl(false);
063            ProducerInfo info = new ProducerInfo();
064            ProducerState state = new ProducerState(info);
065            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
066            producerExchange.setProducerState(state);
067            producerExchange.setMutable(true);
068            producerExchange.setConnectionContext(context);
069            context.getBroker().send(producerExchange, message);
070        } finally {
071            context.setProducerFlowControl(originalFlowControl);
072        }
073    }
074
075    /**
076     * Returns the broker's administration connection context used for
077     * configuring the broker at startup
078     */
079    public static ConnectionContext getConnectionContext(Broker broker) {
080        ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
081        if (adminConnectionContext == null) {
082            adminConnectionContext = createAdminConnectionContext(broker);
083            broker.setAdminConnectionContext(adminConnectionContext);
084        }
085        return adminConnectionContext;
086    }
087
088    /**
089     * Factory method to create the new administration connection context
090     * object. Note this method is here rather than inside a default broker
091     * implementation to ensure that the broker reference inside it is the outer
092     * most interceptor
093     */
094    protected static ConnectionContext createAdminConnectionContext(Broker broker) {
095        ConnectionContext context = new ConnectionContext();
096        context.setBroker(broker);
097        context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
098        return context;
099    }
100}