001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import org.apache.activemq.command.ConsumerId;
020import org.apache.activemq.command.Message;
021import org.apache.activemq.command.MessageId;
022
023/**
024 * Keeps track of a message that is flowing through the Broker. This object may
025 * hold a hard reference to the message or only hold the id of the message if
026 * the message has been persisted on in a MessageStore.
027 * 
028 * 
029 */
030public class IndirectMessageReference implements QueueMessageReference {
031
032    /** The subscription that has locked the message */
033    private LockOwner lockOwner;
034    /** Has the message been dropped? */
035    private boolean dropped;
036    /** Has the message been acked? */
037    private boolean acked;
038    /** Direct reference to the message */
039    private final Message message;
040    private final MessageId messageId;
041    
042    /**
043     * @param message
044     */
045    public IndirectMessageReference(final Message message) {
046        this.message = message;
047        this.messageId = message.getMessageId().copy();
048        message.getMessageId();
049        message.getGroupID();
050        message.getGroupSequence();
051    }
052
053    public Message getMessageHardRef() {
054        return message;
055    }
056
057    public int getReferenceCount() {
058        return message.getReferenceCount();
059    }
060
061    public int incrementReferenceCount() {
062        return message.incrementReferenceCount();
063    }
064
065    public int decrementReferenceCount() {
066        return message.decrementReferenceCount();
067    }
068
069    public Message getMessage() {
070        return message;
071    }
072
073    public String toString() {
074        return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
075    }
076
077    public void incrementRedeliveryCounter() {
078        message.incrementRedeliveryCounter();
079    }
080
081    public synchronized boolean isDropped() {
082        return dropped;
083    }
084
085    public synchronized void drop() {
086        dropped = true;
087        lockOwner = null;
088        message.decrementReferenceCount();
089    }
090
091    public boolean lock(LockOwner subscription) {
092        synchronized (this) {
093            if (dropped || lockOwner != null) {
094                return false;
095            }
096            lockOwner = subscription;
097            return true;
098        }
099    }
100
101    public synchronized boolean unlock() {
102        boolean result = lockOwner != null;
103        lockOwner = null;
104        return result;
105    }
106
107    public synchronized LockOwner getLockOwner() {
108        return lockOwner;
109    }
110
111    public int getRedeliveryCounter() {
112        return message.getRedeliveryCounter();
113    }
114
115    public MessageId getMessageId() {
116        return messageId;
117    }
118
119    public Message.MessageDestination getRegionDestination() {
120        return message.getRegionDestination();
121    }
122
123    public boolean isPersistent() {
124        return message.isPersistent();
125    }
126
127    public synchronized boolean isLocked() {
128        return lockOwner != null;
129    }
130
131    public synchronized boolean isAcked() {
132        return acked;
133    }
134
135    public synchronized void setAcked(boolean b) {
136        acked = b;
137    }
138
139    public String getGroupID() {
140        return message.getGroupID();
141    }
142
143    public int getGroupSequence() {
144        return message.getGroupSequence();
145    }
146
147    public ConsumerId getTargetConsumerId() {
148        return message.getTargetConsumerId();
149    }
150
151    public long getExpiration() {
152        return message.getExpiration();
153    }
154
155    public boolean isExpired() {
156        return message.isExpired();
157    }
158
159    public synchronized int getSize() {
160       return message.getSize();
161    }
162
163    public boolean isAdvisory() {
164       return message.isAdvisory();
165    }
166
167    @Override
168    public boolean canProcessAsExpired() {
169        return message.canProcessAsExpired();
170    }
171}