001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.net.URI;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Locale;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.CopyOnWriteArrayList;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.ReentrantReadWriteLock;
033
034import javax.jms.InvalidClientIDException;
035import javax.jms.JMSException;
036
037import org.apache.activemq.broker.Broker;
038import org.apache.activemq.broker.BrokerService;
039import org.apache.activemq.broker.Connection;
040import org.apache.activemq.broker.ConnectionContext;
041import org.apache.activemq.broker.ConsumerBrokerExchange;
042import org.apache.activemq.broker.EmptyBroker;
043import org.apache.activemq.broker.ProducerBrokerExchange;
044import org.apache.activemq.broker.TransportConnection;
045import org.apache.activemq.broker.TransportConnector;
046import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
047import org.apache.activemq.broker.region.policy.PolicyMap;
048import org.apache.activemq.command.ActiveMQDestination;
049import org.apache.activemq.command.ActiveMQMessage;
050import org.apache.activemq.command.BrokerId;
051import org.apache.activemq.command.BrokerInfo;
052import org.apache.activemq.command.ConnectionId;
053import org.apache.activemq.command.ConnectionInfo;
054import org.apache.activemq.command.ConsumerControl;
055import org.apache.activemq.command.ConsumerInfo;
056import org.apache.activemq.command.DestinationInfo;
057import org.apache.activemq.command.Message;
058import org.apache.activemq.command.MessageAck;
059import org.apache.activemq.command.MessageDispatch;
060import org.apache.activemq.command.MessageDispatchNotification;
061import org.apache.activemq.command.MessagePull;
062import org.apache.activemq.command.ProducerInfo;
063import org.apache.activemq.command.RemoveSubscriptionInfo;
064import org.apache.activemq.command.Response;
065import org.apache.activemq.command.TransactionId;
066import org.apache.activemq.state.ConnectionState;
067import org.apache.activemq.store.PListStore;
068import org.apache.activemq.thread.Scheduler;
069import org.apache.activemq.thread.TaskRunnerFactory;
070import org.apache.activemq.transport.TransmitCallback;
071import org.apache.activemq.usage.SystemUsage;
072import org.apache.activemq.util.BrokerSupport;
073import org.apache.activemq.util.IdGenerator;
074import org.apache.activemq.util.InetAddressUtil;
075import org.apache.activemq.util.LongSequenceGenerator;
076import org.apache.activemq.util.ServiceStopper;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080/**
081 * Routes Broker operations to the correct messaging regions for processing.
082 */
083public class RegionBroker extends EmptyBroker {
084    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
085    private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
086    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
087
088    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
089    protected DestinationFactory destinationFactory;
090    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
091
092    private final Region queueRegion;
093    private final Region topicRegion;
094    private final Region tempQueueRegion;
095    private final Region tempTopicRegion;
096    protected final BrokerService brokerService;
097    private boolean started;
098    private boolean keepDurableSubsActive;
099
100    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
101    private final Map<ActiveMQDestination, ActiveMQDestination> destinationGate = new HashMap<ActiveMQDestination, ActiveMQDestination>();
102    private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
103    private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
104
105    private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
106    private BrokerId brokerId;
107    private String brokerName;
108    private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
109    private final DestinationInterceptor destinationInterceptor;
110    private ConnectionContext adminConnectionContext;
111    private final Scheduler scheduler;
112    private final ThreadPoolExecutor executor;
113    private boolean allowTempAutoCreationOnSend;
114
115    private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
116    private final TaskRunnerFactory taskRunnerFactory;
117    private final AtomicBoolean purgeInactiveDestinationsTaskInProgress = new AtomicBoolean(false);
118    private final Runnable purgeInactiveDestinationsTask = new Runnable() {
119        @Override
120        public void run() {
121            if (purgeInactiveDestinationsTaskInProgress.compareAndSet(false, true)) {
122                taskRunnerFactory.execute(purgeInactiveDestinationsWork);
123            }
124        }
125    };
126    private final Runnable purgeInactiveDestinationsWork = new Runnable() {
127        @Override
128        public void run() {
129            try {
130                purgeInactiveDestinations();
131            } catch (Throwable ignored) {
132                LOG.error("Unexpected exception on purgeInactiveDestinations {}", this, ignored);
133            } finally {
134                purgeInactiveDestinationsTaskInProgress.set(false);
135            }
136        }
137    };
138
139    public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
140        DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException {
141        this.brokerService = brokerService;
142        this.executor = executor;
143        this.scheduler = scheduler;
144        if (destinationFactory == null) {
145            throw new IllegalArgumentException("null destinationFactory");
146        }
147        this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
148        this.destinationFactory = destinationFactory;
149        queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
150        topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
151        this.destinationInterceptor = destinationInterceptor;
152        tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
153        tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
154        this.taskRunnerFactory = taskRunnerFactory;
155    }
156
157    @Override
158    public Map<ActiveMQDestination, Destination> getDestinationMap() {
159        Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap());
160        answer.putAll(getTopicRegion().getDestinationMap());
161        return answer;
162    }
163
164    @Override
165    public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination) {
166        try {
167            return getRegion(destination).getDestinationMap();
168        } catch (JMSException jmse) {
169            return Collections.emptyMap();
170        }
171    }
172
173    @Override
174    public Set<Destination> getDestinations(ActiveMQDestination destination) {
175        try {
176            return getRegion(destination).getDestinations(destination);
177        } catch (JMSException jmse) {
178            return Collections.emptySet();
179        }
180    }
181
182    @Override
183    @SuppressWarnings("rawtypes")
184    public Broker getAdaptor(Class type) {
185        if (type.isInstance(this)) {
186            return this;
187        }
188        return null;
189    }
190
191    public Region getQueueRegion() {
192        return queueRegion;
193    }
194
195    public Region getTempQueueRegion() {
196        return tempQueueRegion;
197    }
198
199    public Region getTempTopicRegion() {
200        return tempTopicRegion;
201    }
202
203    public Region getTopicRegion() {
204        return topicRegion;
205    }
206
207    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
208        return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
209    }
210
211    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
212        return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
213    }
214
215    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
216        return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
217    }
218
219    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
220        return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
221    }
222
223    @Override
224    public void start() throws Exception {
225        started = true;
226        queueRegion.start();
227        topicRegion.start();
228        tempQueueRegion.start();
229        tempTopicRegion.start();
230        int period = this.brokerService.getSchedulePeriodForDestinationPurge();
231        if (period > 0) {
232            this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
233        }
234    }
235
236    @Override
237    public void stop() throws Exception {
238        started = false;
239        this.scheduler.cancel(purgeInactiveDestinationsTask);
240        ServiceStopper ss = new ServiceStopper();
241        doStop(ss);
242        ss.throwFirstException();
243        // clear the state
244        clientIdSet.clear();
245        connections.clear();
246        destinations.clear();
247        brokerInfos.clear();
248    }
249
250    public PolicyMap getDestinationPolicy() {
251        return brokerService != null ? brokerService.getDestinationPolicy() : null;
252    }
253
254    public ConnectionContext getConnectionContext(String clientId) {
255        return clientIdSet.get(clientId);
256    }
257
258    @Override
259    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
260        String clientId = info.getClientId();
261        if (clientId == null) {
262            throw new InvalidClientIDException("No clientID specified for connection request");
263        }
264
265        ConnectionContext oldContext = null;
266
267        synchronized (clientIdSet) {
268            oldContext = clientIdSet.get(clientId);
269            if (oldContext != null) {
270                if (context.isAllowLinkStealing()) {
271                    clientIdSet.put(clientId, context);
272                } else {
273                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
274                        + oldContext.getConnection().getRemoteAddress());
275                }
276            } else {
277                clientIdSet.put(clientId, context);
278            }
279        }
280
281        if (oldContext != null) {
282            if (oldContext.getConnection() != null) {
283                Connection connection = oldContext.getConnection();
284                LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
285                if (connection instanceof TransportConnection) {
286                    TransportConnection transportConnection = (TransportConnection) connection;
287                    transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId()));
288                } else {
289                    connection.stop();
290                }
291            } else {
292                LOG.error("No Connection found for {}", oldContext);
293            }
294        }
295
296        connections.add(context.getConnection());
297    }
298
299    @Override
300    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
301        String clientId = info.getClientId();
302        if (clientId == null) {
303            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
304        }
305        synchronized (clientIdSet) {
306            ConnectionContext oldValue = clientIdSet.get(clientId);
307            // we may be removing the duplicate connection, not the first connection to be created
308            // so lets check that their connection IDs are the same
309            if (oldValue == context) {
310                if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
311                    clientIdSet.remove(clientId);
312                }
313            }
314        }
315        connections.remove(context.getConnection());
316    }
317
318    protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
319        return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
320    }
321
322    @Override
323    public Connection[] getClients() throws Exception {
324        ArrayList<Connection> l = new ArrayList<Connection>(connections);
325        Connection rc[] = new Connection[l.size()];
326        l.toArray(rc);
327        return rc;
328    }
329
330    @Override
331    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {
332
333        Destination answer;
334
335        answer = destinations.get(destination);
336        if (answer != null) {
337            return answer;
338        }
339
340        synchronized (destinationGate) {
341            answer = destinations.get(destination);
342            if (answer != null) {
343                return answer;
344            }
345
346            if (destinationGate.get(destination) != null) {
347                // Guard against spurious wakeup.
348                while (destinationGate.containsKey(destination)) {
349                    destinationGate.wait();
350                }
351                answer = destinations.get(destination);
352                if (answer != null) {
353                    return answer;
354                } else {
355                    // In case of intermediate remove or add failure
356                    destinationGate.put(destination, destination);
357                }
358            }
359        }
360
361        try {
362            boolean create = true;
363            if (destination.isTemporary()) {
364                create = createIfTemp;
365            }
366            answer = getRegion(destination).addDestination(context, destination, create);
367            destinations.put(destination, answer);
368        } finally {
369            synchronized (destinationGate) {
370                destinationGate.remove(destination);
371                destinationGate.notifyAll();
372            }
373        }
374
375        return answer;
376    }
377
378    @Override
379    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
380        if (destinations.containsKey(destination)) {
381            getRegion(destination).removeDestination(context, destination, timeout);
382            destinations.remove(destination);
383        }
384    }
385
386    @Override
387    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
388        addDestination(context, info.getDestination(), true);
389
390    }
391
392    @Override
393    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
394        removeDestination(context, info.getDestination(), info.getTimeout());
395    }
396
397    @Override
398    public ActiveMQDestination[] getDestinations() throws Exception {
399        ArrayList<ActiveMQDestination> l;
400
401        l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
402
403        ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
404        l.toArray(rc);
405        return rc;
406    }
407
408    @Override
409    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
410        ActiveMQDestination destination = info.getDestination();
411        if (destination != null) {
412            inactiveDestinationsPurgeLock.readLock().lock();
413            try {
414                // This seems to cause the destination to be added but without
415                // advisories firing...
416                context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
417                getRegion(destination).addProducer(context, info);
418            } finally {
419                inactiveDestinationsPurgeLock.readLock().unlock();
420            }
421        }
422    }
423
424    @Override
425    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
426        ActiveMQDestination destination = info.getDestination();
427        if (destination != null) {
428            inactiveDestinationsPurgeLock.readLock().lock();
429            try {
430                getRegion(destination).removeProducer(context, info);
431            } finally {
432                inactiveDestinationsPurgeLock.readLock().unlock();
433            }
434        }
435    }
436
437    @Override
438    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
439        ActiveMQDestination destination = info.getDestination();
440        if (destinationInterceptor != null) {
441            destinationInterceptor.create(this, context, destination);
442        }
443        inactiveDestinationsPurgeLock.readLock().lock();
444        try {
445            return getRegion(destination).addConsumer(context, info);
446        } finally {
447            inactiveDestinationsPurgeLock.readLock().unlock();
448        }
449    }
450
451    @Override
452    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
453        ActiveMQDestination destination = info.getDestination();
454        inactiveDestinationsPurgeLock.readLock().lock();
455        try {
456            getRegion(destination).removeConsumer(context, info);
457        } finally {
458            inactiveDestinationsPurgeLock.readLock().unlock();
459        }
460    }
461
462    @Override
463    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
464        inactiveDestinationsPurgeLock.readLock().lock();
465        try {
466            topicRegion.removeSubscription(context, info);
467        } finally {
468            inactiveDestinationsPurgeLock.readLock().unlock();
469
470        }
471    }
472
473    @Override
474    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
475        ActiveMQDestination destination = message.getDestination();
476        message.setBrokerInTime(System.currentTimeMillis());
477        if (producerExchange.isMutable() || producerExchange.getRegion() == null
478            || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
479            // ensure the destination is registered with the RegionBroker
480            producerExchange.getConnectionContext().getBroker()
481                .addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
482            producerExchange.setRegion(getRegion(destination));
483            producerExchange.setRegionDestination(null);
484        }
485
486        producerExchange.getRegion().send(producerExchange, message);
487
488        // clean up so these references aren't kept (possible leak) in the producer exchange
489        // especially since temps are transitory
490        if (producerExchange.isMutable()) {
491            producerExchange.setRegionDestination(null);
492            producerExchange.setRegion(null);
493        }
494    }
495
496    @Override
497    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
498        if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
499            ActiveMQDestination destination = ack.getDestination();
500            consumerExchange.setRegion(getRegion(destination));
501        }
502        consumerExchange.getRegion().acknowledge(consumerExchange, ack);
503    }
504
505    public Region getRegion(ActiveMQDestination destination) throws JMSException {
506        switch (destination.getDestinationType()) {
507            case ActiveMQDestination.QUEUE_TYPE:
508                return queueRegion;
509            case ActiveMQDestination.TOPIC_TYPE:
510                return topicRegion;
511            case ActiveMQDestination.TEMP_QUEUE_TYPE:
512                return tempQueueRegion;
513            case ActiveMQDestination.TEMP_TOPIC_TYPE:
514                return tempTopicRegion;
515            default:
516                throw createUnknownDestinationTypeException(destination);
517        }
518    }
519
520    @Override
521    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
522        ActiveMQDestination destination = pull.getDestination();
523        return getRegion(destination).messagePull(context, pull);
524    }
525
526    @Override
527    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
528        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
529    }
530
531    @Override
532    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
533        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
534    }
535
536    @Override
537    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
538        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
539    }
540
541    @Override
542    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
543        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
544    }
545
546    @Override
547    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
548        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
549    }
550
551    @Override
552    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
553        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
554    }
555
556    @Override
557    public void gc() {
558        queueRegion.gc();
559        topicRegion.gc();
560    }
561
562    @Override
563    public BrokerId getBrokerId() {
564        if (brokerId == null) {
565            brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
566        }
567        return brokerId;
568    }
569
570    public void setBrokerId(BrokerId brokerId) {
571        this.brokerId = brokerId;
572    }
573
574    @Override
575    public String getBrokerName() {
576        if (brokerName == null) {
577            try {
578                brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
579            } catch (Exception e) {
580                brokerName = "localhost";
581            }
582        }
583        return brokerName;
584    }
585
586    public void setBrokerName(String brokerName) {
587        this.brokerName = brokerName;
588    }
589
590    public DestinationStatistics getDestinationStatistics() {
591        return destinationStatistics;
592    }
593
594    protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
595        return new JMSException("Unknown destination type: " + destination.getDestinationType());
596    }
597
598    @Override
599    public synchronized void addBroker(Connection connection, BrokerInfo info) {
600        BrokerInfo existing = brokerInfos.get(info.getBrokerId());
601        if (existing == null) {
602            existing = info.copy();
603            existing.setPeerBrokerInfos(null);
604            brokerInfos.put(info.getBrokerId(), existing);
605        }
606        existing.incrementRefCount();
607        LOG.debug("{} addBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size() });
608        addBrokerInClusterUpdate(info);
609    }
610
611    @Override
612    public synchronized void removeBroker(Connection connection, BrokerInfo info) {
613        if (info != null) {
614            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
615            if (existing != null && existing.decrementRefCount() == 0) {
616                brokerInfos.remove(info.getBrokerId());
617            }
618            LOG.debug("{} removeBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size()});
619            // When stopping don't send cluster updates since we are the one's tearing down
620            // our own bridges.
621            if (!brokerService.isStopping()) {
622                removeBrokerInClusterUpdate(info);
623            }
624        }
625    }
626
627    @Override
628    public synchronized BrokerInfo[] getPeerBrokerInfos() {
629        BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
630        result = brokerInfos.values().toArray(result);
631        return result;
632    }
633
634    @Override
635    public void preProcessDispatch(final MessageDispatch messageDispatch) {
636        final Message message = messageDispatch.getMessage();
637        if (message != null) {
638            long endTime = System.currentTimeMillis();
639            message.setBrokerOutTime(endTime);
640            if (getBrokerService().isEnableStatistics()) {
641                long totalTime = endTime - message.getBrokerInTime();
642                ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
643            }
644            if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered()) {
645                final int originalValue = message.getRedeliveryCounter();
646                message.incrementRedeliveryCounter();
647                try {
648                    if (message.isPersistent()) {
649                        ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
650                    }
651                    messageDispatch.setTransmitCallback(new TransmitCallback() {
652                        // dispatch is considered a delivery, so update sub state post dispatch otherwise
653                        // on a disconnect/reconnect cached messages will not reflect initial delivery attempt
654                        final TransmitCallback delegate = messageDispatch.getTransmitCallback();
655                        @Override
656                        public void onSuccess() {
657                            message.incrementRedeliveryCounter();
658                            if (delegate != null) {
659                                delegate.onSuccess();
660                            }
661                        }
662
663                        @Override
664                        public void onFailure() {
665                            if (delegate != null) {
666                                delegate.onFailure();
667                            }
668                        }
669                    });
670                } catch (IOException error) {
671                    RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error);
672                    LOG.warn(runtimeException.getLocalizedMessage(), runtimeException);
673                    throw runtimeException;
674                } finally {
675                    message.setRedeliveryCounter(originalValue);
676                }
677            }
678        }
679    }
680
681    @Override
682    public void postProcessDispatch(MessageDispatch messageDispatch) {
683    }
684
685    @Override
686    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
687        ActiveMQDestination destination = messageDispatchNotification.getDestination();
688        getRegion(destination).processDispatchNotification(messageDispatchNotification);
689    }
690
691    @Override
692    public boolean isStopped() {
693        return !started;
694    }
695
696    @Override
697    public Set<ActiveMQDestination> getDurableDestinations() {
698        return destinationFactory.getDestinations();
699    }
700
701    protected void doStop(ServiceStopper ss) {
702        ss.stop(queueRegion);
703        ss.stop(topicRegion);
704        ss.stop(tempQueueRegion);
705        ss.stop(tempTopicRegion);
706    }
707
708    public boolean isKeepDurableSubsActive() {
709        return keepDurableSubsActive;
710    }
711
712    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
713        this.keepDurableSubsActive = keepDurableSubsActive;
714        ((TopicRegion) topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
715    }
716
717    public DestinationInterceptor getDestinationInterceptor() {
718        return destinationInterceptor;
719    }
720
721    @Override
722    public ConnectionContext getAdminConnectionContext() {
723        return adminConnectionContext;
724    }
725
726    @Override
727    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
728        this.adminConnectionContext = adminConnectionContext;
729    }
730
731    public Map<ConnectionId, ConnectionState> getConnectionStates() {
732        return connectionStates;
733    }
734
735    @Override
736    public PListStore getTempDataStore() {
737        return brokerService.getTempDataStore();
738    }
739
740    @Override
741    public URI getVmConnectorURI() {
742        return brokerService.getVmConnectorURI();
743    }
744
745    @Override
746    public void brokerServiceStarted() {
747    }
748
749    @Override
750    public BrokerService getBrokerService() {
751        return brokerService;
752    }
753
754    @Override
755    public boolean isExpired(MessageReference messageReference) {
756        return messageReference.canProcessAsExpired();
757    }
758
759    private boolean stampAsExpired(Message message) throws IOException {
760        boolean stamped = false;
761        if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
762            long expiration = message.getExpiration();
763            message.setProperty(ORIGINAL_EXPIRATION, new Long(expiration));
764            stamped = true;
765        }
766        return stamped;
767    }
768
769    @Override
770    public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
771        LOG.debug("Message expired {}", node);
772        getRoot().sendToDeadLetterQueue(context, node, subscription, new Throwable("Message Expired. Expiration:" + node.getExpiration()));
773    }
774
775    @Override
776    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription, Throwable poisonCause) {
777        try {
778            if (node != null) {
779                Message message = node.getMessage();
780                if (message != null && node.getRegionDestination() != null) {
781                    DeadLetterStrategy deadLetterStrategy = ((Destination) node.getRegionDestination()).getDeadLetterStrategy();
782                    if (deadLetterStrategy != null) {
783                        if (deadLetterStrategy.isSendToDeadLetterQueue(message)) {
784                            ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription);
785                            // Prevent a DLQ loop where same message is sent from a DLQ back to itself
786                            if (deadLetterDestination.equals(message.getDestination())) {
787                                LOG.debug("Not re-adding to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
788                                return false;
789                            }
790
791                            // message may be inflight to other subscriptions so do not modify
792                            message = message.copy();
793                            long dlqExpiration = deadLetterStrategy.getExpiration();
794                            if (dlqExpiration > 0) {
795                                dlqExpiration += System.currentTimeMillis();
796                            } else {
797                                stampAsExpired(message);
798                            }
799                            message.setExpiration(dlqExpiration);
800                            if (!message.isPersistent()) {
801                                message.setPersistent(true);
802                                message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
803                            }
804                            if (poisonCause != null) {
805                                message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
806                                        poisonCause.toString());
807                            }
808                            // The original destination and transaction id do
809                            // not get filled when the message is first sent,
810                            // it is only populated if the message is routed to
811                            // another destination like the DLQ
812                            ConnectionContext adminContext = context;
813                            if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) {
814                                adminContext = BrokerSupport.getConnectionContext(this);
815                            }
816                            addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ(true);
817                            BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination);
818                            return true;
819                        }
820                    } else {
821                        LOG.debug("Dead Letter message with no DLQ strategy in place, message id: {}, destination: {}", message.getMessageId(), message.getDestination());
822                    }
823                }
824            }
825        } catch (Exception e) {
826            LOG.warn("Caught an exception sending to DLQ: {}", node, e);
827        }
828
829        return false;
830    }
831
832    @Override
833    public Broker getRoot() {
834        try {
835            return getBrokerService().getBroker();
836        } catch (Exception e) {
837            LOG.error("Trying to get Root Broker", e);
838            throw new RuntimeException("The broker from the BrokerService should not throw an exception", e);
839        }
840    }
841
842    /**
843     * @return the broker sequence id
844     */
845    @Override
846    public long getBrokerSequenceId() {
847        synchronized (sequenceGenerator) {
848            return sequenceGenerator.getNextSequenceId();
849        }
850    }
851
852    @Override
853    public Scheduler getScheduler() {
854        return this.scheduler;
855    }
856
857    @Override
858    public ThreadPoolExecutor getExecutor() {
859        return this.executor;
860    }
861
862    @Override
863    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
864        ActiveMQDestination destination = control.getDestination();
865        try {
866            getRegion(destination).processConsumerControl(consumerExchange, control);
867        } catch (JMSException jmse) {
868            LOG.warn("unmatched destination: {}, in consumerControl: {}", destination, control);
869        }
870    }
871
872    protected void addBrokerInClusterUpdate(BrokerInfo info) {
873        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
874        for (TransportConnector connector : connectors) {
875            if (connector.isUpdateClusterClients()) {
876                connector.addPeerBroker(info);
877                connector.updateClientClusterInfo();
878            }
879        }
880    }
881
882    protected void removeBrokerInClusterUpdate(BrokerInfo info) {
883        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
884        for (TransportConnector connector : connectors) {
885            if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
886                connector.removePeerBroker(info);
887                connector.updateClientClusterInfo();
888            }
889        }
890    }
891
892    protected void purgeInactiveDestinations() {
893        inactiveDestinationsPurgeLock.writeLock().lock();
894        try {
895            List<Destination> list = new ArrayList<Destination>();
896            Map<ActiveMQDestination, Destination> map = getDestinationMap();
897            if (isAllowTempAutoCreationOnSend()) {
898                map.putAll(tempQueueRegion.getDestinationMap());
899                map.putAll(tempTopicRegion.getDestinationMap());
900            }
901            long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
902            long timeStamp = System.currentTimeMillis();
903            for (Destination d : map.values()) {
904                d.markForGC(timeStamp);
905                if (d.canGC()) {
906                    list.add(d);
907                    if (maxPurgedDests > 0 && list.size() == maxPurgedDests) {
908                        break;
909                    }
910                }
911            }
912
913            if (!list.isEmpty()) {
914                ConnectionContext context = BrokerSupport.getConnectionContext(this);
915                context.setBroker(this);
916
917                for (Destination dest : list) {
918                    Logger log = LOG;
919                    if (dest instanceof BaseDestination) {
920                        log = ((BaseDestination) dest).getLog();
921                    }
922                    log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimeoutBeforeGC());
923                    try {
924                        getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
925                    } catch (Throwable e) {
926                        LOG.error("Failed to remove inactive destination {}", dest, e);
927                    }
928                }
929            }
930        } finally {
931            inactiveDestinationsPurgeLock.writeLock().unlock();
932        }
933    }
934
935    public boolean isAllowTempAutoCreationOnSend() {
936        return allowTempAutoCreationOnSend;
937    }
938
939    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
940        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
941    }
942
943    @Override
944    public void reapplyInterceptor() {
945        queueRegion.reapplyInterceptor();
946        topicRegion.reapplyInterceptor();
947        tempQueueRegion.reapplyInterceptor();
948        tempTopicRegion.reapplyInterceptor();
949    }
950}