/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.openwire;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;

/**
 * {@link WireFormat} implementation that turns {@link DataStructure} commands
 * into bytes and back, using a per-version table of
 * {@link DataStreamMarshaller}s indexed by data structure type.
 * <p>
 * Two encodings are supported: a "tight" two-pass encoding driven by a
 * {@link BooleanStream}, and a "loose" single-pass encoding. Frames are
 * normally prefixed with a 4-byte size unless {@link #isSizePrefixDisabled()}
 * is set.
 * <p>
 * {@link #marshal(Object)}, {@link #marshal(Object, DataOutput)} and
 * {@link #unmarshal(ByteSequence)} are synchronized and reuse internal
 * buffers; the remaining methods are not synchronized.
 */
public final class OpenWireFormat implements WireFormat {

    public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
    public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
    public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;

    static final byte NULL_TYPE = CommandTypes.NULL;
    private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
    private static final int MARSHAL_CACHE_FREE_SPACE = 100;

    /** Marshallers for the current version, indexed by unsigned data structure type. */
    private DataStreamMarshaller[] dataMarshallers;
    private int version;
    private boolean stackTraceEnabled;
    private boolean tcpNoDelayEnabled;
    private boolean cacheEnabled;
    private boolean tightEncodingEnabled;
    private boolean sizePrefixDisabled;
    private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;

    // The following fields are used for value caching
    private short nextMarshallCacheIndex;
    private short nextMarshallCacheEvictionIndex;
    private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
    private DataStructure[] marshallCache;
    private DataStructure[] unmarshallCache;
    private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
    private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
    private WireFormatInfo preferedWireFormatInfo;

    /**
     * Creates a wire format using {@link #DEFAULT_VERSION}.
     */
    public OpenWireFormat() {
        this(DEFAULT_VERSION);
    }

    /**
     * Creates a wire format for the given protocol version.
     *
     * @param i the OpenWire protocol version
     * @throws IllegalArgumentException if no marshaller factory can be loaded
     *         for that version
     */
    public OpenWireFormat(int i) {
        setVersion(i);
    }

    /**
     * Combines the version with one distinct bit pair per flag that takes part
     * in {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
               ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
               ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
    }

    /**
     * Creates a new wire format with the same version, option flags, maximum
     * frame size and preferred wire format info as this one. Cache contents
     * are not shared: a caching copy starts with empty caches of the same
     * capacity.
     *
     * @return an independent copy of this wire format's configuration
     */
    public OpenWireFormat copy() {
        OpenWireFormat answer = new OpenWireFormat(version);
        answer.stackTraceEnabled = stackTraceEnabled;
        answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
        if (cacheEnabled) {
            // A copy flagged as caching needs its own cache arrays, otherwise
            // its first marshal call fails in runMarshallCacheEvictionSweep().
            int cacheSize = marshallCache != null ? marshallCache.length : MARSHAL_CACHE_SIZE;
            answer.marshallCache = new DataStructure[cacheSize];
            answer.unmarshallCache = new DataStructure[cacheSize];
        }
        answer.cacheEnabled = cacheEnabled;
        answer.tightEncodingEnabled = tightEncodingEnabled;
        answer.sizePrefixDisabled = sizePrefixDisabled;
        // Keep the frame limit; otherwise the copy would fall back to the default.
        answer.maxFrameSize = maxFrameSize;
        answer.preferedWireFormatInfo = preferedWireFormatInfo;
        return answer;
    }

    /**
     * Two wire formats are equal when their version and their stack trace,
     * cache, tight encoding and size prefix flags all match.
     *
     * @param object the object to compare with; may be null or of another type
     * @return true only for an {@code OpenWireFormat} with matching settings
     */
    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (!(object instanceof OpenWireFormat)) {
            return false;
        }
        OpenWireFormat o = (OpenWireFormat)object;
        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
               && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
               && o.sizePrefixDisabled == sizePrefixDisabled;
    }

    @Override
    public String toString() {
        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}";
    }

    public int getVersion() {
        return version;
    }

    /**
     * Marshals a command into a byte sequence backed by this instance's
     * reusable output buffer.
     *
     * @param command a {@link DataStructure}, or null to marshal the null type
     * @return the marshalled frame, including the size prefix unless it is
     *         disabled
     * @throws IOException if the command's type has no marshaller or writing
     *         fails
     */
    public synchronized ByteSequence marshal(Object command) throws IOException {

        if (cacheEnabled) {
            runMarshallCacheEvictionSweep();
        }

        if (command == null) {
            // Honour sizePrefixDisabled here as well, matching
            // marshal(Object, DataOutput) and what unmarshal expects to read.
            bytesOut.restart(sizePrefixDisabled ? 1 : 5);
            if (!sizePrefixDisabled) {
                bytesOut.writeInt(1);
            }
            bytesOut.writeByte(NULL_TYPE);
            return bytesOut.toByteSequence();
        }

        DataStructure c = (DataStructure)command;
        byte type = c.getDataStructureType();
        DataStreamMarshaller dsm = lookupMarshaller(type);

        if (tightEncodingEnabled) {
            // Pass 1 computes the size and fills the boolean stream,
            // pass 2 writes the actual bytes.
            BooleanStream bs = new BooleanStream();
            int size = 1;
            size += dsm.tightMarshal1(this, c, bs);
            size += bs.marshalledSize();

            bytesOut.restart(size);
            if (!sizePrefixDisabled) {
                bytesOut.writeInt(size);
            }
            bytesOut.writeByte(type);
            bs.marshal(bytesOut);
            dsm.tightMarshal2(this, c, bytesOut, bs);
            return bytesOut.toByteSequence();
        }

        bytesOut.restart();
        if (!sizePrefixDisabled) {
            // The final size is not known yet; write a placeholder and patch
            // it once the body has been marshalled.
            bytesOut.writeInt(0);
        }
        bytesOut.writeByte(type);
        dsm.looseMarshal(this, c, bytesOut);
        ByteSequence sequence = bytesOut.toByteSequence();

        if (!sizePrefixDisabled) {
            int size = sequence.getLength() - 4;
            // writeIntBig moves the sequence offset, so restore it afterwards.
            int pos = sequence.offset;
            ByteSequenceData.writeIntBig(sequence, size);
            sequence.offset = pos;
        }
        return sequence;
    }

    /**
     * Unmarshals a single command from a byte sequence.
     *
     * @param sequence the marshalled frame
     * @return the unmarshalled object, or null for the null type
     * @throws IOException if the declared size exceeds the maximum frame size,
     *         the data type is unknown, or reading fails
     */
    public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
        bytesIn.restart(sequence);

        if (!sizePrefixDisabled) {
            int size = bytesIn.readInt();
            // The declared size is deliberately not cross-checked against the
            // sequence length; only the frame size limit is enforced.
            if (size > maxFrameSize) {
                throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
            }
        }

        return doUnmarshal(bytesIn);
    }

    /**
     * Marshals a command directly to a {@link DataOutput}.
     *
     * @param o a {@link DataStructure}, or null to marshal the null type
     * @param dataOut the destination
     * @throws IOException if the command's type has no marshaller or writing
     *         fails
     */
    public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {

        if (cacheEnabled) {
            runMarshallCacheEvictionSweep();
        }

        if (o == null) {
            if (!sizePrefixDisabled) {
                dataOut.writeInt(1);
            }
            dataOut.writeByte(NULL_TYPE);
            return;
        }

        DataStructure c = (DataStructure)o;
        byte type = c.getDataStructureType();
        DataStreamMarshaller dsm = lookupMarshaller(type);

        if (tightEncodingEnabled) {
            BooleanStream bs = new BooleanStream();
            int size = 1;
            size += dsm.tightMarshal1(this, c, bs);
            size += bs.marshalledSize();

            if (!sizePrefixDisabled) {
                dataOut.writeInt(size);
            }

            dataOut.writeByte(type);
            bs.marshal(dataOut);
            dsm.tightMarshal2(this, c, dataOut, bs);
            return;
        }

        if (sizePrefixDisabled) {
            dataOut.writeByte(type);
            dsm.looseMarshal(this, c, dataOut);
            return;
        }

        // The loose size is only known after marshalling, so buffer the frame
        // first and then emit the length followed by the buffered bytes.
        bytesOut.restart();
        bytesOut.writeByte(type);
        dsm.looseMarshal(this, c, bytesOut);

        ByteSequence sequence = bytesOut.toByteSequence();
        dataOut.writeInt(sequence.getLength());
        dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
    }

    /**
     * Unmarshals a single command from a {@link DataInput}.
     *
     * @param dis the input positioned at the start of a frame
     * @return the unmarshalled object, or null for the null type
     * @throws IOException if the declared size exceeds the maximum frame size,
     *         the data type is unknown, or reading fails
     */
    public Object unmarshal(DataInput dis) throws IOException {
        if (!sizePrefixDisabled) {
            int size = dis.readInt();
            if (size > maxFrameSize) {
                throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
            }
        }
        return doUnmarshal(dis);
    }

    /**
     * Used by NIO or AIO transports. First pass of the tight encoding.
     *
     * @param o a {@link DataStructure} or null
     * @param bs the boolean stream populated during this pass
     * @return 1 for a null object, otherwise 1 plus the marshaller's size plus
     *         the marshalled size of the boolean stream
     * @throws IOException if the object's type has no marshaller
     */
    public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
        int size = 1;
        if (o != null) {
            DataStructure c = (DataStructure)o;
            DataStreamMarshaller dsm = lookupMarshaller(c.getDataStructureType());

            size += dsm.tightMarshal1(this, c, bs);
            size += bs.marshalledSize();
        }
        return size;
    }

    /**
     * Used by NIO or AIO transports; note that the size is not written as part
     * of this method. Nothing is written for a null object.
     *
     * @param o a {@link DataStructure} or null
     * @param ds the destination
     * @param bs the boolean stream filled by {@link #tightMarshal1(Object, BooleanStream)}
     * @throws IOException if the object's type has no marshaller or writing
     *         fails
     */
    public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
        if (cacheEnabled) {
            runMarshallCacheEvictionSweep();
        }

        if (o != null) {
            DataStructure c = (DataStructure)o;
            byte type = c.getDataStructureType();
            DataStreamMarshaller dsm = lookupMarshaller(type);
            ds.writeByte(type);
            bs.marshal(ds);
            dsm.tightMarshal2(this, c, ds, bs);
        }
    }

    /**
     * Allows you to dynamically switch the version of the openwire protocol
     * being used. The marshaller table is loaded reflectively from
     * {@code org.apache.activemq.openwire.v<version>.MarshallerFactory}.
     *
     * @param version the protocol version to switch to
     * @throws IllegalArgumentException if the factory class cannot be loaded
     *         or its {@code createMarshallerMap} method cannot be invoked
     */
    public void setVersion(int version) {
        String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
        Class<?> mfClass;
        try {
            mfClass = Class.forName(mfName, false, getClass().getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Invalid version: " + version
                                               + ", could not load " + mfName, e);
        }
        try {
            Method method = mfClass.getMethod("createMarshallerMap", new Class<?>[] {OpenWireFormat.class});
            dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
        } catch (Throwable e) {
            // Throwable is caught on purpose so that every reflective failure
            // is reported to the caller as an invalid version.
            throw new IllegalArgumentException("Invalid version: "
                                               + version
                                               + ", "
                                               + mfName
                                               + " does not properly implement the createMarshallerMap method.", e);
        }
        this.version = version;
    }

    /**
     * Reads the type byte and unmarshals the body using the current encoding.
     *
     * @param dis the input positioned at the type byte
     * @return the unmarshalled object, or null when the type is the null type
     * @throws IOException if the data type is unknown or reading fails
     */
    public Object doUnmarshal(DataInput dis) throws IOException {
        byte dataType = dis.readByte();
        if (dataType == NULL_TYPE) {
            return null;
        }

        DataStreamMarshaller dsm = lookupMarshaller(dataType);
        Object data = dsm.createObject();
        if (this.tightEncodingEnabled) {
            BooleanStream bs = new BooleanStream();
            bs.unmarshal(dis);
            dsm.tightUnmarshal(this, data, dis, bs);
        } else {
            dsm.looseUnmarshal(this, data, dis);
        }
        return data;
    }

    /**
     * First tight-encoding pass for a nested object.
     *
     * @param o the nested object, may be null
     * @param bs the boolean stream receiving the presence flags
     * @return 0 for a null object, otherwise 1 plus the marshaller's size
     * @throws IOException if the object's type has no marshaller
     */
    public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
        bs.writeBoolean(o != null);
        if (o == null) {
            return 0;
        }

        if (o.isMarshallAware()) {
            // Cached marshalled forms are not used, so the "has cached form"
            // flag is always written as false.
            bs.writeBoolean(false);
        }

        DataStreamMarshaller dsm = lookupMarshaller(o.getDataStructureType());
        return 1 + dsm.tightMarshal1(this, o, bs);
    }

    /**
     * Second tight-encoding pass for a nested object.
     *
     * @param o the nested object passed to the first pass
     * @param ds the destination
     * @param bs the boolean stream filled by the first pass
     * @throws IOException if the stream flags a cached form, the object's type
     *         has no marshaller, or writing fails
     */
    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
        throws IOException {
        if (!bs.readBoolean()) {
            return;
        }

        byte type = o.getDataStructureType();
        ds.writeByte(type);

        if (o.isMarshallAware() && bs.readBoolean()) {
            // We should not be doing any caching
            throw new IOException("Corrupted stream");
        }

        DataStreamMarshaller dsm = lookupMarshaller(type);
        dsm.tightMarshal2(this, o, ds, bs);
    }

    /**
     * Unmarshals a tightly encoded nested object.
     *
     * @param dis the input
     * @param bs the boolean stream of the enclosing object
     * @return the nested object, or null when the stream marks it as absent
     * @throws IOException if the data type is unknown or reading fails
     */
    public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
        if (!bs.readBoolean()) {
            return null;
        }

        byte dataType = dis.readByte();
        DataStreamMarshaller dsm = lookupMarshaller(dataType);
        DataStructure data = dsm.createObject();

        if (data.isMarshallAware() && bs.readBoolean()) {
            // The nested object was written as a complete frame: skip its int
            // and byte header, then decode it with its own boolean stream.
            dis.readInt();
            dis.readByte();

            BooleanStream bs2 = new BooleanStream();
            bs2.unmarshal(dis);
            dsm.tightUnmarshal(this, data, dis, bs2);
        } else {
            dsm.tightUnmarshal(this, data, dis, bs);
        }

        return data;
    }

    /**
     * Unmarshals a loosely encoded nested object.
     *
     * @param dis the input
     * @return the nested object, or null when the leading boolean is false
     * @throws IOException if the data type is unknown or reading fails
     */
    public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
        if (!dis.readBoolean()) {
            return null;
        }

        byte dataType = dis.readByte();
        DataStreamMarshaller dsm = lookupMarshaller(dataType);
        DataStructure data = dsm.createObject();
        dsm.looseUnmarshal(this, data, dis);
        return data;
    }

    /**
     * Marshals a nested object in the loose encoding: a presence boolean,
     * followed by the type byte and the body when the object is not null.
     *
     * @param o the nested object, may be null
     * @param dataOut the destination
     * @throws IOException if the object's type has no marshaller or writing
     *         fails
     */
    public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
        dataOut.writeBoolean(o != null);
        if (o != null) {
            byte type = o.getDataStructureType();
            dataOut.writeByte(type);
            DataStreamMarshaller dsm = lookupMarshaller(type);
            dsm.looseMarshal(this, o, dataOut);
        }
    }

    /**
     * Evicts entries from the marshall cache, starting at the eviction cursor,
     * until fewer than {@code MARSHAL_CACHE_FREE_SPACE} slots short of the
     * cache capacity are in use.
     */
    public void runMarshallCacheEvictionSweep() {
        // Do we need to start evicting??
        while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {

            marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
            marshallCache[nextMarshallCacheEvictionIndex] = null;

            nextMarshallCacheEvictionIndex++;
            if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
                nextMarshallCacheEvictionIndex = 0;
            }

        }
    }

    /**
     * @param o the object to look up
     * @return the marshall cache index of the object, or null if it is not
     *         cached
     */
    public Short getMarshallCacheIndex(DataStructure o) {
        return marshallCacheMap.get(o);
    }

    /**
     * Tries to add an object to the marshall cache. The insertion cursor
     * advances whether or not the object is stored.
     *
     * @param o the object to cache
     * @return the index the object was stored at, or -1 when the cache is full
     */
    public Short addToMarshallCache(DataStructure o) {
        short i = nextMarshallCacheIndex++;
        if (nextMarshallCacheIndex >= marshallCache.length) {
            nextMarshallCacheIndex = 0;
        }

        // We can only cache that item if there is space left.
        if (marshallCacheMap.size() < marshallCache.length) {
            marshallCache[i] = o;
            Short index = Short.valueOf(i);
            marshallCacheMap.put(o, index);
            return index;
        }

        // Use -1 to indicate that the value was not cached due to cache
        // being full.
        return Short.valueOf((short)-1);
    }

    /**
     * Stores an object in the unmarshall cache.
     *
     * @param index the slot to store into; -1 means the sender could not cache
     *        the value and the call is ignored
     * @param o the object to store
     */
    public void setInUnmarshallCache(short index, DataStructure o) {

        // There was no space left in the cache, so we can't
        // put this in the cache.
        if (index == -1) {
            return;
        }

        unmarshallCache[index] = o;
    }

    public DataStructure getFromUnmarshallCache(short index) {
        return unmarshallCache[index];
    }

    public void setStackTraceEnabled(boolean b) {
        stackTraceEnabled = b;
    }

    public boolean isStackTraceEnabled() {
        return stackTraceEnabled;
    }

    public boolean isTcpNoDelayEnabled() {
        return tcpNoDelayEnabled;
    }

    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
    }

    public boolean isCacheEnabled() {
        return cacheEnabled;
    }

    /**
     * Enables or disables value caching. Enabling allocates fresh caches of
     * the default size and resets the cache bookkeeping.
     *
     * @param cacheEnabled true to enable caching
     */
    public void setCacheEnabled(boolean cacheEnabled) {
        if (cacheEnabled) {
            marshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
            unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
            // The index map and cursors describe the arrays just replaced (the
            // map may even be null after a renegotiation that disabled
            // caching), so start them afresh together with the arrays.
            nextMarshallCacheIndex = 0;
            nextMarshallCacheEvictionIndex = 0;
            marshallCacheMap = new HashMap<DataStructure, Short>();
        }
        this.cacheEnabled = cacheEnabled;
    }

    public boolean isTightEncodingEnabled() {
        return tightEncodingEnabled;
    }

    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
        this.tightEncodingEnabled = tightEncodingEnabled;
    }

    public boolean isSizePrefixDisabled() {
        return sizePrefixDisabled;
    }

    /**
     * @param prefixPacketSize despite its name, true <em>disables</em> the
     *        4-byte size prefix
     */
    public void setSizePrefixDisabled(boolean prefixPacketSize) {
        this.sizePrefixDisabled = prefixPacketSize;
    }

    public void setPreferedWireFormatInfo(WireFormatInfo info) {
        this.preferedWireFormatInfo = info;
    }

    public WireFormatInfo getPreferedWireFormatInfo() {
        return preferedWireFormatInfo;
    }

    public long getMaxFrameSize() {
        return maxFrameSize;
    }

    public void setMaxFrameSize(long maxFrameSize) {
        this.maxFrameSize = maxFrameSize;
    }

    /**
     * Renegotiates the settings of this wire format against the given info.
     * Version and maximum frame size become the smaller positive value of the
     * two sides; each boolean option is enabled only if both sides enable it.
     * The negotiated values are written back into {@code info}, and the value
     * caches are re-created (or dropped when caching ends up disabled).
     *
     * @param info the other side's wire format info; updated in place
     * @throws IllegalStateException if no preferred wire format info is set
     * @throws IOException declared for callers; propagated from the info
     *         accessors if they fail
     */
    public void renegotiateWireFormat(WireFormatInfo info) throws IOException {

        if (preferedWireFormatInfo == null) {
            throw new IllegalStateException("Wireformat cannot be renegotiated.");
        }

        this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
        info.setVersion(this.getVersion());

        this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
        info.setMaxFrameSize(this.getMaxFrameSize());

        this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
        info.setStackTraceEnabled(this.stackTraceEnabled);

        this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
        info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);

        this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
        info.setCacheEnabled(this.cacheEnabled);

        this.tightEncodingEnabled = info.isTightEncodingEnabled()
                                    && preferedWireFormatInfo.isTightEncodingEnabled();
        info.setTightEncodingEnabled(this.tightEncodingEnabled);

        this.sizePrefixDisabled = info.isSizePrefixDisabled()
                                  && preferedWireFormatInfo.isSizePrefixDisabled();
        info.setSizePrefixDisabled(this.sizePrefixDisabled);

        nextMarshallCacheIndex = 0;
        nextMarshallCacheEvictionIndex = 0;

        if (cacheEnabled) {

            int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
            info.setCacheSize(size);

            // A negotiated size of 0 falls back to the default capacity.
            if (size == 0) {
                size = MARSHAL_CACHE_SIZE;
            }

            marshallCache = new DataStructure[size];
            unmarshallCache = new DataStructure[size];
            marshallCacheMap = new HashMap<DataStructure, Short>();
        } else {
            marshallCache = null;
            unmarshallCache = null;
            marshallCacheMap = null;
        }

    }

    /**
     * @return {@code version1} if it is positive and smaller than
     *         {@code version2}, or if {@code version2} is not positive;
     *         otherwise {@code version2}
     */
    protected int min(int version1, int version2) {
        if (version1 < version2 && version1 > 0 || version2 <= 0) {
            return version1;
        }
        return version2;
    }

    /**
     * @return {@code version1} if it is positive and smaller than
     *         {@code version2}, or if {@code version2} is not positive;
     *         otherwise {@code version2}
     */
    protected long min(long version1, long version2) {
        if (version1 < version2 && version1 > 0 || version2 <= 0) {
            return version1;
        }
        return version2;
    }

    /**
     * Finds the marshaller registered for a data structure type.
     *
     * @param type the data structure type, treated as an unsigned index
     * @return the marshaller for that type, never null
     * @throws IOException if no marshaller is registered for the type
     */
    private DataStreamMarshaller lookupMarshaller(byte type) throws IOException {
        DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
        if (dsm == null) {
            throw new IOException("Unknown data type: " + type);
        }
        return dsm;
    }
}