001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.net.URI; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Locale; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.CopyOnWriteArrayList; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.locks.ReentrantReadWriteLock; 033 034import javax.jms.InvalidClientIDException; 035import javax.jms.JMSException; 036 037import org.apache.activemq.broker.Broker; 038import org.apache.activemq.broker.BrokerService; 039import org.apache.activemq.broker.Connection; 040import org.apache.activemq.broker.ConnectionContext; 041import org.apache.activemq.broker.ConsumerBrokerExchange; 042import org.apache.activemq.broker.EmptyBroker; 043import org.apache.activemq.broker.ProducerBrokerExchange; 044import org.apache.activemq.broker.TransportConnection; 045import org.apache.activemq.broker.TransportConnector; 046import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 047import org.apache.activemq.broker.region.policy.PolicyMap; 048import org.apache.activemq.command.ActiveMQDestination; 049import org.apache.activemq.command.ActiveMQMessage; 050import org.apache.activemq.command.BrokerId; 051import org.apache.activemq.command.BrokerInfo; 052import org.apache.activemq.command.ConnectionId; 053import org.apache.activemq.command.ConnectionInfo; 054import org.apache.activemq.command.ConsumerControl; 055import org.apache.activemq.command.ConsumerInfo; 056import org.apache.activemq.command.DestinationInfo; 057import org.apache.activemq.command.Message; 058import org.apache.activemq.command.MessageAck; 059import org.apache.activemq.command.MessageDispatch; 060import org.apache.activemq.command.MessageDispatchNotification; 061import org.apache.activemq.command.MessagePull; 062import org.apache.activemq.command.ProducerInfo; 063import org.apache.activemq.command.RemoveSubscriptionInfo; 064import org.apache.activemq.command.Response; 065import org.apache.activemq.command.TransactionId; 066import org.apache.activemq.state.ConnectionState; 067import org.apache.activemq.store.PListStore; 068import org.apache.activemq.thread.Scheduler; 069import org.apache.activemq.thread.TaskRunnerFactory; 070import org.apache.activemq.transport.TransmitCallback; 071import org.apache.activemq.usage.SystemUsage; 072import org.apache.activemq.util.BrokerSupport; 073import org.apache.activemq.util.IdGenerator; 074import org.apache.activemq.util.InetAddressUtil; 075import org.apache.activemq.util.LongSequenceGenerator; 076import org.apache.activemq.util.ServiceStopper; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080/** 081 * Routes Broker operations to the correct messaging regions for processing. 082 */ 083public class RegionBroker extends EmptyBroker { 084 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 085 private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class); 086 private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); 087 088 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 089 protected DestinationFactory destinationFactory; 090 protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); 091 092 private final Region queueRegion; 093 private final Region topicRegion; 094 private final Region tempQueueRegion; 095 private final Region tempTopicRegion; 096 protected final BrokerService brokerService; 097 private boolean started; 098 private boolean keepDurableSubsActive; 099 100 private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>(); 101 private final Map<ActiveMQDestination, ActiveMQDestination> destinationGate = new HashMap<ActiveMQDestination, ActiveMQDestination>(); 102 private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 103 private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>(); 104 105 private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 106 private BrokerId brokerId; 107 private String brokerName; 108 private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); 109 private final DestinationInterceptor destinationInterceptor; 110 private ConnectionContext adminConnectionContext; 111 private final Scheduler scheduler; 112 private final ThreadPoolExecutor executor; 113 private boolean allowTempAutoCreationOnSend; 114 115 private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock(); 116 private final TaskRunnerFactory taskRunnerFactory; 117 private final AtomicBoolean purgeInactiveDestinationsTaskInProgress = new AtomicBoolean(false); 118 private final Runnable purgeInactiveDestinationsTask = new Runnable() { 119 @Override 120 public void run() { 121 if (purgeInactiveDestinationsTaskInProgress.compareAndSet(false, true)) { 122 taskRunnerFactory.execute(purgeInactiveDestinationsWork); 123 } 124 } 125 }; 126 private final Runnable purgeInactiveDestinationsWork = new Runnable() { 127 @Override 128 public void run() { 129 try { 130 purgeInactiveDestinations(); 131 } catch (Throwable ignored) { 132 LOG.error("Unexpected exception on purgeInactiveDestinations {}", this, ignored); 133 } finally { 134 purgeInactiveDestinationsTaskInProgress.set(false); 135 } 136 } 137 }; 138 139 public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, 140 DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException { 141 this.brokerService = brokerService; 142 this.executor = executor; 143 this.scheduler = scheduler; 144 if (destinationFactory == null) { 145 throw new IllegalArgumentException("null destinationFactory"); 146 } 147 this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId()); 148 this.destinationFactory = destinationFactory; 149 queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 150 topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 151 this.destinationInterceptor = destinationInterceptor; 152 tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 153 tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 154 this.taskRunnerFactory = taskRunnerFactory; 155 } 156 157 @Override 158 public Map<ActiveMQDestination, Destination> getDestinationMap() { 159 Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap()); 160 answer.putAll(getTopicRegion().getDestinationMap()); 161 return answer; 162 } 163 164 @Override 165 public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination) { 166 try { 167 return getRegion(destination).getDestinationMap(); 168 } catch (JMSException jmse) { 169 return Collections.emptyMap(); 170 } 171 } 172 173 @Override 174 public Set<Destination> getDestinations(ActiveMQDestination destination) { 175 try { 176 return getRegion(destination).getDestinations(destination); 177 } catch (JMSException jmse) { 178 return Collections.emptySet(); 179 } 180 } 181 182 @Override 183 @SuppressWarnings("rawtypes") 184 public Broker getAdaptor(Class type) { 185 if (type.isInstance(this)) { 186 return this; 187 } 188 return null; 189 } 190 191 public Region getQueueRegion() { 192 return queueRegion; 193 } 194 195 public Region getTempQueueRegion() { 196 return tempQueueRegion; 197 } 198 199 public Region getTempTopicRegion() { 200 return tempTopicRegion; 201 } 202 203 public Region getTopicRegion() { 204 return topicRegion; 205 } 206 207 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 208 return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 209 } 210 211 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 212 return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 213 } 214 215 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 216 return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 217 } 218 219 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 220 return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 221 } 222 223 @Override 224 public void start() throws Exception { 225 started = true; 226 queueRegion.start(); 227 topicRegion.start(); 228 tempQueueRegion.start(); 229 tempTopicRegion.start(); 230 int period = this.brokerService.getSchedulePeriodForDestinationPurge(); 231 if (period > 0) { 232 this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period); 233 } 234 } 235 236 @Override 237 public void stop() throws Exception { 238 started = false; 239 this.scheduler.cancel(purgeInactiveDestinationsTask); 240 ServiceStopper ss = new ServiceStopper(); 241 doStop(ss); 242 ss.throwFirstException(); 243 // clear the state 244 clientIdSet.clear(); 245 connections.clear(); 246 destinations.clear(); 247 brokerInfos.clear(); 248 } 249 250 public PolicyMap getDestinationPolicy() { 251 return brokerService != null ? brokerService.getDestinationPolicy() : null; 252 } 253 254 public ConnectionContext getConnectionContext(String clientId) { 255 return clientIdSet.get(clientId); 256 } 257 258 @Override 259 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 260 String clientId = info.getClientId(); 261 if (clientId == null) { 262 throw new InvalidClientIDException("No clientID specified for connection request"); 263 } 264 265 ConnectionContext oldContext = null; 266 267 synchronized (clientIdSet) { 268 oldContext = clientIdSet.get(clientId); 269 if (oldContext != null) { 270 if (context.isAllowLinkStealing()) { 271 clientIdSet.put(clientId, context); 272 } else { 273 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " 274 + oldContext.getConnection().getRemoteAddress()); 275 } 276 } else { 277 clientIdSet.put(clientId, context); 278 } 279 } 280 281 if (oldContext != null) { 282 if (oldContext.getConnection() != null) { 283 Connection connection = oldContext.getConnection(); 284 LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection()); 285 if (connection instanceof TransportConnection) { 286 TransportConnection transportConnection = (TransportConnection) connection; 287 transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId())); 288 } else { 289 connection.stop(); 290 } 291 } else { 292 LOG.error("No Connection found for {}", oldContext); 293 } 294 } 295 296 connections.add(context.getConnection()); 297 } 298 299 @Override 300 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 301 String clientId = info.getClientId(); 302 if (clientId == null) { 303 throw new InvalidClientIDException("No clientID specified for connection disconnect request"); 304 } 305 synchronized (clientIdSet) { 306 ConnectionContext oldValue = clientIdSet.get(clientId); 307 // we may be removing the duplicate connection, not the first connection to be created 308 // so lets check that their connection IDs are the same 309 if (oldValue == context) { 310 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) { 311 clientIdSet.remove(clientId); 312 } 313 } 314 } 315 connections.remove(context.getConnection()); 316 } 317 318 protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) { 319 return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2)); 320 } 321 322 @Override 323 public Connection[] getClients() throws Exception { 324 ArrayList<Connection> l = new ArrayList<Connection>(connections); 325 Connection rc[] = new Connection[l.size()]; 326 l.toArray(rc); 327 return rc; 328 } 329 330 @Override 331 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception { 332 333 Destination answer; 334 335 answer = destinations.get(destination); 336 if (answer != null) { 337 return answer; 338 } 339 340 synchronized (destinationGate) { 341 answer = destinations.get(destination); 342 if (answer != null) { 343 return answer; 344 } 345 346 if (destinationGate.get(destination) != null) { 347 // Guard against spurious wakeup. 348 while (destinationGate.containsKey(destination)) { 349 destinationGate.wait(); 350 } 351 answer = destinations.get(destination); 352 if (answer != null) { 353 return answer; 354 } else { 355 // In case of intermediate remove or add failure 356 destinationGate.put(destination, destination); 357 } 358 } 359 } 360 361 try { 362 boolean create = true; 363 if (destination.isTemporary()) { 364 create = createIfTemp; 365 } 366 answer = getRegion(destination).addDestination(context, destination, create); 367 destinations.put(destination, answer); 368 } finally { 369 synchronized (destinationGate) { 370 destinationGate.remove(destination); 371 destinationGate.notifyAll(); 372 } 373 } 374 375 return answer; 376 } 377 378 @Override 379 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 380 if (destinations.containsKey(destination)) { 381 getRegion(destination).removeDestination(context, destination, timeout); 382 destinations.remove(destination); 383 } 384 } 385 386 @Override 387 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 388 addDestination(context, info.getDestination(), true); 389 390 } 391 392 @Override 393 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 394 removeDestination(context, info.getDestination(), info.getTimeout()); 395 } 396 397 @Override 398 public ActiveMQDestination[] getDestinations() throws Exception { 399 ArrayList<ActiveMQDestination> l; 400 401 l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet()); 402 403 ActiveMQDestination rc[] = new ActiveMQDestination[l.size()]; 404 l.toArray(rc); 405 return rc; 406 } 407 408 @Override 409 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 410 ActiveMQDestination destination = info.getDestination(); 411 if (destination != null) { 412 inactiveDestinationsPurgeLock.readLock().lock(); 413 try { 414 // This seems to cause the destination to be added but without 415 // advisories firing... 416 context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend()); 417 getRegion(destination).addProducer(context, info); 418 } finally { 419 inactiveDestinationsPurgeLock.readLock().unlock(); 420 } 421 } 422 } 423 424 @Override 425 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 426 ActiveMQDestination destination = info.getDestination(); 427 if (destination != null) { 428 inactiveDestinationsPurgeLock.readLock().lock(); 429 try { 430 getRegion(destination).removeProducer(context, info); 431 } finally { 432 inactiveDestinationsPurgeLock.readLock().unlock(); 433 } 434 } 435 } 436 437 @Override 438 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 439 ActiveMQDestination destination = info.getDestination(); 440 if (destinationInterceptor != null) { 441 destinationInterceptor.create(this, context, destination); 442 } 443 inactiveDestinationsPurgeLock.readLock().lock(); 444 try { 445 return getRegion(destination).addConsumer(context, info); 446 } finally { 447 inactiveDestinationsPurgeLock.readLock().unlock(); 448 } 449 } 450 451 @Override 452 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 453 ActiveMQDestination destination = info.getDestination(); 454 inactiveDestinationsPurgeLock.readLock().lock(); 455 try { 456 getRegion(destination).removeConsumer(context, info); 457 } finally { 458 inactiveDestinationsPurgeLock.readLock().unlock(); 459 } 460 } 461 462 @Override 463 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 464 inactiveDestinationsPurgeLock.readLock().lock(); 465 try { 466 topicRegion.removeSubscription(context, info); 467 } finally { 468 inactiveDestinationsPurgeLock.readLock().unlock(); 469 470 } 471 } 472 473 @Override 474 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 475 ActiveMQDestination destination = message.getDestination(); 476 message.setBrokerInTime(System.currentTimeMillis()); 477 if (producerExchange.isMutable() || producerExchange.getRegion() == null 478 || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) { 479 // ensure the destination is registered with the RegionBroker 480 producerExchange.getConnectionContext().getBroker() 481 .addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend()); 482 producerExchange.setRegion(getRegion(destination)); 483 producerExchange.setRegionDestination(null); 484 } 485 486 producerExchange.getRegion().send(producerExchange, message); 487 488 // clean up so these references aren't kept (possible leak) in the producer exchange 489 // especially since temps are transitory 490 if (producerExchange.isMutable()) { 491 producerExchange.setRegionDestination(null); 492 producerExchange.setRegion(null); 493 } 494 } 495 496 @Override 497 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 498 if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) { 499 ActiveMQDestination destination = ack.getDestination(); 500 consumerExchange.setRegion(getRegion(destination)); 501 } 502 consumerExchange.getRegion().acknowledge(consumerExchange, ack); 503 } 504 505 public Region getRegion(ActiveMQDestination destination) throws JMSException { 506 switch (destination.getDestinationType()) { 507 case ActiveMQDestination.QUEUE_TYPE: 508 return queueRegion; 509 case ActiveMQDestination.TOPIC_TYPE: 510 return topicRegion; 511 case ActiveMQDestination.TEMP_QUEUE_TYPE: 512 return tempQueueRegion; 513 case ActiveMQDestination.TEMP_TOPIC_TYPE: 514 return tempTopicRegion; 515 default: 516 throw createUnknownDestinationTypeException(destination); 517 } 518 } 519 520 @Override 521 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 522 ActiveMQDestination destination = pull.getDestination(); 523 return getRegion(destination).messagePull(context, pull); 524 } 525 526 @Override 527 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 528 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 529 } 530 531 @Override 532 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 533 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 534 } 535 536 @Override 537 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 538 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 539 } 540 541 @Override 542 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 543 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 544 } 545 546 @Override 547 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 548 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 549 } 550 551 @Override 552 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 553 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 554 } 555 556 @Override 557 public void gc() { 558 queueRegion.gc(); 559 topicRegion.gc(); 560 } 561 562 @Override 563 public BrokerId getBrokerId() { 564 if (brokerId == null) { 565 brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId()); 566 } 567 return brokerId; 568 } 569 570 public void setBrokerId(BrokerId brokerId) { 571 this.brokerId = brokerId; 572 } 573 574 @Override 575 public String getBrokerName() { 576 if (brokerName == null) { 577 try { 578 brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH); 579 } catch (Exception e) { 580 brokerName = "localhost"; 581 } 582 } 583 return brokerName; 584 } 585 586 public void setBrokerName(String brokerName) { 587 this.brokerName = brokerName; 588 } 589 590 public DestinationStatistics getDestinationStatistics() { 591 return destinationStatistics; 592 } 593 594 protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) { 595 return new JMSException("Unknown destination type: " + destination.getDestinationType()); 596 } 597 598 @Override 599 public synchronized void addBroker(Connection connection, BrokerInfo info) { 600 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 601 if (existing == null) { 602 existing = info.copy(); 603 existing.setPeerBrokerInfos(null); 604 brokerInfos.put(info.getBrokerId(), existing); 605 } 606 existing.incrementRefCount(); 607 LOG.debug("{} addBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size() }); 608 addBrokerInClusterUpdate(info); 609 } 610 611 @Override 612 public synchronized void removeBroker(Connection connection, BrokerInfo info) { 613 if (info != null) { 614 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 615 if (existing != null && existing.decrementRefCount() == 0) { 616 brokerInfos.remove(info.getBrokerId()); 617 } 618 LOG.debug("{} removeBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size()}); 619 // When stopping don't send cluster updates since we are the one's tearing down 620 // our own bridges. 621 if (!brokerService.isStopping()) { 622 removeBrokerInClusterUpdate(info); 623 } 624 } 625 } 626 627 @Override 628 public synchronized BrokerInfo[] getPeerBrokerInfos() { 629 BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; 630 result = brokerInfos.values().toArray(result); 631 return result; 632 } 633 634 @Override 635 public void preProcessDispatch(final MessageDispatch messageDispatch) { 636 final Message message = messageDispatch.getMessage(); 637 if (message != null) { 638 long endTime = System.currentTimeMillis(); 639 message.setBrokerOutTime(endTime); 640 if (getBrokerService().isEnableStatistics()) { 641 long totalTime = endTime - message.getBrokerInTime(); 642 ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime); 643 } 644 if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered()) { 645 final int originalValue = message.getRedeliveryCounter(); 646 message.incrementRedeliveryCounter(); 647 try { 648 if (message.isPersistent()) { 649 ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message); 650 } 651 messageDispatch.setTransmitCallback(new TransmitCallback() { 652 // dispatch is considered a delivery, so update sub state post dispatch otherwise 653 // on a disconnect/reconnect cached messages will not reflect initial delivery attempt 654 final TransmitCallback delegate = messageDispatch.getTransmitCallback(); 655 @Override 656 public void onSuccess() { 657 message.incrementRedeliveryCounter(); 658 if (delegate != null) { 659 delegate.onSuccess(); 660 } 661 } 662 663 @Override 664 public void onFailure() { 665 if (delegate != null) { 666 delegate.onFailure(); 667 } 668 } 669 }); 670 } catch (IOException error) { 671 RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error); 672 LOG.warn(runtimeException.getLocalizedMessage(), runtimeException); 673 throw runtimeException; 674 } finally { 675 message.setRedeliveryCounter(originalValue); 676 } 677 } 678 } 679 } 680 681 @Override 682 public void postProcessDispatch(MessageDispatch messageDispatch) { 683 } 684 685 @Override 686 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 687 ActiveMQDestination destination = messageDispatchNotification.getDestination(); 688 getRegion(destination).processDispatchNotification(messageDispatchNotification); 689 } 690 691 @Override 692 public boolean isStopped() { 693 return !started; 694 } 695 696 @Override 697 public Set<ActiveMQDestination> getDurableDestinations() { 698 return destinationFactory.getDestinations(); 699 } 700 701 protected void doStop(ServiceStopper ss) { 702 ss.stop(queueRegion); 703 ss.stop(topicRegion); 704 ss.stop(tempQueueRegion); 705 ss.stop(tempTopicRegion); 706 } 707 708 public boolean isKeepDurableSubsActive() { 709 return keepDurableSubsActive; 710 } 711 712 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 713 this.keepDurableSubsActive = keepDurableSubsActive; 714 ((TopicRegion) topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); 715 } 716 717 public DestinationInterceptor getDestinationInterceptor() { 718 return destinationInterceptor; 719 } 720 721 @Override 722 public ConnectionContext getAdminConnectionContext() { 723 return adminConnectionContext; 724 } 725 726 @Override 727 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { 728 this.adminConnectionContext = adminConnectionContext; 729 } 730 731 public Map<ConnectionId, ConnectionState> getConnectionStates() { 732 return connectionStates; 733 } 734 735 @Override 736 public PListStore getTempDataStore() { 737 return brokerService.getTempDataStore(); 738 } 739 740 @Override 741 public URI getVmConnectorURI() { 742 return brokerService.getVmConnectorURI(); 743 } 744 745 @Override 746 public void brokerServiceStarted() { 747 } 748 749 @Override 750 public BrokerService getBrokerService() { 751 return brokerService; 752 } 753 754 @Override 755 public boolean isExpired(MessageReference messageReference) { 756 return messageReference.canProcessAsExpired(); 757 } 758 759 private boolean stampAsExpired(Message message) throws IOException { 760 boolean stamped = false; 761 if (message.getProperty(ORIGINAL_EXPIRATION) == null) { 762 long expiration = message.getExpiration(); 763 message.setProperty(ORIGINAL_EXPIRATION, new Long(expiration)); 764 stamped = true; 765 } 766 return stamped; 767 } 768 769 @Override 770 public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) { 771 LOG.debug("Message expired {}", node); 772 getRoot().sendToDeadLetterQueue(context, node, subscription, new Throwable("Message Expired. Expiration:" + node.getExpiration())); 773 } 774 775 @Override 776 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription, Throwable poisonCause) { 777 try { 778 if (node != null) { 779 Message message = node.getMessage(); 780 if (message != null && node.getRegionDestination() != null) { 781 DeadLetterStrategy deadLetterStrategy = ((Destination) node.getRegionDestination()).getDeadLetterStrategy(); 782 if (deadLetterStrategy != null) { 783 if (deadLetterStrategy.isSendToDeadLetterQueue(message)) { 784 ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription); 785 // Prevent a DLQ loop where same message is sent from a DLQ back to itself 786 if (deadLetterDestination.equals(message.getDestination())) { 787 LOG.debug("Not re-adding to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); 788 return false; 789 } 790 791 // message may be inflight to other subscriptions so do not modify 792 message = message.copy(); 793 long dlqExpiration = deadLetterStrategy.getExpiration(); 794 if (dlqExpiration > 0) { 795 dlqExpiration += System.currentTimeMillis(); 796 } else { 797 stampAsExpired(message); 798 } 799 message.setExpiration(dlqExpiration); 800 if (!message.isPersistent()) { 801 message.setPersistent(true); 802 message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); 803 } 804 if (poisonCause != null) { 805 message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, 806 poisonCause.toString()); 807 } 808 // The original destination and transaction id do 809 // not get filled when the message is first sent, 810 // it is only populated if the message is routed to 811 // another destination like the DLQ 812 ConnectionContext adminContext = context; 813 if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) { 814 adminContext = BrokerSupport.getConnectionContext(this); 815 } 816 addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ(true); 817 BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination); 818 return true; 819 } 820 } else { 821 LOG.debug("Dead Letter message with no DLQ strategy in place, message id: {}, destination: {}", message.getMessageId(), message.getDestination()); 822 } 823 } 824 } 825 } catch (Exception e) { 826 LOG.warn("Caught an exception sending to DLQ: {}", node, e); 827 } 828 829 return false; 830 } 831 832 @Override 833 public Broker getRoot() { 834 try { 835 return getBrokerService().getBroker(); 836 } catch (Exception e) { 837 LOG.error("Trying to get Root Broker", e); 838 throw new RuntimeException("The broker from the BrokerService should not throw an exception"); 839 } 840 } 841 842 /** 843 * @return the broker sequence id 844 */ 845 @Override 846 public long getBrokerSequenceId() { 847 synchronized (sequenceGenerator) { 848 return sequenceGenerator.getNextSequenceId(); 849 } 850 } 851 852 @Override 853 public Scheduler getScheduler() { 854 return this.scheduler; 855 } 856 857 @Override 858 public ThreadPoolExecutor getExecutor() { 859 return this.executor; 860 } 861 862 @Override 863 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { 864 ActiveMQDestination destination = control.getDestination(); 865 try { 866 getRegion(destination).processConsumerControl(consumerExchange, control); 867 } catch (JMSException jmse) { 868 LOG.warn("unmatched destination: {}, in consumerControl: {}", destination, control); 869 } 870 } 871 872 protected void addBrokerInClusterUpdate(BrokerInfo info) { 873 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 874 for (TransportConnector connector : connectors) { 875 if (connector.isUpdateClusterClients()) { 876 connector.addPeerBroker(info); 877 connector.updateClientClusterInfo(); 878 } 879 } 880 } 881 882 protected void removeBrokerInClusterUpdate(BrokerInfo info) { 883 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 884 for (TransportConnector connector : connectors) { 885 if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) { 886 connector.removePeerBroker(info); 887 connector.updateClientClusterInfo(); 888 } 889 } 890 } 891 892 protected void purgeInactiveDestinations() { 893 inactiveDestinationsPurgeLock.writeLock().lock(); 894 try { 895 List<Destination> list = new ArrayList<Destination>(); 896 Map<ActiveMQDestination, Destination> map = getDestinationMap(); 897 if (isAllowTempAutoCreationOnSend()) { 898 map.putAll(tempQueueRegion.getDestinationMap()); 899 map.putAll(tempTopicRegion.getDestinationMap()); 900 } 901 long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep(); 902 long timeStamp = System.currentTimeMillis(); 903 for (Destination d : map.values()) { 904 d.markForGC(timeStamp); 905 if (d.canGC()) { 906 list.add(d); 907 if (maxPurgedDests > 0 && list.size() == maxPurgedDests) { 908 break; 909 } 910 } 911 } 912 913 if (!list.isEmpty()) { 914 ConnectionContext context = BrokerSupport.getConnectionContext(this); 915 context.setBroker(this); 916 917 for (Destination dest : list) { 918 Logger log = LOG; 919 if (dest instanceof BaseDestination) { 920 log = ((BaseDestination) dest).getLog(); 921 } 922 log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimeoutBeforeGC()); 923 try { 924 getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0); 925 } catch (Throwable e) { 926 LOG.error("Failed to remove inactive destination {}", dest, e); 927 } 928 } 929 } 930 } finally { 931 inactiveDestinationsPurgeLock.writeLock().unlock(); 932 } 933 } 934 935 public boolean isAllowTempAutoCreationOnSend() { 936 return allowTempAutoCreationOnSend; 937 } 938 939 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 940 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 941 } 942 943 @Override 944 public void reapplyInterceptor() { 945 queueRegion.reapplyInterceptor(); 946 topicRegion.reapplyInterceptor(); 947 tempQueueRegion.reapplyInterceptor(); 948 tempTopicRegion.reapplyInterceptor(); 949 } 950}