001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.net.URI;
020import java.util.HashMap;
021
022import javax.jms.JMSException;
023import javax.resource.NotSupportedException;
024import javax.resource.ResourceException;
025import javax.resource.spi.ActivationSpec;
026import javax.resource.spi.BootstrapContext;
027import javax.resource.spi.ResourceAdapterInternalException;
028import javax.resource.spi.endpoint.MessageEndpointFactory;
029import javax.transaction.xa.XAException;
030import javax.transaction.xa.XAResource;
031
032import javax.transaction.xa.Xid;
033import org.apache.activemq.ActiveMQConnection;
034import org.apache.activemq.ActiveMQConnectionFactory;
035import org.apache.activemq.RedeliveryPolicy;
036import org.apache.activemq.TransactionContext;
037import org.apache.activemq.broker.BrokerFactory;
038import org.apache.activemq.broker.BrokerService;
039import org.apache.activemq.util.ServiceSupport;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Knows how to connect to one ActiveMQ server. It can then activate endpoints
045 * and deliver messages to those end points using the connection configure in
046 * the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
047 * 
048 * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
049 *                         description="The JCA Resource Adaptor for ActiveMQ"
050 * 
051 */
052public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {
053    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);
054    private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
055
056    private BootstrapContext bootstrapContext;
057    private String brokerXmlConfig;
058    private BrokerService broker;
059    private Thread brokerStartThread;
060    private ActiveMQConnectionFactory connectionFactory;
061    
062    /**
063     * 
064     */
065    public ActiveMQResourceAdapter() {
066        super();
067    }
068
069    /**
070     * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
071     */
072    public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
073        this.bootstrapContext = bootstrapContext;
074        if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
075            brokerStartThread = new Thread("Starting ActiveMQ Broker") {
076                @Override
077                public void run () {
078                    try {
079                        // ensure RAR resources are available to xbean (needed for weblogic)
080                        log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader());
081                        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
082                        log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader());
083                        
084                        synchronized( ActiveMQResourceAdapter.this ) {
085                            broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
086                        }
087                        broker.start();
088                        // Default the ServerUrl to the local broker if not specified in the ra.xml
089                        if (getServerUrl() == null) {
090                            setServerUrl("vm://" + broker.getBrokerName() + "?create=false");
091                        }
092                    } catch (Throwable e) {
093                        log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
094                        log.debug("Reason for: "+e.getMessage(), e);
095                    }
096                }
097            };
098            brokerStartThread.setDaemon(true);
099            brokerStartThread.start();
100            
101            // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
102            try {
103                brokerStartThread.join(1000*5);
104            } catch (InterruptedException e) {
105                Thread.currentThread().interrupt();
106            }                
107        }
108    }
109
110    public ActiveMQConnection makeConnection() throws JMSException {
111        if( connectionFactory == null ) {
112            return makeConnection(getInfo());
113        } else {
114            return makeConnection(getInfo(), connectionFactory);
115        }
116    }
117
118    /**
119     * @param activationSpec
120     */
121    public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
122        ActiveMQConnectionFactory cf = getConnectionFactory();
123        if (cf == null) {
124            cf = createConnectionFactory(getInfo(), activationSpec);
125        }
126        String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
127        String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
128        String clientId = activationSpec.getClientId();
129        if (clientId != null) {
130            cf.setClientID(clientId);
131        } else {
132            if (activationSpec.isDurableSubscription()) {
133                log.warn("No clientID specified for durable subscription: " + activationSpec);
134            }
135        }
136        ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);
137
138        // have we configured a redelivery policy
139        RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
140        if (redeliveryPolicy != null) {
141            physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
142        }
143        return physicalConnection;
144    }
145
146    /**
147     * @see javax.resource.spi.ResourceAdapter#stop()
148     */
149    public void stop() {
150        synchronized (endpointWorkers) {
151            while (endpointWorkers.size() > 0) {
152                ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
153                endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
154            }
155        }
156        
157        synchronized( this ) {
158            if (broker != null) {
159                if( brokerStartThread.isAlive() ) {
160                    brokerStartThread.interrupt();
161                }
162                ServiceSupport.dispose(broker);
163                broker = null;
164            }
165        }
166        
167        this.bootstrapContext = null;
168    }
169
170    /**
171     * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
172     */
173    public BootstrapContext getBootstrapContext() {
174        return bootstrapContext;
175    }
176
177    /**
178     * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
179     *      javax.resource.spi.ActivationSpec)
180     */
181    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {
182
183        // spec section 5.3.3
184        if (!equals(activationSpec.getResourceAdapter())) {
185            throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
186        }
187
188        if (!(activationSpec instanceof MessageActivationSpec)) {
189            throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass());
190        }
191
192        ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
193        // This is weird.. the same endpoint activated twice.. must be a
194        // container error.
195        if (endpointWorkers.containsKey(key)) {
196            throw new IllegalStateException("Endpoint previously activated");
197        }
198
199        ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);
200
201        endpointWorkers.put(key, worker);
202        worker.start();
203    }
204
205    /**
206     * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
207     *      javax.resource.spi.ActivationSpec)
208     */
209    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
210        if (activationSpec instanceof MessageActivationSpec) {
211            ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
212            ActiveMQEndpointWorker worker = null;
213            synchronized (endpointWorkers) {
214                worker = endpointWorkers.remove(key);
215            }
216            if (worker == null) {
217                // This is weird.. that endpoint was not activated.. oh well..
218                // this method
219                // does not throw exceptions so just return.
220                return;
221            }
222            try {
223                worker.stop();
224            } catch (InterruptedException e) {
225                // We interrupted.. we won't throw an exception but will stop
226                // waiting for the worker
227                // to stop.. we tried our best. Keep trying to interrupt the
228                // thread.
229                Thread.currentThread().interrupt();
230            }
231
232        }
233
234    }
235
236    /**
237     * We only connect to one resource manager per ResourceAdapter instance, so
238     * any ActivationSpec will return the same XAResource.
239     * 
240     * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
241     */
242    public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
243        try {
244            return new XAResource[]{
245                    new TransactionContext() {
246
247                        @Override
248                        public boolean isSameRM(XAResource xaresource) throws XAException {
249                            ActiveMQConnection original = null;
250                            try {
251                                original = setConnection(newConnection());
252                                boolean result = super.isSameRM(xaresource);
253                                LOG.trace("{}.recover({})={}", getConnection(), xaresource, result);
254                                return result;
255
256                            } catch (JMSException e) {
257                                LOG.trace("isSameRM({}) failed", xaresource, e);
258                                XAException xaException = new XAException(e.getMessage());
259                                throw xaException;
260                            } finally {
261                                closeConnection(original);
262                            }
263                        }
264
265                        @Override
266                        protected String getResourceManagerId() throws JMSException {
267                            ActiveMQConnection original = null;
268                            try {
269                                original = setConnection(newConnection());
270                                return super.getResourceManagerId();
271                            } finally {
272                                closeConnection(original);
273                            }
274                        }
275
276                        @Override
277                        public void commit(Xid xid, boolean onePhase) throws XAException {
278                            ActiveMQConnection original = null;
279                            try {
280                                setConnection(newConnection());
281                                super.commit(xid, onePhase);
282                                LOG.trace("{}.commit({},{})", getConnection(), xid);
283
284                            } catch (JMSException e) {
285                                LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e);
286                                throwXAException(e);
287                            } finally {
288                                closeConnection(original);
289                            }
290                        }
291
292                        @Override
293                        public void rollback(Xid xid) throws XAException {
294                            ActiveMQConnection original = null;
295                            try {
296                                original = setConnection(newConnection());
297                                super.rollback(xid);
298                                LOG.trace("{}.rollback({})", getConnection(), xid);
299
300                            } catch (JMSException e) {
301                                LOG.trace("{}.rollback({}) failed", getConnection(), xid, e);
302                                throwXAException(e);
303                            } finally {
304                               closeConnection(original);
305                            }
306                        }
307
308                        @Override
309                        public Xid[] recover(int flags) throws XAException {
310                            Xid[] result = new Xid[]{};
311                            ActiveMQConnection original = null;
312                            try {
313                                original = setConnection(newConnection());
314                                result = super.recover(flags);
315                                LOG.trace("{}.recover({})={}", getConnection(), flags, result);
316
317                            } catch (JMSException e) {
318                                LOG.trace("{}.recover({}) failed", getConnection(), flags, e);
319                                throwXAException(e);
320                            } finally {
321                                closeConnection(original);
322                            }
323                            return result;
324                        }
325
326                        @Override
327                        public void forget(Xid xid) throws XAException {
328                            ActiveMQConnection original = null;
329                            try {
330                                original = setConnection(newConnection());
331                                super.forget(xid);
332                                LOG.trace("{}.forget({})", getConnection(), xid);
333
334                            } catch (JMSException e) {
335                                LOG.trace("{}.forget({}) failed", getConnection(), xid, e);
336                                throwXAException(e);
337                            } finally {
338                                closeConnection(original);
339                            }
340                        }
341
342                        private void throwXAException(JMSException e) throws XAException {
343                            XAException xaException = new XAException(e.getMessage());
344                            xaException.errorCode = XAException.XAER_RMFAIL;
345                            throw xaException;
346                        }
347
348                        private ActiveMQConnection newConnection() throws JMSException {
349                            ActiveMQConnection connection = makeConnection();
350                            connection.start();
351                            return connection;
352                        }
353
354                        private void closeConnection(ActiveMQConnection original) {
355                            ActiveMQConnection connection = getConnection();
356                            if (connection != null) {
357                                try {
358                                    connection.close();
359                                } catch (JMSException ignored) {}
360                            }
361                            setConnection(original);
362                        }
363                    }};
364
365        } catch (Exception e) {
366            throw new ResourceException(e);
367        }
368    }
369
370    // ///////////////////////////////////////////////////////////////////////
371    //
372    // Java Bean getters and setters for this ResourceAdapter class.
373    //
374    // ///////////////////////////////////////////////////////////////////////
375
376    /**
377     * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
378     */
379    public String getBrokerXmlConfig() {
380        return brokerXmlConfig;
381    }
382
383    /**
384     * Sets the <a href="http://activemq.org/Xml+Configuration">XML
385     * configuration file </a> used to configure the ActiveMQ broker via Spring
386     * if using embedded mode.
387     * 
388     * @param brokerXmlConfig is the filename which is assumed to be on the
389     *                classpath unless a URL is specified. So a value of
390     *                <code>foo/bar.xml</code> would be assumed to be on the
391     *                classpath whereas <code>file:dir/file.xml</code> would
392     *                use the file system. Any valid URL string is supported.
393     */
394    public void setBrokerXmlConfig(String brokerXmlConfig) {
395        this.brokerXmlConfig = brokerXmlConfig;
396    }
397
398    /**
399     * @see java.lang.Object#equals(java.lang.Object)
400     */
401    @Override
402    public boolean equals(Object o) {
403        if (this == o) {
404            return true;
405        }
406        if (!(o instanceof MessageResourceAdapter)) {
407            return false;
408        }
409
410        final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o;
411
412        if (!getInfo().equals(activeMQResourceAdapter.getInfo())) {
413            return false;
414        }
415        if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) {
416            return false;
417        }
418
419        return true;
420    }
421
422    /**
423     * @see java.lang.Object#hashCode()
424     */
425    @Override
426    public int hashCode() {
427        int result;
428        result = getInfo().hashCode();
429        if (brokerXmlConfig != null) {
430            result ^= brokerXmlConfig.hashCode();
431        }
432        return result;
433    }
434
435    public ActiveMQConnectionFactory getConnectionFactory() {
436        return connectionFactory;
437    }
438
439    public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
440        this.connectionFactory = aConnectionFactory;
441    }
442
443
444    }