001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl.saga;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.List;
022import java.util.Map;
023import java.util.Optional;
024import java.util.concurrent.CompletableFuture;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicReference;
029import java.util.function.Function;
030
031import org.apache.camel.CamelContext;
032import org.apache.camel.Endpoint;
033import org.apache.camel.Exchange;
034import org.apache.camel.Expression;
035import org.apache.camel.RuntimeCamelException;
036import org.apache.camel.saga.CamelSagaCoordinator;
037import org.apache.camel.saga.CamelSagaStep;
038import org.apache.camel.util.ObjectHelper;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * A in-memory implementation of a saga coordinator.
044 */
045public class InMemorySagaCoordinator implements CamelSagaCoordinator {
046
047    private enum Status {
048        RUNNING,
049        COMPENSATING,
050        COMPENSATED,
051        COMPLETING,
052        COMPLETED
053    }
054
055    private static final Logger LOG = LoggerFactory.getLogger(InMemorySagaCoordinator.class);
056
057    private CamelContext camelContext;
058    private InMemorySagaService sagaService;
059    private String sagaId;
060    private List<CamelSagaStep> steps;
061    private Map<CamelSagaStep, Map<String, Object>> optionValues;
062    private AtomicReference<Status> currentStatus;
063
064    public InMemorySagaCoordinator(CamelContext camelContext, InMemorySagaService sagaService, String sagaId) {
065        this.camelContext = ObjectHelper.notNull(camelContext, "camelContext");
066        this.sagaService = ObjectHelper.notNull(sagaService, "sagaService");
067        this.sagaId = ObjectHelper.notNull(sagaId, "sagaId");
068        this.steps = new CopyOnWriteArrayList<>();
069        this.optionValues = new ConcurrentHashMap<>();
070        this.currentStatus = new AtomicReference<>(Status.RUNNING);
071    }
072
073    @Override
074    public String getId() {
075        return sagaId;
076    }
077
078    @Override
079    public CompletableFuture<Void> beginStep(Exchange exchange, CamelSagaStep step) {
080        this.steps.add(step);
081
082        if (!step.getOptions().isEmpty()) {
083            optionValues.putIfAbsent(step, new ConcurrentHashMap<>());
084            Map<String, Object> values = optionValues.get(step);
085            for (String option : step.getOptions().keySet()) {
086                Expression expression = step.getOptions().get(option);
087                values.put(option, expression.evaluate(exchange, Object.class));
088            }
089        }
090
091        if (step.getTimeoutInMilliseconds().isPresent()) {
092            sagaService.getExecutorService().schedule(() -> {
093                boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPENSATING);
094                if (doAction) {
095                    doCompensate();
096                }
097            }, step.getTimeoutInMilliseconds().get(), TimeUnit.MILLISECONDS);
098        }
099
100        return CompletableFuture.completedFuture(null);
101    }
102
103    @Override
104    public CompletableFuture<Void> compensate() {
105        boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPENSATING);
106
107        if (doAction) {
108            doCompensate();
109        } else {
110            Status status = currentStatus.get();
111            if (status != Status.COMPENSATING && status != Status.COMPENSATED) {
112                CompletableFuture<Void> res = new CompletableFuture<>();
113                res.completeExceptionally(new IllegalStateException("Cannot compensate: status is " + status));
114                return res;
115            }
116        }
117
118        return CompletableFuture.completedFuture(null);
119    }
120
121
122    @Override
123    public CompletableFuture<Void> complete() {
124        boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPLETING);
125
126        if (doAction) {
127            doComplete();
128        } else {
129            Status status = currentStatus.get();
130            if (status != Status.COMPLETING && status != Status.COMPLETED) {
131                CompletableFuture<Void> res = new CompletableFuture<>();
132                res.completeExceptionally(new IllegalStateException("Cannot complete: status is " + status));
133                return res;
134            }
135        }
136
137        return CompletableFuture.completedFuture(null);
138    }
139
140    public CompletableFuture<Boolean> doCompensate() {
141        return doFinalize(CamelSagaStep::getCompensation, "compensation")
142                .thenApply(res -> {
143                    currentStatus.set(Status.COMPENSATED);
144                    return res;
145                });
146    }
147
148    public CompletableFuture<Boolean> doComplete() {
149        return doFinalize(CamelSagaStep::getCompletion, "completion")
150                .thenApply(res -> {
151                    currentStatus.set(Status.COMPLETED);
152                    return res;
153                });
154    }
155
156    public CompletableFuture<Boolean> doFinalize(Function<CamelSagaStep, Optional<Endpoint>> endpointExtractor, String description) {
157        CompletableFuture<Boolean> result = CompletableFuture.completedFuture(true);
158        for (CamelSagaStep step : reversed(steps)) {
159            Optional<Endpoint> endpoint = endpointExtractor.apply(step);
160            if (endpoint.isPresent()) {
161                result = result.thenCompose(prevResult ->
162                        doFinalize(endpoint.get(), step, 0, description).thenApply(res -> prevResult && res));
163            }
164        }
165        return result.whenComplete((done, ex) -> {
166            if (ex != null) {
167                LOG.error("Cannot finalize " + description + " the saga", ex);
168            } else if (!done) {
169                LOG.warn("Unable to finalize " + description + " for all required steps of the saga " + sagaId);
170            }
171        });
172    }
173
174    private CompletableFuture<Boolean> doFinalize(Endpoint endpoint, CamelSagaStep step, int doneAttempts, String description) {
175        Exchange exchange = createExchange(endpoint, step);
176
177        return CompletableFuture.supplyAsync(() -> {
178            Exchange res = camelContext.createFluentProducerTemplate().to(endpoint).withExchange(exchange).send();
179            Exception ex = res.getException();
180            if (ex != null) {
181                throw new RuntimeCamelException(res.getException());
182            }
183            return true;
184        }, sagaService.getExecutorService()).exceptionally(ex -> {
185            LOG.warn("Exception thrown during " + description + " at " + endpoint.getEndpointUri()
186                    + ". Attempt " + (doneAttempts + 1) + " of " + sagaService.getMaxRetryAttempts(), ex);
187            return false;
188        }).thenCompose(executed -> {
189            int currentAttempt = doneAttempts + 1;
190            if (executed) {
191                return CompletableFuture.completedFuture(true);
192            } else if (currentAttempt >= sagaService.getMaxRetryAttempts()) {
193                return CompletableFuture.completedFuture(false);
194            } else {
195                CompletableFuture<Boolean> future = new CompletableFuture<>();
196                sagaService.getExecutorService().schedule(() -> {
197                    doFinalize(endpoint, step, currentAttempt, description).whenComplete((res, ex) -> {
198                        if (ex != null) {
199                            future.completeExceptionally(ex);
200                        } else {
201                            future.complete(res);
202                        }
203                    });
204                }, sagaService.getRetryDelayInMilliseconds(), TimeUnit.MILLISECONDS);
205                return future;
206            }
207        });
208    }
209
210    private Exchange createExchange(Endpoint endpoint, CamelSagaStep step) {
211        Exchange exchange = endpoint.createExchange();
212        exchange.getIn().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, getId());
213
214        Map<String, Object> values = optionValues.get(step);
215        if (values != null) {
216            for (Map.Entry<String, Object> entry : values.entrySet()) {
217                exchange.getIn().setHeader(entry.getKey(), entry.getValue());
218            }
219        }
220        return exchange;
221    }
222
223    private <T> List<T> reversed(List<T> list) {
224        List<T> reversed = new ArrayList<>(list);
225        Collections.reverse(reversed);
226        return reversed;
227    }
228}