001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file; 018 019import java.io.File; 020import java.util.Map; 021import java.util.concurrent.locks.Lock; 022import java.util.concurrent.locks.ReentrantLock; 023 024import org.apache.camel.Exchange; 025import org.apache.camel.Expression; 026import org.apache.camel.impl.DefaultExchange; 027import org.apache.camel.impl.DefaultProducer; 028import org.apache.camel.util.FileUtil; 029import org.apache.camel.util.LRUCacheFactory; 030import org.apache.camel.util.ObjectHelper; 031import org.apache.camel.util.ServiceHelper; 032import org.apache.camel.util.StringHelper; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * Generic file producer 038 */ 039public class GenericFileProducer<T> extends DefaultProducer { 040 protected final Logger log = LoggerFactory.getLogger(getClass()); 041 protected final GenericFileEndpoint<T> endpoint; 042 protected GenericFileOperations<T> operations; 043 // assume writing to 100 different files concurrently at most for the same file producer 044 private final Map<String, Lock> locks = LRUCacheFactory.newLRUCache(100); 045 046 protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) { 047 super(endpoint); 048 this.endpoint = endpoint; 049 this.operations = operations; 050 } 051 052 public String getFileSeparator() { 053 return File.separator; 054 } 055 056 public String normalizePath(String name) { 057 return FileUtil.normalizePath(name); 058 } 059 060 public void process(Exchange exchange) throws Exception { 061 // store any existing file header which we want to keep and propagate 062 final String existing = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class); 063 064 // create the target file name 065 String target = createFileName(exchange); 066 067 // use lock for same file name to avoid concurrent writes to the same file 068 // for example when you concurrently append to the same file 069 Lock lock; 070 synchronized (locks) { 071 lock = locks.get(target); 072 if (lock == null) { 073 lock = new ReentrantLock(); 074 locks.put(target, lock); 075 } 076 } 077 078 lock.lock(); 079 try { 080 processExchange(exchange, target); 081 } finally { 082 // do not remove as the locks cache has an upper bound 083 // this ensure the locks is appropriate reused 084 lock.unlock(); 085 // and remove the write file name header as we only want to use it once (by design) 086 exchange.getIn().removeHeader(Exchange.OVERRULE_FILE_NAME); 087 // and restore existing file name 088 exchange.getIn().setHeader(Exchange.FILE_NAME, existing); 089 } 090 } 091 092 /** 093 * Sets the operations to be used. 094 * <p/> 095 * Can be used to set a fresh operations in case of recovery attempts 096 * 097 * @param operations the operations 098 */ 099 public void setOperations(GenericFileOperations<T> operations) { 100 this.operations = operations; 101 } 102 103 /** 104 * Perform the work to process the fileExchange 105 * 106 * @param exchange fileExchange 107 * @param target the target filename 108 * @throws Exception is thrown if some error 109 */ 110 protected void processExchange(Exchange exchange, String target) throws Exception { 111 log.trace("Processing file: {} for exchange: {}", target, exchange); 112 113 try { 114 preWriteCheck(); 115 116 // should we write to a temporary name and then afterwards rename to real target 117 boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName()); 118 String tempTarget = null; 119 // remember if target exists to avoid checking twice 120 Boolean targetExists; 121 if (writeAsTempAndRename) { 122 // compute temporary name with the temp prefix 123 tempTarget = createTempFileName(exchange, target); 124 125 log.trace("Writing using tempNameFile: {}", tempTarget); 126 127 //if we should eager delete target file before deploying temporary file 128 if (endpoint.getFileExist() != GenericFileExist.TryRename && endpoint.isEagerDeleteTargetFile()) { 129 130 // cater for file exists option on the real target as 131 // the file operations code will work on the temp file 132 133 // if an existing file already exists what should we do? 134 targetExists = operations.existsFile(target); 135 if (targetExists) { 136 137 log.trace("EagerDeleteTargetFile, target exists"); 138 139 if (endpoint.getFileExist() == GenericFileExist.Ignore) { 140 // ignore but indicate that the file was written 141 log.trace("An existing file already exists: {}. Ignore and do not override it.", target); 142 return; 143 } else if (endpoint.getFileExist() == GenericFileExist.Fail) { 144 throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file."); 145 } else if (endpoint.getFileExist() == GenericFileExist.Move) { 146 // move any existing file first 147 doMoveExistingFile(target); 148 } else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) { 149 // we override the target so we do this by deleting it so the temp file can be renamed later 150 // with success as the existing target file have been deleted 151 log.trace("Eagerly deleting existing file: {}", target); 152 if (!operations.deleteFile(target)) { 153 throw new GenericFileOperationFailedException("Cannot delete file: " + target); 154 } 155 } 156 } 157 } 158 159 // delete any pre existing temp file 160 if (endpoint.getFileExist() != GenericFileExist.TryRename && operations.existsFile(tempTarget)) { 161 log.trace("Deleting existing temp file: {}", tempTarget); 162 if (!operations.deleteFile(tempTarget)) { 163 throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget); 164 } 165 } 166 } 167 168 // write/upload the file 169 writeFile(exchange, tempTarget != null ? tempTarget : target); 170 171 // if we did write to a temporary name then rename it to the real 172 // name after we have written the file 173 if (tempTarget != null) { 174 // if we did not eager delete the target file 175 if (endpoint.getFileExist() != GenericFileExist.TryRename && !endpoint.isEagerDeleteTargetFile()) { 176 177 // if an existing file already exists what should we do? 178 targetExists = operations.existsFile(target); 179 if (targetExists) { 180 181 log.trace("Not using EagerDeleteTargetFile, target exists"); 182 183 if (endpoint.getFileExist() == GenericFileExist.Ignore) { 184 // ignore but indicate that the file was written 185 log.trace("An existing file already exists: {}. Ignore and do not override it.", target); 186 return; 187 } else if (endpoint.getFileExist() == GenericFileExist.Fail) { 188 throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file."); 189 } else if (endpoint.getFileExist() == GenericFileExist.Override) { 190 // we override the target so we do this by deleting it so the temp file can be renamed later 191 // with success as the existing target file have been deleted 192 log.trace("Deleting existing file: {}", target); 193 if (!operations.deleteFile(target)) { 194 throw new GenericFileOperationFailedException("Cannot delete file: " + target); 195 } 196 } 197 } 198 } 199 200 // now we are ready to rename the temp file to the target file 201 log.trace("Renaming file: [{}] to: [{}]", tempTarget, target); 202 boolean renamed = operations.renameFile(tempTarget, target); 203 if (!renamed) { 204 throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target); 205 } 206 } 207 208 // any done file to write? 209 if (endpoint.getDoneFileName() != null) { 210 String doneFileName = endpoint.createDoneFileName(target); 211 StringHelper.notEmpty(doneFileName, "doneFileName", endpoint); 212 213 // create empty exchange with empty body to write as the done file 214 Exchange empty = new DefaultExchange(exchange); 215 empty.getIn().setBody(""); 216 217 log.trace("Writing done file: [{}]", doneFileName); 218 // delete any existing done file 219 if (operations.existsFile(doneFileName)) { 220 if (!operations.deleteFile(doneFileName)) { 221 throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName); 222 } 223 } 224 writeFile(empty, doneFileName); 225 } 226 227 // let's store the name we really used in the header, so end-users 228 // can retrieve it 229 exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target); 230 } catch (Exception e) { 231 handleFailedWrite(exchange, e); 232 } 233 234 postWriteCheck(exchange); 235 } 236 237 private void doMoveExistingFile(String fileName) throws GenericFileOperationFailedException { 238 // need to evaluate using a dummy and simulate the file first, to have access to all the file attributes 239 // create a dummy exchange as Exchange is needed for expression evaluation 240 // we support only the following 3 tokens. 241 Exchange dummy = endpoint.createExchange(); 242 String parent = FileUtil.onlyPath(fileName); 243 String onlyName = FileUtil.stripPath(fileName); 244 dummy.getIn().setHeader(Exchange.FILE_NAME, fileName); 245 dummy.getIn().setHeader(Exchange.FILE_NAME_ONLY, onlyName); 246 dummy.getIn().setHeader(Exchange.FILE_PARENT, parent); 247 248 String to = endpoint.getMoveExisting().evaluate(dummy, String.class); 249 // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File) 250 to = FileUtil.normalizePath(to); 251 if (ObjectHelper.isEmpty(to)) { 252 throw new GenericFileOperationFailedException("moveExisting evaluated as empty String, cannot move existing file: " + fileName); 253 } 254 255 boolean renamed = operations.renameFile(fileName, to); 256 if (!renamed) { 257 throw new GenericFileOperationFailedException("Cannot rename file from: " + fileName + " to: " + to); 258 } 259 } 260 261 /** 262 * If we fail writing out a file, we will call this method. This hook is 263 * provided to disconnect from servers or clean up files we created (if needed). 264 */ 265 public void handleFailedWrite(Exchange exchange, Exception exception) throws Exception { 266 throw exception; 267 } 268 269 /** 270 * Perform any actions that need to occur before we write such as connecting to an FTP server etc. 271 */ 272 public void preWriteCheck() throws Exception { 273 // nothing needed to check 274 } 275 276 /** 277 * Perform any actions that need to occur after we are done such as disconnecting. 278 */ 279 public void postWriteCheck(Exchange exchange) { 280 // nothing needed to check 281 } 282 283 public void writeFile(Exchange exchange, String fileName) throws GenericFileOperationFailedException { 284 // build directory if auto create is enabled 285 if (endpoint.isAutoCreate()) { 286 // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File) 287 String name = FileUtil.normalizePath(fileName); 288 289 // use java.io.File to compute the file path 290 File file = new File(name); 291 String directory = file.getParent(); 292 boolean absolute = FileUtil.isAbsolute(file); 293 if (directory != null) { 294 if (!operations.buildDirectory(directory, absolute)) { 295 log.debug("Cannot build directory [{}] (could be because of denied permissions)", directory); 296 } 297 } 298 } 299 300 // upload 301 if (log.isTraceEnabled()) { 302 log.trace("About to write [{}] to [{}] from exchange [{}]", fileName, getEndpoint(), exchange); 303 } 304 305 boolean success = operations.storeFile(fileName, exchange, -1); 306 if (!success) { 307 throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]"); 308 } 309 log.debug("Wrote [{}] to [{}]", fileName, getEndpoint()); 310 } 311 312 public String createFileName(Exchange exchange) { 313 String answer; 314 315 // overrule takes precedence 316 Object value; 317 318 Object overrule = exchange.getIn().getHeader(Exchange.OVERRULE_FILE_NAME); 319 if (overrule != null) { 320 if (overrule instanceof Expression) { 321 value = overrule; 322 } else { 323 value = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, overrule); 324 } 325 } else { 326 value = exchange.getIn().getHeader(Exchange.FILE_NAME); 327 } 328 329 // if we have an overrule then override the existing header to use the overrule computed name from this point forward 330 if (overrule != null) { 331 exchange.getIn().setHeader(Exchange.FILE_NAME, value); 332 } 333 334 if (value instanceof String && StringHelper.hasStartToken((String) value, "simple")) { 335 log.warn("Simple expression: {} detected in header: {} of type String. This feature has been removed (see CAMEL-6748).", value, Exchange.FILE_NAME); 336 } 337 338 // expression support 339 Expression expression = endpoint.getFileName(); 340 if (value instanceof Expression) { 341 expression = (Expression) value; 342 } 343 344 // evaluate the name as a String from the value 345 String name; 346 if (expression != null) { 347 log.trace("Filename evaluated as expression: {}", expression); 348 name = expression.evaluate(exchange, String.class); 349 } else { 350 name = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value); 351 } 352 353 // flatten name 354 if (name != null && endpoint.isFlatten()) { 355 // check for both windows and unix separators 356 int pos = Math.max(name.lastIndexOf("/"), name.lastIndexOf("\\")); 357 if (pos != -1) { 358 name = name.substring(pos + 1); 359 } 360 } 361 362 // compute path by adding endpoint starting directory 363 String endpointPath = endpoint.getConfiguration().getDirectory(); 364 String baseDir = ""; 365 if (endpointPath.length() > 0) { 366 // Its a directory so we should use it as a base path for the filename 367 // If the path isn't empty, we need to add a trailing / if it isn't already there 368 baseDir = endpointPath; 369 boolean trailingSlash = endpointPath.endsWith("/") || endpointPath.endsWith("\\"); 370 if (!trailingSlash) { 371 baseDir += getFileSeparator(); 372 } 373 } 374 if (name != null) { 375 answer = baseDir + name; 376 } else { 377 // use a generated filename if no name provided 378 answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn()); 379 } 380 381 if (endpoint.isJailStartingDirectory()) { 382 // check for file must be within starting directory (need to compact first as the name can be using relative paths via ../ etc) 383 String compatchAnswer = FileUtil.compactPath(answer); 384 String compatchBaseDir = FileUtil.compactPath(baseDir); 385 if (!compatchAnswer.startsWith(compatchBaseDir)) { 386 throw new IllegalArgumentException("Cannot write file with name: " + compatchAnswer + " as the filename is jailed to the starting directory: " + compatchBaseDir); 387 } 388 } 389 390 if (endpoint.getConfiguration().needToNormalize()) { 391 // must normalize path to cater for Windows and other OS 392 answer = normalizePath(answer); 393 } 394 395 return answer; 396 } 397 398 public String createTempFileName(Exchange exchange, String fileName) { 399 String answer = fileName; 400 401 String tempName; 402 if (exchange.getIn().getHeader(Exchange.FILE_NAME) == null) { 403 // its a generated filename then add it to header so we can evaluate the expression 404 exchange.getIn().setHeader(Exchange.FILE_NAME, FileUtil.stripPath(fileName)); 405 tempName = endpoint.getTempFileName().evaluate(exchange, String.class); 406 // and remove it again after evaluation 407 exchange.getIn().removeHeader(Exchange.FILE_NAME); 408 } else { 409 tempName = endpoint.getTempFileName().evaluate(exchange, String.class); 410 } 411 412 // check for both windows and unix separators 413 int pos = Math.max(answer.lastIndexOf("/"), answer.lastIndexOf("\\")); 414 if (pos == -1) { 415 // no path so use temp name as calculated 416 answer = tempName; 417 } else { 418 // path should be prefixed before the temp name 419 StringBuilder sb = new StringBuilder(answer.substring(0, pos + 1)); 420 sb.append(tempName); 421 answer = sb.toString(); 422 } 423 424 if (endpoint.getConfiguration().needToNormalize()) { 425 // must normalize path to cater for Windows and other OS 426 answer = normalizePath(answer); 427 } 428 429 // stack path in case the temporary file uses .. paths 430 answer = FileUtil.compactPath(answer, getFileSeparator()); 431 432 return answer; 433 } 434 435 @Override 436 protected void doStart() throws Exception { 437 ServiceHelper.startService(locks); 438 super.doStart(); 439 } 440 441 @Override 442 protected void doStop() throws Exception { 443 super.doStop(); 444 ServiceHelper.stopService(locks); 445 } 446}