001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command.store;
018
019import org.apache.activemq.console.command.store.proto.*;
020import org.apache.activemq.console.command.store.tar.TarEntry;
021import org.apache.activemq.console.command.store.tar.TarOutputStream;
022import org.fusesource.hawtbuf.AsciiBuffer;
023import org.fusesource.hawtbuf.Buffer;
024import org.fusesource.hawtbuf.proto.MessageBuffer;
025
026import java.io.IOException;
027import java.io.OutputStream;
028import java.util.zip.GZIPOutputStream;
029
030/**
031 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
032 */
033public class ExportStreamManager {
034
035    private final OutputStream target;
036    private final int version;
037    TarOutputStream stream;
038
039    ExportStreamManager(OutputStream target, int version) throws IOException {
040        this.target = target;
041        this.version = version;
042        stream = new TarOutputStream(new GZIPOutputStream(target));
043        store("ver", new AsciiBuffer(""+version));
044    }
045
046
047    long seq = 0;
048
049    public void finish() throws IOException {
050        stream.close();
051    }
052
053    private void store(String ext, Buffer value) throws IOException {
054        TarEntry entry = new TarEntry(seq + "." + ext);
055        seq += 1;
056        entry.setSize(value.length());
057        stream.putNextEntry(entry);
058        value.writeTo(stream);
059        stream.closeEntry();
060    }
061
062    private void store(String ext, MessageBuffer<?,?> value) throws IOException {
063        TarEntry entry = new TarEntry(seq + "." + ext);
064      seq += 1;
065      entry.setSize(value.serializedSizeFramed());
066      stream.putNextEntry(entry);
067      value.writeFramed(stream);
068      stream.closeEntry();
069    }
070
071
072    public void store_queue(QueuePB.Getter value) throws IOException {
073      store("que", value.freeze());
074    }
075    public void store_queue_entry(QueueEntryPB.Getter value) throws IOException {
076      store("qen", value.freeze());
077    }
078    public void store_message(MessagePB.Getter value) throws IOException {
079      store("msg", value.freeze());
080    }
081    public void store_map_entry(MapEntryPB.Getter value) throws IOException {
082      store("map", value.freeze());
083    }
084
085}