001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.net.URI; 020import java.net.URISyntaxException; 021import java.security.AccessController; 022import java.security.PrivilegedAction; 023import java.util.*; 024import java.util.concurrent.RejectedExecutionHandler; 025 026import javax.jms.Connection; 027import javax.jms.ConnectionFactory; 028import javax.jms.ExceptionListener; 029import javax.jms.JMSException; 030import javax.jms.QueueConnection; 031import javax.jms.QueueConnectionFactory; 032import javax.jms.TopicConnection; 033import javax.jms.TopicConnectionFactory; 034import javax.naming.Context; 035 036import org.apache.activemq.blob.BlobTransferPolicy; 037import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; 038import org.apache.activemq.jndi.JNDIBaseStorable; 039import org.apache.activemq.management.JMSStatsImpl; 040import org.apache.activemq.management.StatsCapable; 041import org.apache.activemq.management.StatsImpl; 042import org.apache.activemq.thread.TaskRunnerFactory; 043import org.apache.activemq.transport.Transport; 044import org.apache.activemq.transport.TransportFactory; 045import org.apache.activemq.transport.TransportListener; 046import org.apache.activemq.util.*; 047import org.apache.activemq.util.URISupport.CompositeData; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * A ConnectionFactory is an an Administered object, and is used for creating 053 * Connections. <p/> This class also implements QueueConnectionFactory and 054 * TopicConnectionFactory. You can use this connection to create both 055 * QueueConnections and TopicConnections. 056 * 057 * 058 * @see javax.jms.ConnectionFactory 059 */ 060public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable { 061 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class); 062 private static final String DEFAULT_BROKER_HOST; 063 private static final int DEFAULT_BROKER_PORT; 064 static{ 065 String host = null; 066 String port = null; 067 try { 068 host = AccessController.doPrivileged(new PrivilegedAction<String>() { 069 @Override 070 public String run() { 071 String result = System.getProperty("org.apache.activemq.AMQ_HOST"); 072 result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_HOST","localhost") : result; 073 return result; 074 } 075 }); 076 port = AccessController.doPrivileged(new PrivilegedAction<String>() { 077 @Override 078 public String run() { 079 String result = System.getProperty("org.apache.activemq.AMQ_PORT"); 080 result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_PORT","61616") : result; 081 return result; 082 } 083 }); 084 }catch(Throwable e){ 085 LOG.debug("Failed to look up System properties for host and port",e); 086 } 087 host = (host == null || host.isEmpty()) ? "localhost" : host; 088 port = (port == null || port.isEmpty()) ? "61616" : port; 089 DEFAULT_BROKER_HOST = host; 090 DEFAULT_BROKER_PORT = Integer.parseInt(port); 091 } 092 093 094 public static final String DEFAULT_BROKER_BIND_URL; 095 096 static{ 097 final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; 098 String bindURL = null; 099 100 try { 101 bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() { 102 @Override 103 public String run() { 104 String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL"); 105 result = (result==null||result.isEmpty()) ? System.getProperty("BROKER_BIND_URL",defaultURL) : result; 106 return result; 107 } 108 }); 109 }catch(Throwable e){ 110 LOG.debug("Failed to look up System properties for host and port",e); 111 } 112 bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL; 113 DEFAULT_BROKER_BIND_URL = bindURL; 114 } 115 116 public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL; 117 public static final String DEFAULT_USER = null; 118 public static final String DEFAULT_PASSWORD = null; 119 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; 120 121 protected URI brokerURL; 122 protected String userName; 123 protected String password; 124 protected String clientID; 125 protected boolean dispatchAsync=true; 126 protected boolean alwaysSessionAsync=true; 127 128 JMSStatsImpl factoryStats = new JMSStatsImpl(); 129 130 private IdGenerator clientIdGenerator; 131 private String clientIDPrefix; 132 private IdGenerator connectionIdGenerator; 133 private String connectionIDPrefix; 134 135 // client policies 136 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 137 private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); 138 { 139 redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy()); 140 } 141 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 142 private MessageTransformer transformer; 143 144 private boolean disableTimeStampsByDefault; 145 private boolean optimizedMessageDispatch = true; 146 private long optimizeAcknowledgeTimeOut = 300; 147 private long optimizedAckScheduledAckInterval = 0; 148 private boolean copyMessageOnSend = true; 149 private boolean useCompression; 150 private boolean objectMessageSerializationDefered; 151 private boolean useAsyncSend; 152 private boolean optimizeAcknowledge; 153 private int closeTimeout = 15000; 154 private boolean useRetroactiveConsumer; 155 private boolean exclusiveConsumer; 156 private boolean nestedMapAndListEnabled = true; 157 private boolean alwaysSyncSend; 158 private boolean watchTopicAdvisories = true; 159 private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; 160 private long warnAboutUnstartedConnectionTimeout = 500L; 161 private int sendTimeout = 0; 162 private int connectResponseTimeout = 0; 163 private boolean sendAcksAsync=true; 164 private TransportListener transportListener; 165 private ExceptionListener exceptionListener; 166 private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; 167 private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; 168 private boolean useDedicatedTaskRunner; 169 private long consumerFailoverRedeliveryWaitPeriod = 0; 170 private boolean checkForDuplicates = true; 171 private ClientInternalExceptionListener clientInternalExceptionListener; 172 private boolean messagePrioritySupported = false; 173 private boolean transactedIndividualAck = false; 174 private boolean nonBlockingRedelivery = false; 175 private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE; 176 private TaskRunnerFactory sessionTaskRunner; 177 private RejectedExecutionHandler rejectedTaskHandler = null; 178 protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class 179 private boolean rmIdFromConnectionId = false; 180 private boolean consumerExpiryCheckEnabled = true; 181 private List<String> trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages); 182 private boolean trustAllPackages = false; 183 184 // ///////////////////////////////////////////// 185 // 186 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods 187 // 188 // ///////////////////////////////////////////// 189 190 public ActiveMQConnectionFactory() { 191 this(DEFAULT_BROKER_URL); 192 } 193 194 public ActiveMQConnectionFactory(String brokerURL) { 195 this(createURI(brokerURL)); 196 } 197 198 public ActiveMQConnectionFactory(URI brokerURL) { 199 setBrokerURL(brokerURL.toString()); 200 } 201 202 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { 203 setUserName(userName); 204 setPassword(password); 205 setBrokerURL(brokerURL.toString()); 206 } 207 208 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { 209 setUserName(userName); 210 setPassword(password); 211 setBrokerURL(brokerURL); 212 } 213 214 /** 215 * Returns a copy of the given connection factory 216 */ 217 public ActiveMQConnectionFactory copy() { 218 try { 219 return (ActiveMQConnectionFactory)super.clone(); 220 } catch (CloneNotSupportedException e) { 221 throw new RuntimeException("This should never happen: " + e, e); 222 } 223 } 224 225 /*boolean* 226 * @param brokerURL 227 * @return 228 * @throws URISyntaxException 229 */ 230 private static URI createURI(String brokerURL) { 231 try { 232 return new URI(brokerURL); 233 } catch (URISyntaxException e) { 234 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e); 235 } 236 } 237 238 /** 239 * @return Returns the Connection. 240 */ 241 @Override 242 public Connection createConnection() throws JMSException { 243 return createActiveMQConnection(); 244 } 245 246 /** 247 * @return Returns the Connection. 248 */ 249 @Override 250 public Connection createConnection(String userName, String password) throws JMSException { 251 return createActiveMQConnection(userName, password); 252 } 253 254 /** 255 * @return Returns the QueueConnection. 256 * @throws JMSException 257 */ 258 @Override 259 public QueueConnection createQueueConnection() throws JMSException { 260 return createActiveMQConnection().enforceQueueOnlyConnection(); 261 } 262 263 /** 264 * @return Returns the QueueConnection. 265 */ 266 @Override 267 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 268 return createActiveMQConnection(userName, password).enforceQueueOnlyConnection(); 269 } 270 271 /** 272 * @return Returns the TopicConnection. 273 * @throws JMSException 274 */ 275 @Override 276 public TopicConnection createTopicConnection() throws JMSException { 277 return createActiveMQConnection(); 278 } 279 280 /** 281 * @return Returns the TopicConnection. 282 */ 283 @Override 284 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 285 return createActiveMQConnection(userName, password); 286 } 287 288 /** 289 * @returns the StatsImpl associated with this ConnectionFactory. 290 */ 291 @Override 292 public StatsImpl getStats() { 293 return this.factoryStats; 294 } 295 296 // ///////////////////////////////////////////// 297 // 298 // Implementation methods. 299 // 300 // ///////////////////////////////////////////// 301 302 protected ActiveMQConnection createActiveMQConnection() throws JMSException { 303 return createActiveMQConnection(userName, password); 304 } 305 306 /** 307 * Creates a Transport based on this object's connection settings. Separated 308 * from createActiveMQConnection to allow for subclasses to override. 309 * 310 * @return The newly created Transport. 311 * @throws JMSException If unable to create trasnport. 312 */ 313 protected Transport createTransport() throws JMSException { 314 try { 315 return TransportFactory.connect(brokerURL); 316 } catch (Exception e) { 317 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); 318 } 319 } 320 321 /** 322 * @return Returns the Connection. 323 */ 324 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { 325 if (brokerURL == null) { 326 throw new ConfigurationException("brokerURL not set."); 327 } 328 ActiveMQConnection connection = null; 329 try { 330 Transport transport = createTransport(); 331 connection = createActiveMQConnection(transport, factoryStats); 332 333 connection.setUserName(userName); 334 connection.setPassword(password); 335 336 configureConnection(connection); 337 338 transport.start(); 339 340 if (clientID != null) { 341 connection.setDefaultClientID(clientID); 342 } 343 344 return connection; 345 } catch (JMSException e) { 346 // Clean up! 347 try { 348 connection.close(); 349 } catch (Throwable ignore) { 350 } 351 throw e; 352 } catch (Exception e) { 353 // Clean up! 354 try { 355 connection.close(); 356 } catch (Throwable ignore) { 357 } 358 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); 359 } 360 } 361 362 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { 363 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), 364 getConnectionIdGenerator(), stats); 365 return connection; 366 } 367 368 protected void configureConnection(ActiveMQConnection connection) throws JMSException { 369 connection.setPrefetchPolicy(getPrefetchPolicy()); 370 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); 371 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); 372 connection.setCopyMessageOnSend(isCopyMessageOnSend()); 373 connection.setUseCompression(isUseCompression()); 374 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); 375 connection.setDispatchAsync(isDispatchAsync()); 376 connection.setUseAsyncSend(isUseAsyncSend()); 377 connection.setAlwaysSyncSend(isAlwaysSyncSend()); 378 connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); 379 connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); 380 connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut()); 381 connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval()); 382 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); 383 connection.setExclusiveConsumer(isExclusiveConsumer()); 384 connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap()); 385 connection.setTransformer(getTransformer()); 386 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); 387 connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); 388 connection.setProducerWindowSize(getProducerWindowSize()); 389 connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); 390 connection.setSendTimeout(getSendTimeout()); 391 connection.setCloseTimeout(getCloseTimeout()); 392 connection.setSendAcksAsync(isSendAcksAsync()); 393 connection.setAuditDepth(getAuditDepth()); 394 connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); 395 connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); 396 connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); 397 connection.setCheckForDuplicates(isCheckForDuplicates()); 398 connection.setMessagePrioritySupported(isMessagePrioritySupported()); 399 connection.setTransactedIndividualAck(isTransactedIndividualAck()); 400 connection.setNonBlockingRedelivery(isNonBlockingRedelivery()); 401 connection.setMaxThreadPoolSize(getMaxThreadPoolSize()); 402 connection.setSessionTaskRunner(getSessionTaskRunner()); 403 connection.setRejectedTaskHandler(getRejectedTaskHandler()); 404 connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled()); 405 connection.setRmIdFromConnectionId(isRmIdFromConnectionId()); 406 connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled()); 407 connection.setTrustedPackages(getTrustedPackages()); 408 connection.setTrustAllPackages(isTrustAllPackages()); 409 connection.setConnectResponseTimeout(getConnectResponseTimeout()); 410 if (transportListener != null) { 411 connection.addTransportListener(transportListener); 412 } 413 if (exceptionListener != null) { 414 connection.setExceptionListener(exceptionListener); 415 } 416 if (clientInternalExceptionListener != null) { 417 connection.setClientInternalExceptionListener(clientInternalExceptionListener); 418 } 419 } 420 421 // ///////////////////////////////////////////// 422 // 423 // Property Accessors 424 // 425 // ///////////////////////////////////////////// 426 427 public String getBrokerURL() { 428 return brokerURL == null ? null : brokerURL.toString(); 429 } 430 431 /** 432 * Sets the <a 433 * href="http://activemq.apache.org/configuring-transports.html">connection 434 * URL</a> used to connect to the ActiveMQ broker. 435 */ 436 public void setBrokerURL(String brokerURL) { 437 this.brokerURL = createURI(brokerURL); 438 439 // Use all the properties prefixed with 'jms.' to set the connection 440 // factory 441 // options. 442 if (this.brokerURL.getQuery() != null) { 443 // It might be a standard URI or... 444 try { 445 446 Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery()); 447 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms."); 448 if (buildFromMap(jmsOptionsMap)) { 449 if (!jmsOptionsMap.isEmpty()) { 450 String msg = "There are " + jmsOptionsMap.size() 451 + " jms options that couldn't be set on the ConnectionFactory." 452 + " Check the options are spelled correctly." 453 + " Unknown parameters=[" + jmsOptionsMap + "]." 454 + " This connection factory cannot be started."; 455 throw new IllegalArgumentException(msg); 456 } 457 458 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map); 459 } 460 461 } catch (URISyntaxException e) { 462 } 463 464 } else { 465 466 // It might be a composite URI. 467 try { 468 CompositeData data = URISupport.parseComposite(this.brokerURL); 469 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms."); 470 if (buildFromMap(jmsOptionsMap)) { 471 if (!jmsOptionsMap.isEmpty()) { 472 String msg = "There are " + jmsOptionsMap.size() 473 + " jms options that couldn't be set on the ConnectionFactory." 474 + " Check the options are spelled correctly." 475 + " Unknown parameters=[" + jmsOptionsMap + "]." 476 + " This connection factory cannot be started."; 477 throw new IllegalArgumentException(msg); 478 } 479 480 this.brokerURL = data.toURI(); 481 } 482 } catch (URISyntaxException e) { 483 } 484 } 485 } 486 487 public String getClientID() { 488 return clientID; 489 } 490 491 /** 492 * Sets the JMS clientID to use for the created connection. Note that this 493 * can only be used by one connection at once so generally its a better idea 494 * to set the clientID on a Connection 495 */ 496 public void setClientID(String clientID) { 497 this.clientID = clientID; 498 } 499 500 public boolean isCopyMessageOnSend() { 501 return copyMessageOnSend; 502 } 503 504 /** 505 * Should a JMS message be copied to a new JMS Message object as part of the 506 * send() method in JMS. This is enabled by default to be compliant with the 507 * JMS specification. You can disable it if you do not mutate JMS messages 508 * after they are sent for a performance boost 509 */ 510 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 511 this.copyMessageOnSend = copyMessageOnSend; 512 } 513 514 public boolean isDisableTimeStampsByDefault() { 515 return disableTimeStampsByDefault; 516 } 517 518 /** 519 * Sets whether or not timestamps on messages should be disabled or not. If 520 * you disable them it adds a small performance boost. 521 */ 522 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 523 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 524 } 525 526 public boolean isOptimizedMessageDispatch() { 527 return optimizedMessageDispatch; 528 } 529 530 /** 531 * If this flag is set then an larger prefetch limit is used - only 532 * applicable for durable topic subscribers. 533 */ 534 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 535 this.optimizedMessageDispatch = optimizedMessageDispatch; 536 } 537 538 public String getPassword() { 539 return password; 540 } 541 542 /** 543 * Sets the JMS password used for connections created from this factory 544 */ 545 public void setPassword(String password) { 546 this.password = password; 547 } 548 549 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 550 return prefetchPolicy; 551 } 552 553 /** 554 * Sets the <a 555 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 556 * policy</a> for consumers created by this connection. 557 */ 558 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 559 this.prefetchPolicy = prefetchPolicy; 560 } 561 562 public boolean isUseAsyncSend() { 563 return useAsyncSend; 564 } 565 566 public BlobTransferPolicy getBlobTransferPolicy() { 567 return blobTransferPolicy; 568 } 569 570 /** 571 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 572 * OBjects) are transferred from producers to brokers to consumers 573 */ 574 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 575 this.blobTransferPolicy = blobTransferPolicy; 576 } 577 578 /** 579 * Forces the use of <a 580 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 581 * adds a massive performance boost; but means that the send() method will 582 * return immediately whether the message has been sent or not which could 583 * lead to message loss. 584 */ 585 public void setUseAsyncSend(boolean useAsyncSend) { 586 this.useAsyncSend = useAsyncSend; 587 } 588 589 public synchronized boolean isWatchTopicAdvisories() { 590 return watchTopicAdvisories; 591 } 592 593 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 594 this.watchTopicAdvisories = watchTopicAdvisories; 595 } 596 597 /** 598 * @return true if always sync send messages 599 */ 600 public boolean isAlwaysSyncSend() { 601 return this.alwaysSyncSend; 602 } 603 604 /** 605 * Set true if always require messages to be sync sent 606 * 607 * @param alwaysSyncSend 608 */ 609 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 610 this.alwaysSyncSend = alwaysSyncSend; 611 } 612 613 public String getUserName() { 614 return userName; 615 } 616 617 /** 618 * Sets the JMS userName used by connections created by this factory 619 */ 620 public void setUserName(String userName) { 621 this.userName = userName; 622 } 623 624 public boolean isUseRetroactiveConsumer() { 625 return useRetroactiveConsumer; 626 } 627 628 /** 629 * Sets whether or not retroactive consumers are enabled. Retroactive 630 * consumers allow non-durable topic subscribers to receive old messages 631 * that were published before the non-durable subscriber started. 632 */ 633 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 634 this.useRetroactiveConsumer = useRetroactiveConsumer; 635 } 636 637 public boolean isExclusiveConsumer() { 638 return exclusiveConsumer; 639 } 640 641 /** 642 * Enables or disables whether or not queue consumers should be exclusive or 643 * not for example to preserve ordering when not using <a 644 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 645 * 646 * @param exclusiveConsumer 647 */ 648 public void setExclusiveConsumer(boolean exclusiveConsumer) { 649 this.exclusiveConsumer = exclusiveConsumer; 650 } 651 652 public RedeliveryPolicy getRedeliveryPolicy() { 653 return redeliveryPolicyMap.getDefaultEntry(); 654 } 655 656 /** 657 * Sets the global default redelivery policy to be used when a message is delivered 658 * but the session is rolled back 659 */ 660 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 661 this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy); 662 } 663 664 public RedeliveryPolicyMap getRedeliveryPolicyMap() { 665 return this.redeliveryPolicyMap; 666 } 667 668 /** 669 * Sets the global redelivery policy mapping to be used when a message is delivered 670 * but the session is rolled back 671 */ 672 public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) { 673 this.redeliveryPolicyMap = redeliveryPolicyMap; 674 } 675 676 public MessageTransformer getTransformer() { 677 return transformer; 678 } 679 680 /** 681 * @return the sendTimeout (in milliseconds) 682 */ 683 public int getSendTimeout() { 684 return sendTimeout; 685 } 686 687 /** 688 * @param sendTimeout the sendTimeout to set (in milliseconds) 689 */ 690 public void setSendTimeout(int sendTimeout) { 691 this.sendTimeout = sendTimeout; 692 } 693 694 /** 695 * @return the sendAcksAsync 696 */ 697 public boolean isSendAcksAsync() { 698 return sendAcksAsync; 699 } 700 701 /** 702 * @param sendAcksAsync the sendAcksAsync to set 703 */ 704 public void setSendAcksAsync(boolean sendAcksAsync) { 705 this.sendAcksAsync = sendAcksAsync; 706 } 707 708 /** 709 * @return the messagePrioritySupported 710 */ 711 public boolean isMessagePrioritySupported() { 712 return this.messagePrioritySupported; 713 } 714 715 /** 716 * @param messagePrioritySupported the messagePrioritySupported to set 717 */ 718 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 719 this.messagePrioritySupported = messagePrioritySupported; 720 } 721 722 723 /** 724 * Sets the transformer used to transform messages before they are sent on 725 * to the JMS bus or when they are received from the bus but before they are 726 * delivered to the JMS client 727 */ 728 public void setTransformer(MessageTransformer transformer) { 729 this.transformer = transformer; 730 } 731 732 @SuppressWarnings({ "unchecked", "rawtypes" }) 733 @Override 734 public void buildFromProperties(Properties properties) { 735 736 if (properties == null) { 737 properties = new Properties(); 738 } 739 740 String temp = properties.getProperty(Context.PROVIDER_URL); 741 if (temp == null || temp.length() == 0) { 742 temp = properties.getProperty("brokerURL"); 743 } 744 if (temp != null && temp.length() > 0) { 745 setBrokerURL(temp); 746 } 747 748 Map<String, Object> p = new HashMap(properties); 749 buildFromMap(p); 750 } 751 752 public boolean buildFromMap(Map<String, Object> properties) { 753 boolean rc = false; 754 755 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy(); 756 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) { 757 setPrefetchPolicy(p); 758 rc = true; 759 } 760 761 RedeliveryPolicy rp = new RedeliveryPolicy(); 762 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) { 763 setRedeliveryPolicy(rp); 764 rc = true; 765 } 766 767 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 768 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) { 769 setBlobTransferPolicy(blobTransferPolicy); 770 rc = true; 771 } 772 773 rc |= IntrospectionSupport.setProperties(this, properties); 774 775 return rc; 776 } 777 778 @Override 779 public void populateProperties(Properties props) { 780 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); 781 782 if (getBrokerURL() != null) { 783 props.setProperty(Context.PROVIDER_URL, getBrokerURL()); 784 props.setProperty("brokerURL", getBrokerURL()); 785 } 786 787 if (getClientID() != null) { 788 props.setProperty("clientID", getClientID()); 789 } 790 791 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy."); 792 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy."); 793 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy."); 794 795 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend())); 796 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault())); 797 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered())); 798 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch())); 799 800 if (getPassword() != null) { 801 props.setProperty("password", getPassword()); 802 } 803 804 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); 805 props.setProperty("useCompression", Boolean.toString(isUseCompression())); 806 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); 807 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); 808 809 if (getUserName() != null) { 810 props.setProperty("userName", getUserName()); 811 } 812 813 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); 814 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); 815 props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); 816 props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled())); 817 props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); 818 props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); 819 props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); 820 props.setProperty("connectResponseTimeout", Integer.toString(getConnectResponseTimeout())); 821 props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); 822 props.setProperty("auditDepth", Integer.toString(getAuditDepth())); 823 props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); 824 props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); 825 props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); 826 props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck())); 827 props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery())); 828 props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize())); 829 props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled())); 830 props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod())); 831 props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId())); 832 props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled())); 833 } 834 835 public boolean isUseCompression() { 836 return useCompression; 837 } 838 839 /** 840 * Enables the use of compression of the message bodies 841 */ 842 public void setUseCompression(boolean useCompression) { 843 this.useCompression = useCompression; 844 } 845 846 public boolean isObjectMessageSerializationDefered() { 847 return objectMessageSerializationDefered; 848 } 849 850 /** 851 * When an object is set on an ObjectMessage, the JMS spec requires the 852 * object to be serialized by that set method. Enabling this flag causes the 853 * object to not get serialized. The object may subsequently get serialized 854 * if the message needs to be sent over a socket or stored to disk. 855 */ 856 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 857 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 858 } 859 860 public boolean isDispatchAsync() { 861 return dispatchAsync; 862 } 863 864 /** 865 * Enables or disables the default setting of whether or not consumers have 866 * their messages <a 867 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 868 * synchronously or asynchronously by the broker</a>. For non-durable 869 * topics for example we typically dispatch synchronously by default to 870 * minimize context switches which boost performance. However sometimes its 871 * better to go slower to ensure that a single blocked consumer socket does 872 * not block delivery to other consumers. 873 * 874 * @param asyncDispatch If true then consumers created on this connection 875 * will default to having their messages dispatched 876 * asynchronously. The default value is true. 877 */ 878 public void setDispatchAsync(boolean asyncDispatch) { 879 this.dispatchAsync = asyncDispatch; 880 } 881 882 /** 883 * @return Returns the closeTimeout. 884 */ 885 public int getCloseTimeout() { 886 return closeTimeout; 887 } 888 889 /** 890 * Sets the timeout before a close is considered complete. Normally a 891 * close() on a connection waits for confirmation from the broker; this 892 * allows that operation to timeout to save the client hanging if there is 893 * no broker 894 */ 895 public void setCloseTimeout(int closeTimeout) { 896 this.closeTimeout = closeTimeout; 897 } 898 899 /** 900 * @return Returns the alwaysSessionAsync. 901 */ 902 public boolean isAlwaysSessionAsync() { 903 return alwaysSessionAsync; 904 } 905 906 /** 907 * If this flag is not set then a separate thread is not used for dispatching messages for each Session in 908 * the Connection. However, a separate thread is always used if there is more than one session, or the session 909 * isn't in auto acknowledge or duplicates ok mode. By default this value is set to true and session dispatch 910 * happens asynchronously. 911 */ 912 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 913 this.alwaysSessionAsync = alwaysSessionAsync; 914 } 915 916 /** 917 * @return Returns the optimizeAcknowledge. 918 */ 919 public boolean isOptimizeAcknowledge() { 920 return optimizeAcknowledge; 921 } 922 923 /** 924 * @param optimizeAcknowledge The optimizeAcknowledge to set. 925 */ 926 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 927 this.optimizeAcknowledge = optimizeAcknowledge; 928 } 929 930 /** 931 * The max time in milliseconds between optimized ack batches 932 * @param optimizeAcknowledgeTimeOut 933 */ 934 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { 935 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; 936 } 937 938 public long getOptimizeAcknowledgeTimeOut() { 939 return optimizeAcknowledgeTimeOut; 940 } 941 942 public boolean isNestedMapAndListEnabled() { 943 return nestedMapAndListEnabled; 944 } 945 946 /** 947 * Enables/disables whether or not Message properties and MapMessage entries 948 * support <a 949 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 950 * Structures</a> of Map and List objects 951 */ 952 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 953 this.nestedMapAndListEnabled = structuredMapsEnabled; 954 } 955 956 public String getClientIDPrefix() { 957 return clientIDPrefix; 958 } 959 960 /** 961 * Sets the prefix used by autogenerated JMS Client ID values which are used 962 * if the JMS client does not explicitly specify on. 963 * 964 * @param clientIDPrefix 965 */ 966 public void setClientIDPrefix(String clientIDPrefix) { 967 this.clientIDPrefix = clientIDPrefix; 968 } 969 970 protected synchronized IdGenerator getClientIdGenerator() { 971 if (clientIdGenerator == null) { 972 if (clientIDPrefix != null) { 973 clientIdGenerator = new IdGenerator(clientIDPrefix); 974 } else { 975 clientIdGenerator = new IdGenerator(); 976 } 977 } 978 return clientIdGenerator; 979 } 980 981 protected void setClientIdGenerator(IdGenerator clientIdGenerator) { 982 this.clientIdGenerator = clientIdGenerator; 983 } 984 985 /** 986 * Sets the prefix used by connection id generator 987 * @param connectionIDPrefix 988 */ 989 public void setConnectionIDPrefix(String connectionIDPrefix) { 990 this.connectionIDPrefix = connectionIDPrefix; 991 } 992 993 protected synchronized IdGenerator getConnectionIdGenerator() { 994 if (connectionIdGenerator == null) { 995 if (connectionIDPrefix != null) { 996 connectionIdGenerator = new IdGenerator(connectionIDPrefix); 997 } else { 998 connectionIdGenerator = new IdGenerator(); 999 } 1000 } 1001 return connectionIdGenerator; 1002 } 1003 1004 protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) { 1005 this.connectionIdGenerator = connectionIdGenerator; 1006 } 1007 1008 /** 1009 * @return the statsEnabled 1010 */ 1011 public boolean isStatsEnabled() { 1012 return this.factoryStats.isEnabled(); 1013 } 1014 1015 /** 1016 * @param statsEnabled the statsEnabled to set 1017 */ 1018 public void setStatsEnabled(boolean statsEnabled) { 1019 this.factoryStats.setEnabled(statsEnabled); 1020 } 1021 1022 public synchronized int getProducerWindowSize() { 1023 return producerWindowSize; 1024 } 1025 1026 public synchronized void setProducerWindowSize(int producerWindowSize) { 1027 this.producerWindowSize = producerWindowSize; 1028 } 1029 1030 public long getWarnAboutUnstartedConnectionTimeout() { 1031 return warnAboutUnstartedConnectionTimeout; 1032 } 1033 1034 /** 1035 * Enables the timeout from a connection creation to when a warning is 1036 * generated if the connection is not properly started via 1037 * {@link Connection#start()} and a message is received by a consumer. It is 1038 * a very common gotcha to forget to <a 1039 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1040 * the connection</a> so this option makes the default case to create a 1041 * warning if the user forgets. To disable the warning just set the value to < 1042 * 0 (say -1). 1043 */ 1044 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1045 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1046 } 1047 1048 public TransportListener getTransportListener() { 1049 return transportListener; 1050 } 1051 1052 /** 1053 * Allows a listener to be configured on the ConnectionFactory so that when this factory is used 1054 * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register 1055 * a transport listener. 1056 * 1057 * @param transportListener sets the listener to be registered on all connections 1058 * created by this factory 1059 */ 1060 public void setTransportListener(TransportListener transportListener) { 1061 this.transportListener = transportListener; 1062 } 1063 1064 1065 public ExceptionListener getExceptionListener() { 1066 return exceptionListener; 1067 } 1068 1069 /** 1070 * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory 1071 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 1072 * an exception listener. 1073 * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than 1074 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 1075 * @param exceptionListener sets the exception listener to be registered on all connections 1076 * created by this factory 1077 */ 1078 public void setExceptionListener(ExceptionListener exceptionListener) { 1079 this.exceptionListener = exceptionListener; 1080 } 1081 1082 public int getAuditDepth() { 1083 return auditDepth; 1084 } 1085 1086 public void setAuditDepth(int auditDepth) { 1087 this.auditDepth = auditDepth; 1088 } 1089 1090 public int getAuditMaximumProducerNumber() { 1091 return auditMaximumProducerNumber; 1092 } 1093 1094 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 1095 this.auditMaximumProducerNumber = auditMaximumProducerNumber; 1096 } 1097 1098 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 1099 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 1100 } 1101 1102 public boolean isUseDedicatedTaskRunner() { 1103 return useDedicatedTaskRunner; 1104 } 1105 1106 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 1107 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 1108 } 1109 1110 public long getConsumerFailoverRedeliveryWaitPeriod() { 1111 return consumerFailoverRedeliveryWaitPeriod; 1112 } 1113 1114 public ClientInternalExceptionListener getClientInternalExceptionListener() { 1115 return clientInternalExceptionListener; 1116 } 1117 1118 /** 1119 * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory 1120 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 1121 * an exception listener. 1122 * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than 1123 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 1124 * @param clientInternalExceptionListener sets the exception listener to be registered on all connections 1125 * created by this factory 1126 */ 1127 public void setClientInternalExceptionListener( 1128 ClientInternalExceptionListener clientInternalExceptionListener) { 1129 this.clientInternalExceptionListener = clientInternalExceptionListener; 1130 } 1131 1132 /** 1133 * @return the checkForDuplicates 1134 */ 1135 public boolean isCheckForDuplicates() { 1136 return this.checkForDuplicates; 1137 } 1138 1139 /** 1140 * @param checkForDuplicates the checkForDuplicates to set 1141 */ 1142 public void setCheckForDuplicates(boolean checkForDuplicates) { 1143 this.checkForDuplicates = checkForDuplicates; 1144 } 1145 1146 public boolean isTransactedIndividualAck() { 1147 return transactedIndividualAck; 1148 } 1149 1150 /** 1151 * when true, submit individual transacted acks immediately rather than with transaction completion. 1152 * This allows the acks to represent delivery status which can be persisted on rollback 1153 * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean) true 1154 */ 1155 public void setTransactedIndividualAck(boolean transactedIndividualAck) { 1156 this.transactedIndividualAck = transactedIndividualAck; 1157 } 1158 1159 1160 public boolean isNonBlockingRedelivery() { 1161 return nonBlockingRedelivery; 1162 } 1163 1164 /** 1165 * When true a MessageConsumer will not stop Message delivery before re-delivering Messages 1166 * from a rolled back transaction. This implies that message order will not be preserved and 1167 * also will result in the TransactedIndividualAck option to be enabled. 1168 */ 1169 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { 1170 this.nonBlockingRedelivery = nonBlockingRedelivery; 1171 } 1172 1173 public int getMaxThreadPoolSize() { 1174 return maxThreadPoolSize; 1175 } 1176 1177 public void setMaxThreadPoolSize(int maxThreadPoolSize) { 1178 this.maxThreadPoolSize = maxThreadPoolSize; 1179 } 1180 1181 public TaskRunnerFactory getSessionTaskRunner() { 1182 return sessionTaskRunner; 1183 } 1184 1185 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 1186 this.sessionTaskRunner = sessionTaskRunner; 1187 } 1188 1189 public RejectedExecutionHandler getRejectedTaskHandler() { 1190 return rejectedTaskHandler; 1191 } 1192 1193 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { 1194 this.rejectedTaskHandler = rejectedTaskHandler; 1195 } 1196 1197 /** 1198 * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled 1199 * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers 1200 * will not do any background Message acknowledgment. 1201 * 1202 * @return the scheduledOptimizedAckInterval 1203 */ 1204 public long getOptimizedAckScheduledAckInterval() { 1205 return optimizedAckScheduledAckInterval; 1206 } 1207 1208 /** 1209 * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that 1210 * have been configured with optimizeAcknowledge enabled. 1211 * 1212 * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set 1213 */ 1214 public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) { 1215 this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; 1216 } 1217 1218 1219 public boolean isRmIdFromConnectionId() { 1220 return rmIdFromConnectionId; 1221 } 1222 1223 /** 1224 * uses the connection id as the resource identity for XAResource.isSameRM 1225 * ensuring join will only occur on a single connection 1226 */ 1227 public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) { 1228 this.rmIdFromConnectionId = rmIdFromConnectionId; 1229 } 1230 1231 /** 1232 * @return true if MessageConsumer instance will check for expired messages before dispatch. 1233 */ 1234 public boolean isConsumerExpiryCheckEnabled() { 1235 return consumerExpiryCheckEnabled; 1236 } 1237 1238 /** 1239 * Controls whether message expiration checking is done in each MessageConsumer 1240 * prior to dispatching a message. Disabling this check can lead to consumption 1241 * of expired messages. 1242 * 1243 * @param consumerExpiryCheckEnabled 1244 * controls whether expiration checking is done prior to dispatch. 1245 */ 1246 public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { 1247 this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; 1248 } 1249 1250 public List<String> getTrustedPackages() { 1251 return trustedPackages; 1252 } 1253 1254 public void setTrustedPackages(List<String> trustedPackages) { 1255 this.trustedPackages = trustedPackages; 1256 } 1257 1258 public boolean isTrustAllPackages() { 1259 return trustAllPackages; 1260 } 1261 1262 public void setTrustAllPackages(boolean trustAllPackages) { 1263 this.trustAllPackages = trustAllPackages; 1264 } 1265 1266 public int getConnectResponseTimeout() { 1267 return connectResponseTimeout; 1268 } 1269 1270 public void setConnectResponseTimeout(int connectResponseTimeout) { 1271 this.connectResponseTimeout = connectResponseTimeout; 1272 } 1273}