001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.ExceptionListener; 022import javax.jms.JMSException; 023import javax.jms.Queue; 024import javax.jms.QueueConnection; 025import javax.jms.QueueConnectionFactory; 026import javax.jms.QueueSession; 027import javax.jms.Session; 028import javax.naming.NamingException; 029 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 */ 035public class SimpleJmsQueueConnector extends JmsConnector { 036 private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsQueueConnector.class); 037 private String outboundQueueConnectionFactoryName; 038 private String localConnectionFactoryName; 039 private QueueConnectionFactory outboundQueueConnectionFactory; 040 private QueueConnectionFactory localQueueConnectionFactory; 041 private InboundQueueBridge[] inboundQueueBridges; 042 private OutboundQueueBridge[] outboundQueueBridges; 043 044 /** 045 * @return Returns the inboundQueueBridges. 046 */ 047 public InboundQueueBridge[] getInboundQueueBridges() { 048 return inboundQueueBridges; 049 } 050 051 /** 052 * @param inboundQueueBridges The inboundQueueBridges to set. 053 */ 054 public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) { 055 this.inboundQueueBridges = inboundQueueBridges; 056 } 057 058 /** 059 * @return Returns the outboundQueueBridges. 060 */ 061 public OutboundQueueBridge[] getOutboundQueueBridges() { 062 return outboundQueueBridges; 063 } 064 065 /** 066 * @param outboundQueueBridges The outboundQueueBridges to set. 067 */ 068 public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) { 069 this.outboundQueueBridges = outboundQueueBridges; 070 } 071 072 /** 073 * @return Returns the localQueueConnectionFactory. 074 */ 075 public QueueConnectionFactory getLocalQueueConnectionFactory() { 076 return localQueueConnectionFactory; 077 } 078 079 /** 080 * @param localQueueConnectionFactory The localQueueConnectionFactory to 081 * set. 082 */ 083 public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) { 084 this.localQueueConnectionFactory = localConnectionFactory; 085 } 086 087 /** 088 * @return Returns the outboundQueueConnectionFactory. 089 */ 090 public QueueConnectionFactory getOutboundQueueConnectionFactory() { 091 return outboundQueueConnectionFactory; 092 } 093 094 /** 095 * @return Returns the outboundQueueConnectionFactoryName. 096 */ 097 public String getOutboundQueueConnectionFactoryName() { 098 return outboundQueueConnectionFactoryName; 099 } 100 101 /** 102 * @param outboundQueueConnectionFactoryName The 103 * outboundQueueConnectionFactoryName to set. 104 */ 105 public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) { 106 this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName; 107 } 108 109 /** 110 * @return Returns the localConnectionFactoryName. 111 */ 112 public String getLocalConnectionFactoryName() { 113 return localConnectionFactoryName; 114 } 115 116 /** 117 * @param localConnectionFactoryName The localConnectionFactoryName to set. 118 */ 119 public void setLocalConnectionFactoryName(String localConnectionFactoryName) { 120 this.localConnectionFactoryName = localConnectionFactoryName; 121 } 122 123 /** 124 * @return Returns the localQueueConnection. 125 */ 126 public QueueConnection getLocalQueueConnection() { 127 return (QueueConnection) localConnection.get(); 128 } 129 130 /** 131 * @param localQueueConnection The localQueueConnection to set. 132 */ 133 public void setLocalQueueConnection(QueueConnection localQueueConnection) { 134 this.localConnection.set(localQueueConnection); 135 } 136 137 /** 138 * @return Returns the outboundQueueConnection. 139 */ 140 public QueueConnection getOutboundQueueConnection() { 141 return (QueueConnection) foreignConnection.get(); 142 } 143 144 /** 145 * @param outboundQueueConnection The outboundQueueConnection to set. 146 */ 147 public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) { 148 this.foreignConnection.set(foreignQueueConnection); 149 } 150 151 /** 152 * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory 153 * to set. 154 */ 155 public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) { 156 this.outboundQueueConnectionFactory = foreignQueueConnectionFactory; 157 } 158 159 @Override 160 protected void initializeForeignConnection() throws NamingException, JMSException { 161 162 final QueueConnection newConnection; 163 164 if (foreignConnection.get() == null) { 165 // get the connection factories 166 if (outboundQueueConnectionFactory == null) { 167 // look it up from JNDI 168 if (outboundQueueConnectionFactoryName != null) { 169 outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate 170 .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class); 171 if (outboundUsername != null) { 172 newConnection = outboundQueueConnectionFactory 173 .createQueueConnection(outboundUsername, outboundPassword); 174 } else { 175 newConnection = outboundQueueConnectionFactory.createQueueConnection(); 176 } 177 } else { 178 throw new JMSException("Cannot create foreignConnection - no information"); 179 } 180 } else { 181 if (outboundUsername != null) { 182 newConnection = outboundQueueConnectionFactory 183 .createQueueConnection(outboundUsername, outboundPassword); 184 } else { 185 newConnection = outboundQueueConnectionFactory.createQueueConnection(); 186 } 187 } 188 } else { 189 // Clear if for now in case something goes wrong during the init. 190 newConnection = (QueueConnection) foreignConnection.getAndSet(null); 191 } 192 193 if (outboundClientId != null && outboundClientId.length() > 0) { 194 newConnection.setClientID(getOutboundClientId()); 195 } 196 newConnection.start(); 197 198 outboundMessageConvertor.setConnection(newConnection); 199 200 // Configure the bridges with the new Outbound connection. 201 initializeInboundDestinationBridgesOutboundSide(newConnection); 202 initializeOutboundDestinationBridgesOutboundSide(newConnection); 203 204 // Register for any async error notifications now so we can reset in the 205 // case where there's not a lot of activity and a connection drops. 206 newConnection.setExceptionListener(new ExceptionListener() { 207 @Override 208 public void onException(JMSException exception) { 209 handleConnectionFailure(newConnection); 210 } 211 }); 212 213 // At this point all looks good, so this our current connection now. 214 foreignConnection.set(newConnection); 215 } 216 217 @Override 218 protected void initializeLocalConnection() throws NamingException, JMSException { 219 220 final QueueConnection newConnection; 221 222 if (localConnection.get() == null) { 223 // get the connection factories 224 if (localQueueConnectionFactory == null) { 225 if (embeddedConnectionFactory == null) { 226 // look it up from JNDI 227 if (localConnectionFactoryName != null) { 228 localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate 229 .lookup(localConnectionFactoryName, QueueConnectionFactory.class); 230 if (localUsername != null) { 231 newConnection = localQueueConnectionFactory 232 .createQueueConnection(localUsername, localPassword); 233 } else { 234 newConnection = localQueueConnectionFactory.createQueueConnection(); 235 } 236 } else { 237 throw new JMSException("Cannot create localConnection - no information"); 238 } 239 } else { 240 newConnection = embeddedConnectionFactory.createQueueConnection(); 241 } 242 } else { 243 if (localUsername != null) { 244 newConnection = localQueueConnectionFactory. 245 createQueueConnection(localUsername, localPassword); 246 } else { 247 newConnection = localQueueConnectionFactory.createQueueConnection(); 248 } 249 } 250 251 } else { 252 // Clear if for now in case something goes wrong during the init. 253 newConnection = (QueueConnection) localConnection.getAndSet(null); 254 } 255 256 if (localClientId != null && localClientId.length() > 0) { 257 newConnection.setClientID(getLocalClientId()); 258 } 259 newConnection.start(); 260 261 inboundMessageConvertor.setConnection(newConnection); 262 263 // Configure the bridges with the new Local connection. 264 initializeInboundDestinationBridgesLocalSide(newConnection); 265 initializeOutboundDestinationBridgesLocalSide(newConnection); 266 267 // Register for any async error notifications now so we can reset in the 268 // case where there's not a lot of activity and a connection drops. 269 newConnection.setExceptionListener(new ExceptionListener() { 270 @Override 271 public void onException(JMSException exception) { 272 handleConnectionFailure(newConnection); 273 } 274 }); 275 276 // At this point all looks good, so this our current connection now. 277 localConnection.set(newConnection); 278 } 279 280 protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { 281 if (inboundQueueBridges != null) { 282 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 283 284 for (InboundQueueBridge bridge : inboundQueueBridges) { 285 String queueName = bridge.getInboundQueueName(); 286 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 287 bridge.setConsumer(null); 288 bridge.setConsumerQueue(foreignQueue); 289 bridge.setConsumerConnection(connection); 290 bridge.setJmsConnector(this); 291 addInboundBridge(bridge); 292 } 293 outboundSession.close(); 294 } 295 } 296 297 protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { 298 if (inboundQueueBridges != null) { 299 QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 300 301 for (InboundQueueBridge bridge : inboundQueueBridges) { 302 String localQueueName = bridge.getLocalQueueName(); 303 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 304 bridge.setProducerQueue(activemqQueue); 305 bridge.setProducerConnection(connection); 306 if (bridge.getJmsMessageConvertor() == null) { 307 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 308 } 309 bridge.setJmsConnector(this); 310 addInboundBridge(bridge); 311 } 312 localSession.close(); 313 } 314 } 315 316 protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { 317 if (outboundQueueBridges != null) { 318 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 319 320 for (OutboundQueueBridge bridge : outboundQueueBridges) { 321 String queueName = bridge.getOutboundQueueName(); 322 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 323 bridge.setProducerQueue(foreignQueue); 324 bridge.setProducerConnection(connection); 325 if (bridge.getJmsMessageConvertor() == null) { 326 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 327 } 328 bridge.setJmsConnector(this); 329 addOutboundBridge(bridge); 330 } 331 outboundSession.close(); 332 } 333 } 334 335 protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { 336 if (outboundQueueBridges != null) { 337 QueueSession localSession = 338 connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 339 340 for (OutboundQueueBridge bridge : outboundQueueBridges) { 341 String localQueueName = bridge.getLocalQueueName(); 342 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 343 bridge.setConsumer(null); 344 bridge.setConsumerQueue(activemqQueue); 345 bridge.setConsumerConnection(connection); 346 bridge.setJmsConnector(this); 347 addOutboundBridge(bridge); 348 } 349 localSession.close(); 350 } 351 } 352 353 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, 354 Connection replyToConsumerConnection) { 355 Queue replyToProducerQueue = (Queue)destination; 356 boolean isInbound = replyToProducerConnection.equals(localConnection.get()); 357 358 if (isInbound) { 359 InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue); 360 if (bridge == null) { 361 bridge = new InboundQueueBridge() { 362 protected Destination processReplyToDestination(Destination destination) { 363 return null; 364 } 365 }; 366 try { 367 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 368 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 369 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 370 replyToConsumerSession.close(); 371 bridge.setConsumerQueue(replyToConsumerQueue); 372 bridge.setProducerQueue(replyToProducerQueue); 373 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 374 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 375 bridge.setDoHandleReplyTo(false); 376 if (bridge.getJmsMessageConvertor() == null) { 377 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 378 } 379 bridge.setJmsConnector(this); 380 bridge.start(); 381 LOG.info("Created replyTo bridge for {}", replyToProducerQueue); 382 } catch (Exception e) { 383 LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e); 384 return null; 385 } 386 replyToBridges.put(replyToProducerQueue, bridge); 387 } 388 return bridge.getConsumerQueue(); 389 } else { 390 OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue); 391 if (bridge == null) { 392 bridge = new OutboundQueueBridge() { 393 protected Destination processReplyToDestination(Destination destination) { 394 return null; 395 } 396 }; 397 try { 398 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 399 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 400 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 401 replyToConsumerSession.close(); 402 bridge.setConsumerQueue(replyToConsumerQueue); 403 bridge.setProducerQueue(replyToProducerQueue); 404 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 405 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 406 bridge.setDoHandleReplyTo(false); 407 if (bridge.getJmsMessageConvertor() == null) { 408 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 409 } 410 bridge.setJmsConnector(this); 411 bridge.start(); 412 LOG.info("Created replyTo bridge for {}", replyToProducerQueue); 413 } catch (Exception e) { 414 LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e); 415 return null; 416 } 417 replyToBridges.put(replyToProducerQueue, bridge); 418 } 419 return bridge.getConsumerQueue(); 420 } 421 } 422 423 protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException { 424 return session.createQueue(queueName); 425 } 426 427 protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException { 428 Queue result = null; 429 430 if (preferJndiDestinationLookup) { 431 try { 432 // look-up the Queue 433 result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class); 434 } catch (NamingException e) { 435 try { 436 result = session.createQueue(queueName); 437 } catch (JMSException e1) { 438 String errStr = "Failed to look-up or create Queue for name: " + queueName; 439 LOG.error(errStr, e); 440 JMSException jmsEx = new JMSException(errStr); 441 jmsEx.setLinkedException(e1); 442 throw jmsEx; 443 } 444 } 445 } else { 446 try { 447 result = session.createQueue(queueName); 448 } catch (JMSException e) { 449 // look-up the Queue 450 try { 451 result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class); 452 } catch (NamingException e1) { 453 String errStr = "Failed to look-up Queue for name: " + queueName; 454 LOG.error(errStr, e); 455 JMSException jmsEx = new JMSException(errStr); 456 jmsEx.setLinkedException(e1); 457 throw jmsEx; 458 } 459 } 460 } 461 462 return result; 463 } 464}