001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import java.util.Iterator;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.CopyOnWriteArrayList;
023import java.util.concurrent.LinkedBlockingQueue;
024import java.util.concurrent.ThreadFactory;
025import java.util.concurrent.ThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicReference;
029
030import javax.jms.Connection;
031import javax.jms.Destination;
032
033import org.apache.activemq.ActiveMQConnectionFactory;
034import org.apache.activemq.Service;
035import org.apache.activemq.broker.BrokerService;
036import org.apache.activemq.util.LRUCache;
037import org.apache.activemq.util.ThreadPoolUtils;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
043 * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself
044 * aimed to be in compliance with the JMS 1.0.2 specification.
045 */
046public abstract class JmsConnector implements Service {
047
048    private static int nextId;
049    private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
050
051    protected boolean preferJndiDestinationLookup = false;
052    protected JndiLookupFactory jndiLocalTemplate;
053    protected JndiLookupFactory jndiOutboundTemplate;
054    protected JmsMesageConvertor inboundMessageConvertor;
055    protected JmsMesageConvertor outboundMessageConvertor;
056    protected AtomicBoolean initialized = new AtomicBoolean(false);
057    protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
058    protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
059    protected AtomicBoolean started = new AtomicBoolean(false);
060    protected AtomicBoolean failed = new AtomicBoolean();
061    protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
062    protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
063    protected ActiveMQConnectionFactory embeddedConnectionFactory;
064    protected int replyToDestinationCacheSize = 10000;
065    protected String outboundUsername;
066    protected String outboundPassword;
067    protected String localUsername;
068    protected String localPassword;
069    protected String outboundClientId;
070    protected String localClientId;
071    protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
072
073    private ReconnectionPolicy policy = new ReconnectionPolicy();
074    protected ThreadPoolExecutor connectionSerivce;
075    private final List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
076    private final List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
077    private String name;
078
079    private static LRUCache<Destination, DestinationBridge> createLRUCache() {
080        return new LRUCache<Destination, DestinationBridge>() {
081            private static final long serialVersionUID = -7446792754185879286L;
082
083            @Override
084            protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
085                if (size() > maxCacheSize) {
086                    Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
087                    Map.Entry<Destination, DestinationBridge> lru = iter.next();
088                    remove(lru.getKey());
089                    DestinationBridge bridge = lru.getValue();
090                    try {
091                        bridge.stop();
092                        LOG.info("Expired bridge: {}", bridge);
093                    } catch (Exception e) {
094                        LOG.warn("Stopping expired bridge {} caused an exception", bridge, e);
095                    }
096                }
097                return false;
098            }
099        };
100    }
101
102    public boolean init() {
103        boolean result = initialized.compareAndSet(false, true);
104        if (result) {
105            if (jndiLocalTemplate == null) {
106                jndiLocalTemplate = new JndiLookupFactory();
107            }
108            if (jndiOutboundTemplate == null) {
109                jndiOutboundTemplate = new JndiLookupFactory();
110            }
111            if (inboundMessageConvertor == null) {
112                inboundMessageConvertor = new SimpleJmsMessageConvertor();
113            }
114            if (outboundMessageConvertor == null) {
115                outboundMessageConvertor = new SimpleJmsMessageConvertor();
116            }
117            replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
118
119            connectionSerivce = createExecutor();
120
121            // Subclasses can override this to customize their own it.
122            result = doConnectorInit();
123        }
124        return result;
125    }
126
127    protected boolean doConnectorInit() {
128
129        // We try to make a connection via a sync call first so that the
130        // JmsConnector is fully initialized before the start call returns
131        // in order to avoid missing any messages that are dispatched
132        // immediately after startup.  If either side fails we queue an
133        // asynchronous task to manage the reconnect attempts.
134
135        try {
136            initializeLocalConnection();
137            localSideInitialized.set(true);
138        } catch(Exception e) {
139            // Queue up the task to attempt the local connection.
140            scheduleAsyncLocalConnectionReconnect();
141        }
142
143        try {
144            initializeForeignConnection();
145            foreignSideInitialized.set(true);
146        } catch(Exception e) {
147            // Queue up the task for the foreign connection now.
148            scheduleAsyncForeignConnectionReconnect();
149        }
150
151        return true;
152    }
153
154    @Override
155    public void start() throws Exception {
156        if (started.compareAndSet(false, true)) {
157            init();
158            for (DestinationBridge bridge : inboundBridges) {
159                bridge.start();
160            }
161            for (DestinationBridge bridge : outboundBridges) {
162                bridge.start();
163            }
164            LOG.info("JMS Connector {} started", getName());
165        }
166    }
167
168    @Override
169    public void stop() throws Exception {
170        if (started.compareAndSet(true, false)) {
171
172            ThreadPoolUtils.shutdown(connectionSerivce);
173            connectionSerivce = null;
174
175            if (foreignConnection.get() != null) {
176                try {
177                    foreignConnection.get().close();
178                } catch (Exception e) {
179                }
180            }
181
182            if (localConnection.get() != null) {
183                try {
184                    localConnection.get().close();
185                } catch (Exception e) {
186                }
187            }
188
189            for (DestinationBridge bridge : inboundBridges) {
190                bridge.stop();
191            }
192            for (DestinationBridge bridge : outboundBridges) {
193                bridge.stop();
194            }
195            LOG.info("JMS Connector {} stopped", getName());
196        }
197    }
198
199    public void clearBridges() {
200        inboundBridges.clear();
201        outboundBridges.clear();
202        replyToBridges.clear();
203    }
204
205    protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
206
207    /**
208     * One way to configure the local connection - this is called by The
209     * BrokerService when the Connector is embedded
210     *
211     * @param service
212     */
213    public void setBrokerService(BrokerService service) {
214        embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
215    }
216
217    public Connection getLocalConnection() {
218        return this.localConnection.get();
219    }
220
221    public Connection getForeignConnection() {
222        return this.foreignConnection.get();
223    }
224
225    /**
226     * @return Returns the jndiTemplate.
227     */
228    public JndiLookupFactory getJndiLocalTemplate() {
229        return jndiLocalTemplate;
230    }
231
232    /**
233     * @param jndiTemplate The jndiTemplate to set.
234     */
235    public void setJndiLocalTemplate(JndiLookupFactory jndiTemplate) {
236        this.jndiLocalTemplate = jndiTemplate;
237    }
238
239    /**
240     * @return Returns the jndiOutboundTemplate.
241     */
242    public JndiLookupFactory getJndiOutboundTemplate() {
243        return jndiOutboundTemplate;
244    }
245
246    /**
247     * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
248     */
249    public void setJndiOutboundTemplate(JndiLookupFactory jndiOutboundTemplate) {
250        this.jndiOutboundTemplate = jndiOutboundTemplate;
251    }
252
253    /**
254     * @return Returns the inboundMessageConvertor.
255     */
256    public JmsMesageConvertor getInboundMessageConvertor() {
257        return inboundMessageConvertor;
258    }
259
260    /**
261     * @param inboundMessageConvertor The inboundMessageConvertor to set.
262     */
263    public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
264        this.inboundMessageConvertor = jmsMessageConvertor;
265    }
266
267    /**
268     * @return Returns the outboundMessageConvertor.
269     */
270    public JmsMesageConvertor getOutboundMessageConvertor() {
271        return outboundMessageConvertor;
272    }
273
274    /**
275     * @param outboundMessageConvertor The outboundMessageConvertor to set.
276     */
277    public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
278        this.outboundMessageConvertor = outboundMessageConvertor;
279    }
280
281    /**
282     * @return Returns the replyToDestinationCacheSize.
283     */
284    public int getReplyToDestinationCacheSize() {
285        return replyToDestinationCacheSize;
286    }
287
288    /**
289     * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
290     */
291    public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
292        this.replyToDestinationCacheSize = replyToDestinationCacheSize;
293    }
294
295    /**
296     * @return Returns the localPassword.
297     */
298    public String getLocalPassword() {
299        return localPassword;
300    }
301
302    /**
303     * @param localPassword The localPassword to set.
304     */
305    public void setLocalPassword(String localPassword) {
306        this.localPassword = localPassword;
307    }
308
309    /**
310     * @return Returns the localUsername.
311     */
312    public String getLocalUsername() {
313        return localUsername;
314    }
315
316    /**
317     * @param localUsername The localUsername to set.
318     */
319    public void setLocalUsername(String localUsername) {
320        this.localUsername = localUsername;
321    }
322
323    /**
324     * @return Returns the outboundPassword.
325     */
326    public String getOutboundPassword() {
327        return outboundPassword;
328    }
329
330    /**
331     * @param outboundPassword The outboundPassword to set.
332     */
333    public void setOutboundPassword(String outboundPassword) {
334        this.outboundPassword = outboundPassword;
335    }
336
337    /**
338     * @return Returns the outboundUsername.
339     */
340    public String getOutboundUsername() {
341        return outboundUsername;
342    }
343
344    /**
345     * @param outboundUsername The outboundUsername to set.
346     */
347    public void setOutboundUsername(String outboundUsername) {
348        this.outboundUsername = outboundUsername;
349    }
350
351    /**
352     * @return the outboundClientId
353     */
354    public String getOutboundClientId() {
355        return outboundClientId;
356    }
357
358    /**
359     * @param outboundClientId the outboundClientId to set
360     */
361    public void setOutboundClientId(String outboundClientId) {
362        this.outboundClientId = outboundClientId;
363    }
364
365    /**
366     * @return the localClientId
367     */
368    public String getLocalClientId() {
369        return localClientId;
370    }
371
372    /**
373     * @param localClientId the localClientId to set
374     */
375    public void setLocalClientId(String localClientId) {
376        this.localClientId = localClientId;
377    }
378
379    /**
380     * @return the currently configured reconnection policy.
381     */
382    public ReconnectionPolicy getReconnectionPolicy() {
383        return this.policy;
384    }
385
386    /**
387     * @param policy The new reconnection policy this {@link JmsConnector} should use.
388     */
389    public void setReconnectionPolicy(ReconnectionPolicy policy) {
390        this.policy = policy;
391    }
392
393    /**
394     * @return the preferJndiDestinationLookup
395     */
396    public boolean isPreferJndiDestinationLookup() {
397        return preferJndiDestinationLookup;
398    }
399
400    /**
401     * Sets whether the connector should prefer to first try to find a destination in JNDI before
402     * using JMS semantics to create a Destination.  By default the connector will first use JMS
403     * semantics and then fall-back to JNDI lookup, setting this value to true will reverse that
404     * ordering.
405     *
406     * @param preferJndiDestinationLookup the preferJndiDestinationLookup to set
407     */
408    public void setPreferJndiDestinationLookup(boolean preferJndiDestinationLookup) {
409        this.preferJndiDestinationLookup = preferJndiDestinationLookup;
410    }
411
412    /**
413     * @return returns true if the {@link JmsConnector} is connected to both brokers.
414     */
415    public boolean isConnected() {
416        return localConnection.get() != null && foreignConnection.get() != null;
417    }
418
419    protected void addInboundBridge(DestinationBridge bridge) {
420        if (!inboundBridges.contains(bridge)) {
421            inboundBridges.add(bridge);
422        }
423    }
424
425    protected void addOutboundBridge(DestinationBridge bridge) {
426        if (!outboundBridges.contains(bridge)) {
427            outboundBridges.add(bridge);
428        }
429    }
430
431    protected void removeInboundBridge(DestinationBridge bridge) {
432        inboundBridges.remove(bridge);
433    }
434
435    protected void removeOutboundBridge(DestinationBridge bridge) {
436        outboundBridges.remove(bridge);
437    }
438
439    public String getName() {
440        if (name == null) {
441            name = "Connector:" + getNextId();
442        }
443        return name;
444    }
445
446    public void setName(String name) {
447        this.name = name;
448    }
449
450    private static synchronized int getNextId() {
451        return nextId++;
452    }
453
454    public boolean isFailed() {
455        return this.failed.get();
456    }
457
458    /**
459     * Performs the work of connection to the local side of the Connection.
460     * <p>
461     * This creates the initial connection to the local end of the {@link JmsConnector}
462     * and then sets up all the destination bridges with the information needed to bridge
463     * on the local side of the connection.
464     *
465     * @throws Exception if the connection cannot be established for any reason.
466     */
467    protected abstract void initializeLocalConnection() throws Exception;
468
469    /**
470     * Performs the work of connection to the foreign side of the Connection.
471     * <p>
472     * This creates the initial connection to the foreign end of the {@link JmsConnector}
473     * and then sets up all the destination bridges with the information needed to bridge
474     * on the foreign side of the connection.
475     *
476     * @throws Exception if the connection cannot be established for any reason.
477     */
478    protected abstract void initializeForeignConnection() throws Exception;
479
480    /**
481     * Callback method that the Destination bridges can use to report an exception to occurs
482     * during normal bridging operations.
483     *
484     * @param connection
485     *          The connection that was in use when the failure occured.
486     */
487    void handleConnectionFailure(Connection connection) {
488
489        // Can happen if async exception listener kicks in at the same time.
490        if (connection == null || !this.started.get()) {
491            return;
492        }
493
494        LOG.info("JmsConnector handling loss of connection [{}]", connection.toString());
495
496        // TODO - How do we handle the re-wiring of replyToBridges in this case.
497        replyToBridges.clear();
498
499        if (this.foreignConnection.compareAndSet(connection, null)) {
500
501            // Stop the inbound bridges when the foreign connection is dropped since
502            // the bridge has no consumer and needs to be restarted once a new connection
503            // to the foreign side is made.
504            for (DestinationBridge bridge : inboundBridges) {
505                try {
506                    bridge.stop();
507                } catch(Exception e) {
508                }
509            }
510
511            // We got here first and cleared the connection, now we queue a reconnect.
512            this.connectionSerivce.execute(new Runnable() {
513
514                @Override
515                public void run() {
516                    try {
517                        doInitializeConnection(false);
518                    } catch (Exception e) {
519                        LOG.error("Failed to initialize foreign connection for the JMSConnector", e);
520                    }
521                }
522            });
523
524        } else if (this.localConnection.compareAndSet(connection, null)) {
525
526            // Stop the outbound bridges when the local connection is dropped since
527            // the bridge has no consumer and needs to be restarted once a new connection
528            // to the local side is made.
529            for (DestinationBridge bridge : outboundBridges) {
530                try {
531                    bridge.stop();
532                } catch(Exception e) {
533                }
534            }
535
536            // We got here first and cleared the connection, now we queue a reconnect.
537            this.connectionSerivce.execute(new Runnable() {
538
539                @Override
540                public void run() {
541                    try {
542                        doInitializeConnection(true);
543                    } catch (Exception e) {
544                        LOG.error("Failed to initialize local connection for the JMSConnector", e);
545                    }
546                }
547            });
548        }
549    }
550
551    private void scheduleAsyncLocalConnectionReconnect() {
552        this.connectionSerivce.execute(new Runnable() {
553            @Override
554            public void run() {
555                try {
556                    doInitializeConnection(true);
557                } catch (Exception e) {
558                    LOG.error("Failed to initialize local connection for the JMSConnector", e);
559                }
560            }
561        });
562    }
563
564    private void scheduleAsyncForeignConnectionReconnect() {
565        this.connectionSerivce.execute(new Runnable() {
566            @Override
567            public void run() {
568                try {
569                    doInitializeConnection(false);
570                } catch (Exception e) {
571                    LOG.error("Failed to initialize foreign connection for the JMSConnector", e);
572                }
573            }
574        });
575    }
576
577    private void doInitializeConnection(boolean local) throws Exception {
578
579        int attempt = 0;
580
581        final int maxRetries;
582        if (local) {
583            maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
584                                                       policy.getMaxReconnectAttempts();
585        } else {
586            maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
587                                                         policy.getMaxReconnectAttempts();
588        }
589
590        do
591        {
592            if (attempt > 0) {
593                try {
594                    Thread.sleep(policy.getNextDelay(attempt));
595                } catch(InterruptedException e) {
596                }
597            }
598
599            if (connectionSerivce.isTerminating()) {
600                return;
601            }
602
603            try {
604
605                if (local) {
606                    initializeLocalConnection();
607                    localSideInitialized.set(true);
608                } else {
609                    initializeForeignConnection();
610                    foreignSideInitialized.set(true);
611                }
612
613                // Once we are connected we ensure all the bridges are started.
614                if (localConnection.get() != null && foreignConnection.get() != null) {
615                    for (DestinationBridge bridge : inboundBridges) {
616                        bridge.start();
617                    }
618                    for (DestinationBridge bridge : outboundBridges) {
619                        bridge.start();
620                    }
621                }
622
623                return;
624            } catch(Exception e) {
625                LOG.debug("Failed to establish initial {} connection for JmsConnector [{}]", new Object[]{ (local ? "local" : "foreign"), attempt }, e);
626            }
627        }
628        while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
629
630        this.failed.set(true);
631    }
632
633    private final ThreadFactory factory = new ThreadFactory() {
634        @Override
635        public Thread newThread(Runnable runnable) {
636            Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
637            thread.setDaemon(true);
638            return thread;
639        }
640    };
641
642    private ThreadPoolExecutor createExecutor() {
643        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
644        exec.allowCoreThreadTimeOut(true);
645        return exec;
646    }
647}