001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file; 018 019import java.io.File; 020import java.util.concurrent.locks.Lock; 021import java.util.concurrent.locks.ReentrantLock; 022 023import org.apache.camel.Exchange; 024import org.apache.camel.Expression; 025import org.apache.camel.impl.DefaultExchange; 026import org.apache.camel.impl.DefaultProducer; 027import org.apache.camel.util.FileUtil; 028import org.apache.camel.util.LRUCache; 029import org.apache.camel.util.ObjectHelper; 030import org.apache.camel.util.ServiceHelper; 031import org.apache.camel.util.StringHelper; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034/** 035 * Generic file producer 036 */ 037public class GenericFileProducer<T> extends DefaultProducer { 038 protected final Logger log = LoggerFactory.getLogger(getClass()); 039 protected final GenericFileEndpoint<T> endpoint; 040 protected GenericFileOperations<T> operations; 041 // assume writing to 100 different files concurrently at most for the same file producer 042 private final LRUCache<String, Lock> locks = new LRUCache<String, Lock>(100); 043 044 protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) { 045 super(endpoint); 046 this.endpoint = endpoint; 047 this.operations = operations; 048 } 049 050 public String getFileSeparator() { 051 return File.separator; 052 } 053 054 public String normalizePath(String name) { 055 return FileUtil.normalizePath(name); 056 } 057 058 public void process(Exchange exchange) throws Exception { 059 // store any existing file header which we want to keep and propagate 060 final String existing = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class); 061 062 // create the target file name 063 String target = createFileName(exchange); 064 065 // use lock for same file name to avoid concurrent writes to the same file 066 // for example when you concurrently append to the same file 067 Lock lock; 068 synchronized (locks) { 069 lock = locks.get(target); 070 if (lock == null) { 071 lock = new ReentrantLock(); 072 locks.put(target, lock); 073 } 074 } 075 076 lock.lock(); 077 try { 078 processExchange(exchange, target); 079 } finally { 080 // do not remove as the locks cache has an upper bound 081 // this ensure the locks is appropriate reused 082 lock.unlock(); 083 // and remove the write file name header as we only want to use it once (by design) 084 exchange.getIn().removeHeader(Exchange.OVERRULE_FILE_NAME); 085 // and restore existing file name 086 exchange.getIn().setHeader(Exchange.FILE_NAME, existing); 087 } 088 } 089 090 /** 091 * Sets the operations to be used. 092 * <p/> 093 * Can be used to set a fresh operations in case of recovery attempts 094 * 095 * @param operations the operations 096 */ 097 public void setOperations(GenericFileOperations<T> operations) { 098 this.operations = operations; 099 } 100 101 /** 102 * Perform the work to process the fileExchange 103 * 104 * @param exchange fileExchange 105 * @param target the target filename 106 * @throws Exception is thrown if some error 107 */ 108 protected void processExchange(Exchange exchange, String target) throws Exception { 109 log.trace("Processing file: {} for exchange: {}", target, exchange); 110 111 try { 112 preWriteCheck(); 113 114 // should we write to a temporary name and then afterwards rename to real target 115 boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName()); 116 String tempTarget = null; 117 // remember if target exists to avoid checking twice 118 Boolean targetExists; 119 if (writeAsTempAndRename) { 120 // compute temporary name with the temp prefix 121 tempTarget = createTempFileName(exchange, target); 122 123 log.trace("Writing using tempNameFile: {}", tempTarget); 124 125 //if we should eager delete target file before deploying temporary file 126 if (endpoint.getFileExist() != GenericFileExist.TryRename && endpoint.isEagerDeleteTargetFile()) { 127 128 // cater for file exists option on the real target as 129 // the file operations code will work on the temp file 130 131 // if an existing file already exists what should we do? 132 targetExists = operations.existsFile(target); 133 if (targetExists) { 134 135 log.trace("EagerDeleteTargetFile, target exists"); 136 137 if (endpoint.getFileExist() == GenericFileExist.Ignore) { 138 // ignore but indicate that the file was written 139 log.trace("An existing file already exists: {}. Ignore and do not override it.", target); 140 return; 141 } else if (endpoint.getFileExist() == GenericFileExist.Fail) { 142 throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file."); 143 } else if (endpoint.getFileExist() == GenericFileExist.Move) { 144 // move any existing file first 145 doMoveExistingFile(target); 146 } else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) { 147 // we override the target so we do this by deleting it so the temp file can be renamed later 148 // with success as the existing target file have been deleted 149 log.trace("Eagerly deleting existing file: {}", target); 150 if (!operations.deleteFile(target)) { 151 throw new GenericFileOperationFailedException("Cannot delete file: " + target); 152 } 153 } 154 } 155 } 156 157 // delete any pre existing temp file 158 if (operations.existsFile(tempTarget)) { 159 log.trace("Deleting existing temp file: {}", tempTarget); 160 if (!operations.deleteFile(tempTarget)) { 161 throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget); 162 } 163 } 164 } 165 166 // write/upload the file 167 writeFile(exchange, tempTarget != null ? tempTarget : target); 168 169 // if we did write to a temporary name then rename it to the real 170 // name after we have written the file 171 if (tempTarget != null) { 172 // if we did not eager delete the target file 173 if (endpoint.getFileExist() != GenericFileExist.TryRename && !endpoint.isEagerDeleteTargetFile()) { 174 175 // if an existing file already exists what should we do? 176 targetExists = operations.existsFile(target); 177 if (targetExists) { 178 179 log.trace("Not using EagerDeleteTargetFile, target exists"); 180 181 if (endpoint.getFileExist() == GenericFileExist.Ignore) { 182 // ignore but indicate that the file was written 183 log.trace("An existing file already exists: {}. Ignore and do not override it.", target); 184 return; 185 } else if (endpoint.getFileExist() == GenericFileExist.Fail) { 186 throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file."); 187 } else if (endpoint.getFileExist() == GenericFileExist.Override) { 188 // we override the target so we do this by deleting it so the temp file can be renamed later 189 // with success as the existing target file have been deleted 190 log.trace("Deleting existing file: {}", target); 191 if (!operations.deleteFile(target)) { 192 throw new GenericFileOperationFailedException("Cannot delete file: " + target); 193 } 194 } 195 } 196 } 197 198 // now we are ready to rename the temp file to the target file 199 log.trace("Renaming file: [{}] to: [{}]", tempTarget, target); 200 boolean renamed = operations.renameFile(tempTarget, target); 201 if (!renamed) { 202 throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target); 203 } 204 } 205 206 // any done file to write? 207 if (endpoint.getDoneFileName() != null) { 208 String doneFileName = endpoint.createDoneFileName(target); 209 ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint); 210 211 // create empty exchange with empty body to write as the done file 212 Exchange empty = new DefaultExchange(exchange); 213 empty.getIn().setBody(""); 214 215 log.trace("Writing done file: [{}]", doneFileName); 216 // delete any existing done file 217 if (operations.existsFile(doneFileName)) { 218 if (!operations.deleteFile(doneFileName)) { 219 throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName); 220 } 221 } 222 writeFile(empty, doneFileName); 223 } 224 225 // let's store the name we really used in the header, so end-users 226 // can retrieve it 227 exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target); 228 } catch (Exception e) { 229 handleFailedWrite(exchange, e); 230 } 231 232 postWriteCheck(); 233 } 234 235 private void doMoveExistingFile(String fileName) throws GenericFileOperationFailedException { 236 // need to evaluate using a dummy and simulate the file first, to have access to all the file attributes 237 // create a dummy exchange as Exchange is needed for expression evaluation 238 // we support only the following 3 tokens. 239 Exchange dummy = endpoint.createExchange(); 240 String parent = FileUtil.onlyPath(fileName); 241 String onlyName = FileUtil.stripPath(fileName); 242 dummy.getIn().setHeader(Exchange.FILE_NAME, fileName); 243 dummy.getIn().setHeader(Exchange.FILE_NAME_ONLY, onlyName); 244 dummy.getIn().setHeader(Exchange.FILE_PARENT, parent); 245 246 String to = endpoint.getMoveExisting().evaluate(dummy, String.class); 247 // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File) 248 to = FileUtil.normalizePath(to); 249 if (ObjectHelper.isEmpty(to)) { 250 throw new GenericFileOperationFailedException("moveExisting evaluated as empty String, cannot move existing file: " + fileName); 251 } 252 String parentTo = FileUtil.onlyPath(to); 253 boolean absolute = FileUtil.isAbsolute(new File(parentTo)); 254 if (!absolute && !parentTo.startsWith(parent)) { 255 parentTo = parent + File.separator + parentTo; 256 String fileonlyname = FileUtil.stripPath(to); 257 to = parentTo + File.separator + (ObjectHelper.isNotEmpty(fileonlyname) ? fileonlyname : onlyName); 258 } 259 operations.buildDirectory(parentTo, absolute); 260 boolean renamed = operations.renameFile(fileName, to); 261 262 if (!renamed) { 263 throw new GenericFileOperationFailedException("Cannot rename file from: " + fileName + " to: " + to); 264 } 265 } 266 267 /** 268 * If we fail writing out a file, we will call this method. This hook is 269 * provided to disconnect from servers or clean up files we created (if needed). 270 */ 271 public void handleFailedWrite(Exchange exchange, Exception exception) throws Exception { 272 throw exception; 273 } 274 275 /** 276 * Perform any actions that need to occur before we write such as connecting to an FTP server etc. 277 */ 278 public void preWriteCheck() throws Exception { 279 // nothing needed to check 280 } 281 282 /** 283 * Perform any actions that need to occur after we are done such as disconnecting. 284 */ 285 public void postWriteCheck() { 286 // nothing needed to check 287 } 288 289 public void writeFile(Exchange exchange, String fileName) throws GenericFileOperationFailedException { 290 // build directory if auto create is enabled 291 if (endpoint.isAutoCreate()) { 292 // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File) 293 String name = FileUtil.normalizePath(fileName); 294 295 // use java.io.File to compute the file path 296 File file = new File(name); 297 String directory = file.getParent(); 298 boolean absolute = FileUtil.isAbsolute(file); 299 if (directory != null) { 300 if (!operations.buildDirectory(directory, absolute)) { 301 log.debug("Cannot build directory [{}] (could be because of denied permissions)", directory); 302 } 303 } 304 } 305 306 // upload 307 if (log.isTraceEnabled()) { 308 log.trace("About to write [{}] to [{}] from exchange [{}]", new Object[]{fileName, getEndpoint(), exchange}); 309 } 310 311 boolean success = operations.storeFile(fileName, exchange); 312 if (!success) { 313 throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]"); 314 } 315 log.debug("Wrote [{}] to [{}]", fileName, getEndpoint()); 316 } 317 318 public String createFileName(Exchange exchange) { 319 String answer; 320 321 // overrule takes precedence 322 Object value; 323 324 Object overrule = exchange.getIn().getHeader(Exchange.OVERRULE_FILE_NAME); 325 if (overrule != null) { 326 if (overrule instanceof Expression) { 327 value = overrule; 328 } else { 329 value = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, overrule); 330 } 331 } else { 332 value = exchange.getIn().getHeader(Exchange.FILE_NAME); 333 } 334 335 // if we have an overrule then override the existing header to use the overrule computed name from this point forward 336 if (overrule != null) { 337 exchange.getIn().setHeader(Exchange.FILE_NAME, value); 338 } 339 340 if (value != null && value instanceof String && StringHelper.hasStartToken((String) value, "simple")) { 341 log.warn("Simple expression: {} detected in header: {} of type String. This feature has been removed (see CAMEL-6748).", value, Exchange.FILE_NAME); 342 } 343 344 // expression support 345 Expression expression = endpoint.getFileName(); 346 if (value != null && value instanceof Expression) { 347 expression = (Expression) value; 348 } 349 350 // evaluate the name as a String from the value 351 String name; 352 if (expression != null) { 353 log.trace("Filename evaluated as expression: {}", expression); 354 name = expression.evaluate(exchange, String.class); 355 } else { 356 name = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value); 357 } 358 359 // flatten name 360 if (name != null && endpoint.isFlatten()) { 361 // check for both windows and unix separators 362 int pos = Math.max(name.lastIndexOf("/"), name.lastIndexOf("\\")); 363 if (pos != -1) { 364 name = name.substring(pos + 1); 365 } 366 } 367 368 // compute path by adding endpoint starting directory 369 String endpointPath = endpoint.getConfiguration().getDirectory(); 370 String baseDir = ""; 371 if (endpointPath.length() > 0) { 372 // Its a directory so we should use it as a base path for the filename 373 // If the path isn't empty, we need to add a trailing / if it isn't already there 374 baseDir = endpointPath; 375 boolean trailingSlash = endpointPath.endsWith("/") || endpointPath.endsWith("\\"); 376 if (!trailingSlash) { 377 baseDir += getFileSeparator(); 378 } 379 } 380 if (name != null) { 381 answer = baseDir + name; 382 } else { 383 // use a generated filename if no name provided 384 answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn()); 385 } 386 387 if (endpoint.getConfiguration().needToNormalize()) { 388 // must normalize path to cater for Windows and other OS 389 answer = normalizePath(answer); 390 } 391 392 return answer; 393 } 394 395 public String createTempFileName(Exchange exchange, String fileName) { 396 String answer = fileName; 397 398 String tempName; 399 if (exchange.getIn().getHeader(Exchange.FILE_NAME) == null) { 400 // its a generated filename then add it to header so we can evaluate the expression 401 exchange.getIn().setHeader(Exchange.FILE_NAME, FileUtil.stripPath(fileName)); 402 tempName = endpoint.getTempFileName().evaluate(exchange, String.class); 403 // and remove it again after evaluation 404 exchange.getIn().removeHeader(Exchange.FILE_NAME); 405 } else { 406 tempName = endpoint.getTempFileName().evaluate(exchange, String.class); 407 } 408 409 // check for both windows and unix separators 410 int pos = Math.max(answer.lastIndexOf("/"), answer.lastIndexOf("\\")); 411 if (pos == -1) { 412 // no path so use temp name as calculated 413 answer = tempName; 414 } else { 415 // path should be prefixed before the temp name 416 StringBuilder sb = new StringBuilder(answer.substring(0, pos + 1)); 417 sb.append(tempName); 418 answer = sb.toString(); 419 } 420 421 if (endpoint.getConfiguration().needToNormalize()) { 422 // must normalize path to cater for Windows and other OS 423 answer = normalizePath(answer); 424 } 425 426 return answer; 427 } 428 429 @Override 430 protected void doStart() throws Exception { 431 super.doStart(); 432 ServiceHelper.startService(locks); 433 } 434 435 @Override 436 protected void doStop() throws Exception { 437 ServiceHelper.stopService(locks); 438 super.doStop(); 439 } 440}