001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import org.apache.activemq.state.CommandVisitor; 020 021/** 022 * Used to pull messages on demand. 023 * 024 * @openwire:marshaller code="20" 025 * 026 * 027 */ 028public class MessagePull extends BaseCommand { 029 030 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_PULL; 031 032 protected ConsumerId consumerId; 033 protected ActiveMQDestination destination; 034 protected long timeout; 035 private MessageId messageId; 036 private String correlationId; 037 038 private transient int quantity = 1; 039 private transient boolean alwaysSignalDone; 040 private transient boolean tracked = false; 041 042 @Override 043 public byte getDataStructureType() { 044 return DATA_STRUCTURE_TYPE; 045 } 046 047 @Override 048 public Response visit(CommandVisitor visitor) throws Exception { 049 return visitor.processMessagePull(this); 050 } 051 052 /** 053 * Configures a message pull from the consumer information 054 */ 055 public void configure(ConsumerInfo info) { 056 setConsumerId(info.getConsumerId()); 057 setDestination(info.getDestination()); 058 } 059 060 /** 061 * @openwire:property version=1 cache=true 062 */ 063 public ConsumerId getConsumerId() { 064 return consumerId; 065 } 066 067 public void setConsumerId(ConsumerId consumerId) { 068 this.consumerId = consumerId; 069 } 070 071 /** 072 * @openwire:property version=1 cache=true 073 */ 074 public ActiveMQDestination getDestination() { 075 return destination; 076 } 077 078 public void setDestination(ActiveMQDestination destination) { 079 this.destination = destination; 080 } 081 082 /** 083 * @openwire:property version=1 084 */ 085 public long getTimeout() { 086 return timeout; 087 } 088 089 public void setTimeout(long timeout) { 090 this.timeout = timeout; 091 } 092 093 /** 094 * An optional correlation ID which could be used by a broker to decide which messages are pulled 095 * on demand from a queue for a consumer 096 * 097 * @openwire:property version=3 098 */ 099 public String getCorrelationId() { 100 return correlationId; 101 } 102 103 public void setCorrelationId(String correlationId) { 104 this.correlationId = correlationId; 105 } 106 107 108 /** 109 * An optional message ID which could be used by a broker to decide which messages are pulled 110 * on demand from a queue for a consumer 111 * 112 * @openwire:property version=3 113 */ 114 public MessageId getMessageId() { 115 return messageId; 116 } 117 118 public void setMessageId(MessageId messageId) { 119 this.messageId = messageId; 120 } 121 122 public void setTracked(boolean tracked) { 123 this.tracked = tracked; 124 } 125 126 public boolean isTracked() { 127 return this.tracked; 128 } 129 130 public int getQuantity() { 131 return quantity; 132 } 133 134 public void setQuantity(int quantity) { 135 this.quantity = quantity; 136 } 137 138 public boolean isAlwaysSignalDone() { 139 return alwaysSignalDone; 140 } 141 142 public void setAlwaysSignalDone(boolean alwaysSignalDone) { 143 this.alwaysSignalDone = alwaysSignalDone; 144 } 145}