001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.Iterator; 021 import java.util.List; 022 import java.util.concurrent.ExecutorService; 023 import java.util.concurrent.ScheduledExecutorService; 024 025 import org.apache.camel.spi.ExecutorServiceManager; 026 import org.apache.camel.spi.RouteContext; 027 import org.apache.camel.util.ObjectHelper; 028 029 /** 030 * Helper class for ProcessorDefinition and the other model classes. 031 */ 032 public final class ProcessorDefinitionHelper { 033 034 private ProcessorDefinitionHelper() { 035 } 036 037 /** 038 * Looks for the given type in the list of outputs and recurring all the children as well. 039 * 040 * @param outputs list of outputs, can be null or empty. 041 * @param type the type to look for 042 * @return the found definitions, or <tt>null</tt> if not found 043 */ 044 public static <T> Iterator<T> filterTypeInOutputs(List<ProcessorDefinition<?>> outputs, Class<T> type) { 045 List<T> found = new ArrayList<T>(); 046 doFindType(outputs, type, found); 047 return found.iterator(); 048 } 049 050 /** 051 * Looks for the given type in the list of outputs and recurring all the children as well. 052 * Will stop at first found and return it. 053 * 054 * @param outputs list of outputs, can be null or empty. 055 * @param type the type to look for 056 * @return the first found type, or <tt>null</tt> if not found 057 */ 058 public static <T> T findFirstTypeInOutputs(List<ProcessorDefinition<?>> outputs, Class<T> type) { 059 List<T> found = new ArrayList<T>(); 060 doFindType(outputs, type, found); 061 if (found.isEmpty()) { 062 return null; 063 } 064 return found.iterator().next(); 065 } 066 067 /** 068 * Is the given child the first in the outputs from the parent? 069 * 070 * @param parentType the type the parent must be 071 * @param node the node 072 * @return <tt>true</tt> if first child, <tt>false</tt> otherwise 073 */ 074 public static boolean isFirstChildOfType(Class<?> parentType, ProcessorDefinition<?> node) { 075 if (node == null || node.getParent() == null) { 076 return false; 077 } 078 079 if (node.getParent().getOutputs().isEmpty()) { 080 return false; 081 } 082 083 if (!(node.getParent().getClass().equals(parentType))) { 084 return false; 085 } 086 087 return node.getParent().getOutputs().get(0).equals(node); 088 } 089 090 /** 091 * Is the given node parent(s) of the given type 092 * @param parentType the parent type 093 * @param node the current node 094 * @param recursive whether or not to check grand parent(s) as well 095 * @return <tt>true</tt> if parent(s) is of given type, <tt>false</tt> otherwise 096 */ 097 public static boolean isParentOfType(Class<?> parentType, ProcessorDefinition<?> node, boolean recursive) { 098 if (node == null || node.getParent() == null) { 099 return false; 100 } 101 102 if (parentType.isAssignableFrom(node.getParent().getClass())) { 103 return true; 104 } else if (recursive) { 105 // recursive up the tree of parents 106 return isParentOfType(parentType, node.getParent(), true); 107 } else { 108 // no match 109 return false; 110 } 111 } 112 113 /** 114 * Gets the route definition the given node belongs to. 115 * 116 * @param node the node 117 * @return the route, or <tt>null</tt> if not possible to find 118 */ 119 public static RouteDefinition getRoute(ProcessorDefinition<?> node) { 120 if (node == null) { 121 return null; 122 } 123 124 ProcessorDefinition<?> def = node; 125 // drill to the top 126 while (def != null && def.getParent() != null) { 127 def = def.getParent(); 128 } 129 130 if (def instanceof RouteDefinition) { 131 return (RouteDefinition) def; 132 } else { 133 // not found 134 return null; 135 } 136 137 } 138 139 @SuppressWarnings({"unchecked", "rawtypes"}) 140 private static <T> void doFindType(List<ProcessorDefinition<?>> outputs, Class<T> type, List<T> found) { 141 if (outputs == null || outputs.isEmpty()) { 142 return; 143 } 144 145 for (ProcessorDefinition out : outputs) { 146 if (type.isInstance(out)) { 147 found.add((T)out); 148 } 149 150 // send is much common 151 if (out instanceof SendDefinition) { 152 SendDefinition send = (SendDefinition) out; 153 List<ProcessorDefinition<?>> children = send.getOutputs(); 154 doFindType(children, type, found); 155 } 156 157 // special for choice 158 if (out instanceof ChoiceDefinition) { 159 ChoiceDefinition choice = (ChoiceDefinition) out; 160 for (WhenDefinition when : choice.getWhenClauses()) { 161 List<ProcessorDefinition<?>> children = when.getOutputs(); 162 doFindType(children, type, found); 163 } 164 165 // otherwise is optional 166 if (choice.getOtherwise() != null) { 167 List<ProcessorDefinition<?>> children = choice.getOtherwise().getOutputs(); 168 doFindType(children, type, found); 169 } 170 } 171 172 // try children as well 173 List<ProcessorDefinition<?>> children = out.getOutputs(); 174 doFindType(children, type, found); 175 } 176 } 177 178 /** 179 * Is there any outputs in the given list. 180 * <p/> 181 * Is used for check if the route output has any real outputs (non abstracts) 182 * 183 * @param outputs the outputs 184 * @param excludeAbstract whether or not to exclude abstract outputs (e.g. skip onException etc.) 185 * @return <tt>true</tt> if has outputs, otherwise <tt>false</tt> is returned 186 */ 187 @SuppressWarnings({"unchecked", "rawtypes"}) 188 public static boolean hasOutputs(List<ProcessorDefinition<?>> outputs, boolean excludeAbstract) { 189 if (outputs == null || outputs.isEmpty()) { 190 return false; 191 } 192 if (!excludeAbstract) { 193 return !outputs.isEmpty(); 194 } 195 for (ProcessorDefinition output : outputs) { 196 if (output instanceof TransactedDefinition || output instanceof PolicyDefinition) { 197 // special for those as they wrap entire output, so we should just check its output 198 return hasOutputs(output.getOutputs(), excludeAbstract); 199 } 200 if (!output.isAbstract()) { 201 return true; 202 } 203 } 204 return false; 205 } 206 207 /** 208 * Determines whether a new thread pool will be created or not. 209 * <p/> 210 * This is used to know if a new thread pool will be created, and therefore is not shared by others, and therefore 211 * exclusive to the definition. 212 * 213 * @param routeContext the route context 214 * @param definition the node definition which may leverage executor service. 215 * @param useDefault whether to fallback and use a default thread pool, if no explicit configured 216 * @return <tt>true</tt> if a new thread pool will be created, <tt>false</tt> if not 217 * @see #getConfiguredExecutorService(org.apache.camel.spi.RouteContext, String, ExecutorServiceAwareDefinition, boolean) 218 */ 219 public static boolean willCreateNewThreadPool(RouteContext routeContext, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) { 220 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 221 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 222 223 if (definition.getExecutorService() != null) { 224 // no there is a custom thread pool configured 225 return false; 226 } else if (definition.getExecutorServiceRef() != null) { 227 ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(definition.getExecutorServiceRef(), ExecutorService.class); 228 // if no existing thread pool, then we will have to create a new thread pool 229 return answer == null; 230 } else if (useDefault) { 231 return true; 232 } 233 234 return false; 235 } 236 237 /** 238 * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ExecutorService} registered with the given 239 * <tt>executorServiceRef</tt> name. 240 * <p/> 241 * This method will lookup for configured thread pool in the following order 242 * <ul> 243 * <li>from the {@link org.apache.camel.spi.Registry} if found</li> 244 * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> 245 * <li>if none found, then <tt>null</tt> is returned.</li> 246 * </ul> 247 * @param routeContext the route context 248 * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} 249 * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. 250 * @param source the source to use the thread pool 251 * @param executorServiceRef reference name of the thread pool 252 * @return the executor service, or <tt>null</tt> if none was found. 253 */ 254 public static ExecutorService lookupExecutorServiceRef(RouteContext routeContext, String name, 255 Object source, String executorServiceRef) { 256 257 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 258 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 259 ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); 260 261 // lookup in registry first and use existing thread pool if exists 262 ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(executorServiceRef, ExecutorService.class); 263 if (answer == null) { 264 // then create a thread pool assuming the ref is a thread pool profile id 265 answer = manager.newThreadPool(source, name, executorServiceRef); 266 } 267 return answer; 268 } 269 270 /** 271 * Will lookup and get the configured {@link java.util.concurrent.ExecutorService} from the given definition. 272 * <p/> 273 * This method will lookup for configured thread pool in the following order 274 * <ul> 275 * <li>from the definition if any explicit configured executor service.</li> 276 * <li>from the {@link org.apache.camel.spi.Registry} if found</li> 277 * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> 278 * <li>if none found, then <tt>null</tt> is returned.</li> 279 * </ul> 280 * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support 281 * configured executor services in the same coherent way. 282 * 283 * @param routeContext the route context 284 * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} 285 * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. 286 * @param definition the node definition which may leverage executor service. 287 * @param useDefault whether to fallback and use a default thread pool, if no explicit configured 288 * @return the configured executor service, or <tt>null</tt> if none was configured. 289 * @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found 290 */ 291 public static ExecutorService getConfiguredExecutorService(RouteContext routeContext, String name, 292 ExecutorServiceAwareDefinition<?> definition, 293 boolean useDefault) throws IllegalArgumentException { 294 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 295 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 296 297 // prefer to use explicit configured executor on the definition 298 if (definition.getExecutorService() != null) { 299 return definition.getExecutorService(); 300 } else if (definition.getExecutorServiceRef() != null) { 301 // lookup in registry first and use existing thread pool if exists 302 ExecutorService answer = lookupExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef()); 303 if (answer == null) { 304 throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile."); 305 } 306 return answer; 307 } else if (useDefault) { 308 return manager.newDefaultThreadPool(definition, name); 309 } 310 311 return null; 312 } 313 314 /** 315 * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ScheduledExecutorService} registered with the given 316 * <tt>executorServiceRef</tt> name. 317 * <p/> 318 * This method will lookup for configured thread pool in the following order 319 * <ul> 320 * <li>from the {@link org.apache.camel.spi.Registry} if found</li> 321 * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> 322 * <li>if none found, then <tt>null</tt> is returned.</li> 323 * </ul> 324 * @param routeContext the route context 325 * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} 326 * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. 327 * @param source the source to use the thread pool 328 * @param executorServiceRef reference name of the thread pool 329 * @return the executor service, or <tt>null</tt> if none was found. 330 */ 331 public static ScheduledExecutorService lookupScheduledExecutorServiceRef(RouteContext routeContext, String name, 332 Object source, String executorServiceRef) { 333 334 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 335 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 336 ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); 337 338 // lookup in registry first and use existing thread pool if exists 339 ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class); 340 if (answer == null) { 341 // then create a thread pool assuming the ref is a thread pool profile id 342 answer = manager.newScheduledThreadPool(source, name, executorServiceRef); 343 } 344 return answer; 345 } 346 347 /** 348 * Will lookup and get the configured {@link java.util.concurrent.ScheduledExecutorService} from the given definition. 349 * <p/> 350 * This method will lookup for configured thread pool in the following order 351 * <ul> 352 * <li>from the definition if any explicit configured executor service.</li> 353 * <li>from the {@link org.apache.camel.spi.Registry} if found</li> 354 * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> 355 * <li>if none found, then <tt>null</tt> is returned.</li> 356 * </ul> 357 * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support 358 * configured executor services in the same coherent way. 359 * 360 * @param routeContext the rout context 361 * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} 362 * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. 363 * @param definition the node definition which may leverage executor service. 364 * @param useDefault whether to fallback and use a default thread pool, if no explicit configured 365 * @return the configured executor service, or <tt>null</tt> if none was configured. 366 * @throws IllegalArgumentException is thrown if the found instance is not a ScheduledExecutorService type, 367 * or lookup of executor service in {@link org.apache.camel.spi.Registry} was not found 368 */ 369 public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name, 370 ExecutorServiceAwareDefinition<?> definition, 371 boolean useDefault) throws IllegalArgumentException { 372 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 373 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 374 375 // prefer to use explicit configured executor on the definition 376 if (definition.getExecutorService() != null) { 377 ExecutorService executorService = definition.getExecutorService(); 378 if (executorService instanceof ScheduledExecutorService) { 379 return (ScheduledExecutorService) executorService; 380 } 381 throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance"); 382 } else if (definition.getExecutorServiceRef() != null) { 383 ScheduledExecutorService answer = lookupScheduledExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef()); 384 if (answer == null) { 385 throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile."); 386 } 387 return answer; 388 } else if (useDefault) { 389 return manager.newDefaultScheduledThreadPool(definition, name); 390 } 391 392 return null; 393 } 394 395 }