001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.openwire;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.lang.reflect.Method;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.command.CommandTypes;
027import org.apache.activemq.command.DataStructure;
028import org.apache.activemq.command.WireFormatInfo;
029import org.apache.activemq.util.ByteSequence;
030import org.apache.activemq.util.ByteSequenceData;
031import org.apache.activemq.util.DataByteArrayInputStream;
032import org.apache.activemq.util.DataByteArrayOutputStream;
033import org.apache.activemq.util.IOExceptionSupport;
034import org.apache.activemq.wireformat.WireFormat;
035
036/**
037 * 
038 * 
039 */
040public final class OpenWireFormat implements WireFormat {
041
042    public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
043    public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
044    public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
045
046    static final byte NULL_TYPE = CommandTypes.NULL;
047    private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
048    private static final int MARSHAL_CACHE_FREE_SPACE = 100;
049
050    private DataStreamMarshaller dataMarshallers[];
051    private int version;
052    private boolean stackTraceEnabled;
053    private boolean tcpNoDelayEnabled;
054    private boolean cacheEnabled;
055    private boolean tightEncodingEnabled;
056    private boolean sizePrefixDisabled;
057    private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
058
059    // The following fields are used for value caching
060    private short nextMarshallCacheIndex;
061    private short nextMarshallCacheEvictionIndex;
062    private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
063    private DataStructure marshallCache[] = null;
064    private DataStructure unmarshallCache[] = null;
065    private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
066    private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
067    private WireFormatInfo preferedWireFormatInfo;
068    
069    public OpenWireFormat() {
070        this(DEFAULT_VERSION);
071    }
072
073    public OpenWireFormat(int i) {
074        setVersion(i);
075    }
076
077    public int hashCode() {
078        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
079               ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
080               ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
081               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
082    }
083
084    public OpenWireFormat copy() {
085        OpenWireFormat answer = new OpenWireFormat(version);
086        answer.stackTraceEnabled = stackTraceEnabled;
087        answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
088        answer.cacheEnabled = cacheEnabled;
089        answer.tightEncodingEnabled = tightEncodingEnabled;
090        answer.sizePrefixDisabled = sizePrefixDisabled;
091        answer.preferedWireFormatInfo = preferedWireFormatInfo;
092        return answer;
093    }
094
095    public boolean equals(Object object) {
096        if (object == null) {
097            return false;
098        }
099        OpenWireFormat o = (OpenWireFormat)object;
100        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
101               && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
102               && o.sizePrefixDisabled == sizePrefixDisabled;
103    }
104
105
106    public String toString() {
107        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
108               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled +  ", maxFrameSize=" + maxFrameSize + "}";
109        // return "OpenWireFormat{id="+id+",
110        // tightEncodingEnabled="+tightEncodingEnabled+"}";
111    }
112
113    public int getVersion() {
114        return version;
115    }
116
117    public synchronized ByteSequence marshal(Object command) throws IOException {
118
119        if (cacheEnabled) {
120            runMarshallCacheEvictionSweep();
121        }
122
123        ByteSequence sequence = null;
124        int size = 1;
125        if (command != null) {
126
127            DataStructure c = (DataStructure)command;
128            byte type = c.getDataStructureType();
129            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
130            if (dsm == null) {
131                throw new IOException("Unknown data type: " + type);
132            }
133            if (tightEncodingEnabled) {
134
135                BooleanStream bs = new BooleanStream();
136                size += dsm.tightMarshal1(this, c, bs);
137                size += bs.marshalledSize();
138
139                bytesOut.restart(size);
140                if (!sizePrefixDisabled) {
141                    bytesOut.writeInt(size);
142                }
143                bytesOut.writeByte(type);
144                bs.marshal(bytesOut);
145                dsm.tightMarshal2(this, c, bytesOut, bs);
146                sequence = bytesOut.toByteSequence();
147
148            } else {
149                bytesOut.restart();
150                if (!sizePrefixDisabled) {
151                    bytesOut.writeInt(0); // we don't know the final size
152                    // yet but write this here for
153                    // now.
154                }
155                bytesOut.writeByte(type);
156                dsm.looseMarshal(this, c, bytesOut);
157                sequence = bytesOut.toByteSequence();
158
159                if (!sizePrefixDisabled) {
160                    size = sequence.getLength() - 4;
161                    int pos = sequence.offset;
162                    ByteSequenceData.writeIntBig(sequence, size);
163                    sequence.offset = pos;
164                }
165            }
166
167        } else {
168            bytesOut.restart(5);
169            bytesOut.writeInt(size);
170            bytesOut.writeByte(NULL_TYPE);
171            sequence = bytesOut.toByteSequence();
172        }
173
174        return sequence;
175    }
176
177    public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
178        bytesIn.restart(sequence);
179        // DataInputStream dis = new DataInputStream(new
180        // ByteArrayInputStream(sequence));
181
182        if (!sizePrefixDisabled) {
183            int size = bytesIn.readInt();
184            if (sequence.getLength() - 4 != size) {
185                // throw new IOException("Packet size does not match marshaled
186                // size");
187            }
188
189            if (size > maxFrameSize) {
190                throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
191            }
192        }
193
194        Object command = doUnmarshal(bytesIn);
195        // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
196        // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
197        // }
198        return command;
199    }
200
201    public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
202
203        if (cacheEnabled) {
204            runMarshallCacheEvictionSweep();
205        }
206
207        int size = 1;
208        if (o != null) {
209
210            DataStructure c = (DataStructure)o;
211            byte type = c.getDataStructureType();
212            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
213            if (dsm == null) {
214                throw new IOException("Unknown data type: " + type);
215            }
216            if (tightEncodingEnabled) {
217                BooleanStream bs = new BooleanStream();
218                size += dsm.tightMarshal1(this, c, bs);
219                size += bs.marshalledSize();
220
221                if (!sizePrefixDisabled) {
222                    dataOut.writeInt(size);
223                }
224
225                dataOut.writeByte(type);
226                bs.marshal(dataOut);
227                dsm.tightMarshal2(this, c, dataOut, bs);
228
229            } else {
230                DataOutput looseOut = dataOut;
231
232                if (!sizePrefixDisabled) {
233                    bytesOut.restart();
234                    looseOut = bytesOut;
235                }
236
237                looseOut.writeByte(type);
238                dsm.looseMarshal(this, c, looseOut);
239
240                if (!sizePrefixDisabled) {
241                    ByteSequence sequence = bytesOut.toByteSequence();
242                    dataOut.writeInt(sequence.getLength());
243                    dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
244                }
245
246            }
247
248        } else {
249            if (!sizePrefixDisabled) {
250                dataOut.writeInt(size);
251            }
252            dataOut.writeByte(NULL_TYPE);
253        }
254    }
255
256    public Object unmarshal(DataInput dis) throws IOException {
257        DataInput dataIn = dis;
258        if (!sizePrefixDisabled) {
259            int size = dis.readInt();
260            if (size > maxFrameSize) {
261                throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
262            }
263            // int size = dis.readInt();
264            // byte[] data = new byte[size];
265            // dis.readFully(data);
266            // bytesIn.restart(data);
267            // dataIn = bytesIn;
268        }
269        return doUnmarshal(dataIn);
270    }
271
272    /**
273     * Used by NIO or AIO transports
274     */
275    public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
276        int size = 1;
277        if (o != null) {
278            DataStructure c = (DataStructure)o;
279            byte type = c.getDataStructureType();
280            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
281            if (dsm == null) {
282                throw new IOException("Unknown data type: " + type);
283            }
284
285            size += dsm.tightMarshal1(this, c, bs);
286            size += bs.marshalledSize();
287        }
288        return size;
289    }
290
291    /**
292     * Used by NIO or AIO transports; note that the size is not written as part
293     * of this method.
294     */
295    public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
296        if (cacheEnabled) {
297            runMarshallCacheEvictionSweep();
298        }
299
300        if (o != null) {
301            DataStructure c = (DataStructure)o;
302            byte type = c.getDataStructureType();
303            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
304            if (dsm == null) {
305                throw new IOException("Unknown data type: " + type);
306            }
307            ds.writeByte(type);
308            bs.marshal(ds);
309            dsm.tightMarshal2(this, c, ds, bs);
310        }
311    }
312
313    /**
314     * Allows you to dynamically switch the version of the openwire protocol
315     * being used.
316     * 
317     * @param version
318     */
319    public void setVersion(int version) {
320        String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
321        Class mfClass;
322        try {
323            mfClass = Class.forName(mfName, false, getClass().getClassLoader());
324        } catch (ClassNotFoundException e) {
325            throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
326                                                                         + ", could not load " + mfName)
327                .initCause(e);
328        }
329        try {
330            Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
331            dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
332        } catch (Throwable e) {
333            throw (IllegalArgumentException)new IllegalArgumentException(
334                                                                         "Invalid version: "
335                                                                             + version
336                                                                             + ", "
337                                                                             + mfName
338                                                                             + " does not properly implement the createMarshallerMap method.")
339                .initCause(e);
340        }
341        this.version = version;
342    }
343
344    public Object doUnmarshal(DataInput dis) throws IOException {
345        byte dataType = dis.readByte();
346        if (dataType != NULL_TYPE) {
347            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
348            if (dsm == null) {
349                throw new IOException("Unknown data type: " + dataType);
350            }
351            Object data = dsm.createObject();
352            if (this.tightEncodingEnabled) {
353                BooleanStream bs = new BooleanStream();
354                bs.unmarshal(dis);
355                dsm.tightUnmarshal(this, data, dis, bs);
356            } else {
357                dsm.looseUnmarshal(this, data, dis);
358            }
359            return data;
360        } else {
361            return null;
362        }
363    }
364
365    // public void debug(String msg) {
366    // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
367    // System.out.println(t+": "+msg);
368    // }
369    public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
370        bs.writeBoolean(o != null);
371        if (o == null) {
372            return 0;
373        }
374
375        if (o.isMarshallAware()) {
376            // MarshallAware ma = (MarshallAware)o;
377            ByteSequence sequence = null;
378            // sequence=ma.getCachedMarshalledForm(this);
379            bs.writeBoolean(sequence != null);
380            if (sequence != null) {
381                return 1 + sequence.getLength();
382            }
383        }
384
385        byte type = o.getDataStructureType();
386        DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
387        if (dsm == null) {
388            throw new IOException("Unknown data type: " + type);
389        }
390        return 1 + dsm.tightMarshal1(this, o, bs);
391    }
392
393    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
394        throws IOException {
395        if (!bs.readBoolean()) {
396            return;
397        }
398
399        byte type = o.getDataStructureType();
400        ds.writeByte(type);
401
402        if (o.isMarshallAware() && bs.readBoolean()) {
403
404            // We should not be doing any caching
405            throw new IOException("Corrupted stream");
406            // MarshallAware ma = (MarshallAware) o;
407            // ByteSequence sequence=ma.getCachedMarshalledForm(this);
408            // ds.write(sequence.getData(), sequence.getOffset(),
409            // sequence.getLength());
410
411        } else {
412
413            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
414            if (dsm == null) {
415                throw new IOException("Unknown data type: " + type);
416            }
417            dsm.tightMarshal2(this, o, ds, bs);
418
419        }
420    }
421
422    public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
423        if (bs.readBoolean()) {
424
425            byte dataType = dis.readByte();
426            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
427            if (dsm == null) {
428                throw new IOException("Unknown data type: " + dataType);
429            }
430            DataStructure data = dsm.createObject();
431
432            if (data.isMarshallAware() && bs.readBoolean()) {
433
434                dis.readInt();
435                dis.readByte();
436
437                BooleanStream bs2 = new BooleanStream();
438                bs2.unmarshal(dis);
439                dsm.tightUnmarshal(this, data, dis, bs2);
440
441                // TODO: extract the sequence from the dis and associate it.
442                // MarshallAware ma = (MarshallAware)data
443                // ma.setCachedMarshalledForm(this, sequence);
444
445            } else {
446                dsm.tightUnmarshal(this, data, dis, bs);
447            }
448
449            return data;
450        } else {
451            return null;
452        }
453    }
454
455    public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
456        if (dis.readBoolean()) {
457
458            byte dataType = dis.readByte();
459            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
460            if (dsm == null) {
461                throw new IOException("Unknown data type: " + dataType);
462            }
463            DataStructure data = dsm.createObject();
464            dsm.looseUnmarshal(this, data, dis);
465            return data;
466
467        } else {
468            return null;
469        }
470    }
471
472    public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
473        dataOut.writeBoolean(o != null);
474        if (o != null) {
475            byte type = o.getDataStructureType();
476            dataOut.writeByte(type);
477            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
478            if (dsm == null) {
479                throw new IOException("Unknown data type: " + type);
480            }
481            dsm.looseMarshal(this, o, dataOut);
482        }
483    }
484
485    public void runMarshallCacheEvictionSweep() {
486        // Do we need to start evicting??
487        while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
488
489            marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
490            marshallCache[nextMarshallCacheEvictionIndex] = null;
491
492            nextMarshallCacheEvictionIndex++;
493            if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
494                nextMarshallCacheEvictionIndex = 0;
495            }
496
497        }
498    }
499
500    public Short getMarshallCacheIndex(DataStructure o) {
501        return marshallCacheMap.get(o);
502    }
503
504    public Short addToMarshallCache(DataStructure o) {
505        short i = nextMarshallCacheIndex++;
506        if (nextMarshallCacheIndex >= marshallCache.length) {
507            nextMarshallCacheIndex = 0;
508        }
509
510        // We can only cache that item if there is space left.
511        if (marshallCacheMap.size() < marshallCache.length) {
512            marshallCache[i] = o;
513            Short index = new Short(i);
514            marshallCacheMap.put(o, index);
515            return index;
516        } else {
517            // Use -1 to indicate that the value was not cached due to cache
518            // being full.
519            return new Short((short)-1);
520        }
521    }
522
523    public void setInUnmarshallCache(short index, DataStructure o) {
524
525        // There was no space left in the cache, so we can't
526        // put this in the cache.
527        if (index == -1) {
528            return;
529        }
530
531        unmarshallCache[index] = o;
532    }
533
534    public DataStructure getFromUnmarshallCache(short index) {
535        return unmarshallCache[index];
536    }
537
538    public void setStackTraceEnabled(boolean b) {
539        stackTraceEnabled = b;
540    }
541
542    public boolean isStackTraceEnabled() {
543        return stackTraceEnabled;
544    }
545
546    public boolean isTcpNoDelayEnabled() {
547        return tcpNoDelayEnabled;
548    }
549
550    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
551        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
552    }
553
554    public boolean isCacheEnabled() {
555        return cacheEnabled;
556    }
557
558    public void setCacheEnabled(boolean cacheEnabled) {
559        if(cacheEnabled){
560            marshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
561            unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
562        }
563        this.cacheEnabled = cacheEnabled;
564    }
565
566    public boolean isTightEncodingEnabled() {
567        return tightEncodingEnabled;
568    }
569
570    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
571        this.tightEncodingEnabled = tightEncodingEnabled;
572    }
573
574    public boolean isSizePrefixDisabled() {
575        return sizePrefixDisabled;
576    }
577
578    public void setSizePrefixDisabled(boolean prefixPacketSize) {
579        this.sizePrefixDisabled = prefixPacketSize;
580    }
581
582    public void setPreferedWireFormatInfo(WireFormatInfo info) {
583        this.preferedWireFormatInfo = info;
584    }
585
586    public WireFormatInfo getPreferedWireFormatInfo() {
587        return preferedWireFormatInfo;
588    }
589
590    public long getMaxFrameSize() {
591        return maxFrameSize;
592    }
593
594    public void setMaxFrameSize(long maxFrameSize) {
595        this.maxFrameSize = maxFrameSize;
596    }
597
598    public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
599
600        if (preferedWireFormatInfo == null) {
601            throw new IllegalStateException("Wireformat cannot not be renegotiated.");
602        }
603
604        this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
605        info.setVersion(this.getVersion());
606
607        this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
608        info.setMaxFrameSize(this.getMaxFrameSize());
609
610        this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
611        info.setStackTraceEnabled(this.stackTraceEnabled);
612
613        this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
614        info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
615
616        this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
617        info.setCacheEnabled(this.cacheEnabled);
618
619        this.tightEncodingEnabled = info.isTightEncodingEnabled()
620                                    && preferedWireFormatInfo.isTightEncodingEnabled();
621        info.setTightEncodingEnabled(this.tightEncodingEnabled);
622
623        this.sizePrefixDisabled = info.isSizePrefixDisabled()
624                                  && preferedWireFormatInfo.isSizePrefixDisabled();
625        info.setSizePrefixDisabled(this.sizePrefixDisabled);
626
627        if (cacheEnabled) {
628
629            int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
630            info.setCacheSize(size);
631
632            if (size == 0) {
633                size = MARSHAL_CACHE_SIZE;
634            }
635
636            marshallCache = new DataStructure[size];
637            unmarshallCache = new DataStructure[size];
638            nextMarshallCacheIndex = 0;
639            nextMarshallCacheEvictionIndex = 0;
640            marshallCacheMap = new HashMap<DataStructure, Short>();
641        } else {
642            marshallCache = null;
643            unmarshallCache = null;
644            nextMarshallCacheIndex = 0;
645            nextMarshallCacheEvictionIndex = 0;
646            marshallCacheMap = null;
647        }
648
649    }
650
651    protected int min(int version1, int version2) {
652        if (version1 < version2 && version1 > 0 || version2 <= 0) {
653            return version1;
654        }
655        return version2;
656    }
657
658    protected long min(long version1, long version2) {
659        if (version1 < version2 && version1 > 0 || version2 <= 0) {
660            return version1;
661        }
662        return version2;
663    }
664}