001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.Map; 022import java.util.Optional; 023import java.util.TreeMap; 024import java.util.concurrent.TimeUnit; 025 026import javax.xml.bind.annotation.XmlAccessType; 027import javax.xml.bind.annotation.XmlAccessorType; 028import javax.xml.bind.annotation.XmlAttribute; 029import javax.xml.bind.annotation.XmlElement; 030import javax.xml.bind.annotation.XmlRootElement; 031import javax.xml.bind.annotation.XmlTransient; 032 033import org.apache.camel.CamelContext; 034import org.apache.camel.Endpoint; 035import org.apache.camel.Expression; 036import org.apache.camel.Processor; 037import org.apache.camel.RuntimeCamelException; 038import org.apache.camel.processor.saga.SagaProcessorBuilder; 039import org.apache.camel.saga.CamelSagaService; 040import org.apache.camel.saga.CamelSagaStep; 041import org.apache.camel.spi.Metadata; 042import org.apache.camel.spi.RouteContext; 043import org.apache.camel.util.CamelContextHelper; 044import org.apache.camel.util.ObjectHelper; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * Enables sagas on the route 050 * 051 * @version 052 */ 053@Metadata(label = "eip,routing") 054@XmlRootElement(name = "saga") 055@XmlAccessorType(XmlAccessType.FIELD) 056public class SagaDefinition extends OutputDefinition<SagaDefinition> { 057 058 private static final Logger LOG = LoggerFactory.getLogger(SagaDefinition.class); 059 060 @XmlAttribute 061 @Metadata(defaultValue = "REQUIRED") 062 private SagaPropagation propagation; 063 064 @XmlAttribute 065 @Metadata(defaultValue = "AUTO") 066 private SagaCompletionMode completionMode; 067 068 @XmlAttribute 069 private Long timeoutInMilliseconds; 070 071 @XmlElement 072 private SagaActionUriDefinition compensation; 073 074 @XmlElement 075 private SagaActionUriDefinition completion; 076 077 @XmlElement(name = "option") 078 private List<SagaOptionDefinition> options; 079 080 @XmlTransient 081 private CamelSagaService sagaService; // TODO add ref for xml configuration 082 083 public SagaDefinition() { 084 } 085 086 @Override 087 public Processor createProcessor(RouteContext routeContext) throws Exception { 088 Optional<Endpoint> compensationEndpoint = Optional.ofNullable(this.compensation) 089 .map(SagaActionUriDefinition::getUri) 090 .map(routeContext::resolveEndpoint); 091 092 Optional<Endpoint> completionEndpoint = Optional.ofNullable(this.completion) 093 .map(SagaActionUriDefinition::getUri) 094 .map(routeContext::resolveEndpoint); 095 096 Map<String, Expression> optionsMap = new TreeMap<>(); 097 if (this.options != null) { 098 for (SagaOptionDefinition optionDef : this.options) { 099 String optionName = optionDef.getOptionName(); 100 Expression expr = optionDef.getExpression(); 101 optionsMap.put(optionName, expr); 102 } 103 } 104 105 CamelSagaStep step = new CamelSagaStep(compensationEndpoint, completionEndpoint, optionsMap, Optional.ofNullable(timeoutInMilliseconds)); 106 107 SagaPropagation propagation = this.propagation; 108 if (propagation == null) { 109 // default propagation mode 110 propagation = SagaPropagation.REQUIRED; 111 } 112 113 SagaCompletionMode completionMode = this.completionMode; 114 if (completionMode == null) { 115 // default completion mode 116 completionMode = SagaCompletionMode.defaultCompletionMode(); 117 } 118 119 Processor childProcessor = this.createChildProcessor(routeContext, true); 120 CamelSagaService camelSagaService = findSagaService(routeContext.getCamelContext()); 121 122 camelSagaService.registerStep(step); 123 124 return new SagaProcessorBuilder() 125 .camelContext(routeContext.getCamelContext()) 126 .childProcessor(childProcessor) 127 .sagaService(camelSagaService) 128 .step(step) 129 .propagation(propagation) 130 .completionMode(completionMode) 131 .build(); 132 } 133 134 @Override 135 public boolean isAbstract() { 136 return true; 137 } 138 139 @Override 140 public boolean isTopLevelOnly() { 141 return true; 142 } 143 144 @Override 145 public boolean isWrappingEntireOutput() { 146 return true; 147 } 148 149 @Override 150 public String getLabel() { 151 String desc = description(); 152 if (ObjectHelper.isEmpty(desc)) { 153 return "saga"; 154 } else { 155 return "saga[" + desc + "]"; 156 } 157 } 158 159 @Override 160 public String toString() { 161 String desc = description(); 162 if (ObjectHelper.isEmpty(desc)) { 163 return "Saga -> [" + outputs + "]"; 164 } else { 165 return "Saga[" + desc + "] -> [" + outputs + "]"; 166 } 167 } 168 169 // Properties 170 171 172 public SagaActionUriDefinition getCompensation() { 173 return compensation; 174 } 175 176 /** 177 * The compensation endpoint URI that must be called to compensate all changes done in the route. 178 * The route corresponding to the compensation URI must perform compensation and complete without error. 179 * 180 * If errors occur during compensation, the saga service may call again the compensation URI to retry. 181 */ 182 public void setCompensation(SagaActionUriDefinition compensation) { 183 this.compensation = compensation; 184 } 185 186 public SagaActionUriDefinition getCompletion() { 187 return completion; 188 } 189 190 /** 191 * The completion endpoint URI that will be called when the Saga is completed successfully. 192 * The route corresponding to the completion URI must perform completion tasks and terminate without error. 193 * 194 * If errors occur during completion, the saga service may call again the completion URI to retry. 195 */ 196 public void setCompletion(SagaActionUriDefinition completion) { 197 this.completion = completion; 198 } 199 200 public SagaPropagation getPropagation() { 201 return propagation; 202 } 203 204 /** 205 * Set the Saga propagation mode (REQUIRED, REQUIRES_NEW, MANDATORY, SUPPORTS, NOT_SUPPORTED, NEVER). 206 */ 207 public void setPropagation(SagaPropagation propagation) { 208 this.propagation = propagation; 209 } 210 211 public SagaCompletionMode getCompletionMode() { 212 return completionMode; 213 } 214 215 /** 216 * Determine how the saga should be considered complete. When set to AUTO, the saga is completed when the exchange that 217 * initiates the saga is processed successfully, or compensated when it completes exceptionally. 218 * 219 * When set to MANUAL, the user must complete or compensate the saga using the "saga:complete" or "saga:compensate" endpoints. 220 */ 221 public void setCompletionMode(SagaCompletionMode completionMode) { 222 this.completionMode = completionMode; 223 } 224 225 public CamelSagaService getSagaService() { 226 return sagaService; 227 } 228 229 public void setSagaService(CamelSagaService sagaService) { 230 this.sagaService = sagaService; 231 } 232 233 public List<SagaOptionDefinition> getOptions() { 234 return options; 235 } 236 237 /** 238 * Allows to save properties of the current exchange in order to re-use them in a compensation/completion callback route. 239 * Options are usually helpful e.g. to store and retrieve identifiers of objects that should be deleted in compensating actions. 240 * 241 * Option values will be transformed into input headers of the compensation/completion exchange. 242 */ 243 public void setOptions(List<SagaOptionDefinition> options) { 244 this.options = options; 245 } 246 247 public Long getTimeoutInMilliseconds() { 248 return timeoutInMilliseconds; 249 } 250 251 /** 252 * Set the maximum amount of time for the Saga. After the timeout is expired, the saga will be compensated 253 * automatically (unless a different decision has been taken in the meantime). 254 */ 255 public void setTimeoutInMilliseconds(Long timeoutInMilliseconds) { 256 this.timeoutInMilliseconds = timeoutInMilliseconds; 257 } 258 259 private void addOption(String option, Expression expression) { 260 if (this.options == null) { 261 this.options = new ArrayList<>(); 262 } 263 this.options.add(new SagaOptionDefinition(option, expression)); 264 } 265 266 // Builders 267 268 public SagaDefinition compensation(String compensation) { 269 if (this.compensation != null) { 270 throw new IllegalStateException("Compensation has already been set"); 271 } 272 this.compensation = new SagaActionUriDefinition(compensation); 273 return this; 274 } 275 276 public SagaDefinition completion(String completion) { 277 if (this.completion != null) { 278 throw new IllegalStateException("Completion has already been set"); 279 } 280 this.completion = new SagaActionUriDefinition(completion); 281 return this; 282 } 283 284 public SagaDefinition propagation(SagaPropagation propagation) { 285 setPropagation(propagation); 286 return this; 287 } 288 289 public SagaDefinition sagaService(CamelSagaService sagaService) { 290 setSagaService(sagaService); 291 return this; 292 } 293 294 public SagaDefinition completionMode(SagaCompletionMode completionMode) { 295 setCompletionMode(completionMode); 296 return this; 297 } 298 299 public SagaDefinition option(String option, Expression expression) { 300 addOption(option, expression); 301 return this; 302 } 303 304 public SagaDefinition timeout(long timeout, TimeUnit unit) { 305 setTimeoutInMilliseconds(unit.toMillis(timeout)); 306 return this; 307 } 308 309 // Utils 310 311 protected CamelSagaService findSagaService(CamelContext context) { 312 CamelSagaService sagaService = getSagaService(); 313 if (sagaService != null) { 314 return sagaService; 315 } 316 317 sagaService = context.hasService(CamelSagaService.class); 318 if (sagaService != null) { 319 return sagaService; 320 } 321 322 sagaService = CamelContextHelper.findByType(context, CamelSagaService.class); 323 if (sagaService != null) { 324 return sagaService; 325 } 326 327 throw new RuntimeCamelException("Cannot find a CamelSagaService"); 328 } 329 330 protected String description() { 331 StringBuilder desc = new StringBuilder(); 332 addField(desc, "compensation", compensation); 333 addField(desc, "completion", completion); 334 addField(desc, "propagation", propagation); 335 return desc.toString(); 336 } 337 338 private void addField(StringBuilder builder, String key, Object value) { 339 if (value == null) { 340 return; 341 } 342 if (builder.length() > 0) { 343 builder.append(','); 344 } 345 builder.append(key).append(':').append(value); 346 } 347 348}