001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management.mbean;
018
019import java.util.concurrent.BlockingQueue;
020import java.util.concurrent.ThreadPoolExecutor;
021import java.util.concurrent.TimeUnit;
022
023import org.apache.camel.CamelContext;
024import org.apache.camel.api.management.ManagedResource;
025import org.apache.camel.api.management.mbean.ManagedThreadsMBean;
026import org.apache.camel.model.ProcessorDefinition;
027import org.apache.camel.processor.ThreadsProcessor;
028
029@ManagedResource(description = "Managed Threads")
030public class ManagedThreads extends ManagedProcessor implements ManagedThreadsMBean {
031    private final ThreadsProcessor processor;
032
033    public ManagedThreads(CamelContext context, ThreadsProcessor processor, ProcessorDefinition<?> definition) {
034        super(context, processor, definition);
035        this.processor = processor;
036    }
037
038    @Override
039    public Boolean isCallerRunsWhenRejected() {
040        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
041            String name = getRejectedPolicy();
042            return "CallerRuns".equals(name);
043        } else {
044            return null;
045        }
046    }
047
048    @Override
049    public String getRejectedPolicy() {
050        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
051            return ((ThreadPoolExecutor) processor.getExecutorService()).getRejectedExecutionHandler().toString();
052        } else {
053            return null;
054        }
055    }
056
057    @Override
058    public int getCorePoolSize() {
059        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
060            return ((ThreadPoolExecutor) processor.getExecutorService()).getCorePoolSize();
061        } else {
062            return 0;
063        }
064    }
065
066    @Override
067    public int getPoolSize() {
068        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
069            return ((ThreadPoolExecutor) processor.getExecutorService()).getPoolSize();
070        } else {
071            return 0;
072        }
073    }
074
075    @Override
076    public int getMaximumPoolSize() {
077        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
078            return ((ThreadPoolExecutor) processor.getExecutorService()).getMaximumPoolSize();
079        } else {
080            return 0;
081        }
082    }
083
084    @Override
085    public int getLargestPoolSize() {
086        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
087            return ((ThreadPoolExecutor) processor.getExecutorService()).getLargestPoolSize();
088        } else {
089            return 0;
090        }
091    }
092
093    @Override
094    public int getActiveCount() {
095        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
096            return ((ThreadPoolExecutor) processor.getExecutorService()).getActiveCount();
097        } else {
098            return 0;
099        }
100    }
101
102    @Override
103    public long getTaskCount() {
104        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
105            return ((ThreadPoolExecutor) processor.getExecutorService()).getTaskCount();
106        } else {
107            return 0;
108        }
109    }
110
111    @Override
112    public long getCompletedTaskCount() {
113        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
114            return ((ThreadPoolExecutor) processor.getExecutorService()).getCompletedTaskCount();
115        } else {
116            return 0;
117        }
118    }
119
120    @Override
121    public long getTaskQueueSize() {
122        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
123            BlockingQueue queue = ((ThreadPoolExecutor) processor.getExecutorService()).getQueue();
124            return queue != null ? queue.size() : 0;
125        } else {
126            return 0;
127        }
128    }
129
130    @Override
131    public long getKeepAliveTime() {
132        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
133            return ((ThreadPoolExecutor) processor.getExecutorService()).getKeepAliveTime(TimeUnit.SECONDS);
134        } else {
135            return 0;
136        }
137    }
138
139    @Override
140    public boolean isAllowCoreThreadTimeout() {
141        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
142            return ((ThreadPoolExecutor) processor.getExecutorService()).allowsCoreThreadTimeOut();
143        } else {
144            return false;
145        }
146    }
147
148}