001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.policy; 018 019import org.apache.activemq.broker.region.Destination; 020import org.apache.activemq.broker.region.DurableTopicSubscription; 021import org.apache.activemq.broker.region.Subscription; 022import org.apache.activemq.command.ActiveMQDestination; 023import org.apache.activemq.command.ActiveMQQueue; 024import org.apache.activemq.command.ActiveMQTopic; 025import org.apache.activemq.command.Message; 026 027/** 028 * A {@link DeadLetterStrategy} where each destination has its own individual 029 * DLQ using the subject naming hierarchy. 030 * 031 * @org.apache.xbean.XBean 032 * 033 */ 034public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { 035 036 private String topicPrefix = "ActiveMQ.DLQ.Topic."; 037 private String queuePrefix = "ActiveMQ.DLQ.Queue."; 038 private String topicSuffix; 039 private String queueSuffix; 040 private boolean useQueueForQueueMessages = true; 041 private boolean useQueueForTopicMessages = true; 042 private boolean destinationPerDurableSubscriber; 043 044 public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) { 045 if (message.getDestination().isQueue()) { 046 return createDestination(message, queuePrefix, queueSuffix, useQueueForQueueMessages, subscription); 047 } else { 048 return createDestination(message, topicPrefix, topicSuffix, useQueueForTopicMessages, subscription); 049 } 050 } 051 052 // Properties 053 // ------------------------------------------------------------------------- 054 055 public String getQueuePrefix() { 056 return queuePrefix; 057 } 058 059 /** 060 * Sets the prefix to use for all dead letter queues for queue messages 061 */ 062 public void setQueuePrefix(String queuePrefix) { 063 this.queuePrefix = queuePrefix; 064 } 065 066 public String getTopicPrefix() { 067 return topicPrefix; 068 } 069 070 /** 071 * Sets the prefix to use for all dead letter queues for topic messages 072 */ 073 public void setTopicPrefix(String topicPrefix) { 074 this.topicPrefix = topicPrefix; 075 } 076 077 public String getQueueSuffix() { 078 return queueSuffix; 079 } 080 081 /** 082 * Sets the suffix to use for all dead letter queues for queue messages 083 */ 084 public void setQueueSuffix(String queueSuffix) { 085 this.queueSuffix = queueSuffix; 086 } 087 088 public String getTopicSuffix() { 089 return topicSuffix; 090 } 091 092 /** 093 * Sets the suffix to use for all dead letter queues for topic messages 094 */ 095 public void setTopicSuffix(String topicSuffix) { 096 this.topicSuffix = topicSuffix; 097 } 098 099 public boolean isUseQueueForQueueMessages() { 100 return useQueueForQueueMessages; 101 } 102 103 /** 104 * Sets whether a queue or topic should be used for queue messages sent to a 105 * DLQ. The default is to use a Queue 106 */ 107 public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) { 108 this.useQueueForQueueMessages = useQueueForQueueMessages; 109 } 110 111 public boolean isUseQueueForTopicMessages() { 112 return useQueueForTopicMessages; 113 } 114 115 /** 116 * Sets whether a queue or topic should be used for topic messages sent to a 117 * DLQ. The default is to use a Queue 118 */ 119 public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) { 120 this.useQueueForTopicMessages = useQueueForTopicMessages; 121 } 122 123 public boolean isDestinationPerDurableSubscriber() { 124 return destinationPerDurableSubscriber; 125 } 126 127 /** 128 * sets whether durable topic subscriptions are to get individual dead letter destinations. 129 * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName' 130 * The default is false. 131 * @param destinationPerDurableSubscriber 132 */ 133 public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) { 134 this.destinationPerDurableSubscriber = destinationPerDurableSubscriber; 135 } 136 137 // Implementation methods 138 // ------------------------------------------------------------------------- 139 protected ActiveMQDestination createDestination(Message message, 140 String prefix, 141 String suffix, 142 boolean useQueue, 143 Subscription subscription ) { 144 String name = null; 145 146 Destination regionDestination = (Destination) message.getRegionDestination(); 147 if (regionDestination != null 148 && regionDestination.getActiveMQDestination() != null 149 && regionDestination.getActiveMQDestination().getPhysicalName() != null 150 && !regionDestination.getActiveMQDestination().getPhysicalName().isEmpty()){ 151 name = prefix + regionDestination.getActiveMQDestination().getPhysicalName(); 152 } else { 153 name = prefix + message.getDestination().getPhysicalName(); 154 } 155 156 if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) { 157 name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey(); 158 } 159 160 if (suffix != null && !suffix.isEmpty()) { 161 name += suffix; 162 } 163 164 if (useQueue) { 165 return new ActiveMQQueue(name); 166 } else { 167 return new ActiveMQTopic(name); 168 } 169 } 170 171}