001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.virtual; 018 019import org.apache.activemq.broker.*; 020import org.apache.activemq.broker.region.Destination; 021import org.apache.activemq.broker.region.DestinationFilter; 022import org.apache.activemq.broker.region.DestinationInterceptor; 023import org.apache.activemq.command.ActiveMQDestination; 024import org.apache.activemq.command.ActiveMQTopic; 025import org.apache.activemq.command.Message; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * Creates <a href="http://activemq.org/site/mirrored-queues.html">Mirrored 031 * Queue</a> using a prefix and postfix to define the topic name on which to mirror the queue to. 032 * 033 * 034 * @org.apache.xbean.XBean 035 */ 036public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware { 037 private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueue.class); 038 private String prefix = "VirtualTopic.Mirror."; 039 private String postfix = ""; 040 private boolean copyMessage = true; 041 private BrokerService brokerService; 042 043 public Destination intercept(final Destination destination) { 044 if (destination.getActiveMQDestination().isQueue()) { 045 if (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues()) { 046 try { 047 final Destination mirrorDestination = getMirrorDestination(destination); 048 if (mirrorDestination != null) { 049 return new DestinationFilter(destination) { 050 public void send(ProducerBrokerExchange context, Message message) throws Exception { 051 message.setDestination(mirrorDestination.getActiveMQDestination()); 052 mirrorDestination.send(context, message); 053 054 if (isCopyMessage()) { 055 message = message.copy(); 056 } 057 message.setDestination(destination.getActiveMQDestination()); 058 message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic. 059 super.send(context, message); 060 } 061 }; 062 } 063 } 064 catch (Exception e) { 065 LOG.error("Failed to lookup the mirror destination for: {}", destination, e); 066 } 067 } 068 } 069 return destination; 070 } 071 072 073 public void remove(Destination destination) { 074 if (brokerService == null) { 075 throw new IllegalArgumentException("No brokerService injected!"); 076 } 077 ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination()); 078 if (topic != null) { 079 try { 080 brokerService.removeDestination(topic); 081 } catch (Exception e) { 082 LOG.error("Failed to remove mirror destination for {}", destination, e); 083 } 084 } 085 086 } 087 088 public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {} 089 090 // Properties 091 // ------------------------------------------------------------------------- 092 093 public String getPostfix() { 094 return postfix; 095 } 096 097 /** 098 * Sets any postix used to identify the queue consumers 099 */ 100 public void setPostfix(String postfix) { 101 this.postfix = postfix; 102 } 103 104 public String getPrefix() { 105 return prefix; 106 } 107 108 /** 109 * Sets the prefix wildcard used to identify the queue consumers for a given 110 * topic 111 */ 112 public void setPrefix(String prefix) { 113 this.prefix = prefix; 114 } 115 116 public boolean isCopyMessage() { 117 return copyMessage; 118 } 119 120 /** 121 * Sets whether a copy of the message will be sent to each destination. 122 * Defaults to true so that the forward destination is set as the 123 * destination of the message 124 */ 125 public void setCopyMessage(boolean copyMessage) { 126 this.copyMessage = copyMessage; 127 } 128 129 public void setBrokerService(BrokerService brokerService) { 130 this.brokerService = brokerService; 131 } 132 133 // Implementation methods 134 //------------------------------------------------------------------------- 135 protected Destination getMirrorDestination(Destination destination) throws Exception { 136 if (brokerService == null) { 137 throw new IllegalArgumentException("No brokerService injected!"); 138 } 139 ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination()); 140 return brokerService.getDestination(topic); 141 } 142 143 protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) { 144 return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix); 145 } 146 147}