001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.fanout; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.net.URI; 022import java.util.ArrayList; 023import java.util.Iterator; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import org.apache.activemq.command.Command; 029import org.apache.activemq.command.ConsumerInfo; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.command.RemoveInfo; 032import org.apache.activemq.command.Response; 033import org.apache.activemq.state.ConnectionStateTracker; 034import org.apache.activemq.thread.Task; 035import org.apache.activemq.thread.TaskRunner; 036import org.apache.activemq.thread.TaskRunnerFactory; 037import org.apache.activemq.transport.CompositeTransport; 038import org.apache.activemq.transport.DefaultTransportListener; 039import org.apache.activemq.transport.FutureResponse; 040import org.apache.activemq.transport.ResponseCallback; 041import org.apache.activemq.transport.Transport; 042import org.apache.activemq.transport.TransportFactory; 043import org.apache.activemq.transport.TransportListener; 044import org.apache.activemq.util.IOExceptionSupport; 045import org.apache.activemq.util.ServiceStopper; 046import org.apache.activemq.util.ServiceSupport; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * A Transport that fans out a connection to multiple brokers. 052 * 053 * 054 */ 055public class FanoutTransport implements CompositeTransport { 056 057 private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class); 058 059 private TransportListener transportListener; 060 private boolean disposed; 061 private boolean connected; 062 063 private final Object reconnectMutex = new Object(); 064 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 065 private final ConcurrentMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>(); 066 067 private final TaskRunnerFactory reconnectTaskFactory; 068 private final TaskRunner reconnectTask; 069 private boolean started; 070 071 private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>(); 072 private int connectedCount; 073 074 private int minAckCount = 2; 075 076 private long initialReconnectDelay = 10; 077 private long maxReconnectDelay = 1000 * 30; 078 private long backOffMultiplier = 2; 079 private final boolean useExponentialBackOff = true; 080 private int maxReconnectAttempts; 081 private Exception connectionFailure; 082 private FanoutTransportHandler primary; 083 private boolean fanOutQueues = false; 084 085 static class RequestCounter { 086 087 final Command command; 088 final AtomicInteger ackCount; 089 090 RequestCounter(Command command, int count) { 091 this.command = command; 092 this.ackCount = new AtomicInteger(count); 093 } 094 095 @Override 096 public String toString() { 097 return command.getCommandId() + "=" + ackCount.get(); 098 } 099 } 100 101 class FanoutTransportHandler extends DefaultTransportListener { 102 103 private final URI uri; 104 private Transport transport; 105 106 private int connectFailures; 107 private long reconnectDelay = initialReconnectDelay; 108 private long reconnectDate; 109 110 public FanoutTransportHandler(URI uri) { 111 this.uri = uri; 112 } 113 114 @Override 115 public void onCommand(Object o) { 116 Command command = (Command)o; 117 if (command.isResponse()) { 118 Integer id = new Integer(((Response)command).getCorrelationId()); 119 RequestCounter rc = requestMap.get(id); 120 if (rc != null) { 121 if (rc.ackCount.decrementAndGet() <= 0) { 122 requestMap.remove(id); 123 transportListenerOnCommand(command); 124 } 125 } else { 126 transportListenerOnCommand(command); 127 } 128 } else { 129 transportListenerOnCommand(command); 130 } 131 } 132 133 @Override 134 public void onException(IOException error) { 135 try { 136 synchronized (reconnectMutex) { 137 if (transport == null || !transport.isConnected()) { 138 return; 139 } 140 141 LOG.debug("Transport failed, starting up reconnect task", error); 142 143 ServiceSupport.dispose(transport); 144 transport = null; 145 connectedCount--; 146 if (primary == this) { 147 primary = null; 148 } 149 reconnectTask.wakeup(); 150 } 151 } catch (InterruptedException e) { 152 Thread.currentThread().interrupt(); 153 if (transportListener != null) { 154 transportListener.onException(new InterruptedIOException()); 155 } 156 } 157 } 158 } 159 160 public FanoutTransport() { 161 // Setup a task that is used to reconnect the a connection async. 162 reconnectTaskFactory = new TaskRunnerFactory(); 163 reconnectTaskFactory.init(); 164 reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { 165 @Override 166 public boolean iterate() { 167 return doConnect(); 168 } 169 }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this)); 170 } 171 172 /** 173 * @return 174 */ 175 private boolean doConnect() { 176 long closestReconnectDate = 0; 177 synchronized (reconnectMutex) { 178 179 if (disposed || connectionFailure != null) { 180 reconnectMutex.notifyAll(); 181 } 182 183 if (transports.size() == connectedCount || disposed || connectionFailure != null) { 184 return false; 185 } else { 186 187 if (transports.isEmpty()) { 188 // connectionFailure = new IOException("No uris available to 189 // connect to."); 190 } else { 191 192 // Try to connect them up. 193 Iterator<FanoutTransportHandler> iter = transports.iterator(); 194 for (int i = 0; iter.hasNext() && !disposed; i++) { 195 196 long now = System.currentTimeMillis(); 197 198 FanoutTransportHandler fanoutHandler = iter.next(); 199 if (fanoutHandler.transport != null) { 200 continue; 201 } 202 203 // Are we waiting a little to try to reconnect this one? 204 if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) { 205 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 206 closestReconnectDate = fanoutHandler.reconnectDate; 207 } 208 continue; 209 } 210 211 URI uri = fanoutHandler.uri; 212 try { 213 LOG.debug("Stopped: " + this); 214 LOG.debug("Attempting connect to: " + uri); 215 Transport t = TransportFactory.compositeConnect(uri); 216 fanoutHandler.transport = t; 217 t.setTransportListener(fanoutHandler); 218 if (started) { 219 restoreTransport(fanoutHandler); 220 } 221 LOG.debug("Connection established"); 222 fanoutHandler.reconnectDelay = initialReconnectDelay; 223 fanoutHandler.connectFailures = 0; 224 if (primary == null) { 225 primary = fanoutHandler; 226 } 227 connectedCount++; 228 } catch (Exception e) { 229 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 230 231 if( fanoutHandler.transport !=null ) { 232 ServiceSupport.dispose(fanoutHandler.transport); 233 fanoutHandler.transport=null; 234 } 235 236 if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) { 237 LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)"); 238 connectionFailure = e; 239 reconnectMutex.notifyAll(); 240 return false; 241 } else { 242 243 if (useExponentialBackOff) { 244 // Exponential increment of reconnect delay. 245 fanoutHandler.reconnectDelay *= backOffMultiplier; 246 if (fanoutHandler.reconnectDelay > maxReconnectDelay) { 247 fanoutHandler.reconnectDelay = maxReconnectDelay; 248 } 249 } 250 251 fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay; 252 253 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 254 closestReconnectDate = fanoutHandler.reconnectDate; 255 } 256 } 257 } 258 } 259 if (transports.size() == connectedCount || disposed) { 260 reconnectMutex.notifyAll(); 261 return false; 262 } 263 264 } 265 } 266 267 } 268 269 try { 270 long reconnectDelay = closestReconnectDate - System.currentTimeMillis(); 271 if (reconnectDelay > 0) { 272 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 273 Thread.sleep(reconnectDelay); 274 } 275 } catch (InterruptedException e1) { 276 Thread.currentThread().interrupt(); 277 } 278 return true; 279 } 280 281 @Override 282 public void start() throws Exception { 283 synchronized (reconnectMutex) { 284 LOG.debug("Started."); 285 if (started) { 286 return; 287 } 288 started = true; 289 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 290 FanoutTransportHandler th = iter.next(); 291 if (th.transport != null) { 292 restoreTransport(th); 293 } 294 } 295 connected=true; 296 } 297 } 298 299 @Override 300 public void stop() throws Exception { 301 try { 302 synchronized (reconnectMutex) { 303 ServiceStopper ss = new ServiceStopper(); 304 305 if (!started) { 306 return; 307 } 308 started = false; 309 disposed = true; 310 connected=false; 311 312 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 313 FanoutTransportHandler th = iter.next(); 314 if (th.transport != null) { 315 ss.stop(th.transport); 316 } 317 } 318 319 LOG.debug("Stopped: " + this); 320 ss.throwFirstException(); 321 } 322 } finally { 323 reconnectTask.shutdown(); 324 reconnectTaskFactory.shutdownNow(); 325 } 326 } 327 328 public int getMinAckCount() { 329 return minAckCount; 330 } 331 332 public void setMinAckCount(int minAckCount) { 333 this.minAckCount = minAckCount; 334 } 335 336 public long getInitialReconnectDelay() { 337 return initialReconnectDelay; 338 } 339 340 public void setInitialReconnectDelay(long initialReconnectDelay) { 341 this.initialReconnectDelay = initialReconnectDelay; 342 } 343 344 public long getMaxReconnectDelay() { 345 return maxReconnectDelay; 346 } 347 348 public void setMaxReconnectDelay(long maxReconnectDelay) { 349 this.maxReconnectDelay = maxReconnectDelay; 350 } 351 352 public long getReconnectDelayExponent() { 353 return backOffMultiplier; 354 } 355 356 public void setReconnectDelayExponent(long reconnectDelayExponent) { 357 this.backOffMultiplier = reconnectDelayExponent; 358 } 359 360 public int getMaxReconnectAttempts() { 361 return maxReconnectAttempts; 362 } 363 364 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 365 this.maxReconnectAttempts = maxReconnectAttempts; 366 } 367 368 @Override 369 public void oneway(Object o) throws IOException { 370 final Command command = (Command)o; 371 try { 372 synchronized (reconnectMutex) { 373 374 // Wait for transport to be connected. 375 while (connectedCount < minAckCount && !disposed && connectionFailure == null) { 376 LOG.debug("Waiting for at least " + minAckCount + " transports to be connected."); 377 reconnectMutex.wait(1000); 378 } 379 380 // Still not fully connected. 381 if (connectedCount < minAckCount) { 382 383 Exception error; 384 385 // Throw the right kind of error.. 386 if (disposed) { 387 error = new IOException("Transport disposed."); 388 } else if (connectionFailure != null) { 389 error = connectionFailure; 390 } else { 391 error = new IOException("Unexpected failure."); 392 } 393 394 if (error instanceof IOException) { 395 throw (IOException)error; 396 } 397 throw IOExceptionSupport.create(error); 398 } 399 400 // If it was a request and it was not being tracked by 401 // the state tracker, 402 // then hold it in the requestMap so that we can replay 403 // it later. 404 boolean fanout = isFanoutCommand(command); 405 if (stateTracker.track(command) == null && command.isResponseRequired()) { 406 int size = fanout ? minAckCount : 1; 407 requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size)); 408 } 409 410 // Send the message. 411 if (fanout) { 412 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 413 FanoutTransportHandler th = iter.next(); 414 if (th.transport != null) { 415 try { 416 th.transport.oneway(command); 417 } catch (IOException e) { 418 LOG.debug("Send attempt: failed."); 419 th.onException(e); 420 } 421 } 422 } 423 } else { 424 try { 425 primary.transport.oneway(command); 426 } catch (IOException e) { 427 LOG.debug("Send attempt: failed."); 428 primary.onException(e); 429 } 430 } 431 432 } 433 } catch (InterruptedException e) { 434 // Some one may be trying to stop our thread. 435 Thread.currentThread().interrupt(); 436 throw new InterruptedIOException(); 437 } 438 } 439 440 /** 441 * @param command 442 * @return 443 */ 444 private boolean isFanoutCommand(Command command) { 445 if (command.isMessage()) { 446 if( fanOutQueues ) { 447 return true; 448 } 449 return ((Message)command).getDestination().isTopic(); 450 } 451 if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || 452 command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) { 453 return false; 454 } 455 return true; 456 } 457 458 @Override 459 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 460 throw new AssertionError("Unsupported Method"); 461 } 462 463 @Override 464 public Object request(Object command) throws IOException { 465 throw new AssertionError("Unsupported Method"); 466 } 467 468 @Override 469 public Object request(Object command, int timeout) throws IOException { 470 throw new AssertionError("Unsupported Method"); 471 } 472 473 public void reconnect() { 474 LOG.debug("Waking up reconnect task"); 475 try { 476 reconnectTask.wakeup(); 477 } catch (InterruptedException e) { 478 Thread.currentThread().interrupt(); 479 } 480 } 481 482 @Override 483 public TransportListener getTransportListener() { 484 return transportListener; 485 } 486 487 @Override 488 public void setTransportListener(TransportListener commandListener) { 489 this.transportListener = commandListener; 490 } 491 492 @Override 493 public <T> T narrow(Class<T> target) { 494 495 if (target.isAssignableFrom(getClass())) { 496 return target.cast(this); 497 } 498 499 synchronized (reconnectMutex) { 500 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 501 FanoutTransportHandler th = iter.next(); 502 if (th.transport != null) { 503 T rc = th.transport.narrow(target); 504 if (rc != null) { 505 return rc; 506 } 507 } 508 } 509 } 510 511 return null; 512 513 } 514 515 protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException { 516 th.transport.start(); 517 stateTracker.setRestoreConsumers(th.transport == primary); 518 stateTracker.restore(th.transport); 519 for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) { 520 RequestCounter rc = iter2.next(); 521 th.transport.oneway(rc.command); 522 } 523 } 524 525 @Override 526 public void add(boolean reblance,URI uris[]) { 527 528 synchronized (reconnectMutex) { 529 for (int i = 0; i < uris.length; i++) { 530 URI uri = uris[i]; 531 532 boolean match = false; 533 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 534 FanoutTransportHandler th = iter.next(); 535 if (th.uri.equals(uri)) { 536 match = true; 537 break; 538 } 539 } 540 if (!match) { 541 FanoutTransportHandler th = new FanoutTransportHandler(uri); 542 transports.add(th); 543 reconnect(); 544 } 545 } 546 } 547 548 } 549 550 @Override 551 public void remove(boolean rebalance,URI uris[]) { 552 553 synchronized (reconnectMutex) { 554 for (int i = 0; i < uris.length; i++) { 555 URI uri = uris[i]; 556 557 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 558 FanoutTransportHandler th = iter.next(); 559 if (th.uri.equals(uri)) { 560 if (th.transport != null) { 561 ServiceSupport.dispose(th.transport); 562 connectedCount--; 563 } 564 iter.remove(); 565 break; 566 } 567 } 568 } 569 } 570 571 } 572 573 @Override 574 public void reconnect(URI uri) throws IOException { 575 add(true,new URI[]{uri}); 576 577 } 578 579 @Override 580 public boolean isReconnectSupported() { 581 return true; 582 } 583 584 @Override 585 public boolean isUpdateURIsSupported() { 586 return true; 587 } 588 @Override 589 public void updateURIs(boolean reblance,URI[] uris) throws IOException { 590 add(reblance,uris); 591 } 592 593 594 @Override 595 public String getRemoteAddress() { 596 if (primary != null) { 597 if (primary.transport != null) { 598 return primary.transport.getRemoteAddress(); 599 } 600 } 601 return null; 602 } 603 604 protected void transportListenerOnCommand(Command command) { 605 if (transportListener != null) { 606 transportListener.onCommand(command); 607 } 608 } 609 610 @Override 611 public boolean isFaultTolerant() { 612 return true; 613 } 614 615 public boolean isFanOutQueues() { 616 return fanOutQueues; 617 } 618 619 public void setFanOutQueues(boolean fanOutQueues) { 620 this.fanOutQueues = fanOutQueues; 621 } 622 623 @Override 624 public boolean isDisposed() { 625 return disposed; 626 } 627 628 629 @Override 630 public boolean isConnected() { 631 return connected; 632 } 633 634 @Override 635 public int getReceiveCounter() { 636 int rc = 0; 637 synchronized (reconnectMutex) { 638 for (FanoutTransportHandler th : transports) { 639 if (th.transport != null) { 640 rc += th.transport.getReceiveCounter(); 641 } 642 } 643 } 644 return rc; 645 } 646}