/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.jms.Connection;
import javax.jms.Destination;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This bridge joins the gap between foreign JMS providers and ActiveMQ. As some
 * JMS providers are still only in compliance with JMS v1.0.1, this bridge itself
 * aims to be in compliance with the JMS 1.0.2 specification.
 * <p>
 * The connector owns one "local" and one "foreign" {@link Connection} plus the
 * inbound/outbound {@link DestinationBridge}s that use them. When either
 * connection cannot be established (or is later lost) a background task retries
 * it according to the configured {@link ReconnectionPolicy}.
 */
public abstract class JmsConnector implements Service {

    private static int nextId;
    private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);

    protected boolean preferJndiDestinationLookup = false;
    protected JndiLookupFactory jndiLocalTemplate;
    protected JndiLookupFactory jndiOutboundTemplate;
    protected JmsMesageConvertor inboundMessageConvertor;
    protected JmsMesageConvertor outboundMessageConvertor;
    protected AtomicBoolean initialized = new AtomicBoolean(false);
    protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
    protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
    protected AtomicBoolean started = new AtomicBoolean(false);
    protected AtomicBoolean failed = new AtomicBoolean();
    protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
    protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
    protected ActiveMQConnectionFactory embeddedConnectionFactory;
    protected int replyToDestinationCacheSize = 10000;
    protected String outboundUsername;
    protected String outboundPassword;
    protected String localUsername;
    protected String localPassword;
    protected String outboundClientId;
    protected String localClientId;
    protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();

    private ReconnectionPolicy policy = new ReconnectionPolicy();
    // NOTE: the misspelled name is kept because the field is protected and may be
    // referenced by subclasses. It is created in init() and set to null by stop().
    protected ThreadPoolExecutor connectionSerivce;
    private final List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
    private final List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
    private String name;

    /**
     * Builds the cache used for reply-to bridges. Once the cache grows past its
     * maximum size, the first entry in iteration order is removed and its bridge
     * is stopped.
     */
    private static LRUCache<Destination, DestinationBridge> createLRUCache() {
        return new LRUCache<Destination, DestinationBridge>() {
            private static final long serialVersionUID = -7446792754185879286L;

            @Override
            protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> eldest) {
                if (size() > maxCacheSize) {
                    Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
                    Map.Entry<Destination, DestinationBridge> lru = iter.next();
                    remove(lru.getKey());
                    DestinationBridge bridge = lru.getValue();
                    try {
                        bridge.stop();
                        LOG.info("Expired bridge: {}", bridge);
                    } catch (Exception e) {
                        LOG.warn("Stopping expired bridge {} caused an exception", bridge, e);
                    }
                }
                // The map was modified directly above, so the caller must not
                // perform its own removal.
                return false;
            }
        };
    }

    /**
     * Performs one-time initialization: fills in default JNDI templates and message
     * convertors, sizes the reply-to cache, creates the reconnect executor and then
     * delegates to {@link #doConnectorInit()}.
     *
     * @return the result of {@link #doConnectorInit()} on the first call, or
     *         {@code false} if the connector had already been initialized.
     */
    public boolean init() {
        boolean result = initialized.compareAndSet(false, true);
        if (result) {
            if (jndiLocalTemplate == null) {
                jndiLocalTemplate = new JndiLookupFactory();
            }
            if (jndiOutboundTemplate == null) {
                jndiOutboundTemplate = new JndiLookupFactory();
            }
            if (inboundMessageConvertor == null) {
                inboundMessageConvertor = new SimpleJmsMessageConvertor();
            }
            if (outboundMessageConvertor == null) {
                outboundMessageConvertor = new SimpleJmsMessageConvertor();
            }
            replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());

            connectionSerivce = createExecutor();

            // Subclasses can override this to customize their own init.
            result = doConnectorInit();
        }
        return result;
    }

    /**
     * Attempts to establish both connections synchronously; any side that fails is
     * handed to a background task that keeps retrying.
     *
     * @return always {@code true} in this implementation.
     */
    protected boolean doConnectorInit() {

        // We try to make a connection via a sync call first so that the
        // JmsConnector is fully initialized before the start call returns
        // in order to avoid missing any messages that are dispatched
        // immediately after startup. If either side fails we queue an
        // asynchronous task to manage the reconnect attempts.

        try {
            initializeLocalConnection();
            localSideInitialized.set(true);
        } catch (Exception e) {
            LOG.debug("Initial local connection attempt failed, retrying asynchronously", e);
            // Queue up the task to attempt the local connection.
            scheduleAsyncReconnect(true);
        }

        try {
            initializeForeignConnection();
            foreignSideInitialized.set(true);
        } catch (Exception e) {
            LOG.debug("Initial foreign connection attempt failed, retrying asynchronously", e);
            // Queue up the task for the foreign connection now.
            scheduleAsyncReconnect(false);
        }

        return true;
    }

    /**
     * Initializes the connector (first call only) and starts every registered
     * bridge. Does nothing if the connector is already started.
     *
     * @throws Exception if a bridge fails to start.
     */
    @Override
    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            init();
            for (DestinationBridge bridge : inboundBridges) {
                bridge.start();
            }
            for (DestinationBridge bridge : outboundBridges) {
                bridge.start();
            }
            LOG.info("JMS Connector {} started", getName());
        }
    }

    /**
     * Shuts down the reconnect executor, closes both connections and stops every
     * registered bridge. Does nothing if the connector is not started.
     * <p>
     * All bridges are asked to stop even if one of them fails; the first failure is
     * rethrown once every bridge has been processed.
     *
     * @throws Exception the first exception raised while stopping a bridge.
     */
    @Override
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {

            ThreadPoolUtils.shutdown(connectionSerivce);
            connectionSerivce = null;

            closeQuietly(foreignConnection.get(), "foreign");
            closeQuietly(localConnection.get(), "local");

            Exception firstFailure = stopBridges(inboundBridges, null);
            firstFailure = stopBridges(outboundBridges, firstFailure);
            if (firstFailure != null) {
                throw firstFailure;
            }

            LOG.info("JMS Connector {} stopped", getName());
        }
    }

    /**
     * Removes all inbound, outbound and reply-to bridges from this connector.
     * The bridges themselves are not stopped by this call.
     */
    public void clearBridges() {
        inboundBridges.clear();
        outboundBridges.clear();
        replyToBridges.clear();
    }

    /**
     * Creates a bridge for a reply-to destination.
     *
     * @param destination the reply-to destination to bridge.
     * @param consumerConnection the connection presumably used for the consuming
     *        side of the bridge (defined by the subclass).
     * @param producerConnection the connection presumably used for the producing
     *        side of the bridge (defined by the subclass).
     * @return the destination to use in place of the original reply-to.
     */
    protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);

    /**
     * One way to configure the local connection - this is called by the
     * BrokerService when the Connector is embedded.
     *
     * @param service the broker whose VM connector URI backs the embedded
     *        connection factory.
     */
    public void setBrokerService(BrokerService service) {
        embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
    }

    /**
     * @return the current local connection, or {@code null} if there is none.
     */
    public Connection getLocalConnection() {
        return this.localConnection.get();
    }

    /**
     * @return the current foreign connection, or {@code null} if there is none.
     */
    public Connection getForeignConnection() {
        return this.foreignConnection.get();
    }

    /**
     * @return Returns the jndiLocalTemplate.
     */
    public JndiLookupFactory getJndiLocalTemplate() {
        return jndiLocalTemplate;
    }

    /**
     * @param jndiTemplate The jndiLocalTemplate to set.
     */
    public void setJndiLocalTemplate(JndiLookupFactory jndiTemplate) {
        this.jndiLocalTemplate = jndiTemplate;
    }

    /**
     * @return Returns the jndiOutboundTemplate.
     */
    public JndiLookupFactory getJndiOutboundTemplate() {
        return jndiOutboundTemplate;
    }

    /**
     * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
     */
    public void setJndiOutboundTemplate(JndiLookupFactory jndiOutboundTemplate) {
        this.jndiOutboundTemplate = jndiOutboundTemplate;
    }

    /**
     * @return Returns the inboundMessageConvertor.
     */
    public JmsMesageConvertor getInboundMessageConvertor() {
        return inboundMessageConvertor;
    }

    /**
     * @param jmsMessageConvertor The inboundMessageConvertor to set.
     */
    public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
        this.inboundMessageConvertor = jmsMessageConvertor;
    }

    /**
     * @return Returns the outboundMessageConvertor.
     */
    public JmsMesageConvertor getOutboundMessageConvertor() {
        return outboundMessageConvertor;
    }

    /**
     * @param outboundMessageConvertor The outboundMessageConvertor to set.
     */
    public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
        this.outboundMessageConvertor = outboundMessageConvertor;
    }

    /**
     * @return Returns the replyToDestinationCacheSize.
     */
    public int getReplyToDestinationCacheSize() {
        return replyToDestinationCacheSize;
    }

    /**
     * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
     *        It is applied to the reply-to cache when {@link #init()} runs.
     */
    public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
        this.replyToDestinationCacheSize = replyToDestinationCacheSize;
    }

    /**
     * @return Returns the localPassword.
     */
    public String getLocalPassword() {
        return localPassword;
    }

    /**
     * @param localPassword The localPassword to set.
     */
    public void setLocalPassword(String localPassword) {
        this.localPassword = localPassword;
    }

    /**
     * @return Returns the localUsername.
     */
    public String getLocalUsername() {
        return localUsername;
    }

    /**
     * @param localUsername The localUsername to set.
     */
    public void setLocalUsername(String localUsername) {
        this.localUsername = localUsername;
    }

    /**
     * @return Returns the outboundPassword.
     */
    public String getOutboundPassword() {
        return outboundPassword;
    }

    /**
     * @param outboundPassword The outboundPassword to set.
     */
    public void setOutboundPassword(String outboundPassword) {
        this.outboundPassword = outboundPassword;
    }

    /**
     * @return Returns the outboundUsername.
     */
    public String getOutboundUsername() {
        return outboundUsername;
    }

    /**
     * @param outboundUsername The outboundUsername to set.
     */
    public void setOutboundUsername(String outboundUsername) {
        this.outboundUsername = outboundUsername;
    }

    /**
     * @return the outboundClientId
     */
    public String getOutboundClientId() {
        return outboundClientId;
    }

    /**
     * @param outboundClientId the outboundClientId to set
     */
    public void setOutboundClientId(String outboundClientId) {
        this.outboundClientId = outboundClientId;
    }

    /**
     * @return the localClientId
     */
    public String getLocalClientId() {
        return localClientId;
    }

    /**
     * @param localClientId the localClientId to set
     */
    public void setLocalClientId(String localClientId) {
        this.localClientId = localClientId;
    }

    /**
     * @return the currently configured reconnection policy.
     */
    public ReconnectionPolicy getReconnectionPolicy() {
        return this.policy;
    }

    /**
     * @param policy The new reconnection policy this {@link JmsConnector} should use.
     */
    public void setReconnectionPolicy(ReconnectionPolicy policy) {
        this.policy = policy;
    }

    /**
     * @return the preferJndiDestinationLookup
     */
    public boolean isPreferJndiDestinationLookup() {
        return preferJndiDestinationLookup;
    }

    /**
     * Sets whether the connector should prefer to first try to find a destination in JNDI before
     * using JMS semantics to create a Destination.  By default the connector will first use JMS
     * semantics and then fall-back to JNDI lookup, setting this value to true will reverse that
     * ordering.
     *
     * @param preferJndiDestinationLookup the preferJndiDestinationLookup to set
     */
    public void setPreferJndiDestinationLookup(boolean preferJndiDestinationLookup) {
        this.preferJndiDestinationLookup = preferJndiDestinationLookup;
    }

    /**
     * @return returns true if the {@link JmsConnector} is connected to both brokers.
     */
    public boolean isConnected() {
        return localConnection.get() != null && foreignConnection.get() != null;
    }

    /**
     * Registers an inbound bridge unless it is already registered.
     *
     * @param bridge the bridge to add.
     */
    protected void addInboundBridge(DestinationBridge bridge) {
        if (!inboundBridges.contains(bridge)) {
            inboundBridges.add(bridge);
        }
    }

    /**
     * Registers an outbound bridge unless it is already registered.
     *
     * @param bridge the bridge to add.
     */
    protected void addOutboundBridge(DestinationBridge bridge) {
        if (!outboundBridges.contains(bridge)) {
            outboundBridges.add(bridge);
        }
    }

    /**
     * @param bridge the inbound bridge to unregister.
     */
    protected void removeInboundBridge(DestinationBridge bridge) {
        inboundBridges.remove(bridge);
    }

    /**
     * @param bridge the outbound bridge to unregister.
     */
    protected void removeOutboundBridge(DestinationBridge bridge) {
        outboundBridges.remove(bridge);
    }

    /**
     * @return the connector name; if none was set, a name of the form
     *         {@code Connector:<id>} is generated on first use.
     */
    public String getName() {
        if (name == null) {
            name = "Connector:" + getNextId();
        }
        return name;
    }

    /**
     * @param name the name to use for this connector.
     */
    public void setName(String name) {
        this.name = name;
    }

    private static synchronized int getNextId() {
        return nextId++;
    }

    /**
     * @return true if a background (re)connect task exhausted its configured
     *         attempts without establishing a connection.
     */
    public boolean isFailed() {
        return this.failed.get();
    }

    /**
     * Performs the work of connection to the local side of the Connection.
     * <p>
     * This creates the initial connection to the local end of the {@link JmsConnector}
     * and then sets up all the destination bridges with the information needed to bridge
     * on the local side of the connection.
     *
     * @throws Exception if the connection cannot be established for any reason.
     */
    protected abstract void initializeLocalConnection() throws Exception;

    /**
     * Performs the work of connection to the foreign side of the Connection.
     * <p>
     * This creates the initial connection to the foreign end of the {@link JmsConnector}
     * and then sets up all the destination bridges with the information needed to bridge
     * on the foreign side of the connection.
     *
     * @throws Exception if the connection cannot be established for any reason.
     */
    protected abstract void initializeForeignConnection() throws Exception;

    /**
     * Callback method that the Destination bridges can use to report an exception that occurs
     * during normal bridging operations.
     *
     * @param connection
     *      The connection that was in use when the failure occurred.
     */
    void handleConnectionFailure(Connection connection) {

        // Can happen if async exception listener kicks in at the same time.
        if (connection == null || !this.started.get()) {
            return;
        }

        LOG.info("JmsConnector handling loss of connection [{}]", connection.toString());

        // TODO - How do we handle the re-wiring of replyToBridges in this case.
        replyToBridges.clear();

        if (this.foreignConnection.compareAndSet(connection, null)) {

            // Stop the inbound bridges when the foreign connection is dropped since
            // the bridge has no consumer and needs to be restarted once a new connection
            // to the foreign side is made.
            stopBridgesQuietly(inboundBridges);

            // We got here first and cleared the connection, now we queue a reconnect.
            scheduleAsyncReconnect(false);

        } else if (this.localConnection.compareAndSet(connection, null)) {

            // Stop the outbound bridges when the local connection is dropped since
            // the bridge has no consumer and needs to be restarted once a new connection
            // to the local side is made.
            stopBridgesQuietly(outboundBridges);

            // We got here first and cleared the connection, now we queue a reconnect.
            scheduleAsyncReconnect(true);
        }
    }

    /**
     * Closes a connection on a best-effort basis, logging (not propagating) any failure.
     */
    private void closeQuietly(Connection connection, String side) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            LOG.debug("Ignoring failure while closing {} connection", side, e);
        }
    }

    /**
     * Stops every bridge in the list, logging (not propagating) any failure.
     */
    private void stopBridgesQuietly(List<DestinationBridge> bridges) {
        for (DestinationBridge bridge : bridges) {
            try {
                bridge.stop();
            } catch (Exception e) {
                LOG.debug("Ignoring failure while stopping bridge {}", bridge, e);
            }
        }
    }

    /**
     * Stops every bridge in the list and returns the first failure seen so far
     * (either the one passed in or the first one raised here); later failures are logged.
     */
    private Exception stopBridges(List<DestinationBridge> bridges, Exception firstFailure) {
        Exception failure = firstFailure;
        for (DestinationBridge bridge : bridges) {
            try {
                bridge.stop();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    LOG.debug("Additional failure while stopping bridge {}", bridge, e);
                }
            }
        }
        return failure;
    }

    /**
     * Queues a background task that (re)establishes the local or foreign connection.
     * Nothing is queued when the executor is gone or shut down, i.e. the connector
     * is stopping or was never initialized.
     */
    private void scheduleAsyncReconnect(final boolean local) {
        final String side = local ? "local" : "foreign";

        // Snapshot the field: stop() sets it to null.
        final ThreadPoolExecutor executor = this.connectionSerivce;
        if (executor == null || executor.isShutdown()) {
            LOG.debug("Not scheduling {} reconnect, connection service is not running", side);
            return;
        }

        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        doInitializeConnection(local);
                    } catch (Exception e) {
                        LOG.error("Failed to initialize " + side + " connection for the JMSConnector", e);
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // The executor was shut down between the check above and execute().
            LOG.debug("Not scheduling {} reconnect, connection service rejected the task", side, e);
        }
    }

    /**
     * Repeatedly tries to establish one side of the connector, sleeping between
     * attempts for the delay supplied by the {@link ReconnectionPolicy}.
     * <p>
     * The attempt limit is the policy's initial-connect limit until that side has
     * connected once, and its reconnect limit afterwards. A positive limit allows
     * that many attempts in total. A negative limit is treated as "retry forever"
     * (assumed to match the policy's convention of -1 for unlimited; verify
     * against ReconnectionPolicy). When the attempts are exhausted the connector
     * is marked as failed.
     *
     * @param local true to connect the local side, false for the foreign side.
     * @throws Exception if the policy lookups fail unexpectedly.
     */
    private void doInitializeConnection(boolean local) throws Exception {

        // Snapshot the field: stop() sets it to null while this task may still run.
        final ThreadPoolExecutor executor = this.connectionSerivce;
        if (executor == null) {
            return;
        }

        final AtomicBoolean sideInitialized = local ? localSideInitialized : foreignSideInitialized;
        final int maxRetries = sideInitialized.get() ? policy.getMaxReconnectAttempts()
                                                     : policy.getMaxInitialConnectAttempts();

        int attempt = 0;

        do {
            if (attempt > 0) {
                try {
                    Thread.sleep(policy.getNextDelay(attempt));
                } catch (InterruptedException e) {
                    // Interrupted (most likely by shutdown): keep the flag and give up.
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            if (executor.isShutdown()) {
                return;
            }

            try {

                if (local) {
                    initializeLocalConnection();
                    localSideInitialized.set(true);
                } else {
                    initializeForeignConnection();
                    foreignSideInitialized.set(true);
                }

                // Once we are connected we ensure all the bridges are started.
                if (localConnection.get() != null && foreignConnection.get() != null) {
                    for (DestinationBridge bridge : inboundBridges) {
                        bridge.start();
                    }
                    for (DestinationBridge bridge : outboundBridges) {
                        bridge.start();
                    }
                }

                return;
            } catch (Exception e) {
                // The throwable must be the last element of a single argument array,
                // otherwise the array itself is consumed by the first placeholder.
                LOG.debug("Failed to establish initial {} connection for JmsConnector [{}]",
                          new Object[]{ (local ? "local" : "foreign"), attempt, e });
            }

            // Incremented unconditionally so the back-off delay also grows when
            // retrying without a limit.
            attempt++;
        }
        while ((maxRetries < 0 || attempt < maxRetries) && !executor.isShutdown());

        this.failed.set(true);
    }

    // Creates the daemon threads that run the asynchronous (re)connect tasks.
    private final ThreadFactory factory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
            thread.setDaemon(true);
            return thread;
        }
    };

    /**
     * Creates the executor for reconnect tasks: its threads (core threads included)
     * are allowed to time out after 30 seconds of idleness.
     */
    private ThreadPoolExecutor createExecutor() {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
        exec.allowCoreThreadTimeOut(true);
        return exec;
    }
}