001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import org.apache.camel.Exchange; 020 import org.apache.camel.impl.LoggingExceptionHandler; 021 import org.apache.camel.spi.ExceptionHandler; 022 import org.apache.camel.spi.Synchronization; 023 import org.apache.camel.util.ObjectHelper; 024 import org.slf4j.Logger; 025 import org.slf4j.LoggerFactory; 026 027 /** 028 * On completion strategy that performs the required work after the {@link Exchange} has been processed. 029 * <p/> 030 * The work is for example to move the processed file into a backup folder, delete the file or 031 * in case of processing failure do a rollback. 032 * 033 * @version 034 */ 035 public class GenericFileOnCompletion<T> implements Synchronization { 036 037 private final transient Logger log = LoggerFactory.getLogger(GenericFileOnCompletion.class); 038 private GenericFileEndpoint<T> endpoint; 039 private GenericFileOperations<T> operations; 040 private ExceptionHandler exceptionHandler; 041 private GenericFile<T> file; 042 private String absoluteFileName; 043 044 public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations, 045 GenericFile<T> file, String absoluteFileName) { 046 this.endpoint = endpoint; 047 this.operations = operations; 048 this.file = file; 049 this.absoluteFileName = absoluteFileName; 050 } 051 052 public void onComplete(Exchange exchange) { 053 onCompletion(exchange); 054 } 055 056 public void onFailure(Exchange exchange) { 057 onCompletion(exchange); 058 } 059 060 public ExceptionHandler getExceptionHandler() { 061 if (exceptionHandler == null) { 062 exceptionHandler = new LoggingExceptionHandler(getClass()); 063 } 064 return exceptionHandler; 065 } 066 067 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 068 this.exceptionHandler = exceptionHandler; 069 } 070 071 protected void onCompletion(Exchange exchange) { 072 GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); 073 074 log.debug("Done processing file: {} using exchange: {}", file, exchange); 075 076 // commit or rollback 077 boolean committed = false; 078 try { 079 boolean failed = exchange.isFailed(); 080 if (!failed) { 081 // commit the file strategy if there was no failure or already handled by the DeadLetterChannel 082 processStrategyCommit(processStrategy, exchange, file); 083 committed = true; 084 } 085 // if we failed, then it will be handled by the rollback in the finally block below 086 } finally { 087 if (!committed) { 088 processStrategyRollback(processStrategy, exchange, file); 089 } 090 091 // remove file from the in progress list as its no longer in progress 092 // use the original file name that was used to add it to the repository 093 // as the name can be different when using preMove option 094 endpoint.getInProgressRepository().remove(absoluteFileName); 095 } 096 } 097 098 /** 099 * Strategy when the file was processed and a commit should be executed. 100 * 101 * @param processStrategy the strategy to perform the commit 102 * @param exchange the exchange 103 * @param file the file processed 104 */ 105 protected void processStrategyCommit(GenericFileProcessStrategy<T> processStrategy, 106 Exchange exchange, GenericFile<T> file) { 107 if (endpoint.isIdempotent()) { 108 // only add to idempotent repository if we could process the file 109 endpoint.getIdempotentRepository().add(absoluteFileName); 110 } 111 112 // must be last in batch to delete the done file name 113 // delete done file if used (and not noop=true) 114 boolean complete = exchange.getProperty(Exchange.BATCH_COMPLETE, false, Boolean.class); 115 if (endpoint.getDoneFileName() != null && !endpoint.isNoop() && complete) { 116 // done file must be in same path as the original input file 117 String doneFileName = endpoint.createDoneFileName(absoluteFileName); 118 ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint); 119 120 try { 121 // delete done file 122 boolean deleted = operations.deleteFile(doneFileName); 123 log.trace("Done file: {} was deleted: {}", doneFileName, deleted); 124 if (!deleted) { 125 log.warn("Done file: " + doneFileName + " could not be deleted"); 126 } 127 } catch (Exception e) { 128 handleException(e); 129 } 130 } 131 132 try { 133 log.trace("Commit file strategy: {} for file: {}", processStrategy, file); 134 processStrategy.commit(operations, endpoint, exchange, file); 135 } catch (Exception e) { 136 handleException(e); 137 } 138 } 139 140 /** 141 * Strategy when the file was not processed and a rollback should be executed. 142 * 143 * @param processStrategy the strategy to perform the commit 144 * @param exchange the exchange 145 * @param file the file processed 146 */ 147 protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy, 148 Exchange exchange, GenericFile<T> file) { 149 150 if (log.isWarnEnabled()) { 151 log.warn("Rollback file strategy: " + processStrategy + " for file: " + file); 152 } 153 try { 154 processStrategy.rollback(operations, endpoint, exchange, file); 155 } catch (Exception e) { 156 handleException(e); 157 } 158 } 159 160 protected void handleException(Throwable t) { 161 Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t; 162 getExceptionHandler().handleException(newt); 163 } 164 165 @Override 166 public String toString() { 167 return "GenericFileOnCompletion"; 168 } 169 }