/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.file;

import java.io.File;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.spi.Language;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Generic file producer.
 * <p/>
 * Computes the target file name for an exchange, serializes writes to the same
 * target name through a per-name lock, and delegates the actual file work
 * (exists / delete / rename / store / build directory) to the configured
 * {@link GenericFileOperations}.
 */
public class GenericFileProducer<T> extends DefaultProducer {
    protected final transient Logger log = LoggerFactory.getLogger(getClass());
    protected final GenericFileEndpoint<T> endpoint;
    protected GenericFileOperations<T> operations;
    // assume writing to 100 different files concurrently at most for the same file producer
    // NOTE(review): presumably the cache evicts entries once it holds more than 100 names; if a
    // lock that is still held gets evicted, a second lock could be created for the same name - confirm
    private final LRUCache<String, Lock> locks = new LRUCache<String, Lock>(100);

    /**
     * Creates the producer.
     *
     * @param endpoint   the endpoint this producer belongs to
     * @param operations the file operations used to perform the actual work
     */
    protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
        super(endpoint);
        this.endpoint = endpoint;
        this.operations = operations;
    }

    /**
     * Gets the separator appended between the endpoint directory and the file name.
     *
     * @return {@link File#separator}
     */
    public String getFileSeparator() {
        return File.separator;
    }

    /**
     * Normalizes the given path by delegating to {@link FileUtil#normalizePath(String)}.
     *
     * @param name the path to normalize
     * @return the normalized path
     */
    public String normalizePath(String name) {
        return FileUtil.normalizePath(name);
    }

    /**
     * Writes the exchange to the file name computed by {@link #createFileName(Exchange)},
     * holding a lock dedicated to that file name while the write is in progress.
     *
     * @param exchange the exchange to write
     * @throws Exception is thrown if the write fails
     */
    public void process(Exchange exchange) throws Exception {
        String target = createFileName(exchange);

        // use lock for same file name to avoid concurrent writes to the same file
        // for example when you concurrently append to the same file
        Lock lock;
        synchronized (locks) {
            lock = locks.get(target);
            if (lock == null) {
                lock = new ReentrantLock();
                locks.put(target, lock);
            }
        }

        lock.lock();
        try {
            processExchange(exchange, target);
        } finally {
            // do not remove the lock from the cache, as the cache has an upper bound;
            // this ensures the locks are appropriately reused
            lock.unlock();
        }
    }

    /**
     * Sets the operations to be used.
     * <p/>
     * Can be used to set a fresh operations in case of recovery attempts
     *
     * @param operations the operations
     */
    public void setOperations(GenericFileOperations<T> operations) {
        this.operations = operations;
    }

    /**
     * Perform the work to process the fileExchange
     * <p/>
     * When the endpoint has a temp file name configured, the file is first written
     * under the temporary name and afterwards renamed to the real target. If a done
     * file name is configured, an empty done file is written last. On success the
     * {@link Exchange#FILE_NAME_PRODUCED} header is set to the target name.
     * Any exception is passed to {@link #handleFailedWrite(Exchange, Exception)}.
     *
     * @param exchange fileExchange
     * @param target   the target filename
     * @throws Exception is thrown if some error
     */
    protected void processExchange(Exchange exchange, String target) throws Exception {
        log.trace("Processing file: {} for exchange: {}", target, exchange);

        try {
            preWriteCheck();

            // should we write to a temporary name and then afterwards rename to real target
            boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName());
            String tempTarget = null;
            // remember if target exists to avoid checking twice
            // (primitive on purpose: it is only read when a temp name is in use, and a
            // nullable Boolean here would risk a NullPointerException on auto-unboxing)
            boolean targetExists = false;
            if (writeAsTempAndRename) {
                // compute temporary name with the temp prefix
                tempTarget = createTempFileName(exchange, target);

                log.trace("Writing using tempNameFile: {}", tempTarget);

                // cater for file exists option on the real target as
                // the file operations code will work on the temp file

                // if an existing file already exists what should we do?
                targetExists = operations.existsFile(target);
                if (targetExists) {
                    if (endpoint.getFileExist() == GenericFileExist.Ignore) {
                        // ignore but indicate that the file was written
                        log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
                        // NOTE(review): this early return also skips postWriteCheck() below
                        return;
                    } else if (endpoint.getFileExist() == GenericFileExist.Fail) {
                        throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
                    } else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) {
                        // we override the target so we do this by deleting it so the temp file can be renamed later
                        // with success as the existing target file have been deleted
                        log.trace("Eagerly deleting existing file: {}", target);
                        if (!operations.deleteFile(target)) {
                            throw new GenericFileOperationFailedException("Cannot delete file: " + target);
                        }
                    }
                }

                // delete any pre existing temp file
                if (operations.existsFile(tempTarget)) {
                    log.trace("Deleting existing temp file: {}", tempTarget);
                    if (!operations.deleteFile(tempTarget)) {
                        throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget);
                    }
                }
            }

            // write/upload the file
            writeFile(exchange, tempTarget != null ? tempTarget : target);

            // if we did write to a temporary name then rename it to the real
            // name after we have written the file
            if (tempTarget != null) {

                // if we should not eager delete the target file then do it now just before renaming
                if (!endpoint.isEagerDeleteTargetFile() && targetExists
                        && endpoint.getFileExist() == GenericFileExist.Override) {
                    // we override the target so we do this by deleting it so the temp file can be renamed later
                    // with success as the existing target file have been deleted
                    log.trace("Deleting existing file: {}", target);
                    if (!operations.deleteFile(target)) {
                        throw new GenericFileOperationFailedException("Cannot delete file: " + target);
                    }
                }

                // now we are ready to rename the temp file to the target file
                log.trace("Renaming file: [{}] to: [{}]", tempTarget, target);
                boolean renamed = operations.renameFile(tempTarget, target);
                if (!renamed) {
                    throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target);
                }
            }

            // any done file to write?
            if (endpoint.getDoneFileName() != null) {
                String doneFileName = endpoint.createDoneFileName(target);
                ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);

                // create empty exchange with empty body to write as the done file
                Exchange empty = new DefaultExchange(exchange);
                empty.getIn().setBody("");

                log.trace("Writing done file: [{}]", doneFileName);
                // delete any existing done file
                if (operations.existsFile(doneFileName)) {
                    if (!operations.deleteFile(doneFileName)) {
                        throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName);
                    }
                }
                writeFile(empty, doneFileName);
            }

            // let's store the name we really used in the header, so end-users
            // can retrieve it
            exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target);
        } catch (Exception e) {
            handleFailedWrite(exchange, e);
        }

        // only reached when no exception propagated out of the block above
        postWriteCheck();
    }

    /**
     * If we fail writing out a file, we will call this method. This hook is
     * provided to disconnect from servers or clean up files we created (if needed).
     * <p/>
     * This default implementation simply rethrows the given exception.
     *
     * @param exchange  the exchange that failed to be written
     * @param exception the cause of the failure
     * @throws Exception the given exception
     */
    public void handleFailedWrite(Exchange exchange, Exception exception) throws Exception {
        throw exception;
    }

    /**
     * Perform any actions that need to occur before we write such as connecting to an FTP server etc.
     * <p/>
     * This default implementation does nothing.
     *
     * @throws Exception can be thrown by overriding implementations
     */
    public void preWriteCheck() throws Exception {
        // nothing needed to check
    }

    /**
     * Perform any actions that need to occur after we are done such as disconnecting.
     * <p/>
     * This default implementation does nothing.
     */
    public void postWriteCheck() {
        // nothing needed to check
    }

    /**
     * Stores the exchange under the given file name using the configured operations.
     * <p/>
     * If the endpoint has auto create enabled, the parent directory of the file is
     * built first; a failure to build the directory is only logged at debug level.
     *
     * @param exchange the exchange to store
     * @param fileName the name of the file to write
     * @throws GenericFileOperationFailedException is thrown if the operations report that the file was not stored
     */
    public void writeFile(Exchange exchange, String fileName) throws GenericFileOperationFailedException {
        // build directory if auto create is enabled
        if (endpoint.isAutoCreate()) {
            // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File)
            String name = FileUtil.normalizePath(fileName);

            // use java.io.File to compute the file path
            File file = new File(name);
            String directory = file.getParent();
            boolean absolute = FileUtil.isAbsolute(file);
            if (directory != null) {
                if (!operations.buildDirectory(directory, absolute)) {
                    log.debug("Cannot build directory [{}] (could be because of denied permissions)", directory);
                }
            }
        }

        // upload
        if (log.isTraceEnabled()) {
            log.trace("About to write [{}] to [{}] from exchange [{}]", new Object[]{fileName, getEndpoint(), exchange});
        }

        boolean success = operations.storeFile(fileName, exchange);
        if (!success) {
            throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]");
        }
        log.debug("Wrote [{}] to [{}]", fileName, getEndpoint());
    }

    /**
     * Computes the name of the file to write for the given exchange.
     * <p/>
     * The name is taken from the {@link Exchange#FILE_NAME} header; if the endpoint has a
     * file name expression (or the header itself contains a simple expression token) the
     * expression is evaluated and its result is used instead. If no name could be
     * determined, a name generated by the endpoint is used. The endpoint directory is
     * prefixed, and the result is normalized if the endpoint configuration requires it.
     *
     * @param exchange the exchange
     * @return the file name including the endpoint directory
     */
    public String createFileName(Exchange exchange) {
        String answer;

        String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);

        // expression support
        Expression expression = endpoint.getFileName();
        if (name != null) {
            // the header name can be an expression too, that should override
            // whatever configured on the endpoint
            if (StringHelper.hasStartToken(name, "simple")) {
                log.trace("{} contains a Simple expression: {}", Exchange.FILE_NAME, name);
                Language language = getEndpoint().getCamelContext().resolveLanguage("file");
                expression = language.createExpression(name);
            }
        }
        if (expression != null) {
            log.trace("Filename evaluated as expression: {}", expression);
            name = expression.evaluate(exchange, String.class);
        }

        // flatten name
        if (name != null && endpoint.isFlatten()) {
            // check for both windows and unix separators
            int pos = Math.max(name.lastIndexOf("/"), name.lastIndexOf("\\"));
            if (pos != -1) {
                name = name.substring(pos + 1);
            }
        }

        // compute path by adding endpoint starting directory
        String endpointPath = endpoint.getConfiguration().getDirectory();
        String baseDir = "";
        if (endpointPath.length() > 0) {
            // It's a directory so we should use it as a base path for the filename
            // If the path isn't empty, we need to add a trailing / if it isn't already there
            baseDir = endpointPath;
            boolean trailingSlash = endpointPath.endsWith("/") || endpointPath.endsWith("\\");
            if (!trailingSlash) {
                baseDir += getFileSeparator();
            }
        }
        if (name != null) {
            answer = baseDir + name;
        } else {
            // use a generated filename if no name provided
            answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn());
        }

        if (endpoint.getConfiguration().needToNormalize()) {
            // must normalize path to cater for Windows and other OS
            answer = normalizePath(answer);
        }

        return answer;
    }

    /**
     * Computes the temporary file name to write to before renaming to the real target.
     * <p/>
     * The endpoint's temp file name expression is evaluated against the exchange, and the
     * result replaces the last path segment of the given file name (any leading path of
     * the given file name is kept).
     *
     * @param exchange the exchange
     * @param fileName the real target file name
     * @return the temporary file name, normalized if the endpoint configuration requires it
     */
    public String createTempFileName(Exchange exchange, String fileName) {
        String answer = fileName;

        String tempName;
        if (exchange.getIn().getHeader(Exchange.FILE_NAME) == null) {
            // it's a generated filename, so add it to the header so we can evaluate the expression
            exchange.getIn().setHeader(Exchange.FILE_NAME, FileUtil.stripPath(fileName));
            try {
                tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
            } finally {
                // always remove it again, also if the evaluation failed, so the
                // temporary header never leaks onto the caller's message
                exchange.getIn().removeHeader(Exchange.FILE_NAME);
            }
        } else {
            tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
        }

        // check for both windows and unix separators
        int pos = Math.max(answer.lastIndexOf("/"), answer.lastIndexOf("\\"));
        if (pos == -1) {
            // no path so use temp name as calculated
            answer = tempName;
        } else {
            // path should be prefixed before the temp name
            StringBuilder sb = new StringBuilder(answer.substring(0, pos + 1));
            sb.append(tempName);
            answer = sb.toString();
        }

        if (endpoint.getConfiguration().needToNormalize()) {
            // must normalize path to cater for Windows and other OS
            answer = normalizePath(answer);
        }

        return answer;
    }

    /**
     * Starts this producer and then the lock cache.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        ServiceHelper.startService(locks);
    }

    /**
     * Stops the lock cache and then this producer.
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(locks);
        super.doStop();
    }
}