001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import org.apache.activemq.command.ConsumerId; 020import org.apache.activemq.command.Message; 021import org.apache.activemq.command.MessageId; 022 023/** 024 * Keeps track of a message that is flowing through the Broker. This object may 025 * hold a hard reference to the message or only hold the id of the message if 026 * the message has been persisted on in a MessageStore. 027 * 028 * 029 */ 030public class IndirectMessageReference implements QueueMessageReference { 031 032 /** The subscription that has locked the message */ 033 private LockOwner lockOwner; 034 /** Has the message been dropped? */ 035 private boolean dropped; 036 /** Has the message been acked? */ 037 private boolean acked; 038 /** Direct reference to the message */ 039 private final Message message; 040 private final MessageId messageId; 041 042 /** 043 * @param message 044 */ 045 public IndirectMessageReference(final Message message) { 046 this.message = message; 047 this.messageId = message.getMessageId().copy(); 048 message.getMessageId(); 049 message.getGroupID(); 050 message.getGroupSequence(); 051 } 052 053 public Message getMessageHardRef() { 054 return message; 055 } 056 057 public int getReferenceCount() { 058 return message.getReferenceCount(); 059 } 060 061 public int incrementReferenceCount() { 062 return message.incrementReferenceCount(); 063 } 064 065 public int decrementReferenceCount() { 066 return message.decrementReferenceCount(); 067 } 068 069 public Message getMessage() { 070 return message; 071 } 072 073 public String toString() { 074 return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null); 075 } 076 077 public void incrementRedeliveryCounter() { 078 message.incrementRedeliveryCounter(); 079 } 080 081 public synchronized boolean isDropped() { 082 return dropped; 083 } 084 085 public synchronized void drop() { 086 dropped = true; 087 lockOwner = null; 088 message.decrementReferenceCount(); 089 } 090 091 public boolean lock(LockOwner subscription) { 092 synchronized (this) { 093 if (dropped || lockOwner != null) { 094 return false; 095 } 096 lockOwner = subscription; 097 return true; 098 } 099 } 100 101 public synchronized boolean unlock() { 102 boolean result = lockOwner != null; 103 lockOwner = null; 104 return result; 105 } 106 107 public synchronized LockOwner getLockOwner() { 108 return lockOwner; 109 } 110 111 public int getRedeliveryCounter() { 112 return message.getRedeliveryCounter(); 113 } 114 115 public MessageId getMessageId() { 116 return messageId; 117 } 118 119 public Message.MessageDestination getRegionDestination() { 120 return message.getRegionDestination(); 121 } 122 123 public boolean isPersistent() { 124 return message.isPersistent(); 125 } 126 127 public synchronized boolean isLocked() { 128 return lockOwner != null; 129 } 130 131 public synchronized boolean isAcked() { 132 return acked; 133 } 134 135 public synchronized void setAcked(boolean b) { 136 acked = b; 137 } 138 139 public String getGroupID() { 140 return message.getGroupID(); 141 } 142 143 public int getGroupSequence() { 144 return message.getGroupSequence(); 145 } 146 147 public ConsumerId getTargetConsumerId() { 148 return message.getTargetConsumerId(); 149 } 150 151 public long getExpiration() { 152 return message.getExpiration(); 153 } 154 155 public boolean isExpired() { 156 return message.isExpired(); 157 } 158 159 public synchronized int getSize() { 160 return message.getSize(); 161 } 162 163 public boolean isAdvisory() { 164 return message.isAdvisory(); 165 } 166 167 @Override 168 public boolean canProcessAsExpired() { 169 return message.canProcessAsExpired(); 170 } 171}