001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file;
018
019import java.io.File;
020import java.util.Map;
021import java.util.concurrent.locks.Lock;
022import java.util.concurrent.locks.ReentrantLock;
023
024import org.apache.camel.Exchange;
025import org.apache.camel.Expression;
026import org.apache.camel.impl.DefaultExchange;
027import org.apache.camel.impl.DefaultProducer;
028import org.apache.camel.util.FileUtil;
029import org.apache.camel.util.LRUCacheFactory;
030import org.apache.camel.util.ObjectHelper;
031import org.apache.camel.util.ServiceHelper;
032import org.apache.camel.util.StringHelper;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
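/**
 * Generic file producer which writes exchanges as files through the configured
 * {@link GenericFileOperations}, serializing concurrent writes to the same target file name.
 */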
039public class GenericFileProducer<T> extends DefaultProducer {
040    protected final Logger log = LoggerFactory.getLogger(getClass());
041    protected final GenericFileEndpoint<T> endpoint;
042    protected GenericFileOperations<T> operations;
043    // assume writing to 100 different files concurrently at most for the same file producer
044    private final Map<String, Lock> locks = LRUCacheFactory.newLRUCache(100);
045
046    protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
047        super(endpoint);
048        this.endpoint = endpoint;
049        this.operations = operations;
050    }
051
052    public String getFileSeparator() {
053        return File.separator;
054    }
055
056    public String normalizePath(String name) {
057        return FileUtil.normalizePath(name);
058    }
059
060    public void process(Exchange exchange) throws Exception {
061        // store any existing file header which we want to keep and propagate
062        final String existing = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
063
064        // create the target file name
065        String target = createFileName(exchange);
066
067        // use lock for same file name to avoid concurrent writes to the same file
068        // for example when you concurrently append to the same file
069        Lock lock;
070        synchronized (locks) {
071            lock = locks.get(target);
072            if (lock == null) {
073                lock = new ReentrantLock();
074                locks.put(target, lock);
075            }
076        }
077
078        lock.lock();
079        try {
080            processExchange(exchange, target);
081        } finally {
082            // do not remove as the locks cache has an upper bound
083            // this ensure the locks is appropriate reused
084            lock.unlock();
085            // and remove the write file name header as we only want to use it once (by design)
086            exchange.getIn().removeHeader(Exchange.OVERRULE_FILE_NAME);
087            // and restore existing file name
088            exchange.getIn().setHeader(Exchange.FILE_NAME, existing);
089        }
090    }
091
092    /**
093     * Sets the operations to be used.
094     * <p/>
095     * Can be used to set a fresh operations in case of recovery attempts
096     *
097     * @param operations the operations
098     */
099    public void setOperations(GenericFileOperations<T> operations) {
100        this.operations = operations;
101    }
102
103    /**
104     * Perform the work to process the fileExchange
105     *
106     * @param exchange fileExchange
107     * @param target   the target filename
108     * @throws Exception is thrown if some error
109     */
110    protected void processExchange(Exchange exchange, String target) throws Exception {
111        log.trace("Processing file: {} for exchange: {}", target, exchange);
112
113        try {
114            preWriteCheck();
115
116            // should we write to a temporary name and then afterwards rename to real target
117            boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName());
118            String tempTarget = null;
119            // remember if target exists to avoid checking twice
120            Boolean targetExists;
121            if (writeAsTempAndRename) {
122                // compute temporary name with the temp prefix
123                tempTarget = createTempFileName(exchange, target);
124
125                log.trace("Writing using tempNameFile: {}", tempTarget);
126               
127                //if we should eager delete target file before deploying temporary file
128                if (endpoint.getFileExist() != GenericFileExist.TryRename && endpoint.isEagerDeleteTargetFile()) {
129                    
130                    // cater for file exists option on the real target as
131                    // the file operations code will work on the temp file
132
133                    // if an existing file already exists what should we do?
134                    targetExists = operations.existsFile(target);
135                    if (targetExists) {
136                        
137                        log.trace("EagerDeleteTargetFile, target exists");
138                        
139                        if (endpoint.getFileExist() == GenericFileExist.Ignore) {
140                            // ignore but indicate that the file was written
141                            log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
142                            return;
143                        } else if (endpoint.getFileExist() == GenericFileExist.Fail) {
144                            throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
145                        } else if (endpoint.getFileExist() == GenericFileExist.Move) {
146                            // move any existing file first
147                            doMoveExistingFile(target);
148                        } else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) {
149                            // we override the target so we do this by deleting it so the temp file can be renamed later
150                            // with success as the existing target file have been deleted
151                            log.trace("Eagerly deleting existing file: {}", target);
152                            if (!operations.deleteFile(target)) {
153                                throw new GenericFileOperationFailedException("Cannot delete file: " + target);
154                            }
155                        }
156                    }
157                }
158
159                // delete any pre existing temp file
160                if (endpoint.getFileExist() != GenericFileExist.TryRename && operations.existsFile(tempTarget)) {
161                    log.trace("Deleting existing temp file: {}", tempTarget);
162                    if (!operations.deleteFile(tempTarget)) {
163                        throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget);
164                    }
165                }
166            }
167
168            // write/upload the file
169            writeFile(exchange, tempTarget != null ? tempTarget : target);
170
171            // if we did write to a temporary name then rename it to the real
172            // name after we have written the file
173            if (tempTarget != null) {
174                // if we did not eager delete the target file
175                if (endpoint.getFileExist() != GenericFileExist.TryRename && !endpoint.isEagerDeleteTargetFile()) {
176
177                    // if an existing file already exists what should we do?
178                    targetExists = operations.existsFile(target);
179                    if (targetExists) {
180
181                        log.trace("Not using EagerDeleteTargetFile, target exists");
182
183                        if (endpoint.getFileExist() == GenericFileExist.Ignore) {
184                            // ignore but indicate that the file was written
185                            log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
186                            return;
187                        } else if (endpoint.getFileExist() == GenericFileExist.Fail) {
188                            throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
189                        } else if (endpoint.getFileExist() == GenericFileExist.Override) {
190                            // we override the target so we do this by deleting it so the temp file can be renamed later
191                            // with success as the existing target file have been deleted
192                            log.trace("Deleting existing file: {}", target);
193                            if (!operations.deleteFile(target)) {
194                                throw new GenericFileOperationFailedException("Cannot delete file: " + target);
195                            }
196                        }
197                    }
198                }
199
200                // now we are ready to rename the temp file to the target file
201                log.trace("Renaming file: [{}] to: [{}]", tempTarget, target);
202                boolean renamed = operations.renameFile(tempTarget, target);
203                if (!renamed) {
204                    throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target);
205                }
206            }
207
208            // any done file to write?
209            if (endpoint.getDoneFileName() != null) {
210                String doneFileName = endpoint.createDoneFileName(target);
211                StringHelper.notEmpty(doneFileName, "doneFileName", endpoint);
212
213                // create empty exchange with empty body to write as the done file
214                Exchange empty = new DefaultExchange(exchange);
215                empty.getIn().setBody("");
216
217                log.trace("Writing done file: [{}]", doneFileName);
218                // delete any existing done file
219                if (operations.existsFile(doneFileName)) {
220                    if (!operations.deleteFile(doneFileName)) {
221                        throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName);
222                    }
223                }
224                writeFile(empty, doneFileName);
225            }
226
227            // let's store the name we really used in the header, so end-users
228            // can retrieve it
229            exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target);
230        } catch (Exception e) {
231            handleFailedWrite(exchange, e);
232        }
233
234        postWriteCheck(exchange);
235    }
236
237    private void doMoveExistingFile(String fileName) throws GenericFileOperationFailedException {
238        // need to evaluate using a dummy and simulate the file first, to have access to all the file attributes
239        // create a dummy exchange as Exchange is needed for expression evaluation
240        // we support only the following 3 tokens.
241        Exchange dummy = endpoint.createExchange();
242        String parent = FileUtil.onlyPath(fileName);
243        String onlyName = FileUtil.stripPath(fileName);
244        dummy.getIn().setHeader(Exchange.FILE_NAME, fileName);
245        dummy.getIn().setHeader(Exchange.FILE_NAME_ONLY, onlyName);
246        dummy.getIn().setHeader(Exchange.FILE_PARENT, parent);
247
248        String to = endpoint.getMoveExisting().evaluate(dummy, String.class);
249        // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File)
250        to = FileUtil.normalizePath(to);
251        if (ObjectHelper.isEmpty(to)) {
252            throw new GenericFileOperationFailedException("moveExisting evaluated as empty String, cannot move existing file: " + fileName);
253        }
254
255        boolean renamed = operations.renameFile(fileName, to);
256        if (!renamed) {
257            throw new GenericFileOperationFailedException("Cannot rename file from: " + fileName + " to: " + to);
258        }
259    }
260
261    /**
262     * If we fail writing out a file, we will call this method. This hook is
263     * provided to disconnect from servers or clean up files we created (if needed).
264     */
265    public void handleFailedWrite(Exchange exchange, Exception exception) throws Exception {
266        throw exception;
267    }
268
269    /**
270     * Perform any actions that need to occur before we write such as connecting to an FTP server etc.
271     */
272    public void preWriteCheck() throws Exception {
273        // nothing needed to check
274    }
275
276    /**
277     * Perform any actions that need to occur after we are done such as disconnecting.
278     */
279    public void postWriteCheck(Exchange exchange) {
280        // nothing needed to check
281    }
282
283    public void writeFile(Exchange exchange, String fileName) throws GenericFileOperationFailedException {
284        // build directory if auto create is enabled
285        if (endpoint.isAutoCreate()) {
286            // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File)
287            String name = FileUtil.normalizePath(fileName);
288
289            // use java.io.File to compute the file path
290            File file = new File(name);
291            String directory = file.getParent();
292            boolean absolute = FileUtil.isAbsolute(file);
293            if (directory != null) {
294                if (!operations.buildDirectory(directory, absolute)) {
295                    log.debug("Cannot build directory [{}] (could be because of denied permissions)", directory);
296                }
297            }
298        }
299
300        // upload
301        if (log.isTraceEnabled()) {
302            log.trace("About to write [{}] to [{}] from exchange [{}]", fileName, getEndpoint(), exchange);
303        }
304
305        boolean success = operations.storeFile(fileName, exchange, -1);
306        if (!success) {
307            throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]");
308        }
309        log.debug("Wrote [{}] to [{}]", fileName, getEndpoint());
310    }
311
312    public String createFileName(Exchange exchange) {
313        String answer;
314
315        // overrule takes precedence
316        Object value;
317
318        Object overrule = exchange.getIn().getHeader(Exchange.OVERRULE_FILE_NAME);
319        if (overrule != null) {
320            if (overrule instanceof Expression) {
321                value = overrule;
322            } else {
323                value = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, overrule);
324            }
325        } else {
326            value = exchange.getIn().getHeader(Exchange.FILE_NAME);
327        }
328
329        // if we have an overrule then override the existing header to use the overrule computed name from this point forward
330        if (overrule != null) {
331            exchange.getIn().setHeader(Exchange.FILE_NAME, value);
332        }
333
334        if (value instanceof String && StringHelper.hasStartToken((String) value, "simple")) {
335            log.warn("Simple expression: {} detected in header: {} of type String. This feature has been removed (see CAMEL-6748).", value, Exchange.FILE_NAME);
336        }
337
338        // expression support
339        Expression expression = endpoint.getFileName();
340        if (value instanceof Expression) {
341            expression = (Expression) value;
342        }
343
344        // evaluate the name as a String from the value
345        String name;
346        if (expression != null) {
347            log.trace("Filename evaluated as expression: {}", expression);
348            name = expression.evaluate(exchange, String.class);
349        } else {
350            name = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value);
351        }
352
353        // flatten name
354        if (name != null && endpoint.isFlatten()) {
355            // check for both windows and unix separators
356            int pos = Math.max(name.lastIndexOf("/"), name.lastIndexOf("\\"));
357            if (pos != -1) {
358                name = name.substring(pos + 1);
359            }
360        }
361
362        // compute path by adding endpoint starting directory
363        String endpointPath = endpoint.getConfiguration().getDirectory();
364        String baseDir = "";
365        if (endpointPath.length() > 0) {
366            // Its a directory so we should use it as a base path for the filename
367            // If the path isn't empty, we need to add a trailing / if it isn't already there
368            baseDir = endpointPath;
369            boolean trailingSlash = endpointPath.endsWith("/") || endpointPath.endsWith("\\");
370            if (!trailingSlash) {
371                baseDir += getFileSeparator();
372            }
373        }
374        if (name != null) {
375            answer = baseDir + name;
376        } else {
377            // use a generated filename if no name provided
378            answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn());
379        }
380
381        if (endpoint.isJailStartingDirectory()) {
382            // check for file must be within starting directory (need to compact first as the name can be using relative paths via ../ etc)
383            String compatchAnswer = FileUtil.compactPath(answer);
384            String compatchBaseDir = FileUtil.compactPath(baseDir);
385            if (!compatchAnswer.startsWith(compatchBaseDir)) {
386                throw new IllegalArgumentException("Cannot write file with name: " + compatchAnswer + " as the filename is jailed to the starting directory: " + compatchBaseDir);
387            }
388        }
389
390        if (endpoint.getConfiguration().needToNormalize()) {
391            // must normalize path to cater for Windows and other OS
392            answer = normalizePath(answer);
393        }
394
395        return answer;
396    }
397
398    public String createTempFileName(Exchange exchange, String fileName) {
399        String answer = fileName;
400
401        String tempName;
402        if (exchange.getIn().getHeader(Exchange.FILE_NAME) == null) {
403            // its a generated filename then add it to header so we can evaluate the expression
404            exchange.getIn().setHeader(Exchange.FILE_NAME, FileUtil.stripPath(fileName));
405            tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
406            // and remove it again after evaluation
407            exchange.getIn().removeHeader(Exchange.FILE_NAME);
408        } else {
409            tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
410        }
411
412        // check for both windows and unix separators
413        int pos = Math.max(answer.lastIndexOf("/"), answer.lastIndexOf("\\"));
414        if (pos == -1) {
415            // no path so use temp name as calculated
416            answer = tempName;
417        } else {
418            // path should be prefixed before the temp name
419            StringBuilder sb = new StringBuilder(answer.substring(0, pos + 1));
420            sb.append(tempName);
421            answer = sb.toString();
422        }
423
424        if (endpoint.getConfiguration().needToNormalize()) {
425            // must normalize path to cater for Windows and other OS
426            answer = normalizePath(answer);
427        }
428
429        // stack path in case the temporary file uses .. paths
430        answer = FileUtil.compactPath(answer, getFileSeparator());
431
432        return answer;
433    }
434
435    @Override
436    protected void doStart() throws Exception {
437        ServiceHelper.startService(locks);
438        super.doStart();
439    }
440
441    @Override
442    protected void doStop() throws Exception {
443        super.doStop();
444        ServiceHelper.stopService(locks);
445    }
446}