001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.BufferedReader; 020import java.io.File; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InputStreamReader; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.net.UnknownHostException; 027import java.security.Provider; 028import java.security.Security; 029import java.util.ArrayList; 030import java.util.Date; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Locale; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.CopyOnWriteArrayList; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.LinkedBlockingQueue; 041import java.util.concurrent.RejectedExecutionException; 042import java.util.concurrent.RejectedExecutionHandler; 043import java.util.concurrent.SynchronousQueue; 044import java.util.concurrent.ThreadFactory; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import 
java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicLong; 050 051import javax.annotation.PostConstruct; 052import javax.annotation.PreDestroy; 053import javax.management.MalformedObjectNameException; 054import javax.management.ObjectName; 055 056import org.apache.activemq.ActiveMQConnectionMetaData; 057import org.apache.activemq.ConfigurationException; 058import org.apache.activemq.Service; 059import org.apache.activemq.advisory.AdvisoryBroker; 060import org.apache.activemq.broker.cluster.ConnectionSplitBroker; 061import org.apache.activemq.broker.jmx.AnnotatedMBean; 062import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 063import org.apache.activemq.broker.jmx.BrokerView; 064import org.apache.activemq.broker.jmx.ConnectorView; 065import org.apache.activemq.broker.jmx.ConnectorViewMBean; 066import org.apache.activemq.broker.jmx.HealthView; 067import org.apache.activemq.broker.jmx.HealthViewMBean; 068import org.apache.activemq.broker.jmx.JmsConnectorView; 069import org.apache.activemq.broker.jmx.JobSchedulerView; 070import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; 071import org.apache.activemq.broker.jmx.Log4JConfigView; 072import org.apache.activemq.broker.jmx.ManagedRegionBroker; 073import org.apache.activemq.broker.jmx.ManagementContext; 074import org.apache.activemq.broker.jmx.NetworkConnectorView; 075import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; 076import org.apache.activemq.broker.jmx.ProxyConnectorView; 077import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 078import org.apache.activemq.broker.region.Destination; 079import org.apache.activemq.broker.region.DestinationFactory; 080import org.apache.activemq.broker.region.DestinationFactoryImpl; 081import org.apache.activemq.broker.region.DestinationInterceptor; 082import org.apache.activemq.broker.region.RegionBroker; 083import org.apache.activemq.broker.region.policy.PolicyMap; 084import 
org.apache.activemq.broker.region.virtual.MirroredQueue; 085import org.apache.activemq.broker.region.virtual.VirtualDestination; 086import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 087import org.apache.activemq.broker.region.virtual.VirtualTopic; 088import org.apache.activemq.broker.scheduler.JobSchedulerStore; 089import org.apache.activemq.broker.scheduler.SchedulerBroker; 090import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore; 091import org.apache.activemq.command.ActiveMQDestination; 092import org.apache.activemq.command.ActiveMQQueue; 093import org.apache.activemq.command.BrokerId; 094import org.apache.activemq.command.ProducerInfo; 095import org.apache.activemq.filter.DestinationFilter; 096import org.apache.activemq.network.ConnectionFilter; 097import org.apache.activemq.network.DiscoveryNetworkConnector; 098import org.apache.activemq.network.NetworkConnector; 099import org.apache.activemq.network.jms.JmsConnector; 100import org.apache.activemq.openwire.OpenWireFormat; 101import org.apache.activemq.proxy.ProxyConnector; 102import org.apache.activemq.security.MessageAuthorizationPolicy; 103import org.apache.activemq.selector.SelectorParser; 104import org.apache.activemq.store.JournaledStore; 105import org.apache.activemq.store.PListStore; 106import org.apache.activemq.store.PersistenceAdapter; 107import org.apache.activemq.store.PersistenceAdapterFactory; 108import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 109import org.apache.activemq.thread.Scheduler; 110import org.apache.activemq.thread.TaskRunnerFactory; 111import org.apache.activemq.transport.TransportFactorySupport; 112import org.apache.activemq.transport.TransportServer; 113import org.apache.activemq.transport.vm.VMTransportFactory; 114import org.apache.activemq.usage.PercentLimitUsage; 115import org.apache.activemq.usage.StoreUsage; 116import org.apache.activemq.usage.SystemUsage; 117import org.apache.activemq.util.BrokerSupport; 
118import org.apache.activemq.util.DefaultIOExceptionHandler; 119import org.apache.activemq.util.IOExceptionHandler; 120import org.apache.activemq.util.IOExceptionSupport; 121import org.apache.activemq.util.IOHelper; 122import org.apache.activemq.util.InetAddressUtil; 123import org.apache.activemq.util.ServiceStopper; 124import org.apache.activemq.util.StoreUtil; 125import org.apache.activemq.util.ThreadPoolUtils; 126import org.apache.activemq.util.TimeUtils; 127import org.apache.activemq.util.URISupport; 128import org.slf4j.Logger; 129import org.slf4j.LoggerFactory; 130import org.slf4j.MDC; 131 132/** 133 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a 134 * number of transport connectors, network connectors and a bunch of properties 135 * which can be used to configure the broker as its lazily created. 136 * 137 * @org.apache.xbean.XBean 138 */ 139public class BrokerService implements Service { 140 public static final String DEFAULT_PORT = "61616"; 141 public static final String LOCAL_HOST_NAME; 142 public static final String BROKER_VERSION; 143 public static final String DEFAULT_BROKER_NAME = "localhost"; 144 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 145 public static final long DEFAULT_START_TIMEOUT = 600000L; 146 147 private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); 148 149 @SuppressWarnings("unused") 150 private static final long serialVersionUID = 7353129142305630237L; 151 152 private boolean useJmx = true; 153 private boolean enableStatistics = true; 154 private boolean persistent = true; 155 private boolean populateJMSXUserID; 156 private boolean useAuthenticatedPrincipalForJMSXUserID; 157 private boolean populateUserNameInMBeans; 158 private long mbeanInvocationTimeout = 0; 159 160 private boolean useShutdownHook = true; 161 private boolean useLoggingForShutdownErrors; 162 private boolean shutdownOnMasterFailure; 163 private boolean shutdownOnSlaveFailure; 164 private 
boolean waitForSlave; 165 private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT; 166 private boolean passiveSlave; 167 private String brokerName = DEFAULT_BROKER_NAME; 168 private File dataDirectoryFile; 169 private File tmpDataDirectory; 170 private Broker broker; 171 private BrokerView adminView; 172 private ManagementContext managementContext; 173 private ObjectName brokerObjectName; 174 private TaskRunnerFactory taskRunnerFactory; 175 private TaskRunnerFactory persistenceTaskRunnerFactory; 176 private SystemUsage systemUsage; 177 private SystemUsage producerSystemUsage; 178 private SystemUsage consumerSystemUsaage; 179 private PersistenceAdapter persistenceAdapter; 180 private PersistenceAdapterFactory persistenceFactory; 181 protected DestinationFactory destinationFactory; 182 private MessageAuthorizationPolicy messageAuthorizationPolicy; 183 private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>(); 184 private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); 185 private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>(); 186 private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>(); 187 private final List<Service> services = new ArrayList<Service>(); 188 private transient Thread shutdownHook; 189 private String[] transportConnectorURIs; 190 private String[] networkConnectorURIs; 191 private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges 192 // to other jms messaging systems 193 private boolean deleteAllMessagesOnStartup; 194 private boolean advisorySupport = true; 195 private URI vmConnectorURI; 196 private String defaultSocketURIString; 197 private PolicyMap destinationPolicy; 198 private final AtomicBoolean started = new AtomicBoolean(false); 199 private final AtomicBoolean stopped = new AtomicBoolean(false); 200 private final AtomicBoolean stopping = new 
AtomicBoolean(false); 201 private BrokerPlugin[] plugins; 202 private boolean keepDurableSubsActive = true; 203 private boolean enableMessageExpirationOnActiveDurableSubs = false; 204 private boolean useVirtualTopics = true; 205 private boolean useMirroredQueues = false; 206 private boolean useTempMirroredQueues = true; 207 /** 208 * Whether or not virtual destination subscriptions should cause network demand 209 */ 210 private boolean useVirtualDestSubs = false; 211 /** 212 * Whether or no the creation of destinations that match virtual destinations 213 * should cause network demand 214 */ 215 private boolean useVirtualDestSubsOnCreation = false; 216 private BrokerId brokerId; 217 private volatile DestinationInterceptor[] destinationInterceptors; 218 private ActiveMQDestination[] destinations; 219 private PListStore tempDataStore; 220 private int persistenceThreadPriority = Thread.MAX_PRIORITY; 221 private boolean useLocalHostBrokerName; 222 private final CountDownLatch stoppedLatch = new CountDownLatch(1); 223 private final CountDownLatch startedLatch = new CountDownLatch(1); 224 private Broker regionBroker; 225 private int producerSystemUsagePortion = 60; 226 private int consumerSystemUsagePortion = 40; 227 private boolean splitSystemUsageForProducersConsumers; 228 private boolean monitorConnectionSplits = false; 229 private int taskRunnerPriority = Thread.NORM_PRIORITY; 230 private boolean dedicatedTaskRunner; 231 private boolean cacheTempDestinations = false;// useful for failover 232 private int timeBeforePurgeTempDestinations = 5000; 233 private final List<Runnable> shutdownHooks = new ArrayList<Runnable>(); 234 private boolean systemExitOnShutdown; 235 private int systemExitOnShutdownExitCode; 236 private SslContext sslContext; 237 private boolean forceStart = false; 238 private IOExceptionHandler ioExceptionHandler; 239 private boolean schedulerSupport = false; 240 private File schedulerDirectoryFile; 241 private Scheduler scheduler; 242 private 
ThreadPoolExecutor executor; 243 private int schedulePeriodForDestinationPurge= 0; 244 private int maxPurgedDestinationsPerSweep = 0; 245 private int schedulePeriodForDiskUsageCheck = 0; 246 private int diskUsageCheckRegrowThreshold = -1; 247 private boolean adjustUsageLimits = true; 248 private BrokerContext brokerContext; 249 private boolean networkConnectorStartAsync = false; 250 private boolean allowTempAutoCreationOnSend; 251 private JobSchedulerStore jobSchedulerStore; 252 private final AtomicLong totalConnections = new AtomicLong(); 253 private final AtomicInteger currentConnections = new AtomicInteger(); 254 255 private long offlineDurableSubscriberTimeout = -1; 256 private long offlineDurableSubscriberTaskSchedule = 300000; 257 private DestinationFilter virtualConsumerDestinationFilter; 258 259 private final Object persistenceAdapterLock = new Object(); 260 private Throwable startException = null; 261 private boolean startAsync = false; 262 private Date startDate; 263 private boolean slave = true; 264 265 private boolean restartAllowed = true; 266 private boolean restartRequested = false; 267 private boolean rejectDurableConsumers = false; 268 private boolean rollbackOnlyOnAsyncException = true; 269 270 private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION; 271 272 static { 273 274 try { 275 ClassLoader loader = BrokerService.class.getClassLoader(); 276 Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider"); 277 Provider bouncycastle = (Provider) clazz.newInstance(); 278 Security.insertProviderAt(bouncycastle, 279 Integer.getInteger("org.apache.activemq.broker.BouncyCastlePosition", 2)); 280 LOG.info("Loaded the Bouncy Castle security provider."); 281 } catch(Throwable e) { 282 // No BouncyCastle found so we use the default Java Security Provider 283 } 284 285 String localHostName = "localhost"; 286 try { 287 localHostName = InetAddressUtil.getLocalHostName(); 288 } catch (UnknownHostException e) { 289 
LOG.error("Failed to resolve localhost"); 290 } 291 LOCAL_HOST_NAME = localHostName; 292 293 String version = null; 294 try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) { 295 if (in != null) { 296 try(InputStreamReader isr = new InputStreamReader(in); 297 BufferedReader reader = new BufferedReader(isr)) { 298 version = reader.readLine(); 299 } 300 } 301 } catch (IOException ie) { 302 LOG.warn("Error reading broker version ", ie); 303 } 304 BROKER_VERSION = version; 305 } 306 307 @Override 308 public String toString() { 309 return "BrokerService[" + getBrokerName() + "]"; 310 } 311 312 private String getBrokerVersion() { 313 String version = ActiveMQConnectionMetaData.PROVIDER_VERSION; 314 if (version == null) { 315 version = BROKER_VERSION; 316 } 317 318 return version; 319 } 320 321 /** 322 * Adds a new transport connector for the given bind address 323 * 324 * @return the newly created and added transport connector 325 * @throws Exception 326 */ 327 public TransportConnector addConnector(String bindAddress) throws Exception { 328 return addConnector(new URI(bindAddress)); 329 } 330 331 /** 332 * Adds a new transport connector for the given bind address 333 * 334 * @return the newly created and added transport connector 335 * @throws Exception 336 */ 337 public TransportConnector addConnector(URI bindAddress) throws Exception { 338 return addConnector(createTransportConnector(bindAddress)); 339 } 340 341 /** 342 * Adds a new transport connector for the given TransportServer transport 343 * 344 * @return the newly created and added transport connector 345 * @throws Exception 346 */ 347 public TransportConnector addConnector(TransportServer transport) throws Exception { 348 return addConnector(new TransportConnector(transport)); 349 } 350 351 /** 352 * Adds a new transport connector 353 * 354 * @return the transport connector 355 * @throws Exception 356 */ 357 public TransportConnector addConnector(TransportConnector 
connector) throws Exception { 358 transportConnectors.add(connector); 359 return connector; 360 } 361 362 /** 363 * Stops and removes a transport connector from the broker. 364 * 365 * @param connector 366 * @return true if the connector has been previously added to the broker 367 * @throws Exception 368 */ 369 public boolean removeConnector(TransportConnector connector) throws Exception { 370 boolean rc = transportConnectors.remove(connector); 371 if (rc) { 372 unregisterConnectorMBean(connector); 373 } 374 return rc; 375 } 376 377 /** 378 * Adds a new network connector using the given discovery address 379 * 380 * @return the newly created and added network connector 381 * @throws Exception 382 */ 383 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { 384 return addNetworkConnector(new URI(discoveryAddress)); 385 } 386 387 /** 388 * Adds a new proxy connector using the given bind address 389 * 390 * @return the newly created and added network connector 391 * @throws Exception 392 */ 393 public ProxyConnector addProxyConnector(String bindAddress) throws Exception { 394 return addProxyConnector(new URI(bindAddress)); 395 } 396 397 /** 398 * Adds a new network connector using the given discovery address 399 * 400 * @return the newly created and added network connector 401 * @throws Exception 402 */ 403 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { 404 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); 405 return addNetworkConnector(connector); 406 } 407 408 /** 409 * Adds a new proxy connector using the given bind address 410 * 411 * @return the newly created and added network connector 412 * @throws Exception 413 */ 414 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { 415 ProxyConnector connector = new ProxyConnector(); 416 connector.setBind(bindAddress); 417 connector.setRemote(new URI("fanout:multicast://default")); 418 return 
addProxyConnector(connector); 419 } 420 421 /** 422 * Adds a new network connector to connect this broker to a federated 423 * network 424 */ 425 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { 426 connector.setBrokerService(this); 427 connector.setLocalUri(getVmConnectorURI()); 428 // Set a connection filter so that the connector does not establish loop 429 // back connections. 430 connector.setConnectionFilter(new ConnectionFilter() { 431 @Override 432 public boolean connectTo(URI location) { 433 List<TransportConnector> transportConnectors = getTransportConnectors(); 434 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { 435 try { 436 TransportConnector tc = iter.next(); 437 if (location.equals(tc.getConnectUri())) { 438 return false; 439 } 440 } catch (Throwable e) { 441 } 442 } 443 return true; 444 } 445 }); 446 networkConnectors.add(connector); 447 return connector; 448 } 449 450 /** 451 * Removes the given network connector without stopping it. 
The caller 452 * should call {@link NetworkConnector#stop()} to close the connector 453 */ 454 public boolean removeNetworkConnector(NetworkConnector connector) { 455 boolean answer = networkConnectors.remove(connector); 456 if (answer) { 457 unregisterNetworkConnectorMBean(connector); 458 } 459 return answer; 460 } 461 462 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { 463 URI uri = getVmConnectorURI(); 464 connector.setLocalUri(uri); 465 proxyConnectors.add(connector); 466 if (isUseJmx()) { 467 registerProxyConnectorMBean(connector); 468 } 469 return connector; 470 } 471 472 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { 473 connector.setBrokerService(this); 474 jmsConnectors.add(connector); 475 if (isUseJmx()) { 476 registerJmsConnectorMBean(connector); 477 } 478 return connector; 479 } 480 481 public JmsConnector removeJmsConnector(JmsConnector connector) { 482 if (jmsConnectors.remove(connector)) { 483 return connector; 484 } 485 return null; 486 } 487 488 public void masterFailed() { 489 if (shutdownOnMasterFailure) { 490 LOG.error("The Master has failed ... shutting down"); 491 try { 492 stop(); 493 } catch (Exception e) { 494 LOG.error("Failed to stop for master failure", e); 495 } 496 } else { 497 LOG.warn("Master Failed - starting all connectors"); 498 try { 499 startAllConnectors(); 500 broker.nowMasterBroker(); 501 } catch (Exception e) { 502 LOG.error("Failed to startAllConnectors", e); 503 } 504 } 505 } 506 507 public String getUptime() { 508 long delta = getUptimeMillis(); 509 510 if (delta == 0) { 511 return "not started"; 512 } 513 514 return TimeUtils.printDuration(delta); 515 } 516 517 public long getUptimeMillis() { 518 if (startDate == null) { 519 return 0; 520 } 521 522 return new Date().getTime() - startDate.getTime(); 523 } 524 525 public boolean isStarted() { 526 return started.get() && startedLatch.getCount() == 0; 527 } 528 529 /** 530 * Forces a start of the broker. 
531 * By default a BrokerService instance that was 532 * previously stopped using BrokerService.stop() cannot be restarted 533 * using BrokerService.start(). 534 * This method enforces a restart. 535 * It is not recommended to force a restart of the broker and will not work 536 * for most but some very trivial broker configurations. 537 * For restarting a broker instance we recommend to first call stop() on 538 * the old instance and then recreate a new BrokerService instance. 539 * 540 * @param force - if true enforces a restart. 541 * @throws Exception 542 */ 543 public void start(boolean force) throws Exception { 544 forceStart = force; 545 stopped.set(false); 546 started.set(false); 547 start(); 548 } 549 550 // Service interface 551 // ------------------------------------------------------------------------- 552 553 protected boolean shouldAutostart() { 554 return true; 555 } 556 557 /** 558 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 559 * 560 * delegates to autoStart, done to prevent backwards incompatible signature change 561 */ 562 @PostConstruct 563 private void postConstruct() { 564 try { 565 autoStart(); 566 } catch (Exception ex) { 567 throw new RuntimeException(ex); 568 } 569 } 570 571 /** 572 * 573 * @throws Exception 574 * @org. 
apache.xbean.InitMethod 575 */ 576 public void autoStart() throws Exception { 577 if(shouldAutostart()) { 578 start(); 579 } 580 } 581 582 @Override 583 public void start() throws Exception { 584 if (stopped.get() || !started.compareAndSet(false, true)) { 585 // lets just ignore redundant start() calls 586 // as its way too easy to not be completely sure if start() has been 587 // called or not with the gazillion of different configuration 588 // mechanisms 589 // throw new IllegalStateException("Already started."); 590 return; 591 } 592 593 setStartException(null); 594 stopping.set(false); 595 startDate = new Date(); 596 MDC.put("activemq.broker", brokerName); 597 598 try { 599 checkMemorySystemUsageLimits(); 600 if (systemExitOnShutdown && useShutdownHook) { 601 throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); 602 } 603 processHelperProperties(); 604 if (isUseJmx()) { 605 // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and 606 // we cannot cleanup clear that during shutdown of the broker. 
607 MDC.remove("activemq.broker"); 608 try { 609 startManagementContext(); 610 for (NetworkConnector connector : getNetworkConnectors()) { 611 registerNetworkConnectorMBean(connector); 612 } 613 } finally { 614 MDC.put("activemq.broker", brokerName); 615 } 616 } 617 618 // in jvm master slave, lets not publish over existing broker till we get the lock 619 final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance(); 620 if (brokerRegistry.lookup(getBrokerName()) == null) { 621 brokerRegistry.bind(getBrokerName(), BrokerService.this); 622 } 623 startPersistenceAdapter(startAsync); 624 startBroker(startAsync); 625 brokerRegistry.bind(getBrokerName(), BrokerService.this); 626 } catch (Exception e) { 627 LOG.error("Failed to start Apache ActiveMQ (" + getBrokerName() + ", " + brokerId + ")", e); 628 try { 629 if (!stopped.get()) { 630 stop(); 631 } 632 } catch (Exception ex) { 633 LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex); 634 } 635 throw e; 636 } finally { 637 MDC.remove("activemq.broker"); 638 } 639 } 640 641 private void startPersistenceAdapter(boolean async) throws Exception { 642 if (async) { 643 new Thread("Persistence Adapter Starting Thread") { 644 @Override 645 public void run() { 646 try { 647 doStartPersistenceAdapter(); 648 } catch (Throwable e) { 649 setStartException(e); 650 } finally { 651 synchronized (persistenceAdapterLock) { 652 persistenceAdapterLock.notifyAll(); 653 } 654 } 655 } 656 }.start(); 657 } else { 658 doStartPersistenceAdapter(); 659 } 660 } 661 662 private void doStartPersistenceAdapter() throws Exception { 663 PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter(); 664 if (persistenceAdapterToStart == null) { 665 checkStartException(); 666 throw new ConfigurationException("Cannot start null persistence adapter"); 667 } 668 persistenceAdapterToStart.setUsageManager(getProducerSystemUsage()); 669 persistenceAdapterToStart.setBrokerName(getBrokerName()); 670 
LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart); 671 if (deleteAllMessagesOnStartup) { 672 deleteAllMessages(); 673 } 674 persistenceAdapterToStart.start(); 675 676 getTempDataStore(); 677 if (tempDataStore != null) { 678 try { 679 // start after we have the store lock 680 tempDataStore.start(); 681 } catch (Exception e) { 682 RuntimeException exception = new RuntimeException( 683 "Failed to start temp data store: " + tempDataStore, e); 684 LOG.error(exception.getLocalizedMessage(), e); 685 throw exception; 686 } 687 } 688 689 getJobSchedulerStore(); 690 if (jobSchedulerStore != null) { 691 try { 692 jobSchedulerStore.start(); 693 } catch (Exception e) { 694 RuntimeException exception = new RuntimeException( 695 "Failed to start job scheduler store: " + jobSchedulerStore, e); 696 LOG.error(exception.getLocalizedMessage(), e); 697 throw exception; 698 } 699 } 700 } 701 702 private void startBroker(boolean async) throws Exception { 703 if (async) { 704 new Thread("Broker Starting Thread") { 705 @Override 706 public void run() { 707 try { 708 synchronized (persistenceAdapterLock) { 709 persistenceAdapterLock.wait(); 710 } 711 doStartBroker(); 712 } catch (Throwable t) { 713 setStartException(t); 714 } 715 } 716 }.start(); 717 } else { 718 doStartBroker(); 719 } 720 } 721 722 private void doStartBroker() throws Exception { 723 checkStartException(); 724 startDestinations(); 725 addShutdownHook(); 726 727 broker = getBroker(); 728 brokerId = broker.getBrokerId(); 729 730 // need to log this after creating the broker so we have its id and name 731 LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId }); 732 broker.start(); 733 734 if (isUseJmx()) { 735 if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { 736 // try to restart management context 737 // typical for slaves that use the same ports as master 738 managementContext.stop(); 739 
startManagementContext(); 740 } 741 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; 742 managedBroker.setContextBroker(broker); 743 adminView.setBroker(managedBroker); 744 } 745 746 if (ioExceptionHandler == null) { 747 setIoExceptionHandler(new DefaultIOExceptionHandler()); 748 } 749 750 if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) { 751 ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString()); 752 Log4JConfigView log4jConfigView = new Log4JConfigView(); 753 AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName); 754 } 755 756 startAllConnectors(); 757 758 LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 759 LOG.info("For help or more information please see: http://activemq.apache.org"); 760 761 getBroker().brokerServiceStarted(); 762 checkStoreSystemUsageLimits(); 763 startedLatch.countDown(); 764 getBroker().nowMasterBroker(); 765 } 766 767 /** 768 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 769 * 770 * delegates to stop, done to prevent backwards incompatible signature change 771 */ 772 @PreDestroy 773 private void preDestroy () { 774 try { 775 stop(); 776 } catch (Exception ex) { 777 throw new RuntimeException(); 778 } 779 } 780 781 /** 782 * 783 * @throws Exception 784 * @org.apache .xbean.DestroyMethod 785 */ 786 @Override 787 public void stop() throws Exception { 788 if (!stopping.compareAndSet(false, true)) { 789 LOG.trace("Broker already stopping/stopped"); 790 return; 791 } 792 793 setStartException(new BrokerStoppedException("Stop invoked")); 794 MDC.put("activemq.broker", brokerName); 795 796 if (systemExitOnShutdown) { 797 new Thread() { 798 @Override 799 public void run() { 800 System.exit(systemExitOnShutdownExitCode); 801 } 802 }.start(); 803 } 804 805 LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), 
getBrokerName(), brokerId} ); 806 807 removeShutdownHook(); 808 if (this.scheduler != null) { 809 this.scheduler.stop(); 810 this.scheduler = null; 811 } 812 ServiceStopper stopper = new ServiceStopper(); 813 if (services != null) { 814 for (Service service : services) { 815 stopper.stop(service); 816 } 817 } 818 stopAllConnectors(stopper); 819 this.slave = true; 820 // remove any VMTransports connected 821 // this has to be done after services are stopped, 822 // to avoid timing issue with discovery (spinning up a new instance) 823 BrokerRegistry.getInstance().unbind(getBrokerName()); 824 VMTransportFactory.stopped(getBrokerName()); 825 if (broker != null) { 826 stopper.stop(broker); 827 broker = null; 828 } 829 830 if (jobSchedulerStore != null) { 831 jobSchedulerStore.stop(); 832 jobSchedulerStore = null; 833 } 834 if (tempDataStore != null) { 835 tempDataStore.stop(); 836 tempDataStore = null; 837 } 838 try { 839 stopper.stop(getPersistenceAdapter()); 840 persistenceAdapter = null; 841 if (isUseJmx()) { 842 stopper.stop(managementContext); 843 managementContext = null; 844 } 845 // Clear SelectorParser cache to free memory 846 SelectorParser.clearCache(); 847 } finally { 848 started.set(false); 849 stopped.set(true); 850 stoppedLatch.countDown(); 851 } 852 853 if (this.taskRunnerFactory != null) { 854 this.taskRunnerFactory.shutdown(); 855 this.taskRunnerFactory = null; 856 } 857 if (this.executor != null) { 858 ThreadPoolUtils.shutdownNow(executor); 859 this.executor = null; 860 } 861 862 this.destinationInterceptors = null; 863 this.destinationFactory = null; 864 865 if (startDate != null) { 866 LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()}); 867 } 868 LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 869 870 synchronized (shutdownHooks) { 871 for (Runnable hook : shutdownHooks) { 872 try { 873 hook.run(); 874 } catch 
(Throwable e) { 875 stopper.onException(hook, e); 876 } 877 } 878 } 879 880 MDC.remove("activemq.broker"); 881 882 // and clear start date 883 startDate = null; 884 885 stopper.throwFirstException(); 886 } 887 888 public boolean checkQueueSize(String queueName) { 889 long count = 0; 890 long queueSize = 0; 891 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); 892 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { 893 if (entry.getKey().isQueue()) { 894 if (entry.getValue().getName().matches(queueName)) { 895 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); 896 count += queueSize; 897 if (queueSize > 0) { 898 LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize); 899 } 900 } 901 } 902 } 903 return count == 0; 904 } 905 906 /** 907 * This method (both connectorName and queueName are using regex to match) 908 * 1. stop the connector (supposed the user input the connector which the 909 * clients connect to) 2. to check whether there is any pending message on 910 * the queues defined by queueName 3. supposedly, after stop the connector, 911 * client should failover to other broker and pending messages should be 912 * forwarded. if no pending messages, the method finally call stop to stop 913 * the broker. 
914 * 915 * @param connectorName 916 * @param queueName 917 * @param timeout 918 * @param pollInterval 919 * @throws Exception 920 */ 921 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { 922 if (isUseJmx()) { 923 if (connectorName == null || queueName == null || timeout <= 0) { 924 throw new Exception( 925 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); 926 } 927 if (pollInterval <= 0) { 928 pollInterval = 30; 929 } 930 LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{ 931 connectorName, queueName, timeout, pollInterval 932 }); 933 TransportConnector connector; 934 for (int i = 0; i < transportConnectors.size(); i++) { 935 connector = transportConnectors.get(i); 936 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { 937 connector.stop(); 938 } 939 } 940 long start = System.currentTimeMillis(); 941 while (System.currentTimeMillis() - start < timeout * 1000) { 942 // check quesize until it gets zero 943 if (checkQueueSize(queueName)) { 944 stop(); 945 break; 946 } else { 947 Thread.sleep(pollInterval * 1000); 948 } 949 } 950 if (stopped.get()) { 951 LOG.info("Successfully stop the broker."); 952 } else { 953 LOG.info("There is still pending message on the queue. 
Please check and stop the broker manually."); 954 } 955 } 956 } 957 958 /** 959 * A helper method to block the caller thread until the broker has been 960 * stopped 961 */ 962 public void waitUntilStopped() { 963 while (isStarted() && !stopped.get()) { 964 try { 965 stoppedLatch.await(); 966 } catch (InterruptedException e) { 967 // ignore 968 } 969 } 970 } 971 972 public boolean isStopped() { 973 return stopped.get(); 974 } 975 976 /** 977 * A helper method to block the caller thread until the broker has fully started 978 * @return boolean true if wait succeeded false if broker was not started or was stopped 979 */ 980 public boolean waitUntilStarted() { 981 return waitUntilStarted(DEFAULT_START_TIMEOUT); 982 } 983 984 /** 985 * A helper method to block the caller thread until the broker has fully started 986 * 987 * @param timeout 988 * the amount of time to wait before giving up and returning false. 989 * 990 * @return boolean true if wait succeeded false if broker was not started or was stopped 991 */ 992 public boolean waitUntilStarted(long timeout) { 993 boolean waitSucceeded = isStarted(); 994 long expiration = Math.max(0, timeout + System.currentTimeMillis()); 995 while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { 996 try { 997 if (getStartException() != null) { 998 return waitSucceeded; 999 } 1000 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); 1001 } catch (InterruptedException ignore) { 1002 } 1003 } 1004 return waitSucceeded; 1005 } 1006 1007 // Properties 1008 // ------------------------------------------------------------------------- 1009 /** 1010 * Returns the message broker 1011 */ 1012 public Broker getBroker() throws Exception { 1013 if (broker == null) { 1014 checkStartException(); 1015 broker = createBroker(); 1016 } 1017 return broker; 1018 } 1019 1020 /** 1021 * Returns the administration view of the broker; used to create and destroy 1022 * resources such as queues and 
topics. Note this method returns null if JMX 1023 * is disabled. 1024 */ 1025 public BrokerView getAdminView() throws Exception { 1026 if (adminView == null) { 1027 // force lazy creation 1028 getBroker(); 1029 } 1030 return adminView; 1031 } 1032 1033 public void setAdminView(BrokerView adminView) { 1034 this.adminView = adminView; 1035 } 1036 1037 public String getBrokerName() { 1038 return brokerName; 1039 } 1040 1041 /** 1042 * Sets the name of this broker; which must be unique in the network 1043 * 1044 * @param brokerName 1045 */ 1046 private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]"; 1047 public void setBrokerName(String brokerName) { 1048 if (brokerName == null) { 1049 throw new NullPointerException("The broker name cannot be null"); 1050 } 1051 String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_"); 1052 if (!str.equals(brokerName)) { 1053 LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str); 1054 } 1055 this.brokerName = str.trim(); 1056 } 1057 1058 public PersistenceAdapterFactory getPersistenceFactory() { 1059 return persistenceFactory; 1060 } 1061 1062 public File getDataDirectoryFile() { 1063 if (dataDirectoryFile == null) { 1064 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); 1065 } 1066 return dataDirectoryFile; 1067 } 1068 1069 public File getBrokerDataDirectory() { 1070 String brokerDir = getBrokerName(); 1071 return new File(getDataDirectoryFile(), brokerDir); 1072 } 1073 1074 /** 1075 * Sets the directory in which the data files will be stored by default for 1076 * the JDBC and Journal persistence adaptors. 
1077 * 1078 * @param dataDirectory 1079 * the directory to store data files 1080 */ 1081 public void setDataDirectory(String dataDirectory) { 1082 setDataDirectoryFile(new File(dataDirectory)); 1083 } 1084 1085 /** 1086 * Sets the directory in which the data files will be stored by default for 1087 * the JDBC and Journal persistence adaptors. 1088 * 1089 * @param dataDirectoryFile 1090 * the directory to store data files 1091 */ 1092 public void setDataDirectoryFile(File dataDirectoryFile) { 1093 this.dataDirectoryFile = dataDirectoryFile; 1094 } 1095 1096 /** 1097 * @return the tmpDataDirectory 1098 */ 1099 public File getTmpDataDirectory() { 1100 if (tmpDataDirectory == null) { 1101 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); 1102 } 1103 return tmpDataDirectory; 1104 } 1105 1106 /** 1107 * @param tmpDataDirectory 1108 * the tmpDataDirectory to set 1109 */ 1110 public void setTmpDataDirectory(File tmpDataDirectory) { 1111 this.tmpDataDirectory = tmpDataDirectory; 1112 } 1113 1114 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 1115 this.persistenceFactory = persistenceFactory; 1116 } 1117 1118 public void setDestinationFactory(DestinationFactory destinationFactory) { 1119 this.destinationFactory = destinationFactory; 1120 } 1121 1122 public boolean isPersistent() { 1123 return persistent; 1124 } 1125 1126 /** 1127 * Sets whether or not persistence is enabled or disabled. 1128 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1129 */ 1130 public void setPersistent(boolean persistent) { 1131 this.persistent = persistent; 1132 } 1133 1134 public boolean isPopulateJMSXUserID() { 1135 return populateJMSXUserID; 1136 } 1137 1138 /** 1139 * Sets whether or not the broker should populate the JMSXUserID header. 
1140 */ 1141 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 1142 this.populateJMSXUserID = populateJMSXUserID; 1143 } 1144 1145 public SystemUsage getSystemUsage() { 1146 try { 1147 if (systemUsage == null) { 1148 1149 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore()); 1150 systemUsage.setExecutor(getExecutor()); 1151 systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB 1152 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1153 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB 1154 systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1155 addService(this.systemUsage); 1156 } 1157 return systemUsage; 1158 } catch (IOException e) { 1159 LOG.error("Cannot create SystemUsage", e); 1160 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e); 1161 } 1162 } 1163 1164 public void setSystemUsage(SystemUsage memoryManager) { 1165 if (this.systemUsage != null) { 1166 removeService(this.systemUsage); 1167 } 1168 this.systemUsage = memoryManager; 1169 if (this.systemUsage.getExecutor()==null) { 1170 this.systemUsage.setExecutor(getExecutor()); 1171 } 1172 addService(this.systemUsage); 1173 } 1174 1175 /** 1176 * @return the consumerUsageManager 1177 * @throws IOException 1178 */ 1179 public SystemUsage getConsumerSystemUsage() throws IOException { 1180 if (this.consumerSystemUsaage == null) { 1181 if (splitSystemUsageForProducersConsumers) { 1182 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); 1183 float portion = consumerSystemUsagePortion / 100f; 1184 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); 1185 addService(this.consumerSystemUsaage); 1186 } else { 1187 consumerSystemUsaage = getSystemUsage(); 1188 } 1189 } 1190 return this.consumerSystemUsaage; 1191 } 1192 1193 /** 1194 * @param consumerSystemUsaage 1195 * the 
storeSystemUsage to set 1196 */ 1197 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { 1198 if (this.consumerSystemUsaage != null) { 1199 removeService(this.consumerSystemUsaage); 1200 } 1201 this.consumerSystemUsaage = consumerSystemUsaage; 1202 addService(this.consumerSystemUsaage); 1203 } 1204 1205 /** 1206 * @return the producerUsageManager 1207 * @throws IOException 1208 */ 1209 public SystemUsage getProducerSystemUsage() throws IOException { 1210 if (producerSystemUsage == null) { 1211 if (splitSystemUsageForProducersConsumers) { 1212 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); 1213 float portion = producerSystemUsagePortion / 100f; 1214 producerSystemUsage.getMemoryUsage().setUsagePortion(portion); 1215 addService(producerSystemUsage); 1216 } else { 1217 producerSystemUsage = getSystemUsage(); 1218 } 1219 } 1220 return producerSystemUsage; 1221 } 1222 1223 /** 1224 * @param producerUsageManager 1225 * the producerUsageManager to set 1226 */ 1227 public void setProducerSystemUsage(SystemUsage producerUsageManager) { 1228 if (this.producerSystemUsage != null) { 1229 removeService(this.producerSystemUsage); 1230 } 1231 this.producerSystemUsage = producerUsageManager; 1232 addService(this.producerSystemUsage); 1233 } 1234 1235 public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException { 1236 if (persistenceAdapter == null && !hasStartException()) { 1237 persistenceAdapter = createPersistenceAdapter(); 1238 configureService(persistenceAdapter); 1239 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1240 } 1241 return persistenceAdapter; 1242 } 1243 1244 /** 1245 * Sets the persistence adaptor implementation to use for this broker 1246 * 1247 * @throws IOException 1248 */ 1249 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { 1250 if (!isPersistent() && ! 
(persistenceAdapter instanceof MemoryPersistenceAdapter)) { 1251 LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter); 1252 return; 1253 } 1254 this.persistenceAdapter = persistenceAdapter; 1255 configureService(this.persistenceAdapter); 1256 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1257 } 1258 1259 public TaskRunnerFactory getTaskRunnerFactory() { 1260 if (this.taskRunnerFactory == null) { 1261 this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, 1262 isDedicatedTaskRunner()); 1263 this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); 1264 } 1265 return this.taskRunnerFactory; 1266 } 1267 1268 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 1269 this.taskRunnerFactory = taskRunnerFactory; 1270 } 1271 1272 public TaskRunnerFactory getPersistenceTaskRunnerFactory() { 1273 if (taskRunnerFactory == null) { 1274 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, 1275 true, 1000, isDedicatedTaskRunner()); 1276 } 1277 return persistenceTaskRunnerFactory; 1278 } 1279 1280 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { 1281 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; 1282 } 1283 1284 public boolean isUseJmx() { 1285 return useJmx; 1286 } 1287 1288 public boolean isEnableStatistics() { 1289 return enableStatistics; 1290 } 1291 1292 /** 1293 * Sets whether or not the Broker's services enable statistics or not. 1294 */ 1295 public void setEnableStatistics(boolean enableStatistics) { 1296 this.enableStatistics = enableStatistics; 1297 } 1298 1299 /** 1300 * Sets whether or not the Broker's services should be exposed into JMX or 1301 * not. 
1302 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1303 */ 1304 public void setUseJmx(boolean useJmx) { 1305 this.useJmx = useJmx; 1306 } 1307 1308 public ObjectName getBrokerObjectName() throws MalformedObjectNameException { 1309 if (brokerObjectName == null) { 1310 brokerObjectName = createBrokerObjectName(); 1311 } 1312 return brokerObjectName; 1313 } 1314 1315 /** 1316 * Sets the JMX ObjectName for this broker 1317 */ 1318 public void setBrokerObjectName(ObjectName brokerObjectName) { 1319 this.brokerObjectName = brokerObjectName; 1320 } 1321 1322 public ManagementContext getManagementContext() { 1323 if (managementContext == null) { 1324 checkStartException(); 1325 managementContext = new ManagementContext(); 1326 } 1327 return managementContext; 1328 } 1329 1330 synchronized private void checkStartException() { 1331 if (startException != null) { 1332 throw new BrokerStoppedException(startException); 1333 } 1334 } 1335 1336 synchronized private boolean hasStartException() { 1337 return startException != null; 1338 } 1339 1340 synchronized private void setStartException(Throwable t) { 1341 startException = t; 1342 } 1343 1344 public void setManagementContext(ManagementContext managementContext) { 1345 this.managementContext = managementContext; 1346 } 1347 1348 public NetworkConnector getNetworkConnectorByName(String connectorName) { 1349 for (NetworkConnector connector : networkConnectors) { 1350 if (connector.getName().equals(connectorName)) { 1351 return connector; 1352 } 1353 } 1354 return null; 1355 } 1356 1357 public String[] getNetworkConnectorURIs() { 1358 return networkConnectorURIs; 1359 } 1360 1361 public void setNetworkConnectorURIs(String[] networkConnectorURIs) { 1362 this.networkConnectorURIs = networkConnectorURIs; 1363 } 1364 1365 public TransportConnector getConnectorByName(String connectorName) { 1366 for (TransportConnector connector : transportConnectors) { 1367 if 
(connector.getName().equals(connectorName)) { 1368 return connector; 1369 } 1370 } 1371 return null; 1372 } 1373 1374 public Map<String, String> getTransportConnectorURIsAsMap() { 1375 Map<String, String> answer = new HashMap<String, String>(); 1376 for (TransportConnector connector : transportConnectors) { 1377 try { 1378 URI uri = connector.getConnectUri(); 1379 if (uri != null) { 1380 String scheme = uri.getScheme(); 1381 if (scheme != null) { 1382 answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString()); 1383 } 1384 } 1385 } catch (Exception e) { 1386 LOG.debug("Failed to read URI to build transportURIsAsMap", e); 1387 } 1388 } 1389 return answer; 1390 } 1391 1392 public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){ 1393 ProducerBrokerExchange result = null; 1394 1395 for (TransportConnector connector : transportConnectors) { 1396 for (TransportConnection tc: connector.getConnections()){ 1397 result = tc.getProducerBrokerExchangeIfExists(producerInfo); 1398 if (result !=null){ 1399 return result; 1400 } 1401 } 1402 } 1403 return result; 1404 } 1405 1406 public String[] getTransportConnectorURIs() { 1407 return transportConnectorURIs; 1408 } 1409 1410 public void setTransportConnectorURIs(String[] transportConnectorURIs) { 1411 this.transportConnectorURIs = transportConnectorURIs; 1412 } 1413 1414 /** 1415 * @return Returns the jmsBridgeConnectors. 1416 */ 1417 public JmsConnector[] getJmsBridgeConnectors() { 1418 return jmsBridgeConnectors; 1419 } 1420 1421 /** 1422 * @param jmsConnectors 1423 * The jmsBridgeConnectors to set. 1424 */ 1425 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { 1426 this.jmsBridgeConnectors = jmsConnectors; 1427 } 1428 1429 public Service[] getServices() { 1430 return services.toArray(new Service[0]); 1431 } 1432 1433 /** 1434 * Sets the services associated with this broker. 
1435 */ 1436 public void setServices(Service[] services) { 1437 this.services.clear(); 1438 if (services != null) { 1439 for (int i = 0; i < services.length; i++) { 1440 this.services.add(services[i]); 1441 } 1442 } 1443 } 1444 1445 /** 1446 * Adds a new service so that it will be started as part of the broker 1447 * lifecycle 1448 */ 1449 public void addService(Service service) { 1450 services.add(service); 1451 } 1452 1453 public void removeService(Service service) { 1454 services.remove(service); 1455 } 1456 1457 public boolean isUseLoggingForShutdownErrors() { 1458 return useLoggingForShutdownErrors; 1459 } 1460 1461 /** 1462 * Sets whether or not we should use commons-logging when reporting errors 1463 * when shutting down the broker 1464 */ 1465 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { 1466 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; 1467 } 1468 1469 public boolean isUseShutdownHook() { 1470 return useShutdownHook; 1471 } 1472 1473 /** 1474 * Sets whether or not we should use a shutdown handler to close down the 1475 * broker cleanly if the JVM is terminated. It is recommended you leave this 1476 * enabled. 1477 */ 1478 public void setUseShutdownHook(boolean useShutdownHook) { 1479 this.useShutdownHook = useShutdownHook; 1480 } 1481 1482 public boolean isAdvisorySupport() { 1483 return advisorySupport; 1484 } 1485 1486 /** 1487 * Allows the support of advisory messages to be disabled for performance 1488 * reasons. 
1489 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1490 */ 1491 public void setAdvisorySupport(boolean advisorySupport) { 1492 this.advisorySupport = advisorySupport; 1493 } 1494 1495 public List<TransportConnector> getTransportConnectors() { 1496 return new ArrayList<TransportConnector>(transportConnectors); 1497 } 1498 1499 /** 1500 * Sets the transport connectors which this broker will listen on for new 1501 * clients 1502 * 1503 * @org.apache.xbean.Property 1504 * nestedType="org.apache.activemq.broker.TransportConnector" 1505 */ 1506 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { 1507 for (TransportConnector connector : transportConnectors) { 1508 addConnector(connector); 1509 } 1510 } 1511 1512 public TransportConnector getTransportConnectorByName(String name){ 1513 for (TransportConnector transportConnector : transportConnectors){ 1514 if (name.equals(transportConnector.getName())){ 1515 return transportConnector; 1516 } 1517 } 1518 return null; 1519 } 1520 1521 public TransportConnector getTransportConnectorByScheme(String scheme){ 1522 for (TransportConnector transportConnector : transportConnectors){ 1523 if (scheme.equals(transportConnector.getUri().getScheme())){ 1524 return transportConnector; 1525 } 1526 } 1527 return null; 1528 } 1529 1530 public List<NetworkConnector> getNetworkConnectors() { 1531 return new ArrayList<NetworkConnector>(networkConnectors); 1532 } 1533 1534 public List<ProxyConnector> getProxyConnectors() { 1535 return new ArrayList<ProxyConnector>(proxyConnectors); 1536 } 1537 1538 /** 1539 * Sets the network connectors which this broker will use to connect to 1540 * other brokers in a federated network 1541 * 1542 * @org.apache.xbean.Property 1543 * nestedType="org.apache.activemq.network.NetworkConnector" 1544 */ 1545 public void setNetworkConnectors(List<?> networkConnectors) throws Exception { 1546 for (Object connector : 
networkConnectors) { 1547 addNetworkConnector((NetworkConnector) connector); 1548 } 1549 } 1550 1551 /** 1552 * Sets the network connectors which this broker will use to connect to 1553 * other brokers in a federated network 1554 */ 1555 public void setProxyConnectors(List<?> proxyConnectors) throws Exception { 1556 for (Object connector : proxyConnectors) { 1557 addProxyConnector((ProxyConnector) connector); 1558 } 1559 } 1560 1561 public PolicyMap getDestinationPolicy() { 1562 return destinationPolicy; 1563 } 1564 1565 /** 1566 * Sets the destination specific policies available either for exact 1567 * destinations or for wildcard areas of destinations. 1568 */ 1569 public void setDestinationPolicy(PolicyMap policyMap) { 1570 this.destinationPolicy = policyMap; 1571 } 1572 1573 public BrokerPlugin[] getPlugins() { 1574 return plugins; 1575 } 1576 1577 /** 1578 * Sets a number of broker plugins to install such as for security 1579 * authentication or authorization 1580 */ 1581 public void setPlugins(BrokerPlugin[] plugins) { 1582 this.plugins = plugins; 1583 } 1584 1585 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1586 return messageAuthorizationPolicy; 1587 } 1588 1589 /** 1590 * Sets the policy used to decide if the current connection is authorized to 1591 * consume a given message 1592 */ 1593 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1594 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1595 } 1596 1597 /** 1598 * Delete all messages from the persistent store 1599 * 1600 * @throws IOException 1601 */ 1602 public void deleteAllMessages() throws IOException { 1603 getPersistenceAdapter().deleteAllMessages(); 1604 } 1605 1606 public boolean isDeleteAllMessagesOnStartup() { 1607 return deleteAllMessagesOnStartup; 1608 } 1609 1610 /** 1611 * Sets whether or not all messages are deleted on startup - mostly only 1612 * useful for testing. 
1613 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1614 */ 1615 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 1616 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 1617 } 1618 1619 public URI getVmConnectorURI() { 1620 if (vmConnectorURI == null) { 1621 try { 1622 vmConnectorURI = new URI("vm://" + getBrokerName()); 1623 } catch (URISyntaxException e) { 1624 LOG.error("Badly formed URI from {}", getBrokerName(), e); 1625 } 1626 } 1627 return vmConnectorURI; 1628 } 1629 1630 public void setVmConnectorURI(URI vmConnectorURI) { 1631 this.vmConnectorURI = vmConnectorURI; 1632 } 1633 1634 public String getDefaultSocketURIString() { 1635 if (started.get()) { 1636 if (this.defaultSocketURIString == null) { 1637 for (TransportConnector tc:this.transportConnectors) { 1638 String result = null; 1639 try { 1640 result = tc.getPublishableConnectString(); 1641 } catch (Exception e) { 1642 LOG.warn("Failed to get the ConnectURI for {}", tc, e); 1643 } 1644 if (result != null) { 1645 // find first publishable uri 1646 if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { 1647 this.defaultSocketURIString = result; 1648 break; 1649 } else { 1650 // or use the first defined 1651 if (this.defaultSocketURIString == null) { 1652 this.defaultSocketURIString = result; 1653 } 1654 } 1655 } 1656 } 1657 1658 } 1659 return this.defaultSocketURIString; 1660 } 1661 return null; 1662 } 1663 1664 /** 1665 * @return Returns the shutdownOnMasterFailure. 1666 */ 1667 public boolean isShutdownOnMasterFailure() { 1668 return shutdownOnMasterFailure; 1669 } 1670 1671 /** 1672 * @param shutdownOnMasterFailure 1673 * The shutdownOnMasterFailure to set. 
1674 */ 1675 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { 1676 this.shutdownOnMasterFailure = shutdownOnMasterFailure; 1677 } 1678 1679 public boolean isKeepDurableSubsActive() { 1680 return keepDurableSubsActive; 1681 } 1682 1683 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1684 this.keepDurableSubsActive = keepDurableSubsActive; 1685 } 1686 1687 public boolean isEnableMessageExpirationOnActiveDurableSubs() { 1688 return enableMessageExpirationOnActiveDurableSubs; 1689 } 1690 1691 public void setEnableMessageExpirationOnActiveDurableSubs(boolean enableMessageExpirationOnActiveDurableSubs) { 1692 this.enableMessageExpirationOnActiveDurableSubs = enableMessageExpirationOnActiveDurableSubs; 1693 } 1694 1695 public boolean isUseVirtualTopics() { 1696 return useVirtualTopics; 1697 } 1698 1699 /** 1700 * Sets whether or not <a 1701 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 1702 * Topics</a> should be supported by default if they have not been 1703 * explicitly configured. 1704 */ 1705 public void setUseVirtualTopics(boolean useVirtualTopics) { 1706 this.useVirtualTopics = useVirtualTopics; 1707 } 1708 1709 public DestinationInterceptor[] getDestinationInterceptors() { 1710 return destinationInterceptors; 1711 } 1712 1713 public boolean isUseMirroredQueues() { 1714 return useMirroredQueues; 1715 } 1716 1717 /** 1718 * Sets whether or not <a 1719 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored 1720 * Queues</a> should be supported by default if they have not been 1721 * explicitly configured. 
*/
    public void setUseMirroredQueues(boolean useMirroredQueues) {
        this.useMirroredQueues = useMirroredQueues;
    }

    /**
     * Sets the destination interceptors to use
     */
    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
        this.destinationInterceptors = destinationInterceptors;
    }

    /**
     * @return the destinations which should be loaded/created on startup
     */
    public ActiveMQDestination[] getDestinations() {
        return destinations;
    }

    /**
     * Sets the destinations which should be loaded/created on startup
     */
    public void setDestinations(ActiveMQDestination[] destinations) {
        this.destinations = destinations;
    }

    /**
     * Returns the temporary data store, resolving it on first use as long as
     * no start exception has been recorded:
     * <ul>
     * <li>without persistence there is no temp store and null is returned;</li>
     * <li>a persistence adapter that itself is a {@code PListStore} is
     * returned directly (it is not cached in the tempDataStore field);</li>
     * <li>otherwise the KahaDB {@code PListStoreImpl} is loaded reflectively,
     * pointed at the tmp data directory, configured and cached.</li>
     * </ul>
     *
     * @return the tempDataStore
     * @throws RuntimeException
     *             if the persistence adapter cannot be obtained or the store
     *             cannot be instantiated
     */
    public synchronized PListStore getTempDataStore() {
        if (tempDataStore != null || hasStartException()) {
            return tempDataStore;
        }
        if (!isPersistent()) {
            return null;
        }

        try {
            PersistenceAdapter adapter = getPersistenceAdapter();
            if (adapter instanceof PListStore) {
                return (PListStore) adapter;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        try {
            String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
            ClassLoader loader = getClass().getClassLoader();
            this.tempDataStore = (PListStore) loader.loadClass(clazz).newInstance();
            this.tempDataStore.setDirectory(getTmpDataDirectory());
            configureService(tempDataStore);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return tempDataStore;
    }

    /**
     * Sets the temporary data store and aligns directories: when this broker
     * has no tmp data directory it adopts the one of the store, otherwise a
     * store without a directory receives the one of the broker. The store is
     * then passed to {@code configureService}.
     *
     * @param tempDataStore
     *            the tempDataStore to set
     */
    public void setTempDataStore(PListStore tempDataStore) {
        this.tempDataStore = tempDataStore;
        if (tempDataStore != null) {
            if (tmpDataDirectory == null) {
                tmpDataDirectory = tempDataStore.getDirectory();
            } else if (tempDataStore.getDirectory() == null) {
                tempDataStore.setDirectory(tmpDataDirectory);
            }
        }
        configureService(tempDataStore);
    }
/** @return the thread priority used for persistence task runners */
    public int getPersistenceThreadPriority() {
        return persistenceThreadPriority;
    }

    /**
     * @param persistenceThreadPriority
     *            the thread priority used for persistence task runners
     */
    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
        this.persistenceThreadPriority = persistenceThreadPriority;
    }

    /**
     * @return the useLocalHostBrokerName
     */
    public boolean isUseLocalHostBrokerName() {
        return this.useLocalHostBrokerName;
    }

    /**
     * Stores the flag and, when it is enabled on a broker that has not been
     * started and whose name is unset or still the default, renames the
     * broker to the local host name.
     * <p>
     * The name test is grouped explicitly. Without the parentheses
     * {@code &&} binds tighter than {@code ||}, so a default-named broker was
     * renamed even when the flag was false or the broker was already
     * started. The default name is compared with {@code equals} rather than
     * by reference.
     *
     * @param useLocalHostBrokerName
     *            the useLocalHostBrokerName to set
     */
    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
        this.useLocalHostBrokerName = useLocalHostBrokerName;
        boolean nameIsDefault = brokerName == null || DEFAULT_BROKER_NAME.equals(brokerName);
        if (useLocalHostBrokerName && !started.get() && nameIsDefault) {
            brokerName = LOCAL_HOST_NAME;
        }
    }

    /**
     * Looks up and lazily creates if necessary the destination for the given
     * JMS name
     */
    public Destination getDestination(ActiveMQDestination destination) throws Exception {
        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
    }

    /**
     * Removes the given destination through the broker, using the admin
     * connection context and a timeout argument of 0.
     */
    public void removeDestination(ActiveMQDestination destination) throws Exception {
        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
    }

    /**
     * @return the producer share of the system usage, in percent
     */
    public int getProducerSystemUsagePortion() {
        return producerSystemUsagePortion;
    }

    /**
     * @param producerSystemUsagePortion
     *            the producer share of the system usage, in percent
     */
    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
        this.producerSystemUsagePortion = producerSystemUsagePortion;
    }

    /**
     * @return the consumer share of the system usage, in percent
     */
    public int getConsumerSystemUsagePortion() {
        return consumerSystemUsagePortion;
    }

    /**
     * @param consumerSystemUsagePortion
     *            the consumer share of the system usage, in percent
     */
    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
    }

    /**
     * @return whether producers and consumers get separate system usages
     */
    public boolean isSplitSystemUsageForProducersConsumers() {
        return splitSystemUsageForProducersConsumers;
    }

    public void
setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { 1850 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; 1851 } 1852 1853 public boolean isMonitorConnectionSplits() { 1854 return monitorConnectionSplits; 1855 } 1856 1857 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { 1858 this.monitorConnectionSplits = monitorConnectionSplits; 1859 } 1860 1861 public int getTaskRunnerPriority() { 1862 return taskRunnerPriority; 1863 } 1864 1865 public void setTaskRunnerPriority(int taskRunnerPriority) { 1866 this.taskRunnerPriority = taskRunnerPriority; 1867 } 1868 1869 public boolean isDedicatedTaskRunner() { 1870 return dedicatedTaskRunner; 1871 } 1872 1873 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 1874 this.dedicatedTaskRunner = dedicatedTaskRunner; 1875 } 1876 1877 public boolean isCacheTempDestinations() { 1878 return cacheTempDestinations; 1879 } 1880 1881 public void setCacheTempDestinations(boolean cacheTempDestinations) { 1882 this.cacheTempDestinations = cacheTempDestinations; 1883 } 1884 1885 public int getTimeBeforePurgeTempDestinations() { 1886 return timeBeforePurgeTempDestinations; 1887 } 1888 1889 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { 1890 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; 1891 } 1892 1893 public boolean isUseTempMirroredQueues() { 1894 return useTempMirroredQueues; 1895 } 1896 1897 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { 1898 this.useTempMirroredQueues = useTempMirroredQueues; 1899 } 1900 1901 public synchronized JobSchedulerStore getJobSchedulerStore() { 1902 1903 // If support is off don't allow any scheduler even is user configured their own. 
1904 if (!isSchedulerSupport()) { 1905 return null; 1906 } 1907 1908 // If the user configured their own we use it even if persistence is disabled since 1909 // we don't know anything about their implementation. 1910 if (jobSchedulerStore == null && !hasStartException()) { 1911 1912 if (!isPersistent()) { 1913 this.jobSchedulerStore = new InMemoryJobSchedulerStore(); 1914 configureService(jobSchedulerStore); 1915 return this.jobSchedulerStore; 1916 } 1917 1918 try { 1919 PersistenceAdapter pa = getPersistenceAdapter(); 1920 if (pa != null) { 1921 this.jobSchedulerStore = pa.createJobSchedulerStore(); 1922 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1923 configureService(jobSchedulerStore); 1924 return this.jobSchedulerStore; 1925 } 1926 } catch (IOException e) { 1927 throw new RuntimeException(e); 1928 } catch (UnsupportedOperationException ex) { 1929 // It's ok if the store doesn't implement a scheduler. 1930 } catch (Exception e) { 1931 throw new RuntimeException(e); 1932 } 1933 1934 try { 1935 PersistenceAdapter pa = getPersistenceAdapter(); 1936 if (pa != null && pa instanceof JobSchedulerStore) { 1937 this.jobSchedulerStore = (JobSchedulerStore) pa; 1938 configureService(jobSchedulerStore); 1939 return this.jobSchedulerStore; 1940 } 1941 } catch (IOException e) { 1942 throw new RuntimeException(e); 1943 } 1944 1945 // Load the KahaDB store as a last resort, this only works if KahaDB is 1946 // included at runtime, otherwise this will fail. User should disable 1947 // scheduler support if this fails. 
1948 try { 1949 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 1950 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 1951 jobSchedulerStore = adaptor.createJobSchedulerStore(); 1952 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1953 configureService(jobSchedulerStore); 1954 LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile()); 1955 } catch (Exception e) { 1956 throw new RuntimeException(e); 1957 } 1958 } 1959 return jobSchedulerStore; 1960 } 1961 1962 public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) { 1963 this.jobSchedulerStore = jobSchedulerStore; 1964 configureService(jobSchedulerStore); 1965 } 1966 1967 // 1968 // Implementation methods 1969 // ------------------------------------------------------------------------- 1970 /** 1971 * Handles any lazy-creation helper properties which are added to make 1972 * things easier to configure inside environments such as Spring 1973 * 1974 * @throws Exception 1975 */ 1976 protected void processHelperProperties() throws Exception { 1977 if (transportConnectorURIs != null) { 1978 for (int i = 0; i < transportConnectorURIs.length; i++) { 1979 String uri = transportConnectorURIs[i]; 1980 addConnector(uri); 1981 } 1982 } 1983 if (networkConnectorURIs != null) { 1984 for (int i = 0; i < networkConnectorURIs.length; i++) { 1985 String uri = networkConnectorURIs[i]; 1986 addNetworkConnector(uri); 1987 } 1988 } 1989 if (jmsBridgeConnectors != null) { 1990 for (int i = 0; i < jmsBridgeConnectors.length; i++) { 1991 addJmsConnector(jmsBridgeConnectors[i]); 1992 } 1993 } 1994 } 1995 1996 /** 1997 * Check that the store usage limit is not greater than max usable 1998 * space and adjust if it is 1999 */ 2000 protected void checkStoreUsageLimits() throws Exception { 2001 final SystemUsage usage = getSystemUsage(); 2002 2003 if (getPersistenceAdapter() != null) { 2004 PersistenceAdapter adapter 
= getPersistenceAdapter(); 2005 checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit()); 2006 2007 long maxJournalFileSize = 0; 2008 long storeLimit = usage.getStoreUsage().getLimit(); 2009 2010 if (adapter instanceof JournaledStore) { 2011 maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength(); 2012 } 2013 2014 if (storeLimit < maxJournalFileSize) { 2015 LOG.error("Store limit is " + storeLimit / (1024 * 1024) + 2016 " mb, whilst the max journal file size for the store is: " + 2017 maxJournalFileSize / (1024 * 1024) + " mb, " + 2018 "the store will not accept any data when used."); 2019 2020 } 2021 } 2022 } 2023 2024 /** 2025 * Check that temporary usage limit is not greater than max usable 2026 * space and adjust if it is 2027 */ 2028 protected void checkTmpStoreUsageLimits() throws Exception { 2029 final SystemUsage usage = getSystemUsage(); 2030 2031 File tmpDir = getTmpDataDirectory(); 2032 2033 if (tmpDir != null) { 2034 checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit()); 2035 2036 if (isPersistent()) { 2037 long maxJournalFileSize; 2038 2039 PListStore store = usage.getTempUsage().getStore(); 2040 if (store != null && store instanceof JournaledStore) { 2041 maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength(); 2042 } else { 2043 maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; 2044 } 2045 long storeLimit = usage.getTempUsage().getLimit(); 2046 2047 if (storeLimit < maxJournalFileSize) { 2048 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + 2049 " mb, whilst the max journal file size for the temporary store is: " + 2050 maxJournalFileSize / (1024 * 1024) + " mb, " + 2051 "the temp store will not accept any data when used."); 2052 } 2053 } 2054 } 2055 } 2056 2057 protected void checkUsageLimit(File dir, PercentLimitUsage<?> storeUsage, int percentLimit) throws ConfigurationException { 2058 if (dir != null) { 2059 dir = 
StoreUtil.findParentDirectory(dir); 2060 String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store"; 2061 long storeLimit = storeUsage.getLimit(); 2062 long storeCurrent = storeUsage.getUsage(); 2063 long totalSpace = storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getTotalSpace(); 2064 long totalUsableSpace = (storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getUsableSpace()) + storeCurrent; 2065 if (totalUsableSpace < 0 || totalSpace < 0) { 2066 final String message = "File system space reported by: " + dir + " was negative, possibly a huge file system, set a sane usage.total to provide some guidance"; 2067 LOG.error(message); 2068 throw new ConfigurationException(message); 2069 } 2070 //compute byte value of the percent limit 2071 long bytePercentLimit = totalSpace * percentLimit / 100; 2072 int oneMeg = 1024 * 1024; 2073 2074 //Check if the store limit is less than the percent Limit that was set and also 2075 //the usable space...this means we can grow the store larger 2076 //Changes in partition size (total space) as well as changes in usable space should 2077 //be detected here 2078 if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0 2079 && storeUsage.getTotal() == 0 2080 && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){ 2081 2082 // set the limit to be bytePercentLimit or usableSpace if 2083 // usableSpace is less than the percentLimit 2084 long newLimit = bytePercentLimit > totalUsableSpace ? 
totalUsableSpace : bytePercentLimit; 2085 2086 //To prevent changing too often, check threshold 2087 if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) { 2088 LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to " 2089 + percentLimit + "% of the partition size."); 2090 storeUsage.setLimit(newLimit); 2091 LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace 2092 + "% (" + newLimit / oneMeg + " mb) of the partition size."); 2093 } 2094 2095 //check if the limit is too large for the amount of usable space 2096 } else if (storeLimit > totalUsableSpace) { 2097 final String message = storeName + " limit is " + storeLimit / oneMeg 2098 + " mb (current store usage is " + storeCurrent / oneMeg 2099 + " mb). The data directory: " + dir.getAbsolutePath() 2100 + " only has " + totalUsableSpace / oneMeg 2101 + " mb of usable space."; 2102 2103 if (!isAdjustUsageLimits()) { 2104 LOG.error(message); 2105 throw new ConfigurationException(message); 2106 } 2107 2108 if (percentLimit > 0) { 2109 LOG.warn(storeName + " limit has been set to " 2110 + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)" 2111 + " of the partition size but there is not enough usable space." 
2112 + " The current store limit (which may have been adjusted by a" 2113 + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)" 2114 + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)" 2115 + " is available - resetting limit"); 2116 } else { 2117 LOG.warn(message + " - resetting to maximum available disk space: " + 2118 totalUsableSpace / oneMeg + " mb"); 2119 } 2120 storeUsage.setLimit(totalUsableSpace); 2121 } 2122 } 2123 } 2124 2125 /** 2126 * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to 2127 * update store and temporary store limits if the amount of available space 2128 * plus current store size is less than the existin configured limit 2129 */ 2130 protected void scheduleDiskUsageLimitsCheck() throws IOException { 2131 if (schedulePeriodForDiskUsageCheck > 0 && 2132 (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) { 2133 Runnable diskLimitCheckTask = new Runnable() { 2134 @Override 2135 public void run() { 2136 try { 2137 checkStoreUsageLimits(); 2138 } catch (Throwable e) { 2139 LOG.error("Failed to check persistent disk usage limits", e); 2140 } 2141 2142 try { 2143 checkTmpStoreUsageLimits(); 2144 } catch (Throwable e) { 2145 LOG.error("Failed to check temporary store usage limits", e); 2146 } 2147 } 2148 }; 2149 scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck); 2150 } 2151 } 2152 2153 protected void checkMemorySystemUsageLimits() throws Exception { 2154 final SystemUsage usage = getSystemUsage(); 2155 long memLimit = usage.getMemoryUsage().getLimit(); 2156 long jvmLimit = Runtime.getRuntime().maxMemory(); 2157 2158 if (memLimit > jvmLimit) { 2159 final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024) 2160 + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024); 2161 2162 if (adjustUsageLimits) { 2163 usage.getMemoryUsage().setPercentOfJvmHeap(70); 2164 
LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); 2165 } else { 2166 LOG.error(message); 2167 throw new ConfigurationException(message); 2168 } 2169 } 2170 } 2171 2172 protected void checkStoreSystemUsageLimits() throws Exception { 2173 final SystemUsage usage = getSystemUsage(); 2174 2175 //Check the persistent store and temp store limits if they exist 2176 //and schedule a periodic check to update disk limits if 2177 //schedulePeriodForDiskLimitCheck is set 2178 checkStoreUsageLimits(); 2179 checkTmpStoreUsageLimits(); 2180 scheduleDiskUsageLimitsCheck(); 2181 2182 if (getJobSchedulerStore() != null) { 2183 JobSchedulerStore scheduler = getJobSchedulerStore(); 2184 File schedulerDir = scheduler.getDirectory(); 2185 if (schedulerDir != null) { 2186 2187 String schedulerDirPath = schedulerDir.getAbsolutePath(); 2188 if (!schedulerDir.isAbsolute()) { 2189 schedulerDir = new File(schedulerDirPath); 2190 } 2191 2192 while (schedulerDir != null && !schedulerDir.isDirectory()) { 2193 schedulerDir = schedulerDir.getParentFile(); 2194 } 2195 long schedulerLimit = usage.getJobSchedulerUsage().getLimit(); 2196 long dirFreeSpace = schedulerDir.getUsableSpace(); 2197 if (schedulerLimit > dirFreeSpace) { 2198 LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) + 2199 " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() + 2200 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " + 2201 dirFreeSpace / (1024 * 1024) + " mb."); 2202 usage.getJobSchedulerUsage().setLimit(dirFreeSpace); 2203 } 2204 } 2205 } 2206 } 2207 2208 public void stopAllConnectors(ServiceStopper stopper) { 2209 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2210 NetworkConnector connector = iter.next(); 2211 unregisterNetworkConnectorMBean(connector); 2212 stopper.stop(connector); 2213 } 2214 for 
(Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2215 ProxyConnector connector = iter.next(); 2216 stopper.stop(connector); 2217 } 2218 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2219 JmsConnector connector = iter.next(); 2220 stopper.stop(connector); 2221 } 2222 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2223 TransportConnector connector = iter.next(); 2224 try { 2225 unregisterConnectorMBean(connector); 2226 } catch (IOException e) { 2227 } 2228 stopper.stop(connector); 2229 } 2230 } 2231 2232 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 2233 try { 2234 ObjectName objectName = createConnectorObjectName(connector); 2235 connector = connector.asManagedConnector(getManagementContext(), objectName); 2236 ConnectorViewMBean view = new ConnectorView(connector); 2237 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2238 return connector; 2239 } catch (Throwable e) { 2240 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e); 2241 } 2242 } 2243 2244 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { 2245 if (isUseJmx()) { 2246 try { 2247 ObjectName objectName = createConnectorObjectName(connector); 2248 getManagementContext().unregisterMBean(objectName); 2249 } catch (Throwable e) { 2250 throw IOExceptionSupport.create( 2251 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); 2252 } 2253 } 2254 } 2255 2256 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2257 return adaptor; 2258 } 2259 2260 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2261 if (isUseJmx()) {} 2262 } 2263 2264 private ObjectName createConnectorObjectName(TransportConnector connector) 
throws MalformedObjectNameException { 2265 return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName()); 2266 } 2267 2268 public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { 2269 NetworkConnectorViewMBean view = new NetworkConnectorView(connector); 2270 try { 2271 ObjectName objectName = createNetworkConnectorObjectName(connector); 2272 connector.setObjectName(objectName); 2273 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2274 } catch (Throwable e) { 2275 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); 2276 } 2277 } 2278 2279 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { 2280 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); 2281 } 2282 2283 public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { 2284 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport); 2285 } 2286 2287 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { 2288 if (isUseJmx()) { 2289 try { 2290 ObjectName objectName = createNetworkConnectorObjectName(connector); 2291 getManagementContext().unregisterMBean(objectName); 2292 } catch (Exception e) { 2293 LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". 
This exception is ignored.", e); 2294 } 2295 } 2296 } 2297 2298 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { 2299 ProxyConnectorView view = new ProxyConnectorView(connector); 2300 try { 2301 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName()); 2302 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2303 } catch (Throwable e) { 2304 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2305 } 2306 } 2307 2308 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 2309 JmsConnectorView view = new JmsConnectorView(connector); 2310 try { 2311 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName()); 2312 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2313 } catch (Throwable e) { 2314 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2315 } 2316 } 2317 2318 /** 2319 * Factory method to create a new broker 2320 * 2321 * @throws Exception 2322 */ 2323 protected Broker createBroker() throws Exception { 2324 regionBroker = createRegionBroker(); 2325 Broker broker = addInterceptors(regionBroker); 2326 // Add a filter that will stop access to the broker once stopped 2327 broker = new MutableBrokerFilter(broker) { 2328 Broker old; 2329 2330 @Override 2331 public void stop() throws Exception { 2332 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { 2333 // Just ignore additional stop actions. 
2334 @Override 2335 public void stop() throws Exception { 2336 } 2337 }); 2338 old.stop(); 2339 } 2340 2341 @Override 2342 public void start() throws Exception { 2343 if (forceStart && old != null) { 2344 this.next.set(old); 2345 } 2346 getNext().start(); 2347 } 2348 }; 2349 return broker; 2350 } 2351 2352 /** 2353 * Factory method to create the core region broker onto which interceptors 2354 * are added 2355 * 2356 * @throws Exception 2357 */ 2358 protected Broker createRegionBroker() throws Exception { 2359 if (destinationInterceptors == null) { 2360 destinationInterceptors = createDefaultDestinationInterceptor(); 2361 } 2362 configureServices(destinationInterceptors); 2363 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 2364 if (destinationFactory == null) { 2365 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); 2366 } 2367 return createRegionBroker(destinationInterceptor); 2368 } 2369 2370 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { 2371 RegionBroker regionBroker; 2372 if (isUseJmx()) { 2373 try { 2374 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), 2375 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); 2376 } catch(MalformedObjectNameException me){ 2377 LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me); 2378 throw new IOException(me); 2379 } 2380 } else { 2381 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, 2382 destinationInterceptor,getScheduler(),getExecutor()); 2383 } 2384 destinationFactory.setRegionBroker(regionBroker); 2385 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 2386 regionBroker.setBrokerName(getBrokerName()); 2387 
regionBroker.getDestinationStatistics().setEnabled(enableStatistics); 2388 regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); 2389 if (brokerId != null) { 2390 regionBroker.setBrokerId(brokerId); 2391 } 2392 return regionBroker; 2393 } 2394 2395 /** 2396 * Create the default destination interceptor 2397 */ 2398 protected DestinationInterceptor[] createDefaultDestinationInterceptor() { 2399 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>(); 2400 if (isUseVirtualTopics()) { 2401 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); 2402 VirtualTopic virtualTopic = new VirtualTopic(); 2403 virtualTopic.setName("VirtualTopic.>"); 2404 VirtualDestination[] virtualDestinations = { virtualTopic }; 2405 interceptor.setVirtualDestinations(virtualDestinations); 2406 answer.add(interceptor); 2407 } 2408 if (isUseMirroredQueues()) { 2409 MirroredQueue interceptor = new MirroredQueue(); 2410 answer.add(interceptor); 2411 } 2412 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; 2413 answer.toArray(array); 2414 return array; 2415 } 2416 2417 /** 2418 * Strategy method to add interceptors to the broker 2419 * 2420 * @throws IOException 2421 */ 2422 protected Broker addInterceptors(Broker broker) throws Exception { 2423 if (isSchedulerSupport()) { 2424 SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); 2425 if (isUseJmx()) { 2426 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); 2427 try { 2428 ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName()); 2429 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2430 this.adminView.setJMSJobScheduler(objectName); 2431 } catch (Throwable e) { 2432 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " 2433 + e.getMessage(), e); 2434 } 2435 } 2436 broker = sb; 2437 } 2438 if (isUseJmx()) { 2439 
HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker()); 2440 try { 2441 ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName()); 2442 AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName); 2443 } catch (Throwable e) { 2444 throw IOExceptionSupport.create("Status MBean could not be registered in JMX: " 2445 + e.getMessage(), e); 2446 } 2447 } 2448 if (isAdvisorySupport()) { 2449 broker = new AdvisoryBroker(broker); 2450 } 2451 broker = new CompositeDestinationBroker(broker); 2452 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); 2453 if (isPopulateJMSXUserID()) { 2454 UserIDBroker userIDBroker = new UserIDBroker(broker); 2455 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); 2456 broker = userIDBroker; 2457 } 2458 if (isMonitorConnectionSplits()) { 2459 broker = new ConnectionSplitBroker(broker); 2460 } 2461 if (plugins != null) { 2462 for (int i = 0; i < plugins.length; i++) { 2463 BrokerPlugin plugin = plugins[i]; 2464 broker = plugin.installPlugin(broker); 2465 } 2466 } 2467 return broker; 2468 } 2469 2470 protected PersistenceAdapter createPersistenceAdapter() throws IOException { 2471 if (isPersistent()) { 2472 PersistenceAdapterFactory fac = getPersistenceFactory(); 2473 if (fac != null) { 2474 return fac.createPersistenceAdapter(); 2475 } else { 2476 try { 2477 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 2478 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 2479 File dir = new File(getBrokerDataDirectory(),"KahaDB"); 2480 adaptor.setDirectory(dir); 2481 return adaptor; 2482 } catch (Throwable e) { 2483 throw IOExceptionSupport.create(e); 2484 } 2485 } 2486 } else { 2487 return new MemoryPersistenceAdapter(); 2488 } 2489 } 2490 2491 protected ObjectName createBrokerObjectName() throws 
MalformedObjectNameException { 2492 return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName()); 2493 } 2494 2495 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { 2496 TransportServer transport = TransportFactorySupport.bind(this, brokerURI); 2497 return new TransportConnector(transport); 2498 } 2499 2500 /** 2501 * Extracts the port from the options 2502 */ 2503 protected Object getPort(Map<?,?> options) { 2504 Object port = options.get("port"); 2505 if (port == null) { 2506 port = DEFAULT_PORT; 2507 LOG.warn("No port specified so defaulting to: {}", port); 2508 } 2509 return port; 2510 } 2511 2512 protected void addShutdownHook() { 2513 if (useShutdownHook) { 2514 shutdownHook = new Thread("ActiveMQ ShutdownHook") { 2515 @Override 2516 public void run() { 2517 containerShutdown(); 2518 } 2519 }; 2520 Runtime.getRuntime().addShutdownHook(shutdownHook); 2521 } 2522 } 2523 2524 protected void removeShutdownHook() { 2525 if (shutdownHook != null) { 2526 try { 2527 Runtime.getRuntime().removeShutdownHook(shutdownHook); 2528 } catch (Exception e) { 2529 LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e); 2530 } 2531 } 2532 } 2533 2534 /** 2535 * Sets hooks to be executed when broker shut down 2536 * 2537 * @org.apache.xbean.Property 2538 */ 2539 public void setShutdownHooks(List<Runnable> hooks) throws Exception { 2540 for (Runnable hook : hooks) { 2541 addShutdownHook(hook); 2542 } 2543 } 2544 2545 /** 2546 * Causes a clean shutdown of the container when the VM is being shut down 2547 */ 2548 protected void containerShutdown() { 2549 try { 2550 stop(); 2551 } catch (IOException e) { 2552 Throwable linkedException = e.getCause(); 2553 if (linkedException != null) { 2554 logError("Failed to shut down: " + e + ". 
Reason: " + linkedException, linkedException); 2555 } else { 2556 logError("Failed to shut down: " + e, e); 2557 } 2558 if (!useLoggingForShutdownErrors) { 2559 e.printStackTrace(System.err); 2560 } 2561 } catch (Exception e) { 2562 logError("Failed to shut down: " + e, e); 2563 } 2564 } 2565 2566 protected void logError(String message, Throwable e) { 2567 if (useLoggingForShutdownErrors) { 2568 LOG.error("Failed to shut down: " + e); 2569 } else { 2570 System.err.println("Failed to shut down: " + e); 2571 } 2572 } 2573 2574 /** 2575 * Starts any configured destinations on startup 2576 */ 2577 protected void startDestinations() throws Exception { 2578 if (destinations != null) { 2579 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2580 for (int i = 0; i < destinations.length; i++) { 2581 ActiveMQDestination destination = destinations[i]; 2582 getBroker().addDestination(adminConnectionContext, destination,true); 2583 } 2584 } 2585 if (isUseVirtualTopics()) { 2586 startVirtualConsumerDestinations(); 2587 } 2588 } 2589 2590 /** 2591 * Returns the broker's administration connection context used for 2592 * configuring the broker at startup 2593 */ 2594 public ConnectionContext getAdminConnectionContext() throws Exception { 2595 return BrokerSupport.getConnectionContext(getBroker()); 2596 } 2597 2598 protected void startManagementContext() throws Exception { 2599 getManagementContext().setBrokerName(brokerName); 2600 getManagementContext().start(); 2601 adminView = new BrokerView(this, null); 2602 ObjectName objectName = getBrokerObjectName(); 2603 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); 2604 } 2605 2606 /** 2607 * Start all transport and network connections, proxies and bridges 2608 * 2609 * @throws Exception 2610 */ 2611 public void startAllConnectors() throws Exception { 2612 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); 2613 List<TransportConnector> al = new 
ArrayList<TransportConnector>(); 2614 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2615 TransportConnector connector = iter.next(); 2616 al.add(startTransportConnector(connector)); 2617 } 2618 if (al.size() > 0) { 2619 // let's clear the transportConnectors list and replace it with 2620 // the started transportConnector instances 2621 this.transportConnectors.clear(); 2622 setTransportConnectors(al); 2623 } 2624 this.slave = false; 2625 if (!stopped.get()) { 2626 ThreadPoolExecutor networkConnectorStartExecutor = null; 2627 if (isNetworkConnectorStartAsync()) { 2628 // spin up as many threads as needed 2629 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 2630 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 2631 new ThreadFactory() { 2632 int count=0; 2633 @Override 2634 public Thread newThread(Runnable runnable) { 2635 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); 2636 thread.setDaemon(true); 2637 return thread; 2638 } 2639 }); 2640 } 2641 2642 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2643 final NetworkConnector connector = iter.next(); 2644 connector.setLocalUri(getVmConnectorURI()); 2645 connector.setBrokerName(getBrokerName()); 2646 connector.setDurableDestinations(durableDestinations); 2647 if (getDefaultSocketURIString() != null) { 2648 connector.setBrokerURL(getDefaultSocketURIString()); 2649 } 2650 if (networkConnectorStartExecutor != null) { 2651 networkConnectorStartExecutor.execute(new Runnable() { 2652 @Override 2653 public void run() { 2654 try { 2655 LOG.info("Async start of {}", connector); 2656 connector.start(); 2657 } catch(Exception e) { 2658 LOG.error("Async start of network connector: {} failed", connector, e); 2659 } 2660 } 2661 }); 2662 } else { 2663 connector.start(); 2664 } 2665 } 2666 if (networkConnectorStartExecutor != null) { 2667 // executor done when enqueued 
tasks are complete 2668 ThreadPoolUtils.shutdown(networkConnectorStartExecutor); 2669 } 2670 2671 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2672 ProxyConnector connector = iter.next(); 2673 connector.start(); 2674 } 2675 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2676 JmsConnector connector = iter.next(); 2677 connector.start(); 2678 } 2679 for (Service service : services) { 2680 configureService(service); 2681 service.start(); 2682 } 2683 } 2684 } 2685 2686 public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { 2687 connector.setBrokerService(this); 2688 connector.setTaskRunnerFactory(getTaskRunnerFactory()); 2689 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); 2690 if (policy != null) { 2691 connector.setMessageAuthorizationPolicy(policy); 2692 } 2693 if (isUseJmx()) { 2694 connector = registerConnectorMBean(connector); 2695 } 2696 connector.getStatistics().setEnabled(enableStatistics); 2697 connector.start(); 2698 return connector; 2699 } 2700 2701 /** 2702 * Perform any custom dependency injection 2703 */ 2704 protected void configureServices(Object[] services) { 2705 for (Object service : services) { 2706 configureService(service); 2707 } 2708 } 2709 2710 /** 2711 * Perform any custom dependency injection 2712 */ 2713 protected void configureService(Object service) { 2714 if (service instanceof BrokerServiceAware) { 2715 BrokerServiceAware serviceAware = (BrokerServiceAware) service; 2716 serviceAware.setBrokerService(this); 2717 } 2718 } 2719 2720 public void handleIOException(IOException exception) { 2721 if (ioExceptionHandler != null) { 2722 ioExceptionHandler.handle(exception); 2723 } else { 2724 LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception); 2725 } 2726 } 2727 2728 protected void startVirtualConsumerDestinations() throws Exception { 2729 checkStartException(); 2730 
ConnectionContext adminConnectionContext = getAdminConnectionContext();
        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
        DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
        if (!destinations.isEmpty()) {
            for (ActiveMQDestination destination : destinations) {
                if (filter.matches(destination) == true) {
                    broker.addDestination(adminConnectionContext, destination, false);
                }
            }
        }
    }

    /**
     * Returns the filter matching virtual topic consumer queues, building and caching it on
     * first use.
     * <p>
     * The filter is a composite of one queue pattern per {@link VirtualTopic} found in the
     * configured {@link VirtualDestinationInterceptor}s, each pattern being the topic's prefix
     * followed by {@link DestinationFilter#ANY_DESCENDENT}. While building, and only when
     * {@link #isUseVirtualDestSubs()} is true, every virtual destination is also announced to
     * the broker via {@code virtualDestinationAdded}; failures there are logged and ignored.
     *
     * @return the cached (or newly created) consumer destination filter
     */
    private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
        // created at startup, so no sync needed
        if (virtualConsumerDestinationFilter == null) {
            Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
            if (destinationInterceptors != null) {
                for (DestinationInterceptor interceptor : destinationInterceptors) {
                    if (interceptor instanceof VirtualDestinationInterceptor) {
                        VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
                        for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
                            if (virtualDestination instanceof VirtualTopic) {
                                consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
                            }
                            if (isUseVirtualDestSubs()) {
                                try {
                                    broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination);
                                    LOG.debug("Adding virtual destination: {}", virtualDestination);
                                } catch (Exception e) {
                                    // best effort: the advisory is optional, keep building the filter
                                    LOG.warn("Could not fire virtual destination consumer advisory", e);
                                }
                            }
                        }
                    }
                }
            }
            ActiveMQQueue filter = new ActiveMQQueue();
            filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
            virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
        }
        return virtualConsumerDestinationFilter;
    }

    /**
     * Returns the shared worker pool, creating it on first use.
     * <p>
     * The pool is created with a core size of 1, a maximum size of 10, a 60 second keep-alive
     * and a {@link LinkedBlockingQueue} as work queue. Worker threads are daemon threads named
     * {@code "ActiveMQ BrokerService.worker.N"} and log uncaught exceptions. A rejected task is
     * offered to the work queue for up to 60 seconds; a {@link RejectedExecutionException} is
     * raised only if that offer times out or the submitting thread is interrupted.
     *
     * @return the lazily created executor, never {@code null}
     */
    protected synchronized ThreadPoolExecutor getExecutor() {
        if (this.executor == null) {
            this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {

                // newThread may be invoked from several submitting threads, so count atomically
                private final AtomicLong i = new AtomicLong();

                @Override
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i.incrementAndGet());
                    thread.setDaemon(true);
                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                        @Override
                        public void uncaughtException(final Thread t, final Throwable e) {
                            LOG.error("Error in thread '{}'", t.getName(), e);
                        }
                    });
                    return thread;
                }
            }, new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
                    try {
                        // offer() returns false only when the wait elapsed without the task
                        // being queued; a successfully queued task must not be reported as rejected
                        if (!executor.getQueue().offer(r, 60, TimeUnit.SECONDS)) {
                            throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
                        }
                    } catch (InterruptedException e) {
                        // keep the interrupt visible to the caller
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
                    }
                }
            });
        }
        return this.executor;
    }

    /**
     * Returns the broker's {@link Scheduler}, creating and starting it on first use.
     * A failure to start is logged and the (unstarted) scheduler is still returned.
     *
     * @return the lazily created scheduler
     */
    public synchronized Scheduler getScheduler() {
        if (this.scheduler==null) {
            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
            try {
                this.scheduler.start();
            } catch (Exception e) {
                LOG.error("Failed to start Scheduler", e);
            }
        }
        return this.scheduler;
    }

    /**
     * @return the current {@code regionBroker} reference
     */
    public Broker getRegionBroker() {
        return regionBroker;
    }

    /**
     * @param regionBroker the region broker to store
     */
    public void setRegionBroker(Broker regionBroker) {
        this.regionBroker = regionBroker;
    }

    /**
     * Registers a hook in {@code shutdownHooks}, synchronizing on that collection.
     *
     * @param hook the hook to add
     */
    public void addShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.add(hook);
        }
    }

    /**
     * Removes a hook from {@code shutdownHooks}, synchronizing on that collection.
     *
     * @param hook the hook to remove
     */
    public void removeShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.remove(hook);
        }
    }

    /**
     * @return the {@code systemExitOnShutdown} flag
     */
    public boolean isSystemExitOnShutdown() {
        return systemExitOnShutdown;
    }

    /**
     * @param systemExitOnShutdown the new value of the flag
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
     */
    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
        this.systemExitOnShutdown = systemExitOnShutdown;
    }

    /**
     * @return the {@code systemExitOnShutdownExitCode} value
     */
    public int getSystemExitOnShutdownExitCode() {
        return systemExitOnShutdownExitCode;
    }

    /**
     * @param systemExitOnShutdownExitCode the exit code to store
     */
    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
    }

    /**
     * @return the configured {@link SslContext}, as last set
     */
    public SslContext getSslContext() {
        return sslContext;
    }

    /**
     * @param sslContext the SSL context to store
     */
    public void setSslContext(SslContext sslContext) {
        this.sslContext = sslContext;
    }

    /**
     * @return the {@code shutdownOnSlaveFailure} flag
     */
    public boolean isShutdownOnSlaveFailure() {
        return shutdownOnSlaveFailure;
    }

    /**
     * @param shutdownOnSlaveFailure the new value of the flag
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
     */
    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
    }

    /**
     * @return the {@code waitForSlave} flag
     */
    public boolean isWaitForSlave() {
        return waitForSlave;
    }

    /**
     * @param waitForSlave the new value of the flag
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
     */
    public void setWaitForSlave(boolean waitForSlave) {
        this.waitForSlave = waitForSlave;
    }

    /**
     * @return the {@code waitForSlaveTimeout} value
     */
    public long getWaitForSlaveTimeout() {
        return this.waitForSlaveTimeout;
    }

    /**
     * @param waitForSlaveTimeout the timeout value to store
     */
    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
        this.waitForSlaveTimeout = waitForSlaveTimeout;
    }

    /**
     * Get the passiveSlave
     * @return the passiveSlave
     */
    public boolean isPassiveSlave() {
        return this.passiveSlave;
    }

    /**
     * Set the passiveSlave
     * @param passiveSlave the passiveSlave to set
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
     */
    public void setPassiveSlave(boolean passiveSlave) {
        this.passiveSlave = passiveSlave;
    }

    /**
     * Override the default IOException handler, called when the persistence adapter
     * has experienced File or JDBC I/O Exceptions. The handler is passed through
     * {@code configureService} before it is stored.
     *
     * @param ioExceptionHandler the handler to configure and install
     */
    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
        configureService(ioExceptionHandler);
        this.ioExceptionHandler = ioExceptionHandler;
    }

    /**
     * @return the installed {@link IOExceptionHandler}, as last set
     */
    public IOExceptionHandler getIoExceptionHandler() {
        return ioExceptionHandler;
    }

    /**
     * @return the schedulerSupport
     */
    public boolean isSchedulerSupport() {
        return this.schedulerSupport;
    }

    /**
     * @param schedulerSupport the schedulerSupport to set
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
     */
    public void setSchedulerSupport(boolean schedulerSupport) {
        this.schedulerSupport = schedulerSupport;
    }

    /**
     * Returns the scheduler directory; when none has been set it defaults to (and remembers)
     * a {@code "scheduler"} child of {@link #getBrokerDataDirectory()}.
     *
     * @return the schedulerDirectory
     */
    public File getSchedulerDirectoryFile() {
        if (this.schedulerDirectoryFile == null) {
            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
        }
        return schedulerDirectoryFile;
    }

    /**
     * @param schedulerDirectory the schedulerDirectory to set
     */
    public void setSchedulerDirectoryFile(File schedulerDirectory) {
        this.schedulerDirectoryFile = schedulerDirectory;
    }

    /**
     * Convenience overload that wraps the path in a {@link File} and delegates to
     * {@link #setSchedulerDirectoryFile(File)}.
     *
     * @param schedulerDirectory path of the scheduler directory
     */
    public void setSchedulerDirectory(String schedulerDirectory) {
        setSchedulerDirectoryFile(new File(schedulerDirectory));
    }

    /**
     * @return the {@code schedulePeriodForDestinationPurge} value
     */
    public int getSchedulePeriodForDestinationPurge() {
        return this.schedulePeriodForDestinationPurge;
    }

    /**
     * @param schedulePeriodForDestinationPurge the period value to store
     */
    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
    }

    /**
     * @param schedulePeriodForDiskUsageCheck the period value to store
     */
    public void setSchedulePeriodForDiskUsageCheck(
            int schedulePeriodForDiskUsageCheck) {
        this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck;
    }

    /**
     * @return the {@code diskUsageCheckRegrowThreshold} value
     */
    public int getDiskUsageCheckRegrowThreshold() {
        return diskUsageCheckRegrowThreshold;
    }

    /**
     * @param diskUsageCheckRegrowThreshold the threshold value to store
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     */
    public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) {
        this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold;
    }

    /**
     * @return the {@code maxPurgedDestinationsPerSweep} value
     */
    public int getMaxPurgedDestinationsPerSweep() {
        return this.maxPurgedDestinationsPerSweep;
    }

    /**
     * @param maxPurgedDestinationsPerSweep the limit value to store
     */
    public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
        this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
    }

    /**
     * @return the {@link BrokerContext}, as last set
     */
    public BrokerContext getBrokerContext() {
        return brokerContext;
    }

    /**
     * @param brokerContext the broker context to store
     */
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }

    /**
     * Replaces the broker id with a new {@link BrokerId} built from the given string.
     *
     * @param brokerId textual broker id
     */
    public void setBrokerId(String brokerId) {
        this.brokerId = new BrokerId(brokerId);
    }

    /**
     * @return the {@code useAuthenticatedPrincipalForJMSXUserID} flag
     */
    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
        return useAuthenticatedPrincipalForJMSXUserID;
    }

    /**
     * @param useAuthenticatedPrincipalForJMSXUserID the new value of the flag
     */
    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
    }

    /**
     * Should MBeans that support showing the Authenticated User Name information have this
     * value filled in or not.
     *
     * @return true if user names should be exposed in MBeans
     */
    public boolean isPopulateUserNameInMBeans() {
        return this.populateUserNameInMBeans;
    }

    /**
     * Sets whether Authenticated User Name information is shown in MBeans that support this field.
     * @param value if MBeans should expose user name information.
     */
    public void setPopulateUserNameInMBeans(boolean value) {
        this.populateUserNameInMBeans = value;
    }

    /**
     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
     * failing.  The default value is to wait forever (zero).
     *
     * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
     */
    public long getMbeanInvocationTimeout() {
        return mbeanInvocationTimeout;
    }

    /**
     * Sets the time in Milliseconds that an invocation of an MBean method will wait before
     * failing. The default value is to wait forever (zero).
     *
     * @param mbeanInvocationTimeout
     *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
     */
    public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
        this.mbeanInvocationTimeout = mbeanInvocationTimeout;
    }

    /**
     * @return the {@code networkConnectorStartAsync} flag
     */
    public boolean isNetworkConnectorStartAsync() {
        return networkConnectorStartAsync;
    }

    /**
     * @param networkConnectorStartAsync the new value of the flag
     */
    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
        this.networkConnectorStartAsync = networkConnectorStartAsync;
    }

    /**
     * @return the {@code allowTempAutoCreationOnSend} flag
     */
    public boolean isAllowTempAutoCreationOnSend() {
        return allowTempAutoCreationOnSend;
    }

    /**
     * enable if temp destinations need to be propagated through a network when
     * advisorySupport==false. This is used in conjunction with the policy
     * gcInactiveDestinations for matching temps so they can get removed
     * when inactive
     *
     * @param allowTempAutoCreationOnSend the new value of the flag
     */
    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
    }

    /**
     * @return the {@code offlineDurableSubscriberTimeout} value
     */
    public long getOfflineDurableSubscriberTimeout() {
        return offlineDurableSubscriberTimeout;
    }

    /**
     * @param offlineDurableSubscriberTimeout the timeout value to store
     */
    public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
        this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
    }

    /**
     * @return the {@code offlineDurableSubscriberTaskSchedule} value
     */
    public long getOfflineDurableSubscriberTaskSchedule() {
        return offlineDurableSubscriberTaskSchedule;
    }

    /**
     * @param offlineDurableSubscriberTaskSchedule the schedule value to store
     */
    public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
        this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
    }

    /**
     * Tells whether the destination is a queue matched by the virtual topic consumer filter.
     * Always false when {@link #isUseVirtualTopics()} is false or the destination is not a queue.
     *
     * @param destination the destination to test
     * @return true if virtual topics are in use, the destination is a queue and the filter matches it
     */
    public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
        return isUseVirtualTopics() && destination.isQueue() &&
               getVirtualTopicConsumerDestinationFilter().matches(destination);
    }

    /**
     * @return the recorded {@code startException}, read under this instance's monitor
     */
    public synchronized Throwable getStartException() {
        return startException;
    }

    /**
     * @return the {@code startAsync} flag
     */
    public boolean isStartAsync() {
        return startAsync;
    }

    /**
     * @param startAsync the new value of the flag
     */
    public void setStartAsync(boolean startAsync) {
        this.startAsync = startAsync;
    }

    /**
     * @return the {@code slave} flag
     */
    public boolean isSlave() {
        return this.slave;
    }

    /**
     * @return the current value of the {@code stopping} flag
     */
    public boolean isStopping() {
        return this.stopping.get();
    }

    /**
     * @return true if the broker allowed to restart on shutdown.
     */
    public boolean isRestartAllowed() {
        return restartAllowed;
    }

    /**
     * Sets if the broker allowed to restart on shutdown.
     *
     * @param restartAllowed the new value of the flag
     */
    public void setRestartAllowed(boolean restartAllowed) {
        this.restartAllowed = restartAllowed;
    }

    /**
     * A lifecycle manager of the BrokerService should
     * inspect this property after a broker shutdown has occurred
     * to find out if the broker needs to be re-created and started
     * again.
     *
     * @return true if the broker wants to be restarted after it shuts down.
     */
    public boolean isRestartRequested() {
        return restartRequested;
    }

    /**
     * Marks the broker as wanting a restart by setting {@code restartRequested} to true.
     */
    public void requestRestart() {
        this.restartRequested = true;
    }

    /**
     * @return the {@code storeOpenWireVersion} value
     */
    public int getStoreOpenWireVersion() {
        return storeOpenWireVersion;
    }

    /**
     * @param storeOpenWireVersion the version value to store
     */
    public void setStoreOpenWireVersion(int storeOpenWireVersion) {
        this.storeOpenWireVersion = storeOpenWireVersion;
    }

    /**
     * @return the current number of connections on this Broker.
     */
    public int getCurrentConnections() {
        return this.currentConnections.get();
    }

    /**
     * @return the total number of connections this broker has handled since startup.
     */
    public long getTotalConnections() {
        return this.totalConnections.get();
    }

    /**
     * Atomically adds one to the current connection counter.
     */
    public void incrementCurrentConnections() {
        this.currentConnections.incrementAndGet();
    }

    /**
     * Atomically subtracts one from the current connection counter.
     */
    public void decrementCurrentConnections() {
        this.currentConnections.decrementAndGet();
    }

    /**
     * Atomically adds one to the total connection counter.
     */
    public void incrementTotalConnections() {
        this.totalConnections.incrementAndGet();
    }

    /**
     * @return the {@code rejectDurableConsumers} flag
     */
    public boolean isRejectDurableConsumers() {
        return rejectDurableConsumers;
    }

    /**
     * @param rejectDurableConsumers the new value of the flag
     */
    public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
        this.rejectDurableConsumers = rejectDurableConsumers;
    }

    /**
     * @return the {@code adjustUsageLimits} flag
     */
    public boolean isAdjustUsageLimits() {
        return adjustUsageLimits;
    }

    /**
     * @param adjustUsageLimits the new value of the flag
     */
    public void setAdjustUsageLimits(boolean adjustUsageLimits) {
        this.adjustUsageLimits = adjustUsageLimits;
    }

    /**
     * @param rollbackOnlyOnAsyncException the new value of the flag
     */
    public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) {
        this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException;
    }

    /**
     * @return the {@code rollbackOnlyOnAsyncException} flag
     */
    public boolean isRollbackOnlyOnAsyncException() {
        return rollbackOnlyOnAsyncException;
    }

    /**
     * @return the {@code useVirtualDestSubs} flag
     */
    public boolean isUseVirtualDestSubs() {
        return useVirtualDestSubs;
    }

    /**
     * @param useVirtualDestSubs the new value of the flag
     */
    public void setUseVirtualDestSubs(
            boolean useVirtualDestSubs) {
        this.useVirtualDestSubs = useVirtualDestSubs;
    }

    /**
     * @return the {@code useVirtualDestSubsOnCreation} flag
     */
    public boolean isUseVirtualDestSubsOnCreation() {
        return useVirtualDestSubsOnCreation;
    }

    /**
     * @param useVirtualDestSubsOnCreation the new value of the flag
     */
    public void setUseVirtualDestSubsOnCreation(
            boolean useVirtualDestSubsOnCreation) {
        this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
    }
}