001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.impl.LoggingExceptionHandler;
021    import org.apache.camel.spi.ExceptionHandler;
022    import org.apache.camel.spi.Synchronization;
023    import org.apache.camel.util.ObjectHelper;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    
027    /**
028     * On completion strategy that performs the required work after the {@link Exchange} has been processed.
029     * <p/>
030     * The work is for example to move the processed file into a backup folder, delete the file or
031     * in case of processing failure do a rollback. 
032     *
033     * @version 
034     */
035    public class GenericFileOnCompletion<T> implements Synchronization {
036    
037        private final transient Logger log = LoggerFactory.getLogger(GenericFileOnCompletion.class);
038        private GenericFileEndpoint<T> endpoint;
039        private GenericFileOperations<T> operations;
040        private ExceptionHandler exceptionHandler;
041        private GenericFile<T> file;
042        private String absoluteFileName;
043    
044        public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations,
045                                       GenericFile<T> file, String absoluteFileName) {
046            this.endpoint = endpoint;
047            this.operations = operations;
048            this.file = file;
049            this.absoluteFileName = absoluteFileName;
050        }
051    
052        public void onComplete(Exchange exchange) {
053            onCompletion(exchange);
054        }
055    
056        public void onFailure(Exchange exchange) {
057            onCompletion(exchange);
058        }
059    
060        public ExceptionHandler getExceptionHandler() {
061            if (exceptionHandler == null) {
062                exceptionHandler = new LoggingExceptionHandler(getClass());
063            }
064            return exceptionHandler;
065        }
066    
067        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
068            this.exceptionHandler = exceptionHandler;
069        }
070    
071        protected void onCompletion(Exchange exchange) {
072            GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
073    
074            log.debug("Done processing file: {} using exchange: {}", file, exchange);
075    
076            // commit or rollback
077            boolean committed = false;
078            try {
079                boolean failed = exchange.isFailed();
080                if (!failed) {
081                    // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
082                    processStrategyCommit(processStrategy, exchange, file);
083                    committed = true;
084                }
085                // if we failed, then it will be handled by the rollback in the finally block below
086            } finally {
087                if (!committed) {
088                    processStrategyRollback(processStrategy, exchange, file);
089                }
090    
091                // remove file from the in progress list as its no longer in progress
092                // use the original file name that was used to add it to the repository
093                // as the name can be different when using preMove option
094                endpoint.getInProgressRepository().remove(absoluteFileName);
095            }
096        }
097    
098        /**
099         * Strategy when the file was processed and a commit should be executed.
100         *
101         * @param processStrategy the strategy to perform the commit
102         * @param exchange        the exchange
103         * @param file            the file processed
104         */
105        protected void processStrategyCommit(GenericFileProcessStrategy<T> processStrategy,
106                                             Exchange exchange, GenericFile<T> file) {
107            if (endpoint.isIdempotent()) {
108                // only add to idempotent repository if we could process the file
109                endpoint.getIdempotentRepository().add(absoluteFileName);
110            }
111    
112            // must be last in batch to delete the done file name
113            // delete done file if used (and not noop=true)
114            boolean complete = exchange.getProperty(Exchange.BATCH_COMPLETE, false, Boolean.class);
115            if (endpoint.getDoneFileName() != null && !endpoint.isNoop() && complete) {
116                // done file must be in same path as the original input file
117                String doneFileName = endpoint.createDoneFileName(absoluteFileName);
118                ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
119    
120                try {
121                    // delete done file
122                    boolean deleted = operations.deleteFile(doneFileName);
123                    log.trace("Done file: {} was deleted: {}", doneFileName, deleted);
124                    if (!deleted) {
125                        log.warn("Done file: " + doneFileName + " could not be deleted");
126                    }
127                } catch (Exception e) {
128                    handleException(e);
129                }
130            }
131    
132            try {
133                log.trace("Commit file strategy: {} for file: {}", processStrategy, file);
134                processStrategy.commit(operations, endpoint, exchange, file);
135            } catch (Exception e) {
136                handleException(e);
137            }
138        }
139    
140        /**
141         * Strategy when the file was not processed and a rollback should be executed.
142         *
143         * @param processStrategy the strategy to perform the commit
144         * @param exchange        the exchange
145         * @param file            the file processed
146         */
147        protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy,
148                                               Exchange exchange, GenericFile<T> file) {
149    
150            if (log.isWarnEnabled()) {
151                log.warn("Rollback file strategy: " + processStrategy + " for file: " + file);
152            }
153            try {
154                processStrategy.rollback(operations, endpoint, exchange, file);
155            } catch (Exception e) {
156                handleException(e);
157            }
158        }
159    
160        protected void handleException(Throwable t) {
161            Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t;
162            getExceptionHandler().handleException(newt);
163        }
164    
165        @Override
166        public String toString() {
167            return "GenericFileOnCompletion";
168        }
169    }