001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file;
018
019import java.io.File;
020import java.util.concurrent.locks.Lock;
021import java.util.concurrent.locks.ReentrantLock;
022
023import org.apache.camel.Exchange;
024import org.apache.camel.Expression;
025import org.apache.camel.impl.DefaultExchange;
026import org.apache.camel.impl.DefaultProducer;
027import org.apache.camel.util.FileUtil;
028import org.apache.camel.util.LRUCache;
029import org.apache.camel.util.ObjectHelper;
030import org.apache.camel.util.ServiceHelper;
031import org.apache.camel.util.StringHelper;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034/**
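 * Generic file producer which stores exchanges as files through the configured
 * {@link GenericFileOperations}, using one lock per target file name so that
 * writes to the same file do not run concurrently.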
036 */
037public class GenericFileProducer<T> extends DefaultProducer {
038    protected final Logger log = LoggerFactory.getLogger(getClass());
039    protected final GenericFileEndpoint<T> endpoint;
040    protected GenericFileOperations<T> operations;
041    // assume writing to 100 different files concurrently at most for the same file producer
042    private final LRUCache<String, Lock> locks = new LRUCache<String, Lock>(100);
043
044    protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
045        super(endpoint);
046        this.endpoint = endpoint;
047        this.operations = operations;
048    }
049    
050    public String getFileSeparator() {
051        return File.separator;
052    }
053
054    public String normalizePath(String name) {
055        return FileUtil.normalizePath(name);
056    }
057
058    public void process(Exchange exchange) throws Exception {
059        // store any existing file header which we want to keep and propagate
060        final String existing = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
061
062        // create the target file name
063        String target = createFileName(exchange);
064
065        // use lock for same file name to avoid concurrent writes to the same file
066        // for example when you concurrently append to the same file
067        Lock lock;
068        synchronized (locks) {
069            lock = locks.get(target);
070            if (lock == null) {
071                lock = new ReentrantLock();
072                locks.put(target, lock);
073            }
074        }
075
076        lock.lock();
077        try {
078            processExchange(exchange, target);
079        } finally {
080            // do not remove as the locks cache has an upper bound
081            // this ensure the locks is appropriate reused
082            lock.unlock();
083            // and remove the write file name header as we only want to use it once (by design)
084            exchange.getIn().removeHeader(Exchange.OVERRULE_FILE_NAME);
085            // and restore existing file name
086            exchange.getIn().setHeader(Exchange.FILE_NAME, existing);
087        }
088    }
089
090    /**
091     * Sets the operations to be used.
092     * <p/>
093     * Can be used to set a fresh operations in case of recovery attempts
094     *
095     * @param operations the operations
096     */
097    public void setOperations(GenericFileOperations<T> operations) {
098        this.operations = operations;
099    }
100
101    /**
102     * Perform the work to process the fileExchange
103     *
104     * @param exchange fileExchange
105     * @param target   the target filename
106     * @throws Exception is thrown if some error
107     */
108    protected void processExchange(Exchange exchange, String target) throws Exception {
109        log.trace("Processing file: {} for exchange: {}", target, exchange);
110
111        try {
112            preWriteCheck();
113
114            // should we write to a temporary name and then afterwards rename to real target
115            boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName());
116            String tempTarget = null;
117            // remember if target exists to avoid checking twice
118            Boolean targetExists;
119            if (writeAsTempAndRename) {
120                // compute temporary name with the temp prefix
121                tempTarget = createTempFileName(exchange, target);
122
123                log.trace("Writing using tempNameFile: {}", tempTarget);
124               
125                //if we should eager delete target file before deploying temporary file
126                if (endpoint.getFileExist() != GenericFileExist.TryRename && endpoint.isEagerDeleteTargetFile()) {
127                    
128                    // cater for file exists option on the real target as
129                    // the file operations code will work on the temp file
130
131                    // if an existing file already exists what should we do?
132                    targetExists = operations.existsFile(target);
133                    if (targetExists) {
134                        
135                        log.trace("EagerDeleteTargetFile, target exists");
136                        
137                        if (endpoint.getFileExist() == GenericFileExist.Ignore) {
138                            // ignore but indicate that the file was written
139                            log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
140                            return;
141                        } else if (endpoint.getFileExist() == GenericFileExist.Fail) {
142                            throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
143                        } else if (endpoint.getFileExist() == GenericFileExist.Move) {
144                            // move any existing file first
145                            doMoveExistingFile(target);
146                        } else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) {
147                            // we override the target so we do this by deleting it so the temp file can be renamed later
148                            // with success as the existing target file have been deleted
149                            log.trace("Eagerly deleting existing file: {}", target);
150                            if (!operations.deleteFile(target)) {
151                                throw new GenericFileOperationFailedException("Cannot delete file: " + target);
152                            }
153                        }
154                    }
155                }
156
157                // delete any pre existing temp file
158                if (operations.existsFile(tempTarget)) {
159                    log.trace("Deleting existing temp file: {}", tempTarget);
160                    if (!operations.deleteFile(tempTarget)) {
161                        throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget);
162                    }
163                }
164            }
165
166            // write/upload the file
167            writeFile(exchange, tempTarget != null ? tempTarget : target);
168
169            // if we did write to a temporary name then rename it to the real
170            // name after we have written the file
171            if (tempTarget != null) {
172                // if we did not eager delete the target file
173                if (endpoint.getFileExist() != GenericFileExist.TryRename && !endpoint.isEagerDeleteTargetFile()) {
174
175                    // if an existing file already exists what should we do?
176                    targetExists = operations.existsFile(target);
177                    if (targetExists) {
178
179                        log.trace("Not using EagerDeleteTargetFile, target exists");
180
181                        if (endpoint.getFileExist() == GenericFileExist.Ignore) {
182                            // ignore but indicate that the file was written
183                            log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
184                            return;
185                        } else if (endpoint.getFileExist() == GenericFileExist.Fail) {
186                            throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
187                        } else if (endpoint.getFileExist() == GenericFileExist.Override) {
188                            // we override the target so we do this by deleting it so the temp file can be renamed later
189                            // with success as the existing target file have been deleted
190                            log.trace("Deleting existing file: {}", target);
191                            if (!operations.deleteFile(target)) {
192                                throw new GenericFileOperationFailedException("Cannot delete file: " + target);
193                            }
194                        }
195                    }
196                }
197
198                // now we are ready to rename the temp file to the target file
199                log.trace("Renaming file: [{}] to: [{}]", tempTarget, target);
200                boolean renamed = operations.renameFile(tempTarget, target);
201                if (!renamed) {
202                    throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target);
203                }
204            }
205
206            // any done file to write?
207            if (endpoint.getDoneFileName() != null) {
208                String doneFileName = endpoint.createDoneFileName(target);
209                ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
210
211                // create empty exchange with empty body to write as the done file
212                Exchange empty = new DefaultExchange(exchange);
213                empty.getIn().setBody("");
214
215                log.trace("Writing done file: [{}]", doneFileName);
216                // delete any existing done file
217                if (operations.existsFile(doneFileName)) {
218                    if (!operations.deleteFile(doneFileName)) {
219                        throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName);
220                    }
221                }
222                writeFile(empty, doneFileName);
223            }
224
225            // let's store the name we really used in the header, so end-users
226            // can retrieve it
227            exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target);
228        } catch (Exception e) {
229            handleFailedWrite(exchange, e);
230        }
231
232        postWriteCheck();
233    }
234
235    private void doMoveExistingFile(String fileName) throws GenericFileOperationFailedException {
236        // need to evaluate using a dummy and simulate the file first, to have access to all the file attributes
237        // create a dummy exchange as Exchange is needed for expression evaluation
238        // we support only the following 3 tokens.
239        Exchange dummy = endpoint.createExchange();
240        String parent = FileUtil.onlyPath(fileName);
241        String onlyName = FileUtil.stripPath(fileName);
242        dummy.getIn().setHeader(Exchange.FILE_NAME, fileName);
243        dummy.getIn().setHeader(Exchange.FILE_NAME_ONLY, onlyName);
244        dummy.getIn().setHeader(Exchange.FILE_PARENT, parent);
245
246        String to = endpoint.getMoveExisting().evaluate(dummy, String.class);
247        // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File)
248        to = FileUtil.normalizePath(to);
249        if (ObjectHelper.isEmpty(to)) {
250            throw new GenericFileOperationFailedException("moveExisting evaluated as empty String, cannot move existing file: " + fileName);
251        }
252        String parentTo = FileUtil.onlyPath(to);
253        boolean absolute = FileUtil.isAbsolute(new File(parentTo));
254        if (!absolute && !parentTo.startsWith(parent)) {
255            parentTo = parent + File.separator + parentTo;
256            String fileonlyname =  FileUtil.stripPath(to);
257            to = parentTo + File.separator + (ObjectHelper.isNotEmpty(fileonlyname) ? fileonlyname : onlyName);
258        }
259        operations.buildDirectory(parentTo, absolute);
260        boolean renamed = operations.renameFile(fileName, to);
261
262        if (!renamed) {
263            throw new GenericFileOperationFailedException("Cannot rename file from: " + fileName + " to: " + to);
264        }
265    }
266
267    /**
268     * If we fail writing out a file, we will call this method. This hook is
269     * provided to disconnect from servers or clean up files we created (if needed).
270     */
271    public void handleFailedWrite(Exchange exchange, Exception exception) throws Exception {
272        throw exception;
273    }
274
275    /**
276     * Perform any actions that need to occur before we write such as connecting to an FTP server etc.
277     */
278    public void preWriteCheck() throws Exception {
279        // nothing needed to check
280    }
281
282    /**
283     * Perform any actions that need to occur after we are done such as disconnecting.
284     */
285    public void postWriteCheck() {
286        // nothing needed to check
287    }
288
289    public void writeFile(Exchange exchange, String fileName) throws GenericFileOperationFailedException {
290        // build directory if auto create is enabled
291        if (endpoint.isAutoCreate()) {
292            // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File)
293            String name = FileUtil.normalizePath(fileName);
294
295            // use java.io.File to compute the file path
296            File file = new File(name);
297            String directory = file.getParent();
298            boolean absolute = FileUtil.isAbsolute(file);
299            if (directory != null) {
300                if (!operations.buildDirectory(directory, absolute)) {
301                    log.debug("Cannot build directory [{}] (could be because of denied permissions)", directory);
302                }
303            }
304        }
305
306        // upload
307        if (log.isTraceEnabled()) {
308            log.trace("About to write [{}] to [{}] from exchange [{}]", new Object[]{fileName, getEndpoint(), exchange});
309        }
310
311        boolean success = operations.storeFile(fileName, exchange);
312        if (!success) {
313            throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]");
314        }
315        log.debug("Wrote [{}] to [{}]", fileName, getEndpoint());
316    }
317
318    public String createFileName(Exchange exchange) {
319        String answer;
320
321        // overrule takes precedence
322        Object value;
323
324        Object overrule = exchange.getIn().getHeader(Exchange.OVERRULE_FILE_NAME);
325        if (overrule != null) {
326            if (overrule instanceof Expression) {
327                value = overrule;
328            } else {
329                value = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, overrule);
330            }
331        } else {
332            value = exchange.getIn().getHeader(Exchange.FILE_NAME);
333        }
334
335        // if we have an overrule then override the existing header to use the overrule computed name from this point forward
336        if (overrule != null) {
337            exchange.getIn().setHeader(Exchange.FILE_NAME, value);
338        }
339
340        if (value != null && value instanceof String && StringHelper.hasStartToken((String) value, "simple")) {
341            log.warn("Simple expression: {} detected in header: {} of type String. This feature has been removed (see CAMEL-6748).", value, Exchange.FILE_NAME);
342        }
343
344        // expression support
345        Expression expression = endpoint.getFileName();
346        if (value != null && value instanceof Expression) {
347            expression = (Expression) value;
348        }
349
350        // evaluate the name as a String from the value
351        String name;
352        if (expression != null) {
353            log.trace("Filename evaluated as expression: {}", expression);
354            name = expression.evaluate(exchange, String.class);
355        } else {
356            name = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value);
357        }
358
359        // flatten name
360        if (name != null && endpoint.isFlatten()) {
361            // check for both windows and unix separators
362            int pos = Math.max(name.lastIndexOf("/"), name.lastIndexOf("\\"));
363            if (pos != -1) {
364                name = name.substring(pos + 1);
365            }
366        }
367
368        // compute path by adding endpoint starting directory
369        String endpointPath = endpoint.getConfiguration().getDirectory();
370        String baseDir = "";
371        if (endpointPath.length() > 0) {
372            // Its a directory so we should use it as a base path for the filename
373            // If the path isn't empty, we need to add a trailing / if it isn't already there
374            baseDir = endpointPath;
375            boolean trailingSlash = endpointPath.endsWith("/") || endpointPath.endsWith("\\");
376            if (!trailingSlash) {
377                baseDir += getFileSeparator();
378            }
379        }
380        if (name != null) {
381            answer = baseDir + name;
382        } else {
383            // use a generated filename if no name provided
384            answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn());
385        }
386
387        if (endpoint.getConfiguration().needToNormalize()) {
388            // must normalize path to cater for Windows and other OS
389            answer = normalizePath(answer);
390        }
391
392        return answer;
393    }
394
395    public String createTempFileName(Exchange exchange, String fileName) {
396        String answer = fileName;
397
398        String tempName;
399        if (exchange.getIn().getHeader(Exchange.FILE_NAME) == null) {
400            // its a generated filename then add it to header so we can evaluate the expression
401            exchange.getIn().setHeader(Exchange.FILE_NAME, FileUtil.stripPath(fileName));
402            tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
403            // and remove it again after evaluation
404            exchange.getIn().removeHeader(Exchange.FILE_NAME);
405        } else {
406            tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
407        }
408
409        // check for both windows and unix separators
410        int pos = Math.max(answer.lastIndexOf("/"), answer.lastIndexOf("\\"));
411        if (pos == -1) {
412            // no path so use temp name as calculated
413            answer = tempName;
414        } else {
415            // path should be prefixed before the temp name
416            StringBuilder sb = new StringBuilder(answer.substring(0, pos + 1));
417            sb.append(tempName);
418            answer = sb.toString();
419        }
420
421        if (endpoint.getConfiguration().needToNormalize()) {
422            // must normalize path to cater for Windows and other OS
423            answer = normalizePath(answer);
424        }
425
426        return answer;
427    }
428
429    @Override
430    protected void doStart() throws Exception {
431        super.doStart();
432        ServiceHelper.startService(locks);
433    }
434
435    @Override
436    protected void doStop() throws Exception {
437        ServiceHelper.stopService(locks);
438        super.doStop();
439    }
440}