001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.converter.stream;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.File;
022    import java.io.FileNotFoundException;
023    import java.io.FileOutputStream;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.OutputStream;
027    
028    import org.apache.camel.Exchange;
029    import org.apache.camel.StreamCache;
030    import org.apache.camel.support.SynchronizationAdapter;
031    import org.apache.camel.util.FileUtil;
032    import org.apache.camel.util.IOHelper;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * This output stream will store the content into a File if the stream context size is exceed the
038     * THRESHOLD which's default value is 64K. The temp file will store in the temp directory, you 
039     * can configure it by setting the TEMP_DIR property. If you don't set the TEMP_DIR property,
040     * it will choose the directory which is set by the system property of "java.io.tmpdir".
041     * You can get a cached input stream of this stream. The temp file which is created with this 
042     * output stream will be deleted when you close this output stream or the all cached 
043     * fileInputStream is closed after the exchange is completed.
044     */
045    public class CachedOutputStream extends OutputStream {
046        public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
047        public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize";
048        public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
049        private static final transient Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class);
050        
051        private OutputStream currentStream;
052        private boolean inMemory = true;
053        private int totalLength;
054        private File tempFile;
055        private FileInputStreamCache fileInputStreamCache;
056    
057        private long threshold = 64 * 1024;
058        private int bufferSize = 2 * 1024;
059        private File outputDir;
060        
061        public CachedOutputStream(Exchange exchange) {
062            this(exchange, true);
063        }
064    
065        public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
066            String bufferSize = exchange.getContext().getProperties().get(BUFFER_SIZE);
067            String hold = exchange.getContext().getProperties().get(THRESHOLD);
068            String dir = exchange.getContext().getProperties().get(TEMP_DIR);
069            
070            if (bufferSize != null) {
071                this.bufferSize = exchange.getContext().getTypeConverter().convertTo(Integer.class, bufferSize);
072            }
073            if (hold != null) {
074                this.threshold = exchange.getContext().getTypeConverter().convertTo(Long.class, hold);
075            }
076            if (dir != null) {
077                this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
078            }
079           
080            currentStream = new ByteArrayOutputStream(this.bufferSize);
081            
082            if (closedOnCompletion) {
083                // add on completion so we can cleanup after the exchange is done such as deleting temporary files
084                exchange.addOnCompletion(new SynchronizationAdapter() {
085                    @Override
086                    public void onDone(Exchange exchange) {
087                        try {
088                            if (fileInputStreamCache != null) {
089                                fileInputStreamCache.close();
090                            }
091                            close();
092                        } catch (Exception e) {
093                            LOG.warn("Error deleting temporary cache file: " + tempFile, e);
094                        }
095                    }
096        
097                    @Override
098                    public String toString() {
099                        return "OnCompletion[CachedOutputStream]";
100                    }
101                });
102            }
103        }
104    
105        public void flush() throws IOException {
106            currentStream.flush();       
107        }
108    
109        public void close() throws IOException {
110            currentStream.close();
111            cleanUpTempFile();
112        }
113    
114        public boolean equals(Object obj) {
115            return currentStream.equals(obj);
116        }
117    
118        public int hashCode() {
119            return currentStream.hashCode();
120        }
121    
122        public String toString() {
123            return "CachedOutputStream[size: " + totalLength + "]";
124        }
125    
126        public void write(byte[] b, int off, int len) throws IOException {
127            this.totalLength += len;
128            if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
129                pageToFileStream();
130            }
131            currentStream.write(b, off, len);
132        }
133    
134        public void write(byte[] b) throws IOException {
135            this.totalLength += b.length;
136            if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
137                pageToFileStream();
138            }
139            currentStream.write(b);
140        }
141    
142        public void write(int b) throws IOException {
143            this.totalLength++;
144            if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
145                pageToFileStream();
146            }
147            currentStream.write(b);
148        }
149    
150        public InputStream getInputStream() throws IOException {
151            flush();
152    
153            if (inMemory) {
154                if (currentStream instanceof ByteArrayOutputStream) {
155                    return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
156                } else {
157                    throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName());
158                }
159            } else {
160                try {
161                    if (fileInputStreamCache == null) {
162                        fileInputStreamCache = new FileInputStreamCache(tempFile);
163                    }
164                    return fileInputStreamCache;
165                } catch (FileNotFoundException e) {
166                    throw new IOException("Cached file " + tempFile + " not found", e);
167                }
168            }
169        }    
170        
171        public InputStream getWrappedInputStream() throws IOException {
172            // The WrappedInputStream will close the CachedOuputStream when it is closed
173            return new WrappedInputStream(this, getInputStream());
174        }
175    
176        public StreamCache getStreamCache() throws IOException {
177            flush();
178    
179            if (inMemory) {
180                if (currentStream instanceof ByteArrayOutputStream) {
181                    return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
182                } else {
183                    throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName());
184                }
185            } else {
186                try {
187                    if (fileInputStreamCache == null) {
188                        fileInputStreamCache = new FileInputStreamCache(tempFile);
189                    }
190                    return fileInputStreamCache;
191                } catch (FileNotFoundException e) {
192                    throw new IOException("Cached file " + tempFile + " not found", e);
193                }
194            }
195        }
196    
197        private void cleanUpTempFile() {
198            // cleanup temporary file
199            if (tempFile != null) {
200                FileUtil.deleteFile(tempFile);
201                tempFile = null;
202            }
203        }
204    
205        private void pageToFileStream() throws IOException {
206            flush();
207    
208            ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
209            if (outputDir == null) {
210                tempFile = FileUtil.createTempFile("cos", ".tmp");
211            } else {
212                tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir);
213            }
214    
215            LOG.trace("Creating temporary stream cache file: {}", tempFile);
216    
217            try {
218                currentStream = IOHelper.buffered(new FileOutputStream(tempFile));
219                bout.writeTo(currentStream);
220            } finally {
221                // ensure flag is flipped to file based
222                inMemory = false;
223            }
224        }
225        
226        public int getBufferSize() {
227            return bufferSize;
228        }
229    
230        // This class will close the CachedOutputStream when it is closed
231        private static class WrappedInputStream extends InputStream {
232            private CachedOutputStream cachedOutputStream;
233            private InputStream inputStream;
234            
235            WrappedInputStream(CachedOutputStream cos, InputStream is) {
236                cachedOutputStream = cos;
237                inputStream = is;
238            }
239            
240            @Override
241            public int read() throws IOException {
242                return inputStream.read();
243            }
244            
245            @Override
246            public int available() throws IOException {
247                return inputStream.available();
248            }
249            
250            @Override
251            public void reset() throws IOException {
252                inputStream.reset();
253            }
254            
255            @Override
256            public void close() throws IOException {
257                inputStream.close();
258                cachedOutputStream.close();
259            }
260        }
261    
262    }