/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.command;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.JMSExceptionSupport;

/**
 * A <CODE>BytesMessage</CODE> object is used to send a message containing a
 * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
 * interface and adds a bytes message body. The receiver of the message supplies
 * the interpretation of the bytes.
 * <P>
 * The <CODE>BytesMessage</CODE> methods are based largely on those found in
 * <CODE>java.io.DataInputStream</CODE> and
 * <CODE>java.io.DataOutputStream</CODE>.
 * <P>
 * This message type is for client encoding of existing message formats. If
 * possible, one of the other self-defining message types should be used
 * instead.
 * <P>
 * Although the JMS API allows the use of message properties with byte messages,
 * they are typically not used, since the inclusion of properties may affect the
 * format.
 * <P>
 * The primitive types can be written explicitly using methods for each type.
 * They may also be written generically as objects. For instance, a call to
 * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
 * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
 * provided, because the explicit form is convenient for static programming, and
 * the object form is needed when types are not known at compile time.
 * <P>
 * When the message is first created, and when <CODE>clearBody</CODE> is
 * called, the body of the message is in write-only mode. After the first call
 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
 * After a message has been sent, the client that sent it can retain and modify
 * it without affecting the message that has been sent. The same message object
 * can be sent multiple times. When a message has been received, the provider
 * has called <CODE>reset</CODE> so that the message body is in read-only mode
 * for the client.
 * <P>
 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
 * message body is cleared and the message is in write-only mode.
 * <P>
 * If a client attempts to read a message in write-only mode, a
 * <CODE>MessageNotReadableException</CODE> is thrown.
 * <P>
 * If a client attempts to write a message in read-only mode, a
 * <CODE>MessageNotWriteableException</CODE> is thrown.
 * <P>
 * Implementation note: when compression is applied, the stored content is a
 * 4-byte big-endian uncompressed length followed by the deflated body (see
 * {@link #doCompress()} and {@link #decompress(ByteSequence)}).
 *
 * @openwire:marshaller code=24
 * @see javax.jms.Session#createBytesMessage()
 * @see javax.jms.MapMessage
 * @see javax.jms.Message
 * @see javax.jms.ObjectMessage
 * @see javax.jms.StreamMessage
 * @see javax.jms.TextMessage
 */
public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;

    /** Typed writer over {@link #bytesOut}; non-null only while a body is being written. */
    protected transient DataOutputStream dataOut;
    /** Buffer that accumulates the body while the message is being written. */
    protected transient ByteArrayOutputStream bytesOut;
    /** Typed reader over the stored content; created lazily on the first read. */
    protected transient DataInputStream dataIn;
    /** Uncompressed body length, refreshed whenever the read stream is (re)created. */
    protected transient int length;

    /**
     * Creates a copy of this message. Any pending written bytes are flushed
     * into the message content first so that the copy sees them.
     *
     * @return a new <code>ActiveMQBytesMessage</code> holding the same state
     */
    @Override
    public Message copy() {
        ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
        copy(copy);
        return copy;
    }

    /**
     * Copies this message's state into <code>copy</code>. The stream fields are
     * deliberately not shared: the copy starts with no open streams.
     */
    private void copy(ActiveMQBytesMessage copy) {
        storeContent();
        super.copy(copy);
        copy.dataOut = null;
        copy.bytesOut = null;
        copy.dataIn = null;
    }

    @Override
    public void onSend() throws JMSException {
        super.onSend();
        storeContent();
    }

    /**
     * Flushes the bytes written so far into the message content, compressing
     * them when the owning connection has compression enabled, and releases
     * the write streams. Does nothing when no write is in progress.
     *
     * @throws RuntimeException wrapping the <code>IOException</code> if the
     *                 buffer cannot be flushed or compressed
     */
    @Override
    public void storeContent() {
        if (dataOut != null) {
            try {
                dataOut.close();
                ByteSequence bs = bytesOut.toByteSequence();
                setContent(bs);

                ActiveMQConnection connection = getConnection();
                if (connection != null && connection.isUseCompression()) {
                    doCompress();
                }
            } catch (IOException ioe) {
                throw new RuntimeException(ioe.getMessage(), ioe);
            } finally {
                try {
                    if (bytesOut != null) {
                        bytesOut.close();
                    }
                    if (dataOut != null) {
                        dataOut.close();
                    }
                } catch (IOException ignored) {
                    // Best-effort cleanup: either the content has already been
                    // captured above, or the primary failure is already
                    // propagating and must not be masked by a close() error.
                } finally {
                    // Always drop the streams, even if one close() failed, so a
                    // later write starts from a fresh buffer.
                    bytesOut = null;
                    dataOut = null;
                }
            }
        }
    }

    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    @Override
    public String getJMSXMimeType() {
        return "jms/bytes-message";
    }

    /**
     * Clears out the message body. Clearing a message's body does not clear its
     * header values or property entries.
     * <P>
     * If this message body was read-only, calling this method leaves the
     * message body in the same state as an empty body in a newly created
     * message.
     *
     * @throws JMSException if the JMS provider fails to clear the message body
     *                 due to some internal error.
     */
    @Override
    public void clearBody() throws JMSException {
        super.clearBody();
        this.dataOut = null;
        this.dataIn = null;
        this.bytesOut = null;
    }

    /**
     * Gets the number of bytes of the message body when the message is in
     * read-only mode. The value returned can be used to allocate a byte array.
     * The value returned is the entire length of the message body, regardless
     * of where the pointer for reading the message is currently located.
     *
     * @return number of bytes in the message
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageNotReadableException if the message is in write-only mode.
     * @since 1.1
     */
    @Override
    public long getBodyLength() throws JMSException {
        initializeReading();
        return length;
    }

    /**
     * Reads a <code>boolean</code> from the bytes message stream.
     *
     * @return the <code>boolean</code> value read
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public boolean readBoolean() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readBoolean();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 8-bit value from the bytes message stream.
     *
     * @return the next byte from the bytes message stream as a signed 8-bit
     *         <code>byte</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public byte readByte() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readByte();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads an unsigned 8-bit number from the bytes message stream.
     *
     * @return the next byte from the bytes message stream, interpreted as an
     *         unsigned 8-bit number
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public int readUnsignedByte() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readUnsignedByte();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 16-bit number from the bytes message stream.
     *
     * @return the next two bytes from the bytes message stream, interpreted as
     *         a signed 16-bit number
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public short readShort() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readShort();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads an unsigned 16-bit number from the bytes message stream.
     *
     * @return the next two bytes from the bytes message stream, interpreted as
     *         an unsigned 16-bit integer
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public int readUnsignedShort() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readUnsignedShort();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a Unicode character value from the bytes message stream.
     *
     * @return the next two bytes from the bytes message stream as a Unicode
     *         character
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public char readChar() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readChar();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 32-bit integer from the bytes message stream.
     *
     * @return the next four bytes from the bytes message stream, interpreted as
     *         an <code>int</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public int readInt() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readInt();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 64-bit integer from the bytes message stream.
     *
     * @return the next eight bytes from the bytes message stream, interpreted
     *         as a <code>long</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public long readLong() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readLong();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a <code>float</code> from the bytes message stream.
     *
     * @return the next four bytes from the bytes message stream, interpreted as
     *         a <code>float</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public float readFloat() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readFloat();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a <code>double</code> from the bytes message stream.
     *
     * @return the next eight bytes from the bytes message stream, interpreted
     *         as a <code>double</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public double readDouble() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readDouble();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a string that has been encoded using a modified UTF-8 format from
     * the bytes message stream.
     * <P>
     * For more information on the UTF-8 format, see "File System Safe UCS
     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
     * X/Open Company Ltd., Document Number: P316. This information also appears
     * in ISO/IEC 10646, Annex P.
     *
     * @return a Unicode string from the bytes message stream
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public String readUTF() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readUTF();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a byte array from the bytes message stream.
     * <P>
     * If the length of array <code>value</code> is less than the number of
     * bytes remaining to be read from the stream, the array should be filled. A
     * subsequent call reads the next increment, and so on.
     * <P>
     * If the number of bytes remaining in the stream is less than the length of
     * array <code>value</code>, the bytes should be read into the array. The
     * return value of the total number of bytes read will be less than the
     * length of the array, indicating that there are no more bytes left to be
     * read from the stream. The next read of the stream returns -1.
     *
     * @param value the buffer into which the data is read
     * @return the total number of bytes read into the buffer, or -1 if there is
     *         no more data because the end of the stream has been reached
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public int readBytes(byte[] value) throws JMSException {
        return readBytes(value, value.length);
    }

    /**
     * Reads a portion of the bytes message stream.
     * <P>
     * If the length of array <code>value</code> is less than the number of
     * bytes remaining to be read from the stream, the array should be filled. A
     * subsequent call reads the next increment, and so on.
     * <P>
     * If the number of bytes remaining in the stream is less than the length of
     * array <code>value</code>, the bytes should be read into the array. The
     * return value of the total number of bytes read will be less than the
     * length of the array, indicating that there are no more bytes left to be
     * read from the stream. The next read of the stream returns -1.
     * <P>
     * If <code>length</code> is negative, or <code>length</code> is greater
     * than the length of the array <code>value</code>, then an
     * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
     * from the stream for this exception case.
     *
     * @param value the buffer into which the data is read
     * @param length the number of bytes to read; must be less than or equal to
     *                <code>value.length</code>
     * @return the total number of bytes read into the buffer, or -1 if there is
     *         no more data because the end of the stream has been reached
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageNotReadableException if the message is in write-only mode.
     * @throws IndexOutOfBoundsException if <code>length</code> is negative or
     *                 greater than <code>value.length</code>.
     */
    @Override
    public int readBytes(byte[] value, int length) throws JMSException {
        initializeReading();
        // Validate up front so the documented contract holds: the exception is
        // raised before any byte is consumed from the stream.
        if (length < 0 || length > value.length) {
            throw new IndexOutOfBoundsException(
                "length must not be negative or larger than the size of the provided array");
        }
        try {
            int n = 0;
            // A single read() may return fewer bytes than requested; keep
            // reading until the request is satisfied or the stream ends.
            while (n < length) {
                int count = this.dataIn.read(value, n, length - n);
                if (count < 0) {
                    break;
                }
                n += count;
            }
            // Nothing read although bytes were requested: end of stream.
            if (n == 0 && length > 0) {
                n = -1;
            }
            return n;
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
     * value. The value <code>true</code> is written as the value
     * <code>(byte)1</code>; the value <code>false</code> is written as the
     * value <code>(byte)0</code>.
     *
     * @param value the <code>boolean</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeBoolean(boolean value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeBoolean(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a <code>byte</code> to the bytes message stream as a 1-byte
     * value.
     *
     * @param value the <code>byte</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeByte(byte value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeByte(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a <code>short</code> to the bytes message stream as two bytes,
     * high byte first.
     *
     * @param value the <code>short</code> to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeShort(short value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeShort(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a <code>char</code> to the bytes message stream as a 2-byte
     * value, high byte first.
     *
     * @param value the <code>char</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeChar(char value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeChar(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes an <code>int</code> to the bytes message stream as four bytes,
     * high byte first.
     *
     * @param value the <code>int</code> to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeInt(int value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeInt(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a <code>long</code> to the bytes message stream as eight bytes,
     * high byte first.
     *
     * @param value the <code>long</code> to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeLong(long value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeLong(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Converts the <code>float</code> argument to an <code>int</code> using
     * the <code>floatToIntBits</code> method in class <code>Float</code>,
     * and then writes that <code>int</code> value to the bytes message stream
     * as a 4-byte quantity, high byte first.
     *
     * @param value the <code>float</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeFloat(float value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeFloat(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Converts the <code>double</code> argument to a <code>long</code>
     * using the <code>doubleToLongBits</code> method in class
     * <code>Double</code>, and then writes that <code>long</code> value to
     * the bytes message stream as an 8-byte quantity, high byte first.
     *
     * @param value the <code>double</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeDouble(double value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeDouble(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a string to the bytes message stream using UTF-8 encoding in a
     * machine-independent manner.
     * <P>
     * For more information on the UTF-8 format, see "File System Safe UCS
     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
     * X/Open Company Ltd., Document Number: P316. This information also appears
     * in ISO/IEC 10646, Annex P.
     *
     * @param value the <code>String</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeUTF(String value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeUTF(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a byte array to the bytes message stream.
     *
     * @param value the byte array to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeBytes(byte[] value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.write(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a portion of a byte array to the bytes message stream.
     *
     * @param value the byte array value to be written
     * @param offset the initial offset within the byte array
     * @param length the number of bytes to use
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.write(value, offset, length);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes an object to the bytes message stream.
     * <P>
     * This method works only for the objectified primitive object types
     * (<code>Integer</code>, <code>Double</code>, <code>Long</code> ...),
     * <code>String</code> objects, and byte arrays.
     *
     * @param value the object in the Java programming language ("Java object")
     *                to be written; it must not be null
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageFormatException if the object is of an invalid type.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     * @throws java.lang.NullPointerException if the parameter
     *                 <code>value</code> is null.
     */
    @Override
    public void writeObject(Object value) throws JMSException {
        if (value == null) {
            throw new NullPointerException();
        }
        // Checked before the type dispatch so that a read-only body is reported
        // ahead of an unsupported-type error.
        initializeWriting();
        if (value instanceof Boolean) {
            writeBoolean(((Boolean)value).booleanValue());
        } else if (value instanceof Character) {
            writeChar(((Character)value).charValue());
        } else if (value instanceof Byte) {
            writeByte(((Byte)value).byteValue());
        } else if (value instanceof Short) {
            writeShort(((Short)value).shortValue());
        } else if (value instanceof Integer) {
            writeInt(((Integer)value).intValue());
        } else if (value instanceof Long) {
            writeLong(((Long)value).longValue());
        } else if (value instanceof Float) {
            writeFloat(((Float)value).floatValue());
        } else if (value instanceof Double) {
            writeDouble(((Double)value).doubleValue());
        } else if (value instanceof String) {
            writeUTF(value.toString());
        } else if (value instanceof byte[]) {
            writeBytes((byte[])value);
        } else {
            throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
        }
    }

    /**
     * Puts the message body in read-only mode and repositions the stream of
     * bytes to the beginning.
     *
     * @throws JMSException if an internal error occurs
     */
    @Override
    public void reset() throws JMSException {
        storeContent();
        setReadOnlyBody(true);
        try {
            if (bytesOut != null) {
                bytesOut.close();
                bytesOut = null;
            }
            // Dropping dataIn makes the next read start again from the
            // beginning of the stored content.
            if (dataIn != null) {
                dataIn.close();
                dataIn = null;
            }
            if (dataOut != null) {
                dataOut.close();
                dataOut = null;
            }
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Prepares the message for writing: verifies the body is writable, lazily
     * creates the output streams and carries over any previously stored
     * content so that new bytes are appended to it.
     */
    private void initializeWriting() throws JMSException {
        checkReadOnlyBody();
        if (this.dataOut == null) {
            this.bytesOut = new ByteArrayOutputStream();
            OutputStream os = bytesOut;
            this.dataOut = new DataOutputStream(os);
        }

        restoreOldContent();
    }

    /**
     * Moves already stored content (decompressing it if necessary) back into
     * the write buffer.
     */
    private void restoreOldContent() throws JMSException {
        // For a message that already had a body and was sent we need to restore the content
        // if the message is used again without having its clearBody method called.
        if (this.content != null && this.content.length > 0) {
            try {
                ByteSequence toRestore = this.content;
                if (compressed) {
                    toRestore = new ByteSequence(decompress(this.content));
                    // The restored bytes are raw again. storeContent() turns the
                    // flag back on only if it really compresses; leaving it set
                    // would make readers try to inflate uncompressed data.
                    compressed = false;
                }

                this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
                // Free up the buffer from the old content, will be re-written when
                // the message is sent again and storeContent() is called.
                this.content = null;
            } catch (IOException ioe) {
                throw JMSExceptionSupport.create(ioe);
            }
        }
    }

    /**
     * Verifies that the body may be read.
     *
     * @throws MessageNotReadableException if the body is still in write-only
     *                 mode
     */
    protected void checkWriteOnlyBody() throws MessageNotReadableException {
        if (!readOnlyBody) {
            throw new MessageNotReadableException("Message body is write-only");
        }
    }

    /**
     * Prepares the message for reading: verifies the body is readable and, on
     * first use, opens an input stream over the (decompressed) content and
     * records the body length.
     */
    private void initializeReading() throws JMSException {
        checkWriteOnlyBody();
        if (dataIn == null) {
            try {
                ByteSequence data = getContent();
                if (data == null) {
                    data = new ByteSequence(new byte[] {}, 0, 0);
                }
                InputStream is;
                if (isCompressed() && data.length != 0) {
                    // decompress() also updates this.length from the header.
                    is = new ByteArrayInputStream(decompress(data));
                } else {
                    // Uncompressed, or compressed but empty: the stored length
                    // is the body length. Setting it here also avoids reporting
                    // a stale length left over from a previous body.
                    is = new ByteArrayInputStream(data);
                    length = data.getLength();
                }

                dataIn = new DataInputStream(is);
            } catch (IOException ioe) {
                throw JMSExceptionSupport.create(ioe);
            }
        }
    }

    /**
     * Inflates content produced by {@link #doCompress()}: a 4-byte big-endian
     * uncompressed length followed by the deflated body. As a side effect the
     * {@link #length} field is set to the uncompressed length read from the
     * header. The offset of <code>dataSequence</code> is left as it was on
     * entry.
     *
     * @param dataSequence the compressed content
     * @return the uncompressed body bytes
     * @throws IOException if the content is malformed or cannot be inflated
     */
    protected byte[] decompress(ByteSequence dataSequence) throws IOException {
        Inflater inflater = new Inflater();
        try {
            int start = dataSequence.getOffset();
            length = ByteSequenceData.readIntBig(dataSequence);
            // Undo any offset movement caused by reading the header so the
            // stored content stays usable, whatever offset it started at.
            dataSequence.offset = start;
            // Skip the 4-byte length header; feed only this sequence's bytes.
            inflater.setInput(dataSequence.getData(), start + 4, dataSequence.getLength() - 4);
            byte[] buffer = new byte[length];
            int count = inflater.inflate(buffer);
            return count == buffer.length ? buffer : Arrays.copyOf(buffer, count);
        } catch (Exception e) {
            // Deliberately broad: a corrupt header or stream can surface as
            // DataFormatException, NegativeArraySizeException or an index
            // error; callers only handle IOException.
            throw new IOException(e);
        } finally {
            inflater.end();
        }
    }

    /**
     * Sets a property on the message. The write buffer is initialized first,
     * exactly as for a body write.
     * NOTE(review): this presumably makes the call fail when the body is
     * read-only -- confirm that this is intended for property updates.
     */
    @Override
    public void setObjectProperty(String name, Object value) throws JMSException {
        initializeWriting();
        super.setObjectProperty(name, value);
    }

    @Override
    public String toString() {
        return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
    }

    /**
     * Replaces the message content with its compressed form: a 4-byte
     * big-endian uncompressed length followed by the deflated body, and marks
     * the message as compressed.
     *
     * @throws IOException if the compressed content cannot be assembled
     */
    @Override
    protected void doCompress() throws IOException {
        compressed = true;
        ByteSequence bytes = getContent();
        if (bytes != null) {
            int uncompressedLength = bytes.getLength();
            ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
            // Reserve room for the length header; it is filled in below.
            compressedOut.write(new byte[4]);
            Deflater deflater = new Deflater();
            try {
                // Compress only the bytes that belong to this sequence; the
                // backing array may be larger than the content it holds.
                deflater.setInput(bytes.getData(), bytes.getOffset(), uncompressedLength);
                deflater.finish();
                byte[] buffer = new byte[1024];
                while (!deflater.finished()) {
                    int count = deflater.deflate(buffer);
                    compressedOut.write(buffer, 0, count);
                }

                bytes = compressedOut.toByteSequence();
                ByteSequenceData.writeIntBig(bytes, uncompressedLength);
                // Writing the header may have moved the offset; rewind so the
                // content starts at the header again.
                bytes.offset = 0;
                setContent(bytes);
            } finally {
                deflater.end();
                compressedOut.close();
            }
        }
    }
}