001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.failover; 018 019import java.io.BufferedReader; 020import java.io.FileReader; 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.InterruptedIOException; 024import java.net.InetAddress; 025import java.net.MalformedURLException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.URL; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.LinkedHashMap; 034import java.util.LinkedHashSet; 035import java.util.List; 036import java.util.Map; 037import java.util.StringTokenizer; 038import java.util.concurrent.CopyOnWriteArrayList; 039import java.util.concurrent.atomic.AtomicReference; 040 041import org.apache.activemq.broker.SslContext; 042import org.apache.activemq.command.Command; 043import org.apache.activemq.command.ConnectionControl; 044import org.apache.activemq.command.ConsumerControl; 045import org.apache.activemq.command.ConnectionId; 046import org.apache.activemq.command.MessageDispatch; 047import org.apache.activemq.command.MessagePull; 048import org.apache.activemq.command.RemoveInfo; 049import org.apache.activemq.command.Response; 050 051import org.apache.activemq.state.ConnectionStateTracker; 052import org.apache.activemq.state.Tracked; 053import org.apache.activemq.thread.Task; 054import org.apache.activemq.thread.TaskRunner; 055import org.apache.activemq.thread.TaskRunnerFactory; 056import org.apache.activemq.transport.CompositeTransport; 057import org.apache.activemq.transport.DefaultTransportListener; 058import org.apache.activemq.transport.FutureResponse; 059import org.apache.activemq.transport.ResponseCallback; 060import org.apache.activemq.transport.Transport; 061import org.apache.activemq.transport.TransportFactory; 062import org.apache.activemq.transport.TransportListener; 063import org.apache.activemq.util.IOExceptionSupport; 064import org.apache.activemq.util.ServiceSupport; 065import org.apache.activemq.util.URISupport; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069/** 070 * A Transport that is made reliable by being able to fail over to another 071 * transport when a transport failure is detected. 072 */ 073public class FailoverTransport implements CompositeTransport { 074 075 private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class); 076 private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10; 077 private static final int INFINITE = -1; 078 private TransportListener transportListener; 079 private volatile boolean disposed; 080 private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>(); 081 private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>(); 082 083 private final Object reconnectMutex = new Object(); 084 private final Object backupMutex = new Object(); 085 private final Object sleepMutex = new Object(); 086 private final Object listenerMutex = new Object(); 087 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 088 private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>(); 089 090 private URI connectedTransportURI; 091 private URI failedConnectTransportURI; 092 private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>(); 093 private final TaskRunnerFactory reconnectTaskFactory; 094 private final TaskRunner reconnectTask; 095 private volatile boolean started; 096 private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 097 private long maxReconnectDelay = 1000 * 30; 098 private double backOffMultiplier = 2d; 099 private long timeout = INFINITE; 100 private boolean useExponentialBackOff = true; 101 private boolean randomize = true; 102 private int maxReconnectAttempts = INFINITE; 103 private int startupMaxReconnectAttempts = INFINITE; 104 private int connectFailures; 105 private int warnAfterReconnectAttempts = 10; 106 private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 107 private Exception connectionFailure; 108 private boolean firstConnection = true; 109 // optionally always have a backup created 110 private boolean backup = false; 111 private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>(); 112 private int backupPoolSize = 1; 113 private boolean trackMessages = false; 114 private boolean trackTransactionProducers = true; 115 private int maxCacheSize = 128 * 1024; 116 private final TransportListener disposedListener = new DefaultTransportListener() {}; 117 private boolean updateURIsSupported = true; 118 private boolean reconnectSupported = true; 119 // remember for reconnect thread 120 private SslContext brokerSslContext; 121 private String updateURIsURL = null; 122 private boolean rebalanceUpdateURIs = true; 123 private boolean doRebalance = false; 124 private boolean connectedToPriority = false; 125 126 private boolean priorityBackup = false; 127 private final ArrayList<URI> priorityList = new ArrayList<URI>(); 128 private boolean priorityBackupAvailable = false; 129 private String nestedExtraQueryOptions; 130 private volatile boolean shuttingDown = false; 131 132 public FailoverTransport() { 133 brokerSslContext = SslContext.getCurrentSslContext(); 134 stateTracker.setTrackTransactions(true); 135 // Setup a task that is used to reconnect the a connection async. 136 reconnectTaskFactory = new TaskRunnerFactory(); 137 reconnectTaskFactory.init(); 138 reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { 139 @Override 140 public boolean iterate() { 141 boolean result = false; 142 if (!started) { 143 return result; 144 } 145 boolean buildBackup = true; 146 synchronized (backupMutex) { 147 if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) { 148 result = doReconnect(); 149 buildBackup = false; 150 } 151 } 152 if (buildBackup) { 153 buildBackups(); 154 if (priorityBackup && !connectedToPriority) { 155 try { 156 doDelay(); 157 if (reconnectTask == null) { 158 return true; 159 } 160 reconnectTask.wakeup(); 161 } catch (InterruptedException e) { 162 LOG.debug("Reconnect task has been interrupted.", e); 163 } 164 } 165 } else { 166 // build backups on the next iteration 167 buildBackup = true; 168 try { 169 if (reconnectTask == null) { 170 return true; 171 } 172 reconnectTask.wakeup(); 173 } catch (InterruptedException e) { 174 LOG.debug("Reconnect task has been interrupted.", e); 175 } 176 } 177 return result; 178 } 179 180 }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); 181 } 182 183 private void processCommand(Object incoming) { 184 Command command = (Command) incoming; 185 if (command == null) { 186 return; 187 } 188 if (command.isResponse()) { 189 Object object = null; 190 synchronized (requestMap) { 191 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); 192 } 193 if (object != null && object.getClass() == Tracked.class) { 194 ((Tracked) object).onResponses(command); 195 } 196 } 197 198 if (command.isConnectionControl()) { 199 handleConnectionControl((ConnectionControl) command); 200 } else if (command.isConsumerControl()) { 201 ConsumerControl consumerControl = (ConsumerControl)command; 202 if (consumerControl.isClose()) { 203 stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 204 } 205 } 206 207 if (transportListener != null) { 208 transportListener.onCommand(command); 209 } 210 } 211 212 private TransportListener createTransportListener(final Transport owner) { 213 return new TransportListener() { 214 215 @Override 216 public void onCommand(Object o) { 217 processCommand(o); 218 } 219 220 @Override 221 public void onException(IOException error) { 222 try { 223 handleTransportFailure(owner, error); 224 } catch (InterruptedException e) { 225 Thread.currentThread().interrupt(); 226 if (transportListener != null) { 227 transportListener.onException(new InterruptedIOException()); 228 } 229 } 230 } 231 232 @Override 233 public void transportInterupted() { 234 } 235 236 @Override 237 public void transportResumed() { 238 } 239 }; 240 } 241 242 public final void disposeTransport(Transport transport) { 243 transport.setTransportListener(disposedListener); 244 ServiceSupport.dispose(transport); 245 } 246 247 public final void handleTransportFailure(IOException e) throws InterruptedException { 248 handleTransportFailure(getConnectedTransport(), e); 249 } 250 251 public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException { 252 if (shuttingDown) { 253 // shutdown info sent and remote socket closed and we see that before a local close 254 // let the close do the work 255 return; 256 } 257 258 if (LOG.isTraceEnabled()) { 259 LOG.trace(this + " handleTransportFailure: " + e, e); 260 } 261 262 // could be blocked in write with the reconnectMutex held, but still needs to be whacked 263 Transport transport = null; 264 265 if (connectedTransport.compareAndSet(failed, null)) { 266 transport = failed; 267 if (transport != null) { 268 disposeTransport(transport); 269 } 270 } 271 272 synchronized (reconnectMutex) { 273 if (transport != null && connectedTransport.get() == null) { 274 boolean reconnectOk = false; 275 276 if (canReconnect()) { 277 reconnectOk = true; 278 } 279 280 LOG.warn("Transport ({}) failed{} attempting to automatically reconnect", 281 connectedTransportURI, (reconnectOk ? "," : ", not"), e); 282 283 failedConnectTransportURI = connectedTransportURI; 284 connectedTransportURI = null; 285 connectedToPriority = false; 286 287 if (reconnectOk) { 288 // notify before any reconnect attempt so ack state can be whacked 289 if (transportListener != null) { 290 transportListener.transportInterupted(); 291 } 292 293 reconnectTask.wakeup(); 294 } else if (!isDisposed()) { 295 propagateFailureToExceptionListener(e); 296 } 297 } 298 } 299 } 300 301 private boolean canReconnect() { 302 return started && 0 != calculateReconnectAttemptLimit(); 303 } 304 305 public final void handleConnectionControl(ConnectionControl control) { 306 String reconnectStr = control.getReconnectTo(); 307 if (LOG.isTraceEnabled()) { 308 LOG.trace("Received ConnectionControl: {}", control); 309 } 310 311 if (reconnectStr != null) { 312 reconnectStr = reconnectStr.trim(); 313 if (reconnectStr.length() > 0) { 314 try { 315 URI uri = new URI(reconnectStr); 316 if (isReconnectSupported()) { 317 reconnect(uri); 318 LOG.info("Reconnected to: " + uri); 319 } 320 } catch (Exception e) { 321 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e); 322 } 323 } 324 } 325 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers()); 326 } 327 328 private final void processNewTransports(boolean rebalance, String newTransports) { 329 if (newTransports != null) { 330 newTransports = newTransports.trim(); 331 if (newTransports.length() > 0 && isUpdateURIsSupported()) { 332 List<URI> list = new ArrayList<URI>(); 333 StringTokenizer tokenizer = new StringTokenizer(newTransports, ","); 334 while (tokenizer.hasMoreTokens()) { 335 String str = tokenizer.nextToken(); 336 try { 337 URI uri = new URI(str); 338 list.add(uri); 339 } catch (Exception e) { 340 LOG.error("Failed to parse broker address: " + str, e); 341 } 342 } 343 if (list.isEmpty() == false) { 344 try { 345 updateURIs(rebalance, list.toArray(new URI[list.size()])); 346 } catch (IOException e) { 347 LOG.error("Failed to update transport URI's from: " + newTransports, e); 348 } 349 } 350 } 351 } 352 } 353 354 @Override 355 public void start() throws Exception { 356 synchronized (reconnectMutex) { 357 LOG.debug("Started {}", this); 358 if (started) { 359 return; 360 } 361 started = true; 362 stateTracker.setMaxCacheSize(getMaxCacheSize()); 363 stateTracker.setTrackMessages(isTrackMessages()); 364 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers()); 365 if (connectedTransport.get() != null) { 366 stateTracker.restore(connectedTransport.get()); 367 } else { 368 reconnect(false); 369 } 370 } 371 } 372 373 @Override 374 public void stop() throws Exception { 375 Transport transportToStop = null; 376 List<Transport> backupsToStop = new ArrayList<Transport>(backups.size()); 377 378 try { 379 synchronized (reconnectMutex) { 380 if (LOG.isDebugEnabled()) { 381 LOG.debug("Stopped {}", this); 382 } 383 if (!started) { 384 return; 385 } 386 started = false; 387 disposed = true; 388 389 if (connectedTransport.get() != null) { 390 transportToStop = connectedTransport.getAndSet(null); 391 } 392 reconnectMutex.notifyAll(); 393 } 394 synchronized (sleepMutex) { 395 sleepMutex.notifyAll(); 396 } 397 } finally { 398 reconnectTask.shutdown(); 399 reconnectTaskFactory.shutdownNow(); 400 } 401 402 synchronized(backupMutex) { 403 for (BackupTransport backup : backups) { 404 backup.setDisposed(true); 405 Transport transport = backup.getTransport(); 406 if (transport != null) { 407 transport.setTransportListener(disposedListener); 408 backupsToStop.add(transport); 409 } 410 } 411 backups.clear(); 412 } 413 for (Transport transport : backupsToStop) { 414 try { 415 LOG.trace("Stopped backup: {}", transport); 416 disposeTransport(transport); 417 } catch (Exception e) { 418 } 419 } 420 if (transportToStop != null) { 421 transportToStop.stop(); 422 } 423 } 424 425 public long getInitialReconnectDelay() { 426 return initialReconnectDelay; 427 } 428 429 public void setInitialReconnectDelay(long initialReconnectDelay) { 430 this.initialReconnectDelay = initialReconnectDelay; 431 } 432 433 public long getMaxReconnectDelay() { 434 return maxReconnectDelay; 435 } 436 437 public void setMaxReconnectDelay(long maxReconnectDelay) { 438 this.maxReconnectDelay = maxReconnectDelay; 439 } 440 441 public long getReconnectDelay() { 442 return reconnectDelay; 443 } 444 445 public void setReconnectDelay(long reconnectDelay) { 446 this.reconnectDelay = reconnectDelay; 447 } 448 449 public double getReconnectDelayExponent() { 450 return backOffMultiplier; 451 } 452 453 public void setReconnectDelayExponent(double reconnectDelayExponent) { 454 this.backOffMultiplier = reconnectDelayExponent; 455 } 456 457 public Transport getConnectedTransport() { 458 return connectedTransport.get(); 459 } 460 461 public URI getConnectedTransportURI() { 462 return connectedTransportURI; 463 } 464 465 public int getMaxReconnectAttempts() { 466 return maxReconnectAttempts; 467 } 468 469 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 470 this.maxReconnectAttempts = maxReconnectAttempts; 471 } 472 473 public int getStartupMaxReconnectAttempts() { 474 return this.startupMaxReconnectAttempts; 475 } 476 477 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) { 478 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts; 479 } 480 481 public long getTimeout() { 482 return timeout; 483 } 484 485 public void setTimeout(long timeout) { 486 this.timeout = timeout; 487 } 488 489 /** 490 * @return Returns the randomize. 491 */ 492 public boolean isRandomize() { 493 return randomize; 494 } 495 496 /** 497 * @param randomize The randomize to set. 498 */ 499 public void setRandomize(boolean randomize) { 500 this.randomize = randomize; 501 } 502 503 public boolean isBackup() { 504 return backup; 505 } 506 507 public void setBackup(boolean backup) { 508 this.backup = backup; 509 } 510 511 public int getBackupPoolSize() { 512 return backupPoolSize; 513 } 514 515 public void setBackupPoolSize(int backupPoolSize) { 516 this.backupPoolSize = backupPoolSize; 517 } 518 519 public int getCurrentBackups() { 520 return this.backups.size(); 521 } 522 523 public boolean isTrackMessages() { 524 return trackMessages; 525 } 526 527 public void setTrackMessages(boolean trackMessages) { 528 this.trackMessages = trackMessages; 529 } 530 531 public boolean isTrackTransactionProducers() { 532 return this.trackTransactionProducers; 533 } 534 535 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 536 this.trackTransactionProducers = trackTransactionProducers; 537 } 538 539 public int getMaxCacheSize() { 540 return maxCacheSize; 541 } 542 543 public void setMaxCacheSize(int maxCacheSize) { 544 this.maxCacheSize = maxCacheSize; 545 } 546 547 public boolean isPriorityBackup() { 548 return priorityBackup; 549 } 550 551 public void setPriorityBackup(boolean priorityBackup) { 552 this.priorityBackup = priorityBackup; 553 } 554 555 public void setPriorityURIs(String priorityURIs) { 556 StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ","); 557 while (tokenizer.hasMoreTokens()) { 558 String str = tokenizer.nextToken(); 559 try { 560 URI uri = new URI(str); 561 priorityList.add(uri); 562 } catch (Exception e) { 563 LOG.error("Failed to parse broker address: " + str, e); 564 } 565 } 566 } 567 568 @Override 569 public void oneway(Object o) throws IOException { 570 571 Command command = (Command) o; 572 Exception error = null; 573 try { 574 575 synchronized (reconnectMutex) { 576 577 if (command != null && connectedTransport.get() == null) { 578 if (command.isShutdownInfo()) { 579 // Skipping send of ShutdownInfo command when not connected. 580 return; 581 } else if (command instanceof RemoveInfo || command.isMessageAck()) { 582 // Simulate response to RemoveInfo command or MessageAck (as it will be stale) 583 stateTracker.track(command); 584 if (command.isResponseRequired()) { 585 Response response = new Response(); 586 response.setCorrelationId(command.getCommandId()); 587 processCommand(response); 588 } 589 return; 590 } else if (command instanceof MessagePull) { 591 // Simulate response to MessagePull if timed as we can't honor that now. 592 MessagePull pullRequest = (MessagePull) command; 593 if (pullRequest.getTimeout() != 0) { 594 MessageDispatch dispatch = new MessageDispatch(); 595 dispatch.setConsumerId(pullRequest.getConsumerId()); 596 dispatch.setDestination(pullRequest.getDestination()); 597 processCommand(dispatch); 598 } 599 return; 600 } 601 } 602 603 // Keep trying until the message is sent. 604 for (int i = 0; !disposed; i++) { 605 try { 606 607 // Wait for transport to be connected. 608 Transport transport = connectedTransport.get(); 609 long start = System.currentTimeMillis(); 610 boolean timedout = false; 611 while (transport == null && !disposed && connectionFailure == null 612 && !Thread.currentThread().isInterrupted() && willReconnect()) { 613 614 LOG.trace("Waiting for transport to reconnect..: {}", command); 615 long end = System.currentTimeMillis(); 616 if (command.isMessage() && timeout > 0 && (end - start > timeout)) { 617 timedout = true; 618 LOG.info("Failover timed out after {} ms", (end - start)); 619 break; 620 } 621 try { 622 reconnectMutex.wait(100); 623 } catch (InterruptedException e) { 624 Thread.currentThread().interrupt(); 625 LOG.debug("Interupted:", e); 626 } 627 transport = connectedTransport.get(); 628 } 629 630 if (transport == null) { 631 // Previous loop may have exited due to use being 632 // disposed. 633 if (disposed) { 634 error = new IOException("Transport disposed."); 635 } else if (connectionFailure != null) { 636 error = connectionFailure; 637 } else if (timedout == true) { 638 error = new IOException("Failover timeout of " + timeout + " ms reached."); 639 } else if (!willReconnect()) { 640 error = new IOException("Reconnect attempts of " + maxReconnectAttempts + " exceeded"); 641 } else { 642 error = new IOException("Unexpected failure."); 643 } 644 break; 645 } 646 647 Tracked tracked = null; 648 try { 649 tracked = stateTracker.track(command); 650 } catch (IOException ioe) { 651 LOG.debug("Cannot track the command {} {}", command, ioe); 652 } 653 // If it was a request and it was not being tracked by 654 // the state tracker, 655 // then hold it in the requestMap so that we can replay 656 // it later. 657 synchronized (requestMap) { 658 if (tracked != null && tracked.isWaitingForResponse()) { 659 requestMap.put(Integer.valueOf(command.getCommandId()), tracked); 660 } else if (tracked == null && command.isResponseRequired()) { 661 requestMap.put(Integer.valueOf(command.getCommandId()), command); 662 } 663 } 664 665 // Send the message. 666 try { 667 transport.oneway(command); 668 stateTracker.trackBack(command); 669 if (command.isShutdownInfo()) { 670 shuttingDown = true; 671 } 672 } catch (IOException e) { 673 674 // If the command was not tracked.. we will retry in 675 // this method 676 if (tracked == null && canReconnect()) { 677 678 // since we will retry in this method.. take it 679 // out of the request 680 // map so that it is not sent 2 times on 681 // recovery 682 if (command.isResponseRequired()) { 683 requestMap.remove(Integer.valueOf(command.getCommandId())); 684 } 685 686 // Rethrow the exception so it will handled by 687 // the outer catch 688 throw e; 689 } else { 690 // Handle the error but allow the method to return since the 691 // tracked commands are replayed on reconnect. 692 LOG.debug("Send oneway attempt: {} failed for command: {}", i, command); 693 handleTransportFailure(e); 694 } 695 } 696 697 return; 698 } catch (IOException e) { 699 LOG.debug("Send oneway attempt: {} failed for command: {}", i, command); 700 handleTransportFailure(e); 701 } 702 } 703 } 704 } catch (InterruptedException e) { 705 // Some one may be trying to stop our thread. 706 Thread.currentThread().interrupt(); 707 throw new InterruptedIOException(); 708 } 709 710 if (!disposed) { 711 if (error != null) { 712 if (error instanceof IOException) { 713 throw (IOException) error; 714 } 715 throw IOExceptionSupport.create(error); 716 } 717 } 718 } 719 720 private boolean willReconnect() { 721 return firstConnection || 0 != calculateReconnectAttemptLimit(); 722 } 723 724 @Override 725 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 726 throw new AssertionError("Unsupported Method"); 727 } 728 729 @Override 730 public Object request(Object command) throws IOException { 731 throw new AssertionError("Unsupported Method"); 732 } 733 734 @Override 735 public Object request(Object command, int timeout) throws IOException { 736 throw new AssertionError("Unsupported Method"); 737 } 738 739 @Override 740 public void add(boolean rebalance, URI u[]) { 741 boolean newURI = false; 742 for (URI uri : u) { 743 if (!contains(uri)) { 744 uris.add(uri); 745 newURI = true; 746 } 747 } 748 if (newURI) { 749 reconnect(rebalance); 750 } 751 } 752 753 @Override 754 public void remove(boolean rebalance, URI u[]) { 755 for (URI uri : u) { 756 uris.remove(uri); 757 } 758 // rebalance is automatic if any connected to removed/stopped broker 759 } 760 761 public void add(boolean rebalance, String u) { 762 try { 763 URI newURI = new URI(u); 764 if (contains(newURI) == false) { 765 uris.add(newURI); 766 reconnect(rebalance); 767 } 768 769 } catch (Exception e) { 770 LOG.error("Failed to parse URI: {}", u); 771 } 772 } 773 774 public void reconnect(boolean rebalance) { 775 synchronized (reconnectMutex) { 776 if (started) { 777 if (rebalance) { 778 doRebalance = true; 779 } 780 LOG.debug("Waking up reconnect task"); 781 try { 782 reconnectTask.wakeup(); 783 } catch (InterruptedException e) { 784 Thread.currentThread().interrupt(); 785 } 786 } else { 787 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport."); 788 } 789 } 790 } 791 792 private List<URI> getConnectList() { 793 // updated have precedence 794 LinkedHashSet<URI> uniqueUris = new LinkedHashSet<URI>(updated); 795 uniqueUris.addAll(uris); 796 797 boolean removed = false; 798 if (failedConnectTransportURI != null) { 799 removed = uniqueUris.remove(failedConnectTransportURI); 800 } 801 802 ArrayList<URI> l = new ArrayList<URI>(uniqueUris); 803 if (randomize) { 804 // Randomly, reorder the list by random swapping 805 for (int i = 0; i < l.size(); i++) { 806 // meed parenthesis due other JDKs (see AMQ-4826) 807 int p = ((int) (Math.random() * 100)) % l.size(); 808 URI t = l.get(p); 809 l.set(p, l.get(i)); 810 l.set(i, t); 811 } 812 } 813 if (removed) { 814 l.add(failedConnectTransportURI); 815 } 816 817 LOG.debug("urlList connectionList:{}, from: {}", l, uniqueUris); 818 819 return l; 820 } 821 822 @Override 823 public TransportListener getTransportListener() { 824 return transportListener; 825 } 826 827 @Override 828 public void setTransportListener(TransportListener commandListener) { 829 synchronized (listenerMutex) { 830 this.transportListener = commandListener; 831 listenerMutex.notifyAll(); 832 } 833 } 834 835 @Override 836 public <T> T narrow(Class<T> target) { 837 838 if (target.isAssignableFrom(getClass())) { 839 return target.cast(this); 840 } 841 Transport transport = connectedTransport.get(); 842 if (transport != null) { 843 return transport.narrow(target); 844 } 845 return null; 846 847 } 848 849 protected void restoreTransport(Transport t) throws Exception, IOException { 850 t.start(); 851 // send information to the broker - informing it we are an ft client 852 ConnectionControl cc = new ConnectionControl(); 853 cc.setFaultTolerant(true); 854 t.oneway(cc); 855 stateTracker.restore(t); 856 Map<Integer, Command> tmpMap = null; 857 synchronized (requestMap) { 858 tmpMap = new LinkedHashMap<Integer, Command>(requestMap); 859 } 860 for (Command command : tmpMap.values()) { 861 LOG.trace("restore requestMap, replay: {}", command); 862 t.oneway(command); 863 } 864 } 865 866 public boolean isUseExponentialBackOff() { 867 return useExponentialBackOff; 868 } 869 870 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 871 this.useExponentialBackOff = useExponentialBackOff; 872 } 873 874 @Override 875 public String toString() { 876 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); 877 } 878 879 @Override 880 public String getRemoteAddress() { 881 Transport transport = connectedTransport.get(); 882 if (transport != null) { 883 return transport.getRemoteAddress(); 884 } 885 return null; 886 } 887 888 @Override 889 public boolean isFaultTolerant() { 890 return true; 891 } 892 893 private void doUpdateURIsFromDisk() { 894 // If updateURIsURL is specified, read the file and add any new 895 // transport URI's to this FailOverTransport. 896 // Note: Could track file timestamp to avoid unnecessary reading. 897 String fileURL = getUpdateURIsURL(); 898 if (fileURL != null) { 899 BufferedReader in = null; 900 String newUris = null; 901 StringBuffer buffer = new StringBuffer(); 902 903 try { 904 in = new BufferedReader(getURLStream(fileURL)); 905 while (true) { 906 String line = in.readLine(); 907 if (line == null) { 908 break; 909 } 910 buffer.append(line); 911 } 912 newUris = buffer.toString(); 913 } catch (IOException ioe) { 914 LOG.error("Failed to read updateURIsURL: {} {}",fileURL, ioe); 915 } finally { 916 if (in != null) { 917 try { 918 in.close(); 919 } catch (IOException ioe) { 920 // ignore 921 } 922 } 923 } 924 925 processNewTransports(isRebalanceUpdateURIs(), newUris); 926 } 927 } 928 929 final boolean doReconnect() { 930 Exception failure = null; 931 synchronized (reconnectMutex) { 932 List<URI> connectList = null; 933 // First ensure we are up to date. 934 doUpdateURIsFromDisk(); 935 936 if (disposed || connectionFailure != null) { 937 reconnectMutex.notifyAll(); 938 } 939 if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) { 940 return false; 941 } else { 942 connectList = getConnectList(); 943 if (connectList.isEmpty()) { 944 failure = new IOException("No uris available to connect to."); 945 } else { 946 if (doRebalance) { 947 if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) { 948 // already connected to first in the list, no need to rebalance 949 doRebalance = false; 950 return false; 951 } else { 952 LOG.debug("Doing rebalance from: {} to {}", connectedTransportURI, connectList); 953 954 try { 955 Transport transport = this.connectedTransport.getAndSet(null); 956 if (transport != null) { 957 disposeTransport(transport); 958 } 959 } catch (Exception e) { 960 LOG.debug("Caught an exception stopping existing transport for rebalance", e); 961 } 962 } 963 doRebalance = false; 964 } 965 966 resetReconnectDelay(); 967 968 Transport transport = null; 969 URI uri = null; 970 971 // If we have a backup already waiting lets try it. 972 synchronized (backupMutex) { 973 if ((priorityBackup || backup) && !backups.isEmpty()) { 974 ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups); 975 if (randomize) { 976 Collections.shuffle(l); 977 } 978 BackupTransport bt = l.remove(0); 979 backups.remove(bt); 980 transport = bt.getTransport(); 981 uri = bt.getUri(); 982 processCommand(bt.getBrokerInfo()); 983 if (priorityBackup && priorityBackupAvailable) { 984 Transport old = this.connectedTransport.getAndSet(null); 985 if (old != null) { 986 disposeTransport(old); 987 } 988 priorityBackupAvailable = false; 989 } 990 } 991 } 992 993 // When there was no backup and we are reconnecting for the first time 994 // we honor the initialReconnectDelay before trying a new connection, after 995 // this normal reconnect delay happens following a failed attempt. 996 if (transport == null && !firstConnection && connectFailures == 0 && initialReconnectDelay > 0 && !disposed) { 997 // reconnectDelay will be equal to initialReconnectDelay since we are on 998 // the first connect attempt after we had a working connection, doDelay 999 // will apply updates to move to the next reconnectDelay value based on 1000 // configuration. 1001 doDelay(); 1002 } 1003 1004 Iterator<URI> iter = connectList.iterator(); 1005 while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) { 1006 1007 try { 1008 SslContext.setCurrentSslContext(brokerSslContext); 1009 1010 // We could be starting with a backup and if so we wait to grab a 1011 // URI from the pool until next time around. 1012 if (transport == null) { 1013 uri = addExtraQueryOptions(iter.next()); 1014 transport = TransportFactory.compositeConnect(uri); 1015 } 1016 1017 LOG.debug("Attempting {}th connect to: {}", connectFailures, uri); 1018 1019 transport.setTransportListener(createTransportListener(transport)); 1020 transport.start(); 1021 1022 if (started && !firstConnection) { 1023 restoreTransport(transport); 1024 } 1025 1026 LOG.debug("Connection established"); 1027 1028 reconnectDelay = initialReconnectDelay; 1029 connectedTransportURI = uri; 1030 connectedTransport.set(transport); 1031 connectedToPriority = isPriority(connectedTransportURI); 1032 reconnectMutex.notifyAll(); 1033 connectFailures = 0; 1034 1035 // Make sure on initial startup, that the transportListener 1036 // has been initialized for this instance. 1037 synchronized (listenerMutex) { 1038 if (transportListener == null) { 1039 try { 1040 // if it isn't set after 2secs - it probably never will be 1041 listenerMutex.wait(2000); 1042 } catch (InterruptedException ex) { 1043 } 1044 } 1045 } 1046 1047 if (transportListener != null) { 1048 transportListener.transportResumed(); 1049 } else { 1050 LOG.debug("transport resumed by transport listener not set"); 1051 } 1052 1053 if (firstConnection) { 1054 firstConnection = false; 1055 LOG.info("Successfully connected to {}", uri); 1056 } else { 1057 LOG.info("Successfully reconnected to {}", uri); 1058 } 1059 1060 return false; 1061 } catch (Exception e) { 1062 failure = e; 1063 LOG.debug("Connect fail to: {}, reason: {}", uri, e); 1064 if (transport != null) { 1065 try { 1066 transport.stop(); 1067 transport = null; 1068 } catch (Exception ee) { 1069 LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee); 1070 } 1071 } 1072 } finally { 1073 SslContext.setCurrentSslContext(null); 1074 } 1075 } 1076 } 1077 } 1078 1079 int reconnectLimit = calculateReconnectAttemptLimit(); 1080 1081 connectFailures++; 1082 if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { 1083 LOG.error("Failed to connect to {} after: {} attempt(s)", connectList, connectFailures); 1084 connectionFailure = failure; 1085 1086 // Make sure on initial startup, that the transportListener has been 1087 // initialized for this instance. 1088 synchronized (listenerMutex) { 1089 if (transportListener == null) { 1090 try { 1091 listenerMutex.wait(2000); 1092 } catch (InterruptedException ex) { 1093 } 1094 } 1095 } 1096 1097 propagateFailureToExceptionListener(connectionFailure); 1098 return false; 1099 } 1100 1101 int warnInterval = getWarnAfterReconnectAttempts(); 1102 if (warnInterval > 0 && (connectFailures == 1 || (connectFailures % warnInterval) == 0)) { 1103 LOG.warn("Failed to connect to {} after: {} attempt(s) with {}, continuing to retry.", 1104 connectList, connectFailures, (failure == null ? "?" : failure.getLocalizedMessage())); 1105 } 1106 } 1107 1108 if (!disposed) { 1109 doDelay(); 1110 } 1111 1112 return !disposed; 1113 } 1114 1115 private void doDelay() { 1116 if (reconnectDelay > 0) { 1117 synchronized (sleepMutex) { 1118 LOG.debug("Waiting {} ms before attempting connection", reconnectDelay); 1119 try { 1120 sleepMutex.wait(reconnectDelay); 1121 } catch (InterruptedException e) { 1122 Thread.currentThread().interrupt(); 1123 } 1124 } 1125 } 1126 1127 if (useExponentialBackOff) { 1128 // Exponential increment of reconnect delay. 1129 reconnectDelay *= backOffMultiplier; 1130 if (reconnectDelay > maxReconnectDelay) { 1131 reconnectDelay = maxReconnectDelay; 1132 } 1133 } 1134 } 1135 1136 private void resetReconnectDelay() { 1137 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { 1138 reconnectDelay = initialReconnectDelay; 1139 } 1140 } 1141 1142 /* 1143 * called with reconnectMutex held 1144 */ 1145 private void propagateFailureToExceptionListener(Exception exception) { 1146 if (transportListener != null) { 1147 if (exception instanceof IOException) { 1148 transportListener.onException((IOException)exception); 1149 } else { 1150 transportListener.onException(IOExceptionSupport.create(exception)); 1151 } 1152 } 1153 reconnectMutex.notifyAll(); 1154 } 1155 1156 private int calculateReconnectAttemptLimit() { 1157 int maxReconnectValue = this.maxReconnectAttempts; 1158 if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) { 1159 maxReconnectValue = this.startupMaxReconnectAttempts; 1160 } 1161 return maxReconnectValue; 1162 } 1163 1164 private boolean shouldBuildBackups() { 1165 return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority)); 1166 } 1167 1168 final boolean buildBackups() { 1169 synchronized (backupMutex) { 1170 if (!disposed && shouldBuildBackups()) { 1171 ArrayList<URI> backupList = new ArrayList<URI>(priorityList); 1172 List<URI> connectList = getConnectList(); 1173 for (URI uri: connectList) { 1174 if (!backupList.contains(uri)) { 1175 backupList.add(uri); 1176 } 1177 } 1178 // removed disposed backups 1179 List<BackupTransport> disposedList = new ArrayList<BackupTransport>(); 1180 for (BackupTransport bt : backups) { 1181 if (bt.isDisposed()) { 1182 disposedList.add(bt); 1183 } 1184 } 1185 backups.removeAll(disposedList); 1186 disposedList.clear(); 1187 for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) { 1188 URI uri = addExtraQueryOptions(iter.next()); 1189 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { 1190 try { 1191 SslContext.setCurrentSslContext(brokerSslContext); 1192 BackupTransport bt = new BackupTransport(this); 1193 bt.setUri(uri); 1194 if (!backups.contains(bt)) { 1195 Transport t = TransportFactory.compositeConnect(uri); 1196 t.setTransportListener(bt); 1197 t.start(); 1198 bt.setTransport(t); 1199 if (priorityBackup && isPriority(uri)) { 1200 priorityBackupAvailable = true; 1201 backups.add(0, bt); 1202 // if this priority backup overflows the pool 1203 // remove the backup with the lowest priority 1204 if (backups.size() > backupPoolSize) { 1205 BackupTransport disposeTransport = backups.remove(backups.size() - 1); 1206 disposeTransport.setDisposed(true); 1207 Transport transport = disposeTransport.getTransport(); 1208 if (transport != null) { 1209 transport.setTransportListener(disposedListener); 1210 disposeTransport(transport); 1211 } 1212 } 1213 } else { 1214 backups.add(bt); 1215 } 1216 } 1217 } catch (Exception e) { 1218 LOG.debug("Failed to build backup ", e); 1219 } finally { 1220 SslContext.setCurrentSslContext(null); 1221 } 1222 } 1223 } 1224 } 1225 } 1226 return false; 1227 } 1228 1229 protected boolean isPriority(URI uri) { 1230 if (!priorityBackup) { 1231 return false; 1232 } 1233 1234 if (!priorityList.isEmpty()) { 1235 return priorityList.contains(uri); 1236 } 1237 return uris.indexOf(uri) == 0; 1238 } 1239 1240 @Override 1241 public boolean isDisposed() { 1242 return disposed; 1243 } 1244 1245 @Override 1246 public boolean isConnected() { 1247 return connectedTransport.get() != null; 1248 } 1249 1250 @Override 1251 public void reconnect(URI uri) throws IOException { 1252 add(true, new URI[]{uri}); 1253 } 1254 1255 @Override 1256 public boolean isReconnectSupported() { 1257 return this.reconnectSupported; 1258 } 1259 1260 public void setReconnectSupported(boolean value) { 1261 this.reconnectSupported = value; 1262 } 1263 1264 @Override 1265 public boolean isUpdateURIsSupported() { 1266 return this.updateURIsSupported; 1267 } 1268 1269 public void setUpdateURIsSupported(boolean value) { 1270 this.updateURIsSupported = value; 1271 } 1272 1273 @Override 1274 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { 1275 if (isUpdateURIsSupported()) { 1276 HashSet<URI> copy = new HashSet<URI>(); 1277 synchronized (reconnectMutex) { 1278 copy.addAll(this.updated); 1279 updated.clear(); 1280 if (updatedURIs != null && updatedURIs.length > 0) { 1281 for (URI uri : updatedURIs) { 1282 if (uri != null && !updated.contains(uri)) { 1283 updated.add(uri); 1284 if (failedConnectTransportURI != null && failedConnectTransportURI.equals(uri)) { 1285 failedConnectTransportURI = null; 1286 } 1287 } 1288 } 1289 } 1290 } 1291 if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) { 1292 buildBackups(); 1293 reconnect(rebalance); 1294 } 1295 } 1296 } 1297 1298 /** 1299 * @return the updateURIsURL 1300 */ 1301 public String getUpdateURIsURL() { 1302 return this.updateURIsURL; 1303 } 1304 1305 /** 1306 * @param updateURIsURL the updateURIsURL to set 1307 */ 1308 public void setUpdateURIsURL(String updateURIsURL) { 1309 this.updateURIsURL = updateURIsURL; 1310 } 1311 1312 /** 1313 * @return the rebalanceUpdateURIs 1314 */ 1315 public boolean isRebalanceUpdateURIs() { 1316 return this.rebalanceUpdateURIs; 1317 } 1318 1319 /** 1320 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set 1321 */ 1322 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) { 1323 this.rebalanceUpdateURIs = rebalanceUpdateURIs; 1324 } 1325 1326 @Override 1327 public int getReceiveCounter() { 1328 Transport transport = connectedTransport.get(); 1329 if (transport == null) { 1330 return 0; 1331 } 1332 return transport.getReceiveCounter(); 1333 } 1334 1335 public int getConnectFailures() { 1336 return connectFailures; 1337 } 1338 1339 public void connectionInterruptProcessingComplete(ConnectionId connectionId) { 1340 synchronized (reconnectMutex) { 1341 stateTracker.connectionInterruptProcessingComplete(this, connectionId); 1342 } 1343 } 1344 1345 public ConnectionStateTracker getStateTracker() { 1346 return stateTracker; 1347 } 1348 1349 private boolean contains(URI newURI) { 1350 boolean result = false; 1351 for (URI uri : uris) { 1352 if (compareURIs(newURI, uri)) { 1353 result = true; 1354 break; 1355 } 1356 } 1357 1358 return result; 1359 } 1360 1361 private boolean compareURIs(final URI first, final URI second) { 1362 1363 boolean result = false; 1364 if (first == null || second == null) { 1365 return result; 1366 } 1367 1368 if (first.getPort() == second.getPort()) { 1369 InetAddress firstAddr = null; 1370 InetAddress secondAddr = null; 1371 try { 1372 firstAddr = InetAddress.getByName(first.getHost()); 1373 secondAddr = InetAddress.getByName(second.getHost()); 1374 1375 if (firstAddr.equals(secondAddr)) { 1376 result = true; 1377 } 1378 1379 } catch(IOException e) { 1380 1381 if (firstAddr == null) { 1382 LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", first, e); 1383 } else { 1384 LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", second, e); 1385 } 1386 1387 if (first.getHost().equalsIgnoreCase(second.getHost())) { 1388 result = true; 1389 } 1390 } 1391 } 1392 1393 return result; 1394 } 1395 1396 private InputStreamReader getURLStream(String path) throws IOException { 1397 InputStreamReader result = null; 1398 URL url = null; 1399 try { 1400 url = new URL(path); 1401 result = new InputStreamReader(url.openStream()); 1402 } catch (MalformedURLException e) { 1403 // ignore - it could be a path to a a local file 1404 } 1405 if (result == null) { 1406 result = new FileReader(path); 1407 } 1408 return result; 1409 } 1410 1411 private URI addExtraQueryOptions(URI uri) { 1412 try { 1413 if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) { 1414 if( uri.getQuery() == null ) { 1415 uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions); 1416 } else { 1417 uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions); 1418 } 1419 } 1420 } catch (URISyntaxException e) { 1421 throw new RuntimeException(e); 1422 } 1423 return uri; 1424 } 1425 1426 public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) { 1427 this.nestedExtraQueryOptions = nestedExtraQueryOptions; 1428 } 1429 1430 public int getWarnAfterReconnectAttempts() { 1431 return warnAfterReconnectAttempts; 1432 } 1433 1434 /** 1435 * Sets the number of Connect / Reconnect attempts that must occur before a warn message 1436 * is logged indicating that the transport is not connected. This can be useful when the 1437 * client is running inside some container or service as it give an indication of some 1438 * problem with the client connection that might not otherwise be visible. To disable the 1439 * log messages this value should be set to a value @{code attempts <= 0} 1440 * 1441 * @param warnAfterReconnectAttempts 1442 * The number of failed connection attempts that must happen before a warning is logged. 1443 */ 1444 public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) { 1445 this.warnAfterReconnectAttempts = warnAfterReconnectAttempts; 1446 } 1447 1448}