001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.net.URI;
020import java.net.URISyntaxException;
021import java.security.AccessController;
022import java.security.PrivilegedAction;
023import java.util.*;
024import java.util.concurrent.RejectedExecutionHandler;
025
026import javax.jms.Connection;
027import javax.jms.ConnectionFactory;
028import javax.jms.ExceptionListener;
029import javax.jms.JMSException;
030import javax.jms.QueueConnection;
031import javax.jms.QueueConnectionFactory;
032import javax.jms.TopicConnection;
033import javax.jms.TopicConnectionFactory;
034import javax.naming.Context;
035
036import org.apache.activemq.blob.BlobTransferPolicy;
037import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
038import org.apache.activemq.jndi.JNDIBaseStorable;
039import org.apache.activemq.management.JMSStatsImpl;
040import org.apache.activemq.management.StatsCapable;
041import org.apache.activemq.management.StatsImpl;
042import org.apache.activemq.thread.TaskRunnerFactory;
043import org.apache.activemq.transport.Transport;
044import org.apache.activemq.transport.TransportFactory;
045import org.apache.activemq.transport.TransportListener;
046import org.apache.activemq.util.*;
047import org.apache.activemq.util.URISupport.CompositeData;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
 * A ConnectionFactory is an Administered object, and is used for creating
 * Connections. <p/> This class also implements QueueConnectionFactory and
 * TopicConnectionFactory. You can use this factory to create both
 * QueueConnections and TopicConnections.
056 *
057 *
058 * @see javax.jms.ConnectionFactory
059 */
060public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class);

/** Host part of the default broker URL; resolved once from system properties. */
private static final String DEFAULT_BROKER_HOST;

/** Port part of the default broker URL; resolved once from system properties. */
private static final int DEFAULT_BROKER_PORT;

/*
 * Resolves the default broker host and port.
 *
 * Lookup order for each value: the "org.apache.activemq."-prefixed system
 * property, then the un-prefixed one, then the built-in default
 * ("localhost" / "61616"). The lookup is best effort: any failure while
 * reading system properties (e.g. a SecurityManager veto) is logged at
 * debug level and the built-in defaults are used.
 */
static {
    String host = null;
    String port = null;
    try {
        host = AccessController.doPrivileged(new PrivilegedAction<String>() {
            @Override
            public String run() {
                String result = System.getProperty("org.apache.activemq.AMQ_HOST");
                result = (result == null || result.isEmpty()) ? System.getProperty("AMQ_HOST", "localhost") : result;
                return result;
            }
        });
        port = AccessController.doPrivileged(new PrivilegedAction<String>() {
            @Override
            public String run() {
                String result = System.getProperty("org.apache.activemq.AMQ_PORT");
                result = (result == null || result.isEmpty()) ? System.getProperty("AMQ_PORT", "61616") : result;
                return result;
            }
        });
    } catch (Throwable e) {
        LOG.debug("Failed to look up System properties for host and port", e);
    }
    DEFAULT_BROKER_HOST = (host == null || host.isEmpty()) ? "localhost" : host;

    // A malformed port must not abort class initialization (which would make
    // the whole factory unusable); fall back to the standard port instead.
    int resolvedPort = 61616;
    if (port != null && !port.isEmpty()) {
        try {
            resolvedPort = Integer.parseInt(port.trim());
        } catch (NumberFormatException e) {
            LOG.warn("Ignoring invalid default broker port '" + port + "'; using " + resolvedPort, e);
        }
    }
    DEFAULT_BROKER_PORT = resolvedPort;
}
092
093
/**
 * Default URL a broker binds to. Taken from the
 * {@code org.apache.activemq.BROKER_BIND_URL} system property, then from
 * {@code BROKER_BIND_URL}, and finally built as
 * {@code tcp://<default host>:<default port>}.
 */
public static final String DEFAULT_BROKER_BIND_URL;

static {
    final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
    String bindURL = null;

    try {
        bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() {
            @Override
            public String run() {
                String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL");
                result = (result == null || result.isEmpty()) ? System.getProperty("BROKER_BIND_URL", defaultURL) : result;
                return result;
            }
        });
    } catch (Throwable e) {
        // Best effort: fall through to the computed default below.
        LOG.debug("Failed to look up System properties for broker bind URL", e);
    }
    DEFAULT_BROKER_BIND_URL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL;
}
115
116    public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
117    public static final String DEFAULT_USER = null;
118    public static final String DEFAULT_PASSWORD = null;
119    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
120
121    protected URI brokerURL;
122    protected String userName;
123    protected String password;
124    protected String clientID;
125    protected boolean dispatchAsync=true;
126    protected boolean alwaysSessionAsync=true;
127
128    JMSStatsImpl factoryStats = new JMSStatsImpl();
129
130    private IdGenerator clientIdGenerator;
131    private String clientIDPrefix;
132    private IdGenerator connectionIdGenerator;
133    private String connectionIDPrefix;
134
135    // client policies
136    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
137    private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
138    {
139        redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
140    }
141    private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
142    private MessageTransformer transformer;
143
144    private boolean disableTimeStampsByDefault;
145    private boolean optimizedMessageDispatch = true;
146    private long optimizeAcknowledgeTimeOut = 300;
147    private long optimizedAckScheduledAckInterval = 0;
148    private boolean copyMessageOnSend = true;
149    private boolean useCompression;
150    private boolean objectMessageSerializationDefered;
151    private boolean useAsyncSend;
152    private boolean optimizeAcknowledge;
153    private int closeTimeout = 15000;
154    private boolean useRetroactiveConsumer;
155    private boolean exclusiveConsumer;
156    private boolean nestedMapAndListEnabled = true;
157    private boolean alwaysSyncSend;
158    private boolean watchTopicAdvisories = true;
159    private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
160    private long warnAboutUnstartedConnectionTimeout = 500L;
161    private int sendTimeout = 0;
162    private int connectResponseTimeout = 0;
163    private boolean sendAcksAsync=true;
164    private TransportListener transportListener;
165    private ExceptionListener exceptionListener;
166    private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
167    private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
168    private boolean useDedicatedTaskRunner;
169    private long consumerFailoverRedeliveryWaitPeriod = 0;
170    private boolean checkForDuplicates = true;
171    private ClientInternalExceptionListener clientInternalExceptionListener;
172    private boolean messagePrioritySupported = false;
173    private boolean transactedIndividualAck = false;
174    private boolean nonBlockingRedelivery = false;
175    private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
176    private TaskRunnerFactory sessionTaskRunner;
177    private RejectedExecutionHandler rejectedTaskHandler = null;
178    protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
179    private boolean rmIdFromConnectionId = false;
180    private boolean consumerExpiryCheckEnabled = true;
181    private List<String> trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages);
182    private boolean trustAllPackages = false;
183
184    // /////////////////////////////////////////////
185    //
186    // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
187    //
188    // /////////////////////////////////////////////
189
/**
 * Creates a factory that targets {@link #DEFAULT_BROKER_URL}.
 */
public ActiveMQConnectionFactory() {
    this(DEFAULT_BROKER_URL);
}
193
/**
 * Creates a factory for the given broker URL.
 *
 * @param brokerURL the broker URL as a string
 * @throws IllegalArgumentException if the string is not a valid URI
 */
public ActiveMQConnectionFactory(String brokerURL) {
    this(createURI(brokerURL));
}
197
198    public ActiveMQConnectionFactory(URI brokerURL) {
199        setBrokerURL(brokerURL.toString());
200    }
201
202    public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
203        setUserName(userName);
204        setPassword(password);
205        setBrokerURL(brokerURL.toString());
206    }
207
/**
 * Creates a factory with credentials and a broker URL string.
 *
 * @param userName  user name for connections created by this factory
 * @param password  password for connections created by this factory
 * @param brokerURL the broker URL as a string
 * @throws IllegalArgumentException if {@code brokerURL} is not a valid URI
 */
public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
    this.setUserName(userName);
    this.setPassword(password);
    this.setBrokerURL(brokerURL);
}
213
214    /**
215     * Returns a copy of the given connection factory
216     */
217    public ActiveMQConnectionFactory copy() {
218        try {
219            return (ActiveMQConnectionFactory)super.clone();
220        } catch (CloneNotSupportedException e) {
221            throw new RuntimeException("This should never happen: " + e, e);
222        }
223    }
224
225    /*boolean*
226     * @param brokerURL
227     * @return
228     * @throws URISyntaxException
229     */
230    private static URI createURI(String brokerURL) {
231        try {
232            return new URI(brokerURL);
233        } catch (URISyntaxException e) {
234            throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
235        }
236    }
237
238    /**
239     * @return Returns the Connection.
240     */
241    @Override
242    public Connection createConnection() throws JMSException {
243        return createActiveMQConnection();
244    }
245
246    /**
247     * @return Returns the Connection.
248     */
249    @Override
250    public Connection createConnection(String userName, String password) throws JMSException {
251        return createActiveMQConnection(userName, password);
252    }
253
254    /**
255     * @return Returns the QueueConnection.
256     * @throws JMSException
257     */
258    @Override
259    public QueueConnection createQueueConnection() throws JMSException {
260        return createActiveMQConnection().enforceQueueOnlyConnection();
261    }
262
263    /**
264     * @return Returns the QueueConnection.
265     */
266    @Override
267    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
268        return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
269    }
270
271    /**
272     * @return Returns the TopicConnection.
273     * @throws JMSException
274     */
275    @Override
276    public TopicConnection createTopicConnection() throws JMSException {
277        return createActiveMQConnection();
278    }
279
280    /**
281     * @return Returns the TopicConnection.
282     */
283    @Override
284    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
285        return createActiveMQConnection(userName, password);
286    }
287
288    /**
289     * @returns the StatsImpl associated with this ConnectionFactory.
290     */
291    @Override
292    public StatsImpl getStats() {
293        return this.factoryStats;
294    }
295
296    // /////////////////////////////////////////////
297    //
298    // Implementation methods.
299    //
300    // /////////////////////////////////////////////
301
302    protected ActiveMQConnection createActiveMQConnection() throws JMSException {
303        return createActiveMQConnection(userName, password);
304    }
305
306    /**
307     * Creates a Transport based on this object's connection settings. Separated
308     * from createActiveMQConnection to allow for subclasses to override.
309     *
310     * @return The newly created Transport.
311     * @throws JMSException If unable to create trasnport.
312     */
313    protected Transport createTransport() throws JMSException {
314        try {
315            return TransportFactory.connect(brokerURL);
316        } catch (Exception e) {
317            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
318        }
319    }
320
321    /**
322     * @return Returns the Connection.
323     */
324    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
325        if (brokerURL == null) {
326            throw new ConfigurationException("brokerURL not set.");
327        }
328        ActiveMQConnection connection = null;
329        try {
330            Transport transport = createTransport();
331            connection = createActiveMQConnection(transport, factoryStats);
332
333            connection.setUserName(userName);
334            connection.setPassword(password);
335
336            configureConnection(connection);
337
338            transport.start();
339
340            if (clientID != null) {
341                connection.setDefaultClientID(clientID);
342            }
343
344            return connection;
345        } catch (JMSException e) {
346            // Clean up!
347            try {
348                connection.close();
349            } catch (Throwable ignore) {
350            }
351            throw e;
352        } catch (Exception e) {
353            // Clean up!
354            try {
355                connection.close();
356            } catch (Throwable ignore) {
357            }
358            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
359        }
360    }
361
362    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
363        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
364                getConnectionIdGenerator(), stats);
365        return connection;
366    }
367
368    protected void configureConnection(ActiveMQConnection connection) throws JMSException {
369        connection.setPrefetchPolicy(getPrefetchPolicy());
370        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
371        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
372        connection.setCopyMessageOnSend(isCopyMessageOnSend());
373        connection.setUseCompression(isUseCompression());
374        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
375        connection.setDispatchAsync(isDispatchAsync());
376        connection.setUseAsyncSend(isUseAsyncSend());
377        connection.setAlwaysSyncSend(isAlwaysSyncSend());
378        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
379        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
380        connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
381        connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
382        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
383        connection.setExclusiveConsumer(isExclusiveConsumer());
384        connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
385        connection.setTransformer(getTransformer());
386        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
387        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
388        connection.setProducerWindowSize(getProducerWindowSize());
389        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
390        connection.setSendTimeout(getSendTimeout());
391        connection.setCloseTimeout(getCloseTimeout());
392        connection.setSendAcksAsync(isSendAcksAsync());
393        connection.setAuditDepth(getAuditDepth());
394        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
395        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
396        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
397        connection.setCheckForDuplicates(isCheckForDuplicates());
398        connection.setMessagePrioritySupported(isMessagePrioritySupported());
399        connection.setTransactedIndividualAck(isTransactedIndividualAck());
400        connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
401        connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
402        connection.setSessionTaskRunner(getSessionTaskRunner());
403        connection.setRejectedTaskHandler(getRejectedTaskHandler());
404        connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
405        connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
406        connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
407        connection.setTrustedPackages(getTrustedPackages());
408        connection.setTrustAllPackages(isTrustAllPackages());
409        connection.setConnectResponseTimeout(getConnectResponseTimeout());
410        if (transportListener != null) {
411            connection.addTransportListener(transportListener);
412        }
413        if (exceptionListener != null) {
414            connection.setExceptionListener(exceptionListener);
415        }
416        if (clientInternalExceptionListener != null) {
417            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
418        }
419    }
420
421    // /////////////////////////////////////////////
422    //
423    // Property Accessors
424    //
425    // /////////////////////////////////////////////
426
427        public String getBrokerURL() {
428        return brokerURL == null ? null : brokerURL.toString();
429    }
430
431    /**
432     * Sets the <a
433     * href="http://activemq.apache.org/configuring-transports.html">connection
434     * URL</a> used to connect to the ActiveMQ broker.
435     */
436    public void setBrokerURL(String brokerURL) {
437        this.brokerURL = createURI(brokerURL);
438
439        // Use all the properties prefixed with 'jms.' to set the connection
440        // factory
441        // options.
442        if (this.brokerURL.getQuery() != null) {
443            // It might be a standard URI or...
444            try {
445
446                Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
447                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
448                if (buildFromMap(jmsOptionsMap)) {
449                    if (!jmsOptionsMap.isEmpty()) {
450                        String msg = "There are " + jmsOptionsMap.size()
451                            + " jms options that couldn't be set on the ConnectionFactory."
452                            + " Check the options are spelled correctly."
453                            + " Unknown parameters=[" + jmsOptionsMap + "]."
454                            + " This connection factory cannot be started.";
455                        throw new IllegalArgumentException(msg);
456                    }
457
458                    this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
459                }
460
461            } catch (URISyntaxException e) {
462            }
463
464        } else {
465
466            // It might be a composite URI.
467            try {
468                CompositeData data = URISupport.parseComposite(this.brokerURL);
469                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
470                if (buildFromMap(jmsOptionsMap)) {
471                    if (!jmsOptionsMap.isEmpty()) {
472                        String msg = "There are " + jmsOptionsMap.size()
473                            + " jms options that couldn't be set on the ConnectionFactory."
474                            + " Check the options are spelled correctly."
475                            + " Unknown parameters=[" + jmsOptionsMap + "]."
476                            + " This connection factory cannot be started.";
477                        throw new IllegalArgumentException(msg);
478                    }
479
480                    this.brokerURL = data.toURI();
481                }
482            } catch (URISyntaxException e) {
483            }
484        }
485    }
486
487    public String getClientID() {
488        return clientID;
489    }
490
491    /**
492     * Sets the JMS clientID to use for the created connection. Note that this
493     * can only be used by one connection at once so generally its a better idea
494     * to set the clientID on a Connection
495     */
496    public void setClientID(String clientID) {
497        this.clientID = clientID;
498    }
499
500    public boolean isCopyMessageOnSend() {
501        return copyMessageOnSend;
502    }
503
504    /**
505     * Should a JMS message be copied to a new JMS Message object as part of the
506     * send() method in JMS. This is enabled by default to be compliant with the
507     * JMS specification. You can disable it if you do not mutate JMS messages
508     * after they are sent for a performance boost
509     */
510    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
511        this.copyMessageOnSend = copyMessageOnSend;
512    }
513
514    public boolean isDisableTimeStampsByDefault() {
515        return disableTimeStampsByDefault;
516    }
517
518    /**
519     * Sets whether or not timestamps on messages should be disabled or not. If
520     * you disable them it adds a small performance boost.
521     */
522    public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
523        this.disableTimeStampsByDefault = disableTimeStampsByDefault;
524    }
525
526    public boolean isOptimizedMessageDispatch() {
527        return optimizedMessageDispatch;
528    }
529
530    /**
531     * If this flag is set then an larger prefetch limit is used - only
532     * applicable for durable topic subscribers.
533     */
534    public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
535        this.optimizedMessageDispatch = optimizedMessageDispatch;
536    }
537
538    public String getPassword() {
539        return password;
540    }
541
542    /**
543     * Sets the JMS password used for connections created from this factory
544     */
545    public void setPassword(String password) {
546        this.password = password;
547    }
548
549    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
550        return prefetchPolicy;
551    }
552
553    /**
554     * Sets the <a
555     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
556     * policy</a> for consumers created by this connection.
557     */
558    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
559        this.prefetchPolicy = prefetchPolicy;
560    }
561
562    public boolean isUseAsyncSend() {
563        return useAsyncSend;
564    }
565
566    public BlobTransferPolicy getBlobTransferPolicy() {
567        return blobTransferPolicy;
568    }
569
570    /**
571     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
572     * OBjects) are transferred from producers to brokers to consumers
573     */
574    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
575        this.blobTransferPolicy = blobTransferPolicy;
576    }
577
578    /**
579     * Forces the use of <a
580     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
581     * adds a massive performance boost; but means that the send() method will
582     * return immediately whether the message has been sent or not which could
583     * lead to message loss.
584     */
585    public void setUseAsyncSend(boolean useAsyncSend) {
586        this.useAsyncSend = useAsyncSend;
587    }
588
589    public synchronized boolean isWatchTopicAdvisories() {
590        return watchTopicAdvisories;
591    }
592
593    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
594        this.watchTopicAdvisories = watchTopicAdvisories;
595    }
596
597    /**
598     * @return true if always sync send messages
599     */
600    public boolean isAlwaysSyncSend() {
601        return this.alwaysSyncSend;
602    }
603
604    /**
605     * Set true if always require messages to be sync sent
606     *
607     * @param alwaysSyncSend
608     */
609    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
610        this.alwaysSyncSend = alwaysSyncSend;
611    }
612
613    public String getUserName() {
614        return userName;
615    }
616
617    /**
618     * Sets the JMS userName used by connections created by this factory
619     */
620    public void setUserName(String userName) {
621        this.userName = userName;
622    }
623
624    public boolean isUseRetroactiveConsumer() {
625        return useRetroactiveConsumer;
626    }
627
628    /**
629     * Sets whether or not retroactive consumers are enabled. Retroactive
630     * consumers allow non-durable topic subscribers to receive old messages
631     * that were published before the non-durable subscriber started.
632     */
633    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
634        this.useRetroactiveConsumer = useRetroactiveConsumer;
635    }
636
637    public boolean isExclusiveConsumer() {
638        return exclusiveConsumer;
639    }
640
641    /**
642     * Enables or disables whether or not queue consumers should be exclusive or
643     * not for example to preserve ordering when not using <a
644     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
645     *
646     * @param exclusiveConsumer
647     */
648    public void setExclusiveConsumer(boolean exclusiveConsumer) {
649        this.exclusiveConsumer = exclusiveConsumer;
650    }
651
652    public RedeliveryPolicy getRedeliveryPolicy() {
653        return redeliveryPolicyMap.getDefaultEntry();
654    }
655
656    /**
657     * Sets the global default redelivery policy to be used when a message is delivered
658     * but the session is rolled back
659     */
660    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
661        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
662    }
663
664    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
665        return this.redeliveryPolicyMap;
666    }
667
668    /**
669     * Sets the global redelivery policy mapping to be used when a message is delivered
670     * but the session is rolled back
671     */
672    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
673        this.redeliveryPolicyMap = redeliveryPolicyMap;
674    }
675
676    public MessageTransformer getTransformer() {
677        return transformer;
678    }
679
680    /**
681     * @return the sendTimeout (in milliseconds)
682     */
683    public int getSendTimeout() {
684        return sendTimeout;
685    }
686
687    /**
688     * @param sendTimeout the sendTimeout to set (in milliseconds)
689     */
690    public void setSendTimeout(int sendTimeout) {
691        this.sendTimeout = sendTimeout;
692    }
693
694    /**
695     * @return the sendAcksAsync
696     */
697    public boolean isSendAcksAsync() {
698        return sendAcksAsync;
699    }
700
701    /**
702     * @param sendAcksAsync the sendAcksAsync to set
703     */
704    public void setSendAcksAsync(boolean sendAcksAsync) {
705        this.sendAcksAsync = sendAcksAsync;
706    }
707
708    /**
709     * @return the messagePrioritySupported
710     */
711    public boolean isMessagePrioritySupported() {
712        return this.messagePrioritySupported;
713    }
714
715    /**
716     * @param messagePrioritySupported the messagePrioritySupported to set
717     */
718    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
719        this.messagePrioritySupported = messagePrioritySupported;
720    }
721
722
723    /**
724     * Sets the transformer used to transform messages before they are sent on
725     * to the JMS bus or when they are received from the bus but before they are
726     * delivered to the JMS client
727     */
728    public void setTransformer(MessageTransformer transformer) {
729        this.transformer = transformer;
730    }
731
732    @SuppressWarnings({ "unchecked", "rawtypes" })
733    @Override
734    public void buildFromProperties(Properties properties) {
735
736        if (properties == null) {
737            properties = new Properties();
738        }
739
740        String temp = properties.getProperty(Context.PROVIDER_URL);
741        if (temp == null || temp.length() == 0) {
742            temp = properties.getProperty("brokerURL");
743        }
744        if (temp != null && temp.length() > 0) {
745            setBrokerURL(temp);
746        }
747
748        Map<String, Object> p = new HashMap(properties);
749        buildFromMap(p);
750    }
751
752    public boolean buildFromMap(Map<String, Object> properties) {
753        boolean rc = false;
754
755        ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
756        if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
757            setPrefetchPolicy(p);
758            rc = true;
759        }
760
761        RedeliveryPolicy rp = new RedeliveryPolicy();
762        if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
763            setRedeliveryPolicy(rp);
764            rc = true;
765        }
766
767        BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
768        if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
769            setBlobTransferPolicy(blobTransferPolicy);
770            rc = true;
771        }
772
773        rc |= IntrospectionSupport.setProperties(this, properties);
774
775        return rc;
776    }
777
778    @Override
779    public void populateProperties(Properties props) {
780        props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
781
782        if (getBrokerURL() != null) {
783            props.setProperty(Context.PROVIDER_URL, getBrokerURL());
784            props.setProperty("brokerURL", getBrokerURL());
785        }
786
787        if (getClientID() != null) {
788            props.setProperty("clientID", getClientID());
789        }
790
791        IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
792        IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
793        IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
794
795        props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
796        props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
797        props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
798        props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
799
800        if (getPassword() != null) {
801            props.setProperty("password", getPassword());
802        }
803
804        props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
805        props.setProperty("useCompression", Boolean.toString(isUseCompression()));
806        props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
807        props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
808
809        if (getUserName() != null) {
810            props.setProperty("userName", getUserName());
811        }
812
813        props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
814        props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
815        props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
816        props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
817        props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
818        props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
819        props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
820        props.setProperty("connectResponseTimeout", Integer.toString(getConnectResponseTimeout()));
821        props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
822        props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
823        props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
824        props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
825        props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
826        props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
827        props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
828        props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
829        props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
830        props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
831        props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
832        props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled()));
833    }
834
835    public boolean isUseCompression() {
836        return useCompression;
837    }
838
    /**
     * Enables the use of compression of the message bodies.
     *
     * @param useCompression the new value of the {@code useCompression} flag
     */
    public void setUseCompression(boolean useCompression) {
        this.useCompression = useCompression;
    }
845
846    public boolean isObjectMessageSerializationDefered() {
847        return objectMessageSerializationDefered;
848    }
849
    /**
     * When an object is set on an ObjectMessage, the JMS spec requires the
     * object to be serialized by that set method. Enabling this flag causes the
     * object to not get serialized at that point. The object may subsequently
     * get serialized if the message needs to be sent over a socket or stored
     * to disk.
     *
     * @param objectMessageSerializationDefered {@code true} to defer
     *                serialization, {@code false} to serialize in the set method
     */
    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
    }
859
860    public boolean isDispatchAsync() {
861        return dispatchAsync;
862    }
863
    /**
     * Enables or disables the default setting of whether or not consumers have
     * their messages <a
     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
     * synchronously or asynchronously by the broker</a>. For non-durable
     * topics, for example, we typically dispatch synchronously by default to
     * minimize context switches, which boosts performance. However, sometimes
     * it's better to go slower to ensure that a single blocked consumer socket
     * does not block delivery to other consumers.
     *
     * @param asyncDispatch If true then consumers created on this connection
     *                will default to having their messages dispatched
     *                asynchronously. The default value is true.
     */
    public void setDispatchAsync(boolean asyncDispatch) {
        this.dispatchAsync = asyncDispatch;
    }
881
882    /**
883     * @return Returns the closeTimeout.
884     */
885    public int getCloseTimeout() {
886        return closeTimeout;
887    }
888
    /**
     * Sets the timeout before a close is considered complete. Normally a
     * close() on a connection waits for confirmation from the broker; this
     * allows that operation to time out so the client does not hang if there
     * is no broker.
     *
     * @param closeTimeout the close timeout to use
     */
    public void setCloseTimeout(int closeTimeout) {
        this.closeTimeout = closeTimeout;
    }
898
899    /**
900     * @return Returns the alwaysSessionAsync.
901     */
902    public boolean isAlwaysSessionAsync() {
903        return alwaysSessionAsync;
904    }
905
    /**
     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
     * the Connection. However, a separate thread is always used if there is more than one session, or the session
     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
     * happens asynchronously.
     *
     * @param alwaysSessionAsync the new value of the {@code alwaysSessionAsync} flag
     */
    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
        this.alwaysSessionAsync = alwaysSessionAsync;
    }
915
916    /**
917     * @return Returns the optimizeAcknowledge.
918     */
919    public boolean isOptimizeAcknowledge() {
920        return optimizeAcknowledge;
921    }
922
    /**
     * Stores the {@code optimizeAcknowledge} flag reported by
     * {@link #isOptimizeAcknowledge()}.
     *
     * @param optimizeAcknowledge The optimizeAcknowledge to set.
     */
    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
        this.optimizeAcknowledge = optimizeAcknowledge;
    }
929
    /**
     * Sets the max time in milliseconds between optimized ack batches.
     *
     * @param optimizeAcknowledgeTimeOut the maximum time, in milliseconds
     */
    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
    }
937
938    public long getOptimizeAcknowledgeTimeOut() {
939        return optimizeAcknowledgeTimeOut;
940    }
941
942    public boolean isNestedMapAndListEnabled() {
943        return nestedMapAndListEnabled;
944    }
945
    /**
     * Enables/disables whether or not Message properties and MapMessage entries
     * support <a
     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
     * Structures</a> of Map and List objects.
     *
     * @param structuredMapsEnabled the new value of the
     *                {@code nestedMapAndListEnabled} flag
     */
    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
        this.nestedMapAndListEnabled = structuredMapsEnabled;
    }
955
956    public String getClientIDPrefix() {
957        return clientIDPrefix;
958    }
959
    /**
     * Sets the prefix used by autogenerated JMS Client ID values which are used
     * if the JMS client does not explicitly specify one.
     * <p>
     * The prefix is read by {@link #getClientIdGenerator()} only at the moment
     * it creates the generator, so a generator that already exists is not
     * affected by a later change.
     *
     * @param clientIDPrefix the prefix to use, or {@code null} for the
     *                generator's default
     */
    public void setClientIDPrefix(String clientIDPrefix) {
        this.clientIDPrefix = clientIDPrefix;
    }
969
970    protected synchronized IdGenerator getClientIdGenerator() {
971        if (clientIdGenerator == null) {
972            if (clientIDPrefix != null) {
973                clientIdGenerator = new IdGenerator(clientIDPrefix);
974            } else {
975                clientIdGenerator = new IdGenerator();
976            }
977        }
978        return clientIdGenerator;
979    }
980
981    protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
982        this.clientIdGenerator = clientIdGenerator;
983    }
984
    /**
     * Sets the prefix used by the connection id generator.
     * <p>
     * The prefix is read by {@link #getConnectionIdGenerator()} only at the
     * moment it creates the generator, so a generator that already exists is
     * not affected by a later change.
     *
     * @param connectionIDPrefix the prefix to use, or {@code null} for the
     *                generator's default
     */
    public void setConnectionIDPrefix(String connectionIDPrefix) {
        this.connectionIDPrefix = connectionIDPrefix;
    }
992
993    protected synchronized IdGenerator getConnectionIdGenerator() {
994        if (connectionIdGenerator == null) {
995            if (connectionIDPrefix != null) {
996                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
997            } else {
998                connectionIdGenerator = new IdGenerator();
999            }
1000        }
1001        return connectionIdGenerator;
1002    }
1003
1004    protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
1005        this.connectionIdGenerator = connectionIdGenerator;
1006    }
1007
1008    /**
1009     * @return the statsEnabled
1010     */
1011    public boolean isStatsEnabled() {
1012        return this.factoryStats.isEnabled();
1013    }
1014
1015    /**
1016     * @param statsEnabled the statsEnabled to set
1017     */
1018    public void setStatsEnabled(boolean statsEnabled) {
1019        this.factoryStats.setEnabled(statsEnabled);
1020    }
1021
1022    public synchronized int getProducerWindowSize() {
1023        return producerWindowSize;
1024    }
1025
    /**
     * Stores the producer window size while holding this factory's monitor.
     *
     * @param producerWindowSize the producer window size to use
     */
    public synchronized void setProducerWindowSize(int producerWindowSize) {
        this.producerWindowSize = producerWindowSize;
    }
1029
1030    public long getWarnAboutUnstartedConnectionTimeout() {
1031        return warnAboutUnstartedConnectionTimeout;
1032    }
1033
    /**
     * Enables the timeout from a connection creation to when a warning is
     * generated if the connection is not properly started via
     * {@link Connection#start()} and a message is received by a consumer. It is
     * a very common gotcha to forget to <a
     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
     * the connection</a>, so this option makes the default case create a
     * warning if the user forgets. To disable the warning just set the value
     * to a negative number (say -1).
     *
     * @param warnAboutUnstartedConnectionTimeout the timeout to use; a
     *                negative value disables the warning
     */
    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
    }
1047
1048    public TransportListener getTransportListener() {
1049        return transportListener;
1050    }
1051
    /**
     * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
     * with frameworks which don't expose the Connection, such as Spring JmsTemplate, you can still register
     * a transport listener.
     *
     * @param transportListener the listener to be registered on all connections
     *                created by this factory
     */
    public void setTransportListener(TransportListener transportListener) {
        this.transportListener = transportListener;
    }
1063
1064
1065    public ExceptionListener getExceptionListener() {
1066        return exceptionListener;
1067    }
1068
    /**
     * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
     * is used by frameworks which don't expose the Connection, such as Spring JmsTemplate, you can register
     * an exception listener.
     * <p> Note: access to this exceptionListener will <b>not</b> be serialized if it is associated with more than
     * one connection (as it will be if more than one connection is subsequently created by this connection factory).
     *
     * @param exceptionListener the exception listener to be registered on all connections
     *                created by this factory
     */
    public void setExceptionListener(ExceptionListener exceptionListener) {
        this.exceptionListener = exceptionListener;
    }
1081
1082    public int getAuditDepth() {
1083        return auditDepth;
1084    }
1085
    /**
     * Stores the audit depth reported by {@link #getAuditDepth()}.
     *
     * @param auditDepth the audit depth to use
     */
    public void setAuditDepth(int auditDepth) {
        this.auditDepth = auditDepth;
    }
1089
1090    public int getAuditMaximumProducerNumber() {
1091        return auditMaximumProducerNumber;
1092    }
1093
    /**
     * Stores the value reported by {@link #getAuditMaximumProducerNumber()}.
     *
     * @param auditMaximumProducerNumber the maximum producer number to use
     *                for auditing
     */
    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
        this.auditMaximumProducerNumber = auditMaximumProducerNumber;
    }
1097
    /**
     * Stores the flag reported by {@link #isUseDedicatedTaskRunner()}.
     *
     * @param useDedicatedTaskRunner the new value of the
     *                {@code useDedicatedTaskRunner} flag
     */
    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
    }
1101
1102    public boolean isUseDedicatedTaskRunner() {
1103        return useDedicatedTaskRunner;
1104    }
1105
    /**
     * Stores the value reported by
     * {@link #getConsumerFailoverRedeliveryWaitPeriod()}.
     *
     * @param consumerFailoverRedeliveryWaitPeriod the wait period to use
     */
    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
    }
1109
1110    public long getConsumerFailoverRedeliveryWaitPeriod() {
1111        return consumerFailoverRedeliveryWaitPeriod;
1112    }
1113
1114    public ClientInternalExceptionListener getClientInternalExceptionListener() {
1115        return clientInternalExceptionListener;
1116    }
1117
    /**
     * Allows a {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
     * is used by frameworks which don't expose the Connection, such as Spring JmsTemplate, you can register
     * an exception listener.
     * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
     * one connection (as it will be if more than one connection is subsequently created by this connection factory).
     *
     * @param clientInternalExceptionListener the exception listener to be registered on all connections
     *                created by this factory
     */
    public void setClientInternalExceptionListener(
            ClientInternalExceptionListener clientInternalExceptionListener) {
        this.clientInternalExceptionListener = clientInternalExceptionListener;
    }
1131
1132    /**
1133     * @return the checkForDuplicates
1134     */
1135    public boolean isCheckForDuplicates() {
1136        return this.checkForDuplicates;
1137    }
1138
    /**
     * Stores the flag reported by {@link #isCheckForDuplicates()}.
     *
     * @param checkForDuplicates the checkForDuplicates to set
     */
    public void setCheckForDuplicates(boolean checkForDuplicates) {
        this.checkForDuplicates = checkForDuplicates;
    }
1145
1146    public boolean isTransactedIndividualAck() {
1147         return transactedIndividualAck;
1148     }
1149
     /**
      * When true, individual transacted acks are submitted immediately rather than with transaction completion.
      * This allows the acks to represent delivery status which can be persisted on rollback.
      * Used in conjunction with
      * {@code org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean)} set to true.
      *
      * @param transactedIndividualAck the new value of the {@code transactedIndividualAck} flag
      */
     public void setTransactedIndividualAck(boolean transactedIndividualAck) {
         this.transactedIndividualAck = transactedIndividualAck;
     }
1158
1159
1160     public boolean isNonBlockingRedelivery() {
1161         return nonBlockingRedelivery;
1162     }
1163
     /**
      * When true, a MessageConsumer will not stop Message delivery before re-delivering Messages
      * from a rolled back transaction.  This implies that message order will not be preserved and
      * also will result in the TransactedIndividualAck option being enabled.
      * <p>
      * This setter only stores the flag; it does not itself change {@code transactedIndividualAck}.
      *
      * @param nonBlockingRedelivery the new value of the {@code nonBlockingRedelivery} flag
      */
     public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
         this.nonBlockingRedelivery = nonBlockingRedelivery;
     }
1172
1173    public int getMaxThreadPoolSize() {
1174        return maxThreadPoolSize;
1175    }
1176
    /**
     * Stores the value reported by {@link #getMaxThreadPoolSize()}.
     *
     * @param maxThreadPoolSize the maximum thread pool size to use
     */
    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
        this.maxThreadPoolSize = maxThreadPoolSize;
    }
1180
1181    public TaskRunnerFactory getSessionTaskRunner() {
1182        return sessionTaskRunner;
1183    }
1184
    /**
     * Stores the {@link TaskRunnerFactory} reported by
     * {@link #getSessionTaskRunner()}.
     *
     * @param sessionTaskRunner the session task runner factory to use
     */
    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
        this.sessionTaskRunner = sessionTaskRunner;
    }
1188
1189    public RejectedExecutionHandler getRejectedTaskHandler() {
1190        return rejectedTaskHandler;
1191    }
1192
    /**
     * Stores the {@link RejectedExecutionHandler} reported by
     * {@link #getRejectedTaskHandler()}.
     *
     * @param rejectedTaskHandler the handler to use
     */
    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
        this.rejectedTaskHandler = rejectedTaskHandler;
    }
1196
1197    /**
1198     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
1199     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
1200     * will not do any background Message acknowledgment.
1201     *
1202     * @return the scheduledOptimizedAckInterval
1203     */
1204    public long getOptimizedAckScheduledAckInterval() {
1205        return optimizedAckScheduledAckInterval;
1206    }
1207
    /**
     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
     * have been configured with optimizeAcknowledge enabled.
     *
     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
     */
    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
    }
1217
1218
1219    public boolean isRmIdFromConnectionId() {
1220        return rmIdFromConnectionId;
1221    }
1222
    /**
     * Uses the connection id as the resource identity for XAResource.isSameRM,
     * ensuring join will only occur on a single connection.
     *
     * @param rmIdFromConnectionId the new value of the
     *                {@code rmIdFromConnectionId} flag
     */
    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
        this.rmIdFromConnectionId = rmIdFromConnectionId;
    }
1230
1231    /**
1232     * @return true if MessageConsumer instance will check for expired messages before dispatch.
1233     */
1234    public boolean isConsumerExpiryCheckEnabled() {
1235        return consumerExpiryCheckEnabled;
1236    }
1237
    /**
     * Controls whether message expiration checking is done in each MessageConsumer
     * prior to dispatching a message.  Disabling this check can lead to consumption
     * of expired messages.
     *
     * @param consumerExpiryCheckEnabled
     *        {@code true} to check for expiration prior to dispatch,
     *        {@code false} to skip the check
     */
    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
    }
1249
1250    public List<String> getTrustedPackages() {
1251        return trustedPackages;
1252    }
1253
    /**
     * Stores the given trusted-packages list. The reference is kept as-is,
     * without copying, so later changes to the caller's list are visible
     * through {@link #getTrustedPackages()}.
     *
     * @param trustedPackages the list of trusted packages to use
     */
    public void setTrustedPackages(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }
1257
1258    public boolean isTrustAllPackages() {
1259        return trustAllPackages;
1260    }
1261
    /**
     * Stores the flag reported by {@link #isTrustAllPackages()}.
     *
     * @param trustAllPackages the new value of the {@code trustAllPackages}
     *                flag
     */
    public void setTrustAllPackages(boolean trustAllPackages) {
        this.trustAllPackages = trustAllPackages;
    }
1265
1266        public int getConnectResponseTimeout() {
1267                return connectResponseTimeout;
1268        }
1269
        /**
         * Stores the value reported by {@link #getConnectResponseTimeout()}.
         *
         * @param connectResponseTimeout the connect response timeout to use
         */
        public void setConnectResponseTimeout(int connectResponseTimeout) {
                this.connectResponseTimeout = connectResponseTimeout;
        }
1273}