001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.concurrent.ExecutorService;
023    import java.util.concurrent.ScheduledExecutorService;
024    
025    import org.apache.camel.spi.ExecutorServiceManager;
026    import org.apache.camel.spi.RouteContext;
027    import org.apache.camel.util.ObjectHelper;
028    
029    /**
030     * Helper class for ProcessorDefinition and the other model classes.
031     */
032    public final class ProcessorDefinitionHelper {
033    
034        private ProcessorDefinitionHelper() {
035        }
036    
037        /**
038         * Looks for the given type in the list of outputs and recurring all the children as well.
039         *
040         * @param outputs  list of outputs, can be null or empty.
041         * @param type     the type to look for
042         * @return         the found definitions, or <tt>null</tt> if not found
043         */
044        public static <T> Iterator<T> filterTypeInOutputs(List<ProcessorDefinition<?>> outputs, Class<T> type) {
045            List<T> found = new ArrayList<T>();
046            doFindType(outputs, type, found);
047            return found.iterator();
048        }
049    
050        /**
051         * Looks for the given type in the list of outputs and recurring all the children as well.
052         * Will stop at first found and return it.
053         *
054         * @param outputs  list of outputs, can be null or empty.
055         * @param type     the type to look for
056         * @return         the first found type, or <tt>null</tt> if not found
057         */
058        public static <T> T findFirstTypeInOutputs(List<ProcessorDefinition<?>> outputs, Class<T> type) {
059            List<T> found = new ArrayList<T>();
060            doFindType(outputs, type, found);
061            if (found.isEmpty()) {
062                return null;
063            }
064            return found.iterator().next();
065        }
066    
067        /**
068         * Is the given child the first in the outputs from the parent?
069         *
070         * @param parentType the type the parent must be
071         * @param node the node
072         * @return <tt>true</tt> if first child, <tt>false</tt> otherwise
073         */
074        public static boolean isFirstChildOfType(Class<?> parentType, ProcessorDefinition<?> node) {
075            if (node == null || node.getParent() == null) {
076                return false;
077            }
078    
079            if (node.getParent().getOutputs().isEmpty()) {
080                return false;
081            }
082    
083            if (!(node.getParent().getClass().equals(parentType))) {
084                return false;
085            }
086    
087            return node.getParent().getOutputs().get(0).equals(node);
088        }
089    
090        /**
091         * Is the given node parent(s) of the given type
092         * @param parentType   the parent type
093         * @param node         the current node
094         * @param recursive    whether or not to check grand parent(s) as well
095         * @return <tt>true</tt> if parent(s) is of given type, <tt>false</tt> otherwise
096         */
097        public static boolean isParentOfType(Class<?> parentType, ProcessorDefinition<?> node, boolean recursive) {
098            if (node == null || node.getParent() == null) {
099                return false;
100            }
101    
102            if (parentType.isAssignableFrom(node.getParent().getClass())) {
103                return true;
104            } else if (recursive) {
105                // recursive up the tree of parents
106                return isParentOfType(parentType, node.getParent(), true);
107            } else {
108                // no match
109                return false;
110            }
111        }
112    
113        /**
114         * Gets the route definition the given node belongs to.
115         *
116         * @param node the node
117         * @return the route, or <tt>null</tt> if not possible to find
118         */
119        public static RouteDefinition getRoute(ProcessorDefinition<?> node) {
120            if (node == null) {
121                return null;
122            }
123    
124            ProcessorDefinition<?> def = node;
125            // drill to the top
126            while (def != null && def.getParent() != null) {
127                def = def.getParent();
128            }
129    
130            if (def instanceof RouteDefinition) {
131                return (RouteDefinition) def;
132            } else {
133                // not found
134                return null;
135            }
136    
137        }
138    
139        @SuppressWarnings({"unchecked", "rawtypes"})
140        private static <T> void doFindType(List<ProcessorDefinition<?>> outputs, Class<T> type, List<T> found) {
141            if (outputs == null || outputs.isEmpty()) {
142                return;
143            }
144    
145            for (ProcessorDefinition out : outputs) {
146                if (type.isInstance(out)) {
147                    found.add((T)out);
148                }
149    
150                // send is much common
151                if (out instanceof SendDefinition) {
152                    SendDefinition send = (SendDefinition) out;
153                    List<ProcessorDefinition<?>> children = send.getOutputs();
154                    doFindType(children, type, found);
155                }
156    
157                // special for choice
158                if (out instanceof ChoiceDefinition) {
159                    ChoiceDefinition choice = (ChoiceDefinition) out;
160                    for (WhenDefinition when : choice.getWhenClauses()) {
161                        List<ProcessorDefinition<?>> children = when.getOutputs();
162                        doFindType(children, type, found);
163                    }
164    
165                    // otherwise is optional
166                    if (choice.getOtherwise() != null) {
167                        List<ProcessorDefinition<?>> children = choice.getOtherwise().getOutputs();
168                        doFindType(children, type, found);
169                    }
170                }
171    
172                // try children as well
173                List<ProcessorDefinition<?>> children = out.getOutputs();
174                doFindType(children, type, found);
175            }
176        }
177    
178        /**
179         * Is there any outputs in the given list.
180         * <p/>
181         * Is used for check if the route output has any real outputs (non abstracts)
182         *
183         * @param outputs           the outputs
184         * @param excludeAbstract   whether or not to exclude abstract outputs (e.g. skip onException etc.)
185         * @return <tt>true</tt> if has outputs, otherwise <tt>false</tt> is returned
186         */
187        @SuppressWarnings({"unchecked", "rawtypes"})
188        public static boolean hasOutputs(List<ProcessorDefinition<?>> outputs, boolean excludeAbstract) {
189            if (outputs == null || outputs.isEmpty()) {
190                return false;
191            }
192            if (!excludeAbstract) {
193                return !outputs.isEmpty();
194            }
195            for (ProcessorDefinition output : outputs) {
196                if (output instanceof TransactedDefinition || output instanceof PolicyDefinition) {
197                    // special for those as they wrap entire output, so we should just check its output
198                    return hasOutputs(output.getOutputs(), excludeAbstract);
199                }
200                if (!output.isAbstract()) {
201                    return true;
202                }
203            }
204            return false;
205        }
206    
207        /**
208         * Determines whether a new thread pool will be created or not.
209         * <p/>
210         * This is used to know if a new thread pool will be created, and therefore is not shared by others, and therefore
211         * exclusive to the definition.
212         *
213         * @param routeContext   the route context
214         * @param definition     the node definition which may leverage executor service.
215         * @param useDefault     whether to fallback and use a default thread pool, if no explicit configured
216         * @return <tt>true</tt> if a new thread pool will be created, <tt>false</tt> if not
217         * @see #getConfiguredExecutorService(org.apache.camel.spi.RouteContext, String, ExecutorServiceAwareDefinition, boolean)
218         */
219        public static boolean willCreateNewThreadPool(RouteContext routeContext, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) {
220            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
221            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
222            
223            if (definition.getExecutorService() != null) {
224                // no there is a custom thread pool configured
225                return false;
226            } else if (definition.getExecutorServiceRef() != null) {
227                ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(definition.getExecutorServiceRef(), ExecutorService.class);
228                // if no existing thread pool, then we will have to create a new thread pool
229                return answer == null;
230            } else if (useDefault) {
231                return true;
232            }
233    
234            return false;
235        }
236    
237        /**
238         * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ExecutorService} registered with the given
239         * <tt>executorServiceRef</tt> name.
240         * <p/>
241         * This method will lookup for configured thread pool in the following order
242         * <ul>
243         *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
244         *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
245         *   <li>if none found, then <tt>null</tt> is returned.</li>
246         * </ul>
247         * @param routeContext   the route context
248         * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
249         *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
250         * @param source         the source to use the thread pool
251         * @param executorServiceRef reference name of the thread pool
252         * @return the executor service, or <tt>null</tt> if none was found.
253         */
254        public static ExecutorService lookupExecutorServiceRef(RouteContext routeContext, String name,
255                                                               Object source, String executorServiceRef) {
256    
257            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
258            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
259            ObjectHelper.notNull(executorServiceRef, "executorServiceRef");
260    
261            // lookup in registry first and use existing thread pool if exists
262            ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(executorServiceRef, ExecutorService.class);
263            if (answer == null) {
264                // then create a thread pool assuming the ref is a thread pool profile id
265                answer = manager.newThreadPool(source, name, executorServiceRef);
266            }
267            return answer;
268        }
269    
270        /**
271         * Will lookup and get the configured {@link java.util.concurrent.ExecutorService} from the given definition.
272         * <p/>
273         * This method will lookup for configured thread pool in the following order
274         * <ul>
275         *   <li>from the definition if any explicit configured executor service.</li>
276         *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
277         *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
278         *   <li>if none found, then <tt>null</tt> is returned.</li>
279         * </ul>
280         * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
281         * configured executor services in the same coherent way.
282         *
283         * @param routeContext   the route context
284         * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
285         *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
286         * @param definition     the node definition which may leverage executor service.
287         * @param useDefault     whether to fallback and use a default thread pool, if no explicit configured
288         * @return the configured executor service, or <tt>null</tt> if none was configured.
289         * @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
290         */
291        public static ExecutorService getConfiguredExecutorService(RouteContext routeContext, String name,
292                                                                   ExecutorServiceAwareDefinition<?> definition,
293                                                                   boolean useDefault) throws IllegalArgumentException {
294            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
295            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
296    
297            // prefer to use explicit configured executor on the definition
298            if (definition.getExecutorService() != null) {
299                return definition.getExecutorService();
300            } else if (definition.getExecutorServiceRef() != null) {
301                // lookup in registry first and use existing thread pool if exists
302                ExecutorService answer = lookupExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef());
303                if (answer == null) {
304                    throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
305                }
306                return answer;
307            } else if (useDefault) {
308                return manager.newDefaultThreadPool(definition, name);
309            }
310    
311            return null;
312        }
313    
314        /**
315         * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ScheduledExecutorService} registered with the given
316         * <tt>executorServiceRef</tt> name.
317         * <p/>
318         * This method will lookup for configured thread pool in the following order
319         * <ul>
320         *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
321         *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
322         *   <li>if none found, then <tt>null</tt> is returned.</li>
323         * </ul>
324         * @param routeContext   the route context
325         * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
326         *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
327         * @param source         the source to use the thread pool
328         * @param executorServiceRef reference name of the thread pool
329         * @return the executor service, or <tt>null</tt> if none was found.
330         */
331        public static ScheduledExecutorService lookupScheduledExecutorServiceRef(RouteContext routeContext, String name,
332                                                                                 Object source, String executorServiceRef) {
333    
334            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
335            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
336            ObjectHelper.notNull(executorServiceRef, "executorServiceRef");
337    
338            // lookup in registry first and use existing thread pool if exists
339            ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class);
340            if (answer == null) {
341                // then create a thread pool assuming the ref is a thread pool profile id
342                answer = manager.newScheduledThreadPool(source, name, executorServiceRef);
343            }
344            return answer;
345        }
346    
347        /**
348         * Will lookup and get the configured {@link java.util.concurrent.ScheduledExecutorService} from the given definition.
349         * <p/>
350         * This method will lookup for configured thread pool in the following order
351         * <ul>
352         *   <li>from the definition if any explicit configured executor service.</li>
353         *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
354         *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
355         *   <li>if none found, then <tt>null</tt> is returned.</li>
356         * </ul>
357         * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
358         * configured executor services in the same coherent way.
359         *
360         * @param routeContext   the rout context
361         * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
362         *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
363         * @param definition     the node definition which may leverage executor service.
364         * @param useDefault     whether to fallback and use a default thread pool, if no explicit configured
365         * @return the configured executor service, or <tt>null</tt> if none was configured.
366         * @throws IllegalArgumentException is thrown if the found instance is not a ScheduledExecutorService type,
367         * or lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
368         */
369        public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name,
370                                                                   ExecutorServiceAwareDefinition<?> definition,
371                                                                   boolean useDefault) throws IllegalArgumentException {
372            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
373            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
374    
375            // prefer to use explicit configured executor on the definition
376            if (definition.getExecutorService() != null) {
377                ExecutorService executorService = definition.getExecutorService();
378                if (executorService instanceof ScheduledExecutorService) {
379                    return (ScheduledExecutorService) executorService;
380                }
381                throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance");
382            } else if (definition.getExecutorServiceRef() != null) {
383                ScheduledExecutorService answer = lookupScheduledExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef());
384                if (answer == null) {
385                    throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
386                }
387                return answer;
388            } else if (useDefault) {
389                return manager.newDefaultScheduledThreadPool(definition, name);
390            }
391    
392            return null;
393        }
394    
395    }