001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.net.URISyntaxException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027
028import javax.jms.Connection;
029import javax.jms.InvalidSelectorException;
030import javax.jms.MessageProducer;
031import javax.jms.Session;
032import javax.management.MalformedObjectNameException;
033import javax.management.ObjectName;
034import javax.management.openmbean.CompositeData;
035import javax.management.openmbean.CompositeDataSupport;
036import javax.management.openmbean.CompositeType;
037import javax.management.openmbean.OpenDataException;
038import javax.management.openmbean.TabularData;
039import javax.management.openmbean.TabularDataSupport;
040import javax.management.openmbean.TabularType;
041import org.apache.activemq.ActiveMQConnectionFactory;
042import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
043import org.apache.activemq.broker.region.Destination;
044import org.apache.activemq.broker.region.Subscription;
045import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
046import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.ActiveMQMessage;
049import org.apache.activemq.command.ActiveMQTextMessage;
050import org.apache.activemq.command.Message;
051import org.apache.activemq.filter.BooleanExpression;
052import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
053import org.apache.activemq.selector.SelectorParser;
054import org.apache.activemq.util.URISupport;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058public class DestinationView implements DestinationViewMBean {
059    private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class);
060    protected final Destination destination;
061    protected final ManagedRegionBroker broker;
062
063    public DestinationView(ManagedRegionBroker broker, Destination destination) {
064        this.broker = broker;
065        this.destination = destination;
066    }
067
068    public void gc() {
069        destination.gc();
070    }
071
072    @Override
073    public String getName() {
074        return destination.getName();
075    }
076
077    @Override
078    public void resetStatistics() {
079        destination.getDestinationStatistics().reset();
080    }
081
082    @Override
083    public long getEnqueueCount() {
084        return destination.getDestinationStatistics().getEnqueues().getCount();
085    }
086
087    @Override
088    public long getDequeueCount() {
089        return destination.getDestinationStatistics().getDequeues().getCount();
090    }
091
092    @Override
093    public long getForwardCount() {
094        return destination.getDestinationStatistics().getForwards().getCount();
095    }
096
097    @Override
098    public long getDispatchCount() {
099        return destination.getDestinationStatistics().getDispatched().getCount();
100    }
101
102    @Override
103    public long getInFlightCount() {
104        return destination.getDestinationStatistics().getInflight().getCount();
105    }
106
107    @Override
108    public long getExpiredCount() {
109        return destination.getDestinationStatistics().getExpired().getCount();
110    }
111
112    @Override
113    public long getConsumerCount() {
114        return destination.getDestinationStatistics().getConsumers().getCount();
115    }
116
117    @Override
118    public long getQueueSize() {
119        return destination.getDestinationStatistics().getMessages().getCount();
120    }
121
122    public long getMessagesCached() {
123        return destination.getDestinationStatistics().getMessagesCached().getCount();
124    }
125
126    @Override
127    public int getMemoryPercentUsage() {
128        return destination.getMemoryUsage().getPercentUsage();
129    }
130
131    @Override
132    public long getMemoryUsageByteCount() {
133        return destination.getMemoryUsage().getUsage();
134    }
135
136    @Override
137    public long getMemoryLimit() {
138        return destination.getMemoryUsage().getLimit();
139    }
140
141    @Override
142    public void setMemoryLimit(long limit) {
143        destination.getMemoryUsage().setLimit(limit);
144    }
145
146    @Override
147    public double getAverageEnqueueTime() {
148        return destination.getDestinationStatistics().getProcessTime().getAverageTime();
149    }
150
151    @Override
152    public long getMaxEnqueueTime() {
153        return destination.getDestinationStatistics().getProcessTime().getMaxTime();
154    }
155
156    @Override
157    public long getMinEnqueueTime() {
158        return destination.getDestinationStatistics().getProcessTime().getMinTime();
159    }
160
161    /**
162     * @return the average size of a message (bytes)
163     */
164    @Override
165    public long getAverageMessageSize() {
166        // we are okay with the size without decimals so cast to long
167        return (long) destination.getDestinationStatistics().getMessageSize().getAverageSize();
168    }
169
170    /**
171     * @return the max size of a message (bytes)
172     */
173    @Override
174    public long getMaxMessageSize() {
175        return destination.getDestinationStatistics().getMessageSize().getMaxSize();
176    }
177
178    /**
179     * @return the min size of a message (bytes)
180     */
181    @Override
182    public long getMinMessageSize() {
183        return destination.getDestinationStatistics().getMessageSize().getMinSize();
184    }
185
186
187    @Override
188    public boolean isPrioritizedMessages() {
189        return destination.isPrioritizedMessages();
190    }
191
192    @Override
193    public CompositeData[] browse() throws OpenDataException {
194        try {
195            return browse(null);
196        } catch (InvalidSelectorException e) {
197            // should not happen.
198            throw new RuntimeException(e);
199        }
200    }
201
202    @Override
203    public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
204        Message[] messages = destination.browse();
205        ArrayList<CompositeData> c = new ArrayList<CompositeData>();
206
207        NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
208        ctx.setDestination(destination.getActiveMQDestination());
209        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
210
211        for (int i = 0; i < messages.length; i++) {
212            try {
213
214                if (selectorExpression == null) {
215                    c.add(OpenTypeSupport.convert(messages[i]));
216                } else {
217                    ctx.setMessageReference(messages[i]);
218                    if (selectorExpression.matches(ctx)) {
219                        c.add(OpenTypeSupport.convert(messages[i]));
220                    }
221                }
222
223            } catch (Throwable e) {
224                LOG.warn("exception browsing destination", e);
225            }
226        }
227
228        CompositeData rc[] = new CompositeData[c.size()];
229        c.toArray(rc);
230        return rc;
231    }
232
233    /**
234     * Browses the current destination returning a list of messages
235     */
236    @Override
237    public List<Object> browseMessages() throws InvalidSelectorException {
238        return browseMessages(null);
239    }
240
241    /**
242     * Browses the current destination with the given selector returning a list
243     * of messages
244     */
245    @Override
246    public List<Object> browseMessages(String selector) throws InvalidSelectorException {
247        Message[] messages = destination.browse();
248        ArrayList<Object> answer = new ArrayList<Object>();
249
250        NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
251        ctx.setDestination(destination.getActiveMQDestination());
252        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
253
254        for (int i = 0; i < messages.length; i++) {
255            try {
256                Message message = messages[i];
257                message.setReadOnlyBody(true);
258                if (selectorExpression == null) {
259                    answer.add(message);
260                } else {
261                    ctx.setMessageReference(message);
262                    if (selectorExpression.matches(ctx)) {
263                        answer.add(message);
264                    }
265                }
266
267            } catch (Throwable e) {
268                LOG.warn("exception browsing destination", e);
269            }
270        }
271        return answer;
272    }
273
274    @Override
275    public TabularData browseAsTable() throws OpenDataException {
276        try {
277            return browseAsTable(null);
278        } catch (InvalidSelectorException e) {
279            throw new RuntimeException(e);
280        }
281    }
282
283    @Override
284    public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
285        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
286        Message[] messages = destination.browse();
287        CompositeType ct = factory.getCompositeType();
288        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
289        TabularDataSupport rc = new TabularDataSupport(tt);
290
291        NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
292        ctx.setDestination(destination.getActiveMQDestination());
293        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
294
295        for (int i = 0; i < messages.length; i++) {
296            try {
297                if (selectorExpression == null) {
298                    rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
299                } else {
300                    ctx.setMessageReference(messages[i]);
301                    if (selectorExpression.matches(ctx)) {
302                        rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
303                    }
304                }
305            } catch (Throwable e) {
306                LOG.warn("exception browsing destination", e);
307            }
308        }
309
310        return rc;
311    }
312
313    @Override
314    public String sendTextMessageWithProperties(String properties) throws Exception {
315        String[] kvs = properties.split(",");
316        Map<String, String> props = new HashMap<String, String>();
317        for (String kv : kvs) {
318            String[] it = kv.split("=");
319            if (it.length == 2) {
320                props.put(it[0],it[1]);
321            }
322        }
323        return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password"));
324    }
325
326    @Override
327    public String sendTextMessage(String body) throws Exception {
328        return sendTextMessage(Collections.EMPTY_MAP, body);
329    }
330
331    @Override
332    public String sendTextMessage(Map headers, String body) throws Exception {
333        return sendTextMessage(headers, body, null, null);
334    }
335
336    @Override
337    public String sendTextMessage(String body, String user, @Sensitive String password) throws Exception {
338        return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
339    }
340
341    @Override
342    public String sendTextMessage(Map<String, String> headers, String body, String userName, @Sensitive String password) throws Exception {
343
344        String brokerUrl = "vm://" + broker.getBrokerName();
345        ActiveMQDestination dest = destination.getActiveMQDestination();
346
347        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
348        Connection connection = null;
349        try {
350
351            connection = cf.createConnection(userName, password);
352            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
353            MessageProducer producer = session.createProducer(dest);
354            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
355
356            for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
357                Map.Entry entry = (Map.Entry) iter.next();
358                msg.setObjectProperty((String) entry.getKey(), entry.getValue());
359            }
360
361            producer.setDeliveryMode(msg.getJMSDeliveryMode());
362            producer.setPriority(msg.getPriority());
363            long ttl = 0;
364            if (msg.getExpiration() != 0) {
365                ttl = msg.getExpiration() - System.currentTimeMillis();
366            } else {
367                String timeToLive = headers.get("timeToLive");
368                if (timeToLive != null) {
369                    ttl = Integer.valueOf(timeToLive);
370                }
371            }
372            producer.setTimeToLive(ttl > 0 ? ttl : 0);
373            producer.send(msg);
374
375            return msg.getJMSMessageID();
376
377        } finally {
378            connection.close();
379        }
380
381    }
382
383    @Override
384    public int getMaxAuditDepth() {
385        return destination.getMaxAuditDepth();
386    }
387
388    @Override
389    public int getMaxProducersToAudit() {
390        return destination.getMaxProducersToAudit();
391    }
392
393    public boolean isEnableAudit() {
394        return destination.isEnableAudit();
395    }
396
397    public void setEnableAudit(boolean enableAudit) {
398        destination.setEnableAudit(enableAudit);
399    }
400
401    @Override
402    public void setMaxAuditDepth(int maxAuditDepth) {
403        destination.setMaxAuditDepth(maxAuditDepth);
404    }
405
406    @Override
407    public void setMaxProducersToAudit(int maxProducersToAudit) {
408        destination.setMaxProducersToAudit(maxProducersToAudit);
409    }
410
411    @Override
412    public float getMemoryUsagePortion() {
413        return destination.getMemoryUsage().getUsagePortion();
414    }
415
416    @Override
417    public long getProducerCount() {
418        return destination.getDestinationStatistics().getProducers().getCount();
419    }
420
421    @Override
422    public boolean isProducerFlowControl() {
423        return destination.isProducerFlowControl();
424    }
425
426    @Override
427    public void setMemoryUsagePortion(float value) {
428        destination.getMemoryUsage().setUsagePortion(value);
429    }
430
431    @Override
432    public void setProducerFlowControl(boolean producerFlowControl) {
433        destination.setProducerFlowControl(producerFlowControl);
434    }
435
436    @Override
437    public boolean isAlwaysRetroactive() {
438        return destination.isAlwaysRetroactive();
439    }
440
441    @Override
442    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
443        destination.setAlwaysRetroactive(alwaysRetroactive);
444    }
445
446    /**
447     * Set's the interval at which warnings about producers being blocked by
448     * resource usage will be triggered. Values of 0 or less will disable
449     * warnings
450     *
451     * @param blockedProducerWarningInterval the interval at which warning about
452     *            blocked producers will be triggered.
453     */
454    @Override
455    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
456        destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
457    }
458
459    /**
460     *
461     * @return the interval at which warning about blocked producers will be
462     *         triggered.
463     */
464    @Override
465    public long getBlockedProducerWarningInterval() {
466        return destination.getBlockedProducerWarningInterval();
467    }
468
469    @Override
470    public int getMaxPageSize() {
471        return destination.getMaxPageSize();
472    }
473
474    @Override
475    public void setMaxPageSize(int pageSize) {
476        destination.setMaxPageSize(pageSize);
477    }
478
479    @Override
480    public boolean isUseCache() {
481        return destination.isUseCache();
482    }
483
484    @Override
485    public void setUseCache(boolean value) {
486        destination.setUseCache(value);
487    }
488
489    @Override
490    public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
491        List<Subscription> subscriptions = destination.getConsumers();
492        ObjectName[] answer = new ObjectName[subscriptions.size()];
493        ObjectName brokerObjectName = broker.getBrokerService().getBrokerObjectName();
494        int index = 0;
495        for (Subscription subscription : subscriptions) {
496            String connectionClientId = subscription.getContext().getClientId();
497            answer[index++] = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, subscription.getConsumerInfo());
498        }
499        return answer;
500    }
501
502    @Override
503    public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
504        ObjectName result = null;
505        SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
506        if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
507            result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
508        }
509        return result;
510    }
511
512    @Override
513    public String getOptions() {
514        Map<String, String> options = destination.getActiveMQDestination().getOptions();
515        String optionsString = "";
516        try {
517            if (options != null) {
518                optionsString = URISupport.createQueryString(options);
519            }
520        } catch (URISyntaxException ignored) {}
521        return optionsString;
522    }
523
524    @Override
525    public boolean isDLQ() {
526        return destination.getActiveMQDestination().isDLQ();
527    }
528
529    @Override
530    public void setDLQ(boolean val) {
531         destination.getActiveMQDestination().setDLQ(val);
532    }
533
534    @Override
535    public long getBlockedSends() {
536        return destination.getDestinationStatistics().getBlockedSends().getCount();
537    }
538
539    @Override
540    public double getAverageBlockedTime() {
541        return destination.getDestinationStatistics().getBlockedTime().getAverageTime();
542    }
543
544    @Override
545    public long getTotalBlockedTime() {
546        return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
547    }
548
549}