001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.command; 019 020import java.io.BufferedInputStream; 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.EOFException; 024import java.io.IOException; 025import java.io.InputStream; 026import java.io.OutputStream; 027import java.util.zip.DeflaterOutputStream; 028import java.util.zip.InflaterInputStream; 029 030import javax.jms.JMSException; 031import javax.jms.MessageEOFException; 032import javax.jms.MessageFormatException; 033import javax.jms.MessageNotReadableException; 034import javax.jms.MessageNotWriteableException; 035import javax.jms.StreamMessage; 036 037import org.apache.activemq.ActiveMQConnection; 038import org.apache.activemq.util.ByteArrayInputStream; 039import org.apache.activemq.util.ByteArrayOutputStream; 040import org.apache.activemq.util.ByteSequence; 041import org.apache.activemq.util.JMSExceptionSupport; 042import org.apache.activemq.util.MarshallingSupport; 043 044/** 045 * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive 046 * types in the Java programming language. It is filled and read sequentially. 
047 * It inherits from the <CODE>Message</CODE> interface and adds a stream 048 * message body. Its methods are based largely on those found in 049 * <CODE>java.io.DataInputStream</CODE> and 050 * <CODE>java.io.DataOutputStream</CODE>. <p/> 051 * <P> 052 * The primitive types can be read or written explicitly using methods for each 053 * type. They may also be read or written generically as objects. For instance, 054 * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to 055 * <CODE>StreamMessage.writeObject(new 056 * Integer(6))</CODE>. Both forms are 057 * provided, because the explicit form is convenient for static programming, and 058 * the object form is needed when types are not known at compile time. <p/> 059 * <P> 060 * When the message is first created, and when <CODE>clearBody</CODE> is 061 * called, the body of the message is in write-only mode. After the first call 062 * to <CODE>reset</CODE> has been made, the message body is in read-only mode. 063 * After a message has been sent, the client that sent it can retain and modify 064 * it without affecting the message that has been sent. The same message object 065 * can be sent multiple times. When a message has been received, the provider 066 * has called <CODE>reset</CODE> so that the message body is in read-only mode 067 * for the client. <p/> 068 * <P> 069 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the 070 * message body is cleared and the message body is in write-only mode. <p/> 071 * <P> 072 * If a client attempts to read a message in write-only mode, a 073 * <CODE>MessageNotReadableException</CODE> is thrown. <p/> 074 * <P> 075 * If a client attempts to write a message in read-only mode, a 076 * <CODE>MessageNotWriteableException</CODE> is thrown. <p/> 077 * <P> 078 * <CODE>StreamMessage</CODE> objects support the following conversion table. 079 * The marked cases must be supported. The unmarked cases must throw a 080 * <CODE>JMSException</CODE>. 
 * The <CODE>String</CODE>-to-primitive conversions may throw a runtime
 * exception if the primitive's <CODE>valueOf()</CODE> method does not accept
 * it as a valid <CODE>String</CODE> representation of the primitive. <p/>
 * <P>
 * A value written as the row type can be read as the column type. <p/>
 *
 * <PRE>
 * |        | boolean byte short char int long float double String byte[]
 * |----------------------------------------------------------------------
 * |boolean |    X                                            X
 * |byte    |          X     X         X   X                  X
 * |short   |                X         X   X                  X
 * |char    |                     X                           X
 * |int     |                          X   X                  X
 * |long    |                              X                  X
 * |float   |                                    X     X      X
 * |double  |                                          X      X
 * |String  |    X     X     X         X   X     X     X      X
 * |byte[]  |                                                        X
 * |----------------------------------------------------------------------
 * </PRE>
 *
 * <p/>
 * <P>
 * Attempting to read a null value as a primitive type must be treated as
 * calling the primitive's corresponding <code>valueOf(String)</code>
 * conversion method with a null value. Since <code>char</code> does not
 * support a <code>String</code> conversion, attempting to read a null value
 * as a <code>char</code> must throw a <code>NullPointerException</code>.
103 * 104 * @openwire:marshaller code="27" 105 * @see javax.jms.Session#createStreamMessage() 106 * @see javax.jms.BytesMessage 107 * @see javax.jms.MapMessage 108 * @see javax.jms.Message 109 * @see javax.jms.ObjectMessage 110 * @see javax.jms.TextMessage 111 */ 112public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage { 113 114 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE; 115 116 protected transient DataOutputStream dataOut; 117 protected transient ByteArrayOutputStream bytesOut; 118 protected transient DataInputStream dataIn; 119 protected transient int remainingBytes = -1; 120 121 @Override 122 public Message copy() { 123 ActiveMQStreamMessage copy = new ActiveMQStreamMessage(); 124 copy(copy); 125 return copy; 126 } 127 128 private void copy(ActiveMQStreamMessage copy) { 129 storeContent(); 130 super.copy(copy); 131 copy.dataOut = null; 132 copy.bytesOut = null; 133 copy.dataIn = null; 134 } 135 136 @Override 137 public void onSend() throws JMSException { 138 super.onSend(); 139 storeContent(); 140 } 141 142 @Override 143 public void storeContent() { 144 if (dataOut != null) { 145 try { 146 dataOut.close(); 147 setContent(bytesOut.toByteSequence()); 148 bytesOut = null; 149 dataOut = null; 150 } catch (IOException ioe) { 151 throw new RuntimeException(ioe); 152 } 153 } 154 } 155 156 @Override 157 public byte getDataStructureType() { 158 return DATA_STRUCTURE_TYPE; 159 } 160 161 @Override 162 public String getJMSXMimeType() { 163 return "jms/stream-message"; 164 } 165 166 /** 167 * Clears out the message body. Clearing a message's body does not clear its 168 * header values or property entries. <p/> 169 * <P> 170 * If this message body was read-only, calling this method leaves the 171 * message body in the same state as an empty body in a newly created 172 * message. 173 * 174 * @throws JMSException if the JMS provider fails to clear the message body 175 * due to some internal error. 
176 */ 177 178 @Override 179 public void clearBody() throws JMSException { 180 super.clearBody(); 181 this.dataOut = null; 182 this.dataIn = null; 183 this.bytesOut = null; 184 this.remainingBytes = -1; 185 } 186 187 /** 188 * Reads a <code>boolean</code> from the stream message. 189 * 190 * @return the <code>boolean</code> value read 191 * @throws JMSException if the JMS provider fails to read the message due to 192 * some internal error. 193 * @throws MessageEOFException if unexpected end of message stream has been 194 * reached. 195 * @throws MessageFormatException if this type conversion is invalid. 196 * @throws MessageNotReadableException if the message is in write-only mode. 197 */ 198 199 @Override 200 public boolean readBoolean() throws JMSException { 201 initializeReading(); 202 try { 203 204 this.dataIn.mark(10); 205 int type = this.dataIn.read(); 206 if (type == -1) { 207 throw new MessageEOFException("reached end of data"); 208 } 209 if (type == MarshallingSupport.BOOLEAN_TYPE) { 210 return this.dataIn.readBoolean(); 211 } 212 if (type == MarshallingSupport.STRING_TYPE) { 213 return Boolean.valueOf(this.dataIn.readUTF()).booleanValue(); 214 } 215 if (type == MarshallingSupport.NULL) { 216 this.dataIn.reset(); 217 throw new NullPointerException("Cannot convert NULL value to boolean."); 218 } else { 219 this.dataIn.reset(); 220 throw new MessageFormatException(" not a boolean type"); 221 } 222 } catch (EOFException e) { 223 throw JMSExceptionSupport.createMessageEOFException(e); 224 } catch (IOException e) { 225 throw JMSExceptionSupport.createMessageFormatException(e); 226 } 227 } 228 229 /** 230 * Reads a <code>byte</code> value from the stream message. 231 * 232 * @return the next byte from the stream message as a 8-bit 233 * <code>byte</code> 234 * @throws JMSException if the JMS provider fails to read the message due to 235 * some internal error. 236 * @throws MessageEOFException if unexpected end of message stream has been 237 * reached. 
238 * @throws MessageFormatException if this type conversion is invalid. 239 * @throws MessageNotReadableException if the message is in write-only mode. 240 */ 241 242 @Override 243 public byte readByte() throws JMSException { 244 initializeReading(); 245 try { 246 247 this.dataIn.mark(10); 248 int type = this.dataIn.read(); 249 if (type == -1) { 250 throw new MessageEOFException("reached end of data"); 251 } 252 if (type == MarshallingSupport.BYTE_TYPE) { 253 return this.dataIn.readByte(); 254 } 255 if (type == MarshallingSupport.STRING_TYPE) { 256 return Byte.valueOf(this.dataIn.readUTF()).byteValue(); 257 } 258 if (type == MarshallingSupport.NULL) { 259 this.dataIn.reset(); 260 throw new NullPointerException("Cannot convert NULL value to byte."); 261 } else { 262 this.dataIn.reset(); 263 throw new MessageFormatException(" not a byte type"); 264 } 265 } catch (NumberFormatException mfe) { 266 try { 267 this.dataIn.reset(); 268 } catch (IOException ioe) { 269 throw JMSExceptionSupport.create(ioe); 270 } 271 throw mfe; 272 273 } catch (EOFException e) { 274 throw JMSExceptionSupport.createMessageEOFException(e); 275 } catch (IOException e) { 276 throw JMSExceptionSupport.createMessageFormatException(e); 277 } 278 } 279 280 /** 281 * Reads a 16-bit integer from the stream message. 282 * 283 * @return a 16-bit integer from the stream message 284 * @throws JMSException if the JMS provider fails to read the message due to 285 * some internal error. 286 * @throws MessageEOFException if unexpected end of message stream has been 287 * reached. 288 * @throws MessageFormatException if this type conversion is invalid. 289 * @throws MessageNotReadableException if the message is in write-only mode. 
290 */ 291 292 @Override 293 public short readShort() throws JMSException { 294 initializeReading(); 295 try { 296 297 this.dataIn.mark(17); 298 int type = this.dataIn.read(); 299 if (type == -1) { 300 throw new MessageEOFException("reached end of data"); 301 } 302 if (type == MarshallingSupport.SHORT_TYPE) { 303 return this.dataIn.readShort(); 304 } 305 if (type == MarshallingSupport.BYTE_TYPE) { 306 return this.dataIn.readByte(); 307 } 308 if (type == MarshallingSupport.STRING_TYPE) { 309 return Short.valueOf(this.dataIn.readUTF()).shortValue(); 310 } 311 if (type == MarshallingSupport.NULL) { 312 this.dataIn.reset(); 313 throw new NullPointerException("Cannot convert NULL value to short."); 314 } else { 315 this.dataIn.reset(); 316 throw new MessageFormatException(" not a short type"); 317 } 318 } catch (NumberFormatException mfe) { 319 try { 320 this.dataIn.reset(); 321 } catch (IOException ioe) { 322 throw JMSExceptionSupport.create(ioe); 323 } 324 throw mfe; 325 326 } catch (EOFException e) { 327 throw JMSExceptionSupport.createMessageEOFException(e); 328 } catch (IOException e) { 329 throw JMSExceptionSupport.createMessageFormatException(e); 330 } 331 332 } 333 334 /** 335 * Reads a Unicode character value from the stream message. 336 * 337 * @return a Unicode character from the stream message 338 * @throws JMSException if the JMS provider fails to read the message due to 339 * some internal error. 340 * @throws MessageEOFException if unexpected end of message stream has been 341 * reached. 342 * @throws MessageFormatException if this type conversion is invalid 343 * @throws MessageNotReadableException if the message is in write-only mode. 
344 */ 345 346 @Override 347 public char readChar() throws JMSException { 348 initializeReading(); 349 try { 350 351 this.dataIn.mark(17); 352 int type = this.dataIn.read(); 353 if (type == -1) { 354 throw new MessageEOFException("reached end of data"); 355 } 356 if (type == MarshallingSupport.CHAR_TYPE) { 357 return this.dataIn.readChar(); 358 } 359 if (type == MarshallingSupport.NULL) { 360 this.dataIn.reset(); 361 throw new NullPointerException("Cannot convert NULL value to char."); 362 } else { 363 this.dataIn.reset(); 364 throw new MessageFormatException(" not a char type"); 365 } 366 } catch (NumberFormatException mfe) { 367 try { 368 this.dataIn.reset(); 369 } catch (IOException ioe) { 370 throw JMSExceptionSupport.create(ioe); 371 } 372 throw mfe; 373 374 } catch (EOFException e) { 375 throw JMSExceptionSupport.createMessageEOFException(e); 376 } catch (IOException e) { 377 throw JMSExceptionSupport.createMessageFormatException(e); 378 } 379 } 380 381 /** 382 * Reads a 32-bit integer from the stream message. 383 * 384 * @return a 32-bit integer value from the stream message, interpreted as an 385 * <code>int</code> 386 * @throws JMSException if the JMS provider fails to read the message due to 387 * some internal error. 388 * @throws MessageEOFException if unexpected end of message stream has been 389 * reached. 390 * @throws MessageFormatException if this type conversion is invalid. 391 * @throws MessageNotReadableException if the message is in write-only mode. 
392 */ 393 394 @Override 395 public int readInt() throws JMSException { 396 initializeReading(); 397 try { 398 399 this.dataIn.mark(33); 400 int type = this.dataIn.read(); 401 if (type == -1) { 402 throw new MessageEOFException("reached end of data"); 403 } 404 if (type == MarshallingSupport.INTEGER_TYPE) { 405 return this.dataIn.readInt(); 406 } 407 if (type == MarshallingSupport.SHORT_TYPE) { 408 return this.dataIn.readShort(); 409 } 410 if (type == MarshallingSupport.BYTE_TYPE) { 411 return this.dataIn.readByte(); 412 } 413 if (type == MarshallingSupport.STRING_TYPE) { 414 return Integer.valueOf(this.dataIn.readUTF()).intValue(); 415 } 416 if (type == MarshallingSupport.NULL) { 417 this.dataIn.reset(); 418 throw new NullPointerException("Cannot convert NULL value to int."); 419 } else { 420 this.dataIn.reset(); 421 throw new MessageFormatException(" not an int type"); 422 } 423 } catch (NumberFormatException mfe) { 424 try { 425 this.dataIn.reset(); 426 } catch (IOException ioe) { 427 throw JMSExceptionSupport.create(ioe); 428 } 429 throw mfe; 430 431 } catch (EOFException e) { 432 throw JMSExceptionSupport.createMessageEOFException(e); 433 } catch (IOException e) { 434 throw JMSExceptionSupport.createMessageFormatException(e); 435 } 436 } 437 438 /** 439 * Reads a 64-bit integer from the stream message. 440 * 441 * @return a 64-bit integer value from the stream message, interpreted as a 442 * <code>long</code> 443 * @throws JMSException if the JMS provider fails to read the message due to 444 * some internal error. 445 * @throws MessageEOFException if unexpected end of message stream has been 446 * reached. 447 * @throws MessageFormatException if this type conversion is invalid. 448 * @throws MessageNotReadableException if the message is in write-only mode. 
449 */ 450 451 @Override 452 public long readLong() throws JMSException { 453 initializeReading(); 454 try { 455 456 this.dataIn.mark(65); 457 int type = this.dataIn.read(); 458 if (type == -1) { 459 throw new MessageEOFException("reached end of data"); 460 } 461 if (type == MarshallingSupport.LONG_TYPE) { 462 return this.dataIn.readLong(); 463 } 464 if (type == MarshallingSupport.INTEGER_TYPE) { 465 return this.dataIn.readInt(); 466 } 467 if (type == MarshallingSupport.SHORT_TYPE) { 468 return this.dataIn.readShort(); 469 } 470 if (type == MarshallingSupport.BYTE_TYPE) { 471 return this.dataIn.readByte(); 472 } 473 if (type == MarshallingSupport.STRING_TYPE) { 474 return Long.valueOf(this.dataIn.readUTF()).longValue(); 475 } 476 if (type == MarshallingSupport.NULL) { 477 this.dataIn.reset(); 478 throw new NullPointerException("Cannot convert NULL value to long."); 479 } else { 480 this.dataIn.reset(); 481 throw new MessageFormatException(" not a long type"); 482 } 483 } catch (NumberFormatException mfe) { 484 try { 485 this.dataIn.reset(); 486 } catch (IOException ioe) { 487 throw JMSExceptionSupport.create(ioe); 488 } 489 throw mfe; 490 491 } catch (EOFException e) { 492 throw JMSExceptionSupport.createMessageEOFException(e); 493 } catch (IOException e) { 494 throw JMSExceptionSupport.createMessageFormatException(e); 495 } 496 } 497 498 /** 499 * Reads a <code>float</code> from the stream message. 500 * 501 * @return a <code>float</code> value from the stream message 502 * @throws JMSException if the JMS provider fails to read the message due to 503 * some internal error. 504 * @throws MessageEOFException if unexpected end of message stream has been 505 * reached. 506 * @throws MessageFormatException if this type conversion is invalid. 507 * @throws MessageNotReadableException if the message is in write-only mode. 
508 */ 509 510 @Override 511 public float readFloat() throws JMSException { 512 initializeReading(); 513 try { 514 this.dataIn.mark(33); 515 int type = this.dataIn.read(); 516 if (type == -1) { 517 throw new MessageEOFException("reached end of data"); 518 } 519 if (type == MarshallingSupport.FLOAT_TYPE) { 520 return this.dataIn.readFloat(); 521 } 522 if (type == MarshallingSupport.STRING_TYPE) { 523 return Float.valueOf(this.dataIn.readUTF()).floatValue(); 524 } 525 if (type == MarshallingSupport.NULL) { 526 this.dataIn.reset(); 527 throw new NullPointerException("Cannot convert NULL value to float."); 528 } else { 529 this.dataIn.reset(); 530 throw new MessageFormatException(" not a float type"); 531 } 532 } catch (NumberFormatException mfe) { 533 try { 534 this.dataIn.reset(); 535 } catch (IOException ioe) { 536 throw JMSExceptionSupport.create(ioe); 537 } 538 throw mfe; 539 540 } catch (EOFException e) { 541 throw JMSExceptionSupport.createMessageEOFException(e); 542 } catch (IOException e) { 543 throw JMSExceptionSupport.createMessageFormatException(e); 544 } 545 } 546 547 /** 548 * Reads a <code>double</code> from the stream message. 549 * 550 * @return a <code>double</code> value from the stream message 551 * @throws JMSException if the JMS provider fails to read the message due to 552 * some internal error. 553 * @throws MessageEOFException if unexpected end of message stream has been 554 * reached. 555 * @throws MessageFormatException if this type conversion is invalid. 556 * @throws MessageNotReadableException if the message is in write-only mode. 
557 */ 558 559 @Override 560 public double readDouble() throws JMSException { 561 initializeReading(); 562 try { 563 564 this.dataIn.mark(65); 565 int type = this.dataIn.read(); 566 if (type == -1) { 567 throw new MessageEOFException("reached end of data"); 568 } 569 if (type == MarshallingSupport.DOUBLE_TYPE) { 570 return this.dataIn.readDouble(); 571 } 572 if (type == MarshallingSupport.FLOAT_TYPE) { 573 return this.dataIn.readFloat(); 574 } 575 if (type == MarshallingSupport.STRING_TYPE) { 576 return Double.valueOf(this.dataIn.readUTF()).doubleValue(); 577 } 578 if (type == MarshallingSupport.NULL) { 579 this.dataIn.reset(); 580 throw new NullPointerException("Cannot convert NULL value to double."); 581 } else { 582 this.dataIn.reset(); 583 throw new MessageFormatException(" not a double type"); 584 } 585 } catch (NumberFormatException mfe) { 586 try { 587 this.dataIn.reset(); 588 } catch (IOException ioe) { 589 throw JMSExceptionSupport.create(ioe); 590 } 591 throw mfe; 592 593 } catch (EOFException e) { 594 throw JMSExceptionSupport.createMessageEOFException(e); 595 } catch (IOException e) { 596 throw JMSExceptionSupport.createMessageFormatException(e); 597 } 598 } 599 600 /** 601 * Reads a <CODE>String</CODE> from the stream message. 602 * 603 * @return a Unicode string from the stream message 604 * @throws JMSException if the JMS provider fails to read the message due to 605 * some internal error. 606 * @throws MessageEOFException if unexpected end of message stream has been 607 * reached. 608 * @throws MessageFormatException if this type conversion is invalid. 609 * @throws MessageNotReadableException if the message is in write-only mode. 
610 */ 611 612 @Override 613 public String readString() throws JMSException { 614 initializeReading(); 615 try { 616 617 this.dataIn.mark(65); 618 int type = this.dataIn.read(); 619 if (type == -1) { 620 throw new MessageEOFException("reached end of data"); 621 } 622 if (type == MarshallingSupport.NULL) { 623 return null; 624 } 625 if (type == MarshallingSupport.BIG_STRING_TYPE) { 626 return MarshallingSupport.readUTF8(dataIn); 627 } 628 if (type == MarshallingSupport.STRING_TYPE) { 629 return this.dataIn.readUTF(); 630 } 631 if (type == MarshallingSupport.LONG_TYPE) { 632 return new Long(this.dataIn.readLong()).toString(); 633 } 634 if (type == MarshallingSupport.INTEGER_TYPE) { 635 return new Integer(this.dataIn.readInt()).toString(); 636 } 637 if (type == MarshallingSupport.SHORT_TYPE) { 638 return new Short(this.dataIn.readShort()).toString(); 639 } 640 if (type == MarshallingSupport.BYTE_TYPE) { 641 return new Byte(this.dataIn.readByte()).toString(); 642 } 643 if (type == MarshallingSupport.FLOAT_TYPE) { 644 return new Float(this.dataIn.readFloat()).toString(); 645 } 646 if (type == MarshallingSupport.DOUBLE_TYPE) { 647 return new Double(this.dataIn.readDouble()).toString(); 648 } 649 if (type == MarshallingSupport.BOOLEAN_TYPE) { 650 return (this.dataIn.readBoolean() ? 
Boolean.TRUE : Boolean.FALSE).toString(); 651 } 652 if (type == MarshallingSupport.CHAR_TYPE) { 653 return new Character(this.dataIn.readChar()).toString(); 654 } else { 655 this.dataIn.reset(); 656 throw new MessageFormatException(" not a String type"); 657 } 658 } catch (NumberFormatException mfe) { 659 try { 660 this.dataIn.reset(); 661 } catch (IOException ioe) { 662 throw JMSExceptionSupport.create(ioe); 663 } 664 throw mfe; 665 666 } catch (EOFException e) { 667 throw JMSExceptionSupport.createMessageEOFException(e); 668 } catch (IOException e) { 669 throw JMSExceptionSupport.createMessageFormatException(e); 670 } 671 } 672 673 /** 674 * Reads a byte array field from the stream message into the specified 675 * <CODE>byte[]</CODE> object (the read buffer). <p/> 676 * <P> 677 * To read the field value, <CODE>readBytes</CODE> should be successively 678 * called until it returns a value less than the length of the read buffer. 679 * The value of the bytes in the buffer following the last byte read is 680 * undefined. <p/> 681 * <P> 682 * If <CODE>readBytes</CODE> returns a value equal to the length of the 683 * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there 684 * are no more bytes to be read, this call returns -1. <p/> 685 * <P> 686 * If the byte array field value is null, <CODE>readBytes</CODE> returns 687 * -1. <p/> 688 * <P> 689 * If the byte array field value is empty, <CODE>readBytes</CODE> returns 690 * 0. <p/> 691 * <P> 692 * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> 693 * field value has been made, the full value of the field must be read 694 * before it is valid to read the next field. An attempt to read the next 695 * field before that has been done will throw a 696 * <CODE>MessageFormatException</CODE>. <p/> 697 * <P> 698 * To read the byte field value into a new <CODE>byte[]</CODE> object, use 699 * the <CODE>readObject</CODE> method. 
700 * 701 * @param value the buffer into which the data is read 702 * @return the total number of bytes read into the buffer, or -1 if there is 703 * no more data because the end of the byte field has been reached 704 * @throws JMSException if the JMS provider fails to read the message due to 705 * some internal error. 706 * @throws MessageEOFException if unexpected end of message stream has been 707 * reached. 708 * @throws MessageFormatException if this type conversion is invalid. 709 * @throws MessageNotReadableException if the message is in write-only mode. 710 * @see #readObject() 711 */ 712 713 @Override 714 public int readBytes(byte[] value) throws JMSException { 715 716 initializeReading(); 717 try { 718 if (value == null) { 719 throw new NullPointerException(); 720 } 721 722 if (remainingBytes == -1) { 723 this.dataIn.mark(value.length + 1); 724 int type = this.dataIn.read(); 725 if (type == -1) { 726 throw new MessageEOFException("reached end of data"); 727 } 728 if (type != MarshallingSupport.BYTE_ARRAY_TYPE) { 729 throw new MessageFormatException("Not a byte array"); 730 } 731 remainingBytes = this.dataIn.readInt(); 732 } else if (remainingBytes == 0) { 733 remainingBytes = -1; 734 return -1; 735 } 736 737 if (value.length <= remainingBytes) { 738 // small buffer 739 remainingBytes -= value.length; 740 this.dataIn.readFully(value); 741 return value.length; 742 } else { 743 // big buffer 744 int rc = this.dataIn.read(value, 0, remainingBytes); 745 remainingBytes = 0; 746 return rc; 747 } 748 749 } catch (EOFException e) { 750 JMSException jmsEx = new MessageEOFException(e.getMessage()); 751 jmsEx.setLinkedException(e); 752 throw jmsEx; 753 } catch (IOException e) { 754 JMSException jmsEx = new MessageFormatException(e.getMessage()); 755 jmsEx.setLinkedException(e); 756 throw jmsEx; 757 } 758 } 759 760 /** 761 * Reads an object from the stream message. 
<p/> 762 * <P> 763 * This method can be used to return, in objectified format, an object in 764 * the Java programming language ("Java object") that has been written to 765 * the stream with the equivalent <CODE>writeObject</CODE> method call, or 766 * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/> 767 * <P> 768 * Note that byte values are returned as <CODE>byte[]</CODE>, not 769 * <CODE>Byte[]</CODE>. <p/> 770 * <P> 771 * An attempt to call <CODE>readObject</CODE> to read a byte field value 772 * into a new <CODE>byte[]</CODE> object before the full value of the byte 773 * field has been read will throw a <CODE>MessageFormatException</CODE>. 774 * 775 * @return a Java object from the stream message, in objectified format (for 776 * example, if the object was written as an <CODE>int</CODE>, an 777 * <CODE>Integer</CODE> is returned) 778 * @throws JMSException if the JMS provider fails to read the message due to 779 * some internal error. 780 * @throws MessageEOFException if unexpected end of message stream has been 781 * reached. 782 * @throws MessageFormatException if this type conversion is invalid. 783 * @throws MessageNotReadableException if the message is in write-only mode. 
784 * @see #readBytes(byte[] value) 785 */ 786 787 @Override 788 public Object readObject() throws JMSException { 789 initializeReading(); 790 try { 791 this.dataIn.mark(65); 792 int type = this.dataIn.read(); 793 if (type == -1) { 794 throw new MessageEOFException("reached end of data"); 795 } 796 if (type == MarshallingSupport.NULL) { 797 return null; 798 } 799 if (type == MarshallingSupport.BIG_STRING_TYPE) { 800 return MarshallingSupport.readUTF8(dataIn); 801 } 802 if (type == MarshallingSupport.STRING_TYPE) { 803 return this.dataIn.readUTF(); 804 } 805 if (type == MarshallingSupport.LONG_TYPE) { 806 return Long.valueOf(this.dataIn.readLong()); 807 } 808 if (type == MarshallingSupport.INTEGER_TYPE) { 809 return Integer.valueOf(this.dataIn.readInt()); 810 } 811 if (type == MarshallingSupport.SHORT_TYPE) { 812 return Short.valueOf(this.dataIn.readShort()); 813 } 814 if (type == MarshallingSupport.BYTE_TYPE) { 815 return Byte.valueOf(this.dataIn.readByte()); 816 } 817 if (type == MarshallingSupport.FLOAT_TYPE) { 818 return new Float(this.dataIn.readFloat()); 819 } 820 if (type == MarshallingSupport.DOUBLE_TYPE) { 821 return new Double(this.dataIn.readDouble()); 822 } 823 if (type == MarshallingSupport.BOOLEAN_TYPE) { 824 return this.dataIn.readBoolean() ? 
Boolean.TRUE : Boolean.FALSE; 825 } 826 if (type == MarshallingSupport.CHAR_TYPE) { 827 return Character.valueOf(this.dataIn.readChar()); 828 } 829 if (type == MarshallingSupport.BYTE_ARRAY_TYPE) { 830 int len = this.dataIn.readInt(); 831 byte[] value = new byte[len]; 832 this.dataIn.readFully(value); 833 return value; 834 } else { 835 this.dataIn.reset(); 836 throw new MessageFormatException("unknown type"); 837 } 838 } catch (NumberFormatException mfe) { 839 try { 840 this.dataIn.reset(); 841 } catch (IOException ioe) { 842 throw JMSExceptionSupport.create(ioe); 843 } 844 throw mfe; 845 846 } catch (EOFException e) { 847 JMSException jmsEx = new MessageEOFException(e.getMessage()); 848 jmsEx.setLinkedException(e); 849 throw jmsEx; 850 } catch (IOException e) { 851 JMSException jmsEx = new MessageFormatException(e.getMessage()); 852 jmsEx.setLinkedException(e); 853 throw jmsEx; 854 } 855 } 856 857 /** 858 * Writes a <code>boolean</code> to the stream message. The value 859 * <code>true</code> is written as the value <code>(byte)1</code>; the 860 * value <code>false</code> is written as the value <code>(byte)0</code>. 861 * 862 * @param value the <code>boolean</code> value to be written 863 * @throws JMSException if the JMS provider fails to write the message due 864 * to some internal error. 865 * @throws MessageNotWriteableException if the message is in read-only mode. 866 */ 867 868 @Override 869 public void writeBoolean(boolean value) throws JMSException { 870 initializeWriting(); 871 try { 872 MarshallingSupport.marshalBoolean(dataOut, value); 873 } catch (IOException ioe) { 874 throw JMSExceptionSupport.create(ioe); 875 } 876 } 877 878 /** 879 * Writes a <code>byte</code> to the stream message. 880 * 881 * @param value the <code>byte</code> value to be written 882 * @throws JMSException if the JMS provider fails to write the message due 883 * to some internal error. 884 * @throws MessageNotWriteableException if the message is in read-only mode. 
885 */ 886 887 @Override 888 public void writeByte(byte value) throws JMSException { 889 initializeWriting(); 890 try { 891 MarshallingSupport.marshalByte(dataOut, value); 892 } catch (IOException ioe) { 893 throw JMSExceptionSupport.create(ioe); 894 } 895 } 896 897 /** 898 * Writes a <code>short</code> to the stream message. 899 * 900 * @param value the <code>short</code> value to be written 901 * @throws JMSException if the JMS provider fails to write the message due 902 * to some internal error. 903 * @throws MessageNotWriteableException if the message is in read-only mode. 904 */ 905 906 @Override 907 public void writeShort(short value) throws JMSException { 908 initializeWriting(); 909 try { 910 MarshallingSupport.marshalShort(dataOut, value); 911 } catch (IOException ioe) { 912 throw JMSExceptionSupport.create(ioe); 913 } 914 } 915 916 /** 917 * Writes a <code>char</code> to the stream message. 918 * 919 * @param value the <code>char</code> value to be written 920 * @throws JMSException if the JMS provider fails to write the message due 921 * to some internal error. 922 * @throws MessageNotWriteableException if the message is in read-only mode. 923 */ 924 925 @Override 926 public void writeChar(char value) throws JMSException { 927 initializeWriting(); 928 try { 929 MarshallingSupport.marshalChar(dataOut, value); 930 } catch (IOException ioe) { 931 throw JMSExceptionSupport.create(ioe); 932 } 933 } 934 935 /** 936 * Writes an <code>int</code> to the stream message. 937 * 938 * @param value the <code>int</code> value to be written 939 * @throws JMSException if the JMS provider fails to write the message due 940 * to some internal error. 941 * @throws MessageNotWriteableException if the message is in read-only mode. 
942 */ 943 944 @Override 945 public void writeInt(int value) throws JMSException { 946 initializeWriting(); 947 try { 948 MarshallingSupport.marshalInt(dataOut, value); 949 } catch (IOException ioe) { 950 throw JMSExceptionSupport.create(ioe); 951 } 952 } 953 954 /** 955 * Writes a <code>long</code> to the stream message. 956 * 957 * @param value the <code>long</code> value to be written 958 * @throws JMSException if the JMS provider fails to write the message due 959 * to some internal error. 960 * @throws MessageNotWriteableException if the message is in read-only mode. 961 */ 962 963 @Override 964 public void writeLong(long value) throws JMSException { 965 initializeWriting(); 966 try { 967 MarshallingSupport.marshalLong(dataOut, value); 968 } catch (IOException ioe) { 969 throw JMSExceptionSupport.create(ioe); 970 } 971 } 972 973 /** 974 * Writes a <code>float</code> to the stream message. 975 * 976 * @param value the <code>float</code> value to be written 977 * @throws JMSException if the JMS provider fails to write the message due 978 * to some internal error. 979 * @throws MessageNotWriteableException if the message is in read-only mode. 980 */ 981 982 @Override 983 public void writeFloat(float value) throws JMSException { 984 initializeWriting(); 985 try { 986 MarshallingSupport.marshalFloat(dataOut, value); 987 } catch (IOException ioe) { 988 throw JMSExceptionSupport.create(ioe); 989 } 990 } 991 992 /** 993 * Writes a <code>double</code> to the stream message. 994 * 995 * @param value the <code>double</code> value to be written 996 * @throws JMSException if the JMS provider fails to write the message due 997 * to some internal error. 998 * @throws MessageNotWriteableException if the message is in read-only mode. 
999 */ 1000 1001 @Override 1002 public void writeDouble(double value) throws JMSException { 1003 initializeWriting(); 1004 try { 1005 MarshallingSupport.marshalDouble(dataOut, value); 1006 } catch (IOException ioe) { 1007 throw JMSExceptionSupport.create(ioe); 1008 } 1009 } 1010 1011 /** 1012 * Writes a <code>String</code> to the stream message. 1013 * 1014 * @param value the <code>String</code> value to be written 1015 * @throws JMSException if the JMS provider fails to write the message due 1016 * to some internal error. 1017 * @throws MessageNotWriteableException if the message is in read-only mode. 1018 */ 1019 1020 @Override 1021 public void writeString(String value) throws JMSException { 1022 initializeWriting(); 1023 try { 1024 if (value == null) { 1025 MarshallingSupport.marshalNull(dataOut); 1026 } else { 1027 MarshallingSupport.marshalString(dataOut, value); 1028 } 1029 } catch (IOException ioe) { 1030 throw JMSExceptionSupport.create(ioe); 1031 } 1032 } 1033 1034 /** 1035 * Writes a byte array field to the stream message. <p/> 1036 * <P> 1037 * The byte array <code>value</code> is written to the message as a byte 1038 * array field. Consecutively written byte array fields are treated as two 1039 * distinct fields when the fields are read. 1040 * 1041 * @param value the byte array value to be written 1042 * @throws JMSException if the JMS provider fails to write the message due 1043 * to some internal error. 1044 * @throws MessageNotWriteableException if the message is in read-only mode. 1045 */ 1046 1047 @Override 1048 public void writeBytes(byte[] value) throws JMSException { 1049 writeBytes(value, 0, value.length); 1050 } 1051 1052 /** 1053 * Writes a portion of a byte array as a byte array field to the stream 1054 * message. <p/> 1055 * <P> 1056 * The a portion of the byte array <code>value</code> is written to the 1057 * message as a byte array field. 
Consecutively written byte array fields 1058 * are treated as two distinct fields when the fields are read. 1059 * 1060 * @param value the byte array value to be written 1061 * @param offset the initial offset within the byte array 1062 * @param length the number of bytes to use 1063 * @throws JMSException if the JMS provider fails to write the message due 1064 * to some internal error. 1065 * @throws MessageNotWriteableException if the message is in read-only mode. 1066 */ 1067 1068 @Override 1069 public void writeBytes(byte[] value, int offset, int length) throws JMSException { 1070 initializeWriting(); 1071 try { 1072 MarshallingSupport.marshalByteArray(dataOut, value, offset, length); 1073 } catch (IOException ioe) { 1074 throw JMSExceptionSupport.create(ioe); 1075 } 1076 } 1077 1078 /** 1079 * Writes an object to the stream message. <p/> 1080 * <P> 1081 * This method works only for the objectified primitive object types (<code>Integer</code>, 1082 * <code>Double</code>, <code>Long</code> ...), 1083 * <code>String</code> objects, and byte arrays. 1084 * 1085 * @param value the Java object to be written 1086 * @throws JMSException if the JMS provider fails to write the message due 1087 * to some internal error. 1088 * @throws MessageFormatException if the object is invalid. 1089 * @throws MessageNotWriteableException if the message is in read-only mode. 
1090 */ 1091 1092 @Override 1093 public void writeObject(Object value) throws JMSException { 1094 initializeWriting(); 1095 if (value == null) { 1096 try { 1097 MarshallingSupport.marshalNull(dataOut); 1098 } catch (IOException ioe) { 1099 throw JMSExceptionSupport.create(ioe); 1100 } 1101 } else if (value instanceof String) { 1102 writeString(value.toString()); 1103 } else if (value instanceof Character) { 1104 writeChar(((Character)value).charValue()); 1105 } else if (value instanceof Boolean) { 1106 writeBoolean(((Boolean)value).booleanValue()); 1107 } else if (value instanceof Byte) { 1108 writeByte(((Byte)value).byteValue()); 1109 } else if (value instanceof Short) { 1110 writeShort(((Short)value).shortValue()); 1111 } else if (value instanceof Integer) { 1112 writeInt(((Integer)value).intValue()); 1113 } else if (value instanceof Float) { 1114 writeFloat(((Float)value).floatValue()); 1115 } else if (value instanceof Double) { 1116 writeDouble(((Double)value).doubleValue()); 1117 } else if (value instanceof byte[]) { 1118 writeBytes((byte[])value); 1119 }else if (value instanceof Long) { 1120 writeLong(((Long)value).longValue()); 1121 }else { 1122 throw new MessageFormatException("Unsupported Object type: " + value.getClass()); 1123 } 1124 } 1125 1126 /** 1127 * Puts the message body in read-only mode and repositions the stream of 1128 * bytes to the beginning. 
1129 * 1130 * @throws JMSException if an internal error occurs 1131 */ 1132 1133 @Override 1134 public void reset() throws JMSException { 1135 storeContent(); 1136 this.bytesOut = null; 1137 this.dataIn = null; 1138 this.dataOut = null; 1139 this.remainingBytes = -1; 1140 setReadOnlyBody(true); 1141 } 1142 1143 private void initializeWriting() throws JMSException { 1144 checkReadOnlyBody(); 1145 if (this.dataOut == null) { 1146 this.bytesOut = new ByteArrayOutputStream(); 1147 OutputStream os = bytesOut; 1148 ActiveMQConnection connection = getConnection(); 1149 if (connection != null && connection.isUseCompression()) { 1150 compressed = true; 1151 os = new DeflaterOutputStream(os); 1152 } 1153 this.dataOut = new DataOutputStream(os); 1154 } 1155 1156 // For a message that already had a body and was sent we need to restore the content 1157 // if the message is used again without having its clearBody method called. 1158 if (this.content != null && this.content.length > 0) { 1159 try { 1160 if (compressed) { 1161 ByteArrayInputStream input = new ByteArrayInputStream(this.content.getData(), this.content.getOffset(), this.content.getLength()); 1162 InflaterInputStream inflater = new InflaterInputStream(input); 1163 try { 1164 byte[] buffer = new byte[8*1024]; 1165 int read = 0; 1166 while ((read = inflater.read(buffer)) != -1) { 1167 this.dataOut.write(buffer, 0, read); 1168 } 1169 } finally { 1170 inflater.close(); 1171 } 1172 } else { 1173 this.dataOut.write(this.content.getData(), this.content.getOffset(), this.content.getLength()); 1174 } 1175 // Free up the buffer from the old content, will be re-written when 1176 // tbe message is sent again and storeContent() is called. 
1177 this.content = null; 1178 } catch (IOException ioe) { 1179 throw JMSExceptionSupport.create(ioe); 1180 } 1181 } 1182 } 1183 1184 protected void checkWriteOnlyBody() throws MessageNotReadableException { 1185 if (!readOnlyBody) { 1186 throw new MessageNotReadableException("Message body is write-only"); 1187 } 1188 } 1189 1190 private void initializeReading() throws MessageNotReadableException { 1191 checkWriteOnlyBody(); 1192 if (this.dataIn == null) { 1193 ByteSequence data = getContent(); 1194 if (data == null) { 1195 data = new ByteSequence(new byte[] {}, 0, 0); 1196 } 1197 InputStream is = new ByteArrayInputStream(data); 1198 if (isCompressed()) { 1199 is = new InflaterInputStream(is); 1200 is = new BufferedInputStream(is); 1201 } 1202 this.dataIn = new DataInputStream(is); 1203 } 1204 } 1205 1206 @Override 1207 public void compress() throws IOException { 1208 storeContent(); 1209 super.compress(); 1210 } 1211 1212 @Override 1213 public String toString() { 1214 return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }"; 1215 } 1216}