001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.Map; 022import java.util.Optional; 023import java.util.TreeMap; 024import java.util.concurrent.TimeUnit; 025import javax.xml.bind.annotation.XmlAccessType; 026import javax.xml.bind.annotation.XmlAccessorType; 027import javax.xml.bind.annotation.XmlAttribute; 028import javax.xml.bind.annotation.XmlElement; 029import javax.xml.bind.annotation.XmlRootElement; 030import javax.xml.bind.annotation.XmlTransient; 031 032import org.apache.camel.CamelContext; 033import org.apache.camel.Endpoint; 034import org.apache.camel.Expression; 035import org.apache.camel.Processor; 036import org.apache.camel.RuntimeCamelException; 037import org.apache.camel.processor.saga.SagaProcessorBuilder; 038import org.apache.camel.saga.CamelSagaService; 039import org.apache.camel.saga.CamelSagaStep; 040import org.apache.camel.spi.Metadata; 041import org.apache.camel.spi.RouteContext; 042import org.apache.camel.util.CamelContextHelper; 043import org.apache.camel.util.ObjectHelper; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * Enables sagas on the route 049 * 050 * @version 051 */ 052@Metadata(label = "eip,routing") 053@XmlRootElement(name = "saga") 054@XmlAccessorType(XmlAccessType.FIELD) 055public class SagaDefinition extends OutputDefinition<SagaDefinition> { 056 057 private static final Logger LOG = LoggerFactory.getLogger(SagaDefinition.class); 058 059 @XmlAttribute 060 @Metadata(defaultValue = "REQUIRED") 061 private SagaPropagation propagation; 062 063 @XmlAttribute 064 @Metadata(defaultValue = "AUTO") 065 private SagaCompletionMode completionMode; 066 067 @XmlAttribute 068 private Long timeoutInMilliseconds; 069 070 @XmlElement 071 private SagaActionUriDefinition compensation; 072 073 @XmlElement 074 private SagaActionUriDefinition completion; 075 076 @XmlElement(name = "option") 077 private List<SagaOptionDefinition> options; 078 079 @XmlTransient 080 private CamelSagaService sagaService; // TODO add ref for xml configuration 081 082 public SagaDefinition() { 083 } 084 085 @Override 086 public Processor createProcessor(RouteContext routeContext) throws Exception { 087 Optional<Endpoint> compensationEndpoint = Optional.ofNullable(this.compensation) 088 .map(SagaActionUriDefinition::getUri) 089 .map(routeContext::resolveEndpoint); 090 091 Optional<Endpoint> completionEndpoint = Optional.ofNullable(this.completion) 092 .map(SagaActionUriDefinition::getUri) 093 .map(routeContext::resolveEndpoint); 094 095 Map<String, Expression> optionsMap = new TreeMap<>(); 096 if (this.options != null) { 097 for (SagaOptionDefinition optionDef : this.options) { 098 String optionName = optionDef.getOptionName(); 099 Expression expr = optionDef.getExpression(); 100 optionsMap.put(optionName, expr); 101 } 102 } 103 104 CamelSagaStep step = new CamelSagaStep(compensationEndpoint, completionEndpoint, optionsMap, Optional.ofNullable(timeoutInMilliseconds)); 105 106 SagaPropagation propagation = this.propagation; 107 if (propagation == null) { 108 // default propagation mode 109 propagation = SagaPropagation.REQUIRED; 110 } 111 112 SagaCompletionMode completionMode = this.completionMode; 113 if (completionMode == null) { 114 // default completion mode 115 completionMode = SagaCompletionMode.defaultCompletionMode(); 116 } 117 118 Processor childProcessor = this.createChildProcessor(routeContext, true); 119 CamelSagaService camelSagaService = findSagaService(routeContext.getCamelContext()); 120 121 camelSagaService.registerStep(step); 122 123 return new SagaProcessorBuilder() 124 .camelContext(routeContext.getCamelContext()) 125 .childProcessor(childProcessor) 126 .sagaService(camelSagaService) 127 .step(step) 128 .propagation(propagation) 129 .completionMode(completionMode) 130 .build(); 131 } 132 133 @Override 134 public boolean isAbstract() { 135 return true; 136 } 137 138 @Override 139 public boolean isTopLevelOnly() { 140 return true; 141 } 142 143 @Override 144 public boolean isWrappingEntireOutput() { 145 return true; 146 } 147 148 @Override 149 public String getLabel() { 150 String desc = description(); 151 if (ObjectHelper.isEmpty(desc)) { 152 return "saga"; 153 } else { 154 return "saga[" + desc + "]"; 155 } 156 } 157 158 @Override 159 public String toString() { 160 String desc = description(); 161 if (ObjectHelper.isEmpty(desc)) { 162 return "Saga -> [" + outputs + "]"; 163 } else { 164 return "Saga[" + desc + "] -> [" + outputs + "]"; 165 } 166 } 167 168 // Properties 169 170 171 public SagaActionUriDefinition getCompensation() { 172 return compensation; 173 } 174 175 /** 176 * The compensation endpoint URI that must be called to compensate all changes done in the route. 177 * The route corresponding to the compensation URI must perform compensation and complete without error. 178 * 179 * If errors occur during compensation, the saga service may call again the compensation URI to retry. 180 */ 181 public void setCompensation(SagaActionUriDefinition compensation) { 182 this.compensation = compensation; 183 } 184 185 public SagaActionUriDefinition getCompletion() { 186 return completion; 187 } 188 189 /** 190 * The completion endpoint URI that will be called when the Saga is completed successfully. 191 * The route corresponding to the completion URI must perform completion tasks and terminate without error. 192 * 193 * If errors occur during completion, the saga service may call again the completion URI to retry. 194 */ 195 public void setCompletion(SagaActionUriDefinition completion) { 196 this.completion = completion; 197 } 198 199 public SagaPropagation getPropagation() { 200 return propagation; 201 } 202 203 /** 204 * Set the Saga propagation mode (REQUIRED, REQUIRES_NEW, MANDATORY, SUPPORTS, NOT_SUPPORTED, NEVER). 205 */ 206 public void setPropagation(SagaPropagation propagation) { 207 this.propagation = propagation; 208 } 209 210 public SagaCompletionMode getCompletionMode() { 211 return completionMode; 212 } 213 214 /** 215 * Determine how the saga should be considered complete. When set to AUTO, the saga is completed when the exchange that 216 * initiates the saga is processed successfully, or compensated when it completes exceptionally. 217 * 218 * When set to MANUAL, the user must complete or compensate the saga using the "saga:complete" or "saga:compensate" endpoints. 219 */ 220 public void setCompletionMode(SagaCompletionMode completionMode) { 221 this.completionMode = completionMode; 222 } 223 224 public CamelSagaService getSagaService() { 225 return sagaService; 226 } 227 228 public void setSagaService(CamelSagaService sagaService) { 229 this.sagaService = sagaService; 230 } 231 232 public List<SagaOptionDefinition> getOptions() { 233 return options; 234 } 235 236 /** 237 * Allows to save properties of the current exchange in order to re-use them in a compensation/completion callback route. 238 * Options are usually helpful e.g. to store and retrieve identifiers of objects that should be deleted in compensating actions. 239 * 240 * Option values will be transformed into input headers of the compensation/completion exchange. 241 */ 242 public void setOptions(List<SagaOptionDefinition> options) { 243 this.options = options; 244 } 245 246 public Long getTimeoutInMilliseconds() { 247 return timeoutInMilliseconds; 248 } 249 250 /** 251 * Set the maximum amount of time for the Saga. After the timeout is expired, the saga will be compensated 252 * automatically (unless a different decision has been taken in the meantime). 253 */ 254 public void setTimeoutInMilliseconds(Long timeoutInMilliseconds) { 255 this.timeoutInMilliseconds = timeoutInMilliseconds; 256 } 257 258 private void addOption(String option, Expression expression) { 259 if (this.options == null) { 260 this.options = new ArrayList<>(); 261 } 262 this.options.add(new SagaOptionDefinition(option, expression)); 263 } 264 265 // Builders 266 267 public SagaDefinition compensation(String compensation) { 268 if (this.compensation != null) { 269 throw new IllegalStateException("Compensation has already been set"); 270 } 271 this.compensation = new SagaActionUriDefinition(compensation); 272 return this; 273 } 274 275 public SagaDefinition completion(String completion) { 276 if (this.completion != null) { 277 throw new IllegalStateException("Completion has already been set"); 278 } 279 this.completion = new SagaActionUriDefinition(completion); 280 return this; 281 } 282 283 public SagaDefinition propagation(SagaPropagation propagation) { 284 setPropagation(propagation); 285 return this; 286 } 287 288 public SagaDefinition sagaService(CamelSagaService sagaService) { 289 setSagaService(sagaService); 290 return this; 291 } 292 293 public SagaDefinition completionMode(SagaCompletionMode completionMode) { 294 setCompletionMode(completionMode); 295 return this; 296 } 297 298 public SagaDefinition option(String option, Expression expression) { 299 addOption(option, expression); 300 return this; 301 } 302 303 public SagaDefinition timeout(long timeout, TimeUnit unit) { 304 setTimeoutInMilliseconds(unit.toMillis(timeout)); 305 return this; 306 } 307 308 // Utils 309 310 protected CamelSagaService findSagaService(CamelContext context) { 311 CamelSagaService sagaService = getSagaService(); 312 if (sagaService != null) { 313 return sagaService; 314 } 315 316 sagaService = context.hasService(CamelSagaService.class); 317 if (sagaService != null) { 318 return sagaService; 319 } 320 321 sagaService = CamelContextHelper.findByType(context, CamelSagaService.class); 322 if (sagaService != null) { 323 return sagaService; 324 } 325 326 throw new RuntimeCamelException("Cannot find a CamelSagaService"); 327 } 328 329 protected String description() { 330 StringBuilder desc = new StringBuilder(); 331 addField(desc, "compensation", compensation); 332 addField(desc, "completion", completion); 333 addField(desc, "propagation", propagation); 334 return desc.toString(); 335 } 336 337 private void addField(StringBuilder builder, String key, Object value) { 338 if (value == null) { 339 return; 340 } 341 if (builder.length() > 0) { 342 builder.append(','); 343 } 344 builder.append(key).append(':').append(value); 345 } 346 347}