001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.io.OutputStream;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.zip.DeflaterOutputStream;
028
029import javax.jms.JMSException;
030
031import org.apache.activemq.ActiveMQConnection;
032import org.apache.activemq.advisory.AdvisorySupport;
033import org.apache.activemq.broker.region.MessageReference;
034import org.apache.activemq.usage.MemoryUsage;
035import org.apache.activemq.util.ByteArrayInputStream;
036import org.apache.activemq.util.ByteArrayOutputStream;
037import org.apache.activemq.util.ByteSequence;
038import org.apache.activemq.util.MarshallingSupport;
039import org.apache.activemq.wireformat.WireFormat;
040import org.fusesource.hawtbuf.UTF8Buffer;
041
042/**
043 * Represents an ActiveMQ message
044 *
045 * @openwire:marshaller
046 *
047 */
048public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
049    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
050
051    /**
052     * The default minimum amount of memory a message is assumed to use
053     */
054    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
055
056    protected MessageId messageId;
057    protected ActiveMQDestination originalDestination;
058    protected TransactionId originalTransactionId;
059
060    protected ProducerId producerId;
061    protected ActiveMQDestination destination;
062    protected TransactionId transactionId;
063
064    protected long expiration;
065    protected long timestamp;
066    protected long arrival;
067    protected long brokerInTime;
068    protected long brokerOutTime;
069    protected String correlationId;
070    protected ActiveMQDestination replyTo;
071    protected boolean persistent;
072    protected String type;
073    protected byte priority;
074    protected String groupID;
075    protected int groupSequence;
076    protected ConsumerId targetConsumerId;
077    protected boolean compressed;
078    protected String userID;
079
080    protected ByteSequence content;
081    protected ByteSequence marshalledProperties;
082    protected DataStructure dataStructure;
083    protected int redeliveryCounter;
084
085    protected int size;
086    protected Map<String, Object> properties;
087    protected boolean readOnlyProperties;
088    protected boolean readOnlyBody;
089    protected transient boolean recievedByDFBridge;
090    protected boolean droppable;
091    protected boolean jmsXGroupFirstForConsumer;
092
093    private transient short referenceCount;
094    private transient ActiveMQConnection connection;
095    transient MessageDestination regionDestination;
096    transient MemoryUsage memoryUsage;
097    transient AtomicBoolean processAsExpired = new AtomicBoolean(false);
098
099    private BrokerId[] brokerPath;
100    private BrokerId[] cluster;
101
102    public static interface MessageDestination {
103        int getMinimumMessageSize();
104        MemoryUsage getMemoryUsage();
105    }
106
107    public abstract Message copy();
108    public abstract void clearBody() throws JMSException;
109    public abstract void storeContent();
110    public abstract void storeContentAndClear();
111
112    // useful to reduce the memory footprint of a persisted message
113    public void clearMarshalledState() throws JMSException {
114        properties = null;
115    }
116
117    protected void copy(Message copy) {
118        super.copy(copy);
119        copy.producerId = producerId;
120        copy.transactionId = transactionId;
121        copy.destination = destination;
122        copy.messageId = messageId != null ? messageId.copy() : null;
123        copy.originalDestination = originalDestination;
124        copy.originalTransactionId = originalTransactionId;
125        copy.expiration = expiration;
126        copy.timestamp = timestamp;
127        copy.correlationId = correlationId;
128        copy.replyTo = replyTo;
129        copy.persistent = persistent;
130        copy.redeliveryCounter = redeliveryCounter;
131        copy.type = type;
132        copy.priority = priority;
133        copy.size = size;
134        copy.groupID = groupID;
135        copy.userID = userID;
136        copy.groupSequence = groupSequence;
137
138        if (properties != null) {
139            copy.properties = new HashMap<String, Object>(properties);
140
141            // The new message hasn't expired, so remove this feild.
142            copy.properties.remove(ORIGINAL_EXPIRATION);
143        } else {
144            copy.properties = properties;
145        }
146
147        copy.content = copyByteSequence(content);
148        copy.marshalledProperties = copyByteSequence(marshalledProperties);
149        copy.dataStructure = dataStructure;
150        copy.readOnlyProperties = readOnlyProperties;
151        copy.readOnlyBody = readOnlyBody;
152        copy.compressed = compressed;
153        copy.recievedByDFBridge = recievedByDFBridge;
154
155        copy.arrival = arrival;
156        copy.connection = connection;
157        copy.regionDestination = regionDestination;
158        copy.brokerInTime = brokerInTime;
159        copy.brokerOutTime = brokerOutTime;
160        copy.memoryUsage=this.memoryUsage;
161        copy.brokerPath = brokerPath;
162        copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
163
164        // lets not copy the following fields
165        // copy.targetConsumerId = targetConsumerId;
166        // copy.referenceCount = referenceCount;
167    }
168
169    private ByteSequence copyByteSequence(ByteSequence content) {
170        if (content != null) {
171            return new ByteSequence(content.getData(), content.getOffset(), content.getLength());
172        }
173        return null;
174    }
175
176    public Object getProperty(String name) throws IOException {
177        if (properties == null) {
178            if (marshalledProperties == null) {
179                return null;
180            }
181            properties = unmarsallProperties(marshalledProperties);
182        }
183        Object result = properties.get(name);
184        if (result instanceof UTF8Buffer) {
185            result = result.toString();
186        }
187
188        return result;
189    }
190
191    @SuppressWarnings("unchecked")
192    public Map<String, Object> getProperties() throws IOException {
193        if (properties == null) {
194            if (marshalledProperties == null) {
195                return Collections.EMPTY_MAP;
196            }
197            properties = unmarsallProperties(marshalledProperties);
198        }
199        return Collections.unmodifiableMap(properties);
200    }
201
202    public void clearProperties() {
203        marshalledProperties = null;
204        properties = null;
205    }
206
207    public void setProperty(String name, Object value) throws IOException {
208        lazyCreateProperties();
209        properties.put(name, value);
210    }
211
212    public void removeProperty(String name) throws IOException {
213        lazyCreateProperties();
214        properties.remove(name);
215    }
216
217    protected void lazyCreateProperties() throws IOException {
218        if (properties == null) {
219            if (marshalledProperties == null) {
220                properties = new HashMap<String, Object>();
221            } else {
222                properties = unmarsallProperties(marshalledProperties);
223                marshalledProperties = null;
224            }
225        } else {
226            marshalledProperties = null;
227        }
228    }
229
230    private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
231        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
232    }
233
234    @Override
235        public void beforeMarshall(WireFormat wireFormat) throws IOException {
236        // Need to marshal the properties.
237        if (marshalledProperties == null && properties != null) {
238            ByteArrayOutputStream baos = new ByteArrayOutputStream();
239            DataOutputStream os = new DataOutputStream(baos);
240            MarshallingSupport.marshalPrimitiveMap(properties, os);
241            os.close();
242            marshalledProperties = baos.toByteSequence();
243        }
244    }
245
246    @Override
247        public void afterMarshall(WireFormat wireFormat) throws IOException {
248    }
249
250    @Override
251        public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
252    }
253
254    @Override
255        public void afterUnmarshall(WireFormat wireFormat) throws IOException {
256    }
257
258    // /////////////////////////////////////////////////////////////////
259    //
260    // Simple Field accessors
261    //
262    // /////////////////////////////////////////////////////////////////
263
264    /**
265     * @openwire:property version=1 cache=true
266     */
267    public ProducerId getProducerId() {
268        return producerId;
269    }
270
271    public void setProducerId(ProducerId producerId) {
272        this.producerId = producerId;
273    }
274
275    /**
276     * @openwire:property version=1 cache=true
277     */
278    public ActiveMQDestination getDestination() {
279        return destination;
280    }
281
282    public void setDestination(ActiveMQDestination destination) {
283        this.destination = destination;
284    }
285
286    /**
287     * @openwire:property version=1 cache=true
288     */
289    public TransactionId getTransactionId() {
290        return transactionId;
291    }
292
293    public void setTransactionId(TransactionId transactionId) {
294        this.transactionId = transactionId;
295    }
296
297    public boolean isInTransaction() {
298        return transactionId != null;
299    }
300
301    /**
302     * @openwire:property version=1 cache=true
303     */
304    public ActiveMQDestination getOriginalDestination() {
305        return originalDestination;
306    }
307
308    public void setOriginalDestination(ActiveMQDestination destination) {
309        this.originalDestination = destination;
310    }
311
312    /**
313     * @openwire:property version=1
314     */
315    @Override
316        public MessageId getMessageId() {
317        return messageId;
318    }
319
320    public void setMessageId(MessageId messageId) {
321        this.messageId = messageId;
322    }
323
324    /**
325     * @openwire:property version=1 cache=true
326     */
327    public TransactionId getOriginalTransactionId() {
328        return originalTransactionId;
329    }
330
331    public void setOriginalTransactionId(TransactionId transactionId) {
332        this.originalTransactionId = transactionId;
333    }
334
335    /**
336     * @openwire:property version=1
337     */
338    @Override
339        public String getGroupID() {
340        return groupID;
341    }
342
343    public void setGroupID(String groupID) {
344        this.groupID = groupID;
345    }
346
347    /**
348     * @openwire:property version=1
349     */
350    @Override
351        public int getGroupSequence() {
352        return groupSequence;
353    }
354
355    public void setGroupSequence(int groupSequence) {
356        this.groupSequence = groupSequence;
357    }
358
359    /**
360     * @openwire:property version=1
361     */
362    public String getCorrelationId() {
363        return correlationId;
364    }
365
366    public void setCorrelationId(String correlationId) {
367        this.correlationId = correlationId;
368    }
369
370    /**
371     * @openwire:property version=1
372     */
373    @Override
374        public boolean isPersistent() {
375        return persistent;
376    }
377
378    public void setPersistent(boolean deliveryMode) {
379        this.persistent = deliveryMode;
380    }
381
382    /**
383     * @openwire:property version=1
384     */
385    @Override
386        public long getExpiration() {
387        return expiration;
388    }
389
390    public void setExpiration(long expiration) {
391        this.expiration = expiration;
392    }
393
394    /**
395     * @openwire:property version=1
396     */
397    public byte getPriority() {
398        return priority;
399    }
400
401    public void setPriority(byte priority) {
402        if (priority < 0) {
403            this.priority = 0;
404        } else if (priority > 9) {
405            this.priority = 9;
406        } else {
407            this.priority = priority;
408        }
409    }
410
411    /**
412     * @openwire:property version=1
413     */
414    public ActiveMQDestination getReplyTo() {
415        return replyTo;
416    }
417
418    public void setReplyTo(ActiveMQDestination replyTo) {
419        this.replyTo = replyTo;
420    }
421
422    /**
423     * @openwire:property version=1
424     */
425    public long getTimestamp() {
426        return timestamp;
427    }
428
429    public void setTimestamp(long timestamp) {
430        this.timestamp = timestamp;
431    }
432
433    /**
434     * @openwire:property version=1
435     */
436    public String getType() {
437        return type;
438    }
439
440    public void setType(String type) {
441        this.type = type;
442    }
443
444    /**
445     * @openwire:property version=1
446     */
447    public ByteSequence getContent() {
448        return content;
449    }
450
451    public void setContent(ByteSequence content) {
452        this.content = content;
453    }
454
455    /**
456     * @openwire:property version=1
457     */
458    public ByteSequence getMarshalledProperties() {
459        return marshalledProperties;
460    }
461
462    public void setMarshalledProperties(ByteSequence marshalledProperties) {
463        this.marshalledProperties = marshalledProperties;
464    }
465
466    /**
467     * @openwire:property version=1
468     */
469    public DataStructure getDataStructure() {
470        return dataStructure;
471    }
472
473    public void setDataStructure(DataStructure data) {
474        this.dataStructure = data;
475    }
476
477    /**
478     * Can be used to route the message to a specific consumer. Should be null
479     * to allow the broker use normal JMS routing semantics. If the target
480     * consumer id is an active consumer on the broker, the message is dropped.
481     * Used by the AdvisoryBroker to replay advisory messages to a specific
482     * consumer.
483     *
484     * @openwire:property version=1 cache=true
485     */
486    @Override
487        public ConsumerId getTargetConsumerId() {
488        return targetConsumerId;
489    }
490
491    public void setTargetConsumerId(ConsumerId targetConsumerId) {
492        this.targetConsumerId = targetConsumerId;
493    }
494
495    @Override
496        public boolean isExpired() {
497        long expireTime = getExpiration();
498        return expireTime > 0 && System.currentTimeMillis() > expireTime;
499    }
500
501    @Override
502        public boolean isAdvisory() {
503        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
504    }
505
506    /**
507     * @openwire:property version=1
508     */
509    public boolean isCompressed() {
510        return compressed;
511    }
512
513    public void setCompressed(boolean compressed) {
514        this.compressed = compressed;
515    }
516
517    public boolean isRedelivered() {
518        return redeliveryCounter > 0;
519    }
520
521    public void setRedelivered(boolean redelivered) {
522        if (redelivered) {
523            if (!isRedelivered()) {
524                setRedeliveryCounter(1);
525            }
526        } else {
527            if (isRedelivered()) {
528                setRedeliveryCounter(0);
529            }
530        }
531    }
532
533    @Override
534        public void incrementRedeliveryCounter() {
535        redeliveryCounter++;
536    }
537
538    /**
539     * @openwire:property version=1
540     */
541    @Override
542        public int getRedeliveryCounter() {
543        return redeliveryCounter;
544    }
545
546    public void setRedeliveryCounter(int deliveryCounter) {
547        this.redeliveryCounter = deliveryCounter;
548    }
549
550    /**
551     * The route of brokers the command has moved through.
552     *
553     * @openwire:property version=1 cache=true
554     */
555    public BrokerId[] getBrokerPath() {
556        return brokerPath;
557    }
558
559    public void setBrokerPath(BrokerId[] brokerPath) {
560        this.brokerPath = brokerPath;
561    }
562
563    public boolean isReadOnlyProperties() {
564        return readOnlyProperties;
565    }
566
567    public void setReadOnlyProperties(boolean readOnlyProperties) {
568        this.readOnlyProperties = readOnlyProperties;
569    }
570
571    public boolean isReadOnlyBody() {
572        return readOnlyBody;
573    }
574
575    public void setReadOnlyBody(boolean readOnlyBody) {
576        this.readOnlyBody = readOnlyBody;
577    }
578
579    public ActiveMQConnection getConnection() {
580        return this.connection;
581    }
582
583    public void setConnection(ActiveMQConnection connection) {
584        this.connection = connection;
585    }
586
587    /**
588     * Used to schedule the arrival time of a message to a broker. The broker
589     * will not dispatch a message to a consumer until it's arrival time has
590     * elapsed.
591     *
592     * @openwire:property version=1
593     */
594    public long getArrival() {
595        return arrival;
596    }
597
598    public void setArrival(long arrival) {
599        this.arrival = arrival;
600    }
601
602    /**
603     * Only set by the broker and defines the userID of the producer connection
604     * who sent this message. This is an optional field, it needs to be enabled
605     * on the broker to have this field populated.
606     *
607     * @openwire:property version=1
608     */
609    public String getUserID() {
610        return userID;
611    }
612
613    public void setUserID(String jmsxUserID) {
614        this.userID = jmsxUserID;
615    }
616
617    @Override
618        public int getReferenceCount() {
619        return referenceCount;
620    }
621
622    @Override
623        public Message getMessageHardRef() {
624        return this;
625    }
626
627    @Override
628        public Message getMessage() {
629        return this;
630    }
631
632    public void setRegionDestination(MessageDestination destination) {
633        this.regionDestination = destination;
634        if(this.memoryUsage==null) {
635            this.memoryUsage=destination.getMemoryUsage();
636        }
637    }
638
639    @Override
640        public MessageDestination getRegionDestination() {
641        return regionDestination;
642    }
643
644    public MemoryUsage getMemoryUsage() {
645        return this.memoryUsage;
646    }
647
648    public void setMemoryUsage(MemoryUsage usage) {
649        this.memoryUsage=usage;
650    }
651
652    @Override
653    public boolean isMarshallAware() {
654        return true;
655    }
656
657    @Override
658        public int incrementReferenceCount() {
659        int rc;
660        int size;
661        synchronized (this) {
662            rc = ++referenceCount;
663            size = getSize();
664        }
665
666        if (rc == 1 && getMemoryUsage() != null) {
667            getMemoryUsage().increaseUsage(size);
668            //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
669
670        }
671
672        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
673        return rc;
674    }
675
676    @Override
677        public int decrementReferenceCount() {
678        int rc;
679        int size;
680        synchronized (this) {
681            rc = --referenceCount;
682            size = getSize();
683        }
684
685        if (rc == 0 && getMemoryUsage() != null) {
686            getMemoryUsage().decreaseUsage(size);
687            //Thread.dumpStack();
688            //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
689        }
690
691        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
692
693        return rc;
694    }
695
696    @Override
697        public int getSize() {
698        int minimumMessageSize = getMinimumMessageSize();
699        if (size < minimumMessageSize || size == 0) {
700            size = minimumMessageSize;
701            if (marshalledProperties != null) {
702                size += marshalledProperties.getLength();
703            }
704            if (content != null) {
705                size += content.getLength();
706            }
707        }
708        return size;
709    }
710
711    protected int getMinimumMessageSize() {
712        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
713        //let destination override
714        MessageDestination dest = regionDestination;
715        if (dest != null) {
716            result=dest.getMinimumMessageSize();
717        }
718        return result;
719    }
720
721    /**
722     * @openwire:property version=1
723     * @return Returns the recievedByDFBridge.
724     */
725    public boolean isRecievedByDFBridge() {
726        return recievedByDFBridge;
727    }
728
729    /**
730     * @param recievedByDFBridge The recievedByDFBridge to set.
731     */
732    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
733        this.recievedByDFBridge = recievedByDFBridge;
734    }
735
736    public void onMessageRolledBack() {
737        incrementRedeliveryCounter();
738    }
739
740    /**
741     * @openwire:property version=2 cache=true
742     */
743    public boolean isDroppable() {
744        return droppable;
745    }
746
747    public void setDroppable(boolean droppable) {
748        this.droppable = droppable;
749    }
750
751    /**
752     * If a message is stored in multiple nodes on a cluster, all the cluster
753     * members will be listed here. Otherwise, it will be null.
754     *
755     * @openwire:property version=3 cache=true
756     */
757    public BrokerId[] getCluster() {
758        return cluster;
759    }
760
761    public void setCluster(BrokerId[] cluster) {
762        this.cluster = cluster;
763    }
764
765    @Override
766    public boolean isMessage() {
767        return true;
768    }
769
770    /**
771     * @openwire:property version=3
772     */
773    public long getBrokerInTime() {
774        return this.brokerInTime;
775    }
776
777    public void setBrokerInTime(long brokerInTime) {
778        this.brokerInTime = brokerInTime;
779    }
780
781    /**
782     * @openwire:property version=3
783     */
784    public long getBrokerOutTime() {
785        return this.brokerOutTime;
786    }
787
788    public void setBrokerOutTime(long brokerOutTime) {
789        this.brokerOutTime = brokerOutTime;
790    }
791
792    @Override
793        public boolean isDropped() {
794        return false;
795    }
796
797    /**
798     * @openwire:property version=10
799     */
800    public boolean isJMSXGroupFirstForConsumer() {
801        return jmsXGroupFirstForConsumer;
802    }
803
804    public void setJMSXGroupFirstForConsumer(boolean val) {
805        jmsXGroupFirstForConsumer = val;
806    }
807
808    public void compress() throws IOException {
809        if (!isCompressed()) {
810            storeContent();
811            if (!isCompressed() && getContent() != null) {
812                doCompress();
813            }
814        }
815    }
816
817    protected void doCompress() throws IOException {
818        compressed = true;
819        ByteSequence bytes = getContent();
820        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
821        OutputStream os = new DeflaterOutputStream(bytesOut);
822        os.write(bytes.data, bytes.offset, bytes.length);
823        os.close();
824        setContent(bytesOut.toByteSequence());
825    }
826
827    @Override
828    public String toString() {
829        return toString(null);
830    }
831
832    @Override
833    public String toString(Map<String, Object>overrideFields) {
834        try {
835            getProperties();
836        } catch (IOException e) {
837        }
838        return super.toString(overrideFields);
839    }
840
841    @Override
842    public boolean canProcessAsExpired() {
843        return processAsExpired.compareAndSet(false, true);
844    }
845}