/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.ra;

import java.net.URI;
import java.util.HashMap;

import javax.jms.JMSException;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.TransactionContext;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Knows how to connect to one ActiveMQ server. It can then activate endpoints
 * and deliver messages to those end points using the connection configured in
 * the resource adapter. <p>Must override equals and hashCode (JCA spec 16.4)
 *
 * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
 * description="The JCA Resource Adaptor for ActiveMQ"
 *
 */
public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);

    /**
     * Active endpoint workers keyed by (endpoint factory, activation spec).
     * Every access must hold this map's monitor.
     */
    private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();

    private BootstrapContext bootstrapContext;
    private String brokerXmlConfig;
    /** Embedded broker; assigned by the broker start thread and cleared by {@link #stop()}, both under {@code synchronized (this)}. */
    private BrokerService broker;
    private Thread brokerStartThread;
    private ActiveMQConnectionFactory connectionFactory;

    /**
     * Creates an adapter with no embedded broker configuration and no
     * explicit connection factory.
     */
    public ActiveMQResourceAdapter() {
        super();
    }

    /**
     * Records the bootstrap context and, when a non-blank
     * {@code brokerXmlConfig} is set, starts an embedded broker on a daemon
     * thread. The calling thread waits at most five seconds for that thread;
     * if the broker is not up by then the adapter carries on without it.
     * Failures while starting the broker are logged, never rethrown.
     *
     * @param bootstrapContext the context handed over by the container
     * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
     */
    public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
        this.bootstrapContext = bootstrapContext;
        if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
            brokerStartThread = new Thread("Starting ActiveMQ Broker") {
                @Override
                public void run () {
                    try {
                        // ensure RAR resources are available to xbean (needed for weblogic)
                        log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader());
                        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
                        log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader());

                        // Publish the broker under the adapter's monitor so stop() sees it.
                        synchronized( ActiveMQResourceAdapter.this ) {
                            broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
                        }
                        broker.start();
                        // Default the ServerUrl to the local broker if not specified in the ra.xml
                        if (getServerUrl() == null) {
                            setServerUrl("vm://" + broker.getBrokerName() + "?create=false");
                        }
                    } catch (Throwable e) {
                        log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
                        log.debug("Reason for: "+e.getMessage(), e);
                    }
                }
            };
            brokerStartThread.setDaemon(true);
            brokerStartThread.start();

            // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
            try {
                brokerStartThread.join(1000*5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates a connection from this adapter's connection info, using the
     * explicitly configured connection factory when one has been set.
     *
     * @return a new connection
     * @throws JMSException if the connection cannot be created
     */
    public ActiveMQConnection makeConnection() throws JMSException {
        if( connectionFactory == null ) {
            return makeConnection(getInfo());
        } else {
            return makeConnection(getInfo(), connectionFactory);
        }
    }

    /**
     * Creates a connection for an endpoint activation. User name and password
     * are resolved through {@code defaultValue(specValue, adapterValue)}; the
     * spec's client id (if any) is set on the factory, and the spec's
     * redelivery policy (if any) is applied to the resulting connection.
     *
     * @param activationSpec the activation the connection is created for
     * @return a new connection
     * @throws JMSException if the connection cannot be created
     */
    public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
        ActiveMQConnectionFactory cf = getConnectionFactory();
        if (cf == null) {
            cf = createConnectionFactory(getInfo(), activationSpec);
        }
        String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
        String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
        String clientId = activationSpec.getClientId();
        if (clientId != null) {
            // NOTE(review): when an explicit connectionFactory is configured this
            // mutates that shared instance - confirm this is intended.
            cf.setClientID(clientId);
        } else {
            if (activationSpec.isDurableSubscription()) {
                log.warn("No clientID specified for durable subscription: " + activationSpec);
            }
        }
        ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);

        // have we configured a redelivery policy
        RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
        if (redeliveryPolicy != null) {
            physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
        }
        return physicalConnection;
    }

    /**
     * Deactivates every registered endpoint, disposes the embedded broker if
     * one was created (interrupting the start thread when it is still alive)
     * and drops the bootstrap context.
     *
     * @see javax.resource.spi.ResourceAdapter#stop()
     */
    public void stop() {
        synchronized (endpointWorkers) {
            // endpointDeactivation() removes the key, so the loop drains the map.
            while (endpointWorkers.size() > 0) {
                ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
                endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
            }
        }

        synchronized( this ) {
            if (broker != null) {
                if( brokerStartThread.isAlive() ) {
                    brokerStartThread.interrupt();
                }
                ServiceSupport.dispose(broker);
                broker = null;
            }
        }

        this.bootstrapContext = null;
    }

    /**
     * @return the context passed to {@link #start(BootstrapContext)}, or
     *         {@code null} before start and after {@link #stop()}
     * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
     */
    public BootstrapContext getBootstrapContext() {
        return bootstrapContext;
    }

    /**
     * Registers and starts a worker for the given endpoint.
     *
     * @param endpointFactory the factory of the endpoint being activated
     * @param activationSpec must be a {@link MessageActivationSpec} whose
     *                resource adapter equals this instance
     * @throws ResourceException if the spec was not initialized with this adapter
     * @throws NotSupportedException if the spec is not a {@link MessageActivationSpec}
     * @throws IllegalStateException if the same endpoint is already active
     * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
     *      javax.resource.spi.ActivationSpec)
     */
    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {

        // spec section 5.3.3
        if (!equals(activationSpec.getResourceAdapter())) {
            throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
        }

        if (!(activationSpec instanceof MessageActivationSpec)) {
            throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass());
        }

        ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
        ActiveMQEndpointWorker worker;
        // Check-then-register under the same monitor that stop() and
        // endpointDeactivation() hold when they mutate the map.
        synchronized (endpointWorkers) {
            // This is weird.. the same endpoint activated twice.. must be a
            // container error.
            if (endpointWorkers.containsKey(key)) {
                throw new IllegalStateException("Endpoint previously activated");
            }
            worker = new ActiveMQEndpointWorker(this, key);
            endpointWorkers.put(key, worker);
        }
        // Started outside the lock so the map is not held while the worker starts.
        worker.start();
    }

    /**
     * Unregisters the worker for the given endpoint and stops it. Unknown
     * endpoints and specs that are not {@link MessageActivationSpec}s are
     * silently ignored. If the calling thread is interrupted while the worker
     * stops, the interrupt flag is restored and the method returns.
     *
     * @param endpointFactory the factory of the endpoint being deactivated
     * @param activationSpec the spec the endpoint was activated with
     * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
     *      javax.resource.spi.ActivationSpec)
     */
    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
        if (activationSpec instanceof MessageActivationSpec) {
            ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
            ActiveMQEndpointWorker worker = null;
            synchronized (endpointWorkers) {
                worker = endpointWorkers.remove(key);
            }
            if (worker == null) {
                // This is weird.. that endpoint was not activated.. oh well..
                // this method
                // does not throw exceptions so just return.
                return;
            }
            try {
                worker.stop();
            } catch (InterruptedException e) {
                // We interrupted.. we won't throw an exception but will stop
                // waiting for the worker
                // to stop.. we tried our best. Keep trying to interrupt the
                // thread.
                Thread.currentThread().interrupt();
            }

        }

    }

    /**
     * We only connect to one resource manager per ResourceAdapter instance, so
     * any ActivationSpec will return the same XAResource.
     * <p>
     * The returned resource is a {@link TransactionContext} created without a
     * connection. Each overridden operation obtains a connection from
     * {@link #makeConnection()}, starts it, installs it on the context,
     * delegates to the superclass implementation, then closes that connection
     * and restores whatever connection was installed before. A
     * {@link JMSException} raised along the way is reported as an
     * {@link XAException} with error code {@link XAException#XAER_RMFAIL}.
     *
     * @param activationSpecs not consulted
     * @return a single-element array holding the recovery resource
     * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
     */
    public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
        try {
            return new XAResource[]{
                new TransactionContext() {

                    @Override
                    public boolean isSameRM(XAResource xaresource) throws XAException {
                        ActiveMQConnection original = null;
                        try {
                            original = setConnection(newConnection());
                            boolean result = super.isSameRM(xaresource);
                            LOG.trace("{}.isSameRM({})={}", getConnection(), xaresource, result);
                            return result;

                        } catch (JMSException e) {
                            LOG.trace("isSameRM({}) failed", xaresource, e);
                            throw toXAException(e);
                        } finally {
                            closeConnection(original);
                        }
                    }

                    @Override
                    protected String getResourceManagerId() throws JMSException {
                        ActiveMQConnection original = null;
                        try {
                            original = setConnection(newConnection());
                            return super.getResourceManagerId();
                        } finally {
                            closeConnection(original);
                        }
                    }

                    @Override
                    public void commit(Xid xid, boolean onePhase) throws XAException {
                        ActiveMQConnection original = null;
                        try {
                            // Remember the previous connection so finally can restore it.
                            original = setConnection(newConnection());
                            super.commit(xid, onePhase);
                            LOG.trace("{}.commit({},{})", getConnection(), xid, onePhase);

                        } catch (JMSException e) {
                            LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e);
                            throw toXAException(e);
                        } finally {
                            closeConnection(original);
                        }
                    }

                    @Override
                    public void rollback(Xid xid) throws XAException {
                        ActiveMQConnection original = null;
                        try {
                            original = setConnection(newConnection());
                            super.rollback(xid);
                            LOG.trace("{}.rollback({})", getConnection(), xid);

                        } catch (JMSException e) {
                            LOG.trace("{}.rollback({}) failed", getConnection(), xid, e);
                            throw toXAException(e);
                        } finally {
                            closeConnection(original);
                        }
                    }

                    @Override
                    public Xid[] recover(int flags) throws XAException {
                        ActiveMQConnection original = null;
                        try {
                            original = setConnection(newConnection());
                            Xid[] result = super.recover(flags);
                            LOG.trace("{}.recover({})={}", getConnection(), flags, result);
                            return result;

                        } catch (JMSException e) {
                            LOG.trace("{}.recover({}) failed", getConnection(), flags, e);
                            throw toXAException(e);
                        } finally {
                            closeConnection(original);
                        }
                    }

                    @Override
                    public void forget(Xid xid) throws XAException {
                        ActiveMQConnection original = null;
                        try {
                            original = setConnection(newConnection());
                            super.forget(xid);
                            LOG.trace("{}.forget({})", getConnection(), xid);

                        } catch (JMSException e) {
                            LOG.trace("{}.forget({}) failed", getConnection(), xid, e);
                            throw toXAException(e);
                        } finally {
                            closeConnection(original);
                        }
                    }

                    /** Wraps a JMS failure as an XAER_RMFAIL XAException, keeping the cause. */
                    private XAException toXAException(JMSException e) {
                        XAException xaException = new XAException(e.getMessage());
                        xaException.errorCode = XAException.XAER_RMFAIL;
                        xaException.initCause(e);
                        return xaException;
                    }

                    /** Creates and starts a connection via the enclosing adapter. */
                    private ActiveMQConnection newConnection() throws JMSException {
                        ActiveMQConnection connection = makeConnection();
                        connection.start();
                        return connection;
                    }

                    /** Closes the currently installed connection and reinstalls {@code original}. */
                    private void closeConnection(ActiveMQConnection original) {
                        ActiveMQConnection connection = getConnection();
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException ignored) {
                                // Best effort: the connection is being discarded, and a
                                // close failure must not mask the XA operation's outcome.
                            }
                        }
                        setConnection(original);
                    }
                }};

        } catch (Exception e) {
            throw new ResourceException(e);
        }
    }

    // ///////////////////////////////////////////////////////////////////////
    //
    // Java Bean getters and setters for this ResourceAdapter class.
    //
    // ///////////////////////////////////////////////////////////////////////

    /**
     * @return the embedded broker configuration location, or {@code null}
     * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
     */
    public String getBrokerXmlConfig() {
        return brokerXmlConfig;
    }

    /**
     * Sets the <a href="http://activemq.org/Xml+Configuration">XML
     * configuration file </a> used to configure the ActiveMQ broker via Spring
     * if using embedded mode.
     *
     * @param brokerXmlConfig is the filename which is assumed to be on the
     *                classpath unless a URL is specified. So a value of
     *                <code>foo/bar.xml</code> would be assumed to be on the
     *                classpath whereas <code>file:dir/file.xml</code> would
     *                use the file system. Any valid URL string is supported.
     */
    public void setBrokerXmlConfig(String brokerXmlConfig) {
        this.brokerXmlConfig = brokerXmlConfig;
    }

    /**
     * Two adapters are equal when their connection info and their
     * {@code brokerXmlConfig} are equal.
     *
     * @see java.lang.Object#equals(java.lang.Object)
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MessageResourceAdapter)) {
            return false;
        }

        final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o;

        if (!getInfo().equals(activeMQResourceAdapter.getInfo())) {
            return false;
        }
        if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) {
            return false;
        }

        return true;
    }

    /**
     * Combines the hash of the connection info with the hash of
     * {@code brokerXmlConfig} (when set), matching {@link #equals(Object)}.
     *
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        int result;
        result = getInfo().hashCode();
        if (brokerXmlConfig != null) {
            result ^= brokerXmlConfig.hashCode();
        }
        return result;
    }

    /**
     * @return the explicitly configured connection factory, or {@code null}
     */
    public ActiveMQConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    /**
     * @param aConnectionFactory the factory to use for new connections
     *                instead of one derived from the connection info
     */
    public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
        this.connectionFactory = aConnectionFactory;
    }


}