001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.ExceptionListener; 022import javax.jms.JMSException; 023import javax.jms.Session; 024import javax.jms.Topic; 025import javax.jms.TopicConnection; 026import javax.jms.TopicConnectionFactory; 027import javax.jms.TopicSession; 028import javax.naming.NamingException; 029 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * A Bridge to other JMS Topic providers 035 */ 036public class SimpleJmsTopicConnector extends JmsConnector { 037 private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsTopicConnector.class); 038 private String outboundTopicConnectionFactoryName; 039 private String localConnectionFactoryName; 040 private TopicConnectionFactory outboundTopicConnectionFactory; 041 private TopicConnectionFactory localTopicConnectionFactory; 042 private InboundTopicBridge[] inboundTopicBridges; 043 private OutboundTopicBridge[] outboundTopicBridges; 044 045 /** 046 * @return Returns the inboundTopicBridges. 047 */ 048 public InboundTopicBridge[] getInboundTopicBridges() { 049 return inboundTopicBridges; 050 } 051 052 /** 053 * @param inboundTopicBridges The inboundTopicBridges to set. 054 */ 055 public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) { 056 this.inboundTopicBridges = inboundTopicBridges; 057 } 058 059 /** 060 * @return Returns the outboundTopicBridges. 061 */ 062 public OutboundTopicBridge[] getOutboundTopicBridges() { 063 return outboundTopicBridges; 064 } 065 066 /** 067 * @param outboundTopicBridges The outboundTopicBridges to set. 068 */ 069 public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) { 070 this.outboundTopicBridges = outboundTopicBridges; 071 } 072 073 /** 074 * @return Returns the localTopicConnectionFactory. 075 */ 076 public TopicConnectionFactory getLocalTopicConnectionFactory() { 077 return localTopicConnectionFactory; 078 } 079 080 /** 081 * @param localTopicConnectionFactory The localTopicConnectionFactory to set. 082 */ 083 public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) { 084 this.localTopicConnectionFactory = localConnectionFactory; 085 } 086 087 /** 088 * @return Returns the outboundTopicConnectionFactory. 089 */ 090 public TopicConnectionFactory getOutboundTopicConnectionFactory() { 091 return outboundTopicConnectionFactory; 092 } 093 094 /** 095 * @return Returns the outboundTopicConnectionFactoryName. 096 */ 097 public String getOutboundTopicConnectionFactoryName() { 098 return outboundTopicConnectionFactoryName; 099 } 100 101 /** 102 * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set. 103 */ 104 public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) { 105 this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName; 106 } 107 108 /** 109 * @return Returns the localConnectionFactoryName. 110 */ 111 public String getLocalConnectionFactoryName() { 112 return localConnectionFactoryName; 113 } 114 115 /** 116 * @param localConnectionFactoryName The localConnectionFactoryName to set. 117 */ 118 public void setLocalConnectionFactoryName(String localConnectionFactoryName) { 119 this.localConnectionFactoryName = localConnectionFactoryName; 120 } 121 122 /** 123 * @return Returns the localTopicConnection. 124 */ 125 public TopicConnection getLocalTopicConnection() { 126 return (TopicConnection) localConnection.get(); 127 } 128 129 /** 130 * @param localTopicConnection The localTopicConnection to set. 131 */ 132 public void setLocalTopicConnection(TopicConnection localTopicConnection) { 133 this.localConnection.set(localTopicConnection); 134 } 135 136 /** 137 * @return Returns the outboundTopicConnection. 138 */ 139 public TopicConnection getOutboundTopicConnection() { 140 return (TopicConnection) foreignConnection.get(); 141 } 142 143 /** 144 * @param outboundTopicConnection The outboundTopicConnection to set. 145 */ 146 public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) { 147 this.foreignConnection.set(foreignTopicConnection); 148 } 149 150 /** 151 * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set. 152 */ 153 public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) { 154 this.outboundTopicConnectionFactory = foreignTopicConnectionFactory; 155 } 156 157 @Override 158 protected void initializeForeignConnection() throws NamingException, JMSException { 159 160 final TopicConnection newConnection; 161 162 if (foreignConnection.get() == null) { 163 // get the connection factories 164 if (outboundTopicConnectionFactory == null) { 165 // look it up from JNDI 166 if (outboundTopicConnectionFactoryName != null) { 167 outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate 168 .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class); 169 if (outboundUsername != null) { 170 newConnection = outboundTopicConnectionFactory 171 .createTopicConnection(outboundUsername, outboundPassword); 172 } else { 173 newConnection = outboundTopicConnectionFactory.createTopicConnection(); 174 } 175 } else { 176 throw new JMSException("Cannot create foreignConnection - no information"); 177 } 178 } else { 179 if (outboundUsername != null) { 180 newConnection = outboundTopicConnectionFactory 181 .createTopicConnection(outboundUsername, outboundPassword); 182 } else { 183 newConnection = outboundTopicConnectionFactory.createTopicConnection(); 184 } 185 } 186 } else { 187 // Clear if for now in case something goes wrong during the init. 188 newConnection = (TopicConnection) foreignConnection.getAndSet(null); 189 } 190 191 if (outboundClientId != null && outboundClientId.length() > 0) { 192 newConnection.setClientID(getOutboundClientId()); 193 } 194 newConnection.start(); 195 196 outboundMessageConvertor.setConnection(newConnection); 197 198 // Configure the bridges with the new Outbound connection. 199 initializeInboundDestinationBridgesOutboundSide(newConnection); 200 initializeOutboundDestinationBridgesOutboundSide(newConnection); 201 202 // Register for any async error notifications now so we can reset in the 203 // case where there's not a lot of activity and a connection drops. 204 newConnection.setExceptionListener(new ExceptionListener() { 205 @Override 206 public void onException(JMSException exception) { 207 handleConnectionFailure(newConnection); 208 } 209 }); 210 211 // At this point all looks good, so this our current connection now. 212 foreignConnection.set(newConnection); 213 } 214 215 @Override 216 protected void initializeLocalConnection() throws NamingException, JMSException { 217 218 final TopicConnection newConnection; 219 220 if (localConnection.get() == null) { 221 // get the connection factories 222 if (localTopicConnectionFactory == null) { 223 if (embeddedConnectionFactory == null) { 224 // look it up from JNDI 225 if (localConnectionFactoryName != null) { 226 localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate 227 .lookup(localConnectionFactoryName, TopicConnectionFactory.class); 228 if (localUsername != null) { 229 newConnection = localTopicConnectionFactory 230 .createTopicConnection(localUsername, localPassword); 231 } else { 232 newConnection = localTopicConnectionFactory.createTopicConnection(); 233 } 234 } else { 235 throw new JMSException("Cannot create localConnection - no information"); 236 } 237 } else { 238 newConnection = embeddedConnectionFactory.createTopicConnection(); 239 } 240 } else { 241 if (localUsername != null) { 242 newConnection = localTopicConnectionFactory. 243 createTopicConnection(localUsername, localPassword); 244 } else { 245 newConnection = localTopicConnectionFactory.createTopicConnection(); 246 } 247 } 248 249 } else { 250 // Clear if for now in case something goes wrong during the init. 251 newConnection = (TopicConnection) localConnection.getAndSet(null); 252 } 253 254 if (localClientId != null && localClientId.length() > 0) { 255 newConnection.setClientID(getLocalClientId()); 256 } 257 newConnection.start(); 258 259 inboundMessageConvertor.setConnection(newConnection); 260 261 // Configure the bridges with the new Local connection. 262 initializeInboundDestinationBridgesLocalSide(newConnection); 263 initializeOutboundDestinationBridgesLocalSide(newConnection); 264 265 // Register for any async error notifications now so we can reset in the 266 // case where there's not a lot of activity and a connection drops. 267 newConnection.setExceptionListener(new ExceptionListener() { 268 @Override 269 public void onException(JMSException exception) { 270 handleConnectionFailure(newConnection); 271 } 272 }); 273 274 // At this point all looks good, so this our current connection now. 275 localConnection.set(newConnection); 276 } 277 278 protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { 279 if (inboundTopicBridges != null) { 280 TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 281 282 for (InboundTopicBridge bridge : inboundTopicBridges) { 283 String TopicName = bridge.getInboundTopicName(); 284 Topic foreignTopic = createForeignTopic(outboundSession, TopicName); 285 bridge.setConsumer(null); 286 bridge.setConsumerTopic(foreignTopic); 287 bridge.setConsumerConnection(connection); 288 bridge.setJmsConnector(this); 289 addInboundBridge(bridge); 290 } 291 outboundSession.close(); 292 } 293 } 294 295 protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { 296 if (inboundTopicBridges != null) { 297 TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 298 299 for (InboundTopicBridge bridge : inboundTopicBridges) { 300 String localTopicName = bridge.getLocalTopicName(); 301 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); 302 bridge.setProducerTopic(activemqTopic); 303 bridge.setProducerConnection(connection); 304 if (bridge.getJmsMessageConvertor() == null) { 305 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 306 } 307 bridge.setJmsConnector(this); 308 addInboundBridge(bridge); 309 } 310 localSession.close(); 311 } 312 } 313 314 protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { 315 if (outboundTopicBridges != null) { 316 TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 317 318 for (OutboundTopicBridge bridge : outboundTopicBridges) { 319 String topicName = bridge.getOutboundTopicName(); 320 Topic foreignTopic = createForeignTopic(outboundSession, topicName); 321 bridge.setProducerTopic(foreignTopic); 322 bridge.setProducerConnection(connection); 323 if (bridge.getJmsMessageConvertor() == null) { 324 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 325 } 326 bridge.setJmsConnector(this); 327 addOutboundBridge(bridge); 328 } 329 outboundSession.close(); 330 } 331 } 332 333 protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { 334 if (outboundTopicBridges != null) { 335 TopicSession localSession = 336 connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 337 338 for (OutboundTopicBridge bridge : outboundTopicBridges) { 339 String localTopicName = bridge.getLocalTopicName(); 340 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); 341 bridge.setConsumer(null); 342 bridge.setConsumerTopic(activemqTopic); 343 bridge.setConsumerConnection(connection); 344 bridge.setJmsConnector(this); 345 addOutboundBridge(bridge); 346 } 347 localSession.close(); 348 } 349 } 350 351 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, 352 Connection replyToConsumerConnection) { 353 Topic replyToProducerTopic = (Topic)destination; 354 boolean isInbound = replyToProducerConnection.equals(localConnection.get()); 355 356 if (isInbound) { 357 InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic); 358 if (bridge == null) { 359 bridge = new InboundTopicBridge() { 360 protected Destination processReplyToDestination(Destination destination) { 361 return null; 362 } 363 }; 364 try { 365 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) 366 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 367 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 368 replyToConsumerSession.close(); 369 bridge.setConsumerTopic(replyToConsumerTopic); 370 bridge.setProducerTopic(replyToProducerTopic); 371 bridge.setProducerConnection((TopicConnection)replyToProducerConnection); 372 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); 373 bridge.setDoHandleReplyTo(false); 374 if (bridge.getJmsMessageConvertor() == null) { 375 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 376 } 377 bridge.setJmsConnector(this); 378 bridge.start(); 379 LOG.info("Created replyTo bridge for {}", replyToProducerTopic); 380 } catch (Exception e) { 381 LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e); 382 return null; 383 } 384 replyToBridges.put(replyToProducerTopic, bridge); 385 } 386 return bridge.getConsumerTopic(); 387 } else { 388 OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic); 389 if (bridge == null) { 390 bridge = new OutboundTopicBridge() { 391 protected Destination processReplyToDestination(Destination destination) { 392 return null; 393 } 394 }; 395 try { 396 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) 397 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 398 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 399 replyToConsumerSession.close(); 400 bridge.setConsumerTopic(replyToConsumerTopic); 401 bridge.setProducerTopic(replyToProducerTopic); 402 bridge.setProducerConnection((TopicConnection)replyToProducerConnection); 403 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); 404 bridge.setDoHandleReplyTo(false); 405 if (bridge.getJmsMessageConvertor() == null) { 406 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 407 } 408 bridge.setJmsConnector(this); 409 bridge.start(); 410 LOG.info("Created replyTo bridge for {}", replyToProducerTopic); 411 } catch (Exception e) { 412 LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e); 413 return null; 414 } 415 replyToBridges.put(replyToProducerTopic, bridge); 416 } 417 return bridge.getConsumerTopic(); 418 } 419 } 420 421 protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException { 422 return session.createTopic(topicName); 423 } 424 425 protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException { 426 Topic result = null; 427 428 if (preferJndiDestinationLookup) { 429 try { 430 // look-up the Queue 431 result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class); 432 } catch (NamingException e) { 433 try { 434 result = session.createTopic(topicName); 435 } catch (JMSException e1) { 436 String errStr = "Failed to look-up or create Topic for name: " + topicName; 437 LOG.error(errStr, e); 438 JMSException jmsEx = new JMSException(errStr); 439 jmsEx.setLinkedException(e1); 440 throw jmsEx; 441 } 442 } 443 } else { 444 try { 445 result = session.createTopic(topicName); 446 } catch (JMSException e) { 447 // look-up the Topic 448 try { 449 result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class); 450 } catch (NamingException e1) { 451 String errStr = "Failed to look-up Topic for name: " + topicName; 452 LOG.error(errStr, e); 453 JMSException jmsEx = new JMSException(errStr); 454 jmsEx.setLinkedException(e1); 455 throw jmsEx; 456 } 457 } 458 } 459 return result; 460 } 461 462}