001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.Optional;
023import java.util.TreeMap;
024import java.util.concurrent.TimeUnit;
025import javax.xml.bind.annotation.XmlAccessType;
026import javax.xml.bind.annotation.XmlAccessorType;
027import javax.xml.bind.annotation.XmlAttribute;
028import javax.xml.bind.annotation.XmlElement;
029import javax.xml.bind.annotation.XmlRootElement;
030import javax.xml.bind.annotation.XmlTransient;
031
032import org.apache.camel.CamelContext;
033import org.apache.camel.Endpoint;
034import org.apache.camel.Expression;
035import org.apache.camel.Processor;
036import org.apache.camel.RuntimeCamelException;
037import org.apache.camel.processor.saga.SagaProcessorBuilder;
038import org.apache.camel.saga.CamelSagaService;
039import org.apache.camel.saga.CamelSagaStep;
040import org.apache.camel.spi.Metadata;
041import org.apache.camel.spi.RouteContext;
042import org.apache.camel.util.CamelContextHelper;
043import org.apache.camel.util.ObjectHelper;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * Enables sagas on the route
049 *
050 * @version
051 */
052@Metadata(label = "eip,routing")
053@XmlRootElement(name = "saga")
054@XmlAccessorType(XmlAccessType.FIELD)
055public class SagaDefinition extends OutputDefinition<SagaDefinition> {
056
057    private static final Logger LOG = LoggerFactory.getLogger(SagaDefinition.class);
058
059    @XmlAttribute
060    @Metadata(defaultValue = "REQUIRED")
061    private SagaPropagation propagation;
062
063    @XmlAttribute
064    @Metadata(defaultValue = "AUTO")
065    private SagaCompletionMode completionMode;
066
067    @XmlAttribute
068    private Long timeoutInMilliseconds;
069
070    @XmlElement
071    private SagaActionUriDefinition compensation;
072
073    @XmlElement
074    private SagaActionUriDefinition completion;
075
076    @XmlElement(name = "option")
077    private List<SagaOptionDefinition> options;
078
079    @XmlTransient
080    private CamelSagaService sagaService; // TODO add ref for xml configuration
081
082    public SagaDefinition() {
083    }
084
085    @Override
086    public Processor createProcessor(RouteContext routeContext) throws Exception {
087        Optional<Endpoint> compensationEndpoint = Optional.ofNullable(this.compensation)
088                .map(SagaActionUriDefinition::getUri)
089                .map(routeContext::resolveEndpoint);
090
091        Optional<Endpoint> completionEndpoint = Optional.ofNullable(this.completion)
092                .map(SagaActionUriDefinition::getUri)
093                .map(routeContext::resolveEndpoint);
094
095        Map<String, Expression> optionsMap = new TreeMap<>();
096        if (this.options != null) {
097            for (SagaOptionDefinition optionDef : this.options) {
098                String optionName = optionDef.getOptionName();
099                Expression expr = optionDef.getExpression();
100                optionsMap.put(optionName, expr);
101            }
102        }
103
104        CamelSagaStep step = new CamelSagaStep(compensationEndpoint, completionEndpoint, optionsMap, Optional.ofNullable(timeoutInMilliseconds));
105
106        SagaPropagation propagation = this.propagation;
107        if (propagation == null) {
108            // default propagation mode
109            propagation = SagaPropagation.REQUIRED;
110        }
111
112        SagaCompletionMode completionMode = this.completionMode;
113        if (completionMode == null) {
114            // default completion mode
115            completionMode = SagaCompletionMode.defaultCompletionMode();
116        }
117
118        Processor childProcessor = this.createChildProcessor(routeContext, true);
119        CamelSagaService camelSagaService = findSagaService(routeContext.getCamelContext());
120
121        camelSagaService.registerStep(step);
122
123        return new SagaProcessorBuilder()
124                .camelContext(routeContext.getCamelContext())
125                .childProcessor(childProcessor)
126                .sagaService(camelSagaService)
127                .step(step)
128                .propagation(propagation)
129                .completionMode(completionMode)
130                .build();
131    }
132
133    @Override
134    public boolean isAbstract() {
135        return true;
136    }
137
138    @Override
139    public boolean isTopLevelOnly() {
140        return true;
141    }
142
143    @Override
144    public boolean isWrappingEntireOutput() {
145        return true;
146    }
147
148    @Override
149    public String getLabel() {
150        String desc = description();
151        if (ObjectHelper.isEmpty(desc)) {
152            return "saga";
153        } else {
154            return "saga[" + desc + "]";
155        }
156    }
157
158    @Override
159    public String toString() {
160        String desc = description();
161        if (ObjectHelper.isEmpty(desc)) {
162            return "Saga -> [" + outputs + "]";
163        } else {
164            return "Saga[" + desc + "] -> [" + outputs + "]";
165        }
166    }
167
168    // Properties
169
170
171    public SagaActionUriDefinition getCompensation() {
172        return compensation;
173    }
174
175    /**
176     * The compensation endpoint URI that must be called to compensate all changes done in the route.
177     * The route corresponding to the compensation URI must perform compensation and complete without error.
178     *
179     * If errors occur during compensation, the saga service may call again the compensation URI to retry.
180     */
181    public void setCompensation(SagaActionUriDefinition compensation) {
182        this.compensation = compensation;
183    }
184
185    public SagaActionUriDefinition getCompletion() {
186        return completion;
187    }
188
189    /**
190     * The completion endpoint URI that will be called when the Saga is completed successfully.
191     * The route corresponding to the completion URI must perform completion tasks and terminate without error.
192     *
193     * If errors occur during completion, the saga service may call again the completion URI to retry.
194     */
195    public void setCompletion(SagaActionUriDefinition completion) {
196        this.completion = completion;
197    }
198
199    public SagaPropagation getPropagation() {
200        return propagation;
201    }
202
203    /**
204     * Set the Saga propagation mode (REQUIRED, REQUIRES_NEW, MANDATORY, SUPPORTS, NOT_SUPPORTED, NEVER).
205     */
206    public void setPropagation(SagaPropagation propagation) {
207        this.propagation = propagation;
208    }
209
210    public SagaCompletionMode getCompletionMode() {
211        return completionMode;
212    }
213
214    /**
215     * Determine how the saga should be considered complete. When set to AUTO, the saga is completed when the exchange that
216     * initiates the saga is processed successfully, or compensated when it completes exceptionally.
217     *
218     * When set to MANUAL, the user must complete or compensate the saga using the "saga:complete" or "saga:compensate" endpoints.
219     */
220    public void setCompletionMode(SagaCompletionMode completionMode) {
221        this.completionMode = completionMode;
222    }
223
224    public CamelSagaService getSagaService() {
225        return sagaService;
226    }
227
228    public void setSagaService(CamelSagaService sagaService) {
229        this.sagaService = sagaService;
230    }
231
232    public List<SagaOptionDefinition> getOptions() {
233        return options;
234    }
235
236    /**
237     * Allows to save properties of the current exchange in order to re-use them in a compensation/completion callback route.
238     * Options are usually helpful e.g. to store and retrieve identifiers of objects that should be deleted in compensating actions.
239     *
240     * Option values will be transformed into input headers of the compensation/completion exchange.
241     */
242    public void setOptions(List<SagaOptionDefinition> options) {
243        this.options = options;
244    }
245
246    public Long getTimeoutInMilliseconds() {
247        return timeoutInMilliseconds;
248    }
249
250    /**
251     * Set the maximum amount of time for the Saga. After the timeout is expired, the saga will be compensated
252     * automatically (unless a different decision has been taken in the meantime).
253     */
254    public void setTimeoutInMilliseconds(Long timeoutInMilliseconds) {
255        this.timeoutInMilliseconds = timeoutInMilliseconds;
256    }
257
258    private void addOption(String option, Expression expression) {
259        if (this.options == null) {
260            this.options = new ArrayList<>();
261        }
262        this.options.add(new SagaOptionDefinition(option, expression));
263    }
264
265    // Builders
266
267    public SagaDefinition compensation(String compensation) {
268        if (this.compensation != null) {
269            throw new IllegalStateException("Compensation has already been set");
270        }
271        this.compensation = new SagaActionUriDefinition(compensation);
272        return this;
273    }
274
275    public SagaDefinition completion(String completion) {
276        if (this.completion != null) {
277            throw new IllegalStateException("Completion has already been set");
278        }
279        this.completion = new SagaActionUriDefinition(completion);
280        return this;
281    }
282
283    public SagaDefinition propagation(SagaPropagation propagation) {
284        setPropagation(propagation);
285        return this;
286    }
287
288    public SagaDefinition sagaService(CamelSagaService sagaService) {
289        setSagaService(sagaService);
290        return this;
291    }
292
293    public SagaDefinition completionMode(SagaCompletionMode completionMode) {
294        setCompletionMode(completionMode);
295        return this;
296    }
297
298    public SagaDefinition option(String option, Expression expression) {
299        addOption(option, expression);
300        return this;
301    }
302
303    public SagaDefinition timeout(long timeout, TimeUnit unit) {
304        setTimeoutInMilliseconds(unit.toMillis(timeout));
305        return this;
306    }
307
308    // Utils
309
310    protected CamelSagaService findSagaService(CamelContext context) {
311        CamelSagaService sagaService = getSagaService();
312        if (sagaService != null) {
313            return sagaService;
314        }
315
316        sagaService = context.hasService(CamelSagaService.class);
317        if (sagaService != null) {
318            return sagaService;
319        }
320
321        sagaService = CamelContextHelper.findByType(context, CamelSagaService.class);
322        if (sagaService != null) {
323            return sagaService;
324        }
325
326        throw new RuntimeCamelException("Cannot find a CamelSagaService");
327    }
328
329    protected String description() {
330        StringBuilder desc = new StringBuilder();
331        addField(desc, "compensation", compensation);
332        addField(desc, "completion", completion);
333        addField(desc, "propagation", propagation);
334        return desc.toString();
335    }
336
337    private void addField(StringBuilder builder, String key, Object value) {
338        if (value == null) {
339            return;
340        }
341        if (builder.length() > 0) {
342            builder.append(',');
343        }
344        builder.append(key).append(':').append(value);
345    }
346
347}