001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.BufferedReader;
020import java.io.File;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.net.UnknownHostException;
027import java.security.Provider;
028import java.security.Security;
029import java.util.ArrayList;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CopyOnWriteArrayList;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.RejectedExecutionException;
042import java.util.concurrent.RejectedExecutionHandler;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050
051import javax.annotation.PostConstruct;
052import javax.annotation.PreDestroy;
053import javax.management.MalformedObjectNameException;
054import javax.management.ObjectName;
055
056import org.apache.activemq.ActiveMQConnectionMetaData;
057import org.apache.activemq.ConfigurationException;
058import org.apache.activemq.Service;
059import org.apache.activemq.advisory.AdvisoryBroker;
060import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
061import org.apache.activemq.broker.jmx.AnnotatedMBean;
062import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
063import org.apache.activemq.broker.jmx.BrokerView;
064import org.apache.activemq.broker.jmx.ConnectorView;
065import org.apache.activemq.broker.jmx.ConnectorViewMBean;
066import org.apache.activemq.broker.jmx.HealthView;
067import org.apache.activemq.broker.jmx.HealthViewMBean;
068import org.apache.activemq.broker.jmx.JmsConnectorView;
069import org.apache.activemq.broker.jmx.JobSchedulerView;
070import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
071import org.apache.activemq.broker.jmx.Log4JConfigView;
072import org.apache.activemq.broker.jmx.ManagedRegionBroker;
073import org.apache.activemq.broker.jmx.ManagementContext;
074import org.apache.activemq.broker.jmx.NetworkConnectorView;
075import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
076import org.apache.activemq.broker.jmx.ProxyConnectorView;
077import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
078import org.apache.activemq.broker.region.Destination;
079import org.apache.activemq.broker.region.DestinationFactory;
080import org.apache.activemq.broker.region.DestinationFactoryImpl;
081import org.apache.activemq.broker.region.DestinationInterceptor;
082import org.apache.activemq.broker.region.RegionBroker;
083import org.apache.activemq.broker.region.policy.PolicyMap;
084import org.apache.activemq.broker.region.virtual.MirroredQueue;
085import org.apache.activemq.broker.region.virtual.VirtualDestination;
086import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
087import org.apache.activemq.broker.region.virtual.VirtualTopic;
088import org.apache.activemq.broker.scheduler.JobSchedulerStore;
089import org.apache.activemq.broker.scheduler.SchedulerBroker;
090import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
091import org.apache.activemq.command.ActiveMQDestination;
092import org.apache.activemq.command.ActiveMQQueue;
093import org.apache.activemq.command.BrokerId;
094import org.apache.activemq.command.ProducerInfo;
095import org.apache.activemq.filter.DestinationFilter;
096import org.apache.activemq.network.ConnectionFilter;
097import org.apache.activemq.network.DiscoveryNetworkConnector;
098import org.apache.activemq.network.NetworkConnector;
099import org.apache.activemq.network.jms.JmsConnector;
100import org.apache.activemq.openwire.OpenWireFormat;
101import org.apache.activemq.proxy.ProxyConnector;
102import org.apache.activemq.security.MessageAuthorizationPolicy;
103import org.apache.activemq.selector.SelectorParser;
104import org.apache.activemq.store.JournaledStore;
105import org.apache.activemq.store.PListStore;
106import org.apache.activemq.store.PersistenceAdapter;
107import org.apache.activemq.store.PersistenceAdapterFactory;
108import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
109import org.apache.activemq.thread.Scheduler;
110import org.apache.activemq.thread.TaskRunnerFactory;
111import org.apache.activemq.transport.TransportFactorySupport;
112import org.apache.activemq.transport.TransportServer;
113import org.apache.activemq.transport.vm.VMTransportFactory;
114import org.apache.activemq.usage.PercentLimitUsage;
115import org.apache.activemq.usage.StoreUsage;
116import org.apache.activemq.usage.SystemUsage;
117import org.apache.activemq.util.BrokerSupport;
118import org.apache.activemq.util.DefaultIOExceptionHandler;
119import org.apache.activemq.util.IOExceptionHandler;
120import org.apache.activemq.util.IOExceptionSupport;
121import org.apache.activemq.util.IOHelper;
122import org.apache.activemq.util.InetAddressUtil;
123import org.apache.activemq.util.ServiceStopper;
124import org.apache.activemq.util.StoreUtil;
125import org.apache.activemq.util.ThreadPoolUtils;
126import org.apache.activemq.util.TimeUtils;
127import org.apache.activemq.util.URISupport;
128import org.slf4j.Logger;
129import org.slf4j.LoggerFactory;
130import org.slf4j.MDC;
131
132/**
133 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
134 * number of transport connectors, network connectors and a bunch of properties
 * which can be used to configure the broker as it is lazily created.
136 *
137 * @org.apache.xbean.XBean
138 */
139public class BrokerService implements Service {
140    public static final String DEFAULT_PORT = "61616";
141    public static final String LOCAL_HOST_NAME;
142    public static final String BROKER_VERSION;
143    public static final String DEFAULT_BROKER_NAME = "localhost";
144    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
145    public static final long DEFAULT_START_TIMEOUT = 600000L;
146
147    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
148
149    @SuppressWarnings("unused")
150    private static final long serialVersionUID = 7353129142305630237L;
151
152    private boolean useJmx = true;
153    private boolean enableStatistics = true;
154    private boolean persistent = true;
155    private boolean populateJMSXUserID;
156    private boolean useAuthenticatedPrincipalForJMSXUserID;
157    private boolean populateUserNameInMBeans;
158    private long mbeanInvocationTimeout = 0;
159
160    private boolean useShutdownHook = true;
161    private boolean useLoggingForShutdownErrors;
162    private boolean shutdownOnMasterFailure;
163    private boolean shutdownOnSlaveFailure;
164    private boolean waitForSlave;
165    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
166    private boolean passiveSlave;
167    private String brokerName = DEFAULT_BROKER_NAME;
168    private File dataDirectoryFile;
169    private File tmpDataDirectory;
170    private Broker broker;
171    private BrokerView adminView;
172    private ManagementContext managementContext;
173    private ObjectName brokerObjectName;
174    private TaskRunnerFactory taskRunnerFactory;
175    private TaskRunnerFactory persistenceTaskRunnerFactory;
176    private SystemUsage systemUsage;
177    private SystemUsage producerSystemUsage;
178    private SystemUsage consumerSystemUsaage;
179    private PersistenceAdapter persistenceAdapter;
180    private PersistenceAdapterFactory persistenceFactory;
181    protected DestinationFactory destinationFactory;
182    private MessageAuthorizationPolicy messageAuthorizationPolicy;
183    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
184    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
185    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
186    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
187    private final List<Service> services = new ArrayList<Service>();
188    private transient Thread shutdownHook;
189    private String[] transportConnectorURIs;
190    private String[] networkConnectorURIs;
191    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
192    // to other jms messaging systems
193    private boolean deleteAllMessagesOnStartup;
194    private boolean advisorySupport = true;
195    private URI vmConnectorURI;
196    private String defaultSocketURIString;
197    private PolicyMap destinationPolicy;
198    private final AtomicBoolean started = new AtomicBoolean(false);
199    private final AtomicBoolean stopped = new AtomicBoolean(false);
200    private final AtomicBoolean stopping = new AtomicBoolean(false);
201    private BrokerPlugin[] plugins;
202    private boolean keepDurableSubsActive = true;
203    private boolean useVirtualTopics = true;
204    private boolean useMirroredQueues = false;
205    private boolean useTempMirroredQueues = true;
206    /**
207     * Whether or not virtual destination subscriptions should cause network demand
208     */
209    private boolean useVirtualDestSubs = false;
210    /**
     * Whether or not the creation of destinations that match virtual destinations
212     * should cause network demand
213     */
214    private boolean useVirtualDestSubsOnCreation = false;
215    private BrokerId brokerId;
216    private volatile DestinationInterceptor[] destinationInterceptors;
217    private ActiveMQDestination[] destinations;
218    private PListStore tempDataStore;
219    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
220    private boolean useLocalHostBrokerName;
221    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
222    private final CountDownLatch startedLatch = new CountDownLatch(1);
223    private Broker regionBroker;
224    private int producerSystemUsagePortion = 60;
225    private int consumerSystemUsagePortion = 40;
226    private boolean splitSystemUsageForProducersConsumers;
227    private boolean monitorConnectionSplits = false;
228    private int taskRunnerPriority = Thread.NORM_PRIORITY;
229    private boolean dedicatedTaskRunner;
230    private boolean cacheTempDestinations = false;// useful for failover
231    private int timeBeforePurgeTempDestinations = 5000;
232    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
233    private boolean systemExitOnShutdown;
234    private int systemExitOnShutdownExitCode;
235    private SslContext sslContext;
236    private boolean forceStart = false;
237    private IOExceptionHandler ioExceptionHandler;
238    private boolean schedulerSupport = false;
239    private File schedulerDirectoryFile;
240    private Scheduler scheduler;
241    private ThreadPoolExecutor executor;
242    private int schedulePeriodForDestinationPurge= 0;
243    private int maxPurgedDestinationsPerSweep = 0;
244    private int schedulePeriodForDiskUsageCheck = 0;
245    private int diskUsageCheckRegrowThreshold = -1;
246    private boolean adjustUsageLimits = true;
247    private BrokerContext brokerContext;
248    private boolean networkConnectorStartAsync = false;
249    private boolean allowTempAutoCreationOnSend;
250    private JobSchedulerStore jobSchedulerStore;
251    private final AtomicLong totalConnections = new AtomicLong();
252    private final AtomicInteger currentConnections = new AtomicInteger();
253
254    private long offlineDurableSubscriberTimeout = -1;
255    private long offlineDurableSubscriberTaskSchedule = 300000;
256    private DestinationFilter virtualConsumerDestinationFilter;
257
258    private final Object persistenceAdapterLock = new Object();
259    private Throwable startException = null;
260    private boolean startAsync = false;
261    private Date startDate;
262    private boolean slave = true;
263
264    private boolean restartAllowed = true;
265    private boolean restartRequested = false;
266    private boolean rejectDurableConsumers = false;
267    private boolean rollbackOnlyOnAsyncException = true;
268
269    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION;
270
271    static {
272
273        try {
274            ClassLoader loader = BrokerService.class.getClassLoader();
275            Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
276            Provider bouncycastle = (Provider) clazz.newInstance();
277            Security.insertProviderAt(bouncycastle, 2);
278            LOG.info("Loaded the Bouncy Castle security provider.");
279        } catch(Throwable e) {
280            // No BouncyCastle found so we use the default Java Security Provider
281        }
282
283        String localHostName = "localhost";
284        try {
285            localHostName =  InetAddressUtil.getLocalHostName();
286        } catch (UnknownHostException e) {
287            LOG.error("Failed to resolve localhost");
288        }
289        LOCAL_HOST_NAME = localHostName;
290
291        String version = null;
292        try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
293            if (in != null) {
294                try(InputStreamReader isr = new InputStreamReader(in);
295                    BufferedReader reader = new BufferedReader(isr)) {
296                    version = reader.readLine();
297                }
298            }
299        } catch (IOException ie) {
300            LOG.warn("Error reading broker version ", ie);
301        }
302        BROKER_VERSION = version;
303    }
304
305    @Override
306    public String toString() {
307        return "BrokerService[" + getBrokerName() + "]";
308    }
309
310    private String getBrokerVersion() {
311        String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
312        if (version == null) {
313            version = BROKER_VERSION;
314        }
315
316        return version;
317    }
318
319    /**
320     * Adds a new transport connector for the given bind address
321     *
322     * @return the newly created and added transport connector
323     * @throws Exception
324     */
325    public TransportConnector addConnector(String bindAddress) throws Exception {
326        return addConnector(new URI(bindAddress));
327    }
328
329    /**
330     * Adds a new transport connector for the given bind address
331     *
332     * @return the newly created and added transport connector
333     * @throws Exception
334     */
335    public TransportConnector addConnector(URI bindAddress) throws Exception {
336        return addConnector(createTransportConnector(bindAddress));
337    }
338
339    /**
340     * Adds a new transport connector for the given TransportServer transport
341     *
342     * @return the newly created and added transport connector
343     * @throws Exception
344     */
345    public TransportConnector addConnector(TransportServer transport) throws Exception {
346        return addConnector(new TransportConnector(transport));
347    }
348
349    /**
350     * Adds a new transport connector
351     *
352     * @return the transport connector
353     * @throws Exception
354     */
355    public TransportConnector addConnector(TransportConnector connector) throws Exception {
356        transportConnectors.add(connector);
357        return connector;
358    }
359
360    /**
361     * Stops and removes a transport connector from the broker.
362     *
363     * @param connector
364     * @return true if the connector has been previously added to the broker
365     * @throws Exception
366     */
367    public boolean removeConnector(TransportConnector connector) throws Exception {
368        boolean rc = transportConnectors.remove(connector);
369        if (rc) {
370            unregisterConnectorMBean(connector);
371        }
372        return rc;
373    }
374
375    /**
376     * Adds a new network connector using the given discovery address
377     *
378     * @return the newly created and added network connector
379     * @throws Exception
380     */
381    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
382        return addNetworkConnector(new URI(discoveryAddress));
383    }
384
385    /**
386     * Adds a new proxy connector using the given bind address
387     *
388     * @return the newly created and added network connector
389     * @throws Exception
390     */
391    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
392        return addProxyConnector(new URI(bindAddress));
393    }
394
395    /**
396     * Adds a new network connector using the given discovery address
397     *
398     * @return the newly created and added network connector
399     * @throws Exception
400     */
401    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
402        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
403        return addNetworkConnector(connector);
404    }
405
406    /**
407     * Adds a new proxy connector using the given bind address
408     *
409     * @return the newly created and added network connector
410     * @throws Exception
411     */
412    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
413        ProxyConnector connector = new ProxyConnector();
414        connector.setBind(bindAddress);
415        connector.setRemote(new URI("fanout:multicast://default"));
416        return addProxyConnector(connector);
417    }
418
419    /**
420     * Adds a new network connector to connect this broker to a federated
421     * network
422     */
423    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
424        connector.setBrokerService(this);
425        connector.setLocalUri(getVmConnectorURI());
426        // Set a connection filter so that the connector does not establish loop
427        // back connections.
428        connector.setConnectionFilter(new ConnectionFilter() {
429            @Override
430            public boolean connectTo(URI location) {
431                List<TransportConnector> transportConnectors = getTransportConnectors();
432                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
433                    try {
434                        TransportConnector tc = iter.next();
435                        if (location.equals(tc.getConnectUri())) {
436                            return false;
437                        }
438                    } catch (Throwable e) {
439                    }
440                }
441                return true;
442            }
443        });
444        networkConnectors.add(connector);
445        return connector;
446    }
447
448    /**
449     * Removes the given network connector without stopping it. The caller
450     * should call {@link NetworkConnector#stop()} to close the connector
451     */
452    public boolean removeNetworkConnector(NetworkConnector connector) {
453        boolean answer = networkConnectors.remove(connector);
454        if (answer) {
455            unregisterNetworkConnectorMBean(connector);
456        }
457        return answer;
458    }
459
460    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
461        URI uri = getVmConnectorURI();
462        connector.setLocalUri(uri);
463        proxyConnectors.add(connector);
464        if (isUseJmx()) {
465            registerProxyConnectorMBean(connector);
466        }
467        return connector;
468    }
469
470    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
471        connector.setBrokerService(this);
472        jmsConnectors.add(connector);
473        if (isUseJmx()) {
474            registerJmsConnectorMBean(connector);
475        }
476        return connector;
477    }
478
479    public JmsConnector removeJmsConnector(JmsConnector connector) {
480        if (jmsConnectors.remove(connector)) {
481            return connector;
482        }
483        return null;
484    }
485
486    public void masterFailed() {
487        if (shutdownOnMasterFailure) {
488            LOG.error("The Master has failed ... shutting down");
489            try {
490                stop();
491            } catch (Exception e) {
492                LOG.error("Failed to stop for master failure", e);
493            }
494        } else {
495            LOG.warn("Master Failed - starting all connectors");
496            try {
497                startAllConnectors();
498                broker.nowMasterBroker();
499            } catch (Exception e) {
500                LOG.error("Failed to startAllConnectors", e);
501            }
502        }
503    }
504
505    public String getUptime() {
506        long delta = getUptimeMillis();
507
508        if (delta == 0) {
509            return "not started";
510        }
511
512        return TimeUtils.printDuration(delta);
513    }
514
515    public long getUptimeMillis() {
516        if (startDate == null) {
517            return 0;
518        }
519
520        return new Date().getTime() - startDate.getTime();
521    }
522
523    public boolean isStarted() {
524        return started.get() && startedLatch.getCount() == 0;
525    }
526
527    /**
528     * Forces a start of the broker.
529     * By default a BrokerService instance that was
530     * previously stopped using BrokerService.stop() cannot be restarted
531     * using BrokerService.start().
532     * This method enforces a restart.
533     * It is not recommended to force a restart of the broker and will not work
534     * for most but some very trivial broker configurations.
535     * For restarting a broker instance we recommend to first call stop() on
536     * the old instance and then recreate a new BrokerService instance.
537     *
538     * @param force - if true enforces a restart.
539     * @throws Exception
540     */
541    public void start(boolean force) throws Exception {
542        forceStart = force;
543        stopped.set(false);
544        started.set(false);
545        start();
546    }
547
548    // Service interface
549    // -------------------------------------------------------------------------
550
551    protected boolean shouldAutostart() {
552        return true;
553    }
554
555    /**
556     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
557     *
558     * delegates to autoStart, done to prevent backwards incompatible signature change
559     */
560    @PostConstruct
561    private void postConstruct() {
562        try {
563            autoStart();
564        } catch (Exception ex) {
565            throw new RuntimeException(ex);
566        }
567    }
568
569    /**
570     *
571     * @throws Exception
572     * @org. apache.xbean.InitMethod
573     */
574    public void autoStart() throws Exception {
575        if(shouldAutostart()) {
576            start();
577        }
578    }
579
580    @Override
581    public void start() throws Exception {
582        if (stopped.get() || !started.compareAndSet(false, true)) {
583            // lets just ignore redundant start() calls
584            // as its way too easy to not be completely sure if start() has been
585            // called or not with the gazillion of different configuration
586            // mechanisms
587            // throw new IllegalStateException("Already started.");
588            return;
589        }
590
591        setStartException(null);
592        stopping.set(false);
593        startDate = new Date();
594        MDC.put("activemq.broker", brokerName);
595
596        try {
597            checkMemorySystemUsageLimits();
598            if (systemExitOnShutdown && useShutdownHook) {
599                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
600            }
601            processHelperProperties();
602            if (isUseJmx()) {
603                // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
604                // we cannot cleanup clear that during shutdown of the broker.
605                MDC.remove("activemq.broker");
606                try {
607                    startManagementContext();
608                    for (NetworkConnector connector : getNetworkConnectors()) {
609                        registerNetworkConnectorMBean(connector);
610                    }
611                } finally {
612                    MDC.put("activemq.broker", brokerName);
613                }
614            }
615
616            // in jvm master slave, lets not publish over existing broker till we get the lock
617            final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
618            if (brokerRegistry.lookup(getBrokerName()) == null) {
619                brokerRegistry.bind(getBrokerName(), BrokerService.this);
620            }
621            startPersistenceAdapter(startAsync);
622            startBroker(startAsync);
623            brokerRegistry.bind(getBrokerName(), BrokerService.this);
624        } catch (Exception e) {
625            LOG.error("Failed to start Apache ActiveMQ (" + getBrokerName() + ", " + brokerId + ")", e);
626            try {
627                if (!stopped.get()) {
628                    stop();
629                }
630            } catch (Exception ex) {
631                LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
632            }
633            throw e;
634        } finally {
635            MDC.remove("activemq.broker");
636        }
637    }
638
639    private void startPersistenceAdapter(boolean async) throws Exception {
640        if (async) {
641            new Thread("Persistence Adapter Starting Thread") {
642                @Override
643                public void run() {
644                    try {
645                        doStartPersistenceAdapter();
646                    } catch (Throwable e) {
647                        setStartException(e);
648                    } finally {
649                        synchronized (persistenceAdapterLock) {
650                            persistenceAdapterLock.notifyAll();
651                        }
652                    }
653                }
654            }.start();
655        } else {
656            doStartPersistenceAdapter();
657        }
658    }
659
660    private void doStartPersistenceAdapter() throws Exception {
661        PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter();
662        if (persistenceAdapterToStart == null) {
663            checkStartException();
664            throw new ConfigurationException("Cannot start null persistence adapter");
665        }
666        persistenceAdapterToStart.setUsageManager(getProducerSystemUsage());
667        persistenceAdapterToStart.setBrokerName(getBrokerName());
668        LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart);
669        if (deleteAllMessagesOnStartup) {
670            deleteAllMessages();
671        }
672        persistenceAdapterToStart.start();
673
674        getTempDataStore();
675        if (tempDataStore != null) {
676            try {
677                // start after we have the store lock
678                tempDataStore.start();
679            } catch (Exception e) {
680                RuntimeException exception = new RuntimeException(
681                        "Failed to start temp data store: " + tempDataStore, e);
682                LOG.error(exception.getLocalizedMessage(), e);
683                throw exception;
684            }
685        }
686
687        getJobSchedulerStore();
688        if (jobSchedulerStore != null) {
689            try {
690                jobSchedulerStore.start();
691            } catch (Exception e) {
692                RuntimeException exception = new RuntimeException(
693                        "Failed to start job scheduler store: " + jobSchedulerStore, e);
694                LOG.error(exception.getLocalizedMessage(), e);
695                throw exception;
696            }
697        }
698    }
699
700    private void startBroker(boolean async) throws Exception {
701        if (async) {
702            new Thread("Broker Starting Thread") {
703                @Override
704                public void run() {
705                    try {
706                        synchronized (persistenceAdapterLock) {
707                            persistenceAdapterLock.wait();
708                        }
709                        doStartBroker();
710                    } catch (Throwable t) {
711                        setStartException(t);
712                    }
713                }
714            }.start();
715        } else {
716            doStartBroker();
717        }
718    }
719
720    private void doStartBroker() throws Exception {
721        checkStartException();
722        startDestinations();
723        addShutdownHook();
724
725        broker = getBroker();
726        brokerId = broker.getBrokerId();
727
728        // need to log this after creating the broker so we have its id and name
729        LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
730        broker.start();
731
732        if (isUseJmx()) {
733            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
734                // try to restart management context
735                // typical for slaves that use the same ports as master
736                managementContext.stop();
737                startManagementContext();
738            }
739            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
740            managedBroker.setContextBroker(broker);
741            adminView.setBroker(managedBroker);
742        }
743
744        if (ioExceptionHandler == null) {
745            setIoExceptionHandler(new DefaultIOExceptionHandler());
746        }
747
748        if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
749            ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
750            Log4JConfigView log4jConfigView = new Log4JConfigView();
751            AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
752        }
753
754        startAllConnectors();
755
756        LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
757        LOG.info("For help or more information please see: http://activemq.apache.org");
758
759        getBroker().brokerServiceStarted();
760        checkStoreSystemUsageLimits();
761        startedLatch.countDown();
762        getBroker().nowMasterBroker();
763    }
764
765    /**
766     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
767     *
768     * delegates to stop, done to prevent backwards incompatible signature change
769     */
770    @PreDestroy
771    private void preDestroy () {
772        try {
773            stop();
774        } catch (Exception ex) {
775            throw new RuntimeException();
776        }
777    }
778
779    /**
780     *
781     * @throws Exception
782     * @org.apache .xbean.DestroyMethod
783     */
784    @Override
785    public void stop() throws Exception {
786        if (!stopping.compareAndSet(false, true)) {
787            LOG.trace("Broker already stopping/stopped");
788            return;
789        }
790
791        setStartException(new BrokerStoppedException("Stop invoked"));
792        MDC.put("activemq.broker", brokerName);
793
794        if (systemExitOnShutdown) {
795            new Thread() {
796                @Override
797                public void run() {
798                    System.exit(systemExitOnShutdownExitCode);
799                }
800            }.start();
801        }
802
803        LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );
804
805        removeShutdownHook();
806        if (this.scheduler != null) {
807            this.scheduler.stop();
808            this.scheduler = null;
809        }
810        ServiceStopper stopper = new ServiceStopper();
811        if (services != null) {
812            for (Service service : services) {
813                stopper.stop(service);
814            }
815        }
816        stopAllConnectors(stopper);
817        this.slave = true;
818        // remove any VMTransports connected
819        // this has to be done after services are stopped,
820        // to avoid timing issue with discovery (spinning up a new instance)
821        BrokerRegistry.getInstance().unbind(getBrokerName());
822        VMTransportFactory.stopped(getBrokerName());
823        if (broker != null) {
824            stopper.stop(broker);
825            broker = null;
826        }
827
828        if (jobSchedulerStore != null) {
829            jobSchedulerStore.stop();
830            jobSchedulerStore = null;
831        }
832        if (tempDataStore != null) {
833            tempDataStore.stop();
834            tempDataStore = null;
835        }
836        try {
837            stopper.stop(getPersistenceAdapter());
838            persistenceAdapter = null;
839            if (isUseJmx()) {
840                stopper.stop(managementContext);
841                managementContext = null;
842            }
843            // Clear SelectorParser cache to free memory
844            SelectorParser.clearCache();
845        } finally {
846            started.set(false);
847            stopped.set(true);
848            stoppedLatch.countDown();
849        }
850
851        if (this.taskRunnerFactory != null) {
852            this.taskRunnerFactory.shutdown();
853            this.taskRunnerFactory = null;
854        }
855        if (this.executor != null) {
856            ThreadPoolUtils.shutdownNow(executor);
857            this.executor = null;
858        }
859
860        this.destinationInterceptors = null;
861        this.destinationFactory = null;
862
863        if (startDate != null) {
864            LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
865        }
866        LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
867
868        synchronized (shutdownHooks) {
869            for (Runnable hook : shutdownHooks) {
870                try {
871                    hook.run();
872                } catch (Throwable e) {
873                    stopper.onException(hook, e);
874                }
875            }
876        }
877
878        MDC.remove("activemq.broker");
879
880        // and clear start date
881        startDate = null;
882
883        stopper.throwFirstException();
884    }
885
886    public boolean checkQueueSize(String queueName) {
887        long count = 0;
888        long queueSize = 0;
889        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
890        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
891            if (entry.getKey().isQueue()) {
892                if (entry.getValue().getName().matches(queueName)) {
893                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
894                    count += queueSize;
895                    if (queueSize > 0) {
896                        LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
897                    }
898                }
899            }
900        }
901        return count == 0;
902    }
903
904    /**
905     * This method (both connectorName and queueName are using regex to match)
906     * 1. stop the connector (supposed the user input the connector which the
907     * clients connect to) 2. to check whether there is any pending message on
908     * the queues defined by queueName 3. supposedly, after stop the connector,
909     * client should failover to other broker and pending messages should be
910     * forwarded. if no pending messages, the method finally call stop to stop
911     * the broker.
912     *
913     * @param connectorName
914     * @param queueName
915     * @param timeout
916     * @param pollInterval
917     * @throws Exception
918     */
919    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
920        if (isUseJmx()) {
921            if (connectorName == null || queueName == null || timeout <= 0) {
922                throw new Exception(
923                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
924            }
925            if (pollInterval <= 0) {
926                pollInterval = 30;
927            }
928            LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
929                    connectorName, queueName, timeout, pollInterval
930            });
931            TransportConnector connector;
932            for (int i = 0; i < transportConnectors.size(); i++) {
933                connector = transportConnectors.get(i);
934                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
935                    connector.stop();
936                }
937            }
938            long start = System.currentTimeMillis();
939            while (System.currentTimeMillis() - start < timeout * 1000) {
940                // check quesize until it gets zero
941                if (checkQueueSize(queueName)) {
942                    stop();
943                    break;
944                } else {
945                    Thread.sleep(pollInterval * 1000);
946                }
947            }
948            if (stopped.get()) {
949                LOG.info("Successfully stop the broker.");
950            } else {
951                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
952            }
953        }
954    }
955
956    /**
957     * A helper method to block the caller thread until the broker has been
958     * stopped
959     */
960    public void waitUntilStopped() {
961        while (isStarted() && !stopped.get()) {
962            try {
963                stoppedLatch.await();
964            } catch (InterruptedException e) {
965                // ignore
966            }
967        }
968    }
969
970    public boolean isStopped() {
971        return stopped.get();
972    }
973
974    /**
975     * A helper method to block the caller thread until the broker has fully started
976     * @return boolean true if wait succeeded false if broker was not started or was stopped
977     */
978    public boolean waitUntilStarted() {
979        return waitUntilStarted(DEFAULT_START_TIMEOUT);
980    }
981
982    /**
983     * A helper method to block the caller thread until the broker has fully started
984     *
985     * @param timeout
986     *        the amount of time to wait before giving up and returning false.
987     *
988     * @return boolean true if wait succeeded false if broker was not started or was stopped
989     */
990    public boolean waitUntilStarted(long timeout) {
991        boolean waitSucceeded = isStarted();
992        long expiration = Math.max(0, timeout + System.currentTimeMillis());
993        while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
994            try {
995                if (getStartException() != null) {
996                    return waitSucceeded;
997                }
998                waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
999            } catch (InterruptedException ignore) {
1000            }
1001        }
1002        return waitSucceeded;
1003    }
1004
1005    // Properties
1006    // -------------------------------------------------------------------------
1007    /**
1008     * Returns the message broker
1009     */
1010    public Broker getBroker() throws Exception {
1011        if (broker == null) {
1012            checkStartException();
1013            broker = createBroker();
1014        }
1015        return broker;
1016    }
1017
1018    /**
1019     * Returns the administration view of the broker; used to create and destroy
1020     * resources such as queues and topics. Note this method returns null if JMX
1021     * is disabled.
1022     */
1023    public BrokerView getAdminView() throws Exception {
1024        if (adminView == null) {
1025            // force lazy creation
1026            getBroker();
1027        }
1028        return adminView;
1029    }
1030
1031    public void setAdminView(BrokerView adminView) {
1032        this.adminView = adminView;
1033    }
1034
1035    public String getBrokerName() {
1036        return brokerName;
1037    }
1038
1039    /**
1040     * Sets the name of this broker; which must be unique in the network
1041     *
1042     * @param brokerName
1043     */
1044    private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]";
1045    public void setBrokerName(String brokerName) {
1046        if (brokerName == null) {
1047            throw new NullPointerException("The broker name cannot be null");
1048        }
1049        String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_");
1050        if (!str.equals(brokerName)) {
1051            LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str);
1052        }
1053        this.brokerName = str.trim();
1054    }
1055
1056    public PersistenceAdapterFactory getPersistenceFactory() {
1057        return persistenceFactory;
1058    }
1059
1060    public File getDataDirectoryFile() {
1061        if (dataDirectoryFile == null) {
1062            dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
1063        }
1064        return dataDirectoryFile;
1065    }
1066
1067    public File getBrokerDataDirectory() {
1068        String brokerDir = getBrokerName();
1069        return new File(getDataDirectoryFile(), brokerDir);
1070    }
1071
1072    /**
1073     * Sets the directory in which the data files will be stored by default for
1074     * the JDBC and Journal persistence adaptors.
1075     *
1076     * @param dataDirectory
1077     *            the directory to store data files
1078     */
1079    public void setDataDirectory(String dataDirectory) {
1080        setDataDirectoryFile(new File(dataDirectory));
1081    }
1082
1083    /**
1084     * Sets the directory in which the data files will be stored by default for
1085     * the JDBC and Journal persistence adaptors.
1086     *
1087     * @param dataDirectoryFile
1088     *            the directory to store data files
1089     */
1090    public void setDataDirectoryFile(File dataDirectoryFile) {
1091        this.dataDirectoryFile = dataDirectoryFile;
1092    }
1093
1094    /**
1095     * @return the tmpDataDirectory
1096     */
1097    public File getTmpDataDirectory() {
1098        if (tmpDataDirectory == null) {
1099            tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
1100        }
1101        return tmpDataDirectory;
1102    }
1103
1104    /**
1105     * @param tmpDataDirectory
1106     *            the tmpDataDirectory to set
1107     */
1108    public void setTmpDataDirectory(File tmpDataDirectory) {
1109        this.tmpDataDirectory = tmpDataDirectory;
1110    }
1111
1112    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
1113        this.persistenceFactory = persistenceFactory;
1114    }
1115
1116    public void setDestinationFactory(DestinationFactory destinationFactory) {
1117        this.destinationFactory = destinationFactory;
1118    }
1119
1120    public boolean isPersistent() {
1121        return persistent;
1122    }
1123
1124    /**
1125     * Sets whether or not persistence is enabled or disabled.
1126     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1127     */
1128    public void setPersistent(boolean persistent) {
1129        this.persistent = persistent;
1130    }
1131
1132    public boolean isPopulateJMSXUserID() {
1133        return populateJMSXUserID;
1134    }
1135
1136    /**
1137     * Sets whether or not the broker should populate the JMSXUserID header.
1138     */
1139    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
1140        this.populateJMSXUserID = populateJMSXUserID;
1141    }
1142
1143    public SystemUsage getSystemUsage() {
1144        try {
1145            if (systemUsage == null) {
1146
1147                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
1148                systemUsage.setExecutor(getExecutor());
1149                systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB
1150                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1151                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
1152                systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1153                addService(this.systemUsage);
1154            }
1155            return systemUsage;
1156        } catch (IOException e) {
1157            LOG.error("Cannot create SystemUsage", e);
1158            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e);
1159        }
1160    }
1161
1162    public void setSystemUsage(SystemUsage memoryManager) {
1163        if (this.systemUsage != null) {
1164            removeService(this.systemUsage);
1165        }
1166        this.systemUsage = memoryManager;
1167        if (this.systemUsage.getExecutor()==null) {
1168            this.systemUsage.setExecutor(getExecutor());
1169        }
1170        addService(this.systemUsage);
1171    }
1172
1173    /**
1174     * @return the consumerUsageManager
1175     * @throws IOException
1176     */
1177    public SystemUsage getConsumerSystemUsage() throws IOException {
1178        if (this.consumerSystemUsaage == null) {
1179            if (splitSystemUsageForProducersConsumers) {
1180                this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
1181                float portion = consumerSystemUsagePortion / 100f;
1182                this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
1183                addService(this.consumerSystemUsaage);
1184            } else {
1185                consumerSystemUsaage = getSystemUsage();
1186            }
1187        }
1188        return this.consumerSystemUsaage;
1189    }
1190
1191    /**
1192     * @param consumerSystemUsaage
1193     *            the storeSystemUsage to set
1194     */
1195    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
1196        if (this.consumerSystemUsaage != null) {
1197            removeService(this.consumerSystemUsaage);
1198        }
1199        this.consumerSystemUsaage = consumerSystemUsaage;
1200        addService(this.consumerSystemUsaage);
1201    }
1202
1203    /**
1204     * @return the producerUsageManager
1205     * @throws IOException
1206     */
1207    public SystemUsage getProducerSystemUsage() throws IOException {
1208        if (producerSystemUsage == null) {
1209            if (splitSystemUsageForProducersConsumers) {
1210                producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1211                float portion = producerSystemUsagePortion / 100f;
1212                producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1213                addService(producerSystemUsage);
1214            } else {
1215                producerSystemUsage = getSystemUsage();
1216            }
1217        }
1218        return producerSystemUsage;
1219    }
1220
1221    /**
1222     * @param producerUsageManager
1223     *            the producerUsageManager to set
1224     */
1225    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1226        if (this.producerSystemUsage != null) {
1227            removeService(this.producerSystemUsage);
1228        }
1229        this.producerSystemUsage = producerUsageManager;
1230        addService(this.producerSystemUsage);
1231    }
1232
1233    public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException {
1234        if (persistenceAdapter == null && !hasStartException()) {
1235            persistenceAdapter = createPersistenceAdapter();
1236            configureService(persistenceAdapter);
1237            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1238        }
1239        return persistenceAdapter;
1240    }
1241
1242    /**
1243     * Sets the persistence adaptor implementation to use for this broker
1244     *
1245     * @throws IOException
1246     */
1247    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1248        if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
1249            LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter);
1250            return;
1251        }
1252        this.persistenceAdapter = persistenceAdapter;
1253        configureService(this.persistenceAdapter);
1254        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1255    }
1256
1257    public TaskRunnerFactory getTaskRunnerFactory() {
1258        if (this.taskRunnerFactory == null) {
1259            this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1260                    isDedicatedTaskRunner());
1261            this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
1262        }
1263        return this.taskRunnerFactory;
1264    }
1265
1266    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1267        this.taskRunnerFactory = taskRunnerFactory;
1268    }
1269
1270    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1271        if (taskRunnerFactory == null) {
1272            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1273                    true, 1000, isDedicatedTaskRunner());
1274        }
1275        return persistenceTaskRunnerFactory;
1276    }
1277
1278    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1279        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1280    }
1281
1282    public boolean isUseJmx() {
1283        return useJmx;
1284    }
1285
1286    public boolean isEnableStatistics() {
1287        return enableStatistics;
1288    }
1289
1290    /**
1291     * Sets whether or not the Broker's services enable statistics or not.
1292     */
1293    public void setEnableStatistics(boolean enableStatistics) {
1294        this.enableStatistics = enableStatistics;
1295    }
1296
1297    /**
1298     * Sets whether or not the Broker's services should be exposed into JMX or
1299     * not.
1300     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1301     */
1302    public void setUseJmx(boolean useJmx) {
1303        this.useJmx = useJmx;
1304    }
1305
1306    public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
1307        if (brokerObjectName == null) {
1308            brokerObjectName = createBrokerObjectName();
1309        }
1310        return brokerObjectName;
1311    }
1312
1313    /**
1314     * Sets the JMX ObjectName for this broker
1315     */
1316    public void setBrokerObjectName(ObjectName brokerObjectName) {
1317        this.brokerObjectName = brokerObjectName;
1318    }
1319
1320    public ManagementContext getManagementContext() {
1321        if (managementContext == null) {
1322            checkStartException();
1323            managementContext = new ManagementContext();
1324        }
1325        return managementContext;
1326    }
1327
1328    synchronized private void checkStartException() {
1329        if (startException != null) {
1330            throw new BrokerStoppedException(startException);
1331        }
1332    }
1333
1334    synchronized private boolean hasStartException() {
1335        return startException != null;
1336    }
1337
1338    synchronized private void setStartException(Throwable t) {
1339        startException = t;
1340    }
1341
1342    public void setManagementContext(ManagementContext managementContext) {
1343        this.managementContext = managementContext;
1344    }
1345
1346    public NetworkConnector getNetworkConnectorByName(String connectorName) {
1347        for (NetworkConnector connector : networkConnectors) {
1348            if (connector.getName().equals(connectorName)) {
1349                return connector;
1350            }
1351        }
1352        return null;
1353    }
1354
1355    public String[] getNetworkConnectorURIs() {
1356        return networkConnectorURIs;
1357    }
1358
1359    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1360        this.networkConnectorURIs = networkConnectorURIs;
1361    }
1362
1363    public TransportConnector getConnectorByName(String connectorName) {
1364        for (TransportConnector connector : transportConnectors) {
1365            if (connector.getName().equals(connectorName)) {
1366                return connector;
1367            }
1368        }
1369        return null;
1370    }
1371
1372    public Map<String, String> getTransportConnectorURIsAsMap() {
1373        Map<String, String> answer = new HashMap<String, String>();
1374        for (TransportConnector connector : transportConnectors) {
1375            try {
1376                URI uri = connector.getConnectUri();
1377                if (uri != null) {
1378                    String scheme = uri.getScheme();
1379                    if (scheme != null) {
1380                        answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
1381                    }
1382                }
1383            } catch (Exception e) {
1384                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1385            }
1386        }
1387        return answer;
1388    }
1389
1390    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
1391        ProducerBrokerExchange result = null;
1392
1393        for (TransportConnector connector : transportConnectors) {
1394            for (TransportConnection tc: connector.getConnections()){
1395                result = tc.getProducerBrokerExchangeIfExists(producerInfo);
1396                if (result !=null){
1397                    return result;
1398                }
1399            }
1400        }
1401        return result;
1402    }
1403
1404    public String[] getTransportConnectorURIs() {
1405        return transportConnectorURIs;
1406    }
1407
1408    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1409        this.transportConnectorURIs = transportConnectorURIs;
1410    }
1411
1412    /**
1413     * @return Returns the jmsBridgeConnectors.
1414     */
1415    public JmsConnector[] getJmsBridgeConnectors() {
1416        return jmsBridgeConnectors;
1417    }
1418
1419    /**
1420     * @param jmsConnectors
1421     *            The jmsBridgeConnectors to set.
1422     */
1423    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1424        this.jmsBridgeConnectors = jmsConnectors;
1425    }
1426
1427    public Service[] getServices() {
1428        return services.toArray(new Service[0]);
1429    }
1430
1431    /**
1432     * Sets the services associated with this broker.
1433     */
1434    public void setServices(Service[] services) {
1435        this.services.clear();
1436        if (services != null) {
1437            for (int i = 0; i < services.length; i++) {
1438                this.services.add(services[i]);
1439            }
1440        }
1441    }
1442
1443    /**
1444     * Adds a new service so that it will be started as part of the broker
1445     * lifecycle
1446     */
1447    public void addService(Service service) {
1448        services.add(service);
1449    }
1450
1451    public void removeService(Service service) {
1452        services.remove(service);
1453    }
1454
1455    public boolean isUseLoggingForShutdownErrors() {
1456        return useLoggingForShutdownErrors;
1457    }
1458
1459    /**
1460     * Sets whether or not we should use commons-logging when reporting errors
1461     * when shutting down the broker
1462     */
1463    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1464        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1465    }
1466
1467    public boolean isUseShutdownHook() {
1468        return useShutdownHook;
1469    }
1470
1471    /**
1472     * Sets whether or not we should use a shutdown handler to close down the
1473     * broker cleanly if the JVM is terminated. It is recommended you leave this
1474     * enabled.
1475     */
1476    public void setUseShutdownHook(boolean useShutdownHook) {
1477        this.useShutdownHook = useShutdownHook;
1478    }
1479
1480    public boolean isAdvisorySupport() {
1481        return advisorySupport;
1482    }
1483
1484    /**
1485     * Allows the support of advisory messages to be disabled for performance
1486     * reasons.
1487     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1488     */
1489    public void setAdvisorySupport(boolean advisorySupport) {
1490        this.advisorySupport = advisorySupport;
1491    }
1492
1493    public List<TransportConnector> getTransportConnectors() {
1494        return new ArrayList<TransportConnector>(transportConnectors);
1495    }
1496
1497    /**
1498     * Sets the transport connectors which this broker will listen on for new
1499     * clients
1500     *
1501     * @org.apache.xbean.Property
1502     *                            nestedType="org.apache.activemq.broker.TransportConnector"
1503     */
1504    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1505        for (TransportConnector connector : transportConnectors) {
1506            addConnector(connector);
1507        }
1508    }
1509
1510    public TransportConnector getTransportConnectorByName(String name){
1511        for (TransportConnector transportConnector : transportConnectors){
1512           if (name.equals(transportConnector.getName())){
1513               return transportConnector;
1514           }
1515        }
1516        return null;
1517    }
1518
1519    public TransportConnector getTransportConnectorByScheme(String scheme){
1520        for (TransportConnector transportConnector : transportConnectors){
1521            if (scheme.equals(transportConnector.getUri().getScheme())){
1522                return transportConnector;
1523            }
1524        }
1525        return null;
1526    }
1527
1528    public List<NetworkConnector> getNetworkConnectors() {
1529        return new ArrayList<NetworkConnector>(networkConnectors);
1530    }
1531
1532    public List<ProxyConnector> getProxyConnectors() {
1533        return new ArrayList<ProxyConnector>(proxyConnectors);
1534    }
1535
1536    /**
1537     * Sets the network connectors which this broker will use to connect to
1538     * other brokers in a federated network
1539     *
1540     * @org.apache.xbean.Property
1541     *                            nestedType="org.apache.activemq.network.NetworkConnector"
1542     */
1543    public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
1544        for (Object connector : networkConnectors) {
1545            addNetworkConnector((NetworkConnector) connector);
1546        }
1547    }
1548
1549    /**
1550     * Sets the network connectors which this broker will use to connect to
1551     * other brokers in a federated network
1552     */
1553    public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
1554        for (Object connector : proxyConnectors) {
1555            addProxyConnector((ProxyConnector) connector);
1556        }
1557    }
1558
1559    public PolicyMap getDestinationPolicy() {
1560        return destinationPolicy;
1561    }
1562
1563    /**
1564     * Sets the destination specific policies available either for exact
1565     * destinations or for wildcard areas of destinations.
1566     */
1567    public void setDestinationPolicy(PolicyMap policyMap) {
1568        this.destinationPolicy = policyMap;
1569    }
1570
1571    public BrokerPlugin[] getPlugins() {
1572        return plugins;
1573    }
1574
1575    /**
1576     * Sets a number of broker plugins to install such as for security
1577     * authentication or authorization
1578     */
1579    public void setPlugins(BrokerPlugin[] plugins) {
1580        this.plugins = plugins;
1581    }
1582
1583    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1584        return messageAuthorizationPolicy;
1585    }
1586
1587    /**
1588     * Sets the policy used to decide if the current connection is authorized to
1589     * consume a given message
1590     */
1591    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1592        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1593    }
1594
1595    /**
1596     * Delete all messages from the persistent store
1597     *
1598     * @throws IOException
1599     */
1600    public void deleteAllMessages() throws IOException {
1601        getPersistenceAdapter().deleteAllMessages();
1602    }
1603
1604    public boolean isDeleteAllMessagesOnStartup() {
1605        return deleteAllMessagesOnStartup;
1606    }
1607
1608    /**
1609     * Sets whether or not all messages are deleted on startup - mostly only
1610     * useful for testing.
1611     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1612     */
1613    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1614        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1615    }
1616
1617    public URI getVmConnectorURI() {
1618        if (vmConnectorURI == null) {
1619            try {
1620                vmConnectorURI = new URI("vm://" + getBrokerName());
1621            } catch (URISyntaxException e) {
1622                LOG.error("Badly formed URI from {}", getBrokerName(), e);
1623            }
1624        }
1625        return vmConnectorURI;
1626    }
1627
1628    public void setVmConnectorURI(URI vmConnectorURI) {
1629        this.vmConnectorURI = vmConnectorURI;
1630    }
1631
1632    public String getDefaultSocketURIString() {
1633        if (started.get()) {
1634            if (this.defaultSocketURIString == null) {
1635                for (TransportConnector tc:this.transportConnectors) {
1636                    String result = null;
1637                    try {
1638                        result = tc.getPublishableConnectString();
1639                    } catch (Exception e) {
1640                      LOG.warn("Failed to get the ConnectURI for {}", tc, e);
1641                    }
1642                    if (result != null) {
1643                        // find first publishable uri
1644                        if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1645                            this.defaultSocketURIString = result;
1646                            break;
1647                        } else {
1648                        // or use the first defined
1649                            if (this.defaultSocketURIString == null) {
1650                                this.defaultSocketURIString = result;
1651                            }
1652                        }
1653                    }
1654                }
1655
1656            }
1657            return this.defaultSocketURIString;
1658        }
1659       return null;
1660    }
1661
1662    /**
1663     * @return Returns the shutdownOnMasterFailure.
1664     */
1665    public boolean isShutdownOnMasterFailure() {
1666        return shutdownOnMasterFailure;
1667    }
1668
1669    /**
1670     * @param shutdownOnMasterFailure
1671     *            The shutdownOnMasterFailure to set.
1672     */
1673    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1674        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1675    }
1676
1677    public boolean isKeepDurableSubsActive() {
1678        return keepDurableSubsActive;
1679    }
1680
1681    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1682        this.keepDurableSubsActive = keepDurableSubsActive;
1683    }
1684
1685    public boolean isUseVirtualTopics() {
1686        return useVirtualTopics;
1687    }
1688
1689    /**
1690     * Sets whether or not <a
1691     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1692     * Topics</a> should be supported by default if they have not been
1693     * explicitly configured.
1694     */
1695    public void setUseVirtualTopics(boolean useVirtualTopics) {
1696        this.useVirtualTopics = useVirtualTopics;
1697    }
1698
1699    public DestinationInterceptor[] getDestinationInterceptors() {
1700        return destinationInterceptors;
1701    }
1702
1703    public boolean isUseMirroredQueues() {
1704        return useMirroredQueues;
1705    }
1706
1707    /**
1708     * Sets whether or not <a
1709     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1710     * Queues</a> should be supported by default if they have not been
1711     * explicitly configured.
1712     */
1713    public void setUseMirroredQueues(boolean useMirroredQueues) {
1714        this.useMirroredQueues = useMirroredQueues;
1715    }
1716
1717    /**
1718     * Sets the destination interceptors to use
1719     */
1720    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1721        this.destinationInterceptors = destinationInterceptors;
1722    }
1723
1724    public ActiveMQDestination[] getDestinations() {
1725        return destinations;
1726    }
1727
1728    /**
1729     * Sets the destinations which should be loaded/created on startup
1730     */
1731    public void setDestinations(ActiveMQDestination[] destinations) {
1732        this.destinations = destinations;
1733    }
1734
1735    /**
1736     * @return the tempDataStore
1737     */
1738    public synchronized PListStore getTempDataStore() {
1739        if (tempDataStore == null && !hasStartException()) {
1740            if (!isPersistent()) {
1741                return null;
1742            }
1743
1744            try {
1745                PersistenceAdapter pa = getPersistenceAdapter();
1746                if( pa!=null && pa instanceof PListStore) {
1747                    return (PListStore) pa;
1748                }
1749            } catch (IOException e) {
1750                throw new RuntimeException(e);
1751            }
1752
1753            try {
1754                String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
1755                this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
1756                this.tempDataStore.setDirectory(getTmpDataDirectory());
1757                configureService(tempDataStore);
1758            } catch (Exception e) {
1759                throw new RuntimeException(e);
1760            }
1761        }
1762        return tempDataStore;
1763    }
1764
1765    /**
1766     * @param tempDataStore
1767     *            the tempDataStore to set
1768     */
1769    public void setTempDataStore(PListStore tempDataStore) {
1770        this.tempDataStore = tempDataStore;
1771        if (tempDataStore != null) {
1772            if (tmpDataDirectory == null) {
1773                tmpDataDirectory = tempDataStore.getDirectory();
1774            } else if (tempDataStore.getDirectory() == null) {
1775                tempDataStore.setDirectory(tmpDataDirectory);
1776            }
1777        }
1778        configureService(tempDataStore);
1779    }
1780
1781    public int getPersistenceThreadPriority() {
1782        return persistenceThreadPriority;
1783    }
1784
1785    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1786        this.persistenceThreadPriority = persistenceThreadPriority;
1787    }
1788
1789    /**
1790     * @return the useLocalHostBrokerName
1791     */
1792    public boolean isUseLocalHostBrokerName() {
1793        return this.useLocalHostBrokerName;
1794    }
1795
1796    /**
1797     * @param useLocalHostBrokerName
1798     *            the useLocalHostBrokerName to set
1799     */
1800    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1801        this.useLocalHostBrokerName = useLocalHostBrokerName;
1802        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1803            brokerName = LOCAL_HOST_NAME;
1804        }
1805    }
1806
1807    /**
1808     * Looks up and lazily creates if necessary the destination for the given
1809     * JMS name
1810     */
1811    public Destination getDestination(ActiveMQDestination destination) throws Exception {
1812        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1813    }
1814
1815    public void removeDestination(ActiveMQDestination destination) throws Exception {
1816        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1817    }
1818
1819    public int getProducerSystemUsagePortion() {
1820        return producerSystemUsagePortion;
1821    }
1822
1823    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1824        this.producerSystemUsagePortion = producerSystemUsagePortion;
1825    }
1826
1827    public int getConsumerSystemUsagePortion() {
1828        return consumerSystemUsagePortion;
1829    }
1830
1831    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1832        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1833    }
1834
1835    public boolean isSplitSystemUsageForProducersConsumers() {
1836        return splitSystemUsageForProducersConsumers;
1837    }
1838
1839    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1840        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1841    }
1842
1843    public boolean isMonitorConnectionSplits() {
1844        return monitorConnectionSplits;
1845    }
1846
1847    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1848        this.monitorConnectionSplits = monitorConnectionSplits;
1849    }
1850
1851    public int getTaskRunnerPriority() {
1852        return taskRunnerPriority;
1853    }
1854
1855    public void setTaskRunnerPriority(int taskRunnerPriority) {
1856        this.taskRunnerPriority = taskRunnerPriority;
1857    }
1858
1859    public boolean isDedicatedTaskRunner() {
1860        return dedicatedTaskRunner;
1861    }
1862
1863    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1864        this.dedicatedTaskRunner = dedicatedTaskRunner;
1865    }
1866
1867    public boolean isCacheTempDestinations() {
1868        return cacheTempDestinations;
1869    }
1870
1871    public void setCacheTempDestinations(boolean cacheTempDestinations) {
1872        this.cacheTempDestinations = cacheTempDestinations;
1873    }
1874
1875    public int getTimeBeforePurgeTempDestinations() {
1876        return timeBeforePurgeTempDestinations;
1877    }
1878
1879    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1880        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1881    }
1882
1883    public boolean isUseTempMirroredQueues() {
1884        return useTempMirroredQueues;
1885    }
1886
1887    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1888        this.useTempMirroredQueues = useTempMirroredQueues;
1889    }
1890
1891    public synchronized JobSchedulerStore getJobSchedulerStore() {
1892
1893        // If support is off don't allow any scheduler even is user configured their own.
1894        if (!isSchedulerSupport()) {
1895            return null;
1896        }
1897
1898        // If the user configured their own we use it even if persistence is disabled since
1899        // we don't know anything about their implementation.
1900        if (jobSchedulerStore == null && !hasStartException()) {
1901
1902            if (!isPersistent()) {
1903                this.jobSchedulerStore = new InMemoryJobSchedulerStore();
1904                configureService(jobSchedulerStore);
1905                return this.jobSchedulerStore;
1906            }
1907
1908            try {
1909                PersistenceAdapter pa = getPersistenceAdapter();
1910                if (pa != null) {
1911                    this.jobSchedulerStore = pa.createJobSchedulerStore();
1912                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1913                    configureService(jobSchedulerStore);
1914                    return this.jobSchedulerStore;
1915                }
1916            } catch (IOException e) {
1917                throw new RuntimeException(e);
1918            } catch (UnsupportedOperationException ex) {
1919                // It's ok if the store doesn't implement a scheduler.
1920            } catch (Exception e) {
1921                throw new RuntimeException(e);
1922            }
1923
1924            try {
1925                PersistenceAdapter pa = getPersistenceAdapter();
1926                if (pa != null && pa instanceof JobSchedulerStore) {
1927                    this.jobSchedulerStore = (JobSchedulerStore) pa;
1928                    configureService(jobSchedulerStore);
1929                    return this.jobSchedulerStore;
1930                }
1931            } catch (IOException e) {
1932                throw new RuntimeException(e);
1933            }
1934
1935            // Load the KahaDB store as a last resort, this only works if KahaDB is
1936            // included at runtime, otherwise this will fail.  User should disable
1937            // scheduler support if this fails.
1938            try {
1939                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
1940                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
1941                jobSchedulerStore = adaptor.createJobSchedulerStore();
1942                jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1943                configureService(jobSchedulerStore);
1944                LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile());
1945            } catch (Exception e) {
1946                throw new RuntimeException(e);
1947            }
1948        }
1949        return jobSchedulerStore;
1950    }
1951
1952    public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
1953        this.jobSchedulerStore = jobSchedulerStore;
1954        configureService(jobSchedulerStore);
1955    }
1956
1957    //
1958    // Implementation methods
1959    // -------------------------------------------------------------------------
1960    /**
1961     * Handles any lazy-creation helper properties which are added to make
1962     * things easier to configure inside environments such as Spring
1963     *
1964     * @throws Exception
1965     */
1966    protected void processHelperProperties() throws Exception {
1967        if (transportConnectorURIs != null) {
1968            for (int i = 0; i < transportConnectorURIs.length; i++) {
1969                String uri = transportConnectorURIs[i];
1970                addConnector(uri);
1971            }
1972        }
1973        if (networkConnectorURIs != null) {
1974            for (int i = 0; i < networkConnectorURIs.length; i++) {
1975                String uri = networkConnectorURIs[i];
1976                addNetworkConnector(uri);
1977            }
1978        }
1979        if (jmsBridgeConnectors != null) {
1980            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1981                addJmsConnector(jmsBridgeConnectors[i]);
1982            }
1983        }
1984    }
1985
1986    /**
1987     * Check that the store usage limit is not greater than max usable
1988     * space and adjust if it is
1989     */
1990    protected void checkStoreUsageLimits() throws Exception {
1991        final SystemUsage usage = getSystemUsage();
1992
1993        if (getPersistenceAdapter() != null) {
1994            PersistenceAdapter adapter = getPersistenceAdapter();
1995            checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit());
1996
1997            long maxJournalFileSize = 0;
1998            long storeLimit = usage.getStoreUsage().getLimit();
1999
2000            if (adapter instanceof JournaledStore) {
2001                maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
2002            }
2003
2004            if (storeLimit < maxJournalFileSize) {
2005                LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
2006                          " mb, whilst the max journal file size for the store is: " +
2007                          maxJournalFileSize / (1024 * 1024) + " mb, " +
2008                          "the store will not accept any data when used.");
2009
2010            }
2011        }
2012    }
2013
2014    /**
2015     * Check that temporary usage limit is not greater than max usable
2016     * space and adjust if it is
2017     */
2018    protected void checkTmpStoreUsageLimits() throws Exception {
2019        final SystemUsage usage = getSystemUsage();
2020
2021        File tmpDir = getTmpDataDirectory();
2022
2023        if (tmpDir != null) {
2024            checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit());
2025
2026            if (isPersistent()) {
2027                long maxJournalFileSize;
2028
2029                PListStore store = usage.getTempUsage().getStore();
2030                if (store != null && store instanceof JournaledStore) {
2031                    maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
2032                } else {
2033                    maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
2034                }
2035                long storeLimit = usage.getTempUsage().getLimit();
2036
2037                if (storeLimit < maxJournalFileSize) {
2038                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
2039                              " mb, whilst the max journal file size for the temporary store is: " +
2040                              maxJournalFileSize / (1024 * 1024) + " mb, " +
2041                              "the temp store will not accept any data when used.");
2042                }
2043            }
2044        }
2045    }
2046
2047    protected void checkUsageLimit(File dir, PercentLimitUsage<?> storeUsage, int percentLimit) throws ConfigurationException {
2048        if (dir != null) {
2049            dir = StoreUtil.findParentDirectory(dir);
2050            String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store";
2051            long storeLimit = storeUsage.getLimit();
2052            long storeCurrent = storeUsage.getUsage();
2053            long totalSpace = storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getTotalSpace();
2054            long totalUsableSpace = (storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getUsableSpace()) + storeCurrent;
2055            if (totalUsableSpace < 0 || totalSpace < 0) {
2056                final String message = "File system space reported by: " + dir + " was negative, possibly a huge file system, set a sane usage.total to provide some guidance";
2057                LOG.error(message);
2058                throw new ConfigurationException(message);
2059            }
2060            //compute byte value of the percent limit
2061            long bytePercentLimit = totalSpace * percentLimit / 100;
2062            int oneMeg = 1024 * 1024;
2063
2064            //Check if the store limit is less than the percent Limit that was set and also
2065            //the usable space...this means we can grow the store larger
2066            //Changes in partition size (total space) as well as changes in usable space should
2067            //be detected here
2068            if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0
2069                    && storeUsage.getTotal() == 0
2070                    && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){
2071
2072                // set the limit to be bytePercentLimit or usableSpace if
2073                // usableSpace is less than the percentLimit
2074                long newLimit = bytePercentLimit > totalUsableSpace ? totalUsableSpace : bytePercentLimit;
2075
2076                //To prevent changing too often, check threshold
2077                if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) {
2078                    LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to "
2079                            + percentLimit + "% of the partition size.");
2080                    storeUsage.setLimit(newLimit);
2081                    LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace
2082                            + "% (" + newLimit / oneMeg + " mb) of the partition size.");
2083                }
2084
2085            //check if the limit is too large for the amount of usable space
2086            } else if (storeLimit > totalUsableSpace) {
2087                final String message = storeName + " limit is " +  storeLimit / oneMeg
2088                        + " mb (current store usage is " + storeCurrent / oneMeg
2089                        + " mb). The data directory: " + dir.getAbsolutePath()
2090                        + " only has " + totalUsableSpace / oneMeg
2091                        + " mb of usable space.";
2092
2093                if (!isAdjustUsageLimits()) {
2094                    LOG.error(message);
2095                    throw new ConfigurationException(message);
2096                }
2097
2098                if (percentLimit > 0) {
2099                    LOG.warn(storeName + " limit has been set to "
2100                            + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)"
2101                            + " of the partition size but there is not enough usable space."
2102                            + " The current store limit (which may have been adjusted by a"
2103                            + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)"
2104                            + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)"
2105                            + " is available - resetting limit");
2106                } else {
2107                    LOG.warn(message + " - resetting to maximum available disk space: " +
2108                         totalUsableSpace / oneMeg + " mb");
2109                }
2110                storeUsage.setLimit(totalUsableSpace);
2111            }
2112        }
2113    }
2114
2115    /**
2116     * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to
2117     * update store and temporary store limits if the amount of available space
2118     * plus current store size is less than the existin configured limit
2119     */
2120    protected void scheduleDiskUsageLimitsCheck() throws IOException {
2121        if (schedulePeriodForDiskUsageCheck > 0 &&
2122                (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) {
2123            Runnable diskLimitCheckTask = new Runnable() {
2124                @Override
2125                public void run() {
2126                    try {
2127                        checkStoreUsageLimits();
2128                    } catch (Throwable e) {
2129                        LOG.error("Failed to check persistent disk usage limits", e);
2130                    }
2131
2132                    try {
2133                        checkTmpStoreUsageLimits();
2134                    } catch (Throwable e) {
2135                        LOG.error("Failed to check temporary store usage limits", e);
2136                    }
2137                }
2138            };
2139            scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck);
2140        }
2141    }
2142
2143    protected void checkMemorySystemUsageLimits() throws Exception {
2144        final SystemUsage usage = getSystemUsage();
2145        long memLimit = usage.getMemoryUsage().getLimit();
2146        long jvmLimit = Runtime.getRuntime().maxMemory();
2147
2148        if (memLimit > jvmLimit) {
2149            final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024)
2150                    + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024);
2151
2152            if (adjustUsageLimits) {
2153                usage.getMemoryUsage().setPercentOfJvmHeap(70);
2154                LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
2155            } else {
2156                LOG.error(message);
2157                throw new ConfigurationException(message);
2158            }
2159        }
2160    }
2161
2162    protected void checkStoreSystemUsageLimits() throws Exception {
2163        final SystemUsage usage = getSystemUsage();
2164
2165        //Check the persistent store and temp store limits if they exist
2166        //and schedule a periodic check to update disk limits if
2167        //schedulePeriodForDiskLimitCheck is set
2168        checkStoreUsageLimits();
2169        checkTmpStoreUsageLimits();
2170        scheduleDiskUsageLimitsCheck();
2171
2172        if (getJobSchedulerStore() != null) {
2173            JobSchedulerStore scheduler = getJobSchedulerStore();
2174            File schedulerDir = scheduler.getDirectory();
2175            if (schedulerDir != null) {
2176
2177                String schedulerDirPath = schedulerDir.getAbsolutePath();
2178                if (!schedulerDir.isAbsolute()) {
2179                    schedulerDir = new File(schedulerDirPath);
2180                }
2181
2182                while (schedulerDir != null && !schedulerDir.isDirectory()) {
2183                    schedulerDir = schedulerDir.getParentFile();
2184                }
2185                long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
2186                long dirFreeSpace = schedulerDir.getUsableSpace();
2187                if (schedulerLimit > dirFreeSpace) {
2188                    LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
2189                             " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
2190                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
2191                            dirFreeSpace / (1024 * 1024) + " mb.");
2192                    usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
2193                }
2194            }
2195        }
2196    }
2197
2198    public void stopAllConnectors(ServiceStopper stopper) {
2199        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2200            NetworkConnector connector = iter.next();
2201            unregisterNetworkConnectorMBean(connector);
2202            stopper.stop(connector);
2203        }
2204        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2205            ProxyConnector connector = iter.next();
2206            stopper.stop(connector);
2207        }
2208        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2209            JmsConnector connector = iter.next();
2210            stopper.stop(connector);
2211        }
2212        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2213            TransportConnector connector = iter.next();
2214            try {
2215                unregisterConnectorMBean(connector);
2216            } catch (IOException e) {
2217            }
2218            stopper.stop(connector);
2219        }
2220    }
2221
2222    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
2223        try {
2224            ObjectName objectName = createConnectorObjectName(connector);
2225            connector = connector.asManagedConnector(getManagementContext(), objectName);
2226            ConnectorViewMBean view = new ConnectorView(connector);
2227            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2228            return connector;
2229        } catch (Throwable e) {
2230            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e);
2231        }
2232    }
2233
2234    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
2235        if (isUseJmx()) {
2236            try {
2237                ObjectName objectName = createConnectorObjectName(connector);
2238                getManagementContext().unregisterMBean(objectName);
2239            } catch (Throwable e) {
2240                throw IOExceptionSupport.create(
2241                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
2242            }
2243        }
2244    }
2245
2246    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
2247        return adaptor;
2248    }
2249
2250    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
2251        if (isUseJmx()) {}
2252    }
2253
2254    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
2255        return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
2256    }
2257
2258    public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
2259        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
2260        try {
2261            ObjectName objectName = createNetworkConnectorObjectName(connector);
2262            connector.setObjectName(objectName);
2263            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2264        } catch (Throwable e) {
2265            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
2266        }
2267    }
2268
2269    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
2270        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
2271    }
2272
2273    public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
2274        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport);
2275    }
2276
2277    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
2278        if (isUseJmx()) {
2279            try {
2280                ObjectName objectName = createNetworkConnectorObjectName(connector);
2281                getManagementContext().unregisterMBean(objectName);
2282            } catch (Exception e) {
2283                LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
2284            }
2285        }
2286    }
2287
2288    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
2289        ProxyConnectorView view = new ProxyConnectorView(connector);
2290        try {
2291            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
2292            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2293        } catch (Throwable e) {
2294            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2295        }
2296    }
2297
2298    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
2299        JmsConnectorView view = new JmsConnectorView(connector);
2300        try {
2301            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
2302            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2303        } catch (Throwable e) {
2304            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2305        }
2306    }
2307
2308    /**
2309     * Factory method to create a new broker
2310     *
2311     * @throws Exception
2312     */
2313    protected Broker createBroker() throws Exception {
2314        regionBroker = createRegionBroker();
2315        Broker broker = addInterceptors(regionBroker);
2316        // Add a filter that will stop access to the broker once stopped
2317        broker = new MutableBrokerFilter(broker) {
2318            Broker old;
2319
2320            @Override
2321            public void stop() throws Exception {
2322                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
2323                    // Just ignore additional stop actions.
2324                    @Override
2325                    public void stop() throws Exception {
2326                    }
2327                });
2328                old.stop();
2329            }
2330
2331            @Override
2332            public void start() throws Exception {
2333                if (forceStart && old != null) {
2334                    this.next.set(old);
2335                }
2336                getNext().start();
2337            }
2338        };
2339        return broker;
2340    }
2341
2342    /**
2343     * Factory method to create the core region broker onto which interceptors
2344     * are added
2345     *
2346     * @throws Exception
2347     */
2348    protected Broker createRegionBroker() throws Exception {
2349        if (destinationInterceptors == null) {
2350            destinationInterceptors = createDefaultDestinationInterceptor();
2351        }
2352        configureServices(destinationInterceptors);
2353        DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
2354        if (destinationFactory == null) {
2355            destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
2356        }
2357        return createRegionBroker(destinationInterceptor);
2358    }
2359
2360    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
2361        RegionBroker regionBroker;
2362        if (isUseJmx()) {
2363            try {
2364                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
2365                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
2366            } catch(MalformedObjectNameException me){
2367                LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me);
2368                throw new IOException(me);
2369            }
2370        } else {
2371            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2372                    destinationInterceptor,getScheduler(),getExecutor());
2373        }
2374        destinationFactory.setRegionBroker(regionBroker);
2375        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2376        regionBroker.setBrokerName(getBrokerName());
2377        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2378        regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2379        if (brokerId != null) {
2380            regionBroker.setBrokerId(brokerId);
2381        }
2382        return regionBroker;
2383    }
2384
2385    /**
2386     * Create the default destination interceptor
2387     */
2388    protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2389        List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2390        if (isUseVirtualTopics()) {
2391            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2392            VirtualTopic virtualTopic = new VirtualTopic();
2393            virtualTopic.setName("VirtualTopic.>");
2394            VirtualDestination[] virtualDestinations = { virtualTopic };
2395            interceptor.setVirtualDestinations(virtualDestinations);
2396            answer.add(interceptor);
2397        }
2398        if (isUseMirroredQueues()) {
2399            MirroredQueue interceptor = new MirroredQueue();
2400            answer.add(interceptor);
2401        }
2402        DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2403        answer.toArray(array);
2404        return array;
2405    }
2406
2407    /**
2408     * Strategy method to add interceptors to the broker
2409     *
2410     * @throws IOException
2411     */
2412    protected Broker addInterceptors(Broker broker) throws Exception {
2413        if (isSchedulerSupport()) {
2414            SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
2415            if (isUseJmx()) {
2416                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2417                try {
2418                    ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName());
2419                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2420                    this.adminView.setJMSJobScheduler(objectName);
2421                } catch (Throwable e) {
2422                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2423                            + e.getMessage(), e);
2424                }
2425            }
2426            broker = sb;
2427        }
2428        if (isUseJmx()) {
2429            HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker());
2430            try {
2431                ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName());
2432                AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName);
2433            } catch (Throwable e) {
2434                throw IOExceptionSupport.create("Status MBean could not be registered in JMX: "
2435                        + e.getMessage(), e);
2436            }
2437        }
2438        if (isAdvisorySupport()) {
2439            broker = new AdvisoryBroker(broker);
2440        }
2441        broker = new CompositeDestinationBroker(broker);
2442        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2443        if (isPopulateJMSXUserID()) {
2444            UserIDBroker userIDBroker = new UserIDBroker(broker);
2445            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2446            broker = userIDBroker;
2447        }
2448        if (isMonitorConnectionSplits()) {
2449            broker = new ConnectionSplitBroker(broker);
2450        }
2451        if (plugins != null) {
2452            for (int i = 0; i < plugins.length; i++) {
2453                BrokerPlugin plugin = plugins[i];
2454                broker = plugin.installPlugin(broker);
2455            }
2456        }
2457        return broker;
2458    }
2459
2460    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2461        if (isPersistent()) {
2462            PersistenceAdapterFactory fac = getPersistenceFactory();
2463            if (fac != null) {
2464                return fac.createPersistenceAdapter();
2465            } else {
2466                try {
2467                    String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
2468                    PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
2469                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
2470                    adaptor.setDirectory(dir);
2471                    return adaptor;
2472                } catch (Throwable e) {
2473                    throw IOExceptionSupport.create(e);
2474                }
2475            }
2476        } else {
2477            return new MemoryPersistenceAdapter();
2478        }
2479    }
2480
2481    protected ObjectName createBrokerObjectName() throws MalformedObjectNameException  {
2482        return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName());
2483    }
2484
2485    protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2486        TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
2487        return new TransportConnector(transport);
2488    }
2489
2490    /**
2491     * Extracts the port from the options
2492     */
2493    protected Object getPort(Map<?,?> options) {
2494        Object port = options.get("port");
2495        if (port == null) {
2496            port = DEFAULT_PORT;
2497            LOG.warn("No port specified so defaulting to: {}", port);
2498        }
2499        return port;
2500    }
2501
2502    protected void addShutdownHook() {
2503        if (useShutdownHook) {
2504            shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2505                @Override
2506                public void run() {
2507                    containerShutdown();
2508                }
2509            };
2510            Runtime.getRuntime().addShutdownHook(shutdownHook);
2511        }
2512    }
2513
2514    protected void removeShutdownHook() {
2515        if (shutdownHook != null) {
2516            try {
2517                Runtime.getRuntime().removeShutdownHook(shutdownHook);
2518            } catch (Exception e) {
2519                LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e);
2520            }
2521        }
2522    }
2523
2524    /**
2525     * Sets hooks to be executed when broker shut down
2526     *
2527     * @org.apache.xbean.Property
2528     */
2529    public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2530        for (Runnable hook : hooks) {
2531            addShutdownHook(hook);
2532        }
2533    }
2534
2535    /**
2536     * Causes a clean shutdown of the container when the VM is being shut down
2537     */
2538    protected void containerShutdown() {
2539        try {
2540            stop();
2541        } catch (IOException e) {
2542            Throwable linkedException = e.getCause();
2543            if (linkedException != null) {
2544                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2545            } else {
2546                logError("Failed to shut down: " + e, e);
2547            }
2548            if (!useLoggingForShutdownErrors) {
2549                e.printStackTrace(System.err);
2550            }
2551        } catch (Exception e) {
2552            logError("Failed to shut down: " + e, e);
2553        }
2554    }
2555
2556    protected void logError(String message, Throwable e) {
2557        if (useLoggingForShutdownErrors) {
2558            LOG.error("Failed to shut down: " + e);
2559        } else {
2560            System.err.println("Failed to shut down: " + e);
2561        }
2562    }
2563
2564    /**
2565     * Starts any configured destinations on startup
2566     */
2567    protected void startDestinations() throws Exception {
2568        if (destinations != null) {
2569            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2570            for (int i = 0; i < destinations.length; i++) {
2571                ActiveMQDestination destination = destinations[i];
2572                getBroker().addDestination(adminConnectionContext, destination,true);
2573            }
2574        }
2575        if (isUseVirtualTopics()) {
2576            startVirtualConsumerDestinations();
2577        }
2578    }
2579
2580    /**
2581     * Returns the broker's administration connection context used for
2582     * configuring the broker at startup
2583     */
2584    public ConnectionContext getAdminConnectionContext() throws Exception {
2585        return BrokerSupport.getConnectionContext(getBroker());
2586    }
2587
2588    protected void startManagementContext() throws Exception {
2589        getManagementContext().setBrokerName(brokerName);
2590        getManagementContext().start();
2591        adminView = new BrokerView(this, null);
2592        ObjectName objectName = getBrokerObjectName();
2593        AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2594    }
2595
2596    /**
2597     * Start all transport and network connections, proxies and bridges
2598     *
2599     * @throws Exception
2600     */
2601    public void startAllConnectors() throws Exception {
2602        Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2603        List<TransportConnector> al = new ArrayList<TransportConnector>();
2604        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2605            TransportConnector connector = iter.next();
2606            al.add(startTransportConnector(connector));
2607        }
2608        if (al.size() > 0) {
2609            // let's clear the transportConnectors list and replace it with
2610            // the started transportConnector instances
2611            this.transportConnectors.clear();
2612            setTransportConnectors(al);
2613        }
2614        this.slave = false;
2615        if (!stopped.get()) {
2616            ThreadPoolExecutor networkConnectorStartExecutor = null;
2617            if (isNetworkConnectorStartAsync()) {
2618                // spin up as many threads as needed
2619                networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2620                    10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2621                    new ThreadFactory() {
2622                        int count=0;
2623                        @Override
2624                        public Thread newThread(Runnable runnable) {
2625                            Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2626                            thread.setDaemon(true);
2627                            return thread;
2628                        }
2629                    });
2630            }
2631
2632            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2633                final NetworkConnector connector = iter.next();
2634                connector.setLocalUri(getVmConnectorURI());
2635                connector.setBrokerName(getBrokerName());
2636                connector.setDurableDestinations(durableDestinations);
2637                if (getDefaultSocketURIString() != null) {
2638                    connector.setBrokerURL(getDefaultSocketURIString());
2639                }
2640                if (networkConnectorStartExecutor != null) {
2641                    networkConnectorStartExecutor.execute(new Runnable() {
2642                        @Override
2643                        public void run() {
2644                            try {
2645                                LOG.info("Async start of {}", connector);
2646                                connector.start();
2647                            } catch(Exception e) {
2648                                LOG.error("Async start of network connector: {} failed", connector, e);
2649                            }
2650                        }
2651                    });
2652                } else {
2653                    connector.start();
2654                }
2655            }
2656            if (networkConnectorStartExecutor != null) {
2657                // executor done when enqueued tasks are complete
2658                ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
2659            }
2660
2661            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2662                ProxyConnector connector = iter.next();
2663                connector.start();
2664            }
2665            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2666                JmsConnector connector = iter.next();
2667                connector.start();
2668            }
2669            for (Service service : services) {
2670                configureService(service);
2671                service.start();
2672            }
2673        }
2674    }
2675
2676    public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2677        connector.setBrokerService(this);
2678        connector.setTaskRunnerFactory(getTaskRunnerFactory());
2679        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2680        if (policy != null) {
2681            connector.setMessageAuthorizationPolicy(policy);
2682        }
2683        if (isUseJmx()) {
2684            connector = registerConnectorMBean(connector);
2685        }
2686        connector.getStatistics().setEnabled(enableStatistics);
2687        connector.start();
2688        return connector;
2689    }
2690
2691    /**
2692     * Perform any custom dependency injection
2693     */
2694    protected void configureServices(Object[] services) {
2695        for (Object service : services) {
2696            configureService(service);
2697        }
2698    }
2699
2700    /**
2701     * Perform any custom dependency injection
2702     */
2703    protected void configureService(Object service) {
2704        if (service instanceof BrokerServiceAware) {
2705            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2706            serviceAware.setBrokerService(this);
2707        }
2708    }
2709
2710    public void handleIOException(IOException exception) {
2711        if (ioExceptionHandler != null) {
2712            ioExceptionHandler.handle(exception);
2713         } else {
2714            LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception);
2715         }
2716    }
2717
2718    protected void startVirtualConsumerDestinations() throws Exception {
2719        checkStartException();
2720        ConnectionContext adminConnectionContext = getAdminConnectionContext();
2721        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2722        DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2723        if (!destinations.isEmpty()) {
2724            for (ActiveMQDestination destination : destinations) {
2725                if (filter.matches(destination) == true) {
2726                    broker.addDestination(adminConnectionContext, destination, false);
2727                }
2728            }
2729        }
2730    }
2731
2732    private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2733        // created at startup, so no sync needed
2734        if (virtualConsumerDestinationFilter == null) {
2735            Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2736            if (destinationInterceptors != null) {
2737                for (DestinationInterceptor interceptor : destinationInterceptors) {
2738                    if (interceptor instanceof VirtualDestinationInterceptor) {
2739                        VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2740                        for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2741                            if (virtualDestination instanceof VirtualTopic) {
2742                                consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2743                            }
2744                            if (isUseVirtualDestSubs()) {
2745                                try {
2746                                    broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination);
2747                                    LOG.debug("Adding virtual destination: {}", virtualDestination);
2748                                } catch (Exception e) {
2749                                    LOG.warn("Could not fire virtual destination consumer advisory", e);
2750                                }
2751                            }
2752                        }
2753                    }
2754                }
2755            }
2756            ActiveMQQueue filter = new ActiveMQQueue();
2757            filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2758            virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2759        }
2760        return virtualConsumerDestinationFilter;
2761    }
2762
2763    protected synchronized ThreadPoolExecutor getExecutor() {
2764        if (this.executor == null) {
2765            this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2766
2767                private long i = 0;
2768
2769                @Override
2770                public Thread newThread(Runnable runnable) {
2771                    this.i++;
2772                    Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
2773                    thread.setDaemon(true);
2774                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2775                        @Override
2776                        public void uncaughtException(final Thread t, final Throwable e) {
2777                            LOG.error("Error in thread '{}'", t.getName(), e);
2778                        }
2779                    });
2780                    return thread;
2781                }
2782            }, new RejectedExecutionHandler() {
2783                @Override
2784                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2785                    try {
2786                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2787                    } catch (InterruptedException e) {
2788                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2789                    }
2790
2791                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2792                }
2793            });
2794        }
2795        return this.executor;
2796    }
2797
2798    public synchronized Scheduler getScheduler() {
2799        if (this.scheduler==null) {
2800            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2801            try {
2802                this.scheduler.start();
2803            } catch (Exception e) {
2804               LOG.error("Failed to start Scheduler", e);
2805            }
2806        }
2807        return this.scheduler;
2808    }
2809
    /**
     * @return the region broker currently held by this service
     */
    public Broker getRegionBroker() {
        return regionBroker;
    }
2813
    /**
     * @param regionBroker the region broker to hold
     */
    public void setRegionBroker(Broker regionBroker) {
        this.regionBroker = regionBroker;
    }
2817
    /**
     * Adds a hook to the {@code shutdownHooks} collection, synchronizing on
     * that collection.
     *
     * @param hook the hook to add
     */
    public void addShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.add(hook);
        }
    }
2823
    /**
     * Removes a hook from the {@code shutdownHooks} collection, synchronizing
     * on that collection.
     *
     * @param hook the hook to remove
     */
    public void removeShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.remove(hook);
        }
    }
2829
    /**
     * @return the {@code systemExitOnShutdown} flag
     */
    public boolean isSystemExitOnShutdown() {
        return systemExitOnShutdown;
    }
2833
2834    /**
2835     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2836     */
2837    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2838        this.systemExitOnShutdown = systemExitOnShutdown;
2839    }
2840
    /**
     * @return the configured {@code systemExitOnShutdownExitCode}
     */
    public int getSystemExitOnShutdownExitCode() {
        return systemExitOnShutdownExitCode;
    }
2844
    /**
     * @param systemExitOnShutdownExitCode the exit code to store
     */
    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
    }
2848
    /**
     * @return the SSL context held by this service, as last set
     */
    public SslContext getSslContext() {
        return sslContext;
    }
2852
    /**
     * @param sslContext the SSL context to hold
     */
    public void setSslContext(SslContext sslContext) {
        this.sslContext = sslContext;
    }
2856
    /**
     * @return the {@code shutdownOnSlaveFailure} flag
     */
    public boolean isShutdownOnSlaveFailure() {
        return shutdownOnSlaveFailure;
    }
2860
2861    /**
2862     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2863     */
2864    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2865        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2866    }
2867
    /**
     * @return the {@code waitForSlave} flag
     */
    public boolean isWaitForSlave() {
        return waitForSlave;
    }
2871
2872    /**
2873     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2874     */
2875    public void setWaitForSlave(boolean waitForSlave) {
2876        this.waitForSlave = waitForSlave;
2877    }
2878
    /**
     * @return the configured {@code waitForSlaveTimeout}
     */
    public long getWaitForSlaveTimeout() {
        return this.waitForSlaveTimeout;
    }
2882
    /**
     * @param waitForSlaveTimeout the timeout value to store
     */
    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
        this.waitForSlaveTimeout = waitForSlaveTimeout;
    }
2886
2887    /**
2888     * Get the passiveSlave
2889     * @return the passiveSlave
2890     */
2891    public boolean isPassiveSlave() {
2892        return this.passiveSlave;
2893    }
2894
2895    /**
2896     * Set the passiveSlave
2897     * @param passiveSlave the passiveSlave to set
2898     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2899     */
2900    public void setPassiveSlave(boolean passiveSlave) {
2901        this.passiveSlave = passiveSlave;
2902    }
2903
2904    /**
2905     * override the Default IOException handler, called when persistence adapter
2906     * has experiences File or JDBC I/O Exceptions
2907     *
2908     * @param ioExceptionHandler
2909     */
2910    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2911        configureService(ioExceptionHandler);
2912        this.ioExceptionHandler = ioExceptionHandler;
2913    }
2914
    /**
     * @return the installed I/O exception handler, as last set
     */
    public IOExceptionHandler getIoExceptionHandler() {
        return ioExceptionHandler;
    }
2918
2919    /**
2920     * @return the schedulerSupport
2921     */
2922    public boolean isSchedulerSupport() {
2923        return this.schedulerSupport;
2924    }
2925
2926    /**
2927     * @param schedulerSupport the schedulerSupport to set
2928     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2929     */
2930    public void setSchedulerSupport(boolean schedulerSupport) {
2931        this.schedulerSupport = schedulerSupport;
2932    }
2933
2934    /**
2935     * @return the schedulerDirectory
2936     */
2937    public File getSchedulerDirectoryFile() {
2938        if (this.schedulerDirectoryFile == null) {
2939            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2940        }
2941        return schedulerDirectoryFile;
2942    }
2943
2944    /**
2945     * @param schedulerDirectory the schedulerDirectory to set
2946     */
2947    public void setSchedulerDirectoryFile(File schedulerDirectory) {
2948        this.schedulerDirectoryFile = schedulerDirectory;
2949    }
2950
    /**
     * Sets the scheduler directory from a path string by delegating to
     * {@link #setSchedulerDirectoryFile(File)}.
     *
     * @param schedulerDirectory the path of the scheduler directory
     */
    public void setSchedulerDirectory(String schedulerDirectory) {
        setSchedulerDirectoryFile(new File(schedulerDirectory));
    }
2954
    /**
     * @return the configured {@code schedulePeriodForDestinationPurge}
     */
    public int getSchedulePeriodForDestinationPurge() {
        return this.schedulePeriodForDestinationPurge;
    }
2958
    /**
     * @param schedulePeriodForDestinationPurge the period value to store
     */
    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
    }
2962
2963    /**
2964     * @param schedulePeriodForDiskUsageCheck
2965     */
2966    public void setSchedulePeriodForDiskUsageCheck(
2967            int schedulePeriodForDiskUsageCheck) {
2968        this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck;
2969    }
2970
    /**
     * @return the configured {@code diskUsageCheckRegrowThreshold}
     */
    public int getDiskUsageCheckRegrowThreshold() {
        return diskUsageCheckRegrowThreshold;
    }
2974
2975    /**
2976     * @param diskUsageCheckRegrowThreshold
2977     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
2978     */
2979    public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) {
2980        this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold;
2981    }
2982
    /**
     * @return the configured {@code maxPurgedDestinationsPerSweep}
     */
    public int getMaxPurgedDestinationsPerSweep() {
        return this.maxPurgedDestinationsPerSweep;
    }
2986
    /**
     * @param maxPurgedDestinationsPerSweep the limit value to store
     */
    public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
        this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
    }
2990
    /**
     * @return the broker context held by this service, as last set
     */
    public BrokerContext getBrokerContext() {
        return brokerContext;
    }
2994
    /**
     * @param brokerContext the broker context to hold
     */
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }
2998
    /**
     * Sets the broker id by wrapping the given string in a new {@link BrokerId}.
     *
     * @param brokerId the id value to wrap
     */
    public void setBrokerId(String brokerId) {
        this.brokerId = new BrokerId(brokerId);
    }
3002
    /**
     * @return the {@code useAuthenticatedPrincipalForJMSXUserID} flag
     */
    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
        return useAuthenticatedPrincipalForJMSXUserID;
    }
3006
    /**
     * @param useAuthenticatedPrincipalForJMSXUserID the new flag value
     */
    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
    }
3010
3011    /**
3012     * Should MBeans that support showing the Authenticated User Name information have this
3013     * value filled in or not.
3014     *
3015     * @return true if user names should be exposed in MBeans
3016     */
3017    public boolean isPopulateUserNameInMBeans() {
3018        return this.populateUserNameInMBeans;
3019    }
3020
3021    /**
3022     * Sets whether Authenticated User Name information is shown in MBeans that support this field.
3023     * @param value if MBeans should expose user name information.
3024     */
3025    public void setPopulateUserNameInMBeans(boolean value) {
3026        this.populateUserNameInMBeans = value;
3027    }
3028
3029    /**
3030     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
3031     * failing.  The default value is to wait forever (zero).
3032     *
3033     * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
3034     */
3035    public long getMbeanInvocationTimeout() {
3036        return mbeanInvocationTimeout;
3037    }
3038
3039    /**
3040     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
3041     * failing. The default value is to wait forever (zero).
3042     *
3043     * @param mbeanInvocationTimeout
3044     *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
3045     */
3046    public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
3047        this.mbeanInvocationTimeout = mbeanInvocationTimeout;
3048    }
3049
    /**
     * @return the {@code networkConnectorStartAsync} flag
     */
    public boolean isNetworkConnectorStartAsync() {
        return networkConnectorStartAsync;
    }
3053
    /**
     * @param networkConnectorStartAsync the new flag value
     */
    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
        this.networkConnectorStartAsync = networkConnectorStartAsync;
    }
3057
    /**
     * @return the {@code allowTempAutoCreationOnSend} flag
     */
    public boolean isAllowTempAutoCreationOnSend() {
        return allowTempAutoCreationOnSend;
    }
3061
3062    /**
3063     * enable if temp destinations need to be propagated through a network when
3064     * advisorySupport==false. This is used in conjunction with the policy
3065     * gcInactiveDestinations for matching temps so they can get removed
3066     * when inactive
3067     *
3068     * @param allowTempAutoCreationOnSend
3069     */
3070    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
3071        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
3072    }
3073
    /**
     * @return the configured {@code offlineDurableSubscriberTimeout}
     */
    public long getOfflineDurableSubscriberTimeout() {
        return offlineDurableSubscriberTimeout;
    }
3077
    /**
     * @param offlineDurableSubscriberTimeout the timeout value to store
     */
    public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
        this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
    }
3081
3082    public long getOfflineDurableSubscriberTaskSchedule() {
3083        return offlineDurableSubscriberTaskSchedule;
3084    }
3085
    /**
     * Stores a new value in the {@code offlineDurableSubscriberTaskSchedule} field.
     *
     * @param offlineDurableSubscriberTaskSchedule
     *      the value to store.
     */
    public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
        this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
    }
3089
3090    public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
3091        return isUseVirtualTopics() && destination.isQueue() &&
3092               getVirtualTopicConsumerDestinationFilter().matches(destination);
3093    }
3094
3095    synchronized public Throwable getStartException() {
3096        return startException;
3097    }
3098
3099    public boolean isStartAsync() {
3100        return startAsync;
3101    }
3102
    /**
     * Stores a new value in the {@code startAsync} flag.
     *
     * @param startAsync
     *      the value to store.
     */
    public void setStartAsync(boolean startAsync) {
        this.startAsync = startAsync;
    }
3106
3107    public boolean isSlave() {
3108        return this.slave;
3109    }
3110
3111    public boolean isStopping() {
3112        return this.stopping.get();
3113    }
3114
3115    /**
3116     * @return true if the broker allowed to restart on shutdown.
3117     */
3118    public boolean isRestartAllowed() {
3119        return restartAllowed;
3120    }
3121
3122    /**
3123     * Sets if the broker allowed to restart on shutdown.
3124     */
3125    public void setRestartAllowed(boolean restartAllowed) {
3126        this.restartAllowed = restartAllowed;
3127    }
3128
3129    /**
3130     * A lifecycle manager of the BrokerService should
3131     * inspect this property after a broker shutdown has occurred
3132     * to find out if the broker needs to be re-created and started
3133     * again.
3134     *
3135     * @return true if the broker wants to be restarted after it shuts down.
3136     */
3137    public boolean isRestartRequested() {
3138        return restartRequested;
3139    }
3140
3141    public void requestRestart() {
3142        this.restartRequested = true;
3143    }
3144
3145    public int getStoreOpenWireVersion() {
3146        return storeOpenWireVersion;
3147    }
3148
    /**
     * Stores a new value in the {@code storeOpenWireVersion} field.
     *
     * @param storeOpenWireVersion
     *      the value to store.
     */
    public void setStoreOpenWireVersion(int storeOpenWireVersion) {
        this.storeOpenWireVersion = storeOpenWireVersion;
    }
3152
3153    /**
3154     * @return the current number of connections on this Broker.
3155     */
3156    public int getCurrentConnections() {
3157        return this.currentConnections.get();
3158    }
3159
3160    /**
3161     * @return the total number of connections this broker has handled since startup.
3162     */
3163    public long getTotalConnections() {
3164        return this.totalConnections.get();
3165    }
3166
3167    public void incrementCurrentConnections() {
3168        this.currentConnections.incrementAndGet();
3169    }
3170
3171    public void decrementCurrentConnections() {
3172        this.currentConnections.decrementAndGet();
3173    }
3174
3175    public void incrementTotalConnections() {
3176        this.totalConnections.incrementAndGet();
3177    }
3178
3179    public boolean isRejectDurableConsumers() {
3180        return rejectDurableConsumers;
3181    }
3182
    /**
     * Stores a new value in the {@code rejectDurableConsumers} flag.
     *
     * @param rejectDurableConsumers
     *      the value to store.
     */
    public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
        this.rejectDurableConsumers = rejectDurableConsumers;
    }
3186
3187    public boolean isAdjustUsageLimits() {
3188        return adjustUsageLimits;
3189    }
3190
    /**
     * Stores a new value in the {@code adjustUsageLimits} flag.
     *
     * @param adjustUsageLimits
     *      the value to store.
     */
    public void setAdjustUsageLimits(boolean adjustUsageLimits) {
        this.adjustUsageLimits = adjustUsageLimits;
    }
3194
    /**
     * Stores a new value in the {@code rollbackOnlyOnAsyncException} flag.
     *
     * @param rollbackOnlyOnAsyncException
     *      the value to store.
     */
    public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) {
        this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException;
    }
3198
3199    public boolean isRollbackOnlyOnAsyncException() {
3200        return rollbackOnlyOnAsyncException;
3201    }
3202
3203    public boolean isUseVirtualDestSubs() {
3204        return useVirtualDestSubs;
3205    }
3206
3207    public void setUseVirtualDestSubs(
3208            boolean useVirtualDestSubs) {
3209        this.useVirtualDestSubs = useVirtualDestSubs;
3210    }
3211
3212    public boolean isUseVirtualDestSubsOnCreation() {
3213        return useVirtualDestSubsOnCreation;
3214    }
3215
3216    public void setUseVirtualDestSubsOnCreation(
3217            boolean useVirtualDestSubsOnCreation) {
3218        this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
3219    }
3220}