001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.net.URISyntaxException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027
028import javax.jms.Connection;
029import javax.jms.InvalidSelectorException;
030import javax.jms.MessageProducer;
031import javax.jms.Session;
032import javax.management.MalformedObjectNameException;
033import javax.management.ObjectName;
034import javax.management.openmbean.CompositeData;
035import javax.management.openmbean.CompositeDataSupport;
036import javax.management.openmbean.CompositeType;
037import javax.management.openmbean.OpenDataException;
038import javax.management.openmbean.TabularData;
039import javax.management.openmbean.TabularDataSupport;
040import javax.management.openmbean.TabularType;
041import org.apache.activemq.ActiveMQConnectionFactory;
042import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
043import org.apache.activemq.broker.region.Destination;
044import org.apache.activemq.broker.region.Subscription;
045import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
046import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.ActiveMQMessage;
049import org.apache.activemq.command.ActiveMQTextMessage;
050import org.apache.activemq.command.Message;
051import org.apache.activemq.filter.BooleanExpression;
052import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
053import org.apache.activemq.selector.SelectorParser;
054import org.apache.activemq.util.URISupport;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058public class DestinationView implements DestinationViewMBean {
059    private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class);
060    protected final Destination destination;
061    protected final ManagedRegionBroker broker;
062
063    public DestinationView(ManagedRegionBroker broker, Destination destination) {
064        this.broker = broker;
065        this.destination = destination;
066    }
067
068    public void gc() {
069        destination.gc();
070    }
071
072    @Override
073    public String getName() {
074        return destination.getName();
075    }
076
077    @Override
078    public void resetStatistics() {
079        destination.getDestinationStatistics().reset();
080    }
081
082    @Override
083    public long getEnqueueCount() {
084        return destination.getDestinationStatistics().getEnqueues().getCount();
085    }
086
087    @Override
088    public long getDequeueCount() {
089        return destination.getDestinationStatistics().getDequeues().getCount();
090    }
091
092    @Override
093    public long getForwardCount() {
094        return destination.getDestinationStatistics().getForwards().getCount();
095    }
096
097    @Override
098    public long getDispatchCount() {
099        return destination.getDestinationStatistics().getDispatched().getCount();
100    }
101
102    @Override
103    public long getInFlightCount() {
104        return destination.getDestinationStatistics().getInflight().getCount();
105    }
106
107    @Override
108    public long getExpiredCount() {
109        return destination.getDestinationStatistics().getExpired().getCount();
110    }
111
112    @Override
113    public long getConsumerCount() {
114        return destination.getDestinationStatistics().getConsumers().getCount();
115    }
116
117    @Override
118    public long getQueueSize() {
119        return destination.getDestinationStatistics().getMessages().getCount();
120    }
121
122    public long getMessagesCached() {
123        return destination.getDestinationStatistics().getMessagesCached().getCount();
124    }
125
126    @Override
127    public int getMemoryPercentUsage() {
128        return destination.getMemoryUsage().getPercentUsage();
129    }
130
131    @Override
132    public long getMemoryUsageByteCount() {
133        return destination.getMemoryUsage().getUsage();
134    }
135
136    @Override
137    public long getMemoryLimit() {
138        return destination.getMemoryUsage().getLimit();
139    }
140
141    @Override
142    public void setMemoryLimit(long limit) {
143        destination.getMemoryUsage().setLimit(limit);
144    }
145
146    @Override
147    public double getAverageEnqueueTime() {
148        return destination.getDestinationStatistics().getProcessTime().getAverageTime();
149    }
150
151    @Override
152    public long getMaxEnqueueTime() {
153        return destination.getDestinationStatistics().getProcessTime().getMaxTime();
154    }
155
156    @Override
157    public long getMinEnqueueTime() {
158        return destination.getDestinationStatistics().getProcessTime().getMinTime();
159    }
160
161    /**
162     * @return the average size of a message (bytes)
163     */
164    public long getAverageMessageSize() {
165        // we are okay with the size without decimals so cast to long
166        return (long) destination.getDestinationStatistics().getMessageSize().getAverageSize();
167    }
168
169    /**
170     * @return the max size of a message (bytes)
171     */
172    public long getMaxMessageSize() {
173        return destination.getDestinationStatistics().getMessageSize().getMaxSize();
174    }
175
176    /**
177     * @return the min size of a message (bytes)
178     */
179    public long getMinMessageSize() {
180        return destination.getDestinationStatistics().getMessageSize().getMinSize();
181    }
182
183
184    @Override
185    public boolean isPrioritizedMessages() {
186        return destination.isPrioritizedMessages();
187    }
188
189    @Override
190    public CompositeData[] browse() throws OpenDataException {
191        try {
192            return browse(null);
193        } catch (InvalidSelectorException e) {
194            // should not happen.
195            throw new RuntimeException(e);
196        }
197    }
198
199    @Override
200    public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
201        Message[] messages = destination.browse();
202        ArrayList<CompositeData> c = new ArrayList<CompositeData>();
203
204        NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
205        ctx.setDestination(destination.getActiveMQDestination());
206        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
207
208        for (int i = 0; i < messages.length; i++) {
209            try {
210
211                if (selectorExpression == null) {
212                    c.add(OpenTypeSupport.convert(messages[i]));
213                } else {
214                    ctx.setMessageReference(messages[i]);
215                    if (selectorExpression.matches(ctx)) {
216                        c.add(OpenTypeSupport.convert(messages[i]));
217                    }
218                }
219
220            } catch (Throwable e) {
221                // TODO DELETE ME
222                System.out.println(e);
223                e.printStackTrace();
224                // TODO DELETE ME
225                LOG.warn("exception browsing destination", e);
226            }
227        }
228
229        CompositeData rc[] = new CompositeData[c.size()];
230        c.toArray(rc);
231        return rc;
232    }
233
234    /**
235     * Browses the current destination returning a list of messages
236     */
237    @Override
238    public List<Object> browseMessages() throws InvalidSelectorException {
239        return browseMessages(null);
240    }
241
242    /**
243     * Browses the current destination with the given selector returning a list
244     * of messages
245     */
246    @Override
247    public List<Object> browseMessages(String selector) throws InvalidSelectorException {
248        Message[] messages = destination.browse();
249        ArrayList<Object> answer = new ArrayList<Object>();
250
251        NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
252        ctx.setDestination(destination.getActiveMQDestination());
253        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
254
255        for (int i = 0; i < messages.length; i++) {
256            try {
257                Message message = messages[i];
258                message.setReadOnlyBody(true);
259                if (selectorExpression == null) {
260                    answer.add(message);
261                } else {
262                    ctx.setMessageReference(message);
263                    if (selectorExpression.matches(ctx)) {
264                        answer.add(message);
265                    }
266                }
267
268            } catch (Throwable e) {
269                LOG.warn("exception browsing destination", e);
270            }
271        }
272        return answer;
273    }
274
275    @Override
276    public TabularData browseAsTable() throws OpenDataException {
277        try {
278            return browseAsTable(null);
279        } catch (InvalidSelectorException e) {
280            throw new RuntimeException(e);
281        }
282    }
283
284    @Override
285    public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
286        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
287        Message[] messages = destination.browse();
288        CompositeType ct = factory.getCompositeType();
289        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
290        TabularDataSupport rc = new TabularDataSupport(tt);
291
292        NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
293        ctx.setDestination(destination.getActiveMQDestination());
294        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
295
296        for (int i = 0; i < messages.length; i++) {
297            try {
298                if (selectorExpression == null) {
299                    rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
300                } else {
301                    ctx.setMessageReference(messages[i]);
302                    if (selectorExpression.matches(ctx)) {
303                        rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
304                    }
305                }
306            } catch (Throwable e) {
307                LOG.warn("exception browsing destination", e);
308            }
309        }
310
311        return rc;
312    }
313
314    @Override
315    public String sendTextMessageWithProperties(String properties) throws Exception {
316        String[] kvs = properties.split(",");
317        Map<String, String> props = new HashMap<String, String>();
318        for (String kv : kvs) {
319            String[] it = kv.split("=");
320            if (it.length == 2) {
321                props.put(it[0],it[1]);
322            }
323        }
324        return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password"));
325    }
326
327    @Override
328    public String sendTextMessage(String body) throws Exception {
329        return sendTextMessage(Collections.EMPTY_MAP, body);
330    }
331
332    @Override
333    public String sendTextMessage(Map headers, String body) throws Exception {
334        return sendTextMessage(headers, body, null, null);
335    }
336
337    @Override
338    public String sendTextMessage(String body, String user, @Sensitive String password) throws Exception {
339        return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
340    }
341
342    @Override
343    public String sendTextMessage(Map<String, String> headers, String body, String userName, @Sensitive String password) throws Exception {
344
345        String brokerUrl = "vm://" + broker.getBrokerName();
346        ActiveMQDestination dest = destination.getActiveMQDestination();
347
348        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
349        Connection connection = null;
350        try {
351
352            connection = cf.createConnection(userName, password);
353            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
354            MessageProducer producer = session.createProducer(dest);
355            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
356
357            for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
358                Map.Entry entry = (Map.Entry) iter.next();
359                msg.setObjectProperty((String) entry.getKey(), entry.getValue());
360            }
361
362            producer.setDeliveryMode(msg.getJMSDeliveryMode());
363            producer.setPriority(msg.getPriority());
364            long ttl = 0;
365            if (msg.getExpiration() != 0) {
366                ttl = msg.getExpiration() - System.currentTimeMillis();
367            } else {
368                String timeToLive = headers.get("timeToLive");
369                if (timeToLive != null) {
370                    ttl = Integer.valueOf(timeToLive);
371                }
372            }
373            producer.setTimeToLive(ttl > 0 ? ttl : 0);
374            producer.send(msg);
375
376            return msg.getJMSMessageID();
377
378        } finally {
379            connection.close();
380        }
381
382    }
383
384    @Override
385    public int getMaxAuditDepth() {
386        return destination.getMaxAuditDepth();
387    }
388
389    @Override
390    public int getMaxProducersToAudit() {
391        return destination.getMaxProducersToAudit();
392    }
393
394    public boolean isEnableAudit() {
395        return destination.isEnableAudit();
396    }
397
398    public void setEnableAudit(boolean enableAudit) {
399        destination.setEnableAudit(enableAudit);
400    }
401
402    @Override
403    public void setMaxAuditDepth(int maxAuditDepth) {
404        destination.setMaxAuditDepth(maxAuditDepth);
405    }
406
407    @Override
408    public void setMaxProducersToAudit(int maxProducersToAudit) {
409        destination.setMaxProducersToAudit(maxProducersToAudit);
410    }
411
412    @Override
413    public float getMemoryUsagePortion() {
414        return destination.getMemoryUsage().getUsagePortion();
415    }
416
417    @Override
418    public long getProducerCount() {
419        return destination.getDestinationStatistics().getProducers().getCount();
420    }
421
422    @Override
423    public boolean isProducerFlowControl() {
424        return destination.isProducerFlowControl();
425    }
426
427    @Override
428    public void setMemoryUsagePortion(float value) {
429        destination.getMemoryUsage().setUsagePortion(value);
430    }
431
432    @Override
433    public void setProducerFlowControl(boolean producerFlowControl) {
434        destination.setProducerFlowControl(producerFlowControl);
435    }
436
437    @Override
438    public boolean isAlwaysRetroactive() {
439        return destination.isAlwaysRetroactive();
440    }
441
442    @Override
443    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
444        destination.setAlwaysRetroactive(alwaysRetroactive);
445    }
446
447    /**
448     * Set's the interval at which warnings about producers being blocked by
449     * resource usage will be triggered. Values of 0 or less will disable
450     * warnings
451     *
452     * @param blockedProducerWarningInterval the interval at which warning about
453     *            blocked producers will be triggered.
454     */
455    @Override
456    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
457        destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
458    }
459
460    /**
461     *
462     * @return the interval at which warning about blocked producers will be
463     *         triggered.
464     */
465    @Override
466    public long getBlockedProducerWarningInterval() {
467        return destination.getBlockedProducerWarningInterval();
468    }
469
470    @Override
471    public int getMaxPageSize() {
472        return destination.getMaxPageSize();
473    }
474
475    @Override
476    public void setMaxPageSize(int pageSize) {
477        destination.setMaxPageSize(pageSize);
478    }
479
480    @Override
481    public boolean isUseCache() {
482        return destination.isUseCache();
483    }
484
485    @Override
486    public void setUseCache(boolean value) {
487        destination.setUseCache(value);
488    }
489
490    @Override
491    public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
492        List<Subscription> subscriptions = destination.getConsumers();
493        ObjectName[] answer = new ObjectName[subscriptions.size()];
494        ObjectName brokerObjectName = broker.getBrokerService().getBrokerObjectName();
495        int index = 0;
496        for (Subscription subscription : subscriptions) {
497            String connectionClientId = subscription.getContext().getClientId();
498            answer[index++] = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, subscription.getConsumerInfo());
499        }
500        return answer;
501    }
502
503    @Override
504    public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
505        ObjectName result = null;
506        SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
507        if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
508            result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
509        }
510        return result;
511    }
512
513    @Override
514    public String getOptions() {
515        Map<String, String> options = destination.getActiveMQDestination().getOptions();
516        String optionsString = "";
517        try {
518            if (options != null) {
519                optionsString = URISupport.createQueryString(options);
520            }
521        } catch (URISyntaxException ignored) {}
522        return optionsString;
523    }
524
525    @Override
526    public boolean isDLQ() {
527        return destination.getActiveMQDestination().isDLQ();
528    }
529
530    @Override
531    public void setDLQ(boolean val) {
532         destination.getActiveMQDestination().setDLQ(val);
533    }
534
535    @Override
536    public long getBlockedSends() {
537        return destination.getDestinationStatistics().getBlockedSends().getCount();
538    }
539
540    @Override
541    public double getAverageBlockedTime() {
542        return destination.getDestinationStatistics().getBlockedTime().getAverageTime();
543    }
544
545    @Override
546    public long getTotalBlockedTime() {
547        return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
548    }
549
550}