001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import java.util.concurrent.atomic.AtomicBoolean; 020 021import javax.jms.Connection; 022import javax.jms.Destination; 023import javax.jms.JMSException; 024import javax.jms.Message; 025import javax.jms.MessageConsumer; 026import javax.jms.MessageListener; 027import javax.jms.MessageProducer; 028 029import org.apache.activemq.Service; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * A Destination bridge is used to bridge between to different JMS systems 035 */ 036public abstract class DestinationBridge implements Service, MessageListener { 037 038 private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class); 039 040 protected MessageConsumer consumer; 041 protected AtomicBoolean started = new AtomicBoolean(false); 042 protected JmsMesageConvertor jmsMessageConvertor; 043 protected boolean doHandleReplyTo = true; 044 protected JmsConnector jmsConnector; 045 046 /** 047 * @return Returns the consumer. 048 */ 049 public MessageConsumer getConsumer() { 050 return consumer; 051 } 052 053 /** 054 * @param consumer The consumer to set. 055 */ 056 public void setConsumer(MessageConsumer consumer) { 057 this.consumer = consumer; 058 } 059 060 /** 061 * @param connector 062 */ 063 public void setJmsConnector(JmsConnector connector) { 064 this.jmsConnector = connector; 065 } 066 067 /** 068 * @return Returns the inboundMessageConvertor. 069 */ 070 public JmsMesageConvertor getJmsMessageConvertor() { 071 return jmsMessageConvertor; 072 } 073 074 /** 075 * @param jmsMessageConvertor 076 */ 077 public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { 078 this.jmsMessageConvertor = jmsMessageConvertor; 079 } 080 081 protected Destination processReplyToDestination(Destination destination) { 082 return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer()); 083 } 084 085 public void start() throws Exception { 086 if (started.compareAndSet(false, true)) { 087 createConsumer(); 088 createProducer(); 089 } 090 } 091 092 public void stop() throws Exception { 093 started.set(false); 094 } 095 096 public void onMessage(Message message) { 097 098 int attempt = 0; 099 final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries(); 100 101 while (started.get() && message != null && attempt <= maxRetries) { 102 103 try { 104 105 if (attempt++ > 0) { 106 try { 107 Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt)); 108 } catch(InterruptedException e) { 109 break; 110 } 111 } 112 113 Message converted; 114 if (jmsMessageConvertor != null) { 115 if (doHandleReplyTo) { 116 Destination replyTo = message.getJMSReplyTo(); 117 if (replyTo != null) { 118 converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); 119 } else { 120 converted = jmsMessageConvertor.convert(message); 121 } 122 } else { 123 message.setJMSReplyTo(null); 124 converted = jmsMessageConvertor.convert(message); 125 } 126 } else { 127 // The Producer side is not up or not yet configured, retry. 128 continue; 129 } 130 131 try { 132 sendMessage(converted); 133 } catch(Exception e) { 134 jmsConnector.handleConnectionFailure(getConnectionForProducer()); 135 continue; 136 } 137 138 try { 139 message.acknowledge(); 140 } catch(Exception e) { 141 jmsConnector.handleConnectionFailure(getConnnectionForConsumer()); 142 continue; 143 } 144 145 // if we got here then it made it out and was ack'd 146 return; 147 148 } catch (Exception e) { 149 LOG.info("failed to forward message on attempt: {} reason: {} message: {}", new Object[]{ attempt, e, message }, e); 150 } 151 } 152 } 153 154 /** 155 * @return Returns the doHandleReplyTo. 156 */ 157 protected boolean isDoHandleReplyTo() { 158 return doHandleReplyTo; 159 } 160 161 /** 162 * @param doHandleReplyTo The doHandleReplyTo to set. 163 */ 164 protected void setDoHandleReplyTo(boolean doHandleReplyTo) { 165 this.doHandleReplyTo = doHandleReplyTo; 166 } 167 168 protected abstract MessageConsumer createConsumer() throws JMSException; 169 170 protected abstract MessageProducer createProducer() throws JMSException; 171 172 protected abstract void sendMessage(Message message) throws JMSException; 173 174 protected abstract Connection getConnnectionForConsumer(); 175 176 protected abstract Connection getConnectionForProducer(); 177 178}