001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.command.store; 018 019import org.apache.activemq.console.command.store.proto.*; 020import org.apache.activemq.console.command.store.tar.TarEntry; 021import org.apache.activemq.console.command.store.tar.TarOutputStream; 022import org.fusesource.hawtbuf.AsciiBuffer; 023import org.fusesource.hawtbuf.Buffer; 024import org.fusesource.hawtbuf.proto.MessageBuffer; 025 026import java.io.IOException; 027import java.io.OutputStream; 028import java.util.zip.GZIPOutputStream; 029 030/** 031 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 032 */ 033public class ExportStreamManager { 034 035 private final OutputStream target; 036 private final int version; 037 TarOutputStream stream; 038 039 ExportStreamManager(OutputStream target, int version) throws IOException { 040 this.target = target; 041 this.version = version; 042 stream = new TarOutputStream(new GZIPOutputStream(target)); 043 store("ver", new AsciiBuffer(""+version)); 044 } 045 046 047 long seq = 0; 048 049 public void finish() throws IOException { 050 stream.close(); 051 } 052 053 private void store(String ext, Buffer value) throws IOException { 054 TarEntry entry = new TarEntry(seq + "." + ext); 055 seq += 1; 056 entry.setSize(value.length()); 057 stream.putNextEntry(entry); 058 value.writeTo(stream); 059 stream.closeEntry(); 060 } 061 062 private void store(String ext, MessageBuffer<?,?> value) throws IOException { 063 TarEntry entry = new TarEntry(seq + "." + ext); 064 seq += 1; 065 entry.setSize(value.serializedSizeFramed()); 066 stream.putNextEntry(entry); 067 value.writeFramed(stream); 068 stream.closeEntry(); 069 } 070 071 072 public void store_queue(QueuePB.Getter value) throws IOException { 073 store("que", value.freeze()); 074 } 075 public void store_queue_entry(QueueEntryPB.Getter value) throws IOException { 076 store("qen", value.freeze()); 077 } 078 public void store_message(MessagePB.Getter value) throws IOException { 079 store("msg", value.freeze()); 080 } 081 public void store_map_entry(MapEntryPB.Getter value) throws IOException { 082 store("map", value.freeze()); 083 } 084 085}