001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.converter.stream; 018 019 import java.io.ByteArrayInputStream; 020 import java.io.ByteArrayOutputStream; 021 import java.io.File; 022 import java.io.FileNotFoundException; 023 import java.io.FileOutputStream; 024 import java.io.IOException; 025 import java.io.InputStream; 026 import java.io.OutputStream; 027 028 import org.apache.camel.Exchange; 029 import org.apache.camel.StreamCache; 030 import org.apache.camel.support.SynchronizationAdapter; 031 import org.apache.camel.util.FileUtil; 032 import org.apache.camel.util.IOHelper; 033 import org.slf4j.Logger; 034 import org.slf4j.LoggerFactory; 035 036 /** 037 * This output stream will store the content into a File if the stream context size is exceed the 038 * THRESHOLD which's default value is 64K. The temp file will store in the temp directory, you 039 * can configure it by setting the TEMP_DIR property. If you don't set the TEMP_DIR property, 040 * it will choose the directory which is set by the system property of "java.io.tmpdir". 041 * You can get a cached input stream of this stream. The temp file which is created with this 042 * output stream will be deleted when you close this output stream or the all cached 043 * fileInputStream is closed after the exchange is completed. 044 */ 045 public class CachedOutputStream extends OutputStream { 046 public static final String THRESHOLD = "CamelCachedOutputStreamThreshold"; 047 public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize"; 048 public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory"; 049 private static final transient Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class); 050 051 private OutputStream currentStream; 052 private boolean inMemory = true; 053 private int totalLength; 054 private File tempFile; 055 private FileInputStreamCache fileInputStreamCache; 056 057 private long threshold = 64 * 1024; 058 private int bufferSize = 2 * 1024; 059 private File outputDir; 060 061 public CachedOutputStream(Exchange exchange) { 062 this(exchange, true); 063 } 064 065 public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) { 066 String bufferSize = exchange.getContext().getProperties().get(BUFFER_SIZE); 067 String hold = exchange.getContext().getProperties().get(THRESHOLD); 068 String dir = exchange.getContext().getProperties().get(TEMP_DIR); 069 070 if (bufferSize != null) { 071 this.bufferSize = exchange.getContext().getTypeConverter().convertTo(Integer.class, bufferSize); 072 } 073 if (hold != null) { 074 this.threshold = exchange.getContext().getTypeConverter().convertTo(Long.class, hold); 075 } 076 if (dir != null) { 077 this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir); 078 } 079 080 currentStream = new ByteArrayOutputStream(this.bufferSize); 081 082 if (closedOnCompletion) { 083 // add on completion so we can cleanup after the exchange is done such as deleting temporary files 084 exchange.addOnCompletion(new SynchronizationAdapter() { 085 @Override 086 public void onDone(Exchange exchange) { 087 try { 088 if (fileInputStreamCache != null) { 089 fileInputStreamCache.close(); 090 } 091 close(); 092 } catch (Exception e) { 093 LOG.warn("Error deleting temporary cache file: " + tempFile, e); 094 } 095 } 096 097 @Override 098 public String toString() { 099 return "OnCompletion[CachedOutputStream]"; 100 } 101 }); 102 } 103 } 104 105 public void flush() throws IOException { 106 currentStream.flush(); 107 } 108 109 public void close() throws IOException { 110 currentStream.close(); 111 cleanUpTempFile(); 112 } 113 114 public boolean equals(Object obj) { 115 return currentStream.equals(obj); 116 } 117 118 public int hashCode() { 119 return currentStream.hashCode(); 120 } 121 122 public String toString() { 123 return "CachedOutputStream[size: " + totalLength + "]"; 124 } 125 126 public void write(byte[] b, int off, int len) throws IOException { 127 this.totalLength += len; 128 if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) { 129 pageToFileStream(); 130 } 131 currentStream.write(b, off, len); 132 } 133 134 public void write(byte[] b) throws IOException { 135 this.totalLength += b.length; 136 if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) { 137 pageToFileStream(); 138 } 139 currentStream.write(b); 140 } 141 142 public void write(int b) throws IOException { 143 this.totalLength++; 144 if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) { 145 pageToFileStream(); 146 } 147 currentStream.write(b); 148 } 149 150 public InputStream getInputStream() throws IOException { 151 flush(); 152 153 if (inMemory) { 154 if (currentStream instanceof ByteArrayOutputStream) { 155 return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray()); 156 } else { 157 throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName()); 158 } 159 } else { 160 try { 161 if (fileInputStreamCache == null) { 162 fileInputStreamCache = new FileInputStreamCache(tempFile); 163 } 164 return fileInputStreamCache; 165 } catch (FileNotFoundException e) { 166 throw new IOException("Cached file " + tempFile + " not found", e); 167 } 168 } 169 } 170 171 public InputStream getWrappedInputStream() throws IOException { 172 // The WrappedInputStream will close the CachedOuputStream when it is closed 173 return new WrappedInputStream(this, getInputStream()); 174 } 175 176 public StreamCache getStreamCache() throws IOException { 177 flush(); 178 179 if (inMemory) { 180 if (currentStream instanceof ByteArrayOutputStream) { 181 return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray()); 182 } else { 183 throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName()); 184 } 185 } else { 186 try { 187 if (fileInputStreamCache == null) { 188 fileInputStreamCache = new FileInputStreamCache(tempFile); 189 } 190 return fileInputStreamCache; 191 } catch (FileNotFoundException e) { 192 throw new IOException("Cached file " + tempFile + " not found", e); 193 } 194 } 195 } 196 197 private void cleanUpTempFile() { 198 // cleanup temporary file 199 if (tempFile != null) { 200 FileUtil.deleteFile(tempFile); 201 tempFile = null; 202 } 203 } 204 205 private void pageToFileStream() throws IOException { 206 flush(); 207 208 ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream; 209 if (outputDir == null) { 210 tempFile = FileUtil.createTempFile("cos", ".tmp"); 211 } else { 212 tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir); 213 } 214 215 LOG.trace("Creating temporary stream cache file: {}", tempFile); 216 217 try { 218 currentStream = IOHelper.buffered(new FileOutputStream(tempFile)); 219 bout.writeTo(currentStream); 220 } finally { 221 // ensure flag is flipped to file based 222 inMemory = false; 223 } 224 } 225 226 public int getBufferSize() { 227 return bufferSize; 228 } 229 230 // This class will close the CachedOutputStream when it is closed 231 private static class WrappedInputStream extends InputStream { 232 private CachedOutputStream cachedOutputStream; 233 private InputStream inputStream; 234 235 WrappedInputStream(CachedOutputStream cos, InputStream is) { 236 cachedOutputStream = cos; 237 inputStream = is; 238 } 239 240 @Override 241 public int read() throws IOException { 242 return inputStream.read(); 243 } 244 245 @Override 246 public int available() throws IOException { 247 return inputStream.available(); 248 } 249 250 @Override 251 public void reset() throws IOException { 252 inputStream.reset(); 253 } 254 255 @Override 256 public void close() throws IOException { 257 inputStream.close(); 258 cachedOutputStream.close(); 259 } 260 } 261 262 }