001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.BufferedReader;
020import java.io.File;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.net.UnknownHostException;
027import java.security.Provider;
028import java.security.Security;
029import java.util.ArrayList;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CopyOnWriteArrayList;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.RejectedExecutionException;
042import java.util.concurrent.RejectedExecutionHandler;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050
051import javax.annotation.PostConstruct;
052import javax.annotation.PreDestroy;
053import javax.management.MalformedObjectNameException;
054import javax.management.ObjectName;
055
056import org.apache.activemq.ActiveMQConnectionMetaData;
057import org.apache.activemq.ConfigurationException;
058import org.apache.activemq.Service;
059import org.apache.activemq.advisory.AdvisoryBroker;
060import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
061import org.apache.activemq.broker.jmx.AnnotatedMBean;
062import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
063import org.apache.activemq.broker.jmx.BrokerView;
064import org.apache.activemq.broker.jmx.ConnectorView;
065import org.apache.activemq.broker.jmx.ConnectorViewMBean;
066import org.apache.activemq.broker.jmx.HealthView;
067import org.apache.activemq.broker.jmx.HealthViewMBean;
068import org.apache.activemq.broker.jmx.JmsConnectorView;
069import org.apache.activemq.broker.jmx.JobSchedulerView;
070import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
071import org.apache.activemq.broker.jmx.Log4JConfigView;
072import org.apache.activemq.broker.jmx.ManagedRegionBroker;
073import org.apache.activemq.broker.jmx.ManagementContext;
074import org.apache.activemq.broker.jmx.NetworkConnectorView;
075import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
076import org.apache.activemq.broker.jmx.ProxyConnectorView;
077import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
078import org.apache.activemq.broker.region.Destination;
079import org.apache.activemq.broker.region.DestinationFactory;
080import org.apache.activemq.broker.region.DestinationFactoryImpl;
081import org.apache.activemq.broker.region.DestinationInterceptor;
082import org.apache.activemq.broker.region.RegionBroker;
083import org.apache.activemq.broker.region.policy.PolicyMap;
084import org.apache.activemq.broker.region.virtual.MirroredQueue;
085import org.apache.activemq.broker.region.virtual.VirtualDestination;
086import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
087import org.apache.activemq.broker.region.virtual.VirtualTopic;
088import org.apache.activemq.broker.scheduler.JobSchedulerStore;
089import org.apache.activemq.broker.scheduler.SchedulerBroker;
090import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
091import org.apache.activemq.command.ActiveMQDestination;
092import org.apache.activemq.command.ActiveMQQueue;
093import org.apache.activemq.command.BrokerId;
094import org.apache.activemq.command.ProducerInfo;
095import org.apache.activemq.filter.DestinationFilter;
096import org.apache.activemq.network.ConnectionFilter;
097import org.apache.activemq.network.DiscoveryNetworkConnector;
098import org.apache.activemq.network.NetworkConnector;
099import org.apache.activemq.network.jms.JmsConnector;
100import org.apache.activemq.openwire.OpenWireFormat;
101import org.apache.activemq.proxy.ProxyConnector;
102import org.apache.activemq.security.MessageAuthorizationPolicy;
103import org.apache.activemq.selector.SelectorParser;
104import org.apache.activemq.store.JournaledStore;
105import org.apache.activemq.store.PListStore;
106import org.apache.activemq.store.PersistenceAdapter;
107import org.apache.activemq.store.PersistenceAdapterFactory;
108import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
109import org.apache.activemq.thread.Scheduler;
110import org.apache.activemq.thread.TaskRunnerFactory;
111import org.apache.activemq.transport.TransportFactorySupport;
112import org.apache.activemq.transport.TransportServer;
113import org.apache.activemq.transport.vm.VMTransportFactory;
114import org.apache.activemq.usage.PercentLimitUsage;
115import org.apache.activemq.usage.StoreUsage;
116import org.apache.activemq.usage.SystemUsage;
117import org.apache.activemq.util.BrokerSupport;
118import org.apache.activemq.util.DefaultIOExceptionHandler;
119import org.apache.activemq.util.IOExceptionHandler;
120import org.apache.activemq.util.IOExceptionSupport;
121import org.apache.activemq.util.IOHelper;
122import org.apache.activemq.util.InetAddressUtil;
123import org.apache.activemq.util.ServiceStopper;
124import org.apache.activemq.util.StoreUtil;
125import org.apache.activemq.util.ThreadPoolUtils;
126import org.apache.activemq.util.TimeUtils;
127import org.apache.activemq.util.URISupport;
128import org.slf4j.Logger;
129import org.slf4j.LoggerFactory;
130import org.slf4j.MDC;
131
132/**
133 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
134 * number of transport connectors, network connectors and a bunch of properties
135 * which can be used to configure the broker as its lazily created.
136 *
137 * @org.apache.xbean.XBean
138 */
139public class BrokerService implements Service {
140    public static final String DEFAULT_PORT = "61616";
141    public static final String LOCAL_HOST_NAME;
142    public static final String BROKER_VERSION;
143    public static final String DEFAULT_BROKER_NAME = "localhost";
144    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
145    public static final long DEFAULT_START_TIMEOUT = 600000L;
146
147    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
148
149    @SuppressWarnings("unused")
150    private static final long serialVersionUID = 7353129142305630237L;
151
152    private boolean useJmx = true;
153    private boolean enableStatistics = true;
154    private boolean persistent = true;
155    private boolean populateJMSXUserID;
156    private boolean useAuthenticatedPrincipalForJMSXUserID;
157    private boolean populateUserNameInMBeans;
158    private long mbeanInvocationTimeout = 0;
159
160    private boolean useShutdownHook = true;
161    private boolean useLoggingForShutdownErrors;
162    private boolean shutdownOnMasterFailure;
163    private boolean shutdownOnSlaveFailure;
164    private boolean waitForSlave;
165    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
166    private boolean passiveSlave;
167    private String brokerName = DEFAULT_BROKER_NAME;
168    private File dataDirectoryFile;
169    private File tmpDataDirectory;
170    private Broker broker;
171    private BrokerView adminView;
172    private ManagementContext managementContext;
173    private ObjectName brokerObjectName;
174    private TaskRunnerFactory taskRunnerFactory;
175    private TaskRunnerFactory persistenceTaskRunnerFactory;
176    private SystemUsage systemUsage;
177    private SystemUsage producerSystemUsage;
178    private SystemUsage consumerSystemUsaage;
179    private PersistenceAdapter persistenceAdapter;
180    private PersistenceAdapterFactory persistenceFactory;
181    protected DestinationFactory destinationFactory;
182    private MessageAuthorizationPolicy messageAuthorizationPolicy;
183    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
184    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
185    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
186    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
187    private final List<Service> services = new ArrayList<Service>();
188    private transient Thread shutdownHook;
189    private String[] transportConnectorURIs;
190    private String[] networkConnectorURIs;
191    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
192    // to other jms messaging systems
193    private boolean deleteAllMessagesOnStartup;
194    private boolean advisorySupport = true;
195    private URI vmConnectorURI;
196    private String defaultSocketURIString;
197    private PolicyMap destinationPolicy;
198    private final AtomicBoolean started = new AtomicBoolean(false);
199    private final AtomicBoolean stopped = new AtomicBoolean(false);
200    private final AtomicBoolean stopping = new AtomicBoolean(false);
201    private BrokerPlugin[] plugins;
202    private boolean keepDurableSubsActive = true;
203    private boolean enableMessageExpirationOnActiveDurableSubs = false;
204    private boolean useVirtualTopics = true;
205    private boolean useMirroredQueues = false;
206    private boolean useTempMirroredQueues = true;
207    /**
208     * Whether or not virtual destination subscriptions should cause network demand
209     */
210    private boolean useVirtualDestSubs = false;
211    /**
212     * Whether or no the creation of destinations that match virtual destinations
213     * should cause network demand
214     */
215    private boolean useVirtualDestSubsOnCreation = false;
216    private BrokerId brokerId;
217    private volatile DestinationInterceptor[] destinationInterceptors;
218    private ActiveMQDestination[] destinations;
219    private PListStore tempDataStore;
220    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
221    private boolean useLocalHostBrokerName;
222    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
223    private final CountDownLatch startedLatch = new CountDownLatch(1);
224    private Broker regionBroker;
225    private int producerSystemUsagePortion = 60;
226    private int consumerSystemUsagePortion = 40;
227    private boolean splitSystemUsageForProducersConsumers;
228    private boolean monitorConnectionSplits = false;
229    private int taskRunnerPriority = Thread.NORM_PRIORITY;
230    private boolean dedicatedTaskRunner;
231    private boolean cacheTempDestinations = false;// useful for failover
232    private int timeBeforePurgeTempDestinations = 5000;
233    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
234    private boolean systemExitOnShutdown;
235    private int systemExitOnShutdownExitCode;
236    private SslContext sslContext;
237    private boolean forceStart = false;
238    private IOExceptionHandler ioExceptionHandler;
239    private boolean schedulerSupport = false;
240    private File schedulerDirectoryFile;
241    private Scheduler scheduler;
242    private ThreadPoolExecutor executor;
243    private int schedulePeriodForDestinationPurge= 0;
244    private int maxPurgedDestinationsPerSweep = 0;
245    private int schedulePeriodForDiskUsageCheck = 0;
246    private int diskUsageCheckRegrowThreshold = -1;
247    private boolean adjustUsageLimits = true;
248    private BrokerContext brokerContext;
249    private boolean networkConnectorStartAsync = false;
250    private boolean allowTempAutoCreationOnSend;
251    private JobSchedulerStore jobSchedulerStore;
252    private final AtomicLong totalConnections = new AtomicLong();
253    private final AtomicInteger currentConnections = new AtomicInteger();
254
255    private long offlineDurableSubscriberTimeout = -1;
256    private long offlineDurableSubscriberTaskSchedule = 300000;
257    private DestinationFilter virtualConsumerDestinationFilter;
258
259    private final Object persistenceAdapterLock = new Object();
260    private Throwable startException = null;
261    private boolean startAsync = false;
262    private Date startDate;
263    private boolean slave = true;
264
265    private boolean restartAllowed = true;
266    private boolean restartRequested = false;
267    private boolean rejectDurableConsumers = false;
268    private boolean rollbackOnlyOnAsyncException = true;
269
270    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION;
271
272    static {
273
274        try {
275            ClassLoader loader = BrokerService.class.getClassLoader();
276            Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
277            Provider bouncycastle = (Provider) clazz.newInstance();
278            Security.insertProviderAt(bouncycastle,
279                Integer.getInteger("org.apache.activemq.broker.BouncyCastlePosition", 2));
280            LOG.info("Loaded the Bouncy Castle security provider.");
281        } catch(Throwable e) {
282            // No BouncyCastle found so we use the default Java Security Provider
283        }
284
285        String localHostName = "localhost";
286        try {
287            localHostName =  InetAddressUtil.getLocalHostName();
288        } catch (UnknownHostException e) {
289            LOG.error("Failed to resolve localhost");
290        }
291        LOCAL_HOST_NAME = localHostName;
292
293        String version = null;
294        try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
295            if (in != null) {
296                try(InputStreamReader isr = new InputStreamReader(in);
297                    BufferedReader reader = new BufferedReader(isr)) {
298                    version = reader.readLine();
299                }
300            }
301        } catch (IOException ie) {
302            LOG.warn("Error reading broker version ", ie);
303        }
304        BROKER_VERSION = version;
305    }
306
307    @Override
308    public String toString() {
309        return "BrokerService[" + getBrokerName() + "]";
310    }
311
312    private String getBrokerVersion() {
313        String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
314        if (version == null) {
315            version = BROKER_VERSION;
316        }
317
318        return version;
319    }
320
321    /**
322     * Adds a new transport connector for the given bind address
323     *
324     * @return the newly created and added transport connector
325     * @throws Exception
326     */
327    public TransportConnector addConnector(String bindAddress) throws Exception {
328        return addConnector(new URI(bindAddress));
329    }
330
331    /**
332     * Adds a new transport connector for the given bind address
333     *
334     * @return the newly created and added transport connector
335     * @throws Exception
336     */
337    public TransportConnector addConnector(URI bindAddress) throws Exception {
338        return addConnector(createTransportConnector(bindAddress));
339    }
340
341    /**
342     * Adds a new transport connector for the given TransportServer transport
343     *
344     * @return the newly created and added transport connector
345     * @throws Exception
346     */
347    public TransportConnector addConnector(TransportServer transport) throws Exception {
348        return addConnector(new TransportConnector(transport));
349    }
350
351    /**
352     * Adds a new transport connector
353     *
354     * @return the transport connector
355     * @throws Exception
356     */
357    public TransportConnector addConnector(TransportConnector connector) throws Exception {
358        transportConnectors.add(connector);
359        return connector;
360    }
361
362    /**
363     * Stops and removes a transport connector from the broker.
364     *
365     * @param connector
366     * @return true if the connector has been previously added to the broker
367     * @throws Exception
368     */
369    public boolean removeConnector(TransportConnector connector) throws Exception {
370        boolean rc = transportConnectors.remove(connector);
371        if (rc) {
372            unregisterConnectorMBean(connector);
373        }
374        return rc;
375    }
376
377    /**
378     * Adds a new network connector using the given discovery address
379     *
380     * @return the newly created and added network connector
381     * @throws Exception
382     */
383    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
384        return addNetworkConnector(new URI(discoveryAddress));
385    }
386
387    /**
388     * Adds a new proxy connector using the given bind address
389     *
390     * @return the newly created and added network connector
391     * @throws Exception
392     */
393    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
394        return addProxyConnector(new URI(bindAddress));
395    }
396
397    /**
398     * Adds a new network connector using the given discovery address
399     *
400     * @return the newly created and added network connector
401     * @throws Exception
402     */
403    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
404        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
405        return addNetworkConnector(connector);
406    }
407
408    /**
409     * Adds a new proxy connector using the given bind address
410     *
411     * @return the newly created and added network connector
412     * @throws Exception
413     */
414    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
415        ProxyConnector connector = new ProxyConnector();
416        connector.setBind(bindAddress);
417        connector.setRemote(new URI("fanout:multicast://default"));
418        return addProxyConnector(connector);
419    }
420
421    /**
422     * Adds a new network connector to connect this broker to a federated
423     * network
424     */
425    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
426        connector.setBrokerService(this);
427        connector.setLocalUri(getVmConnectorURI());
428        // Set a connection filter so that the connector does not establish loop
429        // back connections.
430        connector.setConnectionFilter(new ConnectionFilter() {
431            @Override
432            public boolean connectTo(URI location) {
433                List<TransportConnector> transportConnectors = getTransportConnectors();
434                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
435                    try {
436                        TransportConnector tc = iter.next();
437                        if (location.equals(tc.getConnectUri())) {
438                            return false;
439                        }
440                    } catch (Throwable e) {
441                    }
442                }
443                return true;
444            }
445        });
446        networkConnectors.add(connector);
447        return connector;
448    }
449
450    /**
451     * Removes the given network connector without stopping it. The caller
452     * should call {@link NetworkConnector#stop()} to close the connector
453     */
454    public boolean removeNetworkConnector(NetworkConnector connector) {
455        boolean answer = networkConnectors.remove(connector);
456        if (answer) {
457            unregisterNetworkConnectorMBean(connector);
458        }
459        return answer;
460    }
461
462    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
463        URI uri = getVmConnectorURI();
464        connector.setLocalUri(uri);
465        proxyConnectors.add(connector);
466        if (isUseJmx()) {
467            registerProxyConnectorMBean(connector);
468        }
469        return connector;
470    }
471
472    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
473        connector.setBrokerService(this);
474        jmsConnectors.add(connector);
475        if (isUseJmx()) {
476            registerJmsConnectorMBean(connector);
477        }
478        return connector;
479    }
480
481    public JmsConnector removeJmsConnector(JmsConnector connector) {
482        if (jmsConnectors.remove(connector)) {
483            return connector;
484        }
485        return null;
486    }
487
488    public void masterFailed() {
489        if (shutdownOnMasterFailure) {
490            LOG.error("The Master has failed ... shutting down");
491            try {
492                stop();
493            } catch (Exception e) {
494                LOG.error("Failed to stop for master failure", e);
495            }
496        } else {
497            LOG.warn("Master Failed - starting all connectors");
498            try {
499                startAllConnectors();
500                broker.nowMasterBroker();
501            } catch (Exception e) {
502                LOG.error("Failed to startAllConnectors", e);
503            }
504        }
505    }
506
507    public String getUptime() {
508        long delta = getUptimeMillis();
509
510        if (delta == 0) {
511            return "not started";
512        }
513
514        return TimeUtils.printDuration(delta);
515    }
516
517    public long getUptimeMillis() {
518        if (startDate == null) {
519            return 0;
520        }
521
522        return new Date().getTime() - startDate.getTime();
523    }
524
525    public boolean isStarted() {
526        return started.get() && startedLatch.getCount() == 0;
527    }
528
529    /**
530     * Forces a start of the broker.
531     * By default a BrokerService instance that was
532     * previously stopped using BrokerService.stop() cannot be restarted
533     * using BrokerService.start().
534     * This method enforces a restart.
535     * It is not recommended to force a restart of the broker and will not work
536     * for most but some very trivial broker configurations.
537     * For restarting a broker instance we recommend to first call stop() on
538     * the old instance and then recreate a new BrokerService instance.
539     *
540     * @param force - if true enforces a restart.
541     * @throws Exception
542     */
543    public void start(boolean force) throws Exception {
544        forceStart = force;
545        stopped.set(false);
546        started.set(false);
547        start();
548    }
549
550    // Service interface
551    // -------------------------------------------------------------------------
552
553    protected boolean shouldAutostart() {
554        return true;
555    }
556
557    /**
558     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
559     *
560     * delegates to autoStart, done to prevent backwards incompatible signature change
561     */
562    @PostConstruct
563    private void postConstruct() {
564        try {
565            autoStart();
566        } catch (Exception ex) {
567            throw new RuntimeException(ex);
568        }
569    }
570
571    /**
572     *
573     * @throws Exception
574     * @org. apache.xbean.InitMethod
575     */
576    public void autoStart() throws Exception {
577        if(shouldAutostart()) {
578            start();
579        }
580    }
581
582    @Override
583    public void start() throws Exception {
584        if (stopped.get() || !started.compareAndSet(false, true)) {
585            // lets just ignore redundant start() calls
586            // as its way too easy to not be completely sure if start() has been
587            // called or not with the gazillion of different configuration
588            // mechanisms
589            // throw new IllegalStateException("Already started.");
590            return;
591        }
592
593        setStartException(null);
594        stopping.set(false);
595        startDate = new Date();
596        MDC.put("activemq.broker", brokerName);
597
598        try {
599            checkMemorySystemUsageLimits();
600            if (systemExitOnShutdown && useShutdownHook) {
601                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
602            }
603            processHelperProperties();
604            if (isUseJmx()) {
605                // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
606                // we cannot cleanup clear that during shutdown of the broker.
607                MDC.remove("activemq.broker");
608                try {
609                    startManagementContext();
610                    for (NetworkConnector connector : getNetworkConnectors()) {
611                        registerNetworkConnectorMBean(connector);
612                    }
613                } finally {
614                    MDC.put("activemq.broker", brokerName);
615                }
616            }
617
618            // in jvm master slave, lets not publish over existing broker till we get the lock
619            final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
620            if (brokerRegistry.lookup(getBrokerName()) == null) {
621                brokerRegistry.bind(getBrokerName(), BrokerService.this);
622            }
623            startPersistenceAdapter(startAsync);
624            startBroker(startAsync);
625            brokerRegistry.bind(getBrokerName(), BrokerService.this);
626        } catch (Exception e) {
627            LOG.error("Failed to start Apache ActiveMQ (" + getBrokerName() + ", " + brokerId + ")", e);
628            try {
629                if (!stopped.get()) {
630                    stop();
631                }
632            } catch (Exception ex) {
633                LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
634            }
635            throw e;
636        } finally {
637            MDC.remove("activemq.broker");
638        }
639    }
640
641    private void startPersistenceAdapter(boolean async) throws Exception {
642        if (async) {
643            new Thread("Persistence Adapter Starting Thread") {
644                @Override
645                public void run() {
646                    try {
647                        doStartPersistenceAdapter();
648                    } catch (Throwable e) {
649                        setStartException(e);
650                    } finally {
651                        synchronized (persistenceAdapterLock) {
652                            persistenceAdapterLock.notifyAll();
653                        }
654                    }
655                }
656            }.start();
657        } else {
658            doStartPersistenceAdapter();
659        }
660    }
661
662    private void doStartPersistenceAdapter() throws Exception {
663        PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter();
664        if (persistenceAdapterToStart == null) {
665            checkStartException();
666            throw new ConfigurationException("Cannot start null persistence adapter");
667        }
668        persistenceAdapterToStart.setUsageManager(getProducerSystemUsage());
669        persistenceAdapterToStart.setBrokerName(getBrokerName());
670        LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart);
671        if (deleteAllMessagesOnStartup) {
672            deleteAllMessages();
673        }
674        persistenceAdapterToStart.start();
675
676        getTempDataStore();
677        if (tempDataStore != null) {
678            try {
679                // start after we have the store lock
680                tempDataStore.start();
681            } catch (Exception e) {
682                RuntimeException exception = new RuntimeException(
683                        "Failed to start temp data store: " + tempDataStore, e);
684                LOG.error(exception.getLocalizedMessage(), e);
685                throw exception;
686            }
687        }
688
689        getJobSchedulerStore();
690        if (jobSchedulerStore != null) {
691            try {
692                jobSchedulerStore.start();
693            } catch (Exception e) {
694                RuntimeException exception = new RuntimeException(
695                        "Failed to start job scheduler store: " + jobSchedulerStore, e);
696                LOG.error(exception.getLocalizedMessage(), e);
697                throw exception;
698            }
699        }
700    }
701
702    private void startBroker(boolean async) throws Exception {
703        if (async) {
704            new Thread("Broker Starting Thread") {
705                @Override
706                public void run() {
707                    try {
708                        synchronized (persistenceAdapterLock) {
709                            persistenceAdapterLock.wait();
710                        }
711                        doStartBroker();
712                    } catch (Throwable t) {
713                        setStartException(t);
714                    }
715                }
716            }.start();
717        } else {
718            doStartBroker();
719        }
720    }
721
722    private void doStartBroker() throws Exception {
723        checkStartException();
724        startDestinations();
725        addShutdownHook();
726
727        broker = getBroker();
728        brokerId = broker.getBrokerId();
729
730        // need to log this after creating the broker so we have its id and name
731        LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
732        broker.start();
733
734        if (isUseJmx()) {
735            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
736                // try to restart management context
737                // typical for slaves that use the same ports as master
738                managementContext.stop();
739                startManagementContext();
740            }
741            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
742            managedBroker.setContextBroker(broker);
743            adminView.setBroker(managedBroker);
744        }
745
746        if (ioExceptionHandler == null) {
747            setIoExceptionHandler(new DefaultIOExceptionHandler());
748        }
749
750        if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
751            ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
752            Log4JConfigView log4jConfigView = new Log4JConfigView();
753            AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
754        }
755
756        startAllConnectors();
757
758        LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
759        LOG.info("For help or more information please see: http://activemq.apache.org");
760
761        getBroker().brokerServiceStarted();
762        checkStoreSystemUsageLimits();
763        startedLatch.countDown();
764        getBroker().nowMasterBroker();
765    }
766
767    /**
768     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
769     *
770     * delegates to stop, done to prevent backwards incompatible signature change
771     */
772    @PreDestroy
773    private void preDestroy () {
774        try {
775            stop();
776        } catch (Exception ex) {
777            throw new RuntimeException();
778        }
779    }
780
781    /**
782     *
783     * @throws Exception
784     * @org.apache .xbean.DestroyMethod
785     */
786    @Override
787    public void stop() throws Exception {
788        if (!stopping.compareAndSet(false, true)) {
789            LOG.trace("Broker already stopping/stopped");
790            return;
791        }
792
793        setStartException(new BrokerStoppedException("Stop invoked"));
794        MDC.put("activemq.broker", brokerName);
795
796        if (systemExitOnShutdown) {
797            new Thread() {
798                @Override
799                public void run() {
800                    System.exit(systemExitOnShutdownExitCode);
801                }
802            }.start();
803        }
804
805        LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );
806
807        removeShutdownHook();
808        if (this.scheduler != null) {
809            this.scheduler.stop();
810            this.scheduler = null;
811        }
812        ServiceStopper stopper = new ServiceStopper();
813        if (services != null) {
814            for (Service service : services) {
815                stopper.stop(service);
816            }
817        }
818        stopAllConnectors(stopper);
819        this.slave = true;
820        // remove any VMTransports connected
821        // this has to be done after services are stopped,
822        // to avoid timing issue with discovery (spinning up a new instance)
823        BrokerRegistry.getInstance().unbind(getBrokerName());
824        VMTransportFactory.stopped(getBrokerName());
825        if (broker != null) {
826            stopper.stop(broker);
827            broker = null;
828        }
829
830        if (jobSchedulerStore != null) {
831            jobSchedulerStore.stop();
832            jobSchedulerStore = null;
833        }
834        if (tempDataStore != null) {
835            tempDataStore.stop();
836            tempDataStore = null;
837        }
838        try {
839            stopper.stop(getPersistenceAdapter());
840            persistenceAdapter = null;
841            if (isUseJmx()) {
842                stopper.stop(managementContext);
843                managementContext = null;
844            }
845            // Clear SelectorParser cache to free memory
846            SelectorParser.clearCache();
847        } finally {
848            started.set(false);
849            stopped.set(true);
850            stoppedLatch.countDown();
851        }
852
853        if (this.taskRunnerFactory != null) {
854            this.taskRunnerFactory.shutdown();
855            this.taskRunnerFactory = null;
856        }
857        if (this.executor != null) {
858            ThreadPoolUtils.shutdownNow(executor);
859            this.executor = null;
860        }
861
862        this.destinationInterceptors = null;
863        this.destinationFactory = null;
864
865        if (startDate != null) {
866            LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
867        }
868        LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
869
870        synchronized (shutdownHooks) {
871            for (Runnable hook : shutdownHooks) {
872                try {
873                    hook.run();
874                } catch (Throwable e) {
875                    stopper.onException(hook, e);
876                }
877            }
878        }
879
880        MDC.remove("activemq.broker");
881
882        // and clear start date
883        startDate = null;
884
885        stopper.throwFirstException();
886    }
887
888    public boolean checkQueueSize(String queueName) {
889        long count = 0;
890        long queueSize = 0;
891        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
892        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
893            if (entry.getKey().isQueue()) {
894                if (entry.getValue().getName().matches(queueName)) {
895                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
896                    count += queueSize;
897                    if (queueSize > 0) {
898                        LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
899                    }
900                }
901            }
902        }
903        return count == 0;
904    }
905
906    /**
907     * This method (both connectorName and queueName are using regex to match)
908     * 1. stop the connector (supposed the user input the connector which the
909     * clients connect to) 2. to check whether there is any pending message on
910     * the queues defined by queueName 3. supposedly, after stop the connector,
911     * client should failover to other broker and pending messages should be
912     * forwarded. if no pending messages, the method finally call stop to stop
913     * the broker.
914     *
915     * @param connectorName
916     * @param queueName
917     * @param timeout
918     * @param pollInterval
919     * @throws Exception
920     */
921    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
922        if (isUseJmx()) {
923            if (connectorName == null || queueName == null || timeout <= 0) {
924                throw new Exception(
925                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
926            }
927            if (pollInterval <= 0) {
928                pollInterval = 30;
929            }
930            LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
931                    connectorName, queueName, timeout, pollInterval
932            });
933            TransportConnector connector;
934            for (int i = 0; i < transportConnectors.size(); i++) {
935                connector = transportConnectors.get(i);
936                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
937                    connector.stop();
938                }
939            }
940            long start = System.currentTimeMillis();
941            while (System.currentTimeMillis() - start < timeout * 1000) {
942                // check quesize until it gets zero
943                if (checkQueueSize(queueName)) {
944                    stop();
945                    break;
946                } else {
947                    Thread.sleep(pollInterval * 1000);
948                }
949            }
950            if (stopped.get()) {
951                LOG.info("Successfully stop the broker.");
952            } else {
953                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
954            }
955        }
956    }
957
958    /**
959     * A helper method to block the caller thread until the broker has been
960     * stopped
961     */
962    public void waitUntilStopped() {
963        while (isStarted() && !stopped.get()) {
964            try {
965                stoppedLatch.await();
966            } catch (InterruptedException e) {
967                // ignore
968            }
969        }
970    }
971
972    public boolean isStopped() {
973        return stopped.get();
974    }
975
976    /**
977     * A helper method to block the caller thread until the broker has fully started
978     * @return boolean true if wait succeeded false if broker was not started or was stopped
979     */
980    public boolean waitUntilStarted() {
981        return waitUntilStarted(DEFAULT_START_TIMEOUT);
982    }
983
984    /**
985     * A helper method to block the caller thread until the broker has fully started
986     *
987     * @param timeout
988     *        the amount of time to wait before giving up and returning false.
989     *
990     * @return boolean true if wait succeeded false if broker was not started or was stopped
991     */
992    public boolean waitUntilStarted(long timeout) {
993        boolean waitSucceeded = isStarted();
994        long expiration = Math.max(0, timeout + System.currentTimeMillis());
995        while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
996            try {
997                if (getStartException() != null) {
998                    return waitSucceeded;
999                }
1000                waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
1001            } catch (InterruptedException ignore) {
1002            }
1003        }
1004        return waitSucceeded;
1005    }
1006
1007    // Properties
1008    // -------------------------------------------------------------------------
1009    /**
1010     * Returns the message broker
1011     */
1012    public Broker getBroker() throws Exception {
1013        if (broker == null) {
1014            checkStartException();
1015            broker = createBroker();
1016        }
1017        return broker;
1018    }
1019
1020    /**
1021     * Returns the administration view of the broker; used to create and destroy
1022     * resources such as queues and topics. Note this method returns null if JMX
1023     * is disabled.
1024     */
1025    public BrokerView getAdminView() throws Exception {
1026        if (adminView == null) {
1027            // force lazy creation
1028            getBroker();
1029        }
1030        return adminView;
1031    }
1032
1033    public void setAdminView(BrokerView adminView) {
1034        this.adminView = adminView;
1035    }
1036
1037    public String getBrokerName() {
1038        return brokerName;
1039    }
1040
1041    /**
1042     * Sets the name of this broker; which must be unique in the network
1043     *
1044     * @param brokerName
1045     */
1046    private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]";
1047    public void setBrokerName(String brokerName) {
1048        if (brokerName == null) {
1049            throw new NullPointerException("The broker name cannot be null");
1050        }
1051        String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_");
1052        if (!str.equals(brokerName)) {
1053            LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str);
1054        }
1055        this.brokerName = str.trim();
1056    }
1057
1058    public PersistenceAdapterFactory getPersistenceFactory() {
1059        return persistenceFactory;
1060    }
1061
1062    public File getDataDirectoryFile() {
1063        if (dataDirectoryFile == null) {
1064            dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
1065        }
1066        return dataDirectoryFile;
1067    }
1068
1069    public File getBrokerDataDirectory() {
1070        String brokerDir = getBrokerName();
1071        return new File(getDataDirectoryFile(), brokerDir);
1072    }
1073
1074    /**
1075     * Sets the directory in which the data files will be stored by default for
1076     * the JDBC and Journal persistence adaptors.
1077     *
1078     * @param dataDirectory
1079     *            the directory to store data files
1080     */
1081    public void setDataDirectory(String dataDirectory) {
1082        setDataDirectoryFile(new File(dataDirectory));
1083    }
1084
1085    /**
1086     * Sets the directory in which the data files will be stored by default for
1087     * the JDBC and Journal persistence adaptors.
1088     *
1089     * @param dataDirectoryFile
1090     *            the directory to store data files
1091     */
1092    public void setDataDirectoryFile(File dataDirectoryFile) {
1093        this.dataDirectoryFile = dataDirectoryFile;
1094    }
1095
1096    /**
1097     * @return the tmpDataDirectory
1098     */
1099    public File getTmpDataDirectory() {
1100        if (tmpDataDirectory == null) {
1101            tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
1102        }
1103        return tmpDataDirectory;
1104    }
1105
1106    /**
1107     * @param tmpDataDirectory
1108     *            the tmpDataDirectory to set
1109     */
1110    public void setTmpDataDirectory(File tmpDataDirectory) {
1111        this.tmpDataDirectory = tmpDataDirectory;
1112    }
1113
1114    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
1115        this.persistenceFactory = persistenceFactory;
1116    }
1117
1118    public void setDestinationFactory(DestinationFactory destinationFactory) {
1119        this.destinationFactory = destinationFactory;
1120    }
1121
1122    public boolean isPersistent() {
1123        return persistent;
1124    }
1125
1126    /**
1127     * Sets whether or not persistence is enabled or disabled.
1128     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1129     */
1130    public void setPersistent(boolean persistent) {
1131        this.persistent = persistent;
1132    }
1133
1134    public boolean isPopulateJMSXUserID() {
1135        return populateJMSXUserID;
1136    }
1137
1138    /**
1139     * Sets whether or not the broker should populate the JMSXUserID header.
1140     */
1141    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
1142        this.populateJMSXUserID = populateJMSXUserID;
1143    }
1144
1145    public SystemUsage getSystemUsage() {
1146        try {
1147            if (systemUsage == null) {
1148
1149                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
1150                systemUsage.setExecutor(getExecutor());
1151                systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB
1152                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1153                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
1154                systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1155                addService(this.systemUsage);
1156            }
1157            return systemUsage;
1158        } catch (IOException e) {
1159            LOG.error("Cannot create SystemUsage", e);
1160            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e);
1161        }
1162    }
1163
1164    public void setSystemUsage(SystemUsage memoryManager) {
1165        if (this.systemUsage != null) {
1166            removeService(this.systemUsage);
1167        }
1168        this.systemUsage = memoryManager;
1169        if (this.systemUsage.getExecutor()==null) {
1170            this.systemUsage.setExecutor(getExecutor());
1171        }
1172        addService(this.systemUsage);
1173    }
1174
1175    /**
1176     * @return the consumerUsageManager
1177     * @throws IOException
1178     */
1179    public SystemUsage getConsumerSystemUsage() throws IOException {
1180        if (this.consumerSystemUsaage == null) {
1181            if (splitSystemUsageForProducersConsumers) {
1182                this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
1183                float portion = consumerSystemUsagePortion / 100f;
1184                this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
1185                addService(this.consumerSystemUsaage);
1186            } else {
1187                consumerSystemUsaage = getSystemUsage();
1188            }
1189        }
1190        return this.consumerSystemUsaage;
1191    }
1192
1193    /**
1194     * @param consumerSystemUsaage
1195     *            the storeSystemUsage to set
1196     */
1197    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
1198        if (this.consumerSystemUsaage != null) {
1199            removeService(this.consumerSystemUsaage);
1200        }
1201        this.consumerSystemUsaage = consumerSystemUsaage;
1202        addService(this.consumerSystemUsaage);
1203    }
1204
1205    /**
1206     * @return the producerUsageManager
1207     * @throws IOException
1208     */
1209    public SystemUsage getProducerSystemUsage() throws IOException {
1210        if (producerSystemUsage == null) {
1211            if (splitSystemUsageForProducersConsumers) {
1212                producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1213                float portion = producerSystemUsagePortion / 100f;
1214                producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1215                addService(producerSystemUsage);
1216            } else {
1217                producerSystemUsage = getSystemUsage();
1218            }
1219        }
1220        return producerSystemUsage;
1221    }
1222
1223    /**
1224     * @param producerUsageManager
1225     *            the producerUsageManager to set
1226     */
1227    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1228        if (this.producerSystemUsage != null) {
1229            removeService(this.producerSystemUsage);
1230        }
1231        this.producerSystemUsage = producerUsageManager;
1232        addService(this.producerSystemUsage);
1233    }
1234
1235    public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException {
1236        if (persistenceAdapter == null && !hasStartException()) {
1237            persistenceAdapter = createPersistenceAdapter();
1238            configureService(persistenceAdapter);
1239            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1240        }
1241        return persistenceAdapter;
1242    }
1243
1244    /**
1245     * Sets the persistence adaptor implementation to use for this broker
1246     *
1247     * @throws IOException
1248     */
1249    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1250        if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
1251            LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter);
1252            return;
1253        }
1254        this.persistenceAdapter = persistenceAdapter;
1255        configureService(this.persistenceAdapter);
1256        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1257    }
1258
1259    public TaskRunnerFactory getTaskRunnerFactory() {
1260        if (this.taskRunnerFactory == null) {
1261            this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1262                    isDedicatedTaskRunner());
1263            this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
1264        }
1265        return this.taskRunnerFactory;
1266    }
1267
1268    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1269        this.taskRunnerFactory = taskRunnerFactory;
1270    }
1271
1272    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1273        if (taskRunnerFactory == null) {
1274            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1275                    true, 1000, isDedicatedTaskRunner());
1276        }
1277        return persistenceTaskRunnerFactory;
1278    }
1279
1280    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1281        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1282    }
1283
1284    public boolean isUseJmx() {
1285        return useJmx;
1286    }
1287
1288    public boolean isEnableStatistics() {
1289        return enableStatistics;
1290    }
1291
1292    /**
1293     * Sets whether or not the Broker's services enable statistics or not.
1294     */
1295    public void setEnableStatistics(boolean enableStatistics) {
1296        this.enableStatistics = enableStatistics;
1297    }
1298
1299    /**
1300     * Sets whether or not the Broker's services should be exposed into JMX or
1301     * not.
1302     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1303     */
1304    public void setUseJmx(boolean useJmx) {
1305        this.useJmx = useJmx;
1306    }
1307
1308    public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
1309        if (brokerObjectName == null) {
1310            brokerObjectName = createBrokerObjectName();
1311        }
1312        return brokerObjectName;
1313    }
1314
1315    /**
1316     * Sets the JMX ObjectName for this broker
1317     */
1318    public void setBrokerObjectName(ObjectName brokerObjectName) {
1319        this.brokerObjectName = brokerObjectName;
1320    }
1321
1322    public ManagementContext getManagementContext() {
1323        if (managementContext == null) {
1324            checkStartException();
1325            managementContext = new ManagementContext();
1326        }
1327        return managementContext;
1328    }
1329
1330    synchronized private void checkStartException() {
1331        if (startException != null) {
1332            throw new BrokerStoppedException(startException);
1333        }
1334    }
1335
1336    synchronized private boolean hasStartException() {
1337        return startException != null;
1338    }
1339
1340    synchronized private void setStartException(Throwable t) {
1341        startException = t;
1342    }
1343
1344    public void setManagementContext(ManagementContext managementContext) {
1345        this.managementContext = managementContext;
1346    }
1347
1348    public NetworkConnector getNetworkConnectorByName(String connectorName) {
1349        for (NetworkConnector connector : networkConnectors) {
1350            if (connector.getName().equals(connectorName)) {
1351                return connector;
1352            }
1353        }
1354        return null;
1355    }
1356
1357    public String[] getNetworkConnectorURIs() {
1358        return networkConnectorURIs;
1359    }
1360
1361    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1362        this.networkConnectorURIs = networkConnectorURIs;
1363    }
1364
1365    public TransportConnector getConnectorByName(String connectorName) {
1366        for (TransportConnector connector : transportConnectors) {
1367            if (connector.getName().equals(connectorName)) {
1368                return connector;
1369            }
1370        }
1371        return null;
1372    }
1373
1374    public Map<String, String> getTransportConnectorURIsAsMap() {
1375        Map<String, String> answer = new HashMap<String, String>();
1376        for (TransportConnector connector : transportConnectors) {
1377            try {
1378                URI uri = connector.getConnectUri();
1379                if (uri != null) {
1380                    String scheme = uri.getScheme();
1381                    if (scheme != null) {
1382                        answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
1383                    }
1384                }
1385            } catch (Exception e) {
1386                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1387            }
1388        }
1389        return answer;
1390    }
1391
1392    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
1393        ProducerBrokerExchange result = null;
1394
1395        for (TransportConnector connector : transportConnectors) {
1396            for (TransportConnection tc: connector.getConnections()){
1397                result = tc.getProducerBrokerExchangeIfExists(producerInfo);
1398                if (result !=null){
1399                    return result;
1400                }
1401            }
1402        }
1403        return result;
1404    }
1405
1406    public String[] getTransportConnectorURIs() {
1407        return transportConnectorURIs;
1408    }
1409
1410    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1411        this.transportConnectorURIs = transportConnectorURIs;
1412    }
1413
1414    /**
1415     * @return Returns the jmsBridgeConnectors.
1416     */
1417    public JmsConnector[] getJmsBridgeConnectors() {
1418        return jmsBridgeConnectors;
1419    }
1420
1421    /**
1422     * @param jmsConnectors
1423     *            The jmsBridgeConnectors to set.
1424     */
1425    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1426        this.jmsBridgeConnectors = jmsConnectors;
1427    }
1428
1429    public Service[] getServices() {
1430        return services.toArray(new Service[0]);
1431    }
1432
1433    /**
1434     * Sets the services associated with this broker.
1435     */
1436    public void setServices(Service[] services) {
1437        this.services.clear();
1438        if (services != null) {
1439            for (int i = 0; i < services.length; i++) {
1440                this.services.add(services[i]);
1441            }
1442        }
1443    }
1444
1445    /**
1446     * Adds a new service so that it will be started as part of the broker
1447     * lifecycle
1448     */
1449    public void addService(Service service) {
1450        services.add(service);
1451    }
1452
1453    public void removeService(Service service) {
1454        services.remove(service);
1455    }
1456
1457    public boolean isUseLoggingForShutdownErrors() {
1458        return useLoggingForShutdownErrors;
1459    }
1460
1461    /**
1462     * Sets whether or not we should use commons-logging when reporting errors
1463     * when shutting down the broker
1464     */
1465    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1466        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1467    }
1468
1469    public boolean isUseShutdownHook() {
1470        return useShutdownHook;
1471    }
1472
1473    /**
1474     * Sets whether or not we should use a shutdown handler to close down the
1475     * broker cleanly if the JVM is terminated. It is recommended you leave this
1476     * enabled.
1477     */
1478    public void setUseShutdownHook(boolean useShutdownHook) {
1479        this.useShutdownHook = useShutdownHook;
1480    }
1481
1482    public boolean isAdvisorySupport() {
1483        return advisorySupport;
1484    }
1485
1486    /**
1487     * Allows the support of advisory messages to be disabled for performance
1488     * reasons.
1489     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1490     */
1491    public void setAdvisorySupport(boolean advisorySupport) {
1492        this.advisorySupport = advisorySupport;
1493    }
1494
1495    public List<TransportConnector> getTransportConnectors() {
1496        return new ArrayList<TransportConnector>(transportConnectors);
1497    }
1498
1499    /**
1500     * Sets the transport connectors which this broker will listen on for new
1501     * clients
1502     *
1503     * @org.apache.xbean.Property
1504     *                            nestedType="org.apache.activemq.broker.TransportConnector"
1505     */
1506    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1507        for (TransportConnector connector : transportConnectors) {
1508            addConnector(connector);
1509        }
1510    }
1511
1512    public TransportConnector getTransportConnectorByName(String name){
1513        for (TransportConnector transportConnector : transportConnectors){
1514           if (name.equals(transportConnector.getName())){
1515               return transportConnector;
1516           }
1517        }
1518        return null;
1519    }
1520
1521    public TransportConnector getTransportConnectorByScheme(String scheme){
1522        for (TransportConnector transportConnector : transportConnectors){
1523            if (scheme.equals(transportConnector.getUri().getScheme())){
1524                return transportConnector;
1525            }
1526        }
1527        return null;
1528    }
1529
1530    public List<NetworkConnector> getNetworkConnectors() {
1531        return new ArrayList<NetworkConnector>(networkConnectors);
1532    }
1533
1534    public List<ProxyConnector> getProxyConnectors() {
1535        return new ArrayList<ProxyConnector>(proxyConnectors);
1536    }
1537
1538    /**
1539     * Sets the network connectors which this broker will use to connect to
1540     * other brokers in a federated network
1541     *
1542     * @org.apache.xbean.Property
1543     *                            nestedType="org.apache.activemq.network.NetworkConnector"
1544     */
1545    public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
1546        for (Object connector : networkConnectors) {
1547            addNetworkConnector((NetworkConnector) connector);
1548        }
1549    }
1550
1551    /**
1552     * Sets the network connectors which this broker will use to connect to
1553     * other brokers in a federated network
1554     */
1555    public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
1556        for (Object connector : proxyConnectors) {
1557            addProxyConnector((ProxyConnector) connector);
1558        }
1559    }
1560
1561    public PolicyMap getDestinationPolicy() {
1562        return destinationPolicy;
1563    }
1564
1565    /**
1566     * Sets the destination specific policies available either for exact
1567     * destinations or for wildcard areas of destinations.
1568     */
1569    public void setDestinationPolicy(PolicyMap policyMap) {
1570        this.destinationPolicy = policyMap;
1571    }
1572
1573    public BrokerPlugin[] getPlugins() {
1574        return plugins;
1575    }
1576
1577    /**
1578     * Sets a number of broker plugins to install such as for security
1579     * authentication or authorization
1580     */
1581    public void setPlugins(BrokerPlugin[] plugins) {
1582        this.plugins = plugins;
1583    }
1584
1585    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1586        return messageAuthorizationPolicy;
1587    }
1588
1589    /**
1590     * Sets the policy used to decide if the current connection is authorized to
1591     * consume a given message
1592     */
1593    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1594        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1595    }
1596
1597    /**
1598     * Delete all messages from the persistent store
1599     *
1600     * @throws IOException
1601     */
1602    public void deleteAllMessages() throws IOException {
1603        getPersistenceAdapter().deleteAllMessages();
1604    }
1605
1606    public boolean isDeleteAllMessagesOnStartup() {
1607        return deleteAllMessagesOnStartup;
1608    }
1609
1610    /**
1611     * Sets whether or not all messages are deleted on startup - mostly only
1612     * useful for testing.
1613     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1614     */
1615    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1616        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1617    }
1618
1619    public URI getVmConnectorURI() {
1620        if (vmConnectorURI == null) {
1621            try {
1622                vmConnectorURI = new URI("vm://" + getBrokerName());
1623            } catch (URISyntaxException e) {
1624                LOG.error("Badly formed URI from {}", getBrokerName(), e);
1625            }
1626        }
1627        return vmConnectorURI;
1628    }
1629
1630    public void setVmConnectorURI(URI vmConnectorURI) {
1631        this.vmConnectorURI = vmConnectorURI;
1632    }
1633
1634    public String getDefaultSocketURIString() {
1635        if (started.get()) {
1636            if (this.defaultSocketURIString == null) {
1637                for (TransportConnector tc:this.transportConnectors) {
1638                    String result = null;
1639                    try {
1640                        result = tc.getPublishableConnectString();
1641                    } catch (Exception e) {
1642                      LOG.warn("Failed to get the ConnectURI for {}", tc, e);
1643                    }
1644                    if (result != null) {
1645                        // find first publishable uri
1646                        if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1647                            this.defaultSocketURIString = result;
1648                            break;
1649                        } else {
1650                        // or use the first defined
1651                            if (this.defaultSocketURIString == null) {
1652                                this.defaultSocketURIString = result;
1653                            }
1654                        }
1655                    }
1656                }
1657
1658            }
1659            return this.defaultSocketURIString;
1660        }
1661       return null;
1662    }
1663
1664    /**
1665     * @return Returns the shutdownOnMasterFailure.
1666     */
1667    public boolean isShutdownOnMasterFailure() {
1668        return shutdownOnMasterFailure;
1669    }
1670
1671    /**
1672     * @param shutdownOnMasterFailure
1673     *            The shutdownOnMasterFailure to set.
1674     */
1675    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1676        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1677    }
1678
1679    public boolean isKeepDurableSubsActive() {
1680        return keepDurableSubsActive;
1681    }
1682
1683    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1684        this.keepDurableSubsActive = keepDurableSubsActive;
1685    }
1686    
1687    public boolean isEnableMessageExpirationOnActiveDurableSubs() {
1688        return enableMessageExpirationOnActiveDurableSubs;
1689    }
1690    
1691    public void setEnableMessageExpirationOnActiveDurableSubs(boolean enableMessageExpirationOnActiveDurableSubs) {
1692        this.enableMessageExpirationOnActiveDurableSubs = enableMessageExpirationOnActiveDurableSubs;
1693    }
1694
1695    public boolean isUseVirtualTopics() {
1696        return useVirtualTopics;
1697    }
1698
1699    /**
1700     * Sets whether or not <a
1701     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1702     * Topics</a> should be supported by default if they have not been
1703     * explicitly configured.
1704     */
1705    public void setUseVirtualTopics(boolean useVirtualTopics) {
1706        this.useVirtualTopics = useVirtualTopics;
1707    }
1708
1709    public DestinationInterceptor[] getDestinationInterceptors() {
1710        return destinationInterceptors;
1711    }
1712
1713    public boolean isUseMirroredQueues() {
1714        return useMirroredQueues;
1715    }
1716
1717    /**
1718     * Sets whether or not <a
1719     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1720     * Queues</a> should be supported by default if they have not been
1721     * explicitly configured.
1722     */
1723    public void setUseMirroredQueues(boolean useMirroredQueues) {
1724        this.useMirroredQueues = useMirroredQueues;
1725    }
1726
1727    /**
1728     * Sets the destination interceptors to use
1729     */
1730    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1731        this.destinationInterceptors = destinationInterceptors;
1732    }
1733
1734    public ActiveMQDestination[] getDestinations() {
1735        return destinations;
1736    }
1737
1738    /**
1739     * Sets the destinations which should be loaded/created on startup
1740     */
1741    public void setDestinations(ActiveMQDestination[] destinations) {
1742        this.destinations = destinations;
1743    }
1744
1745    /**
1746     * @return the tempDataStore
1747     */
1748    public synchronized PListStore getTempDataStore() {
1749        if (tempDataStore == null && !hasStartException()) {
1750            if (!isPersistent()) {
1751                return null;
1752            }
1753
1754            try {
1755                PersistenceAdapter pa = getPersistenceAdapter();
1756                if( pa!=null && pa instanceof PListStore) {
1757                    return (PListStore) pa;
1758                }
1759            } catch (IOException e) {
1760                throw new RuntimeException(e);
1761            }
1762
1763            try {
1764                String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
1765                this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
1766                this.tempDataStore.setDirectory(getTmpDataDirectory());
1767                configureService(tempDataStore);
1768            } catch (Exception e) {
1769                throw new RuntimeException(e);
1770            }
1771        }
1772        return tempDataStore;
1773    }
1774
1775    /**
1776     * @param tempDataStore
1777     *            the tempDataStore to set
1778     */
1779    public void setTempDataStore(PListStore tempDataStore) {
1780        this.tempDataStore = tempDataStore;
1781        if (tempDataStore != null) {
1782            if (tmpDataDirectory == null) {
1783                tmpDataDirectory = tempDataStore.getDirectory();
1784            } else if (tempDataStore.getDirectory() == null) {
1785                tempDataStore.setDirectory(tmpDataDirectory);
1786            }
1787        }
1788        configureService(tempDataStore);
1789    }
1790
1791    public int getPersistenceThreadPriority() {
1792        return persistenceThreadPriority;
1793    }
1794
1795    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1796        this.persistenceThreadPriority = persistenceThreadPriority;
1797    }
1798
1799    /**
1800     * @return the useLocalHostBrokerName
1801     */
1802    public boolean isUseLocalHostBrokerName() {
1803        return this.useLocalHostBrokerName;
1804    }
1805
1806    /**
1807     * @param useLocalHostBrokerName
1808     *            the useLocalHostBrokerName to set
1809     */
1810    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1811        this.useLocalHostBrokerName = useLocalHostBrokerName;
1812        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1813            brokerName = LOCAL_HOST_NAME;
1814        }
1815    }
1816
1817    /**
1818     * Looks up and lazily creates if necessary the destination for the given
1819     * JMS name
1820     */
1821    public Destination getDestination(ActiveMQDestination destination) throws Exception {
1822        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1823    }
1824
1825    public void removeDestination(ActiveMQDestination destination) throws Exception {
1826        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1827    }
1828
1829    public int getProducerSystemUsagePortion() {
1830        return producerSystemUsagePortion;
1831    }
1832
1833    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1834        this.producerSystemUsagePortion = producerSystemUsagePortion;
1835    }
1836
1837    public int getConsumerSystemUsagePortion() {
1838        return consumerSystemUsagePortion;
1839    }
1840
1841    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1842        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1843    }
1844
1845    public boolean isSplitSystemUsageForProducersConsumers() {
1846        return splitSystemUsageForProducersConsumers;
1847    }
1848
1849    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1850        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1851    }
1852
1853    public boolean isMonitorConnectionSplits() {
1854        return monitorConnectionSplits;
1855    }
1856
1857    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1858        this.monitorConnectionSplits = monitorConnectionSplits;
1859    }
1860
1861    public int getTaskRunnerPriority() {
1862        return taskRunnerPriority;
1863    }
1864
1865    public void setTaskRunnerPriority(int taskRunnerPriority) {
1866        this.taskRunnerPriority = taskRunnerPriority;
1867    }
1868
1869    public boolean isDedicatedTaskRunner() {
1870        return dedicatedTaskRunner;
1871    }
1872
1873    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1874        this.dedicatedTaskRunner = dedicatedTaskRunner;
1875    }
1876
1877    public boolean isCacheTempDestinations() {
1878        return cacheTempDestinations;
1879    }
1880
1881    public void setCacheTempDestinations(boolean cacheTempDestinations) {
1882        this.cacheTempDestinations = cacheTempDestinations;
1883    }
1884
1885    public int getTimeBeforePurgeTempDestinations() {
1886        return timeBeforePurgeTempDestinations;
1887    }
1888
1889    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1890        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1891    }
1892
1893    public boolean isUseTempMirroredQueues() {
1894        return useTempMirroredQueues;
1895    }
1896
1897    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1898        this.useTempMirroredQueues = useTempMirroredQueues;
1899    }
1900
1901    public synchronized JobSchedulerStore getJobSchedulerStore() {
1902
1903        // If support is off don't allow any scheduler even is user configured their own.
1904        if (!isSchedulerSupport()) {
1905            return null;
1906        }
1907
1908        // If the user configured their own we use it even if persistence is disabled since
1909        // we don't know anything about their implementation.
1910        if (jobSchedulerStore == null && !hasStartException()) {
1911
1912            if (!isPersistent()) {
1913                this.jobSchedulerStore = new InMemoryJobSchedulerStore();
1914                configureService(jobSchedulerStore);
1915                return this.jobSchedulerStore;
1916            }
1917
1918            try {
1919                PersistenceAdapter pa = getPersistenceAdapter();
1920                if (pa != null) {
1921                    this.jobSchedulerStore = pa.createJobSchedulerStore();
1922                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1923                    configureService(jobSchedulerStore);
1924                    return this.jobSchedulerStore;
1925                }
1926            } catch (IOException e) {
1927                throw new RuntimeException(e);
1928            } catch (UnsupportedOperationException ex) {
1929                // It's ok if the store doesn't implement a scheduler.
1930            } catch (Exception e) {
1931                throw new RuntimeException(e);
1932            }
1933
1934            try {
1935                PersistenceAdapter pa = getPersistenceAdapter();
1936                if (pa != null && pa instanceof JobSchedulerStore) {
1937                    this.jobSchedulerStore = (JobSchedulerStore) pa;
1938                    configureService(jobSchedulerStore);
1939                    return this.jobSchedulerStore;
1940                }
1941            } catch (IOException e) {
1942                throw new RuntimeException(e);
1943            }
1944
1945            // Load the KahaDB store as a last resort, this only works if KahaDB is
1946            // included at runtime, otherwise this will fail.  User should disable
1947            // scheduler support if this fails.
1948            try {
1949                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
1950                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
1951                jobSchedulerStore = adaptor.createJobSchedulerStore();
1952                jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1953                configureService(jobSchedulerStore);
1954                LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile());
1955            } catch (Exception e) {
1956                throw new RuntimeException(e);
1957            }
1958        }
1959        return jobSchedulerStore;
1960    }
1961
1962    public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
1963        this.jobSchedulerStore = jobSchedulerStore;
1964        configureService(jobSchedulerStore);
1965    }
1966
1967    //
1968    // Implementation methods
1969    // -------------------------------------------------------------------------
1970    /**
1971     * Handles any lazy-creation helper properties which are added to make
1972     * things easier to configure inside environments such as Spring
1973     *
1974     * @throws Exception
1975     */
1976    protected void processHelperProperties() throws Exception {
1977        if (transportConnectorURIs != null) {
1978            for (int i = 0; i < transportConnectorURIs.length; i++) {
1979                String uri = transportConnectorURIs[i];
1980                addConnector(uri);
1981            }
1982        }
1983        if (networkConnectorURIs != null) {
1984            for (int i = 0; i < networkConnectorURIs.length; i++) {
1985                String uri = networkConnectorURIs[i];
1986                addNetworkConnector(uri);
1987            }
1988        }
1989        if (jmsBridgeConnectors != null) {
1990            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1991                addJmsConnector(jmsBridgeConnectors[i]);
1992            }
1993        }
1994    }
1995
1996    /**
1997     * Check that the store usage limit is not greater than max usable
1998     * space and adjust if it is
1999     */
2000    protected void checkStoreUsageLimits() throws Exception {
2001        final SystemUsage usage = getSystemUsage();
2002
2003        if (getPersistenceAdapter() != null) {
2004            PersistenceAdapter adapter = getPersistenceAdapter();
2005            checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit());
2006
2007            long maxJournalFileSize = 0;
2008            long storeLimit = usage.getStoreUsage().getLimit();
2009
2010            if (adapter instanceof JournaledStore) {
2011                maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
2012            }
2013
2014            if (storeLimit < maxJournalFileSize) {
2015                LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
2016                          " mb, whilst the max journal file size for the store is: " +
2017                          maxJournalFileSize / (1024 * 1024) + " mb, " +
2018                          "the store will not accept any data when used.");
2019
2020            }
2021        }
2022    }
2023
2024    /**
2025     * Check that temporary usage limit is not greater than max usable
2026     * space and adjust if it is
2027     */
2028    protected void checkTmpStoreUsageLimits() throws Exception {
2029        final SystemUsage usage = getSystemUsage();
2030
2031        File tmpDir = getTmpDataDirectory();
2032
2033        if (tmpDir != null) {
2034            checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit());
2035
2036            if (isPersistent()) {
2037                long maxJournalFileSize;
2038
2039                PListStore store = usage.getTempUsage().getStore();
2040                if (store != null && store instanceof JournaledStore) {
2041                    maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
2042                } else {
2043                    maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
2044                }
2045                long storeLimit = usage.getTempUsage().getLimit();
2046
2047                if (storeLimit < maxJournalFileSize) {
2048                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
2049                              " mb, whilst the max journal file size for the temporary store is: " +
2050                              maxJournalFileSize / (1024 * 1024) + " mb, " +
2051                              "the temp store will not accept any data when used.");
2052                }
2053            }
2054        }
2055    }
2056
2057    protected void checkUsageLimit(File dir, PercentLimitUsage<?> storeUsage, int percentLimit) throws ConfigurationException {
2058        if (dir != null) {
2059            dir = StoreUtil.findParentDirectory(dir);
2060            String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store";
2061            long storeLimit = storeUsage.getLimit();
2062            long storeCurrent = storeUsage.getUsage();
2063            long totalSpace = storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getTotalSpace();
2064            long totalUsableSpace = (storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getUsableSpace()) + storeCurrent;
2065            if (totalUsableSpace < 0 || totalSpace < 0) {
2066                final String message = "File system space reported by: " + dir + " was negative, possibly a huge file system, set a sane usage.total to provide some guidance";
2067                LOG.error(message);
2068                throw new ConfigurationException(message);
2069            }
2070            //compute byte value of the percent limit
2071            long bytePercentLimit = totalSpace * percentLimit / 100;
2072            int oneMeg = 1024 * 1024;
2073
2074            //Check if the store limit is less than the percent Limit that was set and also
2075            //the usable space...this means we can grow the store larger
2076            //Changes in partition size (total space) as well as changes in usable space should
2077            //be detected here
2078            if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0
2079                    && storeUsage.getTotal() == 0
2080                    && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){
2081
2082                // set the limit to be bytePercentLimit or usableSpace if
2083                // usableSpace is less than the percentLimit
2084                long newLimit = bytePercentLimit > totalUsableSpace ? totalUsableSpace : bytePercentLimit;
2085
2086                //To prevent changing too often, check threshold
2087                if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) {
2088                    LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to "
2089                            + percentLimit + "% of the partition size.");
2090                    storeUsage.setLimit(newLimit);
2091                    LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace
2092                            + "% (" + newLimit / oneMeg + " mb) of the partition size.");
2093                }
2094
2095            //check if the limit is too large for the amount of usable space
2096            } else if (storeLimit > totalUsableSpace) {
2097                final String message = storeName + " limit is " +  storeLimit / oneMeg
2098                        + " mb (current store usage is " + storeCurrent / oneMeg
2099                        + " mb). The data directory: " + dir.getAbsolutePath()
2100                        + " only has " + totalUsableSpace / oneMeg
2101                        + " mb of usable space.";
2102
2103                if (!isAdjustUsageLimits()) {
2104                    LOG.error(message);
2105                    throw new ConfigurationException(message);
2106                }
2107
2108                if (percentLimit > 0) {
2109                    LOG.warn(storeName + " limit has been set to "
2110                            + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)"
2111                            + " of the partition size but there is not enough usable space."
2112                            + " The current store limit (which may have been adjusted by a"
2113                            + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)"
2114                            + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)"
2115                            + " is available - resetting limit");
2116                } else {
2117                    LOG.warn(message + " - resetting to maximum available disk space: " +
2118                         totalUsableSpace / oneMeg + " mb");
2119                }
2120                storeUsage.setLimit(totalUsableSpace);
2121            }
2122        }
2123    }
2124
2125    /**
2126     * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to
2127     * update store and temporary store limits if the amount of available space
2128     * plus current store size is less than the existin configured limit
2129     */
2130    protected void scheduleDiskUsageLimitsCheck() throws IOException {
2131        if (schedulePeriodForDiskUsageCheck > 0 &&
2132                (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) {
2133            Runnable diskLimitCheckTask = new Runnable() {
2134                @Override
2135                public void run() {
2136                    try {
2137                        checkStoreUsageLimits();
2138                    } catch (Throwable e) {
2139                        LOG.error("Failed to check persistent disk usage limits", e);
2140                    }
2141
2142                    try {
2143                        checkTmpStoreUsageLimits();
2144                    } catch (Throwable e) {
2145                        LOG.error("Failed to check temporary store usage limits", e);
2146                    }
2147                }
2148            };
2149            scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck);
2150        }
2151    }
2152
2153    protected void checkMemorySystemUsageLimits() throws Exception {
2154        final SystemUsage usage = getSystemUsage();
2155        long memLimit = usage.getMemoryUsage().getLimit();
2156        long jvmLimit = Runtime.getRuntime().maxMemory();
2157
2158        if (memLimit > jvmLimit) {
2159            final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024)
2160                    + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024);
2161
2162            if (adjustUsageLimits) {
2163                usage.getMemoryUsage().setPercentOfJvmHeap(70);
2164                LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
2165            } else {
2166                LOG.error(message);
2167                throw new ConfigurationException(message);
2168            }
2169        }
2170    }
2171
2172    protected void checkStoreSystemUsageLimits() throws Exception {
2173        final SystemUsage usage = getSystemUsage();
2174
2175        //Check the persistent store and temp store limits if they exist
2176        //and schedule a periodic check to update disk limits if
2177        //schedulePeriodForDiskLimitCheck is set
2178        checkStoreUsageLimits();
2179        checkTmpStoreUsageLimits();
2180        scheduleDiskUsageLimitsCheck();
2181
2182        if (getJobSchedulerStore() != null) {
2183            JobSchedulerStore scheduler = getJobSchedulerStore();
2184            File schedulerDir = scheduler.getDirectory();
2185            if (schedulerDir != null) {
2186
2187                String schedulerDirPath = schedulerDir.getAbsolutePath();
2188                if (!schedulerDir.isAbsolute()) {
2189                    schedulerDir = new File(schedulerDirPath);
2190                }
2191
2192                while (schedulerDir != null && !schedulerDir.isDirectory()) {
2193                    schedulerDir = schedulerDir.getParentFile();
2194                }
2195                long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
2196                long dirFreeSpace = schedulerDir.getUsableSpace();
2197                if (schedulerLimit > dirFreeSpace) {
2198                    LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
2199                             " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
2200                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
2201                            dirFreeSpace / (1024 * 1024) + " mb.");
2202                    usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
2203                }
2204            }
2205        }
2206    }
2207
2208    public void stopAllConnectors(ServiceStopper stopper) {
2209        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2210            NetworkConnector connector = iter.next();
2211            unregisterNetworkConnectorMBean(connector);
2212            stopper.stop(connector);
2213        }
2214        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2215            ProxyConnector connector = iter.next();
2216            stopper.stop(connector);
2217        }
2218        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2219            JmsConnector connector = iter.next();
2220            stopper.stop(connector);
2221        }
2222        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2223            TransportConnector connector = iter.next();
2224            try {
2225                unregisterConnectorMBean(connector);
2226            } catch (IOException e) {
2227            }
2228            stopper.stop(connector);
2229        }
2230    }
2231
2232    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
2233        try {
2234            ObjectName objectName = createConnectorObjectName(connector);
2235            connector = connector.asManagedConnector(getManagementContext(), objectName);
2236            ConnectorViewMBean view = new ConnectorView(connector);
2237            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2238            return connector;
2239        } catch (Throwable e) {
2240            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e);
2241        }
2242    }
2243
2244    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
2245        if (isUseJmx()) {
2246            try {
2247                ObjectName objectName = createConnectorObjectName(connector);
2248                getManagementContext().unregisterMBean(objectName);
2249            } catch (Throwable e) {
2250                throw IOExceptionSupport.create(
2251                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
2252            }
2253        }
2254    }
2255
2256    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
2257        return adaptor;
2258    }
2259
2260    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
2261        if (isUseJmx()) {}
2262    }
2263
2264    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
2265        return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
2266    }
2267
2268    public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
2269        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
2270        try {
2271            ObjectName objectName = createNetworkConnectorObjectName(connector);
2272            connector.setObjectName(objectName);
2273            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2274        } catch (Throwable e) {
2275            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
2276        }
2277    }
2278
2279    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
2280        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
2281    }
2282
2283    public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
2284        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport);
2285    }
2286
2287    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
2288        if (isUseJmx()) {
2289            try {
2290                ObjectName objectName = createNetworkConnectorObjectName(connector);
2291                getManagementContext().unregisterMBean(objectName);
2292            } catch (Exception e) {
2293                LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
2294            }
2295        }
2296    }
2297
2298    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
2299        ProxyConnectorView view = new ProxyConnectorView(connector);
2300        try {
2301            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
2302            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2303        } catch (Throwable e) {
2304            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2305        }
2306    }
2307
2308    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
2309        JmsConnectorView view = new JmsConnectorView(connector);
2310        try {
2311            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
2312            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2313        } catch (Throwable e) {
2314            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2315        }
2316    }
2317
2318    /**
2319     * Factory method to create a new broker
2320     *
2321     * @throws Exception
2322     */
2323    protected Broker createBroker() throws Exception {
2324        regionBroker = createRegionBroker();
2325        Broker broker = addInterceptors(regionBroker);
2326        // Add a filter that will stop access to the broker once stopped
2327        broker = new MutableBrokerFilter(broker) {
2328            Broker old;
2329
2330            @Override
2331            public void stop() throws Exception {
2332                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
2333                    // Just ignore additional stop actions.
2334                    @Override
2335                    public void stop() throws Exception {
2336                    }
2337                });
2338                old.stop();
2339            }
2340
2341            @Override
2342            public void start() throws Exception {
2343                if (forceStart && old != null) {
2344                    this.next.set(old);
2345                }
2346                getNext().start();
2347            }
2348        };
2349        return broker;
2350    }
2351
2352    /**
2353     * Factory method to create the core region broker onto which interceptors
2354     * are added
2355     *
2356     * @throws Exception
2357     */
2358    protected Broker createRegionBroker() throws Exception {
2359        if (destinationInterceptors == null) {
2360            destinationInterceptors = createDefaultDestinationInterceptor();
2361        }
2362        configureServices(destinationInterceptors);
2363        DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
2364        if (destinationFactory == null) {
2365            destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
2366        }
2367        return createRegionBroker(destinationInterceptor);
2368    }
2369
2370    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
2371        RegionBroker regionBroker;
2372        if (isUseJmx()) {
2373            try {
2374                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
2375                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
2376            } catch(MalformedObjectNameException me){
2377                LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me);
2378                throw new IOException(me);
2379            }
2380        } else {
2381            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2382                    destinationInterceptor,getScheduler(),getExecutor());
2383        }
2384        destinationFactory.setRegionBroker(regionBroker);
2385        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2386        regionBroker.setBrokerName(getBrokerName());
2387        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2388        regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2389        if (brokerId != null) {
2390            regionBroker.setBrokerId(brokerId);
2391        }
2392        return regionBroker;
2393    }
2394
2395    /**
2396     * Create the default destination interceptor
2397     */
2398    protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2399        List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2400        if (isUseVirtualTopics()) {
2401            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2402            VirtualTopic virtualTopic = new VirtualTopic();
2403            virtualTopic.setName("VirtualTopic.>");
2404            VirtualDestination[] virtualDestinations = { virtualTopic };
2405            interceptor.setVirtualDestinations(virtualDestinations);
2406            answer.add(interceptor);
2407        }
2408        if (isUseMirroredQueues()) {
2409            MirroredQueue interceptor = new MirroredQueue();
2410            answer.add(interceptor);
2411        }
2412        DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2413        answer.toArray(array);
2414        return array;
2415    }
2416
2417    /**
2418     * Strategy method to add interceptors to the broker
2419     *
2420     * @throws IOException
2421     */
2422    protected Broker addInterceptors(Broker broker) throws Exception {
2423        if (isSchedulerSupport()) {
2424            SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
2425            if (isUseJmx()) {
2426                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2427                try {
2428                    ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName());
2429                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2430                    this.adminView.setJMSJobScheduler(objectName);
2431                } catch (Throwable e) {
2432                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2433                            + e.getMessage(), e);
2434                }
2435            }
2436            broker = sb;
2437        }
2438        if (isUseJmx()) {
2439            HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker());
2440            try {
2441                ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName());
2442                AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName);
2443            } catch (Throwable e) {
2444                throw IOExceptionSupport.create("Status MBean could not be registered in JMX: "
2445                        + e.getMessage(), e);
2446            }
2447        }
2448        if (isAdvisorySupport()) {
2449            broker = new AdvisoryBroker(broker);
2450        }
2451        broker = new CompositeDestinationBroker(broker);
2452        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2453        if (isPopulateJMSXUserID()) {
2454            UserIDBroker userIDBroker = new UserIDBroker(broker);
2455            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2456            broker = userIDBroker;
2457        }
2458        if (isMonitorConnectionSplits()) {
2459            broker = new ConnectionSplitBroker(broker);
2460        }
2461        if (plugins != null) {
2462            for (int i = 0; i < plugins.length; i++) {
2463                BrokerPlugin plugin = plugins[i];
2464                broker = plugin.installPlugin(broker);
2465            }
2466        }
2467        return broker;
2468    }
2469
2470    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2471        if (isPersistent()) {
2472            PersistenceAdapterFactory fac = getPersistenceFactory();
2473            if (fac != null) {
2474                return fac.createPersistenceAdapter();
2475            } else {
2476                try {
2477                    String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
2478                    PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
2479                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
2480                    adaptor.setDirectory(dir);
2481                    return adaptor;
2482                } catch (Throwable e) {
2483                    throw IOExceptionSupport.create(e);
2484                }
2485            }
2486        } else {
2487            return new MemoryPersistenceAdapter();
2488        }
2489    }
2490
2491    protected ObjectName createBrokerObjectName() throws MalformedObjectNameException  {
2492        return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName());
2493    }
2494
2495    protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2496        TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
2497        return new TransportConnector(transport);
2498    }
2499
2500    /**
2501     * Extracts the port from the options
2502     */
2503    protected Object getPort(Map<?,?> options) {
2504        Object port = options.get("port");
2505        if (port == null) {
2506            port = DEFAULT_PORT;
2507            LOG.warn("No port specified so defaulting to: {}", port);
2508        }
2509        return port;
2510    }
2511
2512    protected void addShutdownHook() {
2513        if (useShutdownHook) {
2514            shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2515                @Override
2516                public void run() {
2517                    containerShutdown();
2518                }
2519            };
2520            Runtime.getRuntime().addShutdownHook(shutdownHook);
2521        }
2522    }
2523
2524    protected void removeShutdownHook() {
2525        if (shutdownHook != null) {
2526            try {
2527                Runtime.getRuntime().removeShutdownHook(shutdownHook);
2528            } catch (Exception e) {
2529                LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e);
2530            }
2531        }
2532    }
2533
2534    /**
2535     * Sets hooks to be executed when broker shut down
2536     *
2537     * @org.apache.xbean.Property
2538     */
2539    public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2540        for (Runnable hook : hooks) {
2541            addShutdownHook(hook);
2542        }
2543    }
2544
2545    /**
2546     * Causes a clean shutdown of the container when the VM is being shut down
2547     */
2548    protected void containerShutdown() {
2549        try {
2550            stop();
2551        } catch (IOException e) {
2552            Throwable linkedException = e.getCause();
2553            if (linkedException != null) {
2554                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2555            } else {
2556                logError("Failed to shut down: " + e, e);
2557            }
2558            if (!useLoggingForShutdownErrors) {
2559                e.printStackTrace(System.err);
2560            }
2561        } catch (Exception e) {
2562            logError("Failed to shut down: " + e, e);
2563        }
2564    }
2565
2566    protected void logError(String message, Throwable e) {
2567        if (useLoggingForShutdownErrors) {
2568            LOG.error("Failed to shut down: " + e);
2569        } else {
2570            System.err.println("Failed to shut down: " + e);
2571        }
2572    }
2573
2574    /**
2575     * Starts any configured destinations on startup
2576     */
2577    protected void startDestinations() throws Exception {
2578        if (destinations != null) {
2579            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2580            for (int i = 0; i < destinations.length; i++) {
2581                ActiveMQDestination destination = destinations[i];
2582                getBroker().addDestination(adminConnectionContext, destination,true);
2583            }
2584        }
2585        if (isUseVirtualTopics()) {
2586            startVirtualConsumerDestinations();
2587        }
2588    }
2589
2590    /**
2591     * Returns the broker's administration connection context used for
2592     * configuring the broker at startup
2593     */
2594    public ConnectionContext getAdminConnectionContext() throws Exception {
2595        return BrokerSupport.getConnectionContext(getBroker());
2596    }
2597
2598    protected void startManagementContext() throws Exception {
2599        getManagementContext().setBrokerName(brokerName);
2600        getManagementContext().start();
2601        adminView = new BrokerView(this, null);
2602        ObjectName objectName = getBrokerObjectName();
2603        AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2604    }
2605
2606    /**
2607     * Start all transport and network connections, proxies and bridges
2608     *
2609     * @throws Exception
2610     */
2611    public void startAllConnectors() throws Exception {
2612        Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2613        List<TransportConnector> al = new ArrayList<TransportConnector>();
2614        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2615            TransportConnector connector = iter.next();
2616            al.add(startTransportConnector(connector));
2617        }
2618        if (al.size() > 0) {
2619            // let's clear the transportConnectors list and replace it with
2620            // the started transportConnector instances
2621            this.transportConnectors.clear();
2622            setTransportConnectors(al);
2623        }
2624        this.slave = false;
2625        if (!stopped.get()) {
2626            ThreadPoolExecutor networkConnectorStartExecutor = null;
2627            if (isNetworkConnectorStartAsync()) {
2628                // spin up as many threads as needed
2629                networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2630                    10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2631                    new ThreadFactory() {
2632                        int count=0;
2633                        @Override
2634                        public Thread newThread(Runnable runnable) {
2635                            Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2636                            thread.setDaemon(true);
2637                            return thread;
2638                        }
2639                    });
2640            }
2641
2642            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2643                final NetworkConnector connector = iter.next();
2644                connector.setLocalUri(getVmConnectorURI());
2645                connector.setBrokerName(getBrokerName());
2646                connector.setDurableDestinations(durableDestinations);
2647                if (getDefaultSocketURIString() != null) {
2648                    connector.setBrokerURL(getDefaultSocketURIString());
2649                }
2650                if (networkConnectorStartExecutor != null) {
2651                    networkConnectorStartExecutor.execute(new Runnable() {
2652                        @Override
2653                        public void run() {
2654                            try {
2655                                LOG.info("Async start of {}", connector);
2656                                connector.start();
2657                            } catch(Exception e) {
2658                                LOG.error("Async start of network connector: {} failed", connector, e);
2659                            }
2660                        }
2661                    });
2662                } else {
2663                    connector.start();
2664                }
2665            }
2666            if (networkConnectorStartExecutor != null) {
2667                // executor done when enqueued tasks are complete
2668                ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
2669            }
2670
2671            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2672                ProxyConnector connector = iter.next();
2673                connector.start();
2674            }
2675            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2676                JmsConnector connector = iter.next();
2677                connector.start();
2678            }
2679            for (Service service : services) {
2680                configureService(service);
2681                service.start();
2682            }
2683        }
2684    }
2685
2686    public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2687        connector.setBrokerService(this);
2688        connector.setTaskRunnerFactory(getTaskRunnerFactory());
2689        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2690        if (policy != null) {
2691            connector.setMessageAuthorizationPolicy(policy);
2692        }
2693        if (isUseJmx()) {
2694            connector = registerConnectorMBean(connector);
2695        }
2696        connector.getStatistics().setEnabled(enableStatistics);
2697        connector.start();
2698        return connector;
2699    }
2700
2701    /**
2702     * Perform any custom dependency injection
2703     */
2704    protected void configureServices(Object[] services) {
2705        for (Object service : services) {
2706            configureService(service);
2707        }
2708    }
2709
2710    /**
2711     * Perform any custom dependency injection
2712     */
2713    protected void configureService(Object service) {
2714        if (service instanceof BrokerServiceAware) {
2715            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2716            serviceAware.setBrokerService(this);
2717        }
2718    }
2719
2720    public void handleIOException(IOException exception) {
2721        if (ioExceptionHandler != null) {
2722            ioExceptionHandler.handle(exception);
2723         } else {
2724            LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception);
2725         }
2726    }
2727
2728    protected void startVirtualConsumerDestinations() throws Exception {
2729        checkStartException();
2730        ConnectionContext adminConnectionContext = getAdminConnectionContext();
2731        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2732        DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2733        if (!destinations.isEmpty()) {
2734            for (ActiveMQDestination destination : destinations) {
2735                if (filter.matches(destination) == true) {
2736                    broker.addDestination(adminConnectionContext, destination, false);
2737                }
2738            }
2739        }
2740    }
2741
2742    private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2743        // created at startup, so no sync needed
2744        if (virtualConsumerDestinationFilter == null) {
2745            Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2746            if (destinationInterceptors != null) {
2747                for (DestinationInterceptor interceptor : destinationInterceptors) {
2748                    if (interceptor instanceof VirtualDestinationInterceptor) {
2749                        VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2750                        for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2751                            if (virtualDestination instanceof VirtualTopic) {
2752                                consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2753                            }
2754                            if (isUseVirtualDestSubs()) {
2755                                try {
2756                                    broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination);
2757                                    LOG.debug("Adding virtual destination: {}", virtualDestination);
2758                                } catch (Exception e) {
2759                                    LOG.warn("Could not fire virtual destination consumer advisory", e);
2760                                }
2761                            }
2762                        }
2763                    }
2764                }
2765            }
2766            ActiveMQQueue filter = new ActiveMQQueue();
2767            filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2768            virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2769        }
2770        return virtualConsumerDestinationFilter;
2771    }
2772
2773    protected synchronized ThreadPoolExecutor getExecutor() {
2774        if (this.executor == null) {
2775            this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2776
2777                private long i = 0;
2778
2779                @Override
2780                public Thread newThread(Runnable runnable) {
2781                    this.i++;
2782                    Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
2783                    thread.setDaemon(true);
2784                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2785                        @Override
2786                        public void uncaughtException(final Thread t, final Throwable e) {
2787                            LOG.error("Error in thread '{}'", t.getName(), e);
2788                        }
2789                    });
2790                    return thread;
2791                }
2792            }, new RejectedExecutionHandler() {
2793                @Override
2794                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2795                    try {
2796                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2797                    } catch (InterruptedException e) {
2798                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2799                    }
2800
2801                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2802                }
2803            });
2804        }
2805        return this.executor;
2806    }
2807
2808    public synchronized Scheduler getScheduler() {
2809        if (this.scheduler==null) {
2810            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2811            try {
2812                this.scheduler.start();
2813            } catch (Exception e) {
2814               LOG.error("Failed to start Scheduler", e);
2815            }
2816        }
2817        return this.scheduler;
2818    }
2819
2820    public Broker getRegionBroker() {
2821        return regionBroker;
2822    }
2823
2824    public void setRegionBroker(Broker regionBroker) {
2825        this.regionBroker = regionBroker;
2826    }
2827
2828    public void addShutdownHook(Runnable hook) {
2829        synchronized (shutdownHooks) {
2830            shutdownHooks.add(hook);
2831        }
2832    }
2833
2834    public void removeShutdownHook(Runnable hook) {
2835        synchronized (shutdownHooks) {
2836            shutdownHooks.remove(hook);
2837        }
2838    }
2839
2840    public boolean isSystemExitOnShutdown() {
2841        return systemExitOnShutdown;
2842    }
2843
2844    /**
2845     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2846     */
2847    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2848        this.systemExitOnShutdown = systemExitOnShutdown;
2849    }
2850
2851    public int getSystemExitOnShutdownExitCode() {
2852        return systemExitOnShutdownExitCode;
2853    }
2854
2855    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2856        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2857    }
2858
2859    public SslContext getSslContext() {
2860        return sslContext;
2861    }
2862
2863    public void setSslContext(SslContext sslContext) {
2864        this.sslContext = sslContext;
2865    }
2866
2867    public boolean isShutdownOnSlaveFailure() {
2868        return shutdownOnSlaveFailure;
2869    }
2870
2871    /**
2872     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2873     */
2874    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2875        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2876    }
2877
2878    public boolean isWaitForSlave() {
2879        return waitForSlave;
2880    }
2881
2882    /**
2883     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2884     */
2885    public void setWaitForSlave(boolean waitForSlave) {
2886        this.waitForSlave = waitForSlave;
2887    }
2888
2889    public long getWaitForSlaveTimeout() {
2890        return this.waitForSlaveTimeout;
2891    }
2892
2893    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2894        this.waitForSlaveTimeout = waitForSlaveTimeout;
2895    }
2896
2897    /**
2898     * Get the passiveSlave
2899     * @return the passiveSlave
2900     */
2901    public boolean isPassiveSlave() {
2902        return this.passiveSlave;
2903    }
2904
2905    /**
2906     * Set the passiveSlave
2907     * @param passiveSlave the passiveSlave to set
2908     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2909     */
2910    public void setPassiveSlave(boolean passiveSlave) {
2911        this.passiveSlave = passiveSlave;
2912    }
2913
2914    /**
2915     * override the Default IOException handler, called when persistence adapter
2916     * has experiences File or JDBC I/O Exceptions
2917     *
2918     * @param ioExceptionHandler
2919     */
2920    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2921        configureService(ioExceptionHandler);
2922        this.ioExceptionHandler = ioExceptionHandler;
2923    }
2924
2925    public IOExceptionHandler getIoExceptionHandler() {
2926        return ioExceptionHandler;
2927    }
2928
2929    /**
2930     * @return the schedulerSupport
2931     */
2932    public boolean isSchedulerSupport() {
2933        return this.schedulerSupport;
2934    }
2935
2936    /**
2937     * @param schedulerSupport the schedulerSupport to set
2938     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2939     */
2940    public void setSchedulerSupport(boolean schedulerSupport) {
2941        this.schedulerSupport = schedulerSupport;
2942    }
2943
2944    /**
2945     * @return the schedulerDirectory
2946     */
2947    public File getSchedulerDirectoryFile() {
2948        if (this.schedulerDirectoryFile == null) {
2949            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2950        }
2951        return schedulerDirectoryFile;
2952    }
2953
2954    /**
2955     * @param schedulerDirectory the schedulerDirectory to set
2956     */
2957    public void setSchedulerDirectoryFile(File schedulerDirectory) {
2958        this.schedulerDirectoryFile = schedulerDirectory;
2959    }
2960
2961    public void setSchedulerDirectory(String schedulerDirectory) {
2962        setSchedulerDirectoryFile(new File(schedulerDirectory));
2963    }
2964
2965    public int getSchedulePeriodForDestinationPurge() {
2966        return this.schedulePeriodForDestinationPurge;
2967    }
2968
2969    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2970        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2971    }
2972
2973    /**
2974     * @param schedulePeriodForDiskUsageCheck
2975     */
2976    public void setSchedulePeriodForDiskUsageCheck(
2977            int schedulePeriodForDiskUsageCheck) {
2978        this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck;
2979    }
2980
2981    public int getDiskUsageCheckRegrowThreshold() {
2982        return diskUsageCheckRegrowThreshold;
2983    }
2984
2985    /**
2986     * @param diskUsageCheckRegrowThreshold
2987     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
2988     */
2989    public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) {
2990        this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold;
2991    }
2992
2993    public int getMaxPurgedDestinationsPerSweep() {
2994        return this.maxPurgedDestinationsPerSweep;
2995    }
2996
2997    public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
2998        this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
2999    }
3000
3001    public BrokerContext getBrokerContext() {
3002        return brokerContext;
3003    }
3004
3005    public void setBrokerContext(BrokerContext brokerContext) {
3006        this.brokerContext = brokerContext;
3007    }
3008
3009    public void setBrokerId(String brokerId) {
3010        this.brokerId = new BrokerId(brokerId);
3011    }
3012
3013    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
3014        return useAuthenticatedPrincipalForJMSXUserID;
3015    }
3016
3017    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
3018        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
3019    }
3020
3021    /**
3022     * Should MBeans that support showing the Authenticated User Name information have this
3023     * value filled in or not.
3024     *
3025     * @return true if user names should be exposed in MBeans
3026     */
3027    public boolean isPopulateUserNameInMBeans() {
3028        return this.populateUserNameInMBeans;
3029    }
3030
3031    /**
3032     * Sets whether Authenticated User Name information is shown in MBeans that support this field.
3033     * @param value if MBeans should expose user name information.
3034     */
3035    public void setPopulateUserNameInMBeans(boolean value) {
3036        this.populateUserNameInMBeans = value;
3037    }
3038
3039    /**
3040     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
3041     * failing.  The default value is to wait forever (zero).
3042     *
3043     * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
3044     */
3045    public long getMbeanInvocationTimeout() {
3046        return mbeanInvocationTimeout;
3047    }
3048
3049    /**
3050     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
3051     * failing. The default value is to wait forever (zero).
3052     *
3053     * @param mbeanInvocationTimeout
3054     *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
3055     */
3056    public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
3057        this.mbeanInvocationTimeout = mbeanInvocationTimeout;
3058    }
3059
3060    public boolean isNetworkConnectorStartAsync() {
3061        return networkConnectorStartAsync;
3062    }
3063
3064    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
3065        this.networkConnectorStartAsync = networkConnectorStartAsync;
3066    }
3067
3068    public boolean isAllowTempAutoCreationOnSend() {
3069        return allowTempAutoCreationOnSend;
3070    }
3071
3072    /**
3073     * enable if temp destinations need to be propagated through a network when
3074     * advisorySupport==false. This is used in conjunction with the policy
3075     * gcInactiveDestinations for matching temps so they can get removed
3076     * when inactive
3077     *
3078     * @param allowTempAutoCreationOnSend
3079     */
3080    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
3081        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
3082    }
3083
3084    public long getOfflineDurableSubscriberTimeout() {
3085        return offlineDurableSubscriberTimeout;
3086    }
3087
3088    public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
3089        this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
3090    }
3091
3092    public long getOfflineDurableSubscriberTaskSchedule() {
3093        return offlineDurableSubscriberTaskSchedule;
3094    }
3095
3096    public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
3097        this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
3098    }
3099
3100    public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
3101        return isUseVirtualTopics() && destination.isQueue() &&
3102               getVirtualTopicConsumerDestinationFilter().matches(destination);
3103    }
3104
3105    synchronized public Throwable getStartException() {
3106        return startException;
3107    }
3108
3109    public boolean isStartAsync() {
3110        return startAsync;
3111    }
3112
3113    public void setStartAsync(boolean startAsync) {
3114        this.startAsync = startAsync;
3115    }
3116
3117    public boolean isSlave() {
3118        return this.slave;
3119    }
3120
3121    public boolean isStopping() {
3122        return this.stopping.get();
3123    }
3124
3125    /**
3126     * @return true if the broker allowed to restart on shutdown.
3127     */
3128    public boolean isRestartAllowed() {
3129        return restartAllowed;
3130    }
3131
3132    /**
3133     * Sets if the broker allowed to restart on shutdown.
3134     */
3135    public void setRestartAllowed(boolean restartAllowed) {
3136        this.restartAllowed = restartAllowed;
3137    }
3138
3139    /**
3140     * A lifecycle manager of the BrokerService should
3141     * inspect this property after a broker shutdown has occurred
3142     * to find out if the broker needs to be re-created and started
3143     * again.
3144     *
3145     * @return true if the broker wants to be restarted after it shuts down.
3146     */
3147    public boolean isRestartRequested() {
3148        return restartRequested;
3149    }
3150
3151    public void requestRestart() {
3152        this.restartRequested = true;
3153    }
3154
3155    public int getStoreOpenWireVersion() {
3156        return storeOpenWireVersion;
3157    }
3158
3159    public void setStoreOpenWireVersion(int storeOpenWireVersion) {
3160        this.storeOpenWireVersion = storeOpenWireVersion;
3161    }
3162
3163    /**
3164     * @return the current number of connections on this Broker.
3165     */
3166    public int getCurrentConnections() {
3167        return this.currentConnections.get();
3168    }
3169
3170    /**
3171     * @return the total number of connections this broker has handled since startup.
3172     */
3173    public long getTotalConnections() {
3174        return this.totalConnections.get();
3175    }
3176
3177    public void incrementCurrentConnections() {
3178        this.currentConnections.incrementAndGet();
3179    }
3180
3181    public void decrementCurrentConnections() {
3182        this.currentConnections.decrementAndGet();
3183    }
3184
3185    public void incrementTotalConnections() {
3186        this.totalConnections.incrementAndGet();
3187    }
3188
3189    public boolean isRejectDurableConsumers() {
3190        return rejectDurableConsumers;
3191    }
3192
3193    public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
3194        this.rejectDurableConsumers = rejectDurableConsumers;
3195    }
3196
3197    public boolean isAdjustUsageLimits() {
3198        return adjustUsageLimits;
3199    }
3200
3201    public void setAdjustUsageLimits(boolean adjustUsageLimits) {
3202        this.adjustUsageLimits = adjustUsageLimits;
3203    }
3204
3205    public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) {
3206        this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException;
3207    }
3208
3209    public boolean isRollbackOnlyOnAsyncException() {
3210        return rollbackOnlyOnAsyncException;
3211    }
3212
3213    public boolean isUseVirtualDestSubs() {
3214        return useVirtualDestSubs;
3215    }
3216
3217    public void setUseVirtualDestSubs(
3218            boolean useVirtualDestSubs) {
3219        this.useVirtualDestSubs = useVirtualDestSubs;
3220    }
3221
3222    public boolean isUseVirtualDestSubsOnCreation() {
3223        return useVirtualDestSubsOnCreation;
3224    }
3225
3226    public void setUseVirtualDestSubsOnCreation(
3227            boolean useVirtualDestSubsOnCreation) {
3228        this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
3229    }
3230}