001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.lang.reflect.Method; 020 import java.util.Map; 021 import javax.xml.bind.annotation.XmlAccessType; 022 import javax.xml.bind.annotation.XmlAccessorType; 023 import javax.xml.bind.annotation.XmlAttribute; 024 import javax.xml.bind.annotation.XmlRootElement; 025 import javax.xml.bind.annotation.XmlTransient; 026 027 import org.apache.camel.NoSuchBeanException; 028 import org.apache.camel.Processor; 029 import org.apache.camel.RuntimeCamelException; 030 import org.apache.camel.processor.WrapProcessor; 031 import org.apache.camel.spi.Policy; 032 import org.apache.camel.spi.RouteContext; 033 import org.apache.camel.spi.TransactedPolicy; 034 import org.apache.camel.util.CamelContextHelper; 035 import org.apache.camel.util.ObjectHelper; 036 import org.slf4j.Logger; 037 import org.slf4j.LoggerFactory; 038 039 /** 040 * Represents an XML <transacted/> element 041 * 042 * @version 043 */ 044 @XmlRootElement(name = "transacted") 045 @XmlAccessorType(XmlAccessType.FIELD) 046 public class TransactedDefinition extends OutputDefinition<TransactedDefinition> { 047 048 // TODO: Align this code with PolicyDefinition 049 050 // JAXB does not support changing the ref attribute from required to optional 051 // if we extend PolicyDefinition so we must make a copy of the class 052 @XmlTransient 053 public static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED"; 054 055 private static final transient Logger LOG = LoggerFactory.getLogger(TransactedDefinition.class); 056 057 @XmlTransient 058 protected Class<? extends Policy> type = TransactedPolicy.class; 059 @XmlAttribute 060 protected String ref; 061 @XmlTransient 062 private Policy policy; 063 064 public TransactedDefinition() { 065 } 066 067 public TransactedDefinition(Policy policy) { 068 this.policy = policy; 069 } 070 071 @Override 072 public String toString() { 073 return "Transacted[" + description() + "]"; 074 } 075 076 protected String description() { 077 if (ref != null) { 078 return "ref:" + ref; 079 } else if (policy != null) { 080 return policy.toString(); 081 } else { 082 return ""; 083 } 084 } 085 086 @Override 087 public String getShortName() { 088 return "transacted"; 089 } 090 091 @Override 092 public String getLabel() { 093 return "transacted[" + description() + "]"; 094 } 095 096 @Override 097 public boolean isAbstract() { 098 return true; 099 } 100 101 public String getRef() { 102 return ref; 103 } 104 105 public void setRef(String ref) { 106 this.ref = ref; 107 } 108 109 /** 110 * Sets a policy type that this definition should scope within. 111 * <p/> 112 * Is used for convention over configuration situations where the policy 113 * should be automatic looked up in the registry and it should be based 114 * on this type. For instance a {@link org.apache.camel.spi.TransactedPolicy} 115 * can be set as type for easy transaction configuration. 116 * <p/> 117 * Will by default scope to the wide {@link Policy} 118 * 119 * @param type the policy type 120 */ 121 public void setType(Class<? extends Policy> type) { 122 this.type = type; 123 } 124 125 /** 126 * Sets a reference to use for lookup the policy in the registry. 127 * 128 * @param ref the reference 129 * @return the builder 130 */ 131 public TransactedDefinition ref(String ref) { 132 setRef(ref); 133 return this; 134 } 135 136 @Override 137 public Processor createProcessor(RouteContext routeContext) throws Exception { 138 Policy policy = resolvePolicy(routeContext); 139 ObjectHelper.notNull(policy, "policy", this); 140 141 // before wrap 142 policy.beforeWrap(routeContext, this); 143 144 // create processor after the before wrap 145 Processor childProcessor = this.createChildProcessor(routeContext, true); 146 147 // wrap 148 Processor target = policy.wrap(routeContext, childProcessor); 149 150 // wrap the target so it becomes a service and we can manage its lifecycle 151 WrapProcessor wrap = new WrapProcessor(target, childProcessor); 152 return wrap; 153 } 154 155 protected Policy resolvePolicy(RouteContext routeContext) { 156 if (policy != null) { 157 return policy; 158 } 159 return doResolvePolicy(routeContext, getRef(), type); 160 } 161 162 protected static Policy doResolvePolicy(RouteContext routeContext, String ref, Class<? extends Policy> type) { 163 // explicit ref given so lookup by it 164 if (ObjectHelper.isNotEmpty(ref)) { 165 return CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, Policy.class); 166 } 167 168 // no explicit reference given from user so we can use some convention over configuration here 169 170 // try to lookup by scoped type 171 Policy answer = null; 172 if (type != null) { 173 // try find by type, note that this method is not supported by all registry 174 Map<String, ?> types = routeContext.lookupByType(type); 175 if (types.size() == 1) { 176 // only one policy defined so use it 177 Object found = types.values().iterator().next(); 178 if (type.isInstance(found)) { 179 return type.cast(found); 180 } 181 } 182 } 183 184 // for transacted routing try the default REQUIRED name 185 if (type == TransactedPolicy.class) { 186 // still not found try with the default name PROPAGATION_REQUIRED 187 answer = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); 188 } 189 190 // this logic only applies if we are a transacted policy 191 // still no policy found then try lookup the platform transaction manager and use it as policy 192 if (answer == null && type == TransactedPolicy.class) { 193 Class<?> tmClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.springframework.transaction.PlatformTransactionManager"); 194 if (tmClazz != null) { 195 // see if we can find the platform transaction manager in the registry 196 Map<String, ?> maps = routeContext.lookupByType(tmClazz); 197 if (maps.size() == 1) { 198 // only one platform manager then use it as default and create a transacted 199 // policy with it and default to required 200 201 // as we do not want dependency on spring jars in the camel-core we use 202 // reflection to lookup classes and create new objects and call methods 203 // as this is only done during route building it does not matter that we 204 // use reflection as performance is no a concern during route building 205 Object transactionManager = maps.values().iterator().next(); 206 LOG.debug("One instance of PlatformTransactionManager found in registry: {}", transactionManager); 207 Class<?> txClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.apache.camel.spring.spi.SpringTransactionPolicy"); 208 if (txClazz != null) { 209 LOG.debug("Creating a new temporary SpringTransactionPolicy using the PlatformTransactionManager: {}", transactionManager); 210 TransactedPolicy txPolicy = ObjectHelper.newInstance(txClazz, TransactedPolicy.class); 211 Method method; 212 try { 213 method = txClazz.getMethod("setTransactionManager", tmClazz); 214 } catch (NoSuchMethodException e) { 215 throw new RuntimeCamelException("Cannot get method setTransactionManager(PlatformTransactionManager) on class: " + txClazz); 216 } 217 ObjectHelper.invokeMethod(method, txPolicy, transactionManager); 218 return txPolicy; 219 } else { 220 // camel-spring is missing on the classpath 221 throw new RuntimeCamelException("Cannot create a transacted policy as camel-spring.jar is not on the classpath!"); 222 } 223 } else { 224 if (maps.isEmpty()) { 225 throw new NoSuchBeanException(null, "PlatformTransactionManager"); 226 } else { 227 throw new IllegalArgumentException("Found " + maps.size() + " PlatformTransactionManager in registry. " 228 + "Cannot determine which one to use. Please configure a TransactionTemplate on the transacted policy."); 229 } 230 } 231 } 232 } 233 234 return answer; 235 } 236 237 }