001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store; 018 019import java.io.IOException; 020import java.util.concurrent.Future; 021 022import org.apache.activemq.broker.ConnectionContext; 023import org.apache.activemq.command.ActiveMQDestination; 024import org.apache.activemq.command.Message; 025import org.apache.activemq.command.MessageAck; 026import org.apache.activemq.command.MessageId; 027import org.apache.activemq.command.SubscriptionInfo; 028import org.apache.activemq.usage.MemoryUsage; 029 030/** 031 * A simple proxy that delegates to another MessageStore. 032 */ 033public class ProxyTopicMessageStore extends ProxyMessageStore implements TopicMessageStore { 034 035 public ProxyTopicMessageStore(TopicMessageStore delegate) { 036 super(delegate); 037 } 038 039 public MessageStore getDelegate() { 040 return delegate; 041 } 042 043 @Override 044 public void addMessage(ConnectionContext context, Message message) throws IOException { 045 delegate.addMessage(context, message); 046 } 047 048 @Override 049 public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 050 delegate.addMessage(context, message, canOptimizeHint); 051 } 052 053 @Override 054 public Message getMessage(MessageId identity) throws IOException { 055 return delegate.getMessage(identity); 056 } 057 058 @Override 059 public void recover(MessageRecoveryListener listener) throws Exception { 060 delegate.recover(listener); 061 } 062 063 @Override 064 public void removeAllMessages(ConnectionContext context) throws IOException { 065 delegate.removeAllMessages(context); 066 } 067 068 @Override 069 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 070 delegate.removeMessage(context, ack); 071 } 072 073 @Override 074 public void start() throws Exception { 075 delegate.start(); 076 } 077 078 @Override 079 public void stop() throws Exception { 080 delegate.stop(); 081 } 082 083 @Override 084 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 085 return ((TopicMessageStore)delegate).lookupSubscription(clientId, subscriptionName); 086 } 087 088 @Override 089 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 090 MessageId messageId, MessageAck ack) throws IOException { 091 ((TopicMessageStore)delegate).acknowledge(context, clientId, subscriptionName, messageId, ack); 092 } 093 094 @Override 095 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 096 ((TopicMessageStore)delegate).addSubscription(subscriptionInfo, retroactive); 097 } 098 099 @Override 100 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 101 ((TopicMessageStore)delegate).deleteSubscription(clientId, subscriptionName); 102 } 103 104 @Override 105 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) 106 throws Exception { 107 ((TopicMessageStore)delegate).recoverSubscription(clientId, subscriptionName, listener); 108 } 109 110 @Override 111 public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, 112 MessageRecoveryListener listener) throws Exception { 113 ((TopicMessageStore)delegate).recoverNextMessages(clientId, subscriptionName, maxReturned, listener); 114 } 115 116 @Override 117 public void resetBatching(String clientId, String subscriptionName) { 118 ((TopicMessageStore)delegate).resetBatching(clientId, subscriptionName); 119 } 120 121 @Override 122 public ActiveMQDestination getDestination() { 123 return delegate.getDestination(); 124 } 125 126 @Override 127 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 128 return ((TopicMessageStore)delegate).getAllSubscriptions(); 129 } 130 131 @Override 132 public void setMemoryUsage(MemoryUsage memoryUsage) { 133 delegate.setMemoryUsage(memoryUsage); 134 } 135 136 @Override 137 public int getMessageCount(String clientId, String subscriberName) throws IOException { 138 return ((TopicMessageStore)delegate).getMessageCount(clientId, subscriberName); 139 } 140 141 @Override 142 public int getMessageCount() throws IOException { 143 return delegate.getMessageCount(); 144 } 145 146 @Override 147 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { 148 delegate.recoverNextMessages(maxReturned, listener); 149 } 150 151 @Override 152 public void dispose(ConnectionContext context) { 153 delegate.dispose(context); 154 } 155 156 @Override 157 public void resetBatching() { 158 delegate.resetBatching(); 159 } 160 161 @Override 162 public void setBatch(MessageId messageId) throws Exception { 163 delegate.setBatch(messageId); 164 } 165 166 @Override 167 public boolean isEmpty() throws Exception { 168 return delegate.isEmpty(); 169 } 170 171 @Override 172 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 173 return delegate.asyncAddTopicMessage(context, message); 174 } 175 176 @Override 177 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 178 return delegate.asyncAddTopicMessage(context,message, canOptimizeHint); 179 } 180 181 @Override 182 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 183 return delegate.asyncAddQueueMessage(context, message); 184 } 185 186 @Override 187 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 188 return delegate.asyncAddQueueMessage(context,message, canOptimizeHint); 189 } 190 191 @Override 192 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 193 delegate.removeAsyncMessage(context, ack); 194 } 195 196 @Override 197 public void setPrioritizedMessages(boolean prioritizedMessages) { 198 delegate.setPrioritizedMessages(prioritizedMessages); 199 } 200 201 @Override 202 public boolean isPrioritizedMessages() { 203 return delegate.isPrioritizedMessages(); 204 } 205 206 public void updateMessage(Message message) throws IOException { 207 delegate.updateMessage(message); 208 } 209 210 @Override 211 public void registerIndexListener(IndexListener indexListener) { 212 delegate.registerIndexListener(indexListener); 213 } 214}