001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.Iterator;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.locks.ReentrantReadWriteLock;
027
028import javax.jms.JMSException;
029import org.apache.activemq.DestinationDoesNotExistException;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.ConsumerBrokerExchange;
032import org.apache.activemq.broker.ProducerBrokerExchange;
033import org.apache.activemq.broker.region.policy.PolicyEntry;
034import org.apache.activemq.broker.region.virtual.CompositeDestinationFilter;
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.ConsumerControl;
037import org.apache.activemq.command.ConsumerId;
038import org.apache.activemq.command.ConsumerInfo;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessageAck;
041import org.apache.activemq.command.MessageDispatchNotification;
042import org.apache.activemq.command.MessagePull;
043import org.apache.activemq.command.ProducerInfo;
044import org.apache.activemq.command.RemoveSubscriptionInfo;
045import org.apache.activemq.command.Response;
046import org.apache.activemq.filter.DestinationFilter;
047import org.apache.activemq.filter.DestinationMap;
048import org.apache.activemq.security.SecurityContext;
049import org.apache.activemq.thread.TaskRunnerFactory;
050import org.apache.activemq.usage.SystemUsage;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 *
056 */
057public abstract class AbstractRegion implements Region {
058
059    private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);
060
061    protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
062    protected final DestinationMap destinationMap = new DestinationMap();
063    protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
064    protected final SystemUsage usageManager;
065    protected final DestinationFactory destinationFactory;
066    protected final DestinationStatistics destinationStatistics;
067    protected final RegionBroker broker;
068    protected boolean autoCreateDestinations = true;
069    protected final TaskRunnerFactory taskRunnerFactory;
070    protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
071    protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
072    protected boolean started;
073
074    public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
075            TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
076        if (broker == null) {
077            throw new IllegalArgumentException("null broker");
078        }
079        this.broker = broker;
080        this.destinationStatistics = destinationStatistics;
081        this.usageManager = memoryManager;
082        this.taskRunnerFactory = taskRunnerFactory;
083        if (destinationFactory == null) {
084            throw new IllegalArgumentException("null destinationFactory");
085        }
086        this.destinationFactory = destinationFactory;
087    }
088
089    @Override
090    public final void start() throws Exception {
091        started = true;
092
093        Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
094        for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
095            ActiveMQDestination dest = iter.next();
096
097            ConnectionContext context = new ConnectionContext();
098            context.setBroker(broker.getBrokerService().getBroker());
099            context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
100            context.getBroker().addDestination(context, dest, false);
101        }
102        destinationsLock.readLock().lock();
103        try{
104            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
105                Destination dest = i.next();
106                dest.start();
107            }
108        } finally {
109            destinationsLock.readLock().unlock();
110        }
111    }
112
113    @Override
114    public void stop() throws Exception {
115        started = false;
116        destinationsLock.readLock().lock();
117        try{
118            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
119                Destination dest = i.next();
120                dest.stop();
121            }
122        } finally {
123            destinationsLock.readLock().unlock();
124        }
125        destinations.clear();
126    }
127
128    @Override
129    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
130            boolean createIfTemporary) throws Exception {
131
132        destinationsLock.writeLock().lock();
133        try {
134            Destination dest = destinations.get(destination);
135            if (dest == null) {
136                if (destination.isTemporary() == false || createIfTemporary) {
137                    LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination);
138                    dest = createDestination(context, destination);
139                    // intercept if there is a valid interceptor defined
140                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
141                    if (destinationInterceptor != null) {
142                        dest = destinationInterceptor.intercept(dest);
143                    }
144                    dest.start();
145                    addSubscriptionsForDestination(context, dest);
146                    destinations.put(destination, dest);
147                    destinationMap.unsynchronizedPut(destination, dest);
148                }
149                if (dest == null) {
150                    throw new DestinationDoesNotExistException(destination.getQualifiedName());
151                }
152            }
153            return dest;
154        } finally {
155            destinationsLock.writeLock().unlock();
156        }
157    }
158
159    public Map<ConsumerId, Subscription> getSubscriptions() {
160        return subscriptions;
161    }
162
163    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
164            throws Exception {
165
166        List<Subscription> rc = new ArrayList<Subscription>();
167        // Add all consumers that are interested in the destination.
168        for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
169            Subscription sub = iter.next();
170            if (sub.matches(dest.getActiveMQDestination())) {
171                try {
172                    ConnectionContext originalContext = sub.getContext() != null ? sub.getContext() : context;
173                    dest.addSubscription(originalContext, sub);
174                    rc.add(sub);
175                } catch (SecurityException e) {
176                    if (sub.isWildcard()) {
177                        LOG.debug("Subscription denied for " + sub + " to destination " +
178                            dest.getActiveMQDestination() +  ": " + e.getMessage());
179                    } else {
180                        throw e;
181                    }
182                }
183            }
184        }
185        return rc;
186
187    }
188
189    @Override
190    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
191            throws Exception {
192
193        // No timeout.. then try to shut down right way, fails if there are
194        // current subscribers.
195        if (timeout == 0) {
196            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
197                Subscription sub = iter.next();
198                if (sub.matches(destination) ) {
199                    throw new JMSException("Destination still has an active subscription: " + destination);
200                }
201            }
202        }
203
204        if (timeout > 0) {
205            // TODO: implement a way to notify the subscribers that we want to
206            // take the down
207            // the destination and that they should un-subscribe.. Then wait up
208            // to timeout time before
209            // dropping the subscription.
210        }
211
212        LOG.debug("{} removing destination: {}", broker.getBrokerName(), destination);
213
214        destinationsLock.writeLock().lock();
215        try {
216            Destination dest = destinations.remove(destination);
217            if (dest != null) {
218                // timeout<0 or we timed out, we now force any remaining
219                // subscriptions to un-subscribe.
220                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
221                    Subscription sub = iter.next();
222                    if (sub.matches(destination)) {
223                        dest.removeSubscription(context, sub, 0l);
224                    }
225                }
226                destinationMap.unsynchronizedRemove(destination, dest);
227                dispose(context, dest);
228                DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
229                if (destinationInterceptor != null) {
230                    destinationInterceptor.remove(dest);
231                }
232
233            } else {
234                LOG.debug("Cannot remove a destination that doesn't exist: {}", destination);
235            }
236        } finally {
237            destinationsLock.writeLock().unlock();
238        }
239    }
240
241    /**
242     * Provide an exact or wildcard lookup of destinations in the region
243     *
244     * @return a set of matching destination objects.
245     */
246    @Override
247    @SuppressWarnings("unchecked")
248    public Set<Destination> getDestinations(ActiveMQDestination destination) {
249        destinationsLock.readLock().lock();
250        try{
251            return destinationMap.unsynchronizedGet(destination);
252        } finally {
253            destinationsLock.readLock().unlock();
254        }
255    }
256
257    @Override
258    public Map<ActiveMQDestination, Destination> getDestinationMap() {
259        return destinations;
260    }
261
262    @Override
263    @SuppressWarnings("unchecked")
264    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
265        LOG.debug("{} adding consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() });
266        ActiveMQDestination destination = info.getDestination();
267        if (destination != null && !destination.isPattern() && !destination.isComposite()) {
268            // lets auto-create the destination
269            lookup(context, destination,true);
270        }
271
272        Object addGuard;
273        synchronized (consumerChangeMutexMap) {
274            addGuard = consumerChangeMutexMap.get(info.getConsumerId());
275            if (addGuard == null) {
276                addGuard = new Object();
277                consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
278            }
279        }
280        synchronized (addGuard) {
281            Subscription o = subscriptions.get(info.getConsumerId());
282            if (o != null) {
283                LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
284                return o;
285            }
286
287            // We may need to add some destinations that are in persistent store
288            // but not active
289            // in the broker.
290            //
291            // TODO: think about this a little more. This is good cause
292            // destinations are not loaded into
293            // memory until a client needs to use the queue, but a management
294            // agent viewing the
295            // broker will not see a destination that exists in persistent
296            // store. We may want to
297            // eagerly load all destinations into the broker but have an
298            // inactive state for the
299            // destination which has reduced memory usage.
300            //
301            DestinationFilter.parseFilter(info.getDestination());
302
303            Subscription sub = createSubscription(context, info);
304
305            // At this point we're done directly manipulating subscriptions,
306            // but we need to retain the synchronized block here. Consider
307            // otherwise what would happen if at this point a second
308            // thread added, then removed, as would be allowed with
309            // no mutex held. Remove is only essentially run once
310            // so everything after this point would be leaked.
311
312            // Add the subscription to all the matching queues.
313            // But copy the matches first - to prevent deadlocks
314            List<Destination> addList = new ArrayList<Destination>();
315            destinationsLock.readLock().lock();
316            try {
317                for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
318                    addList.add(dest);
319                }
320                // ensure sub visible to any new dest addSubscriptionsForDestination
321                subscriptions.put(info.getConsumerId(), sub);
322            } finally {
323                destinationsLock.readLock().unlock();
324            }
325
326            List<Destination> removeList = new ArrayList<Destination>();
327            for (Destination dest : addList) {
328                try {
329                    dest.addSubscription(context, sub);
330                    removeList.add(dest);
331                } catch (SecurityException e){
332                    if (sub.isWildcard()) {
333                        LOG.debug("Subscription denied for " + sub + " to destination " +
334                            dest.getActiveMQDestination() + ": " + e.getMessage());
335                    } else {
336                        // remove partial subscriptions
337                        for (Destination remove : removeList) {
338                            try {
339                                remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
340                            } catch (Exception ex) {
341                                LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex);
342                            }
343                        }
344                        subscriptions.remove(info.getConsumerId());
345                        removeList.clear();
346                        throw e;
347                    }
348                }
349            }
350            removeList.clear();
351
352            if (info.isBrowser()) {
353                ((QueueBrowserSubscription) sub).destinationsAdded();
354            }
355
356            return sub;
357        }
358    }
359
360    /**
361     * Get all the Destinations that are in storage
362     *
363     * @return Set of all stored destinations
364     */
365    @SuppressWarnings("rawtypes")
366    public Set getDurableDestinations() {
367        return destinationFactory.getDestinations();
368    }
369
370    /**
371     * @return all Destinations that don't have active consumers
372     */
373    protected Set<ActiveMQDestination> getInactiveDestinations() {
374        Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
375        destinationsLock.readLock().lock();
376        try {
377            inactiveDests.removeAll(destinations.keySet());
378        } finally {
379            destinationsLock.readLock().unlock();
380        }
381        return inactiveDests;
382    }
383
384    @Override
385    @SuppressWarnings("unchecked")
386    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
387        LOG.debug("{} removing consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() });
388
389        Subscription sub = subscriptions.remove(info.getConsumerId());
390        // The sub could be removed elsewhere - see ConnectionSplitBroker
391        if (sub != null) {
392
393            // remove the subscription from all the matching queues.
394            List<Destination> removeList = new ArrayList<Destination>();
395            destinationsLock.readLock().lock();
396            try {
397                for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
398                    removeList.add(dest);
399                }
400            } finally {
401                destinationsLock.readLock().unlock();
402            }
403            for (Destination dest : removeList) {
404                dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
405            }
406
407            destroySubscription(sub);
408        }
409        synchronized (consumerChangeMutexMap) {
410            consumerChangeMutexMap.remove(info.getConsumerId());
411        }
412    }
413
414    protected void destroySubscription(Subscription sub) {
415        sub.destroy();
416    }
417
418    @Override
419    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
420        throw new JMSException("Invalid operation.");
421    }
422
423    @Override
424    public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
425        final ConnectionContext context = producerExchange.getConnectionContext();
426
427        if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
428            final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
429            producerExchange.setRegionDestination(regionDestination);
430        }
431
432        producerExchange.getRegionDestination().send(producerExchange, messageSend);
433
434        if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){
435            producerExchange.getProducerState().getInfo().incrementSentCount();
436        }
437    }
438
439    @Override
440    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
441        Subscription sub = consumerExchange.getSubscription();
442        if (sub == null) {
443            sub = subscriptions.get(ack.getConsumerId());
444            if (sub == null) {
445                if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
446                    LOG.warn("Ack for non existent subscription, ack: {}", ack);
447                    throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
448                } else {
449                    LOG.debug("Ack for non existent subscription in recovery, ack: {}", ack);
450                    return;
451                }
452            }
453            consumerExchange.setSubscription(sub);
454        }
455        sub.acknowledge(consumerExchange.getConnectionContext(), ack);
456    }
457
458    @Override
459    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
460        Subscription sub = subscriptions.get(pull.getConsumerId());
461        if (sub == null) {
462            throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
463        }
464        return sub.pullMessage(context, pull);
465    }
466
467    protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
468        Destination dest = null;
469
470        destinationsLock.readLock().lock();
471        try {
472            dest = destinations.get(destination);
473        } finally {
474            destinationsLock.readLock().unlock();
475        }
476
477        if (dest == null) {
478            if (isAutoCreateDestinations()) {
479                // Try to auto create the destination... re-invoke broker
480                // from the
481                // top so that the proper security checks are performed.
482                dest = context.getBroker().addDestination(context, destination, createTemporary);
483            }
484
485            if (dest == null) {
486                throw new JMSException("The destination " + destination + " does not exist.");
487            }
488        }
489        return dest;
490    }
491
492    @Override
493    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
494        Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
495        if (sub != null) {
496            sub.processMessageDispatchNotification(messageDispatchNotification);
497        } else {
498            throw new JMSException("Slave broker out of sync with master - Subscription: "
499                    + messageDispatchNotification.getConsumerId() + " on "
500                    + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
501                    + messageDispatchNotification.getMessageId());
502        }
503    }
504
505    /*
506     * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
507     * dispatch is deferred till the notification to ensure that the
508     * subscription chosen by the master is used. AMQ-2102
509     */
510    protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
511            throws Exception {
512        Destination dest = null;
513        destinationsLock.readLock().lock();
514        try {
515            dest = destinations.get(messageDispatchNotification.getDestination());
516        } finally {
517            destinationsLock.readLock().unlock();
518        }
519
520        if (dest != null) {
521            dest.processDispatchNotification(messageDispatchNotification);
522        } else {
523            throw new JMSException("Slave broker out of sync with master - Destination: "
524                    + messageDispatchNotification.getDestination() + " does not exist for consumer "
525                    + messageDispatchNotification.getConsumerId() + " with message: "
526                    + messageDispatchNotification.getMessageId());
527        }
528    }
529
530    @Override
531    public void gc() {
532        for (Subscription sub : subscriptions.values()) {
533            sub.gc();
534        }
535
536        destinationsLock.readLock().lock();
537        try {
538            for (Destination dest : destinations.values()) {
539                dest.gc();
540            }
541        } finally {
542            destinationsLock.readLock().unlock();
543        }
544    }
545
546    protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
547
548    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
549            throws Exception {
550        return destinationFactory.createDestination(context, destination, destinationStatistics);
551    }
552
553    public boolean isAutoCreateDestinations() {
554        return autoCreateDestinations;
555    }
556
557    public void setAutoCreateDestinations(boolean autoCreateDestinations) {
558        this.autoCreateDestinations = autoCreateDestinations;
559    }
560
561    @Override
562    @SuppressWarnings("unchecked")
563    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
564        destinationsLock.readLock().lock();
565        try {
566            for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
567                dest.addProducer(context, info);
568            }
569        } finally {
570            destinationsLock.readLock().unlock();
571        }
572    }
573
574    /**
575     * Removes a Producer.
576     *
577     * @param context
578     *            the environment the operation is being executed under.
579     * @throws Exception
580     *             TODO
581     */
582    @Override
583    @SuppressWarnings("unchecked")
584    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
585        destinationsLock.readLock().lock();
586        try {
587            for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
588                dest.removeProducer(context, info);
589            }
590        } finally {
591            destinationsLock.readLock().unlock();
592        }
593    }
594
595    protected void dispose(ConnectionContext context, Destination dest) throws Exception {
596        dest.dispose(context);
597        dest.stop();
598        destinationFactory.removeDestination(dest);
599    }
600
601    @Override
602    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
603        Subscription sub = subscriptions.get(control.getConsumerId());
604        if (sub != null && sub instanceof AbstractSubscription) {
605            ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
606            if (broker.getDestinationPolicy() != null) {
607                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(control.getDestination());
608                if (entry != null) {
609                    entry.configurePrefetch(sub);
610                }
611            }
612            LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()});
613            try {
614                lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
615            } catch (Exception e) {
616                LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: {}", control.getDestination(), e);
617            }
618        }
619    }
620
621    @Override
622    public void reapplyInterceptor() {
623        destinationsLock.writeLock().lock();
624        try {
625            DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
626            Map<ActiveMQDestination, Destination> map = getDestinationMap();
627            for (ActiveMQDestination key : map.keySet()) {
628                Destination destination = map.get(key);
629                if (destination instanceof CompositeDestinationFilter) {
630                    destination = ((CompositeDestinationFilter) destination).next;
631                }
632                if (destinationInterceptor != null) {
633                    destination = destinationInterceptor.intercept(destination);
634                }
635                getDestinationMap().put(key, destination);
636                destinations.put(key, destination);
637            }
638        } finally {
639            destinationsLock.writeLock().unlock();
640        }
641    }
642}