001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.io.File;
020    import java.util.concurrent.locks.Lock;
021    import java.util.concurrent.locks.ReentrantLock;
022    
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Expression;
025    import org.apache.camel.impl.DefaultExchange;
026    import org.apache.camel.impl.DefaultProducer;
027    import org.apache.camel.spi.Language;
028    import org.apache.camel.util.FileUtil;
029    import org.apache.camel.util.LRUCache;
030    import org.apache.camel.util.ObjectHelper;
031    import org.apache.camel.util.ServiceHelper;
032    import org.apache.camel.util.StringHelper;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * Generic file producer
038     */
039    public class GenericFileProducer<T> extends DefaultProducer {
040        protected final transient Logger log = LoggerFactory.getLogger(getClass());
041        protected final GenericFileEndpoint<T> endpoint;
042        protected GenericFileOperations<T> operations;
043        // assume writing to 100 different files concurrently at most for the same file producer
044        private final LRUCache<String, Lock> locks = new LRUCache<String, Lock>(100);
045    
046        protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
047            super(endpoint);
048            this.endpoint = endpoint;
049            this.operations = operations;
050        }
051        
052        public String getFileSeparator() {
053            return File.separator;
054        }
055    
056        public String normalizePath(String name) {
057            return FileUtil.normalizePath(name);
058        }
059    
060        public void process(Exchange exchange) throws Exception {
061            String target = createFileName(exchange);
062    
063            // use lock for same file name to avoid concurrent writes to the same file
064            // for example when you concurrently append to the same file
065            Lock lock;
066            synchronized (locks) {
067                lock = locks.get(target);
068                if (lock == null) {
069                    lock = new ReentrantLock();
070                    locks.put(target, lock);
071                }
072            }
073    
074            lock.lock();
075            try {
076                processExchange(exchange, target);
077            } finally {
078                // do not remove as the locks cache has an upper bound
079                // this ensure the locks is appropriate reused
080                lock.unlock();
081            }
082        }
083    
084        /**
085         * Sets the operations to be used.
086         * <p/>
087         * Can be used to set a fresh operations in case of recovery attempts
088         *
089         * @param operations the operations
090         */
091        public void setOperations(GenericFileOperations<T> operations) {
092            this.operations = operations;
093        }
094    
095        /**
096         * Perform the work to process the fileExchange
097         *
098         * @param exchange fileExchange
099         * @param target   the target filename
100         * @throws Exception is thrown if some error
101         */
102        protected void processExchange(Exchange exchange, String target) throws Exception {
103            log.trace("Processing file: {} for exchange: {}", target, exchange);
104    
105            try {
106                preWriteCheck();
107    
108                // should we write to a temporary name and then afterwards rename to real target
109                boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName());
110                String tempTarget = null;
111                // remember if target exists to avoid checking twice
112                Boolean targetExists = null;
113                if (writeAsTempAndRename) {
114                    // compute temporary name with the temp prefix
115                    tempTarget = createTempFileName(exchange, target);
116    
117                    log.trace("Writing using tempNameFile: {}", tempTarget);
118    
119                    // cater for file exists option on the real target as
120                    // the file operations code will work on the temp file
121    
122                    // if an existing file already exists what should we do?
123                    targetExists = operations.existsFile(target);
124                    if (targetExists) {
125                        if (endpoint.getFileExist() == GenericFileExist.Ignore) {
126                            // ignore but indicate that the file was written
127                            log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
128                            return;
129                        } else if (endpoint.getFileExist() == GenericFileExist.Fail) {
130                            throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
131                        } else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) {
132                            // we override the target so we do this by deleting it so the temp file can be renamed later
133                            // with success as the existing target file have been deleted
134                            log.trace("Eagerly deleting existing file: {}", target);
135                            if (!operations.deleteFile(target)) {
136                                throw new GenericFileOperationFailedException("Cannot delete file: " + target);
137                            }
138                        }
139                    }
140    
141                    // delete any pre existing temp file
142                    if (operations.existsFile(tempTarget)) {
143                        log.trace("Deleting existing temp file: {}", tempTarget);
144                        if (!operations.deleteFile(tempTarget)) {
145                            throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget);
146                        }
147                    }
148                }
149    
150                // write/upload the file
151                writeFile(exchange, tempTarget != null ? tempTarget : target);
152    
153                // if we did write to a temporary name then rename it to the real
154                // name after we have written the file
155                if (tempTarget != null) {
156    
157                    // if we should not eager delete the target file then do it now just before renaming
158                    if (!endpoint.isEagerDeleteTargetFile() && targetExists
159                            && endpoint.getFileExist() == GenericFileExist.Override) {
160                        // we override the target so we do this by deleting it so the temp file can be renamed later
161                        // with success as the existing target file have been deleted
162                        log.trace("Deleting existing file: {}", target);
163                        if (!operations.deleteFile(target)) {
164                            throw new GenericFileOperationFailedException("Cannot delete file: " + target);
165                        }
166                    }
167    
168                    // now we are ready to rename the temp file to the target file
169                    log.trace("Renaming file: [{}] to: [{}]", tempTarget, target);
170                    boolean renamed = operations.renameFile(tempTarget, target);
171                    if (!renamed) {
172                        throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target);
173                    }
174                }
175    
176                // any done file to write?
177                if (endpoint.getDoneFileName() != null) {
178                    String doneFileName = endpoint.createDoneFileName(target);
179                    ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
180    
181                    // create empty exchange with empty body to write as the done file
182                    Exchange empty = new DefaultExchange(exchange);
183                    empty.getIn().setBody("");
184    
185                    log.trace("Writing done file: [{}]", doneFileName);
186                    // delete any existing done file
187                    if (operations.existsFile(doneFileName)) {
188                        if (!operations.deleteFile(doneFileName)) {
189                            throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName);
190                        }
191                    }
192                    writeFile(empty, doneFileName);
193                }
194    
195                // let's store the name we really used in the header, so end-users
196                // can retrieve it
197                exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target);
198            } catch (Exception e) {
199                handleFailedWrite(exchange, e);
200            }
201    
202            postWriteCheck();
203        }
204    
205        /**
206         * If we fail writing out a file, we will call this method. This hook is
207         * provided to disconnect from servers or clean up files we created (if needed).
208         */
209        public void handleFailedWrite(Exchange exchange, Exception exception) throws Exception {
210            throw exception;
211        }
212    
213        /**
214         * Perform any actions that need to occur before we write such as connecting to an FTP server etc.
215         */
216        public void preWriteCheck() throws Exception {
217            // nothing needed to check
218        }
219    
220        /**
221         * Perform any actions that need to occur after we are done such as disconnecting.
222         */
223        public void postWriteCheck() {
224            // nothing needed to check
225        }
226    
227        public void writeFile(Exchange exchange, String fileName) throws GenericFileOperationFailedException {
228            // build directory if auto create is enabled
229            if (endpoint.isAutoCreate()) {
230                // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File)
231                String name = FileUtil.normalizePath(fileName);
232    
233                // use java.io.File to compute the file path
234                File file = new File(name);
235                String directory = file.getParent();
236                boolean absolute = FileUtil.isAbsolute(file);
237                if (directory != null) {
238                    if (!operations.buildDirectory(directory, absolute)) {
239                        log.debug("Cannot build directory [{}] (could be because of denied permissions)", directory);
240                    }
241                }
242            }
243    
244            // upload
245            if (log.isTraceEnabled()) {
246                log.trace("About to write [{}] to [{}] from exchange [{}]", new Object[]{fileName, getEndpoint(), exchange});
247            }
248    
249            boolean success = operations.storeFile(fileName, exchange);
250            if (!success) {
251                throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]");
252            }
253            log.debug("Wrote [{}] to [{}]", fileName, getEndpoint());
254        }
255    
256        public String createFileName(Exchange exchange) {
257            String answer;
258    
259            String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
260    
261            // expression support
262            Expression expression = endpoint.getFileName();
263            if (name != null) {
264                // the header name can be an expression too, that should override
265                // whatever configured on the endpoint
266                if (StringHelper.hasStartToken(name, "simple")) {
267                    log.trace("{} contains a Simple expression: {}", Exchange.FILE_NAME, name);
268                    Language language = getEndpoint().getCamelContext().resolveLanguage("file");
269                    expression = language.createExpression(name);
270                }
271            }
272            if (expression != null) {
273                log.trace("Filename evaluated as expression: {}", expression);
274                name = expression.evaluate(exchange, String.class);
275            }
276    
277            // flatten name
278            if (name != null && endpoint.isFlatten()) {
279                // check for both windows and unix separators
280                int pos = Math.max(name.lastIndexOf("/"), name.lastIndexOf("\\"));
281                if (pos != -1) {
282                    name = name.substring(pos + 1);
283                }
284            }
285    
286            // compute path by adding endpoint starting directory
287            String endpointPath = endpoint.getConfiguration().getDirectory();
288            String baseDir = "";
289            if (endpointPath.length() > 0) {
290                // Its a directory so we should use it as a base path for the filename
291                // If the path isn't empty, we need to add a trailing / if it isn't already there
292                baseDir = endpointPath;
293                boolean trailingSlash = endpointPath.endsWith("/") || endpointPath.endsWith("\\");
294                if (!trailingSlash) {
295                    baseDir += getFileSeparator();
296                }
297            }
298            if (name != null) {
299                answer = baseDir + name;
300            } else {
301                // use a generated filename if no name provided
302                answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn());
303            }
304    
305            if (endpoint.getConfiguration().needToNormalize()) {
306                // must normalize path to cater for Windows and other OS
307                answer = normalizePath(answer);
308            }
309    
310            return answer;
311        }
312    
313        public String createTempFileName(Exchange exchange, String fileName) {
314            String answer = fileName;
315    
316            String tempName;
317            if (exchange.getIn().getHeader(Exchange.FILE_NAME) == null) {
318                // its a generated filename then add it to header so we can evaluate the expression
319                exchange.getIn().setHeader(Exchange.FILE_NAME, FileUtil.stripPath(fileName));
320                tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
321                // and remove it again after evaluation
322                exchange.getIn().removeHeader(Exchange.FILE_NAME);
323            } else {
324                tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
325            }
326    
327            // check for both windows and unix separators
328            int pos = Math.max(answer.lastIndexOf("/"), answer.lastIndexOf("\\"));
329            if (pos == -1) {
330                // no path so use temp name as calculated
331                answer = tempName;
332            } else {
333                // path should be prefixed before the temp name
334                StringBuilder sb = new StringBuilder(answer.substring(0, pos + 1));
335                sb.append(tempName);
336                answer = sb.toString();
337            }
338    
339            if (endpoint.getConfiguration().needToNormalize()) {
340                // must normalize path to cater for Windows and other OS
341                answer = normalizePath(answer);
342            }
343    
344            return answer;
345        }
346    
347        @Override
348        protected void doStart() throws Exception {
349            super.doStart();
350            ServiceHelper.startService(locks);
351        }
352    
353        @Override
354        protected void doStop() throws Exception {
355            ServiceHelper.stopService(locks);
356            super.doStop();
357        }
358    }