/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.jms.JMSException;
import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.virtual.CompositeDestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link Region} implementation. It keeps the registry of destinations
 * and subscriptions that belong to the region and routes consumer, producer,
 * send, acknowledge and pull operations to them.
 * <p>
 * Two destination registries are maintained side by side: {@link #destinations}
 * for exact lookups and {@link #destinationMap} for the lookups performed by
 * {@link #getDestinations(ActiveMQDestination)}. The {@code unsynchronized*}
 * calls on {@link #destinationMap} are always made while holding
 * {@link #destinationsLock}.
 */
public abstract class AbstractRegion implements Region {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);

    protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
    protected final DestinationMap destinationMap = new DestinationMap();
    protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
    protected final SystemUsage usageManager;
    protected final DestinationFactory destinationFactory;
    protected final DestinationStatistics destinationStatistics;
    protected final RegionBroker broker;
    protected boolean autoCreateDestinations = true;
    protected final TaskRunnerFactory taskRunnerFactory;
    protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
    // Per-consumer guard objects used by addConsumer; access is synchronized on the map itself.
    protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
    protected boolean started;

    /**
     * Creates the region.
     *
     * @param broker the owning region broker, must not be {@code null}
     * @param destinationStatistics statistics object handed to newly created destinations
     * @param memoryManager stored as {@link #usageManager}
     * @param taskRunnerFactory stored as {@link #taskRunnerFactory}
     * @param destinationFactory factory used to create and remove destinations, must not be {@code null}
     * @throws IllegalArgumentException if {@code broker} or {@code destinationFactory} is {@code null}
     */
    public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
            TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        if (broker == null) {
            throw new IllegalArgumentException("null broker");
        }
        this.broker = broker;
        this.destinationStatistics = destinationStatistics;
        this.usageManager = memoryManager;
        this.taskRunnerFactory = taskRunnerFactory;
        if (destinationFactory == null) {
            throw new IllegalArgumentException("null destinationFactory");
        }
        this.destinationFactory = destinationFactory;
    }

    /**
     * Marks the region as started, adds every destination reported by
     * {@link #getInactiveDestinations()} through the broker, and then starts
     * all registered destinations.
     *
     * @throws Exception if adding or starting a destination fails
     */
    @Override
    public final void start() throws Exception {
        started = true;

        for (ActiveMQDestination dest : getInactiveDestinations()) {
            // Each destination is added with its own context carrying the broker's security context.
            ConnectionContext context = new ConnectionContext();
            context.setBroker(broker.getBrokerService().getBroker());
            context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
            context.getBroker().addDestination(context, dest, false);
        }

        destinationsLock.readLock().lock();
        try {
            for (Destination dest : destinations.values()) {
                dest.start();
            }
        } finally {
            destinationsLock.readLock().unlock();
        }
    }

    /**
     * Marks the region as stopped, stops every registered destination and
     * empties {@link #destinations}.
     *
     * @throws Exception if stopping a destination fails
     */
    @Override
    public void stop() throws Exception {
        started = false;
        destinationsLock.readLock().lock();
        try {
            for (Destination dest : destinations.values()) {
                dest.stop();
            }
        } finally {
            destinationsLock.readLock().unlock();
        }
        // NOTE(review): the clear happens after the lock is released and
        // destinationMap is not cleared here - confirm this is intended.
        destinations.clear();
    }

    /**
     * Returns the destination registered for {@code destination}, creating,
     * intercepting, starting and registering it first when it is not yet known.
     * A temporary destination is only created when {@code createIfTemporary}
     * is {@code true}.
     *
     * @param context the context the operation runs under
     * @param destination the destination to look up or create
     * @param createIfTemporary whether a missing temporary destination may be created
     * @return the registered destination, never {@code null}
     * @throws DestinationDoesNotExistException if the destination is unknown and was not created
     * @throws Exception if creating or starting the destination fails
     */
    @Override
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
            boolean createIfTemporary) throws Exception {

        destinationsLock.writeLock().lock();
        try {
            Destination dest = destinations.get(destination);
            if (dest == null) {
                if (!destination.isTemporary() || createIfTemporary) {
                    LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination);
                    dest = createDestination(context, destination);
                    // intercept if there is a valid interceptor defined
                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
                    if (destinationInterceptor != null) {
                        dest = destinationInterceptor.intercept(dest);
                    }
                    dest.start();
                    addSubscriptionsForDestination(context, dest);
                    destinations.put(destination, dest);
                    destinationMap.unsynchronizedPut(destination, dest);
                }
                if (dest == null) {
                    throw new DestinationDoesNotExistException(destination.getQualifiedName());
                }
            }
            return dest;
        } finally {
            destinationsLock.writeLock().unlock();
        }
    }

    /**
     * @return the live map of subscriptions keyed by consumer id (not a copy)
     */
    public Map<ConsumerId, Subscription> getSubscriptions() {
        return subscriptions;
    }

    /**
     * Attaches every known subscription that matches {@code dest} to it.
     * A {@link SecurityException} raised for a wildcard subscription is logged
     * at debug level and skipped; for any other subscription it is rethrown.
     *
     * @param context fallback context, used when a subscription has no context of its own
     * @param dest the destination to attach subscriptions to
     * @return the subscriptions that were added to {@code dest}
     * @throws Exception if adding a subscription fails
     */
    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
            throws Exception {

        List<Subscription> rc = new ArrayList<Subscription>();
        // Add all consumers that are interested in the destination.
        for (Subscription sub : subscriptions.values()) {
            if (sub.matches(dest.getActiveMQDestination())) {
                try {
                    ConnectionContext originalContext = sub.getContext() != null ? sub.getContext() : context;
                    dest.addSubscription(originalContext, sub);
                    rc.add(sub);
                } catch (SecurityException e) {
                    if (sub.isWildcard()) {
                        LOG.debug("Subscription denied for {} to destination {}: {}",
                                new Object[]{ sub, dest.getActiveMQDestination(), e.getMessage() });
                    } else {
                        throw e;
                    }
                }
            }
        }
        return rc;
    }

    /**
     * Removes {@code destination} from the region. Any subscription still
     * matching it is detached, then the destination is disposed and the
     * destination interceptor (if any) is told about the removal.
     *
     * @param context the context the operation runs under
     * @param destination the destination to remove
     * @param timeout {@code 0} fails the call if a subscription still matches the
     *            destination; any other value removes the destination regardless
     *            (waiting for a positive timeout is not implemented)
     * @throws JMSException if {@code timeout} is {@code 0} and a matching subscription exists
     * @throws Exception if detaching a subscription or disposing the destination fails
     */
    @Override
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
            throws Exception {

        // No timeout.. then try to shut down right way, fails if there are
        // current subscribers.
        if (timeout == 0) {
            for (Subscription sub : subscriptions.values()) {
                if (sub.matches(destination)) {
                    throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
                }
            }
        }

        if (timeout > 0) {
            // TODO: implement a way to notify the subscribers that we want to
            // take down the destination and that they should un-subscribe.
            // Then wait up to timeout time before dropping the subscription.
        }

        LOG.debug("{} removing destination: {}", broker.getBrokerName(), destination);

        destinationsLock.writeLock().lock();
        try {
            Destination dest = destinations.remove(destination);
            if (dest != null) {
                // timeout<0 or we timed out, we now force any remaining
                // subscriptions to un-subscribe.
                for (Subscription sub : subscriptions.values()) {
                    if (sub.matches(destination)) {
                        dest.removeSubscription(context, sub, 0L);
                    }
                }
                destinationMap.unsynchronizedRemove(destination, dest);
                dispose(context, dest);
                DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
                if (destinationInterceptor != null) {
                    destinationInterceptor.remove(dest);
                }
            } else {
                LOG.debug("Cannot remove a destination that doesn't exist: {}", destination);
            }
        } finally {
            destinationsLock.writeLock().unlock();
        }
    }

    /**
     * Provide an exact or wildcard lookup of destinations in the region
     *
     * @param destination the destination (or pattern) to look up
     * @return a set of matching destination objects.
     */
    @Override
    @SuppressWarnings("unchecked")
    public Set<Destination> getDestinations(ActiveMQDestination destination) {
        destinationsLock.readLock().lock();
        try {
            return destinationMap.unsynchronizedGet(destination);
        } finally {
            destinationsLock.readLock().unlock();
        }
    }

    /**
     * @return the live map of registered destinations (not a copy)
     */
    @Override
    public Map<ActiveMQDestination, Destination> getDestinationMap() {
        return destinations;
    }

    /**
     * Registers a consumer. A new subscription is created, recorded in
     * {@link #subscriptions} and attached to every destination currently
     * matching the consumer's destination. If a subscription already exists
     * for the consumer id, a warning is logged and the existing one is returned.
     * <p>
     * When a non-wildcard subscription is rejected with a
     * {@link SecurityException}, the destinations it was already attached to
     * are detached again, the subscription is unregistered and the exception
     * is rethrown. For a wildcard subscription the rejection is only logged.
     *
     * @param context the context the operation runs under
     * @param info describes the consumer to add
     * @return the new subscription, or the existing one for a duplicate consumer id
     * @throws Exception if the destination lookup, subscription creation or attachment fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        LOG.debug("{} adding consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() });
        ActiveMQDestination destination = info.getDestination();
        if (destination != null && !destination.isPattern() && !destination.isComposite()) {
            // lets auto-create the destination
            lookup(context, destination, true);
        }

        // Fetch (or create) the guard object dedicated to this consumer id.
        Object addGuard;
        synchronized (consumerChangeMutexMap) {
            addGuard = consumerChangeMutexMap.get(info.getConsumerId());
            if (addGuard == null) {
                addGuard = new Object();
                consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
            }
        }
        synchronized (addGuard) {
            Subscription o = subscriptions.get(info.getConsumerId());
            if (o != null) {
                LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
                return o;
            }

            // We may need to add some destinations that are in persistent store
            // but not active in the broker.
            //
            // TODO: think about this a little more. This is good cause
            // destinations are not loaded into memory until a client needs to
            // use the queue, but a management agent viewing the broker will not
            // see a destination that exists in persistent store. We may want to
            // eagerly load all destinations into the broker but have an
            // inactive state for the destination which has reduced memory usage.
            //
            DestinationFilter.parseFilter(info.getDestination());

            Subscription sub = createSubscription(context, info);

            // At this point we're done directly manipulating subscriptions,
            // but we need to retain the synchronized block here. Consider
            // otherwise what would happen if at this point a second
            // thread added, then removed, as would be allowed with
            // no mutex held. Remove is only essentially run once
            // so everything after this point would be leaked.

            // Add the subscription to all the matching queues.
            // But copy the matches first - to prevent deadlocks
            List<Destination> addList = new ArrayList<Destination>();
            destinationsLock.readLock().lock();
            try {
                addList.addAll((Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination()));
                // ensure sub visible to any new dest addSubscriptionsForDestination
                subscriptions.put(info.getConsumerId(), sub);
            } finally {
                destinationsLock.readLock().unlock();
            }

            // Destinations the subscription has been attached to so far; used for rollback.
            List<Destination> removeList = new ArrayList<Destination>();
            for (Destination dest : addList) {
                try {
                    dest.addSubscription(context, sub);
                    removeList.add(dest);
                } catch (SecurityException e) {
                    if (sub.isWildcard()) {
                        LOG.debug("Subscription denied for {} to destination {}: {}",
                                new Object[]{ sub, dest.getActiveMQDestination(), e.getMessage() });
                    } else {
                        // remove partial subscriptions
                        for (Destination remove : removeList) {
                            try {
                                remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
                            } catch (Exception ex) {
                                LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex);
                            }
                        }
                        subscriptions.remove(info.getConsumerId());
                        removeList.clear();
                        throw e;
                    }
                }
            }
            removeList.clear();

            if (info.isBrowser()) {
                ((QueueBrowserSubscription) sub).destinationsAdded();
            }

            return sub;
        }
    }

    /**
     * Get all the Destinations that are in storage
     *
     * @return Set of all stored destinations, as reported by the destination factory
     */
    @SuppressWarnings("rawtypes")
    public Set getDurableDestinations() {
        return destinationFactory.getDestinations();
    }

    /**
     * Computes the destinations reported by the destination factory that are
     * not currently registered in {@link #destinations}.
     * <p>
     * NOTE(review): the set returned by the factory is modified in place;
     * presumably the factory hands out a fresh set on every call - confirm.
     *
     * @return the factory's destinations minus the registered ones
     */
    protected Set<ActiveMQDestination> getInactiveDestinations() {
        Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
        destinationsLock.readLock().lock();
        try {
            inactiveDests.removeAll(destinations.keySet());
        } finally {
            destinationsLock.readLock().unlock();
        }
        return inactiveDests;
    }

    /**
     * Unregisters a consumer: its subscription is removed from
     * {@link #subscriptions}, detached from every destination currently
     * matching the consumer's destination and destroyed. The consumer's guard
     * object is discarded in all cases.
     *
     * @param context the context the operation runs under
     * @param info describes the consumer to remove
     * @throws Exception if detaching the subscription from a destination fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        LOG.debug("{} removing consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() });

        Subscription sub = subscriptions.remove(info.getConsumerId());
        // The sub could be removed elsewhere - see ConnectionSplitBroker
        if (sub != null) {

            // remove the subscription from all the matching queues.
            // The matches are copied under the lock and processed after releasing it.
            List<Destination> removeList = new ArrayList<Destination>();
            destinationsLock.readLock().lock();
            try {
                removeList.addAll((Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination()));
            } finally {
                destinationsLock.readLock().unlock();
            }
            for (Destination dest : removeList) {
                dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
            }

            destroySubscription(sub);
        }
        synchronized (consumerChangeMutexMap) {
            consumerChangeMutexMap.remove(info.getConsumerId());
        }
    }

    /**
     * Destroys a subscription that has been removed from the region.
     *
     * @param sub the subscription to destroy
     */
    protected void destroySubscription(Subscription sub) {
        sub.destroy();
    }

    /**
     * Not supported by this base class.
     *
     * @throws JMSException always
     */
    @Override
    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
        throw new JMSException("Invalid operation.");
    }

    /**
     * Sends a message to its region destination. The destination is looked up
     * (and cached on the exchange) when the exchange is mutable or has no
     * destination yet. After the send, the producer's sent count is
     * incremented when producer state and info are available.
     *
     * @param producerExchange the producer side of the exchange
     * @param messageSend the message to send
     * @throws Exception if the lookup or the send fails
     */
    @Override
    public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();

        if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
            final Destination regionDestination = lookup(context, messageSend.getDestination(), false);
            producerExchange.setRegionDestination(regionDestination);
        }

        producerExchange.getRegionDestination().send(producerExchange, messageSend);

        if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null) {
            producerExchange.getProducerState().getInfo().incrementSentCount();
        }
    }

    /**
     * Forwards an acknowledgement to the consumer's subscription, resolving
     * the subscription by consumer id (and caching it on the exchange) when
     * the exchange does not carry one. An ack for an unknown subscription is
     * silently dropped while the connection is in recovery mode.
     *
     * @param consumerExchange the consumer side of the exchange
     * @param ack the acknowledgement to process
     * @throws IllegalArgumentException if the subscription does not exist and the
     *             connection is not in recovery mode
     * @throws Exception if the subscription fails to process the ack
     */
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        Subscription sub = consumerExchange.getSubscription();
        if (sub == null) {
            sub = subscriptions.get(ack.getConsumerId());
            if (sub == null) {
                if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
                    LOG.warn("Ack for non existent subscription, ack: {}", ack);
                    throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
                } else {
                    LOG.debug("Ack for non existent subscription in recovery, ack: {}", ack);
                    return;
                }
            }
            consumerExchange.setSubscription(sub);
        }
        sub.acknowledge(consumerExchange.getConnectionContext(), ack);
    }

    /**
     * Delegates a message pull to the subscription of the pulling consumer.
     *
     * @param context the context the operation runs under
     * @param pull the pull request
     * @return the response produced by the subscription
     * @throws IllegalArgumentException if no subscription exists for the consumer id
     * @throws Exception if the subscription fails to process the pull
     */
    @Override
    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
        Subscription sub = subscriptions.get(pull.getConsumerId());
        if (sub == null) {
            throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
        }
        return sub.pullMessage(context, pull);
    }

    /**
     * Finds the registered destination for {@code destination}; when it is not
     * registered and auto-creation is enabled, asks the broker to add it.
     *
     * @param context the context the operation runs under
     * @param destination the destination to find
     * @param createTemporary passed to the broker as the "create if temporary" flag
     * @return the destination, never {@code null}
     * @throws JMSException if the destination does not exist and was not created
     * @throws Exception if the broker fails to add the destination
     */
    protected Destination lookup(ConnectionContext context, ActiveMQDestination destination, boolean createTemporary) throws Exception {
        Destination dest = null;

        destinationsLock.readLock().lock();
        try {
            dest = destinations.get(destination);
        } finally {
            destinationsLock.readLock().unlock();
        }

        if (dest == null) {
            if (isAutoCreateDestinations()) {
                // Try to auto create the destination... re-invoke broker
                // from the top so that the proper security checks are performed.
                dest = context.getBroker().addDestination(context, destination, createTemporary);
            }

            if (dest == null) {
                throw new JMSException("The destination " + destination + " does not exist.");
            }
        }
        return dest;
    }

    /**
     * Hands a dispatch notification to the subscription it names.
     *
     * @param messageDispatchNotification the notification to process
     * @throws JMSException if no subscription exists for the notification's consumer id
     * @throws Exception if the subscription fails to process the notification
     */
    @Override
    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
        Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
        if (sub != null) {
            sub.processMessageDispatchNotification(messageDispatchNotification);
        } else {
            throw new JMSException("Slave broker out of sync with master - Subscription: "
                    + messageDispatchNotification.getConsumerId() + " on "
                    + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
                    + messageDispatchNotification.getMessageId());
        }
    }

    /**
     * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
     * dispatch is deferred till the notification to ensure that the
     * subscription chosen by the master is used. AMQ-2102
     *
     * @param messageDispatchNotification the notification to process
     * @throws JMSException if the notification's destination is not registered
     * @throws Exception if the destination fails to process the notification
     */
    protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
            throws Exception {
        Destination dest = null;
        destinationsLock.readLock().lock();
        try {
            dest = destinations.get(messageDispatchNotification.getDestination());
        } finally {
            destinationsLock.readLock().unlock();
        }

        if (dest != null) {
            dest.processDispatchNotification(messageDispatchNotification);
        } else {
            throw new JMSException("Slave broker out of sync with master - Destination: "
                    + messageDispatchNotification.getDestination() + " does not exist for consumer "
                    + messageDispatchNotification.getConsumerId() + " with message: "
                    + messageDispatchNotification.getMessageId());
        }
    }

    /**
     * Invokes {@code gc()} on every subscription and then on every registered destination.
     */
    @Override
    public void gc() {
        for (Subscription sub : subscriptions.values()) {
            sub.gc();
        }

        destinationsLock.readLock().lock();
        try {
            for (Destination dest : destinations.values()) {
                dest.gc();
            }
        } finally {
            destinationsLock.readLock().unlock();
        }
    }

    /**
     * Creates the subscription object for a new consumer; called by
     * {@link #addConsumer(ConnectionContext, ConsumerInfo)}.
     *
     * @param context the context the operation runs under
     * @param info describes the consumer
     * @return the new subscription
     * @throws Exception if the subscription cannot be created
     */
    protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;

    /**
     * Creates a destination through the destination factory.
     *
     * @param context the context the operation runs under
     * @param destination the destination to create
     * @return the destination produced by the factory
     * @throws Exception if the factory fails
     */
    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
            throws Exception {
        return destinationFactory.createDestination(context, destination, destinationStatistics);
    }

    /**
     * @return whether {@link #lookup} may create missing destinations (defaults to {@code true})
     */
    public boolean isAutoCreateDestinations() {
        return autoCreateDestinations;
    }

    /**
     * @param autoCreateDestinations whether {@link #lookup} may create missing destinations
     */
    public void setAutoCreateDestinations(boolean autoCreateDestinations) {
        this.autoCreateDestinations = autoCreateDestinations;
    }

    /**
     * Adds a Producer to every destination currently matching the producer's destination.
     *
     * @param context the environment the operation is being executed under.
     * @param info describes the producer to add
     * @throws Exception if a destination rejects the producer
     */
    @Override
    @SuppressWarnings("unchecked")
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        destinationsLock.readLock().lock();
        try {
            for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
                dest.addProducer(context, info);
            }
        } finally {
            destinationsLock.readLock().unlock();
        }
    }

    /**
     * Removes a Producer from every destination currently matching the producer's destination.
     *
     * @param context the environment the operation is being executed under.
     * @param info describes the producer to remove
     * @throws Exception if a destination fails to remove the producer
     */
    @Override
    @SuppressWarnings("unchecked")
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        destinationsLock.readLock().lock();
        try {
            for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
                dest.removeProducer(context, info);
            }
        } finally {
            destinationsLock.readLock().unlock();
        }
    }

    /**
     * Disposes and stops a destination, then removes it from the destination factory.
     *
     * @param context the context the operation runs under
     * @param dest the destination to dispose
     * @throws Exception if disposing, stopping or removing the destination fails
     */
    protected void dispose(ConnectionContext context, Destination dest) throws Exception {
        dest.dispose(context);
        dest.stop();
        destinationFactory.removeDestination(dest);
    }

    /**
     * Applies a consumer control command: sets the requested prefetch on the
     * consumer's subscription, lets the matching destination policy entry (if
     * any) adjust it, and then wakes up the destination. Nothing happens when
     * the consumer has no {@link AbstractSubscription}. A failure to wake the
     * destination is logged as a warning and not propagated.
     *
     * @param consumerExchange the consumer side of the exchange
     * @param control the control command
     */
    @Override
    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
        Subscription sub = subscriptions.get(control.getConsumerId());
        // instanceof is false for null, so no separate null check is needed
        if (sub instanceof AbstractSubscription) {
            ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
            if (broker.getDestinationPolicy() != null) {
                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(control.getDestination());
                if (entry != null) {
                    entry.configurePrefetch(sub);
                }
            }
            LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()});
            try {
                lookup(consumerExchange.getConnectionContext(), control.getDestination(), false).wakeup();
            } catch (Exception e) {
                LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: {}", control.getDestination(), e);
            }
        }
    }

    /**
     * Re-runs the broker's current destination interceptor over every
     * registered destination. A {@link CompositeDestinationFilter} wrapper is
     * unwrapped to its {@code next} destination before interception, and the
     * result replaces the registered entry.
     */
    @Override
    public void reapplyInterceptor() {
        destinationsLock.writeLock().lock();
        try {
            DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
            Map<ActiveMQDestination, Destination> map = getDestinationMap();
            for (ActiveMQDestination key : map.keySet()) {
                Destination destination = map.get(key);
                if (destination instanceof CompositeDestinationFilter) {
                    destination = ((CompositeDestinationFilter) destination).next;
                }
                if (destinationInterceptor != null) {
                    destination = destinationInterceptor.intercept(destination);
                }
                // In this class getDestinationMap() returns the destinations field,
                // so the two puts differ only if a subclass overrides getDestinationMap().
                // NOTE(review): destinationMap (used by getDestinations) is not updated
                // here - confirm whether it should be.
                getDestinationMap().put(key, destination);
                destinations.put(key, destination);
            }
        } finally {
            destinationsLock.writeLock().unlock();
        }
    }
}