/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.ExceptionListener;
022import javax.jms.JMSException;
023import javax.jms.Session;
024import javax.jms.Topic;
025import javax.jms.TopicConnection;
026import javax.jms.TopicConnectionFactory;
027import javax.jms.TopicSession;
028import javax.naming.NamingException;
029
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * A Bridge to other JMS Topic providers
035 */
036public class SimpleJmsTopicConnector extends JmsConnector {
037    private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsTopicConnector.class);
038    private String outboundTopicConnectionFactoryName;
039    private String localConnectionFactoryName;
040    private TopicConnectionFactory outboundTopicConnectionFactory;
041    private TopicConnectionFactory localTopicConnectionFactory;
042    private InboundTopicBridge[] inboundTopicBridges;
043    private OutboundTopicBridge[] outboundTopicBridges;
044
045    /**
046     * @return Returns the inboundTopicBridges.
047     */
048    public InboundTopicBridge[] getInboundTopicBridges() {
049        return inboundTopicBridges;
050    }
051
052    /**
053     * @param inboundTopicBridges The inboundTopicBridges to set.
054     */
055    public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
056        this.inboundTopicBridges = inboundTopicBridges;
057    }
058
059    /**
060     * @return Returns the outboundTopicBridges.
061     */
062    public OutboundTopicBridge[] getOutboundTopicBridges() {
063        return outboundTopicBridges;
064    }
065
066    /**
067     * @param outboundTopicBridges The outboundTopicBridges to set.
068     */
069    public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
070        this.outboundTopicBridges = outboundTopicBridges;
071    }
072
073    /**
074     * @return Returns the localTopicConnectionFactory.
075     */
076    public TopicConnectionFactory getLocalTopicConnectionFactory() {
077        return localTopicConnectionFactory;
078    }
079
080    /**
081     * @param localTopicConnectionFactory The localTopicConnectionFactory to set.
082     */
083    public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
084        this.localTopicConnectionFactory = localConnectionFactory;
085    }
086
087    /**
088     * @return Returns the outboundTopicConnectionFactory.
089     */
090    public TopicConnectionFactory getOutboundTopicConnectionFactory() {
091        return outboundTopicConnectionFactory;
092    }
093
094    /**
095     * @return Returns the outboundTopicConnectionFactoryName.
096     */
097    public String getOutboundTopicConnectionFactoryName() {
098        return outboundTopicConnectionFactoryName;
099    }
100
101    /**
102     * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set.
103     */
104    public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
105        this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
106    }
107
108    /**
109     * @return Returns the localConnectionFactoryName.
110     */
111    public String getLocalConnectionFactoryName() {
112        return localConnectionFactoryName;
113    }
114
115    /**
116     * @param localConnectionFactoryName The localConnectionFactoryName to set.
117     */
118    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
119        this.localConnectionFactoryName = localConnectionFactoryName;
120    }
121
122    /**
123     * @return Returns the localTopicConnection.
124     */
125    public TopicConnection getLocalTopicConnection() {
126        return (TopicConnection) localConnection.get();
127    }
128
129    /**
130     * @param localTopicConnection The localTopicConnection to set.
131     */
132    public void setLocalTopicConnection(TopicConnection localTopicConnection) {
133        this.localConnection.set(localTopicConnection);
134    }
135
136    /**
137     * @return Returns the outboundTopicConnection.
138     */
139    public TopicConnection getOutboundTopicConnection() {
140        return (TopicConnection) foreignConnection.get();
141    }
142
143    /**
144     * @param outboundTopicConnection The outboundTopicConnection to set.
145     */
146    public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
147        this.foreignConnection.set(foreignTopicConnection);
148    }
149
150    /**
151     * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set.
152     */
153    public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
154        this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
155    }
156
157    @Override
158    protected void initializeForeignConnection() throws NamingException, JMSException {
159
160        final TopicConnection newConnection;
161
162        if (foreignConnection.get() == null) {
163            // get the connection factories
164            if (outboundTopicConnectionFactory == null) {
165                // look it up from JNDI
166                if (outboundTopicConnectionFactoryName != null) {
167                    outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
168                        .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
169                    if (outboundUsername != null) {
170                        newConnection = outboundTopicConnectionFactory
171                            .createTopicConnection(outboundUsername, outboundPassword);
172                    } else {
173                        newConnection = outboundTopicConnectionFactory.createTopicConnection();
174                    }
175                } else {
176                    throw new JMSException("Cannot create foreignConnection - no information");
177                }
178            } else {
179                if (outboundUsername != null) {
180                    newConnection = outboundTopicConnectionFactory
181                        .createTopicConnection(outboundUsername, outboundPassword);
182                } else {
183                    newConnection = outboundTopicConnectionFactory.createTopicConnection();
184                }
185            }
186        } else {
187            // Clear if for now in case something goes wrong during the init.
188            newConnection = (TopicConnection) foreignConnection.getAndSet(null);
189        }
190
191        if (outboundClientId != null && outboundClientId.length() > 0) {
192            newConnection.setClientID(getOutboundClientId());
193        }
194        newConnection.start();
195
196        outboundMessageConvertor.setConnection(newConnection);
197
198        // Configure the bridges with the new Outbound connection.
199        initializeInboundDestinationBridgesOutboundSide(newConnection);
200        initializeOutboundDestinationBridgesOutboundSide(newConnection);
201
202        // Register for any async error notifications now so we can reset in the
203        // case where there's not a lot of activity and a connection drops.
204        newConnection.setExceptionListener(new ExceptionListener() {
205            @Override
206            public void onException(JMSException exception) {
207                handleConnectionFailure(newConnection);
208            }
209        });
210
211        // At this point all looks good, so this our current connection now.
212        foreignConnection.set(newConnection);
213    }
214
215    @Override
216    protected void initializeLocalConnection() throws NamingException, JMSException {
217
218        final TopicConnection newConnection;
219
220        if (localConnection.get() == null) {
221            // get the connection factories
222            if (localTopicConnectionFactory == null) {
223                if (embeddedConnectionFactory == null) {
224                    // look it up from JNDI
225                    if (localConnectionFactoryName != null) {
226                        localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
227                            .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
228                        if (localUsername != null) {
229                            newConnection = localTopicConnectionFactory
230                                .createTopicConnection(localUsername, localPassword);
231                        } else {
232                            newConnection = localTopicConnectionFactory.createTopicConnection();
233                        }
234                    } else {
235                        throw new JMSException("Cannot create localConnection - no information");
236                    }
237                } else {
238                    newConnection = embeddedConnectionFactory.createTopicConnection();
239                }
240            } else {
241                if (localUsername != null) {
242                    newConnection = localTopicConnectionFactory.
243                            createTopicConnection(localUsername, localPassword);
244                } else {
245                    newConnection = localTopicConnectionFactory.createTopicConnection();
246                }
247            }
248
249        } else {
250            // Clear if for now in case something goes wrong during the init.
251            newConnection = (TopicConnection) localConnection.getAndSet(null);
252        }
253
254        if (localClientId != null && localClientId.length() > 0) {
255            newConnection.setClientID(getLocalClientId());
256        }
257        newConnection.start();
258
259        inboundMessageConvertor.setConnection(newConnection);
260
261        // Configure the bridges with the new Local connection.
262        initializeInboundDestinationBridgesLocalSide(newConnection);
263        initializeOutboundDestinationBridgesLocalSide(newConnection);
264
265        // Register for any async error notifications now so we can reset in the
266        // case where there's not a lot of activity and a connection drops.
267        newConnection.setExceptionListener(new ExceptionListener() {
268            @Override
269            public void onException(JMSException exception) {
270                handleConnectionFailure(newConnection);
271            }
272        });
273
274        // At this point all looks good, so this our current connection now.
275        localConnection.set(newConnection);
276    }
277
278    protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
279        if (inboundTopicBridges != null) {
280            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
281
282            for (InboundTopicBridge bridge : inboundTopicBridges) {
283                String TopicName = bridge.getInboundTopicName();
284                Topic foreignTopic = createForeignTopic(outboundSession, TopicName);
285                bridge.setConsumer(null);
286                bridge.setConsumerTopic(foreignTopic);
287                bridge.setConsumerConnection(connection);
288                bridge.setJmsConnector(this);
289                addInboundBridge(bridge);
290            }
291            outboundSession.close();
292        }
293    }
294
295    protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
296        if (inboundTopicBridges != null) {
297            TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
298
299            for (InboundTopicBridge bridge : inboundTopicBridges) {
300                String localTopicName = bridge.getLocalTopicName();
301                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
302                bridge.setProducerTopic(activemqTopic);
303                bridge.setProducerConnection(connection);
304                if (bridge.getJmsMessageConvertor() == null) {
305                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
306                }
307                bridge.setJmsConnector(this);
308                addInboundBridge(bridge);
309            }
310            localSession.close();
311        }
312    }
313
314    protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
315        if (outboundTopicBridges != null) {
316            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
317
318            for (OutboundTopicBridge bridge : outboundTopicBridges) {
319                String topicName = bridge.getOutboundTopicName();
320                Topic foreignTopic = createForeignTopic(outboundSession, topicName);
321                bridge.setProducerTopic(foreignTopic);
322                bridge.setProducerConnection(connection);
323                if (bridge.getJmsMessageConvertor() == null) {
324                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
325                }
326                bridge.setJmsConnector(this);
327                addOutboundBridge(bridge);
328            }
329            outboundSession.close();
330        }
331    }
332
333    protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
334        if (outboundTopicBridges != null) {
335            TopicSession localSession =
336                    connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
337
338            for (OutboundTopicBridge bridge : outboundTopicBridges) {
339                String localTopicName = bridge.getLocalTopicName();
340                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
341                bridge.setConsumer(null);
342                bridge.setConsumerTopic(activemqTopic);
343                bridge.setConsumerConnection(connection);
344                bridge.setJmsConnector(this);
345                addOutboundBridge(bridge);
346            }
347            localSession.close();
348        }
349    }
350
351    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
352                                              Connection replyToConsumerConnection) {
353        Topic replyToProducerTopic = (Topic)destination;
354        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
355
356        if (isInbound) {
357            InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
358            if (bridge == null) {
359                bridge = new InboundTopicBridge() {
360                    protected Destination processReplyToDestination(Destination destination) {
361                        return null;
362                    }
363                };
364                try {
365                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
366                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
367                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
368                    replyToConsumerSession.close();
369                    bridge.setConsumerTopic(replyToConsumerTopic);
370                    bridge.setProducerTopic(replyToProducerTopic);
371                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
372                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
373                    bridge.setDoHandleReplyTo(false);
374                    if (bridge.getJmsMessageConvertor() == null) {
375                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
376                    }
377                    bridge.setJmsConnector(this);
378                    bridge.start();
379                    LOG.info("Created replyTo bridge for {}", replyToProducerTopic);
380                } catch (Exception e) {
381                    LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e);
382                    return null;
383                }
384                replyToBridges.put(replyToProducerTopic, bridge);
385            }
386            return bridge.getConsumerTopic();
387        } else {
388            OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
389            if (bridge == null) {
390                bridge = new OutboundTopicBridge() {
391                    protected Destination processReplyToDestination(Destination destination) {
392                        return null;
393                    }
394                };
395                try {
396                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
397                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
398                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
399                    replyToConsumerSession.close();
400                    bridge.setConsumerTopic(replyToConsumerTopic);
401                    bridge.setProducerTopic(replyToProducerTopic);
402                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
403                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
404                    bridge.setDoHandleReplyTo(false);
405                    if (bridge.getJmsMessageConvertor() == null) {
406                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
407                    }
408                    bridge.setJmsConnector(this);
409                    bridge.start();
410                    LOG.info("Created replyTo bridge for {}", replyToProducerTopic);
411                } catch (Exception e) {
412                    LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e);
413                    return null;
414                }
415                replyToBridges.put(replyToProducerTopic, bridge);
416            }
417            return bridge.getConsumerTopic();
418        }
419    }
420
421    protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
422        return session.createTopic(topicName);
423    }
424
425    protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
426        Topic result = null;
427
428        if (preferJndiDestinationLookup) {
429            try {
430                // look-up the Queue
431                result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
432            } catch (NamingException e) {
433                try {
434                    result = session.createTopic(topicName);
435                } catch (JMSException e1) {
436                    String errStr = "Failed to look-up or create Topic for name: " + topicName;
437                    LOG.error(errStr, e);
438                    JMSException jmsEx = new JMSException(errStr);
439                    jmsEx.setLinkedException(e1);
440                    throw jmsEx;
441                }
442            }
443        } else {
444            try {
445                result = session.createTopic(topicName);
446            } catch (JMSException e) {
447                // look-up the Topic
448                try {
449                    result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
450                } catch (NamingException e1) {
451                    String errStr = "Failed to look-up Topic for name: " + topicName;
452                    LOG.error(errStr, e);
453                    JMSException jmsEx = new JMSException(errStr);
454                    jmsEx.setLinkedException(e1);
455                    throw jmsEx;
456                }
457            }
458        }
459        return result;
460    }
461
462}