001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import org.apache.activemq.command.ConsumerId;
020import org.apache.activemq.command.Message;
021import org.apache.activemq.command.MessageId;
022
023/**
024 * Keeps track of a message that is flowing through the Broker. This object may
025 * hold a hard reference to the message or only hold the id of the message if
026 * the message has been persisted on in a MessageStore.
027 *
028 *
029 */
030public class IndirectMessageReference implements QueueMessageReference {
031
032    /** The subscription that has locked the message */
033    private LockOwner lockOwner;
034    /** Has the message been dropped? */
035    private boolean dropped;
036    /** Has the message been acked? */
037    private boolean acked;
038    /** Direct reference to the message */
039    private final Message message;
040    private final MessageId messageId;
041
042    /**
043     * @param message
044     */
045    public IndirectMessageReference(final Message message) {
046        this.message = message;
047        this.messageId = message.getMessageId().copy();
048        message.getMessageId();
049        message.getGroupID();
050        message.getGroupSequence();
051    }
052
053    @Override
054    public Message getMessageHardRef() {
055        return message;
056    }
057
058    @Override
059    public int getReferenceCount() {
060        return message.getReferenceCount();
061    }
062
063    @Override
064    public int incrementReferenceCount() {
065        return message.incrementReferenceCount();
066    }
067
068    @Override
069    public int decrementReferenceCount() {
070        return message.decrementReferenceCount();
071    }
072
073    @Override
074    public Message getMessage() {
075        return message;
076    }
077
078    @Override
079    public String toString() {
080        return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
081    }
082
083    @Override
084    public void incrementRedeliveryCounter() {
085        message.incrementRedeliveryCounter();
086    }
087
088    @Override
089    public synchronized boolean isDropped() {
090        return dropped;
091    }
092
093    @Override
094    public synchronized void drop() {
095        dropped = true;
096        lockOwner = null;
097        message.decrementReferenceCount();
098    }
099
100    /**
101     * Check if the message has already been dropped before
102     * dropping. Return true if dropped, else false.
103     * This method exists so that this can be done atomically
104     * under the intrinisic lock
105     */
106    @Override
107    public synchronized boolean dropIfLive() {
108        if (isDropped()) {
109            return false;
110        } else {
111            drop();
112            return true;
113        }
114    }
115
116    @Override
117    public boolean lock(LockOwner subscription) {
118        synchronized (this) {
119            if (dropped || lockOwner != null) {
120                return false;
121            }
122            lockOwner = subscription;
123            return true;
124        }
125    }
126
127    @Override
128    public synchronized boolean unlock() {
129        boolean result = lockOwner != null;
130        lockOwner = null;
131        return result;
132    }
133
134    @Override
135    public synchronized LockOwner getLockOwner() {
136        return lockOwner;
137    }
138
139    @Override
140    public int getRedeliveryCounter() {
141        return message.getRedeliveryCounter();
142    }
143
144    @Override
145    public MessageId getMessageId() {
146        return messageId;
147    }
148
149    @Override
150    public Message.MessageDestination getRegionDestination() {
151        return message.getRegionDestination();
152    }
153
154    @Override
155    public boolean isPersistent() {
156        return message.isPersistent();
157    }
158
159    public synchronized boolean isLocked() {
160        return lockOwner != null;
161    }
162
163    @Override
164    public synchronized boolean isAcked() {
165        return acked;
166    }
167
168    @Override
169    public synchronized void setAcked(boolean b) {
170        acked = b;
171    }
172
173    @Override
174    public String getGroupID() {
175        return message.getGroupID();
176    }
177
178    @Override
179    public int getGroupSequence() {
180        return message.getGroupSequence();
181    }
182
183    @Override
184    public ConsumerId getTargetConsumerId() {
185        return message.getTargetConsumerId();
186    }
187
188    @Override
189    public long getExpiration() {
190        return message.getExpiration();
191    }
192
193    @Override
194    public boolean isExpired() {
195        return message.isExpired();
196    }
197
198    @Override
199    public synchronized int getSize() {
200       return message.getSize();
201    }
202
203    @Override
204    public boolean isAdvisory() {
205       return message.isAdvisory();
206    }
207
208    @Override
209    public boolean canProcessAsExpired() {
210        return message.canProcessAsExpired();
211    }
212}