001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.ExceptionListener;
022import javax.jms.JMSException;
023import javax.jms.Queue;
024import javax.jms.QueueConnection;
025import javax.jms.QueueConnectionFactory;
026import javax.jms.QueueSession;
027import javax.jms.Session;
028import javax.naming.NamingException;
029
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
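/**
 * A {@link JmsConnector} that bridges JMS queues between the local (ActiveMQ)
 * connection and a foreign JMS provider connection, using the configured
 * {@link InboundQueueBridge} and {@link OutboundQueueBridge} instances.
 */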
035public class SimpleJmsQueueConnector extends JmsConnector {
036    private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsQueueConnector.class);
037    private String outboundQueueConnectionFactoryName;
038    private String localConnectionFactoryName;
039    private QueueConnectionFactory outboundQueueConnectionFactory;
040    private QueueConnectionFactory localQueueConnectionFactory;
041    private InboundQueueBridge[] inboundQueueBridges;
042    private OutboundQueueBridge[] outboundQueueBridges;
043
044    /**
045     * @return Returns the inboundQueueBridges.
046     */
047    public InboundQueueBridge[] getInboundQueueBridges() {
048        return inboundQueueBridges;
049    }
050
051    /**
052     * @param inboundQueueBridges The inboundQueueBridges to set.
053     */
054    public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
055        this.inboundQueueBridges = inboundQueueBridges;
056    }
057
058    /**
059     * @return Returns the outboundQueueBridges.
060     */
061    public OutboundQueueBridge[] getOutboundQueueBridges() {
062        return outboundQueueBridges;
063    }
064
065    /**
066     * @param outboundQueueBridges The outboundQueueBridges to set.
067     */
068    public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
069        this.outboundQueueBridges = outboundQueueBridges;
070    }
071
072    /**
073     * @return Returns the localQueueConnectionFactory.
074     */
075    public QueueConnectionFactory getLocalQueueConnectionFactory() {
076        return localQueueConnectionFactory;
077    }
078
079    /**
080     * @param localQueueConnectionFactory The localQueueConnectionFactory to
081     *                set.
082     */
083    public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
084        this.localQueueConnectionFactory = localConnectionFactory;
085    }
086
087    /**
088     * @return Returns the outboundQueueConnectionFactory.
089     */
090    public QueueConnectionFactory getOutboundQueueConnectionFactory() {
091        return outboundQueueConnectionFactory;
092    }
093
094    /**
095     * @return Returns the outboundQueueConnectionFactoryName.
096     */
097    public String getOutboundQueueConnectionFactoryName() {
098        return outboundQueueConnectionFactoryName;
099    }
100
101    /**
102     * @param outboundQueueConnectionFactoryName The
103     *                outboundQueueConnectionFactoryName to set.
104     */
105    public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
106        this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
107    }
108
109    /**
110     * @return Returns the localConnectionFactoryName.
111     */
112    public String getLocalConnectionFactoryName() {
113        return localConnectionFactoryName;
114    }
115
116    /**
117     * @param localConnectionFactoryName The localConnectionFactoryName to set.
118     */
119    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
120        this.localConnectionFactoryName = localConnectionFactoryName;
121    }
122
123    /**
124     * @return Returns the localQueueConnection.
125     */
126    public QueueConnection getLocalQueueConnection() {
127        return (QueueConnection) localConnection.get();
128    }
129
130    /**
131     * @param localQueueConnection The localQueueConnection to set.
132     */
133    public void setLocalQueueConnection(QueueConnection localQueueConnection) {
134        this.localConnection.set(localQueueConnection);
135    }
136
137    /**
138     * @return Returns the outboundQueueConnection.
139     */
140    public QueueConnection getOutboundQueueConnection() {
141        return (QueueConnection) foreignConnection.get();
142    }
143
144    /**
145     * @param outboundQueueConnection The outboundQueueConnection to set.
146     */
147    public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
148        this.foreignConnection.set(foreignQueueConnection);
149    }
150
151    /**
152     * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
153     *                to set.
154     */
155    public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
156        this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
157    }
158
159    @Override
160    protected void initializeForeignConnection() throws NamingException, JMSException {
161
162        final QueueConnection newConnection;
163
164        if (foreignConnection.get() == null) {
165            // get the connection factories
166            if (outboundQueueConnectionFactory == null) {
167                // look it up from JNDI
168                if (outboundQueueConnectionFactoryName != null) {
169                    outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
170                        .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
171                    if (outboundUsername != null) {
172                        newConnection = outboundQueueConnectionFactory
173                            .createQueueConnection(outboundUsername, outboundPassword);
174                    } else {
175                        newConnection = outboundQueueConnectionFactory.createQueueConnection();
176                    }
177                } else {
178                    throw new JMSException("Cannot create foreignConnection - no information");
179                }
180            } else {
181                if (outboundUsername != null) {
182                    newConnection = outboundQueueConnectionFactory
183                        .createQueueConnection(outboundUsername, outboundPassword);
184                } else {
185                    newConnection = outboundQueueConnectionFactory.createQueueConnection();
186                }
187            }
188        } else {
189            // Clear if for now in case something goes wrong during the init.
190            newConnection = (QueueConnection) foreignConnection.getAndSet(null);
191        }
192
193        if (outboundClientId != null && outboundClientId.length() > 0) {
194            newConnection.setClientID(getOutboundClientId());
195        }
196        newConnection.start();
197
198        outboundMessageConvertor.setConnection(newConnection);
199
200        // Configure the bridges with the new Outbound connection.
201        initializeInboundDestinationBridgesOutboundSide(newConnection);
202        initializeOutboundDestinationBridgesOutboundSide(newConnection);
203
204        // Register for any async error notifications now so we can reset in the
205        // case where there's not a lot of activity and a connection drops.
206        newConnection.setExceptionListener(new ExceptionListener() {
207            @Override
208            public void onException(JMSException exception) {
209                handleConnectionFailure(newConnection);
210            }
211        });
212
213        // At this point all looks good, so this our current connection now.
214        foreignConnection.set(newConnection);
215    }
216
217    @Override
218    protected void initializeLocalConnection() throws NamingException, JMSException {
219
220        final QueueConnection newConnection;
221
222        if (localConnection.get() == null) {
223            // get the connection factories
224            if (localQueueConnectionFactory == null) {
225                if (embeddedConnectionFactory == null) {
226                    // look it up from JNDI
227                    if (localConnectionFactoryName != null) {
228                        localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
229                            .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
230                        if (localUsername != null) {
231                            newConnection = localQueueConnectionFactory
232                                .createQueueConnection(localUsername, localPassword);
233                        } else {
234                            newConnection = localQueueConnectionFactory.createQueueConnection();
235                        }
236                    } else {
237                        throw new JMSException("Cannot create localConnection - no information");
238                    }
239                } else {
240                    newConnection = embeddedConnectionFactory.createQueueConnection();
241                }
242            } else {
243                if (localUsername != null) {
244                    newConnection = localQueueConnectionFactory.
245                            createQueueConnection(localUsername, localPassword);
246                } else {
247                    newConnection = localQueueConnectionFactory.createQueueConnection();
248                }
249            }
250
251        } else {
252            // Clear if for now in case something goes wrong during the init.
253            newConnection = (QueueConnection) localConnection.getAndSet(null);
254        }
255
256        if (localClientId != null && localClientId.length() > 0) {
257            newConnection.setClientID(getLocalClientId());
258        }
259        newConnection.start();
260
261        inboundMessageConvertor.setConnection(newConnection);
262
263        // Configure the bridges with the new Local connection.
264        initializeInboundDestinationBridgesLocalSide(newConnection);
265        initializeOutboundDestinationBridgesLocalSide(newConnection);
266
267        // Register for any async error notifications now so we can reset in the
268        // case where there's not a lot of activity and a connection drops.
269        newConnection.setExceptionListener(new ExceptionListener() {
270            @Override
271            public void onException(JMSException exception) {
272                handleConnectionFailure(newConnection);
273            }
274        });
275
276        // At this point all looks good, so this our current connection now.
277        localConnection.set(newConnection);
278    }
279
280    protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
281        if (inboundQueueBridges != null) {
282            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
283
284            for (InboundQueueBridge bridge : inboundQueueBridges) {
285                String queueName = bridge.getInboundQueueName();
286                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
287                bridge.setConsumer(null);
288                bridge.setConsumerQueue(foreignQueue);
289                bridge.setConsumerConnection(connection);
290                bridge.setJmsConnector(this);
291                addInboundBridge(bridge);
292            }
293            outboundSession.close();
294        }
295    }
296
297    protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
298        if (inboundQueueBridges != null) {
299            QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
300
301            for (InboundQueueBridge bridge : inboundQueueBridges) {
302                String localQueueName = bridge.getLocalQueueName();
303                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
304                bridge.setProducerQueue(activemqQueue);
305                bridge.setProducerConnection(connection);
306                if (bridge.getJmsMessageConvertor() == null) {
307                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
308                }
309                bridge.setJmsConnector(this);
310                addInboundBridge(bridge);
311            }
312            localSession.close();
313        }
314    }
315
316    protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
317        if (outboundQueueBridges != null) {
318            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
319
320            for (OutboundQueueBridge bridge : outboundQueueBridges) {
321                String queueName = bridge.getOutboundQueueName();
322                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
323                bridge.setProducerQueue(foreignQueue);
324                bridge.setProducerConnection(connection);
325                if (bridge.getJmsMessageConvertor() == null) {
326                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
327                }
328                bridge.setJmsConnector(this);
329                addOutboundBridge(bridge);
330            }
331            outboundSession.close();
332        }
333    }
334
335    protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
336        if (outboundQueueBridges != null) {
337            QueueSession localSession =
338                    connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
339
340            for (OutboundQueueBridge bridge : outboundQueueBridges) {
341                String localQueueName = bridge.getLocalQueueName();
342                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
343                bridge.setConsumer(null);
344                bridge.setConsumerQueue(activemqQueue);
345                bridge.setConsumerConnection(connection);
346                bridge.setJmsConnector(this);
347                addOutboundBridge(bridge);
348            }
349            localSession.close();
350        }
351    }
352
353    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
354                                              Connection replyToConsumerConnection) {
355        Queue replyToProducerQueue = (Queue)destination;
356        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
357
358        if (isInbound) {
359            InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
360            if (bridge == null) {
361                bridge = new InboundQueueBridge() {
362                    protected Destination processReplyToDestination(Destination destination) {
363                        return null;
364                    }
365                };
366                try {
367                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
368                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
369                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
370                    replyToConsumerSession.close();
371                    bridge.setConsumerQueue(replyToConsumerQueue);
372                    bridge.setProducerQueue(replyToProducerQueue);
373                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
374                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
375                    bridge.setDoHandleReplyTo(false);
376                    if (bridge.getJmsMessageConvertor() == null) {
377                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
378                    }
379                    bridge.setJmsConnector(this);
380                    bridge.start();
381                    LOG.info("Created replyTo bridge for {}", replyToProducerQueue);
382                } catch (Exception e) {
383                    LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e);
384                    return null;
385                }
386                replyToBridges.put(replyToProducerQueue, bridge);
387            }
388            return bridge.getConsumerQueue();
389        } else {
390            OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
391            if (bridge == null) {
392                bridge = new OutboundQueueBridge() {
393                    protected Destination processReplyToDestination(Destination destination) {
394                        return null;
395                    }
396                };
397                try {
398                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
399                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
400                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
401                    replyToConsumerSession.close();
402                    bridge.setConsumerQueue(replyToConsumerQueue);
403                    bridge.setProducerQueue(replyToProducerQueue);
404                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
405                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
406                    bridge.setDoHandleReplyTo(false);
407                    if (bridge.getJmsMessageConvertor() == null) {
408                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
409                    }
410                    bridge.setJmsConnector(this);
411                    bridge.start();
412                    LOG.info("Created replyTo bridge for {}", replyToProducerQueue);
413                } catch (Exception e) {
414                    LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e);
415                    return null;
416                }
417                replyToBridges.put(replyToProducerQueue, bridge);
418            }
419            return bridge.getConsumerQueue();
420        }
421    }
422
423    protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
424        return session.createQueue(queueName);
425    }
426
427    protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
428        Queue result = null;
429
430        if (preferJndiDestinationLookup) {
431            try {
432                // look-up the Queue
433                result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
434            } catch (NamingException e) {
435                try {
436                    result = session.createQueue(queueName);
437                } catch (JMSException e1) {
438                    String errStr = "Failed to look-up or create Queue for name: " + queueName;
439                    LOG.error(errStr, e);
440                    JMSException jmsEx = new JMSException(errStr);
441                    jmsEx.setLinkedException(e1);
442                    throw jmsEx;
443                }
444            }
445        } else {
446            try {
447                result = session.createQueue(queueName);
448            } catch (JMSException e) {
449                // look-up the Queue
450                try {
451                    result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
452                } catch (NamingException e1) {
453                    String errStr = "Failed to look-up Queue for name: " + queueName;
454                    LOG.error(errStr, e);
455                    JMSException jmsEx = new JMSException(errStr);
456                    jmsEx.setLinkedException(e1);
457                    throw jmsEx;
458                }
459            }
460        }
461
462        return result;
463    }
464}