001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.security.GeneralSecurityException; 021import java.security.cert.X509Certificate; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Properties; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.ExecutionException; 031import java.util.concurrent.ExecutorService; 032import java.util.concurrent.Executors; 033import java.util.concurrent.Future; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.TimeoutException; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.AtomicLong; 038 039import javax.management.ObjectName; 040 041import org.apache.activemq.DestinationDoesNotExistException; 042import org.apache.activemq.Service; 043import org.apache.activemq.advisory.AdvisoryBroker; 044import org.apache.activemq.advisory.AdvisorySupport; 045import org.apache.activemq.broker.BrokerService; 046import org.apache.activemq.broker.BrokerServiceAware; 047import org.apache.activemq.broker.ConnectionContext; 048import org.apache.activemq.broker.TransportConnection; 049import org.apache.activemq.broker.region.AbstractRegion; 050import org.apache.activemq.broker.region.DurableTopicSubscription; 051import org.apache.activemq.broker.region.Region; 052import org.apache.activemq.broker.region.RegionBroker; 053import org.apache.activemq.broker.region.Subscription; 054import org.apache.activemq.broker.region.policy.PolicyEntry; 055import org.apache.activemq.command.ActiveMQDestination; 056import org.apache.activemq.command.ActiveMQMessage; 057import org.apache.activemq.command.ActiveMQTempDestination; 058import org.apache.activemq.command.ActiveMQTopic; 059import org.apache.activemq.command.BrokerId; 060import org.apache.activemq.command.BrokerInfo; 061import org.apache.activemq.command.Command; 062import org.apache.activemq.command.ConnectionError; 063import org.apache.activemq.command.ConnectionId; 064import org.apache.activemq.command.ConnectionInfo; 065import org.apache.activemq.command.ConsumerId; 066import org.apache.activemq.command.ConsumerInfo; 067import org.apache.activemq.command.DataStructure; 068import org.apache.activemq.command.DestinationInfo; 069import org.apache.activemq.command.ExceptionResponse; 070import org.apache.activemq.command.KeepAliveInfo; 071import org.apache.activemq.command.Message; 072import org.apache.activemq.command.MessageAck; 073import org.apache.activemq.command.MessageDispatch; 074import org.apache.activemq.command.MessageId; 075import org.apache.activemq.command.NetworkBridgeFilter; 076import org.apache.activemq.command.ProducerInfo; 077import org.apache.activemq.command.RemoveInfo; 078import org.apache.activemq.command.RemoveSubscriptionInfo; 079import org.apache.activemq.command.Response; 080import org.apache.activemq.command.SessionInfo; 081import org.apache.activemq.command.ShutdownInfo; 082import org.apache.activemq.command.SubscriptionInfo; 083import org.apache.activemq.command.WireFormatInfo; 084import org.apache.activemq.filter.DestinationFilter; 085import org.apache.activemq.filter.MessageEvaluationContext; 086import org.apache.activemq.security.SecurityContext; 087import org.apache.activemq.transport.DefaultTransportListener; 088import org.apache.activemq.transport.FutureResponse; 089import org.apache.activemq.transport.ResponseCallback; 090import org.apache.activemq.transport.Transport; 091import org.apache.activemq.transport.TransportDisposedIOException; 092import org.apache.activemq.transport.TransportFilter; 093import org.apache.activemq.transport.nio.NIOSSLTransport; 094import org.apache.activemq.transport.failover.FailoverTransport; 095import org.apache.activemq.transport.tcp.SslTransport; 096import org.apache.activemq.util.IdGenerator; 097import org.apache.activemq.util.IntrospectionSupport; 098import org.apache.activemq.util.LongSequenceGenerator; 099import org.apache.activemq.util.MarshallingSupport; 100import org.apache.activemq.util.ServiceStopper; 101import org.apache.activemq.util.ServiceSupport; 102import org.slf4j.Logger; 103import org.slf4j.LoggerFactory; 104 105/** 106 * A useful base class for implementing demand forwarding bridges. 107 */ 108public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { 109 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); 110 protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; 111 protected final Transport localBroker; 112 protected final Transport remoteBroker; 113 protected IdGenerator idGenerator = new IdGenerator(); 114 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 115 protected ConnectionInfo localConnectionInfo; 116 protected ConnectionInfo remoteConnectionInfo; 117 protected SessionInfo localSessionInfo; 118 protected ProducerInfo producerInfo; 119 protected String remoteBrokerName = "Unknown"; 120 protected String localClientId; 121 protected ConsumerInfo demandConsumerInfo; 122 protected int demandConsumerDispatched; 123 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); 124 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); 125 protected final AtomicBoolean bridgeFailed = new AtomicBoolean(); 126 protected final AtomicBoolean disposed = new AtomicBoolean(); 127 protected BrokerId localBrokerId; 128 protected ActiveMQDestination[] excludedDestinations; 129 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 130 protected ActiveMQDestination[] staticallyIncludedDestinations; 131 protected ActiveMQDestination[] durableDestinations; 132 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 133 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 134 protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; 135 protected final CountDownLatch startedLatch = new CountDownLatch(2); 136 protected final CountDownLatch localStartedLatch = new CountDownLatch(1); 137 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); 138 protected NetworkBridgeConfiguration configuration; 139 protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); 140 141 protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null}; 142 protected BrokerId remoteBrokerId; 143 144 final AtomicLong enqueueCounter = new AtomicLong(); 145 final AtomicLong dequeueCounter = new AtomicLong(); 146 147 private NetworkBridgeListener networkBridgeListener; 148 private boolean createdByDuplex; 149 private BrokerInfo localBrokerInfo; 150 private BrokerInfo remoteBrokerInfo; 151 152 private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed); 153 private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed); 154 155 private final AtomicBoolean started = new AtomicBoolean(); 156 private TransportConnection duplexInitiatingConnection; 157 private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean(); 158 protected BrokerService brokerService = null; 159 private ObjectName mbeanObjectName; 160 private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); 161 private Transport duplexInboundLocalBroker = null; 162 private ProducerInfo duplexInboundLocalProducerInfo; 163 164 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 165 this.configuration = configuration; 166 this.localBroker = localBroker; 167 this.remoteBroker = remoteBroker; 168 } 169 170 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { 171 this.localBrokerInfo = localBrokerInfo; 172 this.remoteBrokerInfo = remoteBrokerInfo; 173 this.duplexInitiatingConnection = connection; 174 start(); 175 serviceRemoteCommand(remoteBrokerInfo); 176 } 177 178 @Override 179 public void start() throws Exception { 180 if (started.compareAndSet(false, true)) { 181 182 if (brokerService == null) { 183 throw new IllegalArgumentException("BrokerService is null on " + this); 184 } 185 186 if (isDuplex()) { 187 duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI()); 188 duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() { 189 190 @Override 191 public void onCommand(Object o) { 192 Command command = (Command) o; 193 serviceLocalCommand(command); 194 } 195 196 @Override 197 public void onException(IOException error) { 198 serviceLocalException(error); 199 } 200 }); 201 duplexInboundLocalBroker.start(); 202 } 203 204 localBroker.setTransportListener(new DefaultTransportListener() { 205 206 @Override 207 public void onCommand(Object o) { 208 Command command = (Command) o; 209 serviceLocalCommand(command); 210 } 211 212 @Override 213 public void onException(IOException error) { 214 if (!futureLocalBrokerInfo.isDone()) { 215 LOG.info("error with pending local brokerInfo on: " + localBroker, error); 216 futureLocalBrokerInfo.cancel(true); 217 return; 218 } 219 serviceLocalException(error); 220 } 221 }); 222 223 remoteBroker.setTransportListener(new DefaultTransportListener() { 224 225 @Override 226 public void onCommand(Object o) { 227 Command command = (Command) o; 228 serviceRemoteCommand(command); 229 } 230 231 @Override 232 public void onException(IOException error) { 233 if (!futureRemoteBrokerInfo.isDone()) { 234 LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error); 235 futureRemoteBrokerInfo.cancel(true); 236 return; 237 } 238 serviceRemoteException(error); 239 } 240 }); 241 242 remoteBroker.start(); 243 localBroker.start(); 244 245 if (!disposed.get()) { 246 try { 247 triggerStartAsyncNetworkBridgeCreation(); 248 } catch (IOException e) { 249 LOG.warn("Caught exception from remote start", e); 250 } 251 } else { 252 LOG.warn("Bridge was disposed before the start() method was fully executed."); 253 throw new TransportDisposedIOException(); 254 } 255 } 256 } 257 258 @Override 259 public void stop() throws Exception { 260 if (started.compareAndSet(true, false)) { 261 if (disposed.compareAndSet(false, true)) { 262 LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName); 263 264 futureRemoteBrokerInfo.cancel(true); 265 futureLocalBrokerInfo.cancel(true); 266 267 NetworkBridgeListener l = this.networkBridgeListener; 268 if (l != null) { 269 l.onStop(this); 270 } 271 try { 272 // local start complete 273 if (startedLatch.getCount() < 2) { 274 LOG.trace("{} unregister bridge ({}) to {}", new Object[]{ 275 configuration.getBrokerName(), this, remoteBrokerName 276 }); 277 brokerService.getBroker().removeBroker(null, remoteBrokerInfo); 278 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); 279 } 280 281 remoteBridgeStarted.set(false); 282 final CountDownLatch sendShutdown = new CountDownLatch(1); 283 284 brokerService.getTaskRunnerFactory().execute(new Runnable() { 285 @Override 286 public void run() { 287 try { 288 serialExecutor.shutdown(); 289 if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 290 List<Runnable> pendingTasks = serialExecutor.shutdownNow(); 291 LOG.info("pending tasks on stop {}", pendingTasks); 292 } 293 localBroker.oneway(new ShutdownInfo()); 294 remoteBroker.oneway(new ShutdownInfo()); 295 } catch (Throwable e) { 296 LOG.debug("Caught exception sending shutdown", e); 297 } finally { 298 sendShutdown.countDown(); 299 } 300 301 } 302 }, "ActiveMQ ForwardingBridge StopTask"); 303 304 if (!sendShutdown.await(10, TimeUnit.SECONDS)) { 305 LOG.info("Network Could not shutdown in a timely manner"); 306 } 307 } finally { 308 ServiceStopper ss = new ServiceStopper(); 309 stopFailoverTransport(remoteBroker); 310 ss.stop(remoteBroker); 311 ss.stop(localBroker); 312 ss.stop(duplexInboundLocalBroker); 313 // Release the started Latch since another thread could be 314 // stuck waiting for it to start up. 315 startedLatch.countDown(); 316 startedLatch.countDown(); 317 localStartedLatch.countDown(); 318 319 ss.throwFirstException(); 320 } 321 } 322 323 LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName); 324 } 325 } 326 327 private void stopFailoverTransport(Transport transport) { 328 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 329 if (failoverTransport != null) { 330 // may be blocked on write, in which case stop will block 331 try { 332 failoverTransport.handleTransportFailure(new IOException("Bridge stopped")); 333 } catch (InterruptedException ignored) {} 334 } 335 } 336 337 protected void triggerStartAsyncNetworkBridgeCreation() throws IOException { 338 brokerService.getTaskRunnerFactory().execute(new Runnable() { 339 @Override 340 public void run() { 341 final String originalName = Thread.currentThread().getName(); 342 Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " + 343 "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker); 344 345 try { 346 // First we collect the info data from both the local and remote ends 347 collectBrokerInfos(); 348 349 // Once we have all required broker info we can attempt to start 350 // the local and then remote sides of the bridge. 351 doStartLocalAndRemoteBridges(); 352 } finally { 353 Thread.currentThread().setName(originalName); 354 } 355 } 356 }); 357 } 358 359 private void collectBrokerInfos() { 360 361 // First wait for the remote to feed us its BrokerInfo, then we can check on 362 // the LocalBrokerInfo and decide is this is a loop. 363 try { 364 remoteBrokerInfo = futureRemoteBrokerInfo.get(); 365 if (remoteBrokerInfo == null) { 366 serviceLocalException(new Throwable("remoteBrokerInfo is null")); 367 return; 368 } 369 } catch (Exception e) { 370 serviceRemoteException(e); 371 return; 372 } 373 374 try { 375 localBrokerInfo = futureLocalBrokerInfo.get(); 376 if (localBrokerInfo == null) { 377 serviceLocalException(new Throwable("localBrokerInfo is null")); 378 return; 379 } 380 381 // Before we try and build the bridge lets check if we are in a loop 382 // and if so just stop now before registering anything. 383 remoteBrokerId = remoteBrokerInfo.getBrokerId(); 384 if (localBrokerId.equals(remoteBrokerId)) { 385 LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{ 386 configuration.getBrokerName(), remoteBrokerName, remoteBrokerId 387 }); 388 ServiceSupport.dispose(localBroker); 389 ServiceSupport.dispose(remoteBroker); 390 // the bridge is left in a bit of limbo, but it won't get retried 391 // in this state. 392 return; 393 } 394 395 // Fill in the remote broker's information now. 396 remoteBrokerPath[0] = remoteBrokerId; 397 remoteBrokerName = remoteBrokerInfo.getBrokerName(); 398 if (configuration.isUseBrokerNamesAsIdSeed()) { 399 idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName); 400 } 401 } catch (Throwable e) { 402 serviceLocalException(e); 403 } 404 } 405 406 private void doStartLocalAndRemoteBridges() { 407 408 if (disposed.get()) { 409 return; 410 } 411 412 if (isCreatedByDuplex()) { 413 // apply remote (propagated) configuration to local duplex bridge before start 414 Properties props = null; 415 try { 416 props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); 417 IntrospectionSupport.getProperties(configuration, props, null); 418 if (configuration.getExcludedDestinations() != null) { 419 excludedDestinations = configuration.getExcludedDestinations().toArray( 420 new ActiveMQDestination[configuration.getExcludedDestinations().size()]); 421 } 422 if (configuration.getStaticallyIncludedDestinations() != null) { 423 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( 424 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); 425 } 426 if (configuration.getDynamicallyIncludedDestinations() != null) { 427 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( 428 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); 429 } 430 } catch (Throwable t) { 431 LOG.error("Error mapping remote configuration: {}", props, t); 432 } 433 } 434 435 try { 436 startLocalBridge(); 437 } catch (Throwable e) { 438 serviceLocalException(e); 439 return; 440 } 441 442 try { 443 startRemoteBridge(); 444 } catch (Throwable e) { 445 serviceRemoteException(e); 446 return; 447 } 448 449 try { 450 if (safeWaitUntilStarted()) { 451 setupStaticDestinations(); 452 } 453 } catch (Throwable e) { 454 serviceLocalException(e); 455 } 456 } 457 458 private void startLocalBridge() throws Throwable { 459 if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) { 460 synchronized (this) { 461 LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker); 462 if (!disposed.get()) { 463 464 if (idGenerator == null) { 465 throw new IllegalStateException("Id Generator cannot be null"); 466 } 467 468 localConnectionInfo = new ConnectionInfo(); 469 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 470 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); 471 localConnectionInfo.setClientId(localClientId); 472 localConnectionInfo.setUserName(configuration.getUserName()); 473 localConnectionInfo.setPassword(configuration.getPassword()); 474 Transport originalTransport = remoteBroker; 475 while (originalTransport instanceof TransportFilter) { 476 originalTransport = ((TransportFilter) originalTransport).getNext(); 477 } 478 setTransportContext(originalTransport, localConnectionInfo); 479 // sync requests that may fail 480 Object resp = localBroker.request(localConnectionInfo); 481 if (resp instanceof ExceptionResponse) { 482 throw ((ExceptionResponse) resp).getException(); 483 } 484 localSessionInfo = new SessionInfo(localConnectionInfo, 1); 485 localBroker.oneway(localSessionInfo); 486 487 if (configuration.isDuplex()) { 488 // separate in-bound channel for forwards so we don't 489 // contend with out-bound dispatch on same connection 490 remoteBrokerInfo.setNetworkConnection(true); 491 duplexInboundLocalBroker.oneway(remoteBrokerInfo); 492 493 ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); 494 duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 495 duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" 496 + configuration.getBrokerName()); 497 duplexLocalConnectionInfo.setUserName(configuration.getUserName()); 498 duplexLocalConnectionInfo.setPassword(configuration.getPassword()); 499 500 setTransportContext(originalTransport, duplexLocalConnectionInfo); 501 502 // sync requests that may fail 503 resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo); 504 if (resp instanceof ExceptionResponse) { 505 throw ((ExceptionResponse) resp).getException(); 506 } 507 SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1); 508 duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1); 509 duplexInboundLocalBroker.oneway(duplexInboundSession); 510 duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo); 511 } 512 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString()); 513 NetworkBridgeListener l = this.networkBridgeListener; 514 if (l != null) { 515 l.onStart(this); 516 } 517 518 // Let the local broker know the remote broker's ID. 519 localBroker.oneway(remoteBrokerInfo); 520 // new peer broker (a consumer can work with remote broker also) 521 brokerService.getBroker().addBroker(null, remoteBrokerInfo); 522 523 LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{ 524 localBroker, remoteBroker, remoteBrokerName 525 }); 526 LOG.trace("{} register bridge ({}) to {}", new Object[]{ 527 configuration.getBrokerName(), this, remoteBrokerName 528 }); 529 } else { 530 LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed."); 531 } 532 startedLatch.countDown(); 533 localStartedLatch.countDown(); 534 } 535 } 536 } 537 538 private void setTransportContext(Transport transport, ConnectionInfo connectionInfo) { 539 if (transport instanceof SslTransport) { 540 connectionInfo.setTransportContext(((SslTransport)transport).getPeerCertificates()); 541 } else if (transport instanceof NIOSSLTransport) { 542 connectionInfo.setTransportContext(((NIOSSLTransport)transport).getPeerCertificates()); 543 } 544 } 545 546 protected void startRemoteBridge() throws Exception { 547 if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) { 548 LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker); 549 synchronized (this) { 550 if (!isCreatedByDuplex()) { 551 BrokerInfo brokerInfo = new BrokerInfo(); 552 brokerInfo.setBrokerName(configuration.getBrokerName()); 553 brokerInfo.setBrokerURL(configuration.getBrokerURL()); 554 brokerInfo.setNetworkConnection(true); 555 brokerInfo.setDuplexConnection(configuration.isDuplex()); 556 // set our properties 557 Properties props = new Properties(); 558 IntrospectionSupport.getProperties(configuration, props, null); 559 props.remove("networkTTL"); 560 String str = MarshallingSupport.propertiesToString(props); 561 brokerInfo.setNetworkProperties(str); 562 brokerInfo.setBrokerId(this.localBrokerId); 563 remoteBroker.oneway(brokerInfo); 564 } 565 if (remoteConnectionInfo != null) { 566 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 567 } 568 remoteConnectionInfo = new ConnectionInfo(); 569 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 570 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); 571 remoteConnectionInfo.setUserName(configuration.getUserName()); 572 remoteConnectionInfo.setPassword(configuration.getPassword()); 573 remoteBroker.oneway(remoteConnectionInfo); 574 575 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); 576 remoteBroker.oneway(remoteSessionInfo); 577 producerInfo = new ProducerInfo(remoteSessionInfo, 1); 578 producerInfo.setResponseRequired(false); 579 remoteBroker.oneway(producerInfo); 580 // Listen to consumer advisory messages on the remote broker to determine demand. 581 if (!configuration.isStaticBridge()) { 582 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); 583 // always dispatch advisory message asynchronously so that 584 // we never block the producer broker if we are slow 585 demandConsumerInfo.setDispatchAsync(true); 586 String advisoryTopic = configuration.getDestinationFilter(); 587 if (configuration.isBridgeTempDestinations()) { 588 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 589 } 590 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 591 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 592 remoteBroker.oneway(demandConsumerInfo); 593 } 594 startedLatch.countDown(); 595 } 596 } 597 } 598 599 @Override 600 public void serviceRemoteException(Throwable error) { 601 if (!disposed.get()) { 602 if (error instanceof SecurityException || error instanceof GeneralSecurityException) { 603 LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 604 localBroker, remoteBroker, error 605 }); 606 } else { 607 LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 608 localBroker, remoteBroker, error 609 }); 610 } 611 LOG.debug("The remote Exception was: {}", error, error); 612 brokerService.getTaskRunnerFactory().execute(new Runnable() { 613 @Override 614 public void run() { 615 ServiceSupport.dispose(getControllingService()); 616 } 617 }); 618 fireBridgeFailed(error); 619 } 620 } 621 622 protected void serviceRemoteCommand(Command command) { 623 if (!disposed.get()) { 624 try { 625 if (command.isMessageDispatch()) { 626 safeWaitUntilStarted(); 627 MessageDispatch md = (MessageDispatch) command; 628 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 629 ackAdvisory(md.getMessage()); 630 } else if (command.isBrokerInfo()) { 631 futureRemoteBrokerInfo.set((BrokerInfo) command); 632 } else if (command.getClass() == ConnectionError.class) { 633 ConnectionError ce = (ConnectionError) command; 634 serviceRemoteException(ce.getException()); 635 } else { 636 if (isDuplex()) { 637 LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType()); 638 if (command.isMessage()) { 639 final ActiveMQMessage message = (ActiveMQMessage) command; 640 if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 641 serviceRemoteConsumerAdvisory(message.getDataStructure()); 642 ackAdvisory(message); 643 } else { 644 if (!isPermissableDestination(message.getDestination(), true)) { 645 return; 646 } 647 // message being forwarded - we need to 648 // propagate the response to our local send 649 if (canDuplexDispatch(message)) { 650 message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); 651 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 652 duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { 653 final int correlationId = message.getCommandId(); 654 655 @Override 656 public void onCompletion(FutureResponse resp) { 657 try { 658 Response reply = resp.getResult(); 659 reply.setCorrelationId(correlationId); 660 remoteBroker.oneway(reply); 661 } catch (IOException error) { 662 LOG.error("Exception: {} on duplex forward of: {}", error, message); 663 serviceRemoteException(error); 664 } 665 } 666 }); 667 } else { 668 duplexInboundLocalBroker.oneway(message); 669 } 670 serviceInboundMessage(message); 671 } else { 672 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 673 Response reply = new Response(); 674 reply.setCorrelationId(message.getCommandId()); 675 remoteBroker.oneway(reply); 676 } 677 } 678 } 679 } else { 680 switch (command.getDataStructureType()) { 681 case ConnectionInfo.DATA_STRUCTURE_TYPE: 682 if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) { 683 // end of initiating connection setup - propogate to initial connection to get mbean by clientid 684 duplexInitiatingConnection.processAddConnection((ConnectionInfo) command); 685 } else { 686 localBroker.oneway(command); 687 } 688 break; 689 case SessionInfo.DATA_STRUCTURE_TYPE: 690 localBroker.oneway(command); 691 break; 692 case ProducerInfo.DATA_STRUCTURE_TYPE: 693 // using duplexInboundLocalProducerInfo 694 break; 695 case MessageAck.DATA_STRUCTURE_TYPE: 696 MessageAck ack = (MessageAck) command; 697 DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId()); 698 if (localSub != null) { 699 ack.setConsumerId(localSub.getLocalInfo().getConsumerId()); 700 localBroker.oneway(ack); 701 } else { 702 LOG.warn("Matching local subscription not found for ack: {}", ack); 703 } 704 break; 705 case ConsumerInfo.DATA_STRUCTURE_TYPE: 706 localStartedLatch.await(); 707 if (started.get()) { 708 final ConsumerInfo consumerInfo = (ConsumerInfo) command; 709 if (isDuplicateSuppressionOff(consumerInfo)) { 710 addConsumerInfo(consumerInfo); 711 } else { 712 synchronized (brokerService.getVmConnectorURI()) { 713 addConsumerInfo(consumerInfo); 714 } 715 } 716 } else { 717 // received a subscription whilst stopping 718 LOG.warn("Stopping - ignoring ConsumerInfo: {}", command); 719 } 720 break; 721 case ShutdownInfo.DATA_STRUCTURE_TYPE: 722 // initiator is shutting down, controlled case 723 // abortive close dealt with by inactivity monitor 724 LOG.info("Stopping network bridge on shutdown of remote broker"); 725 serviceRemoteException(new IOException(command.toString())); 726 break; 727 default: 728 LOG.debug("Ignoring remote command: {}", command); 729 } 730 } 731 } else { 732 switch (command.getDataStructureType()) { 733 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 734 case WireFormatInfo.DATA_STRUCTURE_TYPE: 735 case ShutdownInfo.DATA_STRUCTURE_TYPE: 736 break; 737 default: 738 LOG.warn("Unexpected remote command: {}", command); 739 } 740 } 741 } 742 } catch (Throwable e) { 743 LOG.debug("Exception processing remote command: {}", command, e); 744 serviceRemoteException(e); 745 } 746 } 747 } 748 749 private void ackAdvisory(Message message) throws IOException { 750 demandConsumerDispatched++; 751 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) { 752 final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched); 753 ack.setConsumerId(demandConsumerInfo.getConsumerId()); 754 brokerService.getTaskRunnerFactory().execute(new Runnable() { 755 @Override 756 public void run() { 757 try { 758 remoteBroker.oneway(ack); 759 } catch (IOException e) { 760 LOG.warn("Failed to send advisory ack " + ack, e); 761 } 762 } 763 }); 764 demandConsumerDispatched = 0; 765 } 766 } 767 768 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 769 final int networkTTL = configuration.getConsumerTTL(); 770 if (data.getClass() == ConsumerInfo.class) { 771 // Create a new local subscription 772 ConsumerInfo info = (ConsumerInfo) data; 773 BrokerId[] path = info.getBrokerPath(); 774 775 if (info.isBrowser()) { 776 LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName); 777 return; 778 } 779 780 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 781 LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{ 782 configuration.getBrokerName(), remoteBrokerName, networkTTL, info 783 }); 784 return; 785 } 786 787 if (contains(path, localBrokerPath[0])) { 788 // Ignore this consumer as it's a consumer we locally sent to the broker. 789 LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{ 790 configuration.getBrokerName(), remoteBrokerName, info 791 }); 792 return; 793 } 794 795 if (!isPermissableDestination(info.getDestination())) { 796 // ignore if not in the permitted or in the excluded list 797 LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{ 798 configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info 799 }); 800 return; 801 } 802 803 // in a cyclic network there can be multiple bridges per broker that can propagate 804 // a network subscription so there is a need to synchronize on a shared entity 805 // if duplicate suppression is required 806 if (isDuplicateSuppressionOff(info)) { 807 addConsumerInfo(info); 808 } else { 809 synchronized (brokerService.getVmConnectorURI()) { 810 addConsumerInfo(info); 811 } 812 } 813 } else if (data.getClass() == DestinationInfo.class) { 814 // It's a destination info - we want to pass up information about temporary destinations 815 final DestinationInfo destInfo = (DestinationInfo) data; 816 BrokerId[] path = destInfo.getBrokerPath(); 817 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 818 LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{ 819 configuration.getBrokerName(), destInfo, networkTTL 820 }); 821 return; 822 } 823 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { 824 LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo); 825 return; 826 } 827 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 828 if (destInfo.getDestination() instanceof ActiveMQTempDestination) { 829 // re-set connection id so comes from here 830 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 831 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 832 } 833 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); 834 LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{ 835 configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo 836 }); 837 if (destInfo.isRemoveOperation()) { 838 // Serialize with removeSub operations such that all removeSub advisories 839 // are generated 840 serialExecutor.execute(new Runnable() { 841 @Override 842 public void run() { 843 try { 844 localBroker.oneway(destInfo); 845 } catch (IOException e) { 846 LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e); 847 } 848 } 849 }); 850 } else { 851 localBroker.oneway(destInfo); 852 } 853 } else if (data.getClass() == RemoveInfo.class) { 854 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); 855 removeDemandSubscription(id); 856 } else if (data.getClass() == RemoveSubscriptionInfo.class) { 857 RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); 858 SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); 859 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 860 DemandSubscription ds = i.next(); 861 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); 862 if (removed) { 863 if (ds.getDurableRemoteSubs().isEmpty()) { 864 865 // deactivate subscriber 866 RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); 867 localBroker.oneway(removeInfo); 868 869 // remove subscriber 870 RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); 871 sending.setClientId(localClientId); 872 sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName()); 873 sending.setConnectionId(this.localConnectionInfo.getConnectionId()); 874 localBroker.oneway(sending); 875 } 876 } 877 } 878 } 879 } 880 881 @Override 882 public void serviceLocalException(Throwable error) { 883 serviceLocalException(null, error); 884 } 885 886 public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) { 887 LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error); 888 if (!disposed.get()) { 889 if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) { 890 // not a reason to terminate the bridge - temps can disappear with 891 // pending sends as the demand sub may outlive the remote dest 892 if (messageDispatch != null) { 893 LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error); 894 try { 895 MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1); 896 poisonAck.setPoisonCause(error); 897 localBroker.oneway(poisonAck); 898 } catch (IOException ioe) { 899 LOG.error("Failed to posion ack message following forward failure: ", ioe); 900 } 901 fireFailedForwardAdvisory(messageDispatch, error); 902 } else { 903 LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error); 904 } 905 return; 906 } 907 908 LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error}); 909 LOG.debug("The local Exception was: {}", error, error); 910 911 brokerService.getTaskRunnerFactory().execute(new Runnable() { 912 @Override 913 public void run() { 914 ServiceSupport.dispose(getControllingService()); 915 } 916 }); 917 fireBridgeFailed(error); 918 } 919 } 920 921 private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) { 922 if (configuration.isAdvisoryForFailedForward()) { 923 AdvisoryBroker advisoryBroker = null; 924 try { 925 advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); 926 927 if (advisoryBroker != null) { 928 ConnectionContext context = new ConnectionContext(); 929 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 930 context.setBroker(brokerService.getBroker()); 931 932 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 933 advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); 934 advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null, 935 advisoryMessage); 936 937 } 938 } catch (Exception e) { 939 LOG.warn("failed to fire forward failure advisory, cause: {}", e); 940 LOG.debug("detail", e); 941 } 942 } 943 } 944 945 protected Service getControllingService() { 946 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; 947 } 948 949 protected void addSubscription(DemandSubscription sub) throws IOException { 950 if (sub != null) { 951 localBroker.oneway(sub.getLocalInfo()); 952 } 953 } 954 955 protected void removeSubscription(final DemandSubscription sub) throws IOException { 956 if (sub != null) { 957 LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()}); 958 959 // ensure not available for conduit subs pending removal 960 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 961 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 962 963 // continue removal in separate thread to free up this thread for outstanding responses 964 // Serialize with removeDestination operations so that removeSubs are serialized with 965 // removeDestinations such that all removeSub advisories are generated 966 serialExecutor.execute(new Runnable() { 967 @Override 968 public void run() { 969 sub.waitForCompletion(); 970 try { 971 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 972 } catch (IOException e) { 973 LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e); 974 } 975 } 976 }); 977 } 978 } 979 980 protected Message configureMessage(MessageDispatch md) throws IOException { 981 Message message = md.getMessage().copy(); 982 // Update the packet to show where it came from. 983 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath)); 984 message.setProducerId(producerInfo.getProducerId()); 985 message.setDestination(md.getDestination()); 986 message.setMemoryUsage(null); 987 if (message.getOriginalTransactionId() == null) { 988 message.setOriginalTransactionId(message.getTransactionId()); 989 } 990 message.setTransactionId(null); 991 if (configuration.isUseCompression()) { 992 message.compress(); 993 } 994 return message; 995 } 996 997 protected void serviceLocalCommand(Command command) { 998 if (!disposed.get()) { 999 try { 1000 if (command.isMessageDispatch()) { 1001 safeWaitUntilStarted(); 1002 enqueueCounter.incrementAndGet(); 1003 final MessageDispatch md = (MessageDispatch) command; 1004 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); 1005 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { 1006 1007 if (suppressMessageDispatch(md, sub)) { 1008 LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{ 1009 configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage() 1010 }); 1011 // still ack as it may be durable 1012 try { 1013 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1014 } finally { 1015 sub.decrementOutstandingResponses(); 1016 } 1017 return; 1018 } 1019 1020 Message message = configureMessage(md); 1021 LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{ 1022 configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message 1023 }); 1024 1025 if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 1026 try { 1027 // never request b/c they are eventually acked async 1028 remoteBroker.oneway(message); 1029 } finally { 1030 sub.decrementOutstandingResponses(); 1031 } 1032 return; 1033 } 1034 1035 if (message.isPersistent() || configuration.isAlwaysSyncSend()) { 1036 1037 // The message was not sent using async send, so we should only 1038 // ack the local broker when we get confirmation that the remote 1039 // broker has received the message. 1040 remoteBroker.asyncRequest(message, new ResponseCallback() { 1041 @Override 1042 public void onCompletion(FutureResponse future) { 1043 try { 1044 Response response = future.getResult(); 1045 if (response.isException()) { 1046 ExceptionResponse er = (ExceptionResponse) response; 1047 serviceLocalException(md, er.getException()); 1048 } else { 1049 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1050 dequeueCounter.incrementAndGet(); 1051 } 1052 } catch (IOException e) { 1053 serviceLocalException(md, e); 1054 } finally { 1055 sub.decrementOutstandingResponses(); 1056 } 1057 } 1058 }); 1059 1060 } else { 1061 // If the message was originally sent using async send, we will 1062 // preserve that QOS by bridging it using an async send (small chance 1063 // of message loss). 1064 try { 1065 remoteBroker.oneway(message); 1066 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1067 dequeueCounter.incrementAndGet(); 1068 } finally { 1069 sub.decrementOutstandingResponses(); 1070 } 1071 } 1072 serviceOutbound(message); 1073 } else { 1074 LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage()); 1075 } 1076 } else if (command.isBrokerInfo()) { 1077 futureLocalBrokerInfo.set((BrokerInfo) command); 1078 } else if (command.isShutdownInfo()) { 1079 LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName()); 1080 stop(); 1081 } else if (command.getClass() == ConnectionError.class) { 1082 ConnectionError ce = (ConnectionError) command; 1083 serviceLocalException(ce.getException()); 1084 } else { 1085 switch (command.getDataStructureType()) { 1086 case WireFormatInfo.DATA_STRUCTURE_TYPE: 1087 break; 1088 default: 1089 LOG.warn("Unexpected local command: {}", command); 1090 } 1091 } 1092 } catch (Throwable e) { 1093 LOG.warn("Caught an exception processing local command", e); 1094 serviceLocalException(e); 1095 } 1096 } 1097 } 1098 1099 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { 1100 boolean suppress = false; 1101 // for durable subs, suppression via filter leaves dangling acks so we 1102 // need to check here and allow the ack irrespective 1103 if (sub.getLocalInfo().isDurable()) { 1104 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); 1105 messageEvalContext.setMessageReference(md.getMessage()); 1106 messageEvalContext.setDestination(md.getDestination()); 1107 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); 1108 //AMQ-6465 - Need to decrement the reference count after checking matches() as 1109 //the call above will increment the reference count by 1 1110 messageEvalContext.getMessageReference().decrementReferenceCount(); 1111 } 1112 return suppress; 1113 } 1114 1115 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 1116 if (brokerPath != null) { 1117 for (BrokerId id : brokerPath) { 1118 if (brokerId.equals(id)) { 1119 return true; 1120 } 1121 } 1122 } 1123 return false; 1124 } 1125 1126 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { 1127 if (brokerPath == null || brokerPath.length == 0) { 1128 return pathsToAppend; 1129 } 1130 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; 1131 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1132 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); 1133 return rc; 1134 } 1135 1136 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { 1137 if (brokerPath == null || brokerPath.length == 0) { 1138 return new BrokerId[]{idToAppend}; 1139 } 1140 BrokerId rc[] = new BrokerId[brokerPath.length + 1]; 1141 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1142 rc[brokerPath.length] = idToAppend; 1143 return rc; 1144 } 1145 1146 protected boolean isPermissableDestination(ActiveMQDestination destination) { 1147 return isPermissableDestination(destination, false); 1148 } 1149 1150 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { 1151 // Are we not bridging temporary destinations? 1152 if (destination.isTemporary()) { 1153 if (allowTemporary) { 1154 return true; 1155 } else { 1156 return configuration.isBridgeTempDestinations(); 1157 } 1158 } 1159 1160 ActiveMQDestination[] dests = staticallyIncludedDestinations; 1161 if (dests != null && dests.length > 0) { 1162 for (ActiveMQDestination dest : dests) { 1163 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1164 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1165 return true; 1166 } 1167 } 1168 } 1169 1170 dests = excludedDestinations; 1171 if (dests != null && dests.length > 0) { 1172 for (ActiveMQDestination dest : dests) { 1173 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest); 1174 if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1175 return false; 1176 } 1177 } 1178 } 1179 1180 dests = dynamicallyIncludedDestinations; 1181 if (dests != null && dests.length > 0) { 1182 for (ActiveMQDestination dest : dests) { 1183 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1184 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1185 return true; 1186 } 1187 } 1188 1189 return false; 1190 } 1191 return true; 1192 } 1193 1194 /** 1195 * Subscriptions for these destinations are always created 1196 */ 1197 protected void setupStaticDestinations() { 1198 ActiveMQDestination[] dests = staticallyIncludedDestinations; 1199 if (dests != null) { 1200 for (ActiveMQDestination dest : dests) { 1201 DemandSubscription sub = createDemandSubscription(dest); 1202 sub.setStaticallyIncluded(true); 1203 try { 1204 addSubscription(sub); 1205 } catch (IOException e) { 1206 LOG.error("Failed to add static destination {}", dest, e); 1207 } 1208 LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest); 1209 } 1210 } 1211 } 1212 1213 protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { 1214 ConsumerInfo info = consumerInfo.copy(); 1215 addRemoteBrokerToBrokerPath(info); 1216 DemandSubscription sub = createDemandSubscription(info); 1217 if (sub != null) { 1218 if (duplicateSuppressionIsRequired(sub)) { 1219 undoMapRegistration(sub); 1220 } else { 1221 if (consumerInfo.isDurable()) { 1222 sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); 1223 } 1224 addSubscription(sub); 1225 LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub); 1226 } 1227 } 1228 } 1229 1230 private void undoMapRegistration(DemandSubscription sub) { 1231 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 1232 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 1233 } 1234 1235 /* 1236 * check our existing subs networkConsumerIds against the list of network 1237 * ids in this subscription A match means a duplicate which we suppress for 1238 * topics and maybe for queues 1239 */ 1240 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { 1241 final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); 1242 boolean suppress = false; 1243 1244 if (isDuplicateSuppressionOff(consumerInfo)) { 1245 return suppress; 1246 } 1247 1248 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); 1249 Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination()); 1250 for (Subscription sub : currentSubs) { 1251 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); 1252 if (!networkConsumers.isEmpty()) { 1253 if (matchFound(candidateConsumers, networkConsumers)) { 1254 if (isInActiveDurableSub(sub)) { 1255 suppress = false; 1256 } else { 1257 suppress = hasLowerPriority(sub, candidate.getLocalInfo()); 1258 } 1259 break; 1260 } 1261 } 1262 } 1263 return suppress; 1264 } 1265 1266 private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) { 1267 return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions() 1268 || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() 1269 || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions(); 1270 } 1271 1272 private boolean isInActiveDurableSub(Subscription sub) { 1273 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive()); 1274 } 1275 1276 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { 1277 boolean suppress = false; 1278 1279 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { 1280 LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{ 1281 configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds() 1282 }); 1283 suppress = true; 1284 } else { 1285 // remove the existing lower priority duplicate and allow this candidate 1286 try { 1287 removeDuplicateSubscription(existingSub); 1288 1289 LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{ 1290 configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds() 1291 }); 1292 } catch (IOException e) { 1293 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e); 1294 } 1295 } 1296 return suppress; 1297 } 1298 1299 private void removeDuplicateSubscription(Subscription existingSub) throws IOException { 1300 for (NetworkConnector connector : brokerService.getNetworkConnectors()) { 1301 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { 1302 break; 1303 } 1304 } 1305 } 1306 1307 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) { 1308 boolean found = false; 1309 for (ConsumerId aliasConsumer : networkConsumers) { 1310 if (candidateConsumers.contains(aliasConsumer)) { 1311 found = true; 1312 break; 1313 } 1314 } 1315 return found; 1316 } 1317 1318 protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) { 1319 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker(); 1320 Region region; 1321 Collection<Subscription> subs; 1322 1323 region = null; 1324 switch (dest.getDestinationType()) { 1325 case ActiveMQDestination.QUEUE_TYPE: 1326 region = region_broker.getQueueRegion(); 1327 break; 1328 case ActiveMQDestination.TOPIC_TYPE: 1329 region = region_broker.getTopicRegion(); 1330 break; 1331 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1332 region = region_broker.getTempQueueRegion(); 1333 break; 1334 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1335 region = region_broker.getTempTopicRegion(); 1336 break; 1337 } 1338 1339 if (region instanceof AbstractRegion) { 1340 subs = ((AbstractRegion) region).getSubscriptions().values(); 1341 } else { 1342 subs = null; 1343 } 1344 1345 return subs; 1346 } 1347 1348 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 1349 // add our original id to ourselves 1350 info.addNetworkConsumerId(info.getConsumerId()); 1351 return doCreateDemandSubscription(info); 1352 } 1353 1354 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { 1355 DemandSubscription result = new DemandSubscription(info); 1356 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1357 if (info.getDestination().isTemporary()) { 1358 // reset the local connection Id 1359 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); 1360 dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); 1361 } 1362 1363 if (configuration.isDecreaseNetworkConsumerPriority()) { 1364 byte priority = (byte) configuration.getConsumerPriorityBase(); 1365 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { 1366 // The longer the path to the consumer, the less it's consumer priority. 1367 priority -= info.getBrokerPath().length + 1; 1368 } 1369 result.getLocalInfo().setPriority(priority); 1370 LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info}); 1371 } 1372 configureDemandSubscription(info, result); 1373 return result; 1374 } 1375 1376 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { 1377 ConsumerInfo info = new ConsumerInfo(); 1378 info.setNetworkSubscription(true); 1379 info.setDestination(destination); 1380 1381 // Indicate that this subscription is being made on behalf of the remote broker. 1382 info.setBrokerPath(new BrokerId[]{remoteBrokerId}); 1383 1384 // the remote info held by the DemandSubscription holds the original 1385 // consumerId, the local info get's overwritten 1386 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1387 DemandSubscription result = null; 1388 try { 1389 result = createDemandSubscription(info); 1390 } catch (IOException e) { 1391 LOG.error("Failed to create DemandSubscription ", e); 1392 } 1393 return result; 1394 } 1395 1396 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { 1397 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) || 1398 AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { 1399 sub.getLocalInfo().setDispatchAsync(true); 1400 } else { 1401 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); 1402 } 1403 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize()); 1404 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); 1405 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); 1406 1407 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info)); 1408 if (!info.isDurable()) { 1409 // This works for now since we use a VM connection to the local broker. 1410 // may need to change if we ever subscribe to a remote broker. 1411 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); 1412 } else { 1413 sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); 1414 } 1415 } 1416 1417 protected void removeDemandSubscription(ConsumerId id) throws IOException { 1418 DemandSubscription sub = subscriptionMapByRemoteId.remove(id); 1419 LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{ 1420 configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub 1421 }); 1422 if (sub != null) { 1423 removeSubscription(sub); 1424 LOG.debug("{} removed sub on {} from {}: {}", new Object[]{ 1425 configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo() 1426 }); 1427 } 1428 } 1429 1430 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { 1431 boolean removeDone = false; 1432 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); 1433 if (sub != null) { 1434 try { 1435 removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); 1436 removeDone = true; 1437 } catch (IOException e) { 1438 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e); 1439 } 1440 } 1441 return removeDone; 1442 } 1443 1444 /** 1445 * Performs a timed wait on the started latch and then checks for disposed 1446 * before performing another wait each time the the started wait times out. 1447 */ 1448 protected boolean safeWaitUntilStarted() throws InterruptedException { 1449 while (!disposed.get()) { 1450 if (startedLatch.await(1, TimeUnit.SECONDS)) { 1451 break; 1452 } 1453 } 1454 return !disposed.get(); 1455 } 1456 1457 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { 1458 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory; 1459 if (brokerService != null && brokerService.getDestinationPolicy() != null) { 1460 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination()); 1461 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) { 1462 filterFactory = entry.getNetworkBridgeFilterFactory(); 1463 } 1464 } 1465 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL()); 1466 } 1467 1468 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { 1469 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); 1470 } 1471 1472 protected BrokerId[] getRemoteBrokerPath() { 1473 return remoteBrokerPath; 1474 } 1475 1476 @Override 1477 public void setNetworkBridgeListener(NetworkBridgeListener listener) { 1478 this.networkBridgeListener = listener; 1479 } 1480 1481 private void fireBridgeFailed(Throwable reason) { 1482 LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason); 1483 NetworkBridgeListener l = this.networkBridgeListener; 1484 if (l != null && this.bridgeFailed.compareAndSet(false, true)) { 1485 l.bridgeFailed(); 1486 } 1487 } 1488 1489 /** 1490 * @return Returns the dynamicallyIncludedDestinations. 1491 */ 1492 public ActiveMQDestination[] getDynamicallyIncludedDestinations() { 1493 return dynamicallyIncludedDestinations; 1494 } 1495 1496 /** 1497 * @param dynamicallyIncludedDestinations 1498 * The dynamicallyIncludedDestinations to set. 1499 */ 1500 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { 1501 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; 1502 } 1503 1504 /** 1505 * @return Returns the excludedDestinations. 1506 */ 1507 public ActiveMQDestination[] getExcludedDestinations() { 1508 return excludedDestinations; 1509 } 1510 1511 /** 1512 * @param excludedDestinations The excludedDestinations to set. 1513 */ 1514 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { 1515 this.excludedDestinations = excludedDestinations; 1516 } 1517 1518 /** 1519 * @return Returns the staticallyIncludedDestinations. 1520 */ 1521 public ActiveMQDestination[] getStaticallyIncludedDestinations() { 1522 return staticallyIncludedDestinations; 1523 } 1524 1525 /** 1526 * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set. 1527 */ 1528 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { 1529 this.staticallyIncludedDestinations = staticallyIncludedDestinations; 1530 } 1531 1532 /** 1533 * @return Returns the durableDestinations. 1534 */ 1535 public ActiveMQDestination[] getDurableDestinations() { 1536 return durableDestinations; 1537 } 1538 1539 /** 1540 * @param durableDestinations The durableDestinations to set. 1541 */ 1542 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { 1543 this.durableDestinations = durableDestinations; 1544 } 1545 1546 /** 1547 * @return Returns the localBroker. 1548 */ 1549 public Transport getLocalBroker() { 1550 return localBroker; 1551 } 1552 1553 /** 1554 * @return Returns the remoteBroker. 1555 */ 1556 public Transport getRemoteBroker() { 1557 return remoteBroker; 1558 } 1559 1560 /** 1561 * @return the createdByDuplex 1562 */ 1563 public boolean isCreatedByDuplex() { 1564 return this.createdByDuplex; 1565 } 1566 1567 /** 1568 * @param createdByDuplex the createdByDuplex to set 1569 */ 1570 public void setCreatedByDuplex(boolean createdByDuplex) { 1571 this.createdByDuplex = createdByDuplex; 1572 } 1573 1574 @Override 1575 public String getRemoteAddress() { 1576 return remoteBroker.getRemoteAddress(); 1577 } 1578 1579 @Override 1580 public String getLocalAddress() { 1581 return localBroker.getRemoteAddress(); 1582 } 1583 1584 @Override 1585 public String getRemoteBrokerName() { 1586 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 1587 } 1588 1589 @Override 1590 public String getRemoteBrokerId() { 1591 return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString(); 1592 } 1593 1594 @Override 1595 public String getLocalBrokerName() { 1596 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); 1597 } 1598 1599 @Override 1600 public long getDequeueCounter() { 1601 return dequeueCounter.get(); 1602 } 1603 1604 @Override 1605 public long getEnqueueCounter() { 1606 return enqueueCounter.get(); 1607 } 1608 1609 protected boolean isDuplex() { 1610 return configuration.isDuplex() || createdByDuplex; 1611 } 1612 1613 public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() { 1614 return subscriptionMapByRemoteId; 1615 } 1616 1617 @Override 1618 public void setBrokerService(BrokerService brokerService) { 1619 this.brokerService = brokerService; 1620 this.localBrokerId = brokerService.getRegionBroker().getBrokerId(); 1621 localBrokerPath[0] = localBrokerId; 1622 } 1623 1624 @Override 1625 public void setMbeanObjectName(ObjectName objectName) { 1626 this.mbeanObjectName = objectName; 1627 } 1628 1629 @Override 1630 public ObjectName getMbeanObjectName() { 1631 return mbeanObjectName; 1632 } 1633 1634 @Override 1635 public void resetStats() { 1636 enqueueCounter.set(0); 1637 dequeueCounter.set(0); 1638 } 1639 1640 /* 1641 * Used to allow for async tasks to await receipt of the BrokerInfo from the local and 1642 * remote sides of the network bridge. 1643 */ 1644 private static class FutureBrokerInfo implements Future<BrokerInfo> { 1645 1646 private final CountDownLatch slot = new CountDownLatch(1); 1647 private final AtomicBoolean disposed; 1648 private volatile BrokerInfo info = null; 1649 1650 public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) { 1651 this.info = info; 1652 this.disposed = disposed; 1653 } 1654 1655 @Override 1656 public boolean cancel(boolean mayInterruptIfRunning) { 1657 slot.countDown(); 1658 return true; 1659 } 1660 1661 @Override 1662 public boolean isCancelled() { 1663 return slot.getCount() == 0 && info == null; 1664 } 1665 1666 @Override 1667 public boolean isDone() { 1668 return info != null; 1669 } 1670 1671 @Override 1672 public BrokerInfo get() throws InterruptedException, ExecutionException { 1673 try { 1674 if (info == null) { 1675 while (!disposed.get()) { 1676 if (slot.await(1, TimeUnit.SECONDS)) { 1677 break; 1678 } 1679 } 1680 } 1681 return info; 1682 } catch (InterruptedException e) { 1683 Thread.currentThread().interrupt(); 1684 LOG.debug("Operation interrupted: {}", e, e); 1685 throw new InterruptedException("Interrupted."); 1686 } 1687 } 1688 1689 @Override 1690 public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 1691 try { 1692 if (info == null) { 1693 long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 1694 1695 while (!disposed.get() || System.currentTimeMillis() < deadline) { 1696 if (slot.await(1, TimeUnit.MILLISECONDS)) { 1697 break; 1698 } 1699 } 1700 if (info == null) { 1701 throw new TimeoutException(); 1702 } 1703 } 1704 return info; 1705 } catch (InterruptedException e) { 1706 throw new InterruptedException("Interrupted."); 1707 } 1708 } 1709 1710 public void set(BrokerInfo info) { 1711 this.info = info; 1712 this.slot.countDown(); 1713 } 1714 } 1715 1716 protected void serviceOutbound(Message message) { 1717 NetworkBridgeListener l = this.networkBridgeListener; 1718 if (l != null) { 1719 l.onOutboundMessage(this, message); 1720 } 1721 } 1722 1723 protected void serviceInboundMessage(Message message) { 1724 NetworkBridgeListener l = this.networkBridgeListener; 1725 if (l != null) { 1726 l.onInboundMessage(this, message); 1727 } 1728 } 1729 1730 protected boolean canDuplexDispatch(Message message) { 1731 boolean result = true; 1732 if (configuration.isCheckDuplicateMessagesOnDuplex()){ 1733 final long producerSequenceId = message.getMessageId().getProducerSequenceId(); 1734 // messages are multiplexed on this producer so we need to query the persistenceAdapter 1735 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId()); 1736 if (producerSequenceId <= lastStoredForMessageProducer) { 1737 result = false; 1738 LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ 1739 (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer 1740 }); 1741 } 1742 } 1743 return result; 1744 } 1745 1746 protected long getStoredSequenceIdForMessage(MessageId messageId) { 1747 try { 1748 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); 1749 } catch (IOException ignored) { 1750 LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); 1751 } 1752 return -1; 1753 } 1754 1755}