001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.List;
021import java.util.Set;
022
023import org.apache.activemq.broker.Broker;
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.broker.ProducerBrokerExchange;
026import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
027import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.MessageAck;
031import org.apache.activemq.command.MessageDispatchNotification;
032import org.apache.activemq.command.ProducerInfo;
033import org.apache.activemq.store.MessageStore;
034import org.apache.activemq.usage.MemoryUsage;
035import org.apache.activemq.usage.Usage;
036import org.apache.activemq.util.SubscriptionKey;
037
038/**
039 *
040 *
041 */
042public class DestinationFilter implements Destination {
043
044    protected final Destination next;
045
046    public DestinationFilter(Destination next) {
047        this.next = next;
048    }
049
050    @Override
051    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
052        next.acknowledge(context, sub, ack, node);
053    }
054
055    @Override
056    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
057        next.addSubscription(context, sub);
058    }
059
060    @Override
061    public Message[] browse() {
062        return next.browse();
063    }
064
065    @Override
066    public void dispose(ConnectionContext context) throws IOException {
067        next.dispose(context);
068    }
069
070    @Override
071    public boolean isDisposed() {
072        return next.isDisposed();
073    }
074
075    @Override
076    public void gc() {
077        next.gc();
078    }
079
080    @Override
081    public void markForGC(long timeStamp) {
082        next.markForGC(timeStamp);
083    }
084
085    @Override
086    public boolean canGC() {
087        return next.canGC();
088    }
089
090    @Override
091    public long getInactiveTimeoutBeforeGC() {
092        return next.getInactiveTimeoutBeforeGC();
093    }
094
095    @Override
096    public ActiveMQDestination getActiveMQDestination() {
097        return next.getActiveMQDestination();
098    }
099
100    @Override
101    public DeadLetterStrategy getDeadLetterStrategy() {
102        return next.getDeadLetterStrategy();
103    }
104
105    @Override
106    public DestinationStatistics getDestinationStatistics() {
107        return next.getDestinationStatistics();
108    }
109
110    @Override
111    public String getName() {
112        return next.getName();
113    }
114
115    @Override
116    public MemoryUsage getMemoryUsage() {
117        return next.getMemoryUsage();
118    }
119
120    @Override
121    public void setMemoryUsage(MemoryUsage memoryUsage) {
122        next.setMemoryUsage(memoryUsage);
123    }
124
125    @Override
126    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
127        next.removeSubscription(context, sub, lastDeliveredSequenceId);
128    }
129
130    @Override
131    public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
132        next.send(context, messageSend);
133    }
134
135    @Override
136    public void start() throws Exception {
137        next.start();
138    }
139
140    @Override
141    public void stop() throws Exception {
142        next.stop();
143    }
144
145    @Override
146    public List<Subscription> getConsumers() {
147        return next.getConsumers();
148    }
149
150    /**
151     * Sends a message to the given destination which may be a wildcard
152     *
153     * @param context broker context
154     * @param message message to send
155     * @param destination possibly wildcard destination to send the message to
156     * @throws Exception on error
157     */
158    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
159        Broker broker = context.getConnectionContext().getBroker();
160        Set<Destination> destinations = broker.getDestinations(destination);
161
162        for (Destination dest : destinations) {
163            dest.send(context, message.copy());
164        }
165    }
166
167    @Override
168    public MessageStore getMessageStore() {
169        return next.getMessageStore();
170    }
171
172    @Override
173    public boolean isProducerFlowControl() {
174        return next.isProducerFlowControl();
175    }
176
177    @Override
178    public void setProducerFlowControl(boolean value) {
179        next.setProducerFlowControl(value);
180    }
181
182    @Override
183    public boolean isAlwaysRetroactive() {
184        return next.isAlwaysRetroactive();
185    }
186
187    @Override
188    public void setAlwaysRetroactive(boolean value) {
189        next.setAlwaysRetroactive(value);
190    }
191
192    @Override
193    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
194        next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
195    }
196
197    @Override
198    public long getBlockedProducerWarningInterval() {
199        return next.getBlockedProducerWarningInterval();
200    }
201
202    @Override
203    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
204        next.addProducer(context, info);
205    }
206
207    @Override
208    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
209        next.removeProducer(context, info);
210    }
211
212    @Override
213    public int getMaxAuditDepth() {
214        return next.getMaxAuditDepth();
215    }
216
217    @Override
218    public int getMaxProducersToAudit() {
219        return next.getMaxProducersToAudit();
220    }
221
222    @Override
223    public boolean isEnableAudit() {
224        return next.isEnableAudit();
225    }
226
227    @Override
228    public void setEnableAudit(boolean enableAudit) {
229        next.setEnableAudit(enableAudit);
230    }
231
232    @Override
233    public void setMaxAuditDepth(int maxAuditDepth) {
234        next.setMaxAuditDepth(maxAuditDepth);
235    }
236
237    @Override
238    public void setMaxProducersToAudit(int maxProducersToAudit) {
239        next.setMaxProducersToAudit(maxProducersToAudit);
240    }
241
242    @Override
243    public boolean isActive() {
244        return next.isActive();
245    }
246
247    @Override
248    public int getMaxPageSize() {
249        return next.getMaxPageSize();
250    }
251
252    @Override
253    public void setMaxPageSize(int maxPageSize) {
254        next.setMaxPageSize(maxPageSize);
255    }
256
257    @Override
258    public boolean isUseCache() {
259        return next.isUseCache();
260    }
261
262    @Override
263    public void setUseCache(boolean useCache) {
264        next.setUseCache(useCache);
265    }
266
267    @Override
268    public int getMinimumMessageSize() {
269        return next.getMinimumMessageSize();
270    }
271
272    @Override
273    public void setMinimumMessageSize(int minimumMessageSize) {
274        next.setMinimumMessageSize(minimumMessageSize);
275    }
276
277    @Override
278    public void wakeup() {
279        next.wakeup();
280    }
281
282    @Override
283    public boolean isLazyDispatch() {
284        return next.isLazyDispatch();
285    }
286
287    @Override
288    public void setLazyDispatch(boolean value) {
289        next.setLazyDispatch(value);
290    }
291
292    public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
293        next.messageExpired(context, prefetchSubscription, node);
294    }
295
296    @Override
297    public boolean iterate() {
298        return next.iterate();
299    }
300
301    @Override
302    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
303        next.fastProducer(context, producerInfo);
304    }
305
306    @Override
307    public void isFull(ConnectionContext context, Usage<?> usage) {
308        next.isFull(context, usage);
309    }
310
311    @Override
312    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
313        next.messageConsumed(context, messageReference);
314    }
315
316    @Override
317    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
318        next.messageDelivered(context, messageReference);
319    }
320
321    @Override
322    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
323        next.messageDiscarded(context, sub, messageReference);
324    }
325
326    @Override
327    public void slowConsumer(ConnectionContext context, Subscription subs) {
328        next.slowConsumer(context, subs);
329    }
330
331    @Override
332    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
333        next.messageExpired(context, subs, node);
334    }
335
336    @Override
337    public int getMaxBrowsePageSize() {
338        return next.getMaxBrowsePageSize();
339    }
340
341    @Override
342    public void setMaxBrowsePageSize(int maxPageSize) {
343        next.setMaxBrowsePageSize(maxPageSize);
344    }
345
346    @Override
347    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
348        next.processDispatchNotification(messageDispatchNotification);
349    }
350
351    @Override
352    public int getCursorMemoryHighWaterMark() {
353        return next.getCursorMemoryHighWaterMark();
354    }
355
356    @Override
357    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
358        next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
359    }
360
361    @Override
362    public boolean isPrioritizedMessages() {
363        return next.isPrioritizedMessages();
364    }
365
366    @Override
367    public SlowConsumerStrategy getSlowConsumerStrategy() {
368        return next.getSlowConsumerStrategy();
369    }
370
371    @Override
372    public boolean isDoOptimzeMessageStorage() {
373        return next.isDoOptimzeMessageStorage();
374    }
375
376    @Override
377    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
378        next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
379    }
380
381    @Override
382    public void clearPendingMessages(int pendingAdditionsCount) {
383        next.clearPendingMessages(pendingAdditionsCount);
384    }
385
386    @Override
387    public void duplicateFromStore(Message message, Subscription subscription) {
388        next.duplicateFromStore(message, subscription);
389    }
390
391    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
392        if (next instanceof DestinationFilter) {
393            DestinationFilter filter = (DestinationFilter) next;
394            filter.deleteSubscription(context, key);
395        } else if (next instanceof Topic) {
396            Topic topic = (Topic)next;
397            topic.deleteSubscription(context, key);
398        }
399    }
400
401    public Destination getNext() {
402        return next;
403    }
404
405    public <T> T getAdaptor(Class <? extends T> clazz) {
406        if (clazz.isInstance(this)) {
407            return clazz.cast(this);
408        } else if (next != null && clazz.isInstance(next)) {
409            return clazz.cast(next);
410        } else if (next instanceof DestinationFilter) {
411            return ((DestinationFilter)next).getAdaptor(clazz);
412        }
413        return null;
414    }
415}