001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.failover;
018
019import java.io.BufferedReader;
020import java.io.FileReader;
021import java.io.IOException;
022import java.io.InputStreamReader;
023import java.io.InterruptedIOException;
024import java.net.InetAddress;
025import java.net.MalformedURLException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.URL;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.LinkedHashMap;
034import java.util.LinkedHashSet;
035import java.util.List;
036import java.util.Map;
037import java.util.StringTokenizer;
038import java.util.concurrent.CopyOnWriteArrayList;
039import java.util.concurrent.atomic.AtomicReference;
040
041import org.apache.activemq.broker.SslContext;
042import org.apache.activemq.command.Command;
043import org.apache.activemq.command.ConnectionControl;
044import org.apache.activemq.command.ConsumerControl;
045import org.apache.activemq.command.ConnectionId;
046import org.apache.activemq.command.MessageDispatch;
047import org.apache.activemq.command.MessagePull;
048import org.apache.activemq.command.RemoveInfo;
049import org.apache.activemq.command.Response;
050
051import org.apache.activemq.state.ConnectionStateTracker;
052import org.apache.activemq.state.Tracked;
053import org.apache.activemq.thread.Task;
054import org.apache.activemq.thread.TaskRunner;
055import org.apache.activemq.thread.TaskRunnerFactory;
056import org.apache.activemq.transport.CompositeTransport;
057import org.apache.activemq.transport.DefaultTransportListener;
058import org.apache.activemq.transport.FutureResponse;
059import org.apache.activemq.transport.ResponseCallback;
060import org.apache.activemq.transport.Transport;
061import org.apache.activemq.transport.TransportFactory;
062import org.apache.activemq.transport.TransportListener;
063import org.apache.activemq.util.IOExceptionSupport;
064import org.apache.activemq.util.ServiceSupport;
065import org.apache.activemq.util.URISupport;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069/**
070 * A Transport that is made reliable by being able to fail over to another
071 * transport when a transport failure is detected.
072 */
073public class FailoverTransport implements CompositeTransport {
074
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
    // Default for both initialReconnectDelay and reconnectDelay (units not shown here; presumably milliseconds).
    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
    // Sentinel default for timeout, maxReconnectAttempts and startupMaxReconnectAttempts.
    private static final int INFINITE = -1;
    // Listener that receives commands and interruption/failure notifications from this transport.
    private TransportListener transportListener;
    // Set by stop(); ends the send loop in oneway() and blocks further reconnects in the reconnect task.
    private volatile boolean disposed;
    // Broker URIs added through add(...); merged with 'updated' when the connect list is built.
    private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
    // URIs that take precedence over 'uris' in getConnectList().
    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();

    // Held by start(), stop(), oneway(), reconnect() and the failure handler; oneway() also waits on it.
    private final Object reconnectMutex = new Object();
    // Held by the reconnect task around doReconnect() and by stop() while collecting backups.
    private final Object backupMutex = new Object();
    // Notified by stop(); NOTE(review): the waiter is outside this view (presumably doDelay()).
    private final Object sleepMutex = new Object();
    // Held (and notified) by setTransportListener().
    private final Object listenerMutex = new Object();
    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
    // Command id -> the command, or its Tracked wrapper, that is still awaiting a response.
    private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();

    private URI connectedTransportURI;
    // URI of the transport that failed last; getConnectList() moves it to the end of the list.
    private URI failedConnectTransportURI;
    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
    private final TaskRunnerFactory reconnectTaskFactory;
    private final TaskRunner reconnectTask;
    private volatile boolean started;
    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    private long maxReconnectDelay = 1000 * 30;
    private double backOffMultiplier = 2d;
    // oneway() applies this limit to message sends only, and only when it is greater than zero.
    private long timeout = INFINITE;
    private boolean useExponentialBackOff = true;
    // When true, getConnectList() reorders the candidate URIs randomly.
    private boolean randomize = true;
    private int maxReconnectAttempts = INFINITE;
    private int startupMaxReconnectAttempts = INFINITE;
    private int connectFailures;
    private int warnAfterReconnectAttempts = 10;
    private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    // When non-null, oneway() stops waiting for a transport and reports this failure to the caller.
    private Exception connectionFailure;
    private boolean firstConnection = true;
    // optionally always have a backup created
    private boolean backup = false;
    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
    private int backupPoolSize = 1;
    private boolean trackMessages = false;
    private boolean trackTransactionProducers = true;
    private int maxCacheSize = 128 * 1024;
    // Installed on transports that are being disposed (see disposeTransport() and stop()).
    private final TransportListener disposedListener = new DefaultTransportListener() {};
    private boolean updateURIsSupported = true;
    private boolean reconnectSupported = true;
    // remember for reconnect thread
    private SslContext brokerSslContext;
    private String updateURIsURL = null;
    private boolean rebalanceUpdateURIs = true;
    // Set by reconnect(true); makes the reconnect task reconnect even while connected.
    private boolean doRebalance = false;
    private boolean connectedToPriority = false;

    private boolean priorityBackup = false;
    // URIs parsed by setPriorityURIs().
    private final ArrayList<URI> priorityList = new ArrayList<URI>();
    private boolean priorityBackupAvailable = false;
    private String nestedExtraQueryOptions;
    // Set once a ShutdownInfo has been sent; transport failures seen afterwards are ignored.
    private volatile boolean shuttingDown = false;
131
132    public FailoverTransport() {
133        brokerSslContext = SslContext.getCurrentSslContext();
134        stateTracker.setTrackTransactions(true);
135        // Setup a task that is used to reconnect the a connection async.
136        reconnectTaskFactory = new TaskRunnerFactory();
137        reconnectTaskFactory.init();
138        reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
139            @Override
140            public boolean iterate() {
141                boolean result = false;
142                if (!started) {
143                    return result;
144                }
145                boolean buildBackup = true;
146                synchronized (backupMutex) {
147                    if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
148                        result = doReconnect();
149                        buildBackup = false;
150                    }
151                }
152                if (buildBackup) {
153                    buildBackups();
154                    if (priorityBackup && !connectedToPriority) {
155                        try {
156                            doDelay();
157                            if (reconnectTask == null) {
158                                return true;
159                            }
160                            reconnectTask.wakeup();
161                        } catch (InterruptedException e) {
162                            LOG.debug("Reconnect task has been interrupted.", e);
163                        }
164                    }
165                } else {
166                    // build backups on the next iteration
167                    buildBackup = true;
168                    try {
169                        if (reconnectTask == null) {
170                            return true;
171                        }
172                        reconnectTask.wakeup();
173                    } catch (InterruptedException e) {
174                        LOG.debug("Reconnect task has been interrupted.", e);
175                    }
176                }
177                return result;
178            }
179
180        }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
181    }
182
183    private void processCommand(Object incoming) {
184        Command command = (Command) incoming;
185        if (command == null) {
186            return;
187        }
188        if (command.isResponse()) {
189            Object object = null;
190            synchronized (requestMap) {
191                object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
192            }
193            if (object != null && object.getClass() == Tracked.class) {
194                ((Tracked) object).onResponses(command);
195            }
196        }
197
198        if (command.isConnectionControl()) {
199            handleConnectionControl((ConnectionControl) command);
200        } else if (command.isConsumerControl()) {
201            ConsumerControl consumerControl = (ConsumerControl)command;
202            if (consumerControl.isClose()) {
203                stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
204            }
205        }
206
207        if (transportListener != null) {
208            transportListener.onCommand(command);
209        }
210    }
211
212    private TransportListener createTransportListener(final Transport owner) {
213        return new TransportListener() {
214
215            @Override
216            public void onCommand(Object o) {
217                processCommand(o);
218            }
219
220            @Override
221            public void onException(IOException error) {
222                try {
223                    handleTransportFailure(owner, error);
224                } catch (InterruptedException e) {
225                    Thread.currentThread().interrupt();
226                    if (transportListener != null) {
227                        transportListener.onException(new InterruptedIOException());
228                    }
229                }
230            }
231
232            @Override
233            public void transportInterupted() {
234            }
235
236            @Override
237            public void transportResumed() {
238            }
239        };
240    }
241
242    public final void disposeTransport(Transport transport) {
243        transport.setTransportListener(disposedListener);
244        ServiceSupport.dispose(transport);
245    }
246
247    public final void handleTransportFailure(IOException e) throws InterruptedException {
248        handleTransportFailure(getConnectedTransport(), e);
249    }
250
251    public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException {
252        if (shuttingDown) {
253            // shutdown info sent and remote socket closed and we see that before a local close
254            // let the close do the work
255            return;
256        }
257
258        if (LOG.isTraceEnabled()) {
259            LOG.trace(this + " handleTransportFailure: " + e, e);
260        }
261
262        // could be blocked in write with the reconnectMutex held, but still needs to be whacked
263        Transport transport = null;
264
265        if (connectedTransport.compareAndSet(failed, null)) {
266            transport = failed;
267            if (transport != null) {
268                disposeTransport(transport);
269            }
270        }
271
272        synchronized (reconnectMutex) {
273            if (transport != null && connectedTransport.get() == null) {
274                boolean reconnectOk = false;
275
276                if (canReconnect()) {
277                    reconnectOk = true;
278                }
279
280                LOG.warn("Transport ({}) failed{} attempting to automatically reconnect",
281                         connectedTransportURI, (reconnectOk ? "," : ", not"), e);
282
283                failedConnectTransportURI = connectedTransportURI;
284                connectedTransportURI = null;
285                connectedToPriority = false;
286
287                if (reconnectOk) {
288                    // notify before any reconnect attempt so ack state can be whacked
289                    if (transportListener != null) {
290                        transportListener.transportInterupted();
291                    }
292
293                    reconnectTask.wakeup();
294                } else if (!isDisposed()) {
295                    propagateFailureToExceptionListener(e);
296                }
297            }
298        }
299    }
300
301    private boolean canReconnect() {
302        return started && 0 != calculateReconnectAttemptLimit();
303    }
304
305    public final void handleConnectionControl(ConnectionControl control) {
306        String reconnectStr = control.getReconnectTo();
307        if (LOG.isTraceEnabled()) {
308            LOG.trace("Received ConnectionControl: {}", control);
309        }
310
311        if (reconnectStr != null) {
312            reconnectStr = reconnectStr.trim();
313            if (reconnectStr.length() > 0) {
314                try {
315                    URI uri = new URI(reconnectStr);
316                    if (isReconnectSupported()) {
317                        reconnect(uri);
318                        LOG.info("Reconnected to: " + uri);
319                    }
320                } catch (Exception e) {
321                    LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
322                }
323            }
324        }
325        processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
326    }
327
328    private final void processNewTransports(boolean rebalance, String newTransports) {
329        if (newTransports != null) {
330            newTransports = newTransports.trim();
331            if (newTransports.length() > 0 && isUpdateURIsSupported()) {
332                List<URI> list = new ArrayList<URI>();
333                StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
334                while (tokenizer.hasMoreTokens()) {
335                    String str = tokenizer.nextToken();
336                    try {
337                        URI uri = new URI(str);
338                        list.add(uri);
339                    } catch (Exception e) {
340                        LOG.error("Failed to parse broker address: " + str, e);
341                    }
342                }
343                if (list.isEmpty() == false) {
344                    try {
345                        updateURIs(rebalance, list.toArray(new URI[list.size()]));
346                    } catch (IOException e) {
347                        LOG.error("Failed to update transport URI's from: " + newTransports, e);
348                    }
349                }
350            }
351        }
352    }
353
354    @Override
355    public void start() throws Exception {
356        synchronized (reconnectMutex) {
357            LOG.debug("Started {}", this);
358            if (started) {
359                return;
360            }
361            started = true;
362            stateTracker.setMaxCacheSize(getMaxCacheSize());
363            stateTracker.setTrackMessages(isTrackMessages());
364            stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
365            if (connectedTransport.get() != null) {
366                stateTracker.restore(connectedTransport.get());
367            } else {
368                reconnect(false);
369            }
370        }
371    }
372
373    @Override
374    public void stop() throws Exception {
375        Transport transportToStop = null;
376        List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
377
378        try {
379            synchronized (reconnectMutex) {
380                if (LOG.isDebugEnabled()) {
381                    LOG.debug("Stopped {}", this);
382                }
383                if (!started) {
384                    return;
385                }
386                started = false;
387                disposed = true;
388
389                if (connectedTransport.get() != null) {
390                    transportToStop = connectedTransport.getAndSet(null);
391                }
392                reconnectMutex.notifyAll();
393            }
394            synchronized (sleepMutex) {
395                sleepMutex.notifyAll();
396            }
397        } finally {
398            reconnectTask.shutdown();
399            reconnectTaskFactory.shutdownNow();
400        }
401
402        synchronized(backupMutex) {
403            for (BackupTransport backup : backups) {
404                backup.setDisposed(true);
405                Transport transport = backup.getTransport();
406                if (transport != null) {
407                    transport.setTransportListener(disposedListener);
408                    backupsToStop.add(transport);
409                }
410            }
411            backups.clear();
412        }
413        for (Transport transport : backupsToStop) {
414            try {
415                LOG.trace("Stopped backup: {}", transport);
416                disposeTransport(transport);
417            } catch (Exception e) {
418            }
419        }
420        if (transportToStop != null) {
421            transportToStop.stop();
422        }
423    }
424
425    public long getInitialReconnectDelay() {
426        return initialReconnectDelay;
427    }
428
429    public void setInitialReconnectDelay(long initialReconnectDelay) {
430        this.initialReconnectDelay = initialReconnectDelay;
431    }
432
433    public long getMaxReconnectDelay() {
434        return maxReconnectDelay;
435    }
436
437    public void setMaxReconnectDelay(long maxReconnectDelay) {
438        this.maxReconnectDelay = maxReconnectDelay;
439    }
440
441    public long getReconnectDelay() {
442        return reconnectDelay;
443    }
444
445    public void setReconnectDelay(long reconnectDelay) {
446        this.reconnectDelay = reconnectDelay;
447    }
448
449    public double getReconnectDelayExponent() {
450        return backOffMultiplier;
451    }
452
453    public void setReconnectDelayExponent(double reconnectDelayExponent) {
454        this.backOffMultiplier = reconnectDelayExponent;
455    }
456
457    public Transport getConnectedTransport() {
458        return connectedTransport.get();
459    }
460
461    public URI getConnectedTransportURI() {
462        return connectedTransportURI;
463    }
464
465    public int getMaxReconnectAttempts() {
466        return maxReconnectAttempts;
467    }
468
469    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
470        this.maxReconnectAttempts = maxReconnectAttempts;
471    }
472
473    public int getStartupMaxReconnectAttempts() {
474        return this.startupMaxReconnectAttempts;
475    }
476
477    public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
478        this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
479    }
480
481    public long getTimeout() {
482        return timeout;
483    }
484
485    public void setTimeout(long timeout) {
486        this.timeout = timeout;
487    }
488
489    /**
490     * @return Returns the randomize.
491     */
492    public boolean isRandomize() {
493        return randomize;
494    }
495
496    /**
497     * @param randomize The randomize to set.
498     */
499    public void setRandomize(boolean randomize) {
500        this.randomize = randomize;
501    }
502
503    public boolean isBackup() {
504        return backup;
505    }
506
507    public void setBackup(boolean backup) {
508        this.backup = backup;
509    }
510
511    public int getBackupPoolSize() {
512        return backupPoolSize;
513    }
514
515    public void setBackupPoolSize(int backupPoolSize) {
516        this.backupPoolSize = backupPoolSize;
517    }
518
519    public int getCurrentBackups() {
520        return this.backups.size();
521    }
522
523    public boolean isTrackMessages() {
524        return trackMessages;
525    }
526
527    public void setTrackMessages(boolean trackMessages) {
528        this.trackMessages = trackMessages;
529    }
530
531    public boolean isTrackTransactionProducers() {
532        return this.trackTransactionProducers;
533    }
534
535    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
536        this.trackTransactionProducers = trackTransactionProducers;
537    }
538
539    public int getMaxCacheSize() {
540        return maxCacheSize;
541    }
542
543    public void setMaxCacheSize(int maxCacheSize) {
544        this.maxCacheSize = maxCacheSize;
545    }
546
547    public boolean isPriorityBackup() {
548        return priorityBackup;
549    }
550
551    public void setPriorityBackup(boolean priorityBackup) {
552        this.priorityBackup = priorityBackup;
553    }
554
555    public void setPriorityURIs(String priorityURIs) {
556        StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
557        while (tokenizer.hasMoreTokens()) {
558            String str = tokenizer.nextToken();
559            try {
560                URI uri = new URI(str);
561                priorityList.add(uri);
562            } catch (Exception e) {
563                LOG.error("Failed to parse broker address: " + str, e);
564            }
565        }
566    }
567
568    @Override
569    public void oneway(Object o) throws IOException {
570
571        Command command = (Command) o;
572        Exception error = null;
573        try {
574
575            synchronized (reconnectMutex) {
576
577                if (command != null && connectedTransport.get() == null) {
578                    if (command.isShutdownInfo()) {
579                        // Skipping send of ShutdownInfo command when not connected.
580                        return;
581                    } else if (command instanceof RemoveInfo || command.isMessageAck()) {
582                        // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
583                        stateTracker.track(command);
584                        if (command.isResponseRequired()) {
585                            Response response = new Response();
586                            response.setCorrelationId(command.getCommandId());
587                            processCommand(response);
588                        }
589                        return;
590                    } else if (command instanceof MessagePull) {
591                        // Simulate response to MessagePull if timed as we can't honor that now.
592                        MessagePull pullRequest = (MessagePull) command;
593                        if (pullRequest.getTimeout() != 0) {
594                            MessageDispatch dispatch = new MessageDispatch();
595                            dispatch.setConsumerId(pullRequest.getConsumerId());
596                            dispatch.setDestination(pullRequest.getDestination());
597                            processCommand(dispatch);
598                        }
599                        return;
600                    }
601                }
602
603                // Keep trying until the message is sent.
604                for (int i = 0; !disposed; i++) {
605                    try {
606
607                        // Wait for transport to be connected.
608                        Transport transport = connectedTransport.get();
609                        long start = System.currentTimeMillis();
610                        boolean timedout = false;
611                        while (transport == null && !disposed && connectionFailure == null
612                                && !Thread.currentThread().isInterrupted() && willReconnect()) {
613
614                            LOG.trace("Waiting for transport to reconnect..: {}", command);
615                            long end = System.currentTimeMillis();
616                            if (command.isMessage() && timeout > 0 && (end - start > timeout)) {
617                                timedout = true;
618                                LOG.info("Failover timed out after {} ms", (end - start));
619                                break;
620                            }
621                            try {
622                                reconnectMutex.wait(100);
623                            } catch (InterruptedException e) {
624                                Thread.currentThread().interrupt();
625                                LOG.debug("Interupted:", e);
626                            }
627                            transport = connectedTransport.get();
628                        }
629
630                        if (transport == null) {
631                            // Previous loop may have exited due to use being
632                            // disposed.
633                            if (disposed) {
634                                error = new IOException("Transport disposed.");
635                            } else if (connectionFailure != null) {
636                                error = connectionFailure;
637                            } else if (timedout == true) {
638                                error = new IOException("Failover timeout of " + timeout + " ms reached.");
639                            } else if (!willReconnect()) {
640                                error = new IOException("Reconnect attempts of " + maxReconnectAttempts + " exceeded");
641                            } else {
642                                error = new IOException("Unexpected failure.");
643                            }
644                            break;
645                        }
646
647                        Tracked tracked = null;
648                        try {
649                            tracked = stateTracker.track(command);
650                        } catch (IOException ioe) {
651                            LOG.debug("Cannot track the command {} {}", command, ioe);
652                        }
653                        // If it was a request and it was not being tracked by
654                        // the state tracker,
655                        // then hold it in the requestMap so that we can replay
656                        // it later.
657                        synchronized (requestMap) {
658                            if (tracked != null && tracked.isWaitingForResponse()) {
659                                requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
660                            } else if (tracked == null && command.isResponseRequired()) {
661                                requestMap.put(Integer.valueOf(command.getCommandId()), command);
662                            }
663                        }
664
665                        // Send the message.
666                        try {
667                            transport.oneway(command);
668                            stateTracker.trackBack(command);
669                            if (command.isShutdownInfo()) {
670                                shuttingDown = true;
671                            }
672                        } catch (IOException e) {
673
674                            // If the command was not tracked.. we will retry in
675                            // this method
676                            if (tracked == null && canReconnect()) {
677
678                                // since we will retry in this method.. take it
679                                // out of the request
680                                // map so that it is not sent 2 times on
681                                // recovery
682                                if (command.isResponseRequired()) {
683                                    requestMap.remove(Integer.valueOf(command.getCommandId()));
684                                }
685
686                                // Rethrow the exception so it will handled by
687                                // the outer catch
688                                throw e;
689                            } else {
690                                // Handle the error but allow the method to return since the
691                                // tracked commands are replayed on reconnect.
692                                LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
693                                handleTransportFailure(e);
694                            }
695                        }
696
697                        return;
698                    } catch (IOException e) {
699                        LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
700                        handleTransportFailure(e);
701                    }
702                }
703            }
704        } catch (InterruptedException e) {
705            // Some one may be trying to stop our thread.
706            Thread.currentThread().interrupt();
707            throw new InterruptedIOException();
708        }
709
710        if (!disposed) {
711            if (error != null) {
712                if (error instanceof IOException) {
713                    throw (IOException) error;
714                }
715                throw IOExceptionSupport.create(error);
716            }
717        }
718    }
719
720    private boolean willReconnect() {
721        return firstConnection || 0 != calculateReconnectAttemptLimit();
722    }
723
724    @Override
725    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
726        throw new AssertionError("Unsupported Method");
727    }
728
729    @Override
730    public Object request(Object command) throws IOException {
731        throw new AssertionError("Unsupported Method");
732    }
733
734    @Override
735    public Object request(Object command, int timeout) throws IOException {
736        throw new AssertionError("Unsupported Method");
737    }
738
739    @Override
740    public void add(boolean rebalance, URI u[]) {
741        boolean newURI = false;
742        for (URI uri : u) {
743            if (!contains(uri)) {
744                uris.add(uri);
745                newURI = true;
746            }
747        }
748        if (newURI) {
749            reconnect(rebalance);
750        }
751    }
752
753    @Override
754    public void remove(boolean rebalance, URI u[]) {
755        for (URI uri : u) {
756            uris.remove(uri);
757        }
758        // rebalance is automatic if any connected to removed/stopped broker
759    }
760
761    public void add(boolean rebalance, String u) {
762        try {
763            URI newURI = new URI(u);
764            if (contains(newURI) == false) {
765                uris.add(newURI);
766                reconnect(rebalance);
767            }
768
769        } catch (Exception e) {
770            LOG.error("Failed to parse URI: {}", u);
771        }
772    }
773
774    public void reconnect(boolean rebalance) {
775        synchronized (reconnectMutex) {
776            if (started) {
777                if (rebalance) {
778                    doRebalance = true;
779                }
780                LOG.debug("Waking up reconnect task");
781                try {
782                    reconnectTask.wakeup();
783                } catch (InterruptedException e) {
784                    Thread.currentThread().interrupt();
785                }
786            } else {
787                LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
788            }
789        }
790    }
791
792    private List<URI> getConnectList() {
793        // updated have precedence
794        LinkedHashSet<URI> uniqueUris = new LinkedHashSet<URI>(updated);
795        uniqueUris.addAll(uris);
796
797        boolean removed = false;
798        if (failedConnectTransportURI != null) {
799            removed = uniqueUris.remove(failedConnectTransportURI);
800        }
801
802        ArrayList<URI> l = new ArrayList<URI>(uniqueUris);
803        if (randomize) {
804            // Randomly, reorder the list by random swapping
805            for (int i = 0; i < l.size(); i++) {
806                // meed parenthesis due other JDKs (see AMQ-4826)
807                int p = ((int) (Math.random() * 100)) % l.size();
808                URI t = l.get(p);
809                l.set(p, l.get(i));
810                l.set(i, t);
811            }
812        }
813        if (removed) {
814            l.add(failedConnectTransportURI);
815        }
816
817        LOG.debug("urlList connectionList:{}, from: {}", l, uniqueUris);
818
819        return l;
820    }
821
    /**
     * @return the listener currently assigned to this transport, or {@code null}
     *         if none has been set
     */
    @Override
    public TransportListener getTransportListener() {
        return transportListener;
    }
826
    /**
     * Assigns the transport listener and wakes any thread waiting on
     * {@code listenerMutex} (doReconnect waits on it when no listener is set yet).
     *
     * @param commandListener the listener to install
     */
    @Override
    public void setTransportListener(TransportListener commandListener) {
        synchronized (listenerMutex) {
            this.transportListener = commandListener;
            listenerMutex.notifyAll();
        }
    }
834
835    @Override
836    public <T> T narrow(Class<T> target) {
837
838        if (target.isAssignableFrom(getClass())) {
839            return target.cast(this);
840        }
841        Transport transport = connectedTransport.get();
842        if (transport != null) {
843            return transport.narrow(target);
844        }
845        return null;
846
847    }
848
849    protected void restoreTransport(Transport t) throws Exception, IOException {
850        t.start();
851        // send information to the broker - informing it we are an ft client
852        ConnectionControl cc = new ConnectionControl();
853        cc.setFaultTolerant(true);
854        t.oneway(cc);
855        stateTracker.restore(t);
856        Map<Integer, Command> tmpMap = null;
857        synchronized (requestMap) {
858            tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
859        }
860        for (Command command : tmpMap.values()) {
861            LOG.trace("restore requestMap, replay: {}", command);
862            t.oneway(command);
863        }
864    }
865
    /**
     * @return whether the reconnect delay is multiplied by the back-off
     *         multiplier after each delay (see doDelay)
     */
    public boolean isUseExponentialBackOff() {
        return useExponentialBackOff;
    }
869
    /**
     * @param useExponentialBackOff {@code true} to grow the reconnect delay after
     *        each delay, {@code false} to keep it unchanged
     */
    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
        this.useExponentialBackOff = useExponentialBackOff;
    }
873
874    @Override
875    public String toString() {
876        return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
877    }
878
879    @Override
880    public String getRemoteAddress() {
881        Transport transport = connectedTransport.get();
882        if (transport != null) {
883            return transport.getRemoteAddress();
884        }
885        return null;
886    }
887
    /**
     * @return always {@code true}
     */
    @Override
    public boolean isFaultTolerant() {
        return true;
    }
892
893    private void doUpdateURIsFromDisk() {
894        // If updateURIsURL is specified, read the file and add any new
895        // transport URI's to this FailOverTransport.
896        // Note: Could track file timestamp to avoid unnecessary reading.
897        String fileURL = getUpdateURIsURL();
898        if (fileURL != null) {
899            BufferedReader in = null;
900            String newUris = null;
901            StringBuffer buffer = new StringBuffer();
902
903            try {
904                in = new BufferedReader(getURLStream(fileURL));
905                while (true) {
906                    String line = in.readLine();
907                    if (line == null) {
908                        break;
909                    }
910                    buffer.append(line);
911                }
912                newUris = buffer.toString();
913            } catch (IOException ioe) {
914                LOG.error("Failed to read updateURIsURL: {} {}",fileURL, ioe);
915            } finally {
916                if (in != null) {
917                    try {
918                        in.close();
919                    } catch (IOException ioe) {
920                        // ignore
921                    }
922                }
923            }
924
925            processNewTransports(isRebalanceUpdateURIs(), newUris);
926        }
927    }
928
    /**
     * Performs one pass of the connect / reconnect logic.
     * <p>
     * While holding {@code reconnectMutex} this method refreshes the URI list from
     * the configured update location, decides whether a (re)connect or rebalance is
     * needed, and then tries a waiting backup transport first (if any), followed by
     * each URI of the connect list until one connects. On success the connected
     * transport and URI are recorded, waiters on {@code reconnectMutex} are notified
     * and the transport listener is told the transport resumed. When the pass fails
     * the failure counter is incremented; once the configured attempt limit is
     * reached the failure is stored and propagated to the listener.
     * <p>
     * The trailing {@code doDelay()} runs after {@code reconnectMutex} is released;
     * the optional initial delay before the first attempt runs while it is held.
     *
     * @return {@code false} when nothing more is to be done in this pass (already
     *         connected, disposed, a connection failure is recorded, a connection
     *         was established, or the attempt limit was reached); otherwise
     *         {@code !disposed} after waiting the reconnect delay. NOTE(review):
     *         presumably the caller uses this to decide whether to run another
     *         pass - confirm against the reconnect task.
     */
    final boolean doReconnect() {
        Exception failure = null;
        synchronized (reconnectMutex) {
            List<URI> connectList = null;
            // First ensure we are up to date.
            doUpdateURIsFromDisk();

            // No further attempt will be made in these states, so release any
            // thread waiting on reconnectMutex.
            if (disposed || connectionFailure != null) {
                reconnectMutex.notifyAll();
            }
            if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
                return false;
            } else {
                connectList = getConnectList();
                if (connectList.isEmpty()) {
                    failure = new IOException("No uris available to connect to.");
                } else {
                    if (doRebalance) {
                        if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) {
                            // already connected to first in the list, no need to rebalance
                            doRebalance = false;
                            return false;
                        } else {
                            LOG.debug("Doing rebalance from: {} to {}", connectedTransportURI, connectList);

                            // Drop the current transport so the connect loop below runs.
                            try {
                                Transport transport = this.connectedTransport.getAndSet(null);
                                if (transport != null) {
                                    disposeTransport(transport);
                                }
                            } catch (Exception e) {
                                LOG.debug("Caught an exception stopping existing transport for rebalance", e);
                            }
                        }
                        doRebalance = false;
                    }

                    resetReconnectDelay();

                    Transport transport = null;
                    URI uri = null;

                    // If we have a backup already waiting lets try it.
                    synchronized (backupMutex) {
                        if ((priorityBackup || backup) && !backups.isEmpty()) {
                            // Work on a copy for the optional shuffle; the chosen backup is
                            // then removed from the real pool.
                            ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
                            if (randomize) {
                                Collections.shuffle(l);
                            }
                            BackupTransport bt = l.remove(0);
                            backups.remove(bt);
                            transport = bt.getTransport();
                            uri = bt.getUri();
                            processCommand(bt.getBrokerInfo());
                            if (priorityBackup && priorityBackupAvailable) {
                                // Switching to a priority backup: dispose the transport
                                // currently in use, if any.
                                Transport old = this.connectedTransport.getAndSet(null);
                                if (old != null) {
                                    disposeTransport(old);
                                }
                                priorityBackupAvailable = false;
                            }
                        }
                    }

                    // When there was no backup and we are reconnecting for the first time
                    // we honor the initialReconnectDelay before trying a new connection, after
                    // this normal reconnect delay happens following a failed attempt.
                    if (transport == null && !firstConnection && connectFailures == 0 && initialReconnectDelay > 0 && !disposed) {
                        // reconnectDelay will be equal to initialReconnectDelay since we are on
                        // the first connect attempt after we had a working connection, doDelay
                        // will apply updates to move to the next reconnectDelay value based on
                        // configuration.
                        doDelay();
                    }

                    // Try the backup transport first (when one was taken above), then each
                    // URI of the connect list, until connected or disposed.
                    Iterator<URI> iter = connectList.iterator();
                    while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {

                        try {
                            SslContext.setCurrentSslContext(brokerSslContext);

                            // We could be starting with a backup and if so we wait to grab a
                            // URI from the pool until next time around.
                            if (transport == null) {
                                uri = addExtraQueryOptions(iter.next());
                                transport = TransportFactory.compositeConnect(uri);
                            }

                            LOG.debug("Attempting {}th connect to: {}", connectFailures, uri);

                            transport.setTransportListener(createTransportListener(transport));
                            transport.start();

                            if (started && !firstConnection) {
                                restoreTransport(transport);
                            }

                            LOG.debug("Connection established");

                            reconnectDelay = initialReconnectDelay;
                            connectedTransportURI = uri;
                            connectedTransport.set(transport);
                            connectedToPriority = isPriority(connectedTransportURI);
                            reconnectMutex.notifyAll();
                            connectFailures = 0;

                            // Make sure on initial startup, that the transportListener
                            // has been initialized for this instance.
                            synchronized (listenerMutex) {
                                if (transportListener == null) {
                                    try {
                                        // if it isn't set after 2secs - it probably never will be
                                        listenerMutex.wait(2000);
                                    } catch (InterruptedException ex) {
                                        // NOTE(review): interruption is swallowed here and the
                                        // interrupt status is not restored.
                                    }
                                }
                            }

                            if (transportListener != null) {
                                transportListener.transportResumed();
                            } else {
                                LOG.debug("transport resumed by transport listener not set");
                            }

                            if (firstConnection) {
                                firstConnection = false;
                                LOG.info("Successfully connected to {}", uri);
                            } else {
                                LOG.info("Successfully reconnected to {}", uri);
                            }

                            return false;
                        } catch (Exception e) {
                            failure = e;
                            LOG.debug("Connect fail to: {}, reason: {}", uri, e);
                            if (transport != null) {
                                // NOTE(review): transport is only cleared when stop() succeeds;
                                // if stop() throws, the same transport is retried on the next
                                // loop iteration instead of advancing to the next URI.
                                try {
                                    transport.stop();
                                    transport = null;
                                } catch (Exception ee) {
                                    LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee);
                                }
                            }
                        } finally {
                            SslContext.setCurrentSslContext(null);
                        }
                    }
                }
            }

            // Reached only when this pass did not establish a connection.
            int reconnectLimit = calculateReconnectAttemptLimit();

            connectFailures++;
            if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
                LOG.error("Failed to connect to {} after: {} attempt(s)", connectList, connectFailures);
                connectionFailure = failure;

                // Make sure on initial startup, that the transportListener has been
                // initialized for this instance.
                synchronized (listenerMutex) {
                    if (transportListener == null) {
                        try {
                            listenerMutex.wait(2000);
                        } catch (InterruptedException ex) {
                            // NOTE(review): interruption is swallowed here and the
                            // interrupt status is not restored.
                        }
                    }
                }

                propagateFailureToExceptionListener(connectionFailure);
                return false;
            }

            // Warn on the first failure and then on every warnInterval-th failure.
            int warnInterval = getWarnAfterReconnectAttempts();
            if (warnInterval > 0 && (connectFailures == 1 || (connectFailures % warnInterval) == 0)) {
                LOG.warn("Failed to connect to {} after: {} attempt(s) with {}, continuing to retry.",
                         connectList, connectFailures, (failure == null ? "?" : failure.getLocalizedMessage()));
            }
        }

        // Back off outside of reconnectMutex before the next pass.
        if (!disposed) {
            doDelay();
        }

        return !disposed;
    }
1114
1115    private void doDelay() {
1116        if (reconnectDelay > 0) {
1117            synchronized (sleepMutex) {
1118                LOG.debug("Waiting {} ms before attempting connection", reconnectDelay);
1119                try {
1120                    sleepMutex.wait(reconnectDelay);
1121                } catch (InterruptedException e) {
1122                    Thread.currentThread().interrupt();
1123                }
1124            }
1125        }
1126
1127        if (useExponentialBackOff) {
1128            // Exponential increment of reconnect delay.
1129            reconnectDelay *= backOffMultiplier;
1130            if (reconnectDelay > maxReconnectDelay) {
1131                reconnectDelay = maxReconnectDelay;
1132            }
1133        }
1134    }
1135
    /**
     * Resets {@code reconnectDelay} to {@code initialReconnectDelay} when
     * exponential back-off is disabled, or when the current delay equals
     * {@code DEFAULT_INITIAL_RECONNECT_DELAY}; otherwise leaves it untouched.
     */
    private void resetReconnectDelay() {
        // NOTE(review): with back-off enabled the delay is only reset when it equals
        // the default constant, not the configured initial value - presumably so a
        // grown delay carries over between passes; confirm.
        if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
            reconnectDelay = initialReconnectDelay;
        }
    }
1141
1142    /*
1143      * called with reconnectMutex held
1144     */
1145    private void propagateFailureToExceptionListener(Exception exception) {
1146        if (transportListener != null) {
1147            if (exception instanceof IOException) {
1148                transportListener.onException((IOException)exception);
1149            } else {
1150                transportListener.onException(IOExceptionSupport.create(exception));
1151            }
1152        }
1153        reconnectMutex.notifyAll();
1154    }
1155
1156    private int calculateReconnectAttemptLimit() {
1157        int maxReconnectValue = this.maxReconnectAttempts;
1158        if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1159            maxReconnectValue = this.startupMaxReconnectAttempts;
1160        }
1161        return maxReconnectValue;
1162    }
1163
1164    private boolean shouldBuildBackups() {
1165       return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority));
1166    }
1167
    /**
     * Tops up the pool of pre-connected backup transports.
     * <p>
     * While holding {@code backupMutex}, and only when not disposed and
     * {@code shouldBuildBackups()} says more backups are wanted, this method drops
     * backups already marked disposed and then connects to candidate URIs
     * (priority URIs first, then the connect list) that differ from the currently
     * connected URI, until enough backups exist. Failures to build a single backup
     * are logged at debug level and do not stop the loop.
     *
     * @return always {@code false}
     */
    final boolean buildBackups() {
        synchronized (backupMutex) {
            if (!disposed && shouldBuildBackups()) {
                // Candidates: priority URIs first, followed by the remaining connect list.
                ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
                List<URI> connectList = getConnectList();
                for (URI uri: connectList) {
                    if (!backupList.contains(uri)) {
                        backupList.add(uri);
                    }
                }
                // removed disposed backups
                List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
                for (BackupTransport bt : backups) {
                    if (bt.isDisposed()) {
                        disposedList.add(bt);
                    }
                }
                backups.removeAll(disposedList);
                disposedList.clear();
                for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) {
                    URI uri = addExtraQueryOptions(iter.next());
                    // Backups are only built while a connected URI is recorded, and never
                    // for the URI that is currently connected.
                    if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
                        try {
                            SslContext.setCurrentSslContext(brokerSslContext);
                            BackupTransport bt = new BackupTransport(this);
                            bt.setUri(uri);
                            // presumably BackupTransport equality is based on its URI, so this
                            // skips URIs that already have a backup - verify in BackupTransport
                            if (!backups.contains(bt)) {
                                Transport t = TransportFactory.compositeConnect(uri);
                                t.setTransportListener(bt);
                                t.start();
                                bt.setTransport(t);
                                if (priorityBackup && isPriority(uri)) {
                                   priorityBackupAvailable = true;
                                   backups.add(0, bt);
                                   // if this priority backup overflows the pool
                                   // remove the backup with the lowest priority
                                   if (backups.size() > backupPoolSize) {
                                       BackupTransport disposeTransport = backups.remove(backups.size() - 1);
                                       disposeTransport.setDisposed(true);
                                       Transport transport = disposeTransport.getTransport();
                                       if (transport != null) {
                                           transport.setTransportListener(disposedListener);
                                           disposeTransport(transport);
                                       }
                                   }
                                } else {
                                    backups.add(bt);
                                }
                            }
                        } catch (Exception e) {
                            LOG.debug("Failed to build backup ", e);
                        } finally {
                            SslContext.setCurrentSslContext(null);
                        }
                    }
                }
            }
        }
        return false;
    }
1228
1229    protected boolean isPriority(URI uri) {
1230        if (!priorityBackup) {
1231            return false;
1232        }
1233
1234        if (!priorityList.isEmpty()) {
1235            return priorityList.contains(uri);
1236        }
1237        return uris.indexOf(uri) == 0;
1238    }
1239
    /**
     * @return the current value of the {@code disposed} flag
     */
    @Override
    public boolean isDisposed() {
        return disposed;
    }
1244
    /**
     * @return {@code true} when a connected transport is currently recorded
     */
    @Override
    public boolean isConnected() {
        return connectedTransport.get() != null;
    }
1249
    /**
     * Adds the given URI (if not already known) with the rebalance flag set, by
     * delegating to {@code add(true, ...)}.
     *
     * @param uri the URI to add
     * @throws IOException declared by the signature; this body does not throw it directly
     */
    @Override
    public void reconnect(URI uri) throws IOException {
        add(true, new URI[]{uri});
    }
1254
    /**
     * @return the configured {@code reconnectSupported} flag
     */
    @Override
    public boolean isReconnectSupported() {
        return this.reconnectSupported;
    }
1259
    /**
     * @param value the new value of the {@code reconnectSupported} flag
     */
    public void setReconnectSupported(boolean value) {
        this.reconnectSupported = value;
    }
1263
    /**
     * @return the configured {@code updateURIsSupported} flag; when {@code false},
     *         {@code updateURIs} does nothing
     */
    @Override
    public boolean isUpdateURIsSupported() {
        return this.updateURIsSupported;
    }
1268
    /**
     * @param value the new value of the {@code updateURIsSupported} flag
     */
    public void setUpdateURIsSupported(boolean value) {
        this.updateURIsSupported = value;
    }
1272
1273    @Override
1274    public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1275        if (isUpdateURIsSupported()) {
1276            HashSet<URI> copy = new HashSet<URI>();
1277            synchronized (reconnectMutex) {
1278                copy.addAll(this.updated);
1279                updated.clear();
1280                if (updatedURIs != null && updatedURIs.length > 0) {
1281                    for (URI uri : updatedURIs) {
1282                        if (uri != null && !updated.contains(uri)) {
1283                            updated.add(uri);
1284                            if (failedConnectTransportURI != null && failedConnectTransportURI.equals(uri)) {
1285                                failedConnectTransportURI = null;
1286                            }
1287                        }
1288                    }
1289                }
1290            }
1291            if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1292                buildBackups();
1293                reconnect(rebalance);
1294            }
1295        }
1296    }
1297
1298    /**
1299     * @return the updateURIsURL
1300     */
1301    public String getUpdateURIsURL() {
1302        return this.updateURIsURL;
1303    }
1304
1305    /**
1306     * @param updateURIsURL the updateURIsURL to set
1307     */
1308    public void setUpdateURIsURL(String updateURIsURL) {
1309        this.updateURIsURL = updateURIsURL;
1310    }
1311
1312    /**
1313     * @return the rebalanceUpdateURIs
1314     */
1315    public boolean isRebalanceUpdateURIs() {
1316        return this.rebalanceUpdateURIs;
1317    }
1318
1319    /**
1320     * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1321     */
1322    public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1323        this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1324    }
1325
1326    @Override
1327    public int getReceiveCounter() {
1328        Transport transport = connectedTransport.get();
1329        if (transport == null) {
1330            return 0;
1331        }
1332        return transport.getReceiveCounter();
1333    }
1334
    /**
     * @return the number of failed connect passes counted since the last
     *         successful connection (the counter is reset to 0 on success)
     */
    public int getConnectFailures() {
        return connectFailures;
    }
1338
    /**
     * Forwards the notification to the state tracker while holding
     * {@code reconnectMutex}.
     *
     * @param connectionId the connection whose interrupt processing has completed
     */
    public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
        synchronized (reconnectMutex) {
            stateTracker.connectionInterruptProcessingComplete(this, connectionId);
        }
    }
1344
    /**
     * @return the connection state tracker used by this transport
     */
    public ConnectionStateTracker getStateTracker() {
        return stateTracker;
    }
1348
1349    private boolean contains(URI newURI) {
1350        boolean result = false;
1351        for (URI uri : uris) {
1352            if (compareURIs(newURI, uri)) {
1353                result = true;
1354                break;
1355            }
1356        }
1357
1358        return result;
1359    }
1360
1361    private boolean compareURIs(final URI first, final URI second) {
1362
1363        boolean result = false;
1364        if (first == null || second == null) {
1365            return result;
1366        }
1367
1368        if (first.getPort() == second.getPort()) {
1369            InetAddress firstAddr = null;
1370            InetAddress secondAddr = null;
1371            try {
1372                firstAddr = InetAddress.getByName(first.getHost());
1373                secondAddr = InetAddress.getByName(second.getHost());
1374
1375                if (firstAddr.equals(secondAddr)) {
1376                    result = true;
1377                }
1378
1379            } catch(IOException e) {
1380
1381                if (firstAddr == null) {
1382                    LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", first, e);
1383                } else {
1384                    LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", second, e);
1385                }
1386
1387                if (first.getHost().equalsIgnoreCase(second.getHost())) {
1388                    result = true;
1389                }
1390            }
1391        }
1392
1393        return result;
1394    }
1395
1396    private InputStreamReader getURLStream(String path) throws IOException {
1397        InputStreamReader result = null;
1398        URL url = null;
1399        try {
1400            url = new URL(path);
1401            result = new InputStreamReader(url.openStream());
1402        } catch (MalformedURLException e) {
1403            // ignore - it could be a path to a a local file
1404        }
1405        if (result == null) {
1406            result = new FileReader(path);
1407        }
1408        return result;
1409    }
1410
1411    private URI addExtraQueryOptions(URI uri) {
1412        try {
1413            if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) {
1414                if( uri.getQuery() == null ) {
1415                    uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions);
1416                } else {
1417                    uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions);
1418                }
1419            }
1420        } catch (URISyntaxException e) {
1421            throw new RuntimeException(e);
1422        }
1423        return uri;
1424    }
1425
    /**
     * @param nestedExtraQueryOptions query options that are appended to each URI
     *        before connecting to it; {@code null} or empty to append nothing
     */
    public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) {
        this.nestedExtraQueryOptions = nestedExtraQueryOptions;
    }
1429
    /**
     * @return the interval, in failed connect attempts, at which a warning is
     *         logged; values {@code <= 0} disable the warning
     */
    public int getWarnAfterReconnectAttempts() {
        return warnAfterReconnectAttempts;
    }
1433
1434    /**
1435     * Sets the number of Connect / Reconnect attempts that must occur before a warn message
1436     * is logged indicating that the transport is not connected.  This can be useful when the
1437     * client is running inside some container or service as it give an indication of some
1438     * problem with the client connection that might not otherwise be visible.  To disable the
1439     * log messages this value should be set to a value @{code attempts <= 0}
1440     *
1441     * @param warnAfterReconnectAttempts
1442     *      The number of failed connection attempts that must happen before a warning is logged.
1443     */
1444    public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) {
1445        this.warnAfterReconnectAttempts = warnAfterReconnectAttempts;
1446    }
1447
1448}