/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.file.strategy;

import java.io.File;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.CamelContext;
import org.apache.camel.Expression;
import org.apache.camel.LoggingLevel;
import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
import org.apache.camel.component.file.GenericFileProcessStrategy;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.Language;
import org.apache.camel.util.ObjectHelper;

/**
 * Static factory that builds a {@link GenericFileProcessStrategy} for {@link File}
 * from a map of named options.
 * <p/>
 * Options read from the map: <tt>move</tt>, <tt>moveFailed</tt>, <tt>preMove</tt>,
 * <tt>noop</tt>, <tt>delete</tt>, <tt>exclusiveReadLockStrategy</tt>, <tt>readLock</tt>
 * and the various <tt>readLock*</tt> tuning options.
 */
public final class FileProcessStrategyFactory {

    private FileProcessStrategyFactory() {
        // static factory methods only
    }

    /**
     * Creates the process strategy selected by the given options.
     * <ul>
     *   <li><tt>delete</tt> present: a delete strategy, optionally with <tt>preMove</tt>
     *       and <tt>moveFailed</tt> renamers (<tt>move</tt> is not consulted)</li>
     *   <li>otherwise: a rename strategy; unless <tt>noop</tt> is present its commit renamer is
     *       built from <tt>move</tt>, or the default <tt>.camel/</tt> renamer when
     *       <tt>move</tt> is absent</li>
     * </ul>
     *
     * @param context used to resolve the <tt>file</tt> language for the default commit renamer
     * @param params  the options; an option is considered set when its key maps to a non-null value
     * @return the configured strategy, never <tt>null</tt>
     */
    public static GenericFileProcessStrategy<File> createGenericFileProcessStrategy(CamelContext context, Map<String, Object> params) {

        // An option counts as "set" purely by being non-null in the map; boolean flags
        // are expected to be put in the map only when they are true.
        Expression onCommit = (Expression) params.get("move");
        Expression onFailure = (Expression) params.get("moveFailed");
        Expression onBegin = (Expression) params.get("preMove");
        boolean noop = params.get("noop") != null;
        boolean delete = params.get("delete") != null;

        if (delete) {
            GenericFileDeleteProcessStrategy<File> deleteStrategy = new GenericFileDeleteProcessStrategy<>();
            deleteStrategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
            if (onBegin != null) {
                deleteStrategy.setBeginRenamer(renamerFor(onBegin));
            }
            if (onFailure != null) {
                deleteStrategy.setFailureRenamer(renamerFor(onFailure));
            }
            return deleteStrategy;
        }

        // Everything else is a rename strategy. When neither a move expression nor noop
        // is configured, all three expressions are null and only the default commit
        // renamer (.camel/ subfolder next to the consumed file) ends up being installed.
        GenericFileRenameProcessStrategy<File> renameStrategy = new GenericFileRenameProcessStrategy<>();
        renameStrategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
        if (!noop) {
            // moving on commit is only possible when not noop
            renameStrategy.setCommitRenamer(onCommit != null ? renamerFor(onCommit) : getDefaultCommitRenamer(context));
        }
        // pre move and move failed are honoured with or without noop
        if (onBegin != null) {
            renameStrategy.setBeginRenamer(renamerFor(onBegin));
        }
        if (onFailure != null) {
            renameStrategy.setFailureRenamer(renamerFor(onFailure));
        }
        return renameStrategy;
    }

    /**
     * Wraps the given expression in a new renamer.
     */
    private static GenericFileExpressionRenamer<File> renamerFor(Expression expression) {
        GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>();
        renamer.setExpression(expression);
        return renamer;
    }

    /**
     * Builds the default commit renamer, which targets a <tt>.camel/</tt> folder
     * under the parent of the file.
     */
    private static GenericFileExpressionRenamer<File> getDefaultCommitRenamer(CamelContext context) {
        // the language is looked up through the context to stay loosely coupled
        Language fileLanguage = context.resolveLanguage("file");
        return new GenericFileExpressionRenamer<>(fileLanguage.createExpression("${file:parent}/.camel/${file:onlyname}"));
    }

    /**
     * Resolves the exclusive read lock strategy from the options.
     * <p/>
     * An explicit <tt>exclusiveReadLockStrategy</tt> is returned untouched. Otherwise the
     * <tt>readLock</tt> option picks the implementation, which is then configured from
     * the remaining <tt>readLock*</tt> options.
     *
     * @return the strategy, or <tt>null</tt> when <tt>readLock</tt> is empty, <tt>none</tt>,
     *         <tt>false</tt> or not one of the known names
     */
    @SuppressWarnings("unchecked")
    private static GenericFileExclusiveReadLockStrategy<File> getExclusiveReadLockStrategy(Map<String, Object> params) {
        GenericFileExclusiveReadLockStrategy<File> explicit
            = (GenericFileExclusiveReadLockStrategy<File>) params.get("exclusiveReadLockStrategy");
        if (explicit != null) {
            return explicit;
        }

        // no explicit strategy given, so fall back to the readLock option
        String readLock = (String) params.get("readLock");
        if (!ObjectHelper.isNotEmpty(readLock)) {
            return null;
        }

        GenericFileExclusiveReadLockStrategy<File> strategy = newReadLockStrategy(readLock, params);
        if (strategy != null) {
            applySharedReadLockOptions(strategy, params);
        }
        return strategy;
    }

    /**
     * Instantiates and type-specifically configures the strategy named by <tt>readLock</tt>;
     * yields <tt>null</tt> for <tt>none</tt>, <tt>false</tt> and any unrecognised name.
     */
    private static GenericFileExclusiveReadLockStrategy<File> newReadLockStrategy(String readLock, Map<String, Object> params) {
        switch (readLock) {
        case "markerFile":
            return new MarkerFileExclusiveReadLockStrategy();
        case "fileLock":
            return new FileLockExclusiveReadLockStrategy();
        case "rename":
            return new FileRenameExclusiveReadLockStrategy();
        case "changed":
            return newChangedStrategy(params);
        case "idempotent":
            return newIdempotentStrategy(params);
        case "idempotent-changed":
            return newIdempotentChangedStrategy(params);
        case "idempotent-rename":
            return newIdempotentRenameStrategy(params);
        default:
            // covers "none", "false" and anything unknown
            return null;
        }
    }

    /**
     * Builds the <tt>changed</tt> read lock, honouring the min length and min age options.
     */
    private static FileChangedExclusiveReadLockStrategy newChangedStrategy(Map<String, Object> params) {
        FileChangedExclusiveReadLockStrategy changed = new FileChangedExclusiveReadLockStrategy();
        Long minLength = (Long) params.get("readLockMinLength");
        if (minLength != null) {
            changed.setMinLength(minLength);
        }
        Long minAge = (Long) params.get("readLockMinAge");
        if (minAge != null) {
            changed.setMinAge(minAge);
        }
        return changed;
    }

    /**
     * Builds the <tt>idempotent</tt> read lock from the repository, remove-on-commit/rollback
     * and release options.
     */
    private static FileIdempotentRepositoryReadLockStrategy newIdempotentStrategy(Map<String, Object> params) {
        FileIdempotentRepositoryReadLockStrategy idempotent = new FileIdempotentRepositoryReadLockStrategy();
        Boolean removeOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
        if (removeOnRollback != null) {
            idempotent.setRemoveOnRollback(removeOnRollback);
        }
        Boolean removeOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
        if (removeOnCommit != null) {
            idempotent.setRemoveOnCommit(removeOnCommit);
        }
        IdempotentRepository repository = (IdempotentRepository) params.get("readLockIdempotentRepository");
        if (repository != null) {
            idempotent.setIdempotentRepository(repository);
        }
        Integer releaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay");
        if (releaseDelay != null) {
            idempotent.setReadLockIdempotentReleaseDelay(releaseDelay);
        }
        Boolean releaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync");
        if (releaseAsync != null) {
            idempotent.setReadLockIdempotentReleaseAsync(releaseAsync);
        }
        Integer releasePoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize");
        if (releasePoolSize != null) {
            idempotent.setReadLockIdempotentReleaseAsyncPoolSize(releasePoolSize);
        }
        ScheduledExecutorService releaseExecutor = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService");
        if (releaseExecutor != null) {
            idempotent.setReadLockIdempotentReleaseExecutorService(releaseExecutor);
        }
        return idempotent;
    }

    /**
     * Builds the <tt>idempotent-changed</tt> read lock, which takes the idempotent options
     * as well as the min length and min age options.
     */
    private static FileIdempotentChangedRepositoryReadLockStrategy newIdempotentChangedStrategy(Map<String, Object> params) {
        FileIdempotentChangedRepositoryReadLockStrategy idempotentChanged = new FileIdempotentChangedRepositoryReadLockStrategy();
        Boolean removeOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
        if (removeOnRollback != null) {
            idempotentChanged.setRemoveOnRollback(removeOnRollback);
        }
        Boolean removeOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
        if (removeOnCommit != null) {
            idempotentChanged.setRemoveOnCommit(removeOnCommit);
        }
        IdempotentRepository repository = (IdempotentRepository) params.get("readLockIdempotentRepository");
        if (repository != null) {
            idempotentChanged.setIdempotentRepository(repository);
        }
        Long minLength = (Long) params.get("readLockMinLength");
        if (minLength != null) {
            idempotentChanged.setMinLength(minLength);
        }
        Long minAge = (Long) params.get("readLockMinAge");
        if (minAge != null) {
            idempotentChanged.setMinAge(minAge);
        }
        Integer releaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay");
        if (releaseDelay != null) {
            idempotentChanged.setReadLockIdempotentReleaseDelay(releaseDelay);
        }
        Boolean releaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync");
        if (releaseAsync != null) {
            idempotentChanged.setReadLockIdempotentReleaseAsync(releaseAsync);
        }
        Integer releasePoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize");
        if (releasePoolSize != null) {
            idempotentChanged.setReadLockIdempotentReleaseAsyncPoolSize(releasePoolSize);
        }
        ScheduledExecutorService releaseExecutor = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService");
        if (releaseExecutor != null) {
            idempotentChanged.setReadLockIdempotentReleaseExecutorService(releaseExecutor);
        }
        return idempotentChanged;
    }

    /**
     * Builds the <tt>idempotent-rename</tt> read lock from the repository and
     * remove-on-commit/rollback options.
     */
    private static FileIdempotentRenameRepositoryReadLockStrategy newIdempotentRenameStrategy(Map<String, Object> params) {
        FileIdempotentRenameRepositoryReadLockStrategy idempotentRename = new FileIdempotentRenameRepositoryReadLockStrategy();
        Boolean removeOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
        if (removeOnRollback != null) {
            idempotentRename.setRemoveOnRollback(removeOnRollback);
        }
        Boolean removeOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
        if (removeOnCommit != null) {
            idempotentRename.setRemoveOnCommit(removeOnCommit);
        }
        IdempotentRepository repository = (IdempotentRepository) params.get("readLockIdempotentRepository");
        if (repository != null) {
            idempotentRename.setIdempotentRepository(repository);
        }
        return idempotentRename;
    }

    /**
     * Applies the options every read lock strategy accepts: timeout, check interval,
     * logging level, marker file and orphan lock file deletion.
     */
    private static void applySharedReadLockOptions(GenericFileExclusiveReadLockStrategy<File> strategy, Map<String, Object> params) {
        Long timeout = (Long) params.get("readLockTimeout");
        if (timeout != null) {
            strategy.setTimeout(timeout);
        }
        Long checkInterval = (Long) params.get("readLockCheckInterval");
        if (checkInterval != null) {
            strategy.setCheckInterval(checkInterval);
        }
        LoggingLevel loggingLevel = (LoggingLevel) params.get("readLockLoggingLevel");
        if (loggingLevel != null) {
            strategy.setReadLockLoggingLevel(loggingLevel);
        }
        Boolean markerFile = (Boolean) params.get("readLockMarkerFile");
        if (markerFile != null) {
            strategy.setMarkerFiler(markerFile);
        }
        Boolean deleteOrphanLockFiles = (Boolean) params.get("readLockDeleteOrphanLockFiles");
        if (deleteOrphanLockFiles != null) {
            strategy.setDeleteOrphanLockFiles(deleteOrphanLockFiles);
        }
    }
}