001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl.saga; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.List; 022import java.util.Map; 023import java.util.Optional; 024import java.util.concurrent.CompletableFuture; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicReference; 029import java.util.function.Function; 030 031import org.apache.camel.CamelContext; 032import org.apache.camel.Endpoint; 033import org.apache.camel.Exchange; 034import org.apache.camel.Expression; 035import org.apache.camel.RuntimeCamelException; 036import org.apache.camel.saga.CamelSagaCoordinator; 037import org.apache.camel.saga.CamelSagaStep; 038import org.apache.camel.util.ObjectHelper; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * A in-memory implementation of a saga coordinator. 044 */ 045public class InMemorySagaCoordinator implements CamelSagaCoordinator { 046 047 private enum Status { 048 RUNNING, 049 COMPENSATING, 050 COMPENSATED, 051 COMPLETING, 052 COMPLETED 053 } 054 055 private static final Logger LOG = LoggerFactory.getLogger(InMemorySagaCoordinator.class); 056 057 private CamelContext camelContext; 058 private InMemorySagaService sagaService; 059 private String sagaId; 060 private List<CamelSagaStep> steps; 061 private Map<CamelSagaStep, Map<String, Object>> optionValues; 062 private AtomicReference<Status> currentStatus; 063 064 public InMemorySagaCoordinator(CamelContext camelContext, InMemorySagaService sagaService, String sagaId) { 065 this.camelContext = ObjectHelper.notNull(camelContext, "camelContext"); 066 this.sagaService = ObjectHelper.notNull(sagaService, "sagaService"); 067 this.sagaId = ObjectHelper.notNull(sagaId, "sagaId"); 068 this.steps = new CopyOnWriteArrayList<>(); 069 this.optionValues = new ConcurrentHashMap<>(); 070 this.currentStatus = new AtomicReference<>(Status.RUNNING); 071 } 072 073 @Override 074 public String getId() { 075 return sagaId; 076 } 077 078 @Override 079 public CompletableFuture<Void> beginStep(Exchange exchange, CamelSagaStep step) { 080 this.steps.add(step); 081 082 if (!step.getOptions().isEmpty()) { 083 optionValues.putIfAbsent(step, new ConcurrentHashMap<>()); 084 Map<String, Object> values = optionValues.get(step); 085 for (String option : step.getOptions().keySet()) { 086 Expression expression = step.getOptions().get(option); 087 values.put(option, expression.evaluate(exchange, Object.class)); 088 } 089 } 090 091 if (step.getTimeoutInMilliseconds().isPresent()) { 092 sagaService.getExecutorService().schedule(() -> { 093 boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPENSATING); 094 if (doAction) { 095 doCompensate(); 096 } 097 }, step.getTimeoutInMilliseconds().get(), TimeUnit.MILLISECONDS); 098 } 099 100 return CompletableFuture.completedFuture(null); 101 } 102 103 @Override 104 public CompletableFuture<Void> compensate() { 105 boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPENSATING); 106 107 if (doAction) { 108 doCompensate(); 109 } else { 110 Status status = currentStatus.get(); 111 if (status != Status.COMPENSATING && status != Status.COMPENSATED) { 112 CompletableFuture<Void> res = new CompletableFuture<>(); 113 res.completeExceptionally(new IllegalStateException("Cannot compensate: status is " + status)); 114 return res; 115 } 116 } 117 118 return CompletableFuture.completedFuture(null); 119 } 120 121 122 @Override 123 public CompletableFuture<Void> complete() { 124 boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPLETING); 125 126 if (doAction) { 127 doComplete(); 128 } else { 129 Status status = currentStatus.get(); 130 if (status != Status.COMPLETING && status != Status.COMPLETED) { 131 CompletableFuture<Void> res = new CompletableFuture<>(); 132 res.completeExceptionally(new IllegalStateException("Cannot complete: status is " + status)); 133 return res; 134 } 135 } 136 137 return CompletableFuture.completedFuture(null); 138 } 139 140 public CompletableFuture<Boolean> doCompensate() { 141 return doFinalize(CamelSagaStep::getCompensation, "compensation") 142 .thenApply(res -> { 143 currentStatus.set(Status.COMPENSATED); 144 return res; 145 }); 146 } 147 148 public CompletableFuture<Boolean> doComplete() { 149 return doFinalize(CamelSagaStep::getCompletion, "completion") 150 .thenApply(res -> { 151 currentStatus.set(Status.COMPLETED); 152 return res; 153 }); 154 } 155 156 public CompletableFuture<Boolean> doFinalize(Function<CamelSagaStep, Optional<Endpoint>> endpointExtractor, String description) { 157 CompletableFuture<Boolean> result = CompletableFuture.completedFuture(true); 158 for (CamelSagaStep step : reversed(steps)) { 159 Optional<Endpoint> endpoint = endpointExtractor.apply(step); 160 if (endpoint.isPresent()) { 161 result = result.thenCompose(prevResult -> 162 doFinalize(endpoint.get(), step, 0, description).thenApply(res -> prevResult && res)); 163 } 164 } 165 return result.whenComplete((done, ex) -> { 166 if (ex != null) { 167 LOG.error("Cannot finalize " + description + " the saga", ex); 168 } else if (!done) { 169 LOG.warn("Unable to finalize " + description + " for all required steps of the saga " + sagaId); 170 } 171 }); 172 } 173 174 private CompletableFuture<Boolean> doFinalize(Endpoint endpoint, CamelSagaStep step, int doneAttempts, String description) { 175 Exchange exchange = createExchange(endpoint, step); 176 177 return CompletableFuture.supplyAsync(() -> { 178 Exchange res = camelContext.createFluentProducerTemplate().to(endpoint).withExchange(exchange).send(); 179 Exception ex = res.getException(); 180 if (ex != null) { 181 throw new RuntimeCamelException(res.getException()); 182 } 183 return true; 184 }, sagaService.getExecutorService()).exceptionally(ex -> { 185 LOG.warn("Exception thrown during " + description + " at " + endpoint.getEndpointUri() 186 + ". Attempt " + (doneAttempts + 1) + " of " + sagaService.getMaxRetryAttempts(), ex); 187 return false; 188 }).thenCompose(executed -> { 189 int currentAttempt = doneAttempts + 1; 190 if (executed) { 191 return CompletableFuture.completedFuture(true); 192 } else if (currentAttempt >= sagaService.getMaxRetryAttempts()) { 193 return CompletableFuture.completedFuture(false); 194 } else { 195 CompletableFuture<Boolean> future = new CompletableFuture<>(); 196 sagaService.getExecutorService().schedule(() -> { 197 doFinalize(endpoint, step, currentAttempt, description).whenComplete((res, ex) -> { 198 if (ex != null) { 199 future.completeExceptionally(ex); 200 } else { 201 future.complete(res); 202 } 203 }); 204 }, sagaService.getRetryDelayInMilliseconds(), TimeUnit.MILLISECONDS); 205 return future; 206 } 207 }); 208 } 209 210 private Exchange createExchange(Endpoint endpoint, CamelSagaStep step) { 211 Exchange exchange = endpoint.createExchange(); 212 exchange.getIn().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, getId()); 213 214 Map<String, Object> values = optionValues.get(step); 215 if (values != null) { 216 for (Map.Entry<String, Object> entry : values.entrySet()) { 217 exchange.getIn().setHeader(entry.getKey(), entry.getValue()); 218 } 219 } 220 return exchange; 221 } 222 223 private <T> List<T> reversed(List<T> list) { 224 List<T> reversed = new ArrayList<>(list); 225 Collections.reverse(reversed); 226 return reversed; 227 } 228}