001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.Optional;
023import java.util.TreeMap;
024import java.util.concurrent.TimeUnit;
025
026import javax.xml.bind.annotation.XmlAccessType;
027import javax.xml.bind.annotation.XmlAccessorType;
028import javax.xml.bind.annotation.XmlAttribute;
029import javax.xml.bind.annotation.XmlElement;
030import javax.xml.bind.annotation.XmlRootElement;
031import javax.xml.bind.annotation.XmlTransient;
032
033import org.apache.camel.CamelContext;
034import org.apache.camel.Endpoint;
035import org.apache.camel.Expression;
036import org.apache.camel.Processor;
037import org.apache.camel.RuntimeCamelException;
038import org.apache.camel.processor.saga.SagaProcessorBuilder;
039import org.apache.camel.saga.CamelSagaService;
040import org.apache.camel.saga.CamelSagaStep;
041import org.apache.camel.spi.Metadata;
042import org.apache.camel.spi.RouteContext;
043import org.apache.camel.util.CamelContextHelper;
044import org.apache.camel.util.ObjectHelper;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Enables sagas on the route
050 *
051 * @version
052 */
053@Metadata(label = "eip,routing")
054@XmlRootElement(name = "saga")
055@XmlAccessorType(XmlAccessType.FIELD)
056public class SagaDefinition extends OutputDefinition<SagaDefinition> {
057
058    private static final Logger LOG = LoggerFactory.getLogger(SagaDefinition.class);
059
060    @XmlAttribute
061    @Metadata(defaultValue = "REQUIRED")
062    private SagaPropagation propagation;
063
064    @XmlAttribute
065    @Metadata(defaultValue = "AUTO")
066    private SagaCompletionMode completionMode;
067
068    @XmlAttribute
069    private Long timeoutInMilliseconds;
070
071    @XmlElement
072    private SagaActionUriDefinition compensation;
073
074    @XmlElement
075    private SagaActionUriDefinition completion;
076
077    @XmlElement(name = "option")
078    private List<SagaOptionDefinition> options;
079
080    @XmlTransient
081    private CamelSagaService sagaService; // TODO add ref for xml configuration
082
083    public SagaDefinition() {
084    }
085
086    @Override
087    public Processor createProcessor(RouteContext routeContext) throws Exception {
088        Optional<Endpoint> compensationEndpoint = Optional.ofNullable(this.compensation)
089                .map(SagaActionUriDefinition::getUri)
090                .map(routeContext::resolveEndpoint);
091
092        Optional<Endpoint> completionEndpoint = Optional.ofNullable(this.completion)
093                .map(SagaActionUriDefinition::getUri)
094                .map(routeContext::resolveEndpoint);
095
096        Map<String, Expression> optionsMap = new TreeMap<>();
097        if (this.options != null) {
098            for (SagaOptionDefinition optionDef : this.options) {
099                String optionName = optionDef.getOptionName();
100                Expression expr = optionDef.getExpression();
101                optionsMap.put(optionName, expr);
102            }
103        }
104
105        CamelSagaStep step = new CamelSagaStep(compensationEndpoint, completionEndpoint, optionsMap, Optional.ofNullable(timeoutInMilliseconds));
106
107        SagaPropagation propagation = this.propagation;
108        if (propagation == null) {
109            // default propagation mode
110            propagation = SagaPropagation.REQUIRED;
111        }
112
113        SagaCompletionMode completionMode = this.completionMode;
114        if (completionMode == null) {
115            // default completion mode
116            completionMode = SagaCompletionMode.defaultCompletionMode();
117        }
118
119        Processor childProcessor = this.createChildProcessor(routeContext, true);
120        CamelSagaService camelSagaService = findSagaService(routeContext.getCamelContext());
121
122        camelSagaService.registerStep(step);
123
124        return new SagaProcessorBuilder()
125                .camelContext(routeContext.getCamelContext())
126                .childProcessor(childProcessor)
127                .sagaService(camelSagaService)
128                .step(step)
129                .propagation(propagation)
130                .completionMode(completionMode)
131                .build();
132    }
133
134    @Override
135    public boolean isAbstract() {
136        return true;
137    }
138
139    @Override
140    public boolean isTopLevelOnly() {
141        return true;
142    }
143
144    @Override
145    public boolean isWrappingEntireOutput() {
146        return true;
147    }
148
149    @Override
150    public String getLabel() {
151        String desc = description();
152        if (ObjectHelper.isEmpty(desc)) {
153            return "saga";
154        } else {
155            return "saga[" + desc + "]";
156        }
157    }
158
159    @Override
160    public String toString() {
161        String desc = description();
162        if (ObjectHelper.isEmpty(desc)) {
163            return "Saga -> [" + outputs + "]";
164        } else {
165            return "Saga[" + desc + "] -> [" + outputs + "]";
166        }
167    }
168
169    // Properties
170
171
172    public SagaActionUriDefinition getCompensation() {
173        return compensation;
174    }
175
176    /**
177     * The compensation endpoint URI that must be called to compensate all changes done in the route.
178     * The route corresponding to the compensation URI must perform compensation and complete without error.
179     *
180     * If errors occur during compensation, the saga service may call again the compensation URI to retry.
181     */
182    public void setCompensation(SagaActionUriDefinition compensation) {
183        this.compensation = compensation;
184    }
185
186    public SagaActionUriDefinition getCompletion() {
187        return completion;
188    }
189
190    /**
191     * The completion endpoint URI that will be called when the Saga is completed successfully.
192     * The route corresponding to the completion URI must perform completion tasks and terminate without error.
193     *
194     * If errors occur during completion, the saga service may call again the completion URI to retry.
195     */
196    public void setCompletion(SagaActionUriDefinition completion) {
197        this.completion = completion;
198    }
199
200    public SagaPropagation getPropagation() {
201        return propagation;
202    }
203
204    /**
205     * Set the Saga propagation mode (REQUIRED, REQUIRES_NEW, MANDATORY, SUPPORTS, NOT_SUPPORTED, NEVER).
206     */
207    public void setPropagation(SagaPropagation propagation) {
208        this.propagation = propagation;
209    }
210
211    public SagaCompletionMode getCompletionMode() {
212        return completionMode;
213    }
214
215    /**
216     * Determine how the saga should be considered complete. When set to AUTO, the saga is completed when the exchange that
217     * initiates the saga is processed successfully, or compensated when it completes exceptionally.
218     *
219     * When set to MANUAL, the user must complete or compensate the saga using the "saga:complete" or "saga:compensate" endpoints.
220     */
221    public void setCompletionMode(SagaCompletionMode completionMode) {
222        this.completionMode = completionMode;
223    }
224
225    public CamelSagaService getSagaService() {
226        return sagaService;
227    }
228
229    public void setSagaService(CamelSagaService sagaService) {
230        this.sagaService = sagaService;
231    }
232
233    public List<SagaOptionDefinition> getOptions() {
234        return options;
235    }
236
237    /**
238     * Allows to save properties of the current exchange in order to re-use them in a compensation/completion callback route.
239     * Options are usually helpful e.g. to store and retrieve identifiers of objects that should be deleted in compensating actions.
240     *
241     * Option values will be transformed into input headers of the compensation/completion exchange.
242     */
243    public void setOptions(List<SagaOptionDefinition> options) {
244        this.options = options;
245    }
246
247    public Long getTimeoutInMilliseconds() {
248        return timeoutInMilliseconds;
249    }
250
251    /**
252     * Set the maximum amount of time for the Saga. After the timeout is expired, the saga will be compensated
253     * automatically (unless a different decision has been taken in the meantime).
254     */
255    public void setTimeoutInMilliseconds(Long timeoutInMilliseconds) {
256        this.timeoutInMilliseconds = timeoutInMilliseconds;
257    }
258
259    private void addOption(String option, Expression expression) {
260        if (this.options == null) {
261            this.options = new ArrayList<>();
262        }
263        this.options.add(new SagaOptionDefinition(option, expression));
264    }
265
266    // Builders
267
268    public SagaDefinition compensation(String compensation) {
269        if (this.compensation != null) {
270            throw new IllegalStateException("Compensation has already been set");
271        }
272        this.compensation = new SagaActionUriDefinition(compensation);
273        return this;
274    }
275
276    public SagaDefinition completion(String completion) {
277        if (this.completion != null) {
278            throw new IllegalStateException("Completion has already been set");
279        }
280        this.completion = new SagaActionUriDefinition(completion);
281        return this;
282    }
283
284    public SagaDefinition propagation(SagaPropagation propagation) {
285        setPropagation(propagation);
286        return this;
287    }
288
289    public SagaDefinition sagaService(CamelSagaService sagaService) {
290        setSagaService(sagaService);
291        return this;
292    }
293
294    public SagaDefinition completionMode(SagaCompletionMode completionMode) {
295        setCompletionMode(completionMode);
296        return this;
297    }
298
299    public SagaDefinition option(String option, Expression expression) {
300        addOption(option, expression);
301        return this;
302    }
303
304    public SagaDefinition timeout(long timeout, TimeUnit unit) {
305        setTimeoutInMilliseconds(unit.toMillis(timeout));
306        return this;
307    }
308
309    // Utils
310
311    protected CamelSagaService findSagaService(CamelContext context) {
312        CamelSagaService sagaService = getSagaService();
313        if (sagaService != null) {
314            return sagaService;
315        }
316
317        sagaService = context.hasService(CamelSagaService.class);
318        if (sagaService != null) {
319            return sagaService;
320        }
321
322        sagaService = CamelContextHelper.findByType(context, CamelSagaService.class);
323        if (sagaService != null) {
324            return sagaService;
325        }
326
327        throw new RuntimeCamelException("Cannot find a CamelSagaService");
328    }
329
330    protected String description() {
331        StringBuilder desc = new StringBuilder();
332        addField(desc, "compensation", compensation);
333        addField(desc, "completion", completion);
334        addField(desc, "propagation", propagation);
335        return desc.toString();
336    }
337
338    private void addField(StringBuilder builder, String key, Object value) {
339        if (value == null) {
340            return;
341        }
342        if (builder.length() > 0) {
343            builder.append(',');
344        }
345        builder.append(key).append(':').append(value);
346    }
347
348}