/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.file.strategy;

import java.io.File;
import java.io.IOException;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.component.file.GenericFileProcessStrategy;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for implementations of {@link GenericFileProcessStrategy}.
 * <p/>
 * When a {@link GenericFileExclusiveReadLockStrategy} is configured, this class
 * delegates lock acquisition and release to it and starts/stops it together
 * with this service. When none is configured, the lock related steps are skipped.
 */
public abstract class GenericFileProcessStrategySupport<T> extends ServiceSupport implements GenericFileProcessStrategy<T>, CamelContextAware {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /** Optional read lock strategy; <tt>null</tt> means no exclusive read lock is used. */
    protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
    protected CamelContext camelContext;

    @Override
    public CamelContext getCamelContext() {
        return camelContext;
    }

    @Override
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Delegates the startup preparation to the exclusive read lock strategy, if one is configured.
     *
     * @param operations the file operations
     * @param endpoint   the endpoint
     * @throws Exception is thrown if the read lock strategy fails to prepare
     */
    public void prepareOnStartup(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint) throws Exception {
        if (exclusiveReadLockStrategy != null) {
            exclusiveReadLockStrategy.prepareOnStartup(operations, endpoint);
        }
    }

    /**
     * Tries to acquire the exclusive read lock for the file, if a read lock strategy is configured.
     *
     * @param operations the file operations
     * @param endpoint   the endpoint
     * @param exchange   the exchange
     * @param file       the file about to be processed
     * @return <tt>false</tt> if a read lock strategy is configured and it could not acquire the lock,
     *         <tt>true</tt> otherwise (including when no read lock strategy is configured)
     * @throws Exception is thrown if the read lock strategy fails
     */
    public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
        // if we use exclusive read then acquire the exclusive read (waiting until we got it)
        if (exclusiveReadLockStrategy != null) {
            boolean lock = exclusiveReadLockStrategy.acquireExclusiveReadLock(operations, file, exchange);
            if (!lock) {
                // do not begin since we could not get the exclusive read lock
                return false;
            }
        }

        return true;
    }

    /**
     * Cleans up after an aborted exchange: deletes the local work file, releases the retrieved
     * file resources and finally releases the exclusive read lock (if a strategy is configured).
     *
     * @param operations the file operations
     * @param endpoint   the endpoint
     * @param exchange   the exchange
     * @param file       the file
     * @throws Exception is thrown if cleanup or releasing the lock fails
     */
    public void abort(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
        try {
            deleteLocalWorkFile(exchange);
            operations.releaseRetrievedFileResources(exchange);
        } finally {
            // must release lock last, and also when the cleanup above failed,
            // as otherwise the lock would never be released
            if (exclusiveReadLockStrategy != null) {
                exclusiveReadLockStrategy.releaseExclusiveReadLockOnAbort(operations, file, exchange);
            }
        }
    }

    /**
     * Cleans up after a committed exchange: deletes the local work file, releases the retrieved
     * file resources and finally releases the exclusive read lock (if a strategy is configured).
     *
     * @param operations the file operations
     * @param endpoint   the endpoint
     * @param exchange   the exchange
     * @param file       the file
     * @throws Exception is thrown if cleanup or releasing the lock fails
     */
    public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
        try {
            deleteLocalWorkFile(exchange);
            operations.releaseRetrievedFileResources(exchange);
        } finally {
            // must release lock last, and also when the cleanup above failed,
            // as otherwise the lock would never be released
            if (exclusiveReadLockStrategy != null) {
                exclusiveReadLockStrategy.releaseExclusiveReadLockOnCommit(operations, file, exchange);
            }
        }
    }

    /**
     * Cleans up after a rolled back exchange: deletes the local work file, releases the retrieved
     * file resources and finally releases the exclusive read lock (if a strategy is configured).
     *
     * @param operations the file operations
     * @param endpoint   the endpoint
     * @param exchange   the exchange
     * @param file       the file
     * @throws Exception is thrown if cleanup or releasing the lock fails
     */
    public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
        try {
            deleteLocalWorkFile(exchange);
            operations.releaseRetrievedFileResources(exchange);
        } finally {
            // must release lock last, and also when the cleanup above failed,
            // as otherwise the lock would never be released
            if (exclusiveReadLockStrategy != null) {
                exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange);
            }
        }
    }

    public GenericFileExclusiveReadLockStrategy<T> getExclusiveReadLockStrategy() {
        return exclusiveReadLockStrategy;
    }

    public void setExclusiveReadLockStrategy(GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy) {
        this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
    }

    /**
     * Renames the <tt>from</tt> file to the <tt>to</tt> file.
     * <p/>
     * An attempt is made to delete the target first (a failure to delete is ignored),
     * then the parent folder of the target is built, and finally the file is renamed.
     *
     * @param operations the file operations
     * @param from       the file to rename
     * @param to         the target of the rename
     * @return the <tt>to</tt> file
     * @throws GenericFileOperationFailedException if the parent folder could not be built or the rename failed
     * @throws IOException declared by the signature; not thrown directly by this method
     */
    protected GenericFile<T> renameFile(GenericFileOperations<T> operations, GenericFile<T> from, GenericFile<T> to) throws IOException {
        // deleting any existing files before renaming
        try {
            operations.deleteFile(to.getAbsoluteFilePath());
        } catch (GenericFileOperationFailedException e) {
            // deliberately ignored: the delete is best effort, presumably the target file does not exist
        }

        // make parent folder if missing
        boolean mkdir = operations.buildDirectory(to.getParent(), to.isAbsolute());

        if (!mkdir) {
            throw new GenericFileOperationFailedException("Cannot create directory: " + to.getParent() + " (could be because of denied permissions)");
        }

        log.debug("Renaming file: {} to: {}", from, to);
        boolean renamed = operations.renameFile(from.getAbsoluteFilePath(), to.getAbsoluteFilePath());
        if (!renamed) {
            throw new GenericFileOperationFailedException("Cannot rename file: " + from + " to: " + to);
        }

        return to;
    }

    /**
     * Deletes the local work file referred to by the {@link Exchange#FILE_LOCAL_WORK_PATH} header
     * of the IN message, if the header is present and the file exists.
     *
     * @param exchange the exchange
     */
    protected void deleteLocalWorkFile(Exchange exchange) {
        // delete local work file, if it was used (eg by ftp component)
        File local = exchange.getIn().getHeader(Exchange.FILE_LOCAL_WORK_PATH, File.class);
        if (local != null && local.exists()) {
            boolean deleted = FileUtil.deleteFile(local);
            log.trace("Local work file: {} was deleted: {}", local, deleted);
        }
    }

    /**
     * Passes the {@link CamelContext} on to the read lock strategy when it is
     * {@link CamelContextAware}, and then starts the read lock strategy.
     */
    @Override
    protected void doStart() throws Exception {
        if (exclusiveReadLockStrategy instanceof CamelContextAware) {
            ((CamelContextAware) exclusiveReadLockStrategy).setCamelContext(camelContext);
        }
        ServiceHelper.startService(exclusiveReadLockStrategy);
    }

    /**
     * Stops the read lock strategy.
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(exclusiveReadLockStrategy);
    }

    /**
     * Stops and shuts down the read lock strategy.
     */
    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(exclusiveReadLockStrategy);
    }
}