001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.BufferedReader; 020import java.io.File; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InputStreamReader; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.net.UnknownHostException; 027import java.security.Provider; 028import java.security.Security; 029import java.util.ArrayList; 030import java.util.Date; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Locale; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.CopyOnWriteArrayList; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.LinkedBlockingQueue; 041import java.util.concurrent.RejectedExecutionException; 042import java.util.concurrent.RejectedExecutionHandler; 043import java.util.concurrent.SynchronousQueue; 044import java.util.concurrent.ThreadFactory; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import 
java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicLong; 050 051import javax.annotation.PostConstruct; 052import javax.annotation.PreDestroy; 053import javax.management.MalformedObjectNameException; 054import javax.management.ObjectName; 055 056import org.apache.activemq.ActiveMQConnectionMetaData; 057import org.apache.activemq.ConfigurationException; 058import org.apache.activemq.Service; 059import org.apache.activemq.advisory.AdvisoryBroker; 060import org.apache.activemq.broker.cluster.ConnectionSplitBroker; 061import org.apache.activemq.broker.jmx.AnnotatedMBean; 062import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 063import org.apache.activemq.broker.jmx.BrokerView; 064import org.apache.activemq.broker.jmx.ConnectorView; 065import org.apache.activemq.broker.jmx.ConnectorViewMBean; 066import org.apache.activemq.broker.jmx.HealthView; 067import org.apache.activemq.broker.jmx.HealthViewMBean; 068import org.apache.activemq.broker.jmx.JmsConnectorView; 069import org.apache.activemq.broker.jmx.JobSchedulerView; 070import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; 071import org.apache.activemq.broker.jmx.Log4JConfigView; 072import org.apache.activemq.broker.jmx.ManagedRegionBroker; 073import org.apache.activemq.broker.jmx.ManagementContext; 074import org.apache.activemq.broker.jmx.NetworkConnectorView; 075import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; 076import org.apache.activemq.broker.jmx.ProxyConnectorView; 077import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 078import org.apache.activemq.broker.region.Destination; 079import org.apache.activemq.broker.region.DestinationFactory; 080import org.apache.activemq.broker.region.DestinationFactoryImpl; 081import org.apache.activemq.broker.region.DestinationInterceptor; 082import org.apache.activemq.broker.region.RegionBroker; 083import org.apache.activemq.broker.region.policy.PolicyMap; 084import 
org.apache.activemq.broker.region.virtual.MirroredQueue; 085import org.apache.activemq.broker.region.virtual.VirtualDestination; 086import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 087import org.apache.activemq.broker.region.virtual.VirtualTopic; 088import org.apache.activemq.broker.scheduler.JobSchedulerStore; 089import org.apache.activemq.broker.scheduler.SchedulerBroker; 090import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore; 091import org.apache.activemq.command.ActiveMQDestination; 092import org.apache.activemq.command.ActiveMQQueue; 093import org.apache.activemq.command.BrokerId; 094import org.apache.activemq.command.ProducerInfo; 095import org.apache.activemq.filter.DestinationFilter; 096import org.apache.activemq.network.ConnectionFilter; 097import org.apache.activemq.network.DiscoveryNetworkConnector; 098import org.apache.activemq.network.NetworkConnector; 099import org.apache.activemq.network.jms.JmsConnector; 100import org.apache.activemq.openwire.OpenWireFormat; 101import org.apache.activemq.proxy.ProxyConnector; 102import org.apache.activemq.security.MessageAuthorizationPolicy; 103import org.apache.activemq.selector.SelectorParser; 104import org.apache.activemq.store.JournaledStore; 105import org.apache.activemq.store.PListStore; 106import org.apache.activemq.store.PersistenceAdapter; 107import org.apache.activemq.store.PersistenceAdapterFactory; 108import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 109import org.apache.activemq.thread.Scheduler; 110import org.apache.activemq.thread.TaskRunnerFactory; 111import org.apache.activemq.transport.TransportFactorySupport; 112import org.apache.activemq.transport.TransportServer; 113import org.apache.activemq.transport.vm.VMTransportFactory; 114import org.apache.activemq.usage.PercentLimitUsage; 115import org.apache.activemq.usage.StoreUsage; 116import org.apache.activemq.usage.SystemUsage; 117import org.apache.activemq.util.BrokerSupport; 
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.StoreUtil;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
 * number of transport connectors, network connectors and a bunch of properties
 * which can be used to configure the broker as it is lazily created.
 *
 * @org.apache.xbean.XBean
 */
public class BrokerService implements Service {
    public static final String DEFAULT_PORT = "61616";
    // Assigned once in the static initializer; falls back to "localhost" when the host name cannot be resolved.
    public static final String LOCAL_HOST_NAME;
    // Assigned once in the static initializer from /org/apache/activemq/version.txt; null when that resource is absent.
    public static final String BROKER_VERSION;
    public static final String DEFAULT_BROKER_NAME = "localhost";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    // Milliseconds: used as the default timeout of waitUntilStarted(), which adds it to System.currentTimeMillis().
    public static final long DEFAULT_START_TIMEOUT = 600000L;

    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);

    @SuppressWarnings("unused")
    private static final long serialVersionUID = 7353129142305630237L;

    private boolean useJmx = true;
    private boolean enableStatistics = true;
    private boolean persistent = true;
    private boolean populateJMSXUserID;
    private boolean useAuthenticatedPrincipalForJMSXUserID;
    private boolean populateUserNameInMBeans;
    private long mbeanInvocationTimeout = 0;

    private boolean useShutdownHook = true;
    private boolean useLoggingForShutdownErrors;
    // Read by masterFailed(): when true the broker stops itself instead of starting its connectors.
    private boolean shutdownOnMasterFailure;
    private boolean shutdownOnSlaveFailure;
    private boolean waitForSlave;
    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
    private boolean passiveSlave;
    private String brokerName = DEFAULT_BROKER_NAME;
    private File dataDirectoryFile;
    private File tmpDataDirectory;
    // Created lazily by getBroker(); reset to null by stop().
    private Broker broker;
    private BrokerView adminView;
    private ManagementContext managementContext;
    private ObjectName brokerObjectName;
    private TaskRunnerFactory taskRunnerFactory;
    private TaskRunnerFactory persistenceTaskRunnerFactory;
    private SystemUsage systemUsage;
    private SystemUsage producerSystemUsage;
    // NOTE(review): the field name contains a typo ("Usaage"); renaming it requires updating every usage in the class.
    private SystemUsage consumerSystemUsaage;
    private PersistenceAdapter persistenceAdapter;
    private PersistenceAdapterFactory persistenceFactory;
    protected DestinationFactory destinationFactory;
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
    // Additional services; stop() stops each of them before the connectors and the broker.
    private final List<Service> services = new ArrayList<Service>();
    private transient Thread shutdownHook;
    private String[] transportConnectorURIs;
    private String[] networkConnectorURIs;
    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
    // to other jms messaging systems
    private boolean deleteAllMessagesOnStartup;
    private boolean advisorySupport = true;
    private URI vmConnectorURI;
    private String defaultSocketURIString;
    private PolicyMap destinationPolicy;
    // Life-cycle flags: start() flips 'started' and clears 'stopping'; stop() sets 'stopping' first and,
    // when it finishes, clears 'started' and sets 'stopped'.
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private BrokerPlugin[] plugins;
    private boolean keepDurableSubsActive = true;
    private boolean useVirtualTopics = true;
    private boolean useMirroredQueues = false;
    private boolean useTempMirroredQueues = true;
    /**
     * Whether or not virtual destination subscriptions should cause network demand
     */
    private boolean useVirtualDestSubs = false;
    /**
     * Whether or not the creation of destinations that match virtual destinations
     * should cause network demand
     */
    private boolean useVirtualDestSubsOnCreation = false;
    private BrokerId brokerId;
    private volatile DestinationInterceptor[] destinationInterceptors;
    private ActiveMQDestination[] destinations;
    private PListStore tempDataStore;
    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
    private boolean useLocalHostBrokerName;
    // Counted down in the finally block of stop(); awaited by waitUntilStopped().
    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
    // Counted down near the end of doStartBroker(); awaited by waitUntilStarted().
    private final CountDownLatch startedLatch = new CountDownLatch(1);
    private Broker regionBroker;
    private int producerSystemUsagePortion = 60;
    private int consumerSystemUsagePortion = 40;
    private boolean splitSystemUsageForProducersConsumers;
    private boolean monitorConnectionSplits = false;
    private int taskRunnerPriority = Thread.NORM_PRIORITY;
    private boolean dedicatedTaskRunner;
    private boolean cacheTempDestinations = false;// useful for failover
    private int timeBeforePurgeTempDestinations = 5000;
    // Callbacks run (under a lock on this list) at the end of stop().
    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
    // When set, stop() spawns a thread that calls System.exit(systemExitOnShutdownExitCode).
    private boolean systemExitOnShutdown;
    private int systemExitOnShutdownExitCode;
    private SslContext sslContext;
    private boolean forceStart = false;
    private IOExceptionHandler ioExceptionHandler;
    private boolean schedulerSupport = false;
    private File schedulerDirectoryFile;
    private Scheduler scheduler;
    private ThreadPoolExecutor executor;
    private int schedulePeriodForDestinationPurge= 0;
243 private int maxPurgedDestinationsPerSweep = 0; 244 private int schedulePeriodForDiskUsageCheck = 0; 245 private int diskUsageCheckRegrowThreshold = -1; 246 private boolean adjustUsageLimits = true; 247 private BrokerContext brokerContext; 248 private boolean networkConnectorStartAsync = false; 249 private boolean allowTempAutoCreationOnSend; 250 private JobSchedulerStore jobSchedulerStore; 251 private final AtomicLong totalConnections = new AtomicLong(); 252 private final AtomicInteger currentConnections = new AtomicInteger(); 253 254 private long offlineDurableSubscriberTimeout = -1; 255 private long offlineDurableSubscriberTaskSchedule = 300000; 256 private DestinationFilter virtualConsumerDestinationFilter; 257 258 private final Object persistenceAdapterLock = new Object(); 259 private Throwable startException = null; 260 private boolean startAsync = false; 261 private Date startDate; 262 private boolean slave = true; 263 264 private boolean restartAllowed = true; 265 private boolean restartRequested = false; 266 private boolean rejectDurableConsumers = false; 267 private boolean rollbackOnlyOnAsyncException = true; 268 269 private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION; 270 271 static { 272 273 try { 274 ClassLoader loader = BrokerService.class.getClassLoader(); 275 Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider"); 276 Provider bouncycastle = (Provider) clazz.newInstance(); 277 Security.insertProviderAt(bouncycastle, 2); 278 LOG.info("Loaded the Bouncy Castle security provider."); 279 } catch(Throwable e) { 280 // No BouncyCastle found so we use the default Java Security Provider 281 } 282 283 String localHostName = "localhost"; 284 try { 285 localHostName = InetAddressUtil.getLocalHostName(); 286 } catch (UnknownHostException e) { 287 LOG.error("Failed to resolve localhost"); 288 } 289 LOCAL_HOST_NAME = localHostName; 290 291 String version = null; 292 try(InputStream in = 
BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) { 293 if (in != null) { 294 try(InputStreamReader isr = new InputStreamReader(in); 295 BufferedReader reader = new BufferedReader(isr)) { 296 version = reader.readLine(); 297 } 298 } 299 } catch (IOException ie) { 300 LOG.warn("Error reading broker version ", ie); 301 } 302 BROKER_VERSION = version; 303 } 304 305 @Override 306 public String toString() { 307 return "BrokerService[" + getBrokerName() + "]"; 308 } 309 310 private String getBrokerVersion() { 311 String version = ActiveMQConnectionMetaData.PROVIDER_VERSION; 312 if (version == null) { 313 version = BROKER_VERSION; 314 } 315 316 return version; 317 } 318 319 /** 320 * Adds a new transport connector for the given bind address 321 * 322 * @return the newly created and added transport connector 323 * @throws Exception 324 */ 325 public TransportConnector addConnector(String bindAddress) throws Exception { 326 return addConnector(new URI(bindAddress)); 327 } 328 329 /** 330 * Adds a new transport connector for the given bind address 331 * 332 * @return the newly created and added transport connector 333 * @throws Exception 334 */ 335 public TransportConnector addConnector(URI bindAddress) throws Exception { 336 return addConnector(createTransportConnector(bindAddress)); 337 } 338 339 /** 340 * Adds a new transport connector for the given TransportServer transport 341 * 342 * @return the newly created and added transport connector 343 * @throws Exception 344 */ 345 public TransportConnector addConnector(TransportServer transport) throws Exception { 346 return addConnector(new TransportConnector(transport)); 347 } 348 349 /** 350 * Adds a new transport connector 351 * 352 * @return the transport connector 353 * @throws Exception 354 */ 355 public TransportConnector addConnector(TransportConnector connector) throws Exception { 356 transportConnectors.add(connector); 357 return connector; 358 } 359 360 /** 361 * Stops and removes a 
transport connector from the broker. 362 * 363 * @param connector 364 * @return true if the connector has been previously added to the broker 365 * @throws Exception 366 */ 367 public boolean removeConnector(TransportConnector connector) throws Exception { 368 boolean rc = transportConnectors.remove(connector); 369 if (rc) { 370 unregisterConnectorMBean(connector); 371 } 372 return rc; 373 } 374 375 /** 376 * Adds a new network connector using the given discovery address 377 * 378 * @return the newly created and added network connector 379 * @throws Exception 380 */ 381 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { 382 return addNetworkConnector(new URI(discoveryAddress)); 383 } 384 385 /** 386 * Adds a new proxy connector using the given bind address 387 * 388 * @return the newly created and added network connector 389 * @throws Exception 390 */ 391 public ProxyConnector addProxyConnector(String bindAddress) throws Exception { 392 return addProxyConnector(new URI(bindAddress)); 393 } 394 395 /** 396 * Adds a new network connector using the given discovery address 397 * 398 * @return the newly created and added network connector 399 * @throws Exception 400 */ 401 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { 402 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); 403 return addNetworkConnector(connector); 404 } 405 406 /** 407 * Adds a new proxy connector using the given bind address 408 * 409 * @return the newly created and added network connector 410 * @throws Exception 411 */ 412 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { 413 ProxyConnector connector = new ProxyConnector(); 414 connector.setBind(bindAddress); 415 connector.setRemote(new URI("fanout:multicast://default")); 416 return addProxyConnector(connector); 417 } 418 419 /** 420 * Adds a new network connector to connect this broker to a federated 421 * network 422 */ 
423 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { 424 connector.setBrokerService(this); 425 connector.setLocalUri(getVmConnectorURI()); 426 // Set a connection filter so that the connector does not establish loop 427 // back connections. 428 connector.setConnectionFilter(new ConnectionFilter() { 429 @Override 430 public boolean connectTo(URI location) { 431 List<TransportConnector> transportConnectors = getTransportConnectors(); 432 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { 433 try { 434 TransportConnector tc = iter.next(); 435 if (location.equals(tc.getConnectUri())) { 436 return false; 437 } 438 } catch (Throwable e) { 439 } 440 } 441 return true; 442 } 443 }); 444 networkConnectors.add(connector); 445 return connector; 446 } 447 448 /** 449 * Removes the given network connector without stopping it. The caller 450 * should call {@link NetworkConnector#stop()} to close the connector 451 */ 452 public boolean removeNetworkConnector(NetworkConnector connector) { 453 boolean answer = networkConnectors.remove(connector); 454 if (answer) { 455 unregisterNetworkConnectorMBean(connector); 456 } 457 return answer; 458 } 459 460 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { 461 URI uri = getVmConnectorURI(); 462 connector.setLocalUri(uri); 463 proxyConnectors.add(connector); 464 if (isUseJmx()) { 465 registerProxyConnectorMBean(connector); 466 } 467 return connector; 468 } 469 470 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { 471 connector.setBrokerService(this); 472 jmsConnectors.add(connector); 473 if (isUseJmx()) { 474 registerJmsConnectorMBean(connector); 475 } 476 return connector; 477 } 478 479 public JmsConnector removeJmsConnector(JmsConnector connector) { 480 if (jmsConnectors.remove(connector)) { 481 return connector; 482 } 483 return null; 484 } 485 486 public void masterFailed() { 487 if 
(shutdownOnMasterFailure) { 488 LOG.error("The Master has failed ... shutting down"); 489 try { 490 stop(); 491 } catch (Exception e) { 492 LOG.error("Failed to stop for master failure", e); 493 } 494 } else { 495 LOG.warn("Master Failed - starting all connectors"); 496 try { 497 startAllConnectors(); 498 broker.nowMasterBroker(); 499 } catch (Exception e) { 500 LOG.error("Failed to startAllConnectors", e); 501 } 502 } 503 } 504 505 public String getUptime() { 506 long delta = getUptimeMillis(); 507 508 if (delta == 0) { 509 return "not started"; 510 } 511 512 return TimeUtils.printDuration(delta); 513 } 514 515 public long getUptimeMillis() { 516 if (startDate == null) { 517 return 0; 518 } 519 520 return new Date().getTime() - startDate.getTime(); 521 } 522 523 public boolean isStarted() { 524 return started.get() && startedLatch.getCount() == 0; 525 } 526 527 /** 528 * Forces a start of the broker. 529 * By default a BrokerService instance that was 530 * previously stopped using BrokerService.stop() cannot be restarted 531 * using BrokerService.start(). 532 * This method enforces a restart. 533 * It is not recommended to force a restart of the broker and will not work 534 * for most but some very trivial broker configurations. 535 * For restarting a broker instance we recommend to first call stop() on 536 * the old instance and then recreate a new BrokerService instance. 537 * 538 * @param force - if true enforces a restart. 
539 * @throws Exception 540 */ 541 public void start(boolean force) throws Exception { 542 forceStart = force; 543 stopped.set(false); 544 started.set(false); 545 start(); 546 } 547 548 // Service interface 549 // ------------------------------------------------------------------------- 550 551 protected boolean shouldAutostart() { 552 return true; 553 } 554 555 /** 556 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 557 * 558 * delegates to autoStart, done to prevent backwards incompatible signature change 559 */ 560 @PostConstruct 561 private void postConstruct() { 562 try { 563 autoStart(); 564 } catch (Exception ex) { 565 throw new RuntimeException(ex); 566 } 567 } 568 569 /** 570 * 571 * @throws Exception 572 * @org. apache.xbean.InitMethod 573 */ 574 public void autoStart() throws Exception { 575 if(shouldAutostart()) { 576 start(); 577 } 578 } 579 580 @Override 581 public void start() throws Exception { 582 if (stopped.get() || !started.compareAndSet(false, true)) { 583 // lets just ignore redundant start() calls 584 // as its way too easy to not be completely sure if start() has been 585 // called or not with the gazillion of different configuration 586 // mechanisms 587 // throw new IllegalStateException("Already started."); 588 return; 589 } 590 591 setStartException(null); 592 stopping.set(false); 593 startDate = new Date(); 594 MDC.put("activemq.broker", brokerName); 595 596 try { 597 checkMemorySystemUsageLimits(); 598 if (systemExitOnShutdown && useShutdownHook) { 599 throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); 600 } 601 processHelperProperties(); 602 if (isUseJmx()) { 603 // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and 604 // we cannot cleanup clear that during shutdown of the broker. 
605 MDC.remove("activemq.broker"); 606 try { 607 startManagementContext(); 608 for (NetworkConnector connector : getNetworkConnectors()) { 609 registerNetworkConnectorMBean(connector); 610 } 611 } finally { 612 MDC.put("activemq.broker", brokerName); 613 } 614 } 615 616 // in jvm master slave, lets not publish over existing broker till we get the lock 617 final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance(); 618 if (brokerRegistry.lookup(getBrokerName()) == null) { 619 brokerRegistry.bind(getBrokerName(), BrokerService.this); 620 } 621 startPersistenceAdapter(startAsync); 622 startBroker(startAsync); 623 brokerRegistry.bind(getBrokerName(), BrokerService.this); 624 } catch (Exception e) { 625 LOG.error("Failed to start Apache ActiveMQ (" + getBrokerName() + ", " + brokerId + ")", e); 626 try { 627 if (!stopped.get()) { 628 stop(); 629 } 630 } catch (Exception ex) { 631 LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex); 632 } 633 throw e; 634 } finally { 635 MDC.remove("activemq.broker"); 636 } 637 } 638 639 private void startPersistenceAdapter(boolean async) throws Exception { 640 if (async) { 641 new Thread("Persistence Adapter Starting Thread") { 642 @Override 643 public void run() { 644 try { 645 doStartPersistenceAdapter(); 646 } catch (Throwable e) { 647 setStartException(e); 648 } finally { 649 synchronized (persistenceAdapterLock) { 650 persistenceAdapterLock.notifyAll(); 651 } 652 } 653 } 654 }.start(); 655 } else { 656 doStartPersistenceAdapter(); 657 } 658 } 659 660 private void doStartPersistenceAdapter() throws Exception { 661 PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter(); 662 if (persistenceAdapterToStart == null) { 663 checkStartException(); 664 throw new ConfigurationException("Cannot start null persistence adapter"); 665 } 666 persistenceAdapterToStart.setUsageManager(getProducerSystemUsage()); 667 persistenceAdapterToStart.setBrokerName(getBrokerName()); 668 
LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart); 669 if (deleteAllMessagesOnStartup) { 670 deleteAllMessages(); 671 } 672 persistenceAdapterToStart.start(); 673 674 getTempDataStore(); 675 if (tempDataStore != null) { 676 try { 677 // start after we have the store lock 678 tempDataStore.start(); 679 } catch (Exception e) { 680 RuntimeException exception = new RuntimeException( 681 "Failed to start temp data store: " + tempDataStore, e); 682 LOG.error(exception.getLocalizedMessage(), e); 683 throw exception; 684 } 685 } 686 687 getJobSchedulerStore(); 688 if (jobSchedulerStore != null) { 689 try { 690 jobSchedulerStore.start(); 691 } catch (Exception e) { 692 RuntimeException exception = new RuntimeException( 693 "Failed to start job scheduler store: " + jobSchedulerStore, e); 694 LOG.error(exception.getLocalizedMessage(), e); 695 throw exception; 696 } 697 } 698 } 699 700 private void startBroker(boolean async) throws Exception { 701 if (async) { 702 new Thread("Broker Starting Thread") { 703 @Override 704 public void run() { 705 try { 706 synchronized (persistenceAdapterLock) { 707 persistenceAdapterLock.wait(); 708 } 709 doStartBroker(); 710 } catch (Throwable t) { 711 setStartException(t); 712 } 713 } 714 }.start(); 715 } else { 716 doStartBroker(); 717 } 718 } 719 720 private void doStartBroker() throws Exception { 721 checkStartException(); 722 startDestinations(); 723 addShutdownHook(); 724 725 broker = getBroker(); 726 brokerId = broker.getBrokerId(); 727 728 // need to log this after creating the broker so we have its id and name 729 LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId }); 730 broker.start(); 731 732 if (isUseJmx()) { 733 if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { 734 // try to restart management context 735 // typical for slaves that use the same ports as master 736 managementContext.stop(); 737 
startManagementContext(); 738 } 739 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; 740 managedBroker.setContextBroker(broker); 741 adminView.setBroker(managedBroker); 742 } 743 744 if (ioExceptionHandler == null) { 745 setIoExceptionHandler(new DefaultIOExceptionHandler()); 746 } 747 748 if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) { 749 ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString()); 750 Log4JConfigView log4jConfigView = new Log4JConfigView(); 751 AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName); 752 } 753 754 startAllConnectors(); 755 756 LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 757 LOG.info("For help or more information please see: http://activemq.apache.org"); 758 759 getBroker().brokerServiceStarted(); 760 checkStoreSystemUsageLimits(); 761 startedLatch.countDown(); 762 getBroker().nowMasterBroker(); 763 } 764 765 /** 766 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 767 * 768 * delegates to stop, done to prevent backwards incompatible signature change 769 */ 770 @PreDestroy 771 private void preDestroy () { 772 try { 773 stop(); 774 } catch (Exception ex) { 775 throw new RuntimeException(); 776 } 777 } 778 779 /** 780 * 781 * @throws Exception 782 * @org.apache .xbean.DestroyMethod 783 */ 784 @Override 785 public void stop() throws Exception { 786 if (!stopping.compareAndSet(false, true)) { 787 LOG.trace("Broker already stopping/stopped"); 788 return; 789 } 790 791 setStartException(new BrokerStoppedException("Stop invoked")); 792 MDC.put("activemq.broker", brokerName); 793 794 if (systemExitOnShutdown) { 795 new Thread() { 796 @Override 797 public void run() { 798 System.exit(systemExitOnShutdownExitCode); 799 } 800 }.start(); 801 } 802 803 LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), 
getBrokerName(), brokerId} ); 804 805 removeShutdownHook(); 806 if (this.scheduler != null) { 807 this.scheduler.stop(); 808 this.scheduler = null; 809 } 810 ServiceStopper stopper = new ServiceStopper(); 811 if (services != null) { 812 for (Service service : services) { 813 stopper.stop(service); 814 } 815 } 816 stopAllConnectors(stopper); 817 this.slave = true; 818 // remove any VMTransports connected 819 // this has to be done after services are stopped, 820 // to avoid timing issue with discovery (spinning up a new instance) 821 BrokerRegistry.getInstance().unbind(getBrokerName()); 822 VMTransportFactory.stopped(getBrokerName()); 823 if (broker != null) { 824 stopper.stop(broker); 825 broker = null; 826 } 827 828 if (jobSchedulerStore != null) { 829 jobSchedulerStore.stop(); 830 jobSchedulerStore = null; 831 } 832 if (tempDataStore != null) { 833 tempDataStore.stop(); 834 tempDataStore = null; 835 } 836 try { 837 stopper.stop(getPersistenceAdapter()); 838 persistenceAdapter = null; 839 if (isUseJmx()) { 840 stopper.stop(managementContext); 841 managementContext = null; 842 } 843 // Clear SelectorParser cache to free memory 844 SelectorParser.clearCache(); 845 } finally { 846 started.set(false); 847 stopped.set(true); 848 stoppedLatch.countDown(); 849 } 850 851 if (this.taskRunnerFactory != null) { 852 this.taskRunnerFactory.shutdown(); 853 this.taskRunnerFactory = null; 854 } 855 if (this.executor != null) { 856 ThreadPoolUtils.shutdownNow(executor); 857 this.executor = null; 858 } 859 860 this.destinationInterceptors = null; 861 this.destinationFactory = null; 862 863 if (startDate != null) { 864 LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()}); 865 } 866 LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 867 868 synchronized (shutdownHooks) { 869 for (Runnable hook : shutdownHooks) { 870 try { 871 hook.run(); 872 } catch 
(Throwable e) { 873 stopper.onException(hook, e); 874 } 875 } 876 } 877 878 MDC.remove("activemq.broker"); 879 880 // and clear start date 881 startDate = null; 882 883 stopper.throwFirstException(); 884 } 885 886 public boolean checkQueueSize(String queueName) { 887 long count = 0; 888 long queueSize = 0; 889 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); 890 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { 891 if (entry.getKey().isQueue()) { 892 if (entry.getValue().getName().matches(queueName)) { 893 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); 894 count += queueSize; 895 if (queueSize > 0) { 896 LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize); 897 } 898 } 899 } 900 } 901 return count == 0; 902 } 903 904 /** 905 * This method (both connectorName and queueName are using regex to match) 906 * 1. stop the connector (supposed the user input the connector which the 907 * clients connect to) 2. to check whether there is any pending message on 908 * the queues defined by queueName 3. supposedly, after stop the connector, 909 * client should failover to other broker and pending messages should be 910 * forwarded. if no pending messages, the method finally call stop to stop 911 * the broker. 
912 * 913 * @param connectorName 914 * @param queueName 915 * @param timeout 916 * @param pollInterval 917 * @throws Exception 918 */ 919 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { 920 if (isUseJmx()) { 921 if (connectorName == null || queueName == null || timeout <= 0) { 922 throw new Exception( 923 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); 924 } 925 if (pollInterval <= 0) { 926 pollInterval = 30; 927 } 928 LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{ 929 connectorName, queueName, timeout, pollInterval 930 }); 931 TransportConnector connector; 932 for (int i = 0; i < transportConnectors.size(); i++) { 933 connector = transportConnectors.get(i); 934 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { 935 connector.stop(); 936 } 937 } 938 long start = System.currentTimeMillis(); 939 while (System.currentTimeMillis() - start < timeout * 1000) { 940 // check quesize until it gets zero 941 if (checkQueueSize(queueName)) { 942 stop(); 943 break; 944 } else { 945 Thread.sleep(pollInterval * 1000); 946 } 947 } 948 if (stopped.get()) { 949 LOG.info("Successfully stop the broker."); 950 } else { 951 LOG.info("There is still pending message on the queue. 
Please check and stop the broker manually."); 952 } 953 } 954 } 955 956 /** 957 * A helper method to block the caller thread until the broker has been 958 * stopped 959 */ 960 public void waitUntilStopped() { 961 while (isStarted() && !stopped.get()) { 962 try { 963 stoppedLatch.await(); 964 } catch (InterruptedException e) { 965 // ignore 966 } 967 } 968 } 969 970 public boolean isStopped() { 971 return stopped.get(); 972 } 973 974 /** 975 * A helper method to block the caller thread until the broker has fully started 976 * @return boolean true if wait succeeded false if broker was not started or was stopped 977 */ 978 public boolean waitUntilStarted() { 979 return waitUntilStarted(DEFAULT_START_TIMEOUT); 980 } 981 982 /** 983 * A helper method to block the caller thread until the broker has fully started 984 * 985 * @param timeout 986 * the amount of time to wait before giving up and returning false. 987 * 988 * @return boolean true if wait succeeded false if broker was not started or was stopped 989 */ 990 public boolean waitUntilStarted(long timeout) { 991 boolean waitSucceeded = isStarted(); 992 long expiration = Math.max(0, timeout + System.currentTimeMillis()); 993 while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { 994 try { 995 if (getStartException() != null) { 996 return waitSucceeded; 997 } 998 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); 999 } catch (InterruptedException ignore) { 1000 } 1001 } 1002 return waitSucceeded; 1003 } 1004 1005 // Properties 1006 // ------------------------------------------------------------------------- 1007 /** 1008 * Returns the message broker 1009 */ 1010 public Broker getBroker() throws Exception { 1011 if (broker == null) { 1012 checkStartException(); 1013 broker = createBroker(); 1014 } 1015 return broker; 1016 } 1017 1018 /** 1019 * Returns the administration view of the broker; used to create and destroy 1020 * resources such as queues and topics. 
Note this method returns null if JMX 1021 * is disabled. 1022 */ 1023 public BrokerView getAdminView() throws Exception { 1024 if (adminView == null) { 1025 // force lazy creation 1026 getBroker(); 1027 } 1028 return adminView; 1029 } 1030 1031 public void setAdminView(BrokerView adminView) { 1032 this.adminView = adminView; 1033 } 1034 1035 public String getBrokerName() { 1036 return brokerName; 1037 } 1038 1039 /** 1040 * Sets the name of this broker; which must be unique in the network 1041 * 1042 * @param brokerName 1043 */ 1044 private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]"; 1045 public void setBrokerName(String brokerName) { 1046 if (brokerName == null) { 1047 throw new NullPointerException("The broker name cannot be null"); 1048 } 1049 String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_"); 1050 if (!str.equals(brokerName)) { 1051 LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str); 1052 } 1053 this.brokerName = str.trim(); 1054 } 1055 1056 public PersistenceAdapterFactory getPersistenceFactory() { 1057 return persistenceFactory; 1058 } 1059 1060 public File getDataDirectoryFile() { 1061 if (dataDirectoryFile == null) { 1062 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); 1063 } 1064 return dataDirectoryFile; 1065 } 1066 1067 public File getBrokerDataDirectory() { 1068 String brokerDir = getBrokerName(); 1069 return new File(getDataDirectoryFile(), brokerDir); 1070 } 1071 1072 /** 1073 * Sets the directory in which the data files will be stored by default for 1074 * the JDBC and Journal persistence adaptors. 
1075 * 1076 * @param dataDirectory 1077 * the directory to store data files 1078 */ 1079 public void setDataDirectory(String dataDirectory) { 1080 setDataDirectoryFile(new File(dataDirectory)); 1081 } 1082 1083 /** 1084 * Sets the directory in which the data files will be stored by default for 1085 * the JDBC and Journal persistence adaptors. 1086 * 1087 * @param dataDirectoryFile 1088 * the directory to store data files 1089 */ 1090 public void setDataDirectoryFile(File dataDirectoryFile) { 1091 this.dataDirectoryFile = dataDirectoryFile; 1092 } 1093 1094 /** 1095 * @return the tmpDataDirectory 1096 */ 1097 public File getTmpDataDirectory() { 1098 if (tmpDataDirectory == null) { 1099 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); 1100 } 1101 return tmpDataDirectory; 1102 } 1103 1104 /** 1105 * @param tmpDataDirectory 1106 * the tmpDataDirectory to set 1107 */ 1108 public void setTmpDataDirectory(File tmpDataDirectory) { 1109 this.tmpDataDirectory = tmpDataDirectory; 1110 } 1111 1112 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 1113 this.persistenceFactory = persistenceFactory; 1114 } 1115 1116 public void setDestinationFactory(DestinationFactory destinationFactory) { 1117 this.destinationFactory = destinationFactory; 1118 } 1119 1120 public boolean isPersistent() { 1121 return persistent; 1122 } 1123 1124 /** 1125 * Sets whether or not persistence is enabled or disabled. 1126 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1127 */ 1128 public void setPersistent(boolean persistent) { 1129 this.persistent = persistent; 1130 } 1131 1132 public boolean isPopulateJMSXUserID() { 1133 return populateJMSXUserID; 1134 } 1135 1136 /** 1137 * Sets whether or not the broker should populate the JMSXUserID header. 
1138 */ 1139 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 1140 this.populateJMSXUserID = populateJMSXUserID; 1141 } 1142 1143 public SystemUsage getSystemUsage() { 1144 try { 1145 if (systemUsage == null) { 1146 1147 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore()); 1148 systemUsage.setExecutor(getExecutor()); 1149 systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB 1150 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1151 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB 1152 systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1153 addService(this.systemUsage); 1154 } 1155 return systemUsage; 1156 } catch (IOException e) { 1157 LOG.error("Cannot create SystemUsage", e); 1158 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e); 1159 } 1160 } 1161 1162 public void setSystemUsage(SystemUsage memoryManager) { 1163 if (this.systemUsage != null) { 1164 removeService(this.systemUsage); 1165 } 1166 this.systemUsage = memoryManager; 1167 if (this.systemUsage.getExecutor()==null) { 1168 this.systemUsage.setExecutor(getExecutor()); 1169 } 1170 addService(this.systemUsage); 1171 } 1172 1173 /** 1174 * @return the consumerUsageManager 1175 * @throws IOException 1176 */ 1177 public SystemUsage getConsumerSystemUsage() throws IOException { 1178 if (this.consumerSystemUsaage == null) { 1179 if (splitSystemUsageForProducersConsumers) { 1180 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); 1181 float portion = consumerSystemUsagePortion / 100f; 1182 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); 1183 addService(this.consumerSystemUsaage); 1184 } else { 1185 consumerSystemUsaage = getSystemUsage(); 1186 } 1187 } 1188 return this.consumerSystemUsaage; 1189 } 1190 1191 /** 1192 * @param consumerSystemUsaage 1193 * the 
storeSystemUsage to set 1194 */ 1195 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { 1196 if (this.consumerSystemUsaage != null) { 1197 removeService(this.consumerSystemUsaage); 1198 } 1199 this.consumerSystemUsaage = consumerSystemUsaage; 1200 addService(this.consumerSystemUsaage); 1201 } 1202 1203 /** 1204 * @return the producerUsageManager 1205 * @throws IOException 1206 */ 1207 public SystemUsage getProducerSystemUsage() throws IOException { 1208 if (producerSystemUsage == null) { 1209 if (splitSystemUsageForProducersConsumers) { 1210 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); 1211 float portion = producerSystemUsagePortion / 100f; 1212 producerSystemUsage.getMemoryUsage().setUsagePortion(portion); 1213 addService(producerSystemUsage); 1214 } else { 1215 producerSystemUsage = getSystemUsage(); 1216 } 1217 } 1218 return producerSystemUsage; 1219 } 1220 1221 /** 1222 * @param producerUsageManager 1223 * the producerUsageManager to set 1224 */ 1225 public void setProducerSystemUsage(SystemUsage producerUsageManager) { 1226 if (this.producerSystemUsage != null) { 1227 removeService(this.producerSystemUsage); 1228 } 1229 this.producerSystemUsage = producerUsageManager; 1230 addService(this.producerSystemUsage); 1231 } 1232 1233 public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException { 1234 if (persistenceAdapter == null && !hasStartException()) { 1235 persistenceAdapter = createPersistenceAdapter(); 1236 configureService(persistenceAdapter); 1237 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1238 } 1239 return persistenceAdapter; 1240 } 1241 1242 /** 1243 * Sets the persistence adaptor implementation to use for this broker 1244 * 1245 * @throws IOException 1246 */ 1247 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { 1248 if (!isPersistent() && ! 
(persistenceAdapter instanceof MemoryPersistenceAdapter)) { 1249 LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter); 1250 return; 1251 } 1252 this.persistenceAdapter = persistenceAdapter; 1253 configureService(this.persistenceAdapter); 1254 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1255 } 1256 1257 public TaskRunnerFactory getTaskRunnerFactory() { 1258 if (this.taskRunnerFactory == null) { 1259 this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, 1260 isDedicatedTaskRunner()); 1261 this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); 1262 } 1263 return this.taskRunnerFactory; 1264 } 1265 1266 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 1267 this.taskRunnerFactory = taskRunnerFactory; 1268 } 1269 1270 public TaskRunnerFactory getPersistenceTaskRunnerFactory() { 1271 if (taskRunnerFactory == null) { 1272 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, 1273 true, 1000, isDedicatedTaskRunner()); 1274 } 1275 return persistenceTaskRunnerFactory; 1276 } 1277 1278 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { 1279 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; 1280 } 1281 1282 public boolean isUseJmx() { 1283 return useJmx; 1284 } 1285 1286 public boolean isEnableStatistics() { 1287 return enableStatistics; 1288 } 1289 1290 /** 1291 * Sets whether or not the Broker's services enable statistics or not. 1292 */ 1293 public void setEnableStatistics(boolean enableStatistics) { 1294 this.enableStatistics = enableStatistics; 1295 } 1296 1297 /** 1298 * Sets whether or not the Broker's services should be exposed into JMX or 1299 * not. 
1300 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1301 */ 1302 public void setUseJmx(boolean useJmx) { 1303 this.useJmx = useJmx; 1304 } 1305 1306 public ObjectName getBrokerObjectName() throws MalformedObjectNameException { 1307 if (brokerObjectName == null) { 1308 brokerObjectName = createBrokerObjectName(); 1309 } 1310 return brokerObjectName; 1311 } 1312 1313 /** 1314 * Sets the JMX ObjectName for this broker 1315 */ 1316 public void setBrokerObjectName(ObjectName brokerObjectName) { 1317 this.brokerObjectName = brokerObjectName; 1318 } 1319 1320 public ManagementContext getManagementContext() { 1321 if (managementContext == null) { 1322 checkStartException(); 1323 managementContext = new ManagementContext(); 1324 } 1325 return managementContext; 1326 } 1327 1328 synchronized private void checkStartException() { 1329 if (startException != null) { 1330 throw new BrokerStoppedException(startException); 1331 } 1332 } 1333 1334 synchronized private boolean hasStartException() { 1335 return startException != null; 1336 } 1337 1338 synchronized private void setStartException(Throwable t) { 1339 startException = t; 1340 } 1341 1342 public void setManagementContext(ManagementContext managementContext) { 1343 this.managementContext = managementContext; 1344 } 1345 1346 public NetworkConnector getNetworkConnectorByName(String connectorName) { 1347 for (NetworkConnector connector : networkConnectors) { 1348 if (connector.getName().equals(connectorName)) { 1349 return connector; 1350 } 1351 } 1352 return null; 1353 } 1354 1355 public String[] getNetworkConnectorURIs() { 1356 return networkConnectorURIs; 1357 } 1358 1359 public void setNetworkConnectorURIs(String[] networkConnectorURIs) { 1360 this.networkConnectorURIs = networkConnectorURIs; 1361 } 1362 1363 public TransportConnector getConnectorByName(String connectorName) { 1364 for (TransportConnector connector : transportConnectors) { 1365 if 
(connector.getName().equals(connectorName)) { 1366 return connector; 1367 } 1368 } 1369 return null; 1370 } 1371 1372 public Map<String, String> getTransportConnectorURIsAsMap() { 1373 Map<String, String> answer = new HashMap<String, String>(); 1374 for (TransportConnector connector : transportConnectors) { 1375 try { 1376 URI uri = connector.getConnectUri(); 1377 if (uri != null) { 1378 String scheme = uri.getScheme(); 1379 if (scheme != null) { 1380 answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString()); 1381 } 1382 } 1383 } catch (Exception e) { 1384 LOG.debug("Failed to read URI to build transportURIsAsMap", e); 1385 } 1386 } 1387 return answer; 1388 } 1389 1390 public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){ 1391 ProducerBrokerExchange result = null; 1392 1393 for (TransportConnector connector : transportConnectors) { 1394 for (TransportConnection tc: connector.getConnections()){ 1395 result = tc.getProducerBrokerExchangeIfExists(producerInfo); 1396 if (result !=null){ 1397 return result; 1398 } 1399 } 1400 } 1401 return result; 1402 } 1403 1404 public String[] getTransportConnectorURIs() { 1405 return transportConnectorURIs; 1406 } 1407 1408 public void setTransportConnectorURIs(String[] transportConnectorURIs) { 1409 this.transportConnectorURIs = transportConnectorURIs; 1410 } 1411 1412 /** 1413 * @return Returns the jmsBridgeConnectors. 1414 */ 1415 public JmsConnector[] getJmsBridgeConnectors() { 1416 return jmsBridgeConnectors; 1417 } 1418 1419 /** 1420 * @param jmsConnectors 1421 * The jmsBridgeConnectors to set. 1422 */ 1423 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { 1424 this.jmsBridgeConnectors = jmsConnectors; 1425 } 1426 1427 public Service[] getServices() { 1428 return services.toArray(new Service[0]); 1429 } 1430 1431 /** 1432 * Sets the services associated with this broker. 
1433 */ 1434 public void setServices(Service[] services) { 1435 this.services.clear(); 1436 if (services != null) { 1437 for (int i = 0; i < services.length; i++) { 1438 this.services.add(services[i]); 1439 } 1440 } 1441 } 1442 1443 /** 1444 * Adds a new service so that it will be started as part of the broker 1445 * lifecycle 1446 */ 1447 public void addService(Service service) { 1448 services.add(service); 1449 } 1450 1451 public void removeService(Service service) { 1452 services.remove(service); 1453 } 1454 1455 public boolean isUseLoggingForShutdownErrors() { 1456 return useLoggingForShutdownErrors; 1457 } 1458 1459 /** 1460 * Sets whether or not we should use commons-logging when reporting errors 1461 * when shutting down the broker 1462 */ 1463 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { 1464 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; 1465 } 1466 1467 public boolean isUseShutdownHook() { 1468 return useShutdownHook; 1469 } 1470 1471 /** 1472 * Sets whether or not we should use a shutdown handler to close down the 1473 * broker cleanly if the JVM is terminated. It is recommended you leave this 1474 * enabled. 1475 */ 1476 public void setUseShutdownHook(boolean useShutdownHook) { 1477 this.useShutdownHook = useShutdownHook; 1478 } 1479 1480 public boolean isAdvisorySupport() { 1481 return advisorySupport; 1482 } 1483 1484 /** 1485 * Allows the support of advisory messages to be disabled for performance 1486 * reasons. 
1487 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1488 */ 1489 public void setAdvisorySupport(boolean advisorySupport) { 1490 this.advisorySupport = advisorySupport; 1491 } 1492 1493 public List<TransportConnector> getTransportConnectors() { 1494 return new ArrayList<TransportConnector>(transportConnectors); 1495 } 1496 1497 /** 1498 * Sets the transport connectors which this broker will listen on for new 1499 * clients 1500 * 1501 * @org.apache.xbean.Property 1502 * nestedType="org.apache.activemq.broker.TransportConnector" 1503 */ 1504 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { 1505 for (TransportConnector connector : transportConnectors) { 1506 addConnector(connector); 1507 } 1508 } 1509 1510 public TransportConnector getTransportConnectorByName(String name){ 1511 for (TransportConnector transportConnector : transportConnectors){ 1512 if (name.equals(transportConnector.getName())){ 1513 return transportConnector; 1514 } 1515 } 1516 return null; 1517 } 1518 1519 public TransportConnector getTransportConnectorByScheme(String scheme){ 1520 for (TransportConnector transportConnector : transportConnectors){ 1521 if (scheme.equals(transportConnector.getUri().getScheme())){ 1522 return transportConnector; 1523 } 1524 } 1525 return null; 1526 } 1527 1528 public List<NetworkConnector> getNetworkConnectors() { 1529 return new ArrayList<NetworkConnector>(networkConnectors); 1530 } 1531 1532 public List<ProxyConnector> getProxyConnectors() { 1533 return new ArrayList<ProxyConnector>(proxyConnectors); 1534 } 1535 1536 /** 1537 * Sets the network connectors which this broker will use to connect to 1538 * other brokers in a federated network 1539 * 1540 * @org.apache.xbean.Property 1541 * nestedType="org.apache.activemq.network.NetworkConnector" 1542 */ 1543 public void setNetworkConnectors(List<?> networkConnectors) throws Exception { 1544 for (Object connector : 
networkConnectors) { 1545 addNetworkConnector((NetworkConnector) connector); 1546 } 1547 } 1548 1549 /** 1550 * Sets the network connectors which this broker will use to connect to 1551 * other brokers in a federated network 1552 */ 1553 public void setProxyConnectors(List<?> proxyConnectors) throws Exception { 1554 for (Object connector : proxyConnectors) { 1555 addProxyConnector((ProxyConnector) connector); 1556 } 1557 } 1558 1559 public PolicyMap getDestinationPolicy() { 1560 return destinationPolicy; 1561 } 1562 1563 /** 1564 * Sets the destination specific policies available either for exact 1565 * destinations or for wildcard areas of destinations. 1566 */ 1567 public void setDestinationPolicy(PolicyMap policyMap) { 1568 this.destinationPolicy = policyMap; 1569 } 1570 1571 public BrokerPlugin[] getPlugins() { 1572 return plugins; 1573 } 1574 1575 /** 1576 * Sets a number of broker plugins to install such as for security 1577 * authentication or authorization 1578 */ 1579 public void setPlugins(BrokerPlugin[] plugins) { 1580 this.plugins = plugins; 1581 } 1582 1583 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1584 return messageAuthorizationPolicy; 1585 } 1586 1587 /** 1588 * Sets the policy used to decide if the current connection is authorized to 1589 * consume a given message 1590 */ 1591 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1592 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1593 } 1594 1595 /** 1596 * Delete all messages from the persistent store 1597 * 1598 * @throws IOException 1599 */ 1600 public void deleteAllMessages() throws IOException { 1601 getPersistenceAdapter().deleteAllMessages(); 1602 } 1603 1604 public boolean isDeleteAllMessagesOnStartup() { 1605 return deleteAllMessagesOnStartup; 1606 } 1607 1608 /** 1609 * Sets whether or not all messages are deleted on startup - mostly only 1610 * useful for testing. 
1611 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1612 */ 1613 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 1614 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 1615 } 1616 1617 public URI getVmConnectorURI() { 1618 if (vmConnectorURI == null) { 1619 try { 1620 vmConnectorURI = new URI("vm://" + getBrokerName()); 1621 } catch (URISyntaxException e) { 1622 LOG.error("Badly formed URI from {}", getBrokerName(), e); 1623 } 1624 } 1625 return vmConnectorURI; 1626 } 1627 1628 public void setVmConnectorURI(URI vmConnectorURI) { 1629 this.vmConnectorURI = vmConnectorURI; 1630 } 1631 1632 public String getDefaultSocketURIString() { 1633 if (started.get()) { 1634 if (this.defaultSocketURIString == null) { 1635 for (TransportConnector tc:this.transportConnectors) { 1636 String result = null; 1637 try { 1638 result = tc.getPublishableConnectString(); 1639 } catch (Exception e) { 1640 LOG.warn("Failed to get the ConnectURI for {}", tc, e); 1641 } 1642 if (result != null) { 1643 // find first publishable uri 1644 if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { 1645 this.defaultSocketURIString = result; 1646 break; 1647 } else { 1648 // or use the first defined 1649 if (this.defaultSocketURIString == null) { 1650 this.defaultSocketURIString = result; 1651 } 1652 } 1653 } 1654 } 1655 1656 } 1657 return this.defaultSocketURIString; 1658 } 1659 return null; 1660 } 1661 1662 /** 1663 * @return Returns the shutdownOnMasterFailure. 1664 */ 1665 public boolean isShutdownOnMasterFailure() { 1666 return shutdownOnMasterFailure; 1667 } 1668 1669 /** 1670 * @param shutdownOnMasterFailure 1671 * The shutdownOnMasterFailure to set. 
1672 */ 1673 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { 1674 this.shutdownOnMasterFailure = shutdownOnMasterFailure; 1675 } 1676 1677 public boolean isKeepDurableSubsActive() { 1678 return keepDurableSubsActive; 1679 } 1680 1681 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1682 this.keepDurableSubsActive = keepDurableSubsActive; 1683 } 1684 1685 public boolean isUseVirtualTopics() { 1686 return useVirtualTopics; 1687 } 1688 1689 /** 1690 * Sets whether or not <a 1691 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 1692 * Topics</a> should be supported by default if they have not been 1693 * explicitly configured. 1694 */ 1695 public void setUseVirtualTopics(boolean useVirtualTopics) { 1696 this.useVirtualTopics = useVirtualTopics; 1697 } 1698 1699 public DestinationInterceptor[] getDestinationInterceptors() { 1700 return destinationInterceptors; 1701 } 1702 1703 public boolean isUseMirroredQueues() { 1704 return useMirroredQueues; 1705 } 1706 1707 /** 1708 * Sets whether or not <a 1709 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored 1710 * Queues</a> should be supported by default if they have not been 1711 * explicitly configured. 
1712 */ 1713 public void setUseMirroredQueues(boolean useMirroredQueues) { 1714 this.useMirroredQueues = useMirroredQueues; 1715 } 1716 1717 /** 1718 * Sets the destination interceptors to use 1719 */ 1720 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { 1721 this.destinationInterceptors = destinationInterceptors; 1722 } 1723 1724 public ActiveMQDestination[] getDestinations() { 1725 return destinations; 1726 } 1727 1728 /** 1729 * Sets the destinations which should be loaded/created on startup 1730 */ 1731 public void setDestinations(ActiveMQDestination[] destinations) { 1732 this.destinations = destinations; 1733 } 1734 1735 /** 1736 * @return the tempDataStore 1737 */ 1738 public synchronized PListStore getTempDataStore() { 1739 if (tempDataStore == null) { 1740 if (!isPersistent()) { 1741 return null; 1742 } 1743 1744 try { 1745 PersistenceAdapter pa = getPersistenceAdapter(); 1746 if( pa!=null && pa instanceof PListStore) { 1747 return (PListStore) pa; 1748 } 1749 } catch (IOException e) { 1750 throw new RuntimeException(e); 1751 } 1752 1753 try { 1754 String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl"; 1755 this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance(); 1756 this.tempDataStore.setDirectory(getTmpDataDirectory()); 1757 configureService(tempDataStore); 1758 } catch (Exception e) { 1759 throw new RuntimeException(e); 1760 } 1761 } 1762 return tempDataStore; 1763 } 1764 1765 /** 1766 * @param tempDataStore 1767 * the tempDataStore to set 1768 */ 1769 public void setTempDataStore(PListStore tempDataStore) { 1770 this.tempDataStore = tempDataStore; 1771 if (tempDataStore != null) { 1772 if (tmpDataDirectory == null) { 1773 tmpDataDirectory = tempDataStore.getDirectory(); 1774 } else if (tempDataStore.getDirectory() == null) { 1775 tempDataStore.setDirectory(tmpDataDirectory); 1776 } 1777 } 1778 configureService(tempDataStore); 1779 } 1780 1781 public int 
getPersistenceThreadPriority() { 1782 return persistenceThreadPriority; 1783 } 1784 1785 public void setPersistenceThreadPriority(int persistenceThreadPriority) { 1786 this.persistenceThreadPriority = persistenceThreadPriority; 1787 } 1788 1789 /** 1790 * @return the useLocalHostBrokerName 1791 */ 1792 public boolean isUseLocalHostBrokerName() { 1793 return this.useLocalHostBrokerName; 1794 } 1795 1796 /** 1797 * @param useLocalHostBrokerName 1798 * the useLocalHostBrokerName to set 1799 */ 1800 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { 1801 this.useLocalHostBrokerName = useLocalHostBrokerName; 1802 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) { 1803 brokerName = LOCAL_HOST_NAME; 1804 } 1805 } 1806 1807 /** 1808 * Looks up and lazily creates if necessary the destination for the given 1809 * JMS name 1810 */ 1811 public Destination getDestination(ActiveMQDestination destination) throws Exception { 1812 return getBroker().addDestination(getAdminConnectionContext(), destination,false); 1813 } 1814 1815 public void removeDestination(ActiveMQDestination destination) throws Exception { 1816 getBroker().removeDestination(getAdminConnectionContext(), destination, 0); 1817 } 1818 1819 public int getProducerSystemUsagePortion() { 1820 return producerSystemUsagePortion; 1821 } 1822 1823 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { 1824 this.producerSystemUsagePortion = producerSystemUsagePortion; 1825 } 1826 1827 public int getConsumerSystemUsagePortion() { 1828 return consumerSystemUsagePortion; 1829 } 1830 1831 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { 1832 this.consumerSystemUsagePortion = consumerSystemUsagePortion; 1833 } 1834 1835 public boolean isSplitSystemUsageForProducersConsumers() { 1836 return splitSystemUsageForProducersConsumers; 1837 } 1838 1839 public void setSplitSystemUsageForProducersConsumers(boolean 
splitSystemUsageForProducersConsumers) { 1840 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; 1841 } 1842 1843 public boolean isMonitorConnectionSplits() { 1844 return monitorConnectionSplits; 1845 } 1846 1847 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { 1848 this.monitorConnectionSplits = monitorConnectionSplits; 1849 } 1850 1851 public int getTaskRunnerPriority() { 1852 return taskRunnerPriority; 1853 } 1854 1855 public void setTaskRunnerPriority(int taskRunnerPriority) { 1856 this.taskRunnerPriority = taskRunnerPriority; 1857 } 1858 1859 public boolean isDedicatedTaskRunner() { 1860 return dedicatedTaskRunner; 1861 } 1862 1863 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 1864 this.dedicatedTaskRunner = dedicatedTaskRunner; 1865 } 1866 1867 public boolean isCacheTempDestinations() { 1868 return cacheTempDestinations; 1869 } 1870 1871 public void setCacheTempDestinations(boolean cacheTempDestinations) { 1872 this.cacheTempDestinations = cacheTempDestinations; 1873 } 1874 1875 public int getTimeBeforePurgeTempDestinations() { 1876 return timeBeforePurgeTempDestinations; 1877 } 1878 1879 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { 1880 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; 1881 } 1882 1883 public boolean isUseTempMirroredQueues() { 1884 return useTempMirroredQueues; 1885 } 1886 1887 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { 1888 this.useTempMirroredQueues = useTempMirroredQueues; 1889 } 1890 1891 public synchronized JobSchedulerStore getJobSchedulerStore() { 1892 1893 // If support is off don't allow any scheduler even is user configured their own. 1894 if (!isSchedulerSupport()) { 1895 return null; 1896 } 1897 1898 // If the user configured their own we use it even if persistence is disabled since 1899 // we don't know anything about their implementation. 
1900 if (jobSchedulerStore == null) { 1901 1902 if (!isPersistent()) { 1903 this.jobSchedulerStore = new InMemoryJobSchedulerStore(); 1904 configureService(jobSchedulerStore); 1905 return this.jobSchedulerStore; 1906 } 1907 1908 try { 1909 PersistenceAdapter pa = getPersistenceAdapter(); 1910 if (pa != null) { 1911 this.jobSchedulerStore = pa.createJobSchedulerStore(); 1912 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1913 configureService(jobSchedulerStore); 1914 return this.jobSchedulerStore; 1915 } 1916 } catch (IOException e) { 1917 throw new RuntimeException(e); 1918 } catch (UnsupportedOperationException ex) { 1919 // It's ok if the store doesn't implement a scheduler. 1920 } catch (Exception e) { 1921 throw new RuntimeException(e); 1922 } 1923 1924 try { 1925 PersistenceAdapter pa = getPersistenceAdapter(); 1926 if (pa != null && pa instanceof JobSchedulerStore) { 1927 this.jobSchedulerStore = (JobSchedulerStore) pa; 1928 configureService(jobSchedulerStore); 1929 return this.jobSchedulerStore; 1930 } 1931 } catch (IOException e) { 1932 throw new RuntimeException(e); 1933 } 1934 1935 // Load the KahaDB store as a last resort, this only works if KahaDB is 1936 // included at runtime, otherwise this will fail. User should disable 1937 // scheduler support if this fails. 
1938 try { 1939 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 1940 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 1941 jobSchedulerStore = adaptor.createJobSchedulerStore(); 1942 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1943 configureService(jobSchedulerStore); 1944 LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile()); 1945 } catch (Exception e) { 1946 throw new RuntimeException(e); 1947 } 1948 } 1949 return jobSchedulerStore; 1950 } 1951 1952 public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) { 1953 this.jobSchedulerStore = jobSchedulerStore; 1954 configureService(jobSchedulerStore); 1955 } 1956 1957 // 1958 // Implementation methods 1959 // ------------------------------------------------------------------------- 1960 /** 1961 * Handles any lazy-creation helper properties which are added to make 1962 * things easier to configure inside environments such as Spring 1963 * 1964 * @throws Exception 1965 */ 1966 protected void processHelperProperties() throws Exception { 1967 if (transportConnectorURIs != null) { 1968 for (int i = 0; i < transportConnectorURIs.length; i++) { 1969 String uri = transportConnectorURIs[i]; 1970 addConnector(uri); 1971 } 1972 } 1973 if (networkConnectorURIs != null) { 1974 for (int i = 0; i < networkConnectorURIs.length; i++) { 1975 String uri = networkConnectorURIs[i]; 1976 addNetworkConnector(uri); 1977 } 1978 } 1979 if (jmsBridgeConnectors != null) { 1980 for (int i = 0; i < jmsBridgeConnectors.length; i++) { 1981 addJmsConnector(jmsBridgeConnectors[i]); 1982 } 1983 } 1984 } 1985 1986 /** 1987 * Check that the store usage limit is not greater than max usable 1988 * space and adjust if it is 1989 */ 1990 protected void checkStoreUsageLimits() throws Exception { 1991 final SystemUsage usage = getSystemUsage(); 1992 1993 if (getPersistenceAdapter() != null) { 1994 PersistenceAdapter adapter 
= getPersistenceAdapter(); 1995 checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit()); 1996 1997 long maxJournalFileSize = 0; 1998 long storeLimit = usage.getStoreUsage().getLimit(); 1999 2000 if (adapter instanceof JournaledStore) { 2001 maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength(); 2002 } 2003 2004 if (storeLimit < maxJournalFileSize) { 2005 LOG.error("Store limit is " + storeLimit / (1024 * 1024) + 2006 " mb, whilst the max journal file size for the store is: " + 2007 maxJournalFileSize / (1024 * 1024) + " mb, " + 2008 "the store will not accept any data when used."); 2009 2010 } 2011 } 2012 } 2013 2014 /** 2015 * Check that temporary usage limit is not greater than max usable 2016 * space and adjust if it is 2017 */ 2018 protected void checkTmpStoreUsageLimits() throws Exception { 2019 final SystemUsage usage = getSystemUsage(); 2020 2021 File tmpDir = getTmpDataDirectory(); 2022 2023 if (tmpDir != null) { 2024 checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit()); 2025 2026 if (isPersistent()) { 2027 long maxJournalFileSize; 2028 2029 PListStore store = usage.getTempUsage().getStore(); 2030 if (store != null && store instanceof JournaledStore) { 2031 maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength(); 2032 } else { 2033 maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; 2034 } 2035 long storeLimit = usage.getTempUsage().getLimit(); 2036 2037 if (storeLimit < maxJournalFileSize) { 2038 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + 2039 " mb, whilst the max journal file size for the temporary store is: " + 2040 maxJournalFileSize / (1024 * 1024) + " mb, " + 2041 "the temp store will not accept any data when used."); 2042 } 2043 } 2044 } 2045 } 2046 2047 protected void checkUsageLimit(File dir, PercentLimitUsage<?> storeUsage, int percentLimit) throws ConfigurationException { 2048 if (dir != null) { 2049 dir = 
StoreUtil.findParentDirectory(dir); 2050 String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store"; 2051 long storeLimit = storeUsage.getLimit(); 2052 long storeCurrent = storeUsage.getUsage(); 2053 long totalSpace = storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getTotalSpace(); 2054 long totalUsableSpace = (storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getUsableSpace()) + storeCurrent; 2055 if (totalUsableSpace < 0 || totalSpace < 0) { 2056 final String message = "File system space reported by: " + dir + " was negative, possibly a huge file system, set a sane usage.total to provide some guidance"; 2057 LOG.error(message); 2058 throw new ConfigurationException(message); 2059 } 2060 //compute byte value of the percent limit 2061 long bytePercentLimit = totalSpace * percentLimit / 100; 2062 int oneMeg = 1024 * 1024; 2063 2064 //Check if the store limit is less than the percent Limit that was set and also 2065 //the usable space...this means we can grow the store larger 2066 //Changes in partition size (total space) as well as changes in usable space should 2067 //be detected here 2068 if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0 2069 && storeUsage.getTotal() == 0 2070 && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){ 2071 2072 // set the limit to be bytePercentLimit or usableSpace if 2073 // usableSpace is less than the percentLimit 2074 long newLimit = bytePercentLimit > totalUsableSpace ? 
totalUsableSpace : bytePercentLimit; 2075 2076 //To prevent changing too often, check threshold 2077 if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) { 2078 LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to " 2079 + percentLimit + "% of the partition size."); 2080 storeUsage.setLimit(newLimit); 2081 LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace 2082 + "% (" + newLimit / oneMeg + " mb) of the partition size."); 2083 } 2084 2085 //check if the limit is too large for the amount of usable space 2086 } else if (storeLimit > totalUsableSpace) { 2087 final String message = storeName + " limit is " + storeLimit / oneMeg 2088 + " mb (current store usage is " + storeCurrent / oneMeg 2089 + " mb). The data directory: " + dir.getAbsolutePath() 2090 + " only has " + totalUsableSpace / oneMeg 2091 + " mb of usable space."; 2092 2093 if (!isAdjustUsageLimits()) { 2094 LOG.error(message); 2095 throw new ConfigurationException(message); 2096 } 2097 2098 if (percentLimit > 0) { 2099 LOG.warn(storeName + " limit has been set to " 2100 + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)" 2101 + " of the partition size but there is not enough usable space." 
2102 + " The current store limit (which may have been adjusted by a" 2103 + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)" 2104 + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)" 2105 + " is available - resetting limit"); 2106 } else { 2107 LOG.warn(message + " - resetting to maximum available disk space: " + 2108 totalUsableSpace / oneMeg + " mb"); 2109 } 2110 storeUsage.setLimit(totalUsableSpace); 2111 } 2112 } 2113 } 2114 2115 /** 2116 * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to 2117 * update store and temporary store limits if the amount of available space 2118 * plus current store size is less than the existin configured limit 2119 */ 2120 protected void scheduleDiskUsageLimitsCheck() throws IOException { 2121 if (schedulePeriodForDiskUsageCheck > 0 && 2122 (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) { 2123 Runnable diskLimitCheckTask = new Runnable() { 2124 @Override 2125 public void run() { 2126 try { 2127 checkStoreUsageLimits(); 2128 } catch (Exception e) { 2129 LOG.error("Failed to check persistent disk usage limits", e); 2130 } 2131 2132 try { 2133 checkTmpStoreUsageLimits(); 2134 } catch (Exception e) { 2135 LOG.error("Failed to check temporary store usage limits", e); 2136 } 2137 } 2138 }; 2139 scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck); 2140 } 2141 } 2142 2143 protected void checkMemorySystemUsageLimits() throws Exception { 2144 final SystemUsage usage = getSystemUsage(); 2145 long memLimit = usage.getMemoryUsage().getLimit(); 2146 long jvmLimit = Runtime.getRuntime().maxMemory(); 2147 2148 if (memLimit > jvmLimit) { 2149 final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024) 2150 + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024); 2151 2152 if (adjustUsageLimits) { 2153 usage.getMemoryUsage().setPercentOfJvmHeap(70); 2154 
LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); 2155 } else { 2156 LOG.error(message); 2157 throw new ConfigurationException(message); 2158 } 2159 } 2160 } 2161 2162 protected void checkStoreSystemUsageLimits() throws Exception { 2163 final SystemUsage usage = getSystemUsage(); 2164 2165 //Check the persistent store and temp store limits if they exist 2166 //and schedule a periodic check to update disk limits if 2167 //schedulePeriodForDiskLimitCheck is set 2168 checkStoreUsageLimits(); 2169 checkTmpStoreUsageLimits(); 2170 scheduleDiskUsageLimitsCheck(); 2171 2172 if (getJobSchedulerStore() != null) { 2173 JobSchedulerStore scheduler = getJobSchedulerStore(); 2174 File schedulerDir = scheduler.getDirectory(); 2175 if (schedulerDir != null) { 2176 2177 String schedulerDirPath = schedulerDir.getAbsolutePath(); 2178 if (!schedulerDir.isAbsolute()) { 2179 schedulerDir = new File(schedulerDirPath); 2180 } 2181 2182 while (schedulerDir != null && !schedulerDir.isDirectory()) { 2183 schedulerDir = schedulerDir.getParentFile(); 2184 } 2185 long schedulerLimit = usage.getJobSchedulerUsage().getLimit(); 2186 long dirFreeSpace = schedulerDir.getUsableSpace(); 2187 if (schedulerLimit > dirFreeSpace) { 2188 LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) + 2189 " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() + 2190 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " + 2191 dirFreeSpace / (1024 * 1024) + " mb."); 2192 usage.getJobSchedulerUsage().setLimit(dirFreeSpace); 2193 } 2194 } 2195 } 2196 } 2197 2198 public void stopAllConnectors(ServiceStopper stopper) { 2199 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2200 NetworkConnector connector = iter.next(); 2201 unregisterNetworkConnectorMBean(connector); 2202 stopper.stop(connector); 2203 } 2204 for 
(Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2205 ProxyConnector connector = iter.next(); 2206 stopper.stop(connector); 2207 } 2208 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2209 JmsConnector connector = iter.next(); 2210 stopper.stop(connector); 2211 } 2212 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2213 TransportConnector connector = iter.next(); 2214 try { 2215 unregisterConnectorMBean(connector); 2216 } catch (IOException e) { 2217 } 2218 stopper.stop(connector); 2219 } 2220 } 2221 2222 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 2223 try { 2224 ObjectName objectName = createConnectorObjectName(connector); 2225 connector = connector.asManagedConnector(getManagementContext(), objectName); 2226 ConnectorViewMBean view = new ConnectorView(connector); 2227 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2228 return connector; 2229 } catch (Throwable e) { 2230 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e); 2231 } 2232 } 2233 2234 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { 2235 if (isUseJmx()) { 2236 try { 2237 ObjectName objectName = createConnectorObjectName(connector); 2238 getManagementContext().unregisterMBean(objectName); 2239 } catch (Throwable e) { 2240 throw IOExceptionSupport.create( 2241 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); 2242 } 2243 } 2244 } 2245 2246 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2247 return adaptor; 2248 } 2249 2250 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2251 if (isUseJmx()) {} 2252 } 2253 2254 private ObjectName createConnectorObjectName(TransportConnector connector) 
throws MalformedObjectNameException { 2255 return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName()); 2256 } 2257 2258 public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { 2259 NetworkConnectorViewMBean view = new NetworkConnectorView(connector); 2260 try { 2261 ObjectName objectName = createNetworkConnectorObjectName(connector); 2262 connector.setObjectName(objectName); 2263 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2264 } catch (Throwable e) { 2265 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); 2266 } 2267 } 2268 2269 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { 2270 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); 2271 } 2272 2273 public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { 2274 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport); 2275 } 2276 2277 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { 2278 if (isUseJmx()) { 2279 try { 2280 ObjectName objectName = createNetworkConnectorObjectName(connector); 2281 getManagementContext().unregisterMBean(objectName); 2282 } catch (Exception e) { 2283 LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". 
This exception is ignored.", e); 2284 } 2285 } 2286 } 2287 2288 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { 2289 ProxyConnectorView view = new ProxyConnectorView(connector); 2290 try { 2291 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName()); 2292 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2293 } catch (Throwable e) { 2294 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2295 } 2296 } 2297 2298 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 2299 JmsConnectorView view = new JmsConnectorView(connector); 2300 try { 2301 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName()); 2302 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2303 } catch (Throwable e) { 2304 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2305 } 2306 } 2307 2308 /** 2309 * Factory method to create a new broker 2310 * 2311 * @throws Exception 2312 */ 2313 protected Broker createBroker() throws Exception { 2314 regionBroker = createRegionBroker(); 2315 Broker broker = addInterceptors(regionBroker); 2316 // Add a filter that will stop access to the broker once stopped 2317 broker = new MutableBrokerFilter(broker) { 2318 Broker old; 2319 2320 @Override 2321 public void stop() throws Exception { 2322 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { 2323 // Just ignore additional stop actions. 
2324 @Override 2325 public void stop() throws Exception { 2326 } 2327 }); 2328 old.stop(); 2329 } 2330 2331 @Override 2332 public void start() throws Exception { 2333 if (forceStart && old != null) { 2334 this.next.set(old); 2335 } 2336 getNext().start(); 2337 } 2338 }; 2339 return broker; 2340 } 2341 2342 /** 2343 * Factory method to create the core region broker onto which interceptors 2344 * are added 2345 * 2346 * @throws Exception 2347 */ 2348 protected Broker createRegionBroker() throws Exception { 2349 if (destinationInterceptors == null) { 2350 destinationInterceptors = createDefaultDestinationInterceptor(); 2351 } 2352 configureServices(destinationInterceptors); 2353 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 2354 if (destinationFactory == null) { 2355 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); 2356 } 2357 return createRegionBroker(destinationInterceptor); 2358 } 2359 2360 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { 2361 RegionBroker regionBroker; 2362 if (isUseJmx()) { 2363 try { 2364 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), 2365 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); 2366 } catch(MalformedObjectNameException me){ 2367 LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me); 2368 throw new IOException(me); 2369 } 2370 } else { 2371 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, 2372 destinationInterceptor,getScheduler(),getExecutor()); 2373 } 2374 destinationFactory.setRegionBroker(regionBroker); 2375 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 2376 regionBroker.setBrokerName(getBrokerName()); 2377 
regionBroker.getDestinationStatistics().setEnabled(enableStatistics); 2378 regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); 2379 if (brokerId != null) { 2380 regionBroker.setBrokerId(brokerId); 2381 } 2382 return regionBroker; 2383 } 2384 2385 /** 2386 * Create the default destination interceptor 2387 */ 2388 protected DestinationInterceptor[] createDefaultDestinationInterceptor() { 2389 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>(); 2390 if (isUseVirtualTopics()) { 2391 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); 2392 VirtualTopic virtualTopic = new VirtualTopic(); 2393 virtualTopic.setName("VirtualTopic.>"); 2394 VirtualDestination[] virtualDestinations = { virtualTopic }; 2395 interceptor.setVirtualDestinations(virtualDestinations); 2396 answer.add(interceptor); 2397 } 2398 if (isUseMirroredQueues()) { 2399 MirroredQueue interceptor = new MirroredQueue(); 2400 answer.add(interceptor); 2401 } 2402 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; 2403 answer.toArray(array); 2404 return array; 2405 } 2406 2407 /** 2408 * Strategy method to add interceptors to the broker 2409 * 2410 * @throws IOException 2411 */ 2412 protected Broker addInterceptors(Broker broker) throws Exception { 2413 if (isSchedulerSupport()) { 2414 SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); 2415 if (isUseJmx()) { 2416 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); 2417 try { 2418 ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName()); 2419 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2420 this.adminView.setJMSJobScheduler(objectName); 2421 } catch (Throwable e) { 2422 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " 2423 + e.getMessage(), e); 2424 } 2425 } 2426 broker = sb; 2427 } 2428 if (isUseJmx()) { 2429 
HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker()); 2430 try { 2431 ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName()); 2432 AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName); 2433 } catch (Throwable e) { 2434 throw IOExceptionSupport.create("Status MBean could not be registered in JMX: " 2435 + e.getMessage(), e); 2436 } 2437 } 2438 if (isAdvisorySupport()) { 2439 broker = new AdvisoryBroker(broker); 2440 } 2441 broker = new CompositeDestinationBroker(broker); 2442 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); 2443 if (isPopulateJMSXUserID()) { 2444 UserIDBroker userIDBroker = new UserIDBroker(broker); 2445 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); 2446 broker = userIDBroker; 2447 } 2448 if (isMonitorConnectionSplits()) { 2449 broker = new ConnectionSplitBroker(broker); 2450 } 2451 if (plugins != null) { 2452 for (int i = 0; i < plugins.length; i++) { 2453 BrokerPlugin plugin = plugins[i]; 2454 broker = plugin.installPlugin(broker); 2455 } 2456 } 2457 return broker; 2458 } 2459 2460 protected PersistenceAdapter createPersistenceAdapter() throws IOException { 2461 if (isPersistent()) { 2462 PersistenceAdapterFactory fac = getPersistenceFactory(); 2463 if (fac != null) { 2464 return fac.createPersistenceAdapter(); 2465 } else { 2466 try { 2467 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 2468 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 2469 File dir = new File(getBrokerDataDirectory(),"KahaDB"); 2470 adaptor.setDirectory(dir); 2471 return adaptor; 2472 } catch (Throwable e) { 2473 throw IOExceptionSupport.create(e); 2474 } 2475 } 2476 } else { 2477 return new MemoryPersistenceAdapter(); 2478 } 2479 } 2480 2481 protected ObjectName createBrokerObjectName() throws 
MalformedObjectNameException { 2482 return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName()); 2483 } 2484 2485 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { 2486 TransportServer transport = TransportFactorySupport.bind(this, brokerURI); 2487 return new TransportConnector(transport); 2488 } 2489 2490 /** 2491 * Extracts the port from the options 2492 */ 2493 protected Object getPort(Map<?,?> options) { 2494 Object port = options.get("port"); 2495 if (port == null) { 2496 port = DEFAULT_PORT; 2497 LOG.warn("No port specified so defaulting to: {}", port); 2498 } 2499 return port; 2500 } 2501 2502 protected void addShutdownHook() { 2503 if (useShutdownHook) { 2504 shutdownHook = new Thread("ActiveMQ ShutdownHook") { 2505 @Override 2506 public void run() { 2507 containerShutdown(); 2508 } 2509 }; 2510 Runtime.getRuntime().addShutdownHook(shutdownHook); 2511 } 2512 } 2513 2514 protected void removeShutdownHook() { 2515 if (shutdownHook != null) { 2516 try { 2517 Runtime.getRuntime().removeShutdownHook(shutdownHook); 2518 } catch (Exception e) { 2519 LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e); 2520 } 2521 } 2522 } 2523 2524 /** 2525 * Sets hooks to be executed when broker shut down 2526 * 2527 * @org.apache.xbean.Property 2528 */ 2529 public void setShutdownHooks(List<Runnable> hooks) throws Exception { 2530 for (Runnable hook : hooks) { 2531 addShutdownHook(hook); 2532 } 2533 } 2534 2535 /** 2536 * Causes a clean shutdown of the container when the VM is being shut down 2537 */ 2538 protected void containerShutdown() { 2539 try { 2540 stop(); 2541 } catch (IOException e) { 2542 Throwable linkedException = e.getCause(); 2543 if (linkedException != null) { 2544 logError("Failed to shut down: " + e + ". 
Reason: " + linkedException, linkedException); 2545 } else { 2546 logError("Failed to shut down: " + e, e); 2547 } 2548 if (!useLoggingForShutdownErrors) { 2549 e.printStackTrace(System.err); 2550 } 2551 } catch (Exception e) { 2552 logError("Failed to shut down: " + e, e); 2553 } 2554 } 2555 2556 protected void logError(String message, Throwable e) { 2557 if (useLoggingForShutdownErrors) { 2558 LOG.error("Failed to shut down: " + e); 2559 } else { 2560 System.err.println("Failed to shut down: " + e); 2561 } 2562 } 2563 2564 /** 2565 * Starts any configured destinations on startup 2566 */ 2567 protected void startDestinations() throws Exception { 2568 if (destinations != null) { 2569 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2570 for (int i = 0; i < destinations.length; i++) { 2571 ActiveMQDestination destination = destinations[i]; 2572 getBroker().addDestination(adminConnectionContext, destination,true); 2573 } 2574 } 2575 if (isUseVirtualTopics()) { 2576 startVirtualConsumerDestinations(); 2577 } 2578 } 2579 2580 /** 2581 * Returns the broker's administration connection context used for 2582 * configuring the broker at startup 2583 */ 2584 public ConnectionContext getAdminConnectionContext() throws Exception { 2585 return BrokerSupport.getConnectionContext(getBroker()); 2586 } 2587 2588 protected void startManagementContext() throws Exception { 2589 getManagementContext().setBrokerName(brokerName); 2590 getManagementContext().start(); 2591 adminView = new BrokerView(this, null); 2592 ObjectName objectName = getBrokerObjectName(); 2593 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); 2594 } 2595 2596 /** 2597 * Start all transport and network connections, proxies and bridges 2598 * 2599 * @throws Exception 2600 */ 2601 public void startAllConnectors() throws Exception { 2602 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); 2603 List<TransportConnector> al = new 
ArrayList<TransportConnector>(); 2604 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2605 TransportConnector connector = iter.next(); 2606 al.add(startTransportConnector(connector)); 2607 } 2608 if (al.size() > 0) { 2609 // let's clear the transportConnectors list and replace it with 2610 // the started transportConnector instances 2611 this.transportConnectors.clear(); 2612 setTransportConnectors(al); 2613 } 2614 this.slave = false; 2615 if (!stopped.get()) { 2616 ThreadPoolExecutor networkConnectorStartExecutor = null; 2617 if (isNetworkConnectorStartAsync()) { 2618 // spin up as many threads as needed 2619 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 2620 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 2621 new ThreadFactory() { 2622 int count=0; 2623 @Override 2624 public Thread newThread(Runnable runnable) { 2625 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); 2626 thread.setDaemon(true); 2627 return thread; 2628 } 2629 }); 2630 } 2631 2632 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2633 final NetworkConnector connector = iter.next(); 2634 connector.setLocalUri(getVmConnectorURI()); 2635 connector.setBrokerName(getBrokerName()); 2636 connector.setDurableDestinations(durableDestinations); 2637 if (getDefaultSocketURIString() != null) { 2638 connector.setBrokerURL(getDefaultSocketURIString()); 2639 } 2640 if (networkConnectorStartExecutor != null) { 2641 networkConnectorStartExecutor.execute(new Runnable() { 2642 @Override 2643 public void run() { 2644 try { 2645 LOG.info("Async start of {}", connector); 2646 connector.start(); 2647 } catch(Exception e) { 2648 LOG.error("Async start of network connector: {} failed", connector, e); 2649 } 2650 } 2651 }); 2652 } else { 2653 connector.start(); 2654 } 2655 } 2656 if (networkConnectorStartExecutor != null) { 2657 // executor done when enqueued 
tasks are complete 2658 ThreadPoolUtils.shutdown(networkConnectorStartExecutor); 2659 } 2660 2661 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2662 ProxyConnector connector = iter.next(); 2663 connector.start(); 2664 } 2665 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2666 JmsConnector connector = iter.next(); 2667 connector.start(); 2668 } 2669 for (Service service : services) { 2670 configureService(service); 2671 service.start(); 2672 } 2673 } 2674 } 2675 2676 public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { 2677 connector.setBrokerService(this); 2678 connector.setTaskRunnerFactory(getTaskRunnerFactory()); 2679 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); 2680 if (policy != null) { 2681 connector.setMessageAuthorizationPolicy(policy); 2682 } 2683 if (isUseJmx()) { 2684 connector = registerConnectorMBean(connector); 2685 } 2686 connector.getStatistics().setEnabled(enableStatistics); 2687 connector.start(); 2688 return connector; 2689 } 2690 2691 /** 2692 * Perform any custom dependency injection 2693 */ 2694 protected void configureServices(Object[] services) { 2695 for (Object service : services) { 2696 configureService(service); 2697 } 2698 } 2699 2700 /** 2701 * Perform any custom dependency injection 2702 */ 2703 protected void configureService(Object service) { 2704 if (service instanceof BrokerServiceAware) { 2705 BrokerServiceAware serviceAware = (BrokerServiceAware) service; 2706 serviceAware.setBrokerService(this); 2707 } 2708 } 2709 2710 public void handleIOException(IOException exception) { 2711 if (ioExceptionHandler != null) { 2712 ioExceptionHandler.handle(exception); 2713 } else { 2714 LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception); 2715 } 2716 } 2717 2718 protected void startVirtualConsumerDestinations() throws Exception { 2719 checkStartException(); 2720 
ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2721 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); 2722 DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); 2723 if (!destinations.isEmpty()) { 2724 for (ActiveMQDestination destination : destinations) { 2725 if (filter.matches(destination) == true) { 2726 broker.addDestination(adminConnectionContext, destination, false); 2727 } 2728 } 2729 } 2730 } 2731 2732 private DestinationFilter getVirtualTopicConsumerDestinationFilter() { 2733 // created at startup, so no sync needed 2734 if (virtualConsumerDestinationFilter == null) { 2735 Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>(); 2736 if (destinationInterceptors != null) { 2737 for (DestinationInterceptor interceptor : destinationInterceptors) { 2738 if (interceptor instanceof VirtualDestinationInterceptor) { 2739 VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; 2740 for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { 2741 if (virtualDestination instanceof VirtualTopic) { 2742 consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); 2743 } 2744 if (isUseVirtualDestSubs()) { 2745 try { 2746 broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination); 2747 LOG.debug("Adding virtual destination: {}", virtualDestination); 2748 } catch (Exception e) { 2749 LOG.warn("Could not fire virtual destination consumer advisory", e); 2750 } 2751 } 2752 } 2753 } 2754 } 2755 } 2756 ActiveMQQueue filter = new ActiveMQQueue(); 2757 filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); 2758 virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); 2759 } 2760 return virtualConsumerDestinationFilter; 2761 } 2762 2763 protected synchronized 
ThreadPoolExecutor getExecutor() { 2764 if (this.executor == null) { 2765 this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 2766 2767 private long i = 0; 2768 2769 @Override 2770 public Thread newThread(Runnable runnable) { 2771 this.i++; 2772 Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i); 2773 thread.setDaemon(true); 2774 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 2775 @Override 2776 public void uncaughtException(final Thread t, final Throwable e) { 2777 LOG.error("Error in thread '{}'", t.getName(), e); 2778 } 2779 }); 2780 return thread; 2781 } 2782 }, new RejectedExecutionHandler() { 2783 @Override 2784 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { 2785 try { 2786 executor.getQueue().offer(r, 60, TimeUnit.SECONDS); 2787 } catch (InterruptedException e) { 2788 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); 2789 } 2790 2791 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); 2792 } 2793 }); 2794 } 2795 return this.executor; 2796 } 2797 2798 public synchronized Scheduler getScheduler() { 2799 if (this.scheduler==null) { 2800 this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); 2801 try { 2802 this.scheduler.start(); 2803 } catch (Exception e) { 2804 LOG.error("Failed to start Scheduler", e); 2805 } 2806 } 2807 return this.scheduler; 2808 } 2809 2810 public Broker getRegionBroker() { 2811 return regionBroker; 2812 } 2813 2814 public void setRegionBroker(Broker regionBroker) { 2815 this.regionBroker = regionBroker; 2816 } 2817 2818 public void addShutdownHook(Runnable hook) { 2819 synchronized (shutdownHooks) { 2820 shutdownHooks.add(hook); 2821 } 2822 } 2823 2824 public void removeShutdownHook(Runnable hook) { 2825 synchronized (shutdownHooks) { 2826 shutdownHooks.remove(hook); 2827 } 
2828 } 2829 2830 public boolean isSystemExitOnShutdown() { 2831 return systemExitOnShutdown; 2832 } 2833 2834 /** 2835 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2836 */ 2837 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { 2838 this.systemExitOnShutdown = systemExitOnShutdown; 2839 } 2840 2841 public int getSystemExitOnShutdownExitCode() { 2842 return systemExitOnShutdownExitCode; 2843 } 2844 2845 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) { 2846 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode; 2847 } 2848 2849 public SslContext getSslContext() { 2850 return sslContext; 2851 } 2852 2853 public void setSslContext(SslContext sslContext) { 2854 this.sslContext = sslContext; 2855 } 2856 2857 public boolean isShutdownOnSlaveFailure() { 2858 return shutdownOnSlaveFailure; 2859 } 2860 2861 /** 2862 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2863 */ 2864 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { 2865 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; 2866 } 2867 2868 public boolean isWaitForSlave() { 2869 return waitForSlave; 2870 } 2871 2872 /** 2873 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2874 */ 2875 public void setWaitForSlave(boolean waitForSlave) { 2876 this.waitForSlave = waitForSlave; 2877 } 2878 2879 public long getWaitForSlaveTimeout() { 2880 return this.waitForSlaveTimeout; 2881 } 2882 2883 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { 2884 this.waitForSlaveTimeout = waitForSlaveTimeout; 2885 } 2886 2887 /** 2888 * Get the passiveSlave 2889 * @return the passiveSlave 2890 */ 2891 public boolean isPassiveSlave() { 2892 return this.passiveSlave; 2893 } 2894 2895 /** 2896 * Set the passiveSlave 2897 * @param passiveSlave the passiveSlave to set 2898 * @org.apache.xbean.Property 
propertyEditor="org.apache.activemq.util.BooleanEditor" 2899 */ 2900 public void setPassiveSlave(boolean passiveSlave) { 2901 this.passiveSlave = passiveSlave; 2902 } 2903 2904 /** 2905 * override the Default IOException handler, called when persistence adapter 2906 * has experiences File or JDBC I/O Exceptions 2907 * 2908 * @param ioExceptionHandler 2909 */ 2910 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { 2911 configureService(ioExceptionHandler); 2912 this.ioExceptionHandler = ioExceptionHandler; 2913 } 2914 2915 public IOExceptionHandler getIoExceptionHandler() { 2916 return ioExceptionHandler; 2917 } 2918 2919 /** 2920 * @return the schedulerSupport 2921 */ 2922 public boolean isSchedulerSupport() { 2923 return this.schedulerSupport; 2924 } 2925 2926 /** 2927 * @param schedulerSupport the schedulerSupport to set 2928 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2929 */ 2930 public void setSchedulerSupport(boolean schedulerSupport) { 2931 this.schedulerSupport = schedulerSupport; 2932 } 2933 2934 /** 2935 * @return the schedulerDirectory 2936 */ 2937 public File getSchedulerDirectoryFile() { 2938 if (this.schedulerDirectoryFile == null) { 2939 this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler"); 2940 } 2941 return schedulerDirectoryFile; 2942 } 2943 2944 /** 2945 * @param schedulerDirectory the schedulerDirectory to set 2946 */ 2947 public void setSchedulerDirectoryFile(File schedulerDirectory) { 2948 this.schedulerDirectoryFile = schedulerDirectory; 2949 } 2950 2951 public void setSchedulerDirectory(String schedulerDirectory) { 2952 setSchedulerDirectoryFile(new File(schedulerDirectory)); 2953 } 2954 2955 public int getSchedulePeriodForDestinationPurge() { 2956 return this.schedulePeriodForDestinationPurge; 2957 } 2958 2959 public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) { 2960 this.schedulePeriodForDestinationPurge = 
schedulePeriodForDestinationPurge; 2961 } 2962 2963 /** 2964 * @param schedulePeriodForDiskUsageCheck 2965 */ 2966 public void setSchedulePeriodForDiskUsageCheck( 2967 int schedulePeriodForDiskUsageCheck) { 2968 this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck; 2969 } 2970 2971 public int getDiskUsageCheckRegrowThreshold() { 2972 return diskUsageCheckRegrowThreshold; 2973 } 2974 2975 /** 2976 * @param diskUsageCheckRegrowThreshold 2977 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 2978 */ 2979 public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) { 2980 this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold; 2981 } 2982 2983 public int getMaxPurgedDestinationsPerSweep() { 2984 return this.maxPurgedDestinationsPerSweep; 2985 } 2986 2987 public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) { 2988 this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep; 2989 } 2990 2991 public BrokerContext getBrokerContext() { 2992 return brokerContext; 2993 } 2994 2995 public void setBrokerContext(BrokerContext brokerContext) { 2996 this.brokerContext = brokerContext; 2997 } 2998 2999 public void setBrokerId(String brokerId) { 3000 this.brokerId = new BrokerId(brokerId); 3001 } 3002 3003 public boolean isUseAuthenticatedPrincipalForJMSXUserID() { 3004 return useAuthenticatedPrincipalForJMSXUserID; 3005 } 3006 3007 public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) { 3008 this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID; 3009 } 3010 3011 /** 3012 * Should MBeans that support showing the Authenticated User Name information have this 3013 * value filled in or not. 
3014 * 3015 * @return true if user names should be exposed in MBeans 3016 */ 3017 public boolean isPopulateUserNameInMBeans() { 3018 return this.populateUserNameInMBeans; 3019 } 3020 3021 /** 3022 * Sets whether Authenticated User Name information is shown in MBeans that support this field. 3023 * @param value if MBeans should expose user name information. 3024 */ 3025 public void setPopulateUserNameInMBeans(boolean value) { 3026 this.populateUserNameInMBeans = value; 3027 } 3028 3029 /** 3030 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 3031 * failing. The default value is to wait forever (zero). 3032 * 3033 * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 3034 */ 3035 public long getMbeanInvocationTimeout() { 3036 return mbeanInvocationTimeout; 3037 } 3038 3039 /** 3040 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 3041 * failing. The default value is to wait forever (zero). 3042 * 3043 * @param mbeanInvocationTimeout 3044 * timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 3045 */ 3046 public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) { 3047 this.mbeanInvocationTimeout = mbeanInvocationTimeout; 3048 } 3049 3050 public boolean isNetworkConnectorStartAsync() { 3051 return networkConnectorStartAsync; 3052 } 3053 3054 public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) { 3055 this.networkConnectorStartAsync = networkConnectorStartAsync; 3056 } 3057 3058 public boolean isAllowTempAutoCreationOnSend() { 3059 return allowTempAutoCreationOnSend; 3060 } 3061 3062 /** 3063 * enable if temp destinations need to be propagated through a network when 3064 * advisorySupport==false. 
This is used in conjunction with the policy 3065 * gcInactiveDestinations for matching temps so they can get removed 3066 * when inactive 3067 * 3068 * @param allowTempAutoCreationOnSend 3069 */ 3070 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 3071 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 3072 } 3073 3074 public long getOfflineDurableSubscriberTimeout() { 3075 return offlineDurableSubscriberTimeout; 3076 } 3077 3078 public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) { 3079 this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; 3080 } 3081 3082 public long getOfflineDurableSubscriberTaskSchedule() { 3083 return offlineDurableSubscriberTaskSchedule; 3084 } 3085 3086 public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) { 3087 this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; 3088 } 3089 3090 public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { 3091 return isUseVirtualTopics() && destination.isQueue() && 3092 getVirtualTopicConsumerDestinationFilter().matches(destination); 3093 } 3094 3095 synchronized public Throwable getStartException() { 3096 return startException; 3097 } 3098 3099 public boolean isStartAsync() { 3100 return startAsync; 3101 } 3102 3103 public void setStartAsync(boolean startAsync) { 3104 this.startAsync = startAsync; 3105 } 3106 3107 public boolean isSlave() { 3108 return this.slave; 3109 } 3110 3111 public boolean isStopping() { 3112 return this.stopping.get(); 3113 } 3114 3115 /** 3116 * @return true if the broker allowed to restart on shutdown. 3117 */ 3118 public boolean isRestartAllowed() { 3119 return restartAllowed; 3120 } 3121 3122 /** 3123 * Sets if the broker allowed to restart on shutdown. 
3124 */ 3125 public void setRestartAllowed(boolean restartAllowed) { 3126 this.restartAllowed = restartAllowed; 3127 } 3128 3129 /** 3130 * A lifecycle manager of the BrokerService should 3131 * inspect this property after a broker shutdown has occurred 3132 * to find out if the broker needs to be re-created and started 3133 * again. 3134 * 3135 * @return true if the broker wants to be restarted after it shuts down. 3136 */ 3137 public boolean isRestartRequested() { 3138 return restartRequested; 3139 } 3140 3141 public void requestRestart() { 3142 this.restartRequested = true; 3143 } 3144 3145 public int getStoreOpenWireVersion() { 3146 return storeOpenWireVersion; 3147 } 3148 3149 public void setStoreOpenWireVersion(int storeOpenWireVersion) { 3150 this.storeOpenWireVersion = storeOpenWireVersion; 3151 } 3152 3153 /** 3154 * @return the current number of connections on this Broker. 3155 */ 3156 public int getCurrentConnections() { 3157 return this.currentConnections.get(); 3158 } 3159 3160 /** 3161 * @return the total number of connections this broker has handled since startup. 
*/
    public long getTotalConnections() {
        return totalConnections.get();
    }

    /**
     * Adds one to the {@code currentConnections} counter.
     */
    public void incrementCurrentConnections() {
        currentConnections.incrementAndGet();
    }

    /**
     * Subtracts one from the {@code currentConnections} counter.
     */
    public void decrementCurrentConnections() {
        currentConnections.decrementAndGet();
    }

    /**
     * Adds one to the {@code totalConnections} counter.
     */
    public void incrementTotalConnections() {
        totalConnections.incrementAndGet();
    }

    /**
     * @return the current value of the {@code rejectDurableConsumers} flag
     */
    public boolean isRejectDurableConsumers() {
        return this.rejectDurableConsumers;
    }

    /**
     * @param rejectDurableConsumers the new value of the {@code rejectDurableConsumers} flag
     */
    public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
        this.rejectDurableConsumers = rejectDurableConsumers;
    }

    /**
     * @return the current value of the {@code adjustUsageLimits} flag
     */
    public boolean isAdjustUsageLimits() {
        return this.adjustUsageLimits;
    }

    /**
     * @param adjustUsageLimits the new value of the {@code adjustUsageLimits} flag
     */
    public void setAdjustUsageLimits(boolean adjustUsageLimits) {
        this.adjustUsageLimits = adjustUsageLimits;
    }

    /**
     * @param rollbackOnlyOnAsyncException the new value of the {@code rollbackOnlyOnAsyncException} flag
     */
    public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) {
        this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException;
    }

    /**
     * @return the current value of the {@code rollbackOnlyOnAsyncException} flag
     */
    public boolean isRollbackOnlyOnAsyncException() {
        return this.rollbackOnlyOnAsyncException;
    }

    /**
     * @return the current value of the {@code useVirtualDestSubs} flag
     */
    public boolean isUseVirtualDestSubs() {
        return this.useVirtualDestSubs;
    }

    /**
     * @param useVirtualDestSubs the new value of the {@code useVirtualDestSubs} flag
     */
    public void setUseVirtualDestSubs(boolean useVirtualDestSubs) {
        this.useVirtualDestSubs = useVirtualDestSubs;
    }

    /**
     * @return the current value of the {@code useVirtualDestSubsOnCreation} flag
     */
    public boolean isUseVirtualDestSubsOnCreation() {
        return this.useVirtualDestSubsOnCreation;
    }

    /**
     * @param useVirtualDestSubsOnCreation the new value of the {@code useVirtualDestSubsOnCreation} flag
     */
    public void setUseVirtualDestSubsOnCreation(boolean useVirtualDestSubsOnCreation) {
        this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
    }
}