001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.io.OutputStream; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.zip.DeflaterOutputStream; 028 029import javax.jms.JMSException; 030 031import org.apache.activemq.ActiveMQConnection; 032import org.apache.activemq.advisory.AdvisorySupport; 033import org.apache.activemq.broker.region.MessageReference; 034import org.apache.activemq.usage.MemoryUsage; 035import org.apache.activemq.util.ByteArrayInputStream; 036import org.apache.activemq.util.ByteArrayOutputStream; 037import org.apache.activemq.util.ByteSequence; 038import org.apache.activemq.util.MarshallingSupport; 039import org.apache.activemq.wireformat.WireFormat; 040import org.fusesource.hawtbuf.UTF8Buffer; 041 042/** 043 * Represents an ActiveMQ message 044 * 045 * @openwire:marshaller 046 * 047 */ 048public abstract class Message extends BaseCommand implements MarshallAware, MessageReference { 049 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 050 051 /** 052 * The default minimum amount of memory a message is assumed to use 053 */ 054 public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024; 055 056 protected MessageId messageId; 057 protected ActiveMQDestination originalDestination; 058 protected TransactionId originalTransactionId; 059 060 protected ProducerId producerId; 061 protected ActiveMQDestination destination; 062 protected TransactionId transactionId; 063 064 protected long expiration; 065 protected long timestamp; 066 protected long arrival; 067 protected long brokerInTime; 068 protected long brokerOutTime; 069 protected String correlationId; 070 protected ActiveMQDestination replyTo; 071 protected boolean persistent; 072 protected String type; 073 protected byte priority; 074 protected String groupID; 075 protected int groupSequence; 076 protected ConsumerId targetConsumerId; 077 protected boolean compressed; 078 protected String userID; 079 080 protected ByteSequence content; 081 protected ByteSequence marshalledProperties; 082 protected DataStructure dataStructure; 083 protected int redeliveryCounter; 084 085 protected int size; 086 protected Map<String, Object> properties; 087 protected boolean readOnlyProperties; 088 protected boolean readOnlyBody; 089 protected transient boolean recievedByDFBridge; 090 protected boolean droppable; 091 protected boolean jmsXGroupFirstForConsumer; 092 093 private transient short referenceCount; 094 private transient ActiveMQConnection connection; 095 transient MessageDestination regionDestination; 096 transient MemoryUsage memoryUsage; 097 transient AtomicBoolean processAsExpired = new AtomicBoolean(false); 098 099 private BrokerId[] brokerPath; 100 private BrokerId[] cluster; 101 102 public static interface MessageDestination { 103 int getMinimumMessageSize(); 104 MemoryUsage getMemoryUsage(); 105 } 106 107 public abstract Message copy(); 108 public abstract void clearBody() throws JMSException; 109 public abstract void storeContent(); 110 public abstract void storeContentAndClear(); 111 112 // useful to reduce the memory footprint of a persisted message 113 public void clearMarshalledState() throws JMSException { 114 properties = null; 115 } 116 117 protected void copy(Message copy) { 118 super.copy(copy); 119 copy.producerId = producerId; 120 copy.transactionId = transactionId; 121 copy.destination = destination; 122 copy.messageId = messageId != null ? messageId.copy() : null; 123 copy.originalDestination = originalDestination; 124 copy.originalTransactionId = originalTransactionId; 125 copy.expiration = expiration; 126 copy.timestamp = timestamp; 127 copy.correlationId = correlationId; 128 copy.replyTo = replyTo; 129 copy.persistent = persistent; 130 copy.redeliveryCounter = redeliveryCounter; 131 copy.type = type; 132 copy.priority = priority; 133 copy.size = size; 134 copy.groupID = groupID; 135 copy.userID = userID; 136 copy.groupSequence = groupSequence; 137 138 if (properties != null) { 139 copy.properties = new HashMap<String, Object>(properties); 140 141 // The new message hasn't expired, so remove this feild. 142 copy.properties.remove(ORIGINAL_EXPIRATION); 143 } else { 144 copy.properties = properties; 145 } 146 147 copy.content = copyByteSequence(content); 148 copy.marshalledProperties = copyByteSequence(marshalledProperties); 149 copy.dataStructure = dataStructure; 150 copy.readOnlyProperties = readOnlyProperties; 151 copy.readOnlyBody = readOnlyBody; 152 copy.compressed = compressed; 153 copy.recievedByDFBridge = recievedByDFBridge; 154 155 copy.arrival = arrival; 156 copy.connection = connection; 157 copy.regionDestination = regionDestination; 158 copy.brokerInTime = brokerInTime; 159 copy.brokerOutTime = brokerOutTime; 160 copy.memoryUsage=this.memoryUsage; 161 copy.brokerPath = brokerPath; 162 copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer; 163 164 // lets not copy the following fields 165 // copy.targetConsumerId = targetConsumerId; 166 // copy.referenceCount = referenceCount; 167 } 168 169 private ByteSequence copyByteSequence(ByteSequence content) { 170 if (content != null) { 171 return new ByteSequence(content.getData(), content.getOffset(), content.getLength()); 172 } 173 return null; 174 } 175 176 public Object getProperty(String name) throws IOException { 177 if (properties == null) { 178 if (marshalledProperties == null) { 179 return null; 180 } 181 properties = unmarsallProperties(marshalledProperties); 182 } 183 Object result = properties.get(name); 184 if (result instanceof UTF8Buffer) { 185 result = result.toString(); 186 } 187 188 return result; 189 } 190 191 @SuppressWarnings("unchecked") 192 public Map<String, Object> getProperties() throws IOException { 193 if (properties == null) { 194 if (marshalledProperties == null) { 195 return Collections.EMPTY_MAP; 196 } 197 properties = unmarsallProperties(marshalledProperties); 198 } 199 return Collections.unmodifiableMap(properties); 200 } 201 202 public void clearProperties() { 203 marshalledProperties = null; 204 properties = null; 205 } 206 207 public void setProperty(String name, Object value) throws IOException { 208 lazyCreateProperties(); 209 properties.put(name, value); 210 } 211 212 public void removeProperty(String name) throws IOException { 213 lazyCreateProperties(); 214 properties.remove(name); 215 } 216 217 protected void lazyCreateProperties() throws IOException { 218 if (properties == null) { 219 if (marshalledProperties == null) { 220 properties = new HashMap<String, Object>(); 221 } else { 222 properties = unmarsallProperties(marshalledProperties); 223 marshalledProperties = null; 224 } 225 } else { 226 marshalledProperties = null; 227 } 228 } 229 230 private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException { 231 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties))); 232 } 233 234 @Override 235 public void beforeMarshall(WireFormat wireFormat) throws IOException { 236 // Need to marshal the properties. 237 if (marshalledProperties == null && properties != null) { 238 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 239 DataOutputStream os = new DataOutputStream(baos); 240 MarshallingSupport.marshalPrimitiveMap(properties, os); 241 os.close(); 242 marshalledProperties = baos.toByteSequence(); 243 } 244 } 245 246 @Override 247 public void afterMarshall(WireFormat wireFormat) throws IOException { 248 } 249 250 @Override 251 public void beforeUnmarshall(WireFormat wireFormat) throws IOException { 252 } 253 254 @Override 255 public void afterUnmarshall(WireFormat wireFormat) throws IOException { 256 } 257 258 // ///////////////////////////////////////////////////////////////// 259 // 260 // Simple Field accessors 261 // 262 // ///////////////////////////////////////////////////////////////// 263 264 /** 265 * @openwire:property version=1 cache=true 266 */ 267 public ProducerId getProducerId() { 268 return producerId; 269 } 270 271 public void setProducerId(ProducerId producerId) { 272 this.producerId = producerId; 273 } 274 275 /** 276 * @openwire:property version=1 cache=true 277 */ 278 public ActiveMQDestination getDestination() { 279 return destination; 280 } 281 282 public void setDestination(ActiveMQDestination destination) { 283 this.destination = destination; 284 } 285 286 /** 287 * @openwire:property version=1 cache=true 288 */ 289 public TransactionId getTransactionId() { 290 return transactionId; 291 } 292 293 public void setTransactionId(TransactionId transactionId) { 294 this.transactionId = transactionId; 295 } 296 297 public boolean isInTransaction() { 298 return transactionId != null; 299 } 300 301 /** 302 * @openwire:property version=1 cache=true 303 */ 304 public ActiveMQDestination getOriginalDestination() { 305 return originalDestination; 306 } 307 308 public void setOriginalDestination(ActiveMQDestination destination) { 309 this.originalDestination = destination; 310 } 311 312 /** 313 * @openwire:property version=1 314 */ 315 @Override 316 public MessageId getMessageId() { 317 return messageId; 318 } 319 320 public void setMessageId(MessageId messageId) { 321 this.messageId = messageId; 322 } 323 324 /** 325 * @openwire:property version=1 cache=true 326 */ 327 public TransactionId getOriginalTransactionId() { 328 return originalTransactionId; 329 } 330 331 public void setOriginalTransactionId(TransactionId transactionId) { 332 this.originalTransactionId = transactionId; 333 } 334 335 /** 336 * @openwire:property version=1 337 */ 338 @Override 339 public String getGroupID() { 340 return groupID; 341 } 342 343 public void setGroupID(String groupID) { 344 this.groupID = groupID; 345 } 346 347 /** 348 * @openwire:property version=1 349 */ 350 @Override 351 public int getGroupSequence() { 352 return groupSequence; 353 } 354 355 public void setGroupSequence(int groupSequence) { 356 this.groupSequence = groupSequence; 357 } 358 359 /** 360 * @openwire:property version=1 361 */ 362 public String getCorrelationId() { 363 return correlationId; 364 } 365 366 public void setCorrelationId(String correlationId) { 367 this.correlationId = correlationId; 368 } 369 370 /** 371 * @openwire:property version=1 372 */ 373 @Override 374 public boolean isPersistent() { 375 return persistent; 376 } 377 378 public void setPersistent(boolean deliveryMode) { 379 this.persistent = deliveryMode; 380 } 381 382 /** 383 * @openwire:property version=1 384 */ 385 @Override 386 public long getExpiration() { 387 return expiration; 388 } 389 390 public void setExpiration(long expiration) { 391 this.expiration = expiration; 392 } 393 394 /** 395 * @openwire:property version=1 396 */ 397 public byte getPriority() { 398 return priority; 399 } 400 401 public void setPriority(byte priority) { 402 if (priority < 0) { 403 this.priority = 0; 404 } else if (priority > 9) { 405 this.priority = 9; 406 } else { 407 this.priority = priority; 408 } 409 } 410 411 /** 412 * @openwire:property version=1 413 */ 414 public ActiveMQDestination getReplyTo() { 415 return replyTo; 416 } 417 418 public void setReplyTo(ActiveMQDestination replyTo) { 419 this.replyTo = replyTo; 420 } 421 422 /** 423 * @openwire:property version=1 424 */ 425 public long getTimestamp() { 426 return timestamp; 427 } 428 429 public void setTimestamp(long timestamp) { 430 this.timestamp = timestamp; 431 } 432 433 /** 434 * @openwire:property version=1 435 */ 436 public String getType() { 437 return type; 438 } 439 440 public void setType(String type) { 441 this.type = type; 442 } 443 444 /** 445 * @openwire:property version=1 446 */ 447 public ByteSequence getContent() { 448 return content; 449 } 450 451 public void setContent(ByteSequence content) { 452 this.content = content; 453 } 454 455 /** 456 * @openwire:property version=1 457 */ 458 public ByteSequence getMarshalledProperties() { 459 return marshalledProperties; 460 } 461 462 public void setMarshalledProperties(ByteSequence marshalledProperties) { 463 this.marshalledProperties = marshalledProperties; 464 } 465 466 /** 467 * @openwire:property version=1 468 */ 469 public DataStructure getDataStructure() { 470 return dataStructure; 471 } 472 473 public void setDataStructure(DataStructure data) { 474 this.dataStructure = data; 475 } 476 477 /** 478 * Can be used to route the message to a specific consumer. Should be null 479 * to allow the broker use normal JMS routing semantics. If the target 480 * consumer id is an active consumer on the broker, the message is dropped. 481 * Used by the AdvisoryBroker to replay advisory messages to a specific 482 * consumer. 483 * 484 * @openwire:property version=1 cache=true 485 */ 486 @Override 487 public ConsumerId getTargetConsumerId() { 488 return targetConsumerId; 489 } 490 491 public void setTargetConsumerId(ConsumerId targetConsumerId) { 492 this.targetConsumerId = targetConsumerId; 493 } 494 495 @Override 496 public boolean isExpired() { 497 long expireTime = getExpiration(); 498 return expireTime > 0 && System.currentTimeMillis() > expireTime; 499 } 500 501 @Override 502 public boolean isAdvisory() { 503 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 504 } 505 506 /** 507 * @openwire:property version=1 508 */ 509 public boolean isCompressed() { 510 return compressed; 511 } 512 513 public void setCompressed(boolean compressed) { 514 this.compressed = compressed; 515 } 516 517 public boolean isRedelivered() { 518 return redeliveryCounter > 0; 519 } 520 521 public void setRedelivered(boolean redelivered) { 522 if (redelivered) { 523 if (!isRedelivered()) { 524 setRedeliveryCounter(1); 525 } 526 } else { 527 if (isRedelivered()) { 528 setRedeliveryCounter(0); 529 } 530 } 531 } 532 533 @Override 534 public void incrementRedeliveryCounter() { 535 redeliveryCounter++; 536 } 537 538 /** 539 * @openwire:property version=1 540 */ 541 @Override 542 public int getRedeliveryCounter() { 543 return redeliveryCounter; 544 } 545 546 public void setRedeliveryCounter(int deliveryCounter) { 547 this.redeliveryCounter = deliveryCounter; 548 } 549 550 /** 551 * The route of brokers the command has moved through. 552 * 553 * @openwire:property version=1 cache=true 554 */ 555 public BrokerId[] getBrokerPath() { 556 return brokerPath; 557 } 558 559 public void setBrokerPath(BrokerId[] brokerPath) { 560 this.brokerPath = brokerPath; 561 } 562 563 public boolean isReadOnlyProperties() { 564 return readOnlyProperties; 565 } 566 567 public void setReadOnlyProperties(boolean readOnlyProperties) { 568 this.readOnlyProperties = readOnlyProperties; 569 } 570 571 public boolean isReadOnlyBody() { 572 return readOnlyBody; 573 } 574 575 public void setReadOnlyBody(boolean readOnlyBody) { 576 this.readOnlyBody = readOnlyBody; 577 } 578 579 public ActiveMQConnection getConnection() { 580 return this.connection; 581 } 582 583 public void setConnection(ActiveMQConnection connection) { 584 this.connection = connection; 585 } 586 587 /** 588 * Used to schedule the arrival time of a message to a broker. The broker 589 * will not dispatch a message to a consumer until it's arrival time has 590 * elapsed. 591 * 592 * @openwire:property version=1 593 */ 594 public long getArrival() { 595 return arrival; 596 } 597 598 public void setArrival(long arrival) { 599 this.arrival = arrival; 600 } 601 602 /** 603 * Only set by the broker and defines the userID of the producer connection 604 * who sent this message. This is an optional field, it needs to be enabled 605 * on the broker to have this field populated. 606 * 607 * @openwire:property version=1 608 */ 609 public String getUserID() { 610 return userID; 611 } 612 613 public void setUserID(String jmsxUserID) { 614 this.userID = jmsxUserID; 615 } 616 617 @Override 618 public int getReferenceCount() { 619 return referenceCount; 620 } 621 622 @Override 623 public Message getMessageHardRef() { 624 return this; 625 } 626 627 @Override 628 public Message getMessage() { 629 return this; 630 } 631 632 public void setRegionDestination(MessageDestination destination) { 633 this.regionDestination = destination; 634 if(this.memoryUsage==null) { 635 this.memoryUsage=destination.getMemoryUsage(); 636 } 637 } 638 639 @Override 640 public MessageDestination getRegionDestination() { 641 return regionDestination; 642 } 643 644 public MemoryUsage getMemoryUsage() { 645 return this.memoryUsage; 646 } 647 648 public void setMemoryUsage(MemoryUsage usage) { 649 this.memoryUsage=usage; 650 } 651 652 @Override 653 public boolean isMarshallAware() { 654 return true; 655 } 656 657 @Override 658 public int incrementReferenceCount() { 659 int rc; 660 int size; 661 synchronized (this) { 662 rc = ++referenceCount; 663 size = getSize(); 664 } 665 666 if (rc == 1 && getMemoryUsage() != null) { 667 getMemoryUsage().increaseUsage(size); 668 //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 669 670 } 671 672 //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 673 return rc; 674 } 675 676 @Override 677 public int decrementReferenceCount() { 678 int rc; 679 int size; 680 synchronized (this) { 681 rc = --referenceCount; 682 size = getSize(); 683 } 684 685 if (rc == 0 && getMemoryUsage() != null) { 686 getMemoryUsage().decreaseUsage(size); 687 //Thread.dumpStack(); 688 //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 689 } 690 691 //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 692 693 return rc; 694 } 695 696 @Override 697 public int getSize() { 698 int minimumMessageSize = getMinimumMessageSize(); 699 if (size < minimumMessageSize || size == 0) { 700 size = minimumMessageSize; 701 if (marshalledProperties != null) { 702 size += marshalledProperties.getLength(); 703 } 704 if (content != null) { 705 size += content.getLength(); 706 } 707 } 708 return size; 709 } 710 711 protected int getMinimumMessageSize() { 712 int result = DEFAULT_MINIMUM_MESSAGE_SIZE; 713 //let destination override 714 MessageDestination dest = regionDestination; 715 if (dest != null) { 716 result=dest.getMinimumMessageSize(); 717 } 718 return result; 719 } 720 721 /** 722 * @openwire:property version=1 723 * @return Returns the recievedByDFBridge. 724 */ 725 public boolean isRecievedByDFBridge() { 726 return recievedByDFBridge; 727 } 728 729 /** 730 * @param recievedByDFBridge The recievedByDFBridge to set. 731 */ 732 public void setRecievedByDFBridge(boolean recievedByDFBridge) { 733 this.recievedByDFBridge = recievedByDFBridge; 734 } 735 736 public void onMessageRolledBack() { 737 incrementRedeliveryCounter(); 738 } 739 740 /** 741 * @openwire:property version=2 cache=true 742 */ 743 public boolean isDroppable() { 744 return droppable; 745 } 746 747 public void setDroppable(boolean droppable) { 748 this.droppable = droppable; 749 } 750 751 /** 752 * If a message is stored in multiple nodes on a cluster, all the cluster 753 * members will be listed here. Otherwise, it will be null. 754 * 755 * @openwire:property version=3 cache=true 756 */ 757 public BrokerId[] getCluster() { 758 return cluster; 759 } 760 761 public void setCluster(BrokerId[] cluster) { 762 this.cluster = cluster; 763 } 764 765 @Override 766 public boolean isMessage() { 767 return true; 768 } 769 770 /** 771 * @openwire:property version=3 772 */ 773 public long getBrokerInTime() { 774 return this.brokerInTime; 775 } 776 777 public void setBrokerInTime(long brokerInTime) { 778 this.brokerInTime = brokerInTime; 779 } 780 781 /** 782 * @openwire:property version=3 783 */ 784 public long getBrokerOutTime() { 785 return this.brokerOutTime; 786 } 787 788 public void setBrokerOutTime(long brokerOutTime) { 789 this.brokerOutTime = brokerOutTime; 790 } 791 792 @Override 793 public boolean isDropped() { 794 return false; 795 } 796 797 /** 798 * @openwire:property version=10 799 */ 800 public boolean isJMSXGroupFirstForConsumer() { 801 return jmsXGroupFirstForConsumer; 802 } 803 804 public void setJMSXGroupFirstForConsumer(boolean val) { 805 jmsXGroupFirstForConsumer = val; 806 } 807 808 public void compress() throws IOException { 809 if (!isCompressed()) { 810 storeContent(); 811 if (!isCompressed() && getContent() != null) { 812 doCompress(); 813 } 814 } 815 } 816 817 protected void doCompress() throws IOException { 818 compressed = true; 819 ByteSequence bytes = getContent(); 820 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 821 OutputStream os = new DeflaterOutputStream(bytesOut); 822 os.write(bytes.data, bytes.offset, bytes.length); 823 os.close(); 824 setContent(bytesOut.toByteSequence()); 825 } 826 827 @Override 828 public String toString() { 829 return toString(null); 830 } 831 832 @Override 833 public String toString(Map<String, Object>overrideFields) { 834 try { 835 getProperties(); 836 } catch (IOException e) { 837 } 838 return super.toString(overrideFields); 839 } 840 841 @Override 842 public boolean canProcessAsExpired() { 843 return processAsExpired.compareAndSet(false, true); 844 } 845}