001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import java.util.concurrent.atomic.AtomicBoolean;
020
021import javax.jms.Connection;
022import javax.jms.Destination;
023import javax.jms.JMSException;
024import javax.jms.Message;
025import javax.jms.MessageConsumer;
026import javax.jms.MessageListener;
027import javax.jms.MessageProducer;
028
029import org.apache.activemq.Service;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
 * A Destination bridge is used to bridge between two different JMS systems
035 */
036public abstract class DestinationBridge implements Service, MessageListener {
037
038    private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
039
040    protected MessageConsumer consumer;
041    protected AtomicBoolean started = new AtomicBoolean(false);
042    protected JmsMesageConvertor jmsMessageConvertor;
043    protected boolean doHandleReplyTo = true;
044    protected JmsConnector jmsConnector;
045
046    /**
047     * @return Returns the consumer.
048     */
049    public MessageConsumer getConsumer() {
050        return consumer;
051    }
052
053    /**
054     * @param consumer The consumer to set.
055     */
056    public void setConsumer(MessageConsumer consumer) {
057        this.consumer = consumer;
058    }
059
060    /**
061     * @param connector
062     */
063    public void setJmsConnector(JmsConnector connector) {
064        this.jmsConnector = connector;
065    }
066
067    /**
068     * @return Returns the inboundMessageConvertor.
069     */
070    public JmsMesageConvertor getJmsMessageConvertor() {
071        return jmsMessageConvertor;
072    }
073
074    /**
075     * @param jmsMessageConvertor
076     */
077    public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
078        this.jmsMessageConvertor = jmsMessageConvertor;
079    }
080
081    protected Destination processReplyToDestination(Destination destination) {
082        return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
083    }
084
085    public void start() throws Exception {
086        if (started.compareAndSet(false, true)) {
087            createConsumer();
088            createProducer();
089        }
090    }
091
092    public void stop() throws Exception {
093        started.set(false);
094    }
095
096    public void onMessage(Message message) {
097
098        int attempt = 0;
099        final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries();
100
101        while (started.get() && message != null && attempt <= maxRetries) {
102
103            try {
104
105                if (attempt++ > 0) {
106                    try {
107                        Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt));
108                    } catch(InterruptedException e) {
109                        break;
110                    }
111                }
112
113                Message converted;
114                if (jmsMessageConvertor != null) {
115                    if (doHandleReplyTo) {
116                        Destination replyTo = message.getJMSReplyTo();
117                        if (replyTo != null) {
118                            converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
119                        } else {
120                            converted = jmsMessageConvertor.convert(message);
121                        }
122                    } else {
123                        message.setJMSReplyTo(null);
124                        converted = jmsMessageConvertor.convert(message);
125                    }
126                } else {
127                    // The Producer side is not up or not yet configured, retry.
128                    continue;
129                }
130
131                try {
132                    sendMessage(converted);
133                } catch(Exception e) {
134                    jmsConnector.handleConnectionFailure(getConnectionForProducer());
135                    continue;
136                }
137
138                try {
139                    message.acknowledge();
140                } catch(Exception e) {
141                    jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
142                    continue;
143                }
144
145                // if we got here then it made it out and was ack'd
146                return;
147
148            } catch (Exception e) {
149                LOG.info("failed to forward message on attempt: {} reason: {} message: {}", new Object[]{ attempt, e, message }, e);
150            }
151        }
152    }
153
154    /**
155     * @return Returns the doHandleReplyTo.
156     */
157    protected boolean isDoHandleReplyTo() {
158        return doHandleReplyTo;
159    }
160
161    /**
162     * @param doHandleReplyTo The doHandleReplyTo to set.
163     */
164    protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
165        this.doHandleReplyTo = doHandleReplyTo;
166    }
167
168    protected abstract MessageConsumer createConsumer() throws JMSException;
169
170    protected abstract MessageProducer createProducer() throws JMSException;
171
172    protected abstract void sendMessage(Message message) throws JMSException;
173
174    protected abstract Connection getConnnectionForConsumer();
175
176    protected abstract Connection getConnectionForProducer();
177
178}