001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file.strategy;
018
019import java.io.File;
020import java.util.Map;
021import java.util.concurrent.ScheduledExecutorService;
022
023import org.apache.camel.CamelContext;
024import org.apache.camel.Expression;
025import org.apache.camel.LoggingLevel;
026import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
027import org.apache.camel.component.file.GenericFileProcessStrategy;
028import org.apache.camel.spi.IdempotentRepository;
029import org.apache.camel.spi.Language;
030import org.apache.camel.util.ObjectHelper;
031
032public final class FileProcessStrategyFactory {
033
034    private FileProcessStrategyFactory() {
035    }
036
037    public static GenericFileProcessStrategy<File> createGenericFileProcessStrategy(CamelContext context, Map<String, Object> params) {
038
039        // We assume a value is present only if its value not null for String and 'true' for boolean
040        Expression moveExpression = (Expression) params.get("move");
041        Expression moveFailedExpression = (Expression) params.get("moveFailed");
042        Expression preMoveExpression = (Expression) params.get("preMove");
043        boolean isNoop = params.get("noop") != null;
044        boolean isDelete = params.get("delete") != null;
045        boolean isMove = moveExpression != null || preMoveExpression != null || moveFailedExpression != null;
046
047        if (isDelete) {
048            GenericFileDeleteProcessStrategy<File> strategy = new GenericFileDeleteProcessStrategy<>();
049            strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
050            if (preMoveExpression != null) {
051                GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>();
052                renamer.setExpression(preMoveExpression);
053                strategy.setBeginRenamer(renamer);
054            }
055            if (moveFailedExpression != null) {
056                GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>();
057                renamer.setExpression(moveFailedExpression);
058                strategy.setFailureRenamer(renamer);
059            }
060            return strategy;
061        } else if (isMove || isNoop) {
062            GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<>();
063            strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
064            if (!isNoop) {
065                // move on commit is only possible if not noop
066                if (moveExpression != null) {
067                    GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>();
068                    renamer.setExpression(moveExpression);
069                    strategy.setCommitRenamer(renamer);
070                } else {
071                    strategy.setCommitRenamer(getDefaultCommitRenamer(context));
072                }
073            }
074            // both move and noop supports pre move
075            if (preMoveExpression != null) {
076                GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>();
077                renamer.setExpression(preMoveExpression);
078                strategy.setBeginRenamer(renamer);
079            }
080            // both move and noop supports move failed
081            if (moveFailedExpression != null) {
082                GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>();
083                renamer.setExpression(moveFailedExpression);
084                strategy.setFailureRenamer(renamer);
085            }
086            return strategy;
087        } else {
088            // default strategy will move files in a .camel/ subfolder where the file was consumed
089            GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<>();
090            strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
091            strategy.setCommitRenamer(getDefaultCommitRenamer(context));
092            return strategy;
093        }
094    }
095
096    private static GenericFileExpressionRenamer<File> getDefaultCommitRenamer(CamelContext context) {
097        // use context to lookup language to let it be loose coupled
098        Language language = context.resolveLanguage("file");
099        Expression expression = language.createExpression("${file:parent}/.camel/${file:onlyname}");
100        return new GenericFileExpressionRenamer<>(expression);
101    }
102
103    @SuppressWarnings("unchecked")
104    private static GenericFileExclusiveReadLockStrategy<File> getExclusiveReadLockStrategy(Map<String, Object> params) {
105        GenericFileExclusiveReadLockStrategy<File> strategy = (GenericFileExclusiveReadLockStrategy<File>) params.get("exclusiveReadLockStrategy");
106        if (strategy != null) {
107            return strategy;
108        }
109
110        // no explicit strategy set then fallback to readLock option
111        String readLock = (String) params.get("readLock");
112        if (ObjectHelper.isNotEmpty(readLock)) {
113            if ("none".equals(readLock) || "false".equals(readLock)) {
114                return null;
115            } else if ("markerFile".equals(readLock)) {
116                strategy = new MarkerFileExclusiveReadLockStrategy();
117            } else if ("fileLock".equals(readLock)) {
118                strategy = new FileLockExclusiveReadLockStrategy();
119            } else if ("rename".equals(readLock)) {
120                strategy = new FileRenameExclusiveReadLockStrategy();
121            } else if ("changed".equals(readLock)) {
122                FileChangedExclusiveReadLockStrategy readLockStrategy = new FileChangedExclusiveReadLockStrategy();
123                Long minLength = (Long) params.get("readLockMinLength");
124                if (minLength != null) {
125                    readLockStrategy.setMinLength(minLength);
126                }
127                Long minAge = (Long) params.get("readLockMinAge");
128                if (null != minAge) {
129                    readLockStrategy.setMinAge(minAge);
130                }
131                strategy = readLockStrategy;
132            } else if ("idempotent".equals(readLock)) {
133                FileIdempotentRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRepositoryReadLockStrategy();
134                Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
135                if (readLockRemoveOnRollback != null) {
136                    readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
137                }
138                Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
139                if (readLockRemoveOnCommit != null) {
140                    readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
141                }
142                IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
143                if (repo != null) {
144                    readLockStrategy.setIdempotentRepository(repo);
145                }
146                Integer readLockIdempotentReleaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay");
147                if (readLockIdempotentReleaseDelay != null) {
148                    readLockStrategy.setReadLockIdempotentReleaseDelay(readLockIdempotentReleaseDelay);
149                }
150                Boolean readLockIdempotentReleaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync");
151                if (readLockIdempotentReleaseAsync != null) {
152                    readLockStrategy.setReadLockIdempotentReleaseAsync(readLockIdempotentReleaseAsync);
153                }
154                Integer readLockIdempotentReleaseAsyncPoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize");
155                if (readLockIdempotentReleaseAsyncPoolSize != null) {
156                    readLockStrategy.setReadLockIdempotentReleaseAsyncPoolSize(readLockIdempotentReleaseAsyncPoolSize);
157                }
158                ScheduledExecutorService readLockIdempotentReleaseExecutorService = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService");
159                if (readLockIdempotentReleaseExecutorService != null) {
160                    readLockStrategy.setReadLockIdempotentReleaseExecutorService(readLockIdempotentReleaseExecutorService);
161                }
162                strategy = readLockStrategy;
163            } else if ("idempotent-changed".equals(readLock)) {
164                FileIdempotentChangedRepositoryReadLockStrategy readLockStrategy = new FileIdempotentChangedRepositoryReadLockStrategy();
165                Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
166                if (readLockRemoveOnRollback != null) {
167                    readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
168                }
169                Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
170                if (readLockRemoveOnCommit != null) {
171                    readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
172                }
173                IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
174                if (repo != null) {
175                    readLockStrategy.setIdempotentRepository(repo);
176                }
177                Long minLength = (Long) params.get("readLockMinLength");
178                if (minLength != null) {
179                    readLockStrategy.setMinLength(minLength);
180                }
181                Long minAge = (Long) params.get("readLockMinAge");
182                if (null != minAge) {
183                    readLockStrategy.setMinAge(minAge);
184                }
185                Integer readLockIdempotentReleaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay");
186                if (readLockIdempotentReleaseDelay != null) {
187                    readLockStrategy.setReadLockIdempotentReleaseDelay(readLockIdempotentReleaseDelay);
188                }
189                Boolean readLockIdempotentReleaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync");
190                if (readLockIdempotentReleaseAsync != null) {
191                    readLockStrategy.setReadLockIdempotentReleaseAsync(readLockIdempotentReleaseAsync);
192                }
193                Integer readLockIdempotentReleaseAsyncPoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize");
194                if (readLockIdempotentReleaseAsyncPoolSize != null) {
195                    readLockStrategy.setReadLockIdempotentReleaseAsyncPoolSize(readLockIdempotentReleaseAsyncPoolSize);
196                }
197                ScheduledExecutorService readLockIdempotentReleaseExecutorService = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService");
198                if (readLockIdempotentReleaseExecutorService != null) {
199                    readLockStrategy.setReadLockIdempotentReleaseExecutorService(readLockIdempotentReleaseExecutorService);
200                }
201                strategy = readLockStrategy;
202            } else if ("idempotent-rename".equals(readLock)) {
203                FileIdempotentRenameRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRenameRepositoryReadLockStrategy();
204                Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
205                if (readLockRemoveOnRollback != null) {
206                    readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
207                }
208                Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
209                if (readLockRemoveOnCommit != null) {
210                    readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
211                }
212                IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
213                if (repo != null) {
214                    readLockStrategy.setIdempotentRepository(repo);
215                }
216                strategy = readLockStrategy;
217            }
218
219            if (strategy != null) {
220                Long timeout = (Long) params.get("readLockTimeout");
221                if (timeout != null) {
222                    strategy.setTimeout(timeout);
223                }
224                Long checkInterval = (Long) params.get("readLockCheckInterval");
225                if (checkInterval != null) {
226                    strategy.setCheckInterval(checkInterval);
227                }
228                LoggingLevel readLockLoggingLevel = (LoggingLevel) params.get("readLockLoggingLevel");
229                if (readLockLoggingLevel != null) {
230                    strategy.setReadLockLoggingLevel(readLockLoggingLevel);
231                }
232                Boolean readLockMarkerFile = (Boolean) params.get("readLockMarkerFile");
233                if (readLockMarkerFile != null) {
234                    strategy.setMarkerFiler(readLockMarkerFile);
235                }
236                Boolean readLockDeleteOrphanLockFiles = (Boolean) params.get("readLockDeleteOrphanLockFiles");
237                if (readLockDeleteOrphanLockFiles != null) {
238                    strategy.setDeleteOrphanLockFiles(readLockDeleteOrphanLockFiles);
239                }
240            }
241        }
242
243        return strategy;
244    }
245}