001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import java.util.Set;
020
021import javax.annotation.PostConstruct;
022
023import org.apache.activemq.broker.BrokerPluginSupport;
024import org.apache.activemq.broker.Connection;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.ConsumerBrokerExchange;
027import org.apache.activemq.broker.ProducerBrokerExchange;
028import org.apache.activemq.broker.region.Destination;
029import org.apache.activemq.broker.region.MessageReference;
030import org.apache.activemq.broker.region.Subscription;
031import org.apache.activemq.command.*;
032import org.apache.activemq.usage.Usage;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A simple Broker intercepter which allows you to enable/disable logging.
038 *
039 * @org.apache.xbean.XBean
040 */
041public class LoggingBrokerPlugin extends BrokerPluginSupport {
042
043    private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class);
044
045    private boolean logAll = false;
046    private boolean logConnectionEvents = true;
047    private boolean logSessionEvents = true;
048    private boolean logTransactionEvents = false;
049    private boolean logConsumerEvents = false;
050    private boolean logProducerEvents = false;
051    private boolean logInternalEvents = false;
052    private boolean perDestinationLogger = false;
053
054    /**
055     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
056     *
057     * delegates to afterPropertiesSet, done to prevent backwards incompatible signature change
058     */
059    @PostConstruct
060    private void postConstruct() {
061        try {
062            afterPropertiesSet();
063        } catch (Exception ex) {
064            throw new RuntimeException(ex);
065        }
066    }
067
068    /**
069     * @throws Exception
070     * @org.apache.xbean.InitMethod
071     */
072    public void afterPropertiesSet() throws Exception {
073        LOG.info("Created LoggingBrokerPlugin: {}", this.toString());
074    }
075
076    public boolean isLogAll() {
077        return logAll;
078    }
079
080    /**
081     * Logger all Events that go through the Plugin
082     */
083    public void setLogAll(boolean logAll) {
084        this.logAll = logAll;
085    }
086
087
088    public boolean isLogConnectionEvents() {
089        return logConnectionEvents;
090    }
091
092    /**
093     * Logger Events that are related to connections
094     */
095    public void setLogConnectionEvents(boolean logConnectionEvents) {
096        this.logConnectionEvents = logConnectionEvents;
097    }
098
099    public boolean isLogSessionEvents() {
100        return logSessionEvents;
101    }
102
103    /**
104     * Logger Events that are related to sessions
105     */
106    public void setLogSessionEvents(boolean logSessionEvents) {
107        this.logSessionEvents = logSessionEvents;
108    }
109
110    public boolean isLogTransactionEvents() {
111        return logTransactionEvents;
112    }
113
114    /**
115     * Logger Events that are related to transaction processing
116     */
117    public void setLogTransactionEvents(boolean logTransactionEvents) {
118        this.logTransactionEvents = logTransactionEvents;
119    }
120
121    public boolean isLogConsumerEvents() {
122        return logConsumerEvents;
123    }
124
125    /**
126     * Logger Events that are related to Consumers
127     */
128    public void setLogConsumerEvents(boolean logConsumerEvents) {
129        this.logConsumerEvents = logConsumerEvents;
130    }
131
132    public boolean isLogProducerEvents() {
133        return logProducerEvents;
134    }
135
136    /**
137     * Logger Events that are related to Producers
138     */
139    public void setLogProducerEvents(boolean logProducerEvents) {
140        this.logProducerEvents = logProducerEvents;
141    }
142
143    public boolean isLogInternalEvents() {
144        return logInternalEvents;
145    }
146
147    /**
148     * Logger Events that are normally internal to the broker
149     */
150    public void setLogInternalEvents(boolean logInternalEvents) {
151        this.logInternalEvents = logInternalEvents;
152    }
153
154    @Override
155    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
156        if (isLogAll() || isLogConsumerEvents()) {
157            LOG.info("Acknowledging message for client ID: {}{}", consumerExchange.getConnectionContext().getClientId(), (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
158            if (ack.getMessageCount() > 1) {
159                LOG.trace("Message count: {}, First Message Id: {}, Last Message Id: {}", new Object[]{ ack.getMessageCount(), ack.getFirstMessageId(), ack.getLastMessageId() });
160            }
161        }
162        super.acknowledge(consumerExchange, ack);
163    }
164
165    @Override
166    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
167        if (isLogAll() || isLogConsumerEvents()) {
168            LOG.info("Message Pull from: {} on {}", context.getClientId(), pull.getDestination().getPhysicalName());
169        }
170        return super.messagePull(context, pull);
171    }
172
173    @Override
174    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
175        if (isLogAll() || isLogConnectionEvents()) {
176            LOG.info("Adding Connection: {}", info);
177        }
178        super.addConnection(context, info);
179    }
180
181    @Override
182    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
183        if (isLogAll() || isLogConsumerEvents()) {
184            LOG.info("Adding Consumer: {}", info);
185        }
186        return super.addConsumer(context, info);
187    }
188
189    @Override
190    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
191        if (isLogAll() || isLogProducerEvents()) {
192            LOG.info("Adding Producer: {}", info);
193        }
194        super.addProducer(context, info);
195    }
196
197    @Override
198    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
199        if (isLogAll() || isLogTransactionEvents()) {
200            LOG.info("Committing transaction: {}", xid.getTransactionKey());
201        }
202        super.commitTransaction(context, xid, onePhase);
203    }
204
205    @Override
206    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
207        if (isLogAll() || isLogConsumerEvents()) {
208            LOG.info("Removing subscription: {}", info);
209        }
210        super.removeSubscription(context, info);
211    }
212
213    @Override
214    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
215
216        TransactionId[] result = super.getPreparedTransactions(context);
217        if ((isLogAll() || isLogTransactionEvents()) && result != null) {
218            StringBuffer tids = new StringBuffer();
219            for (TransactionId tid : result) {
220                if (tids.length() > 0) {
221                    tids.append(", ");
222                }
223                tids.append(tid.getTransactionKey());
224            }
225            LOG.info("Prepared transactions: {}", tids);
226        }
227        return result;
228    }
229
230    @Override
231    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
232        if (isLogAll() || isLogTransactionEvents()) {
233            LOG.info("Preparing transaction: {}", xid.getTransactionKey());
234        }
235        return super.prepareTransaction(context, xid);
236    }
237
238    @Override
239    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
240        if (isLogAll() || isLogConnectionEvents()) {
241            LOG.info("Removing Connection: {}", info);
242        }
243        super.removeConnection(context, info, error);
244    }
245
246    @Override
247    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
248        if (isLogAll() || isLogConsumerEvents()) {
249            LOG.info("Removing Consumer: {}", info);
250        }
251        super.removeConsumer(context, info);
252    }
253
254    @Override
255    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
256        if (isLogAll() || isLogProducerEvents()) {
257            LOG.info("Removing Producer: {}", info);
258        }
259        super.removeProducer(context, info);
260    }
261
262    @Override
263    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
264        if (isLogAll() || isLogTransactionEvents()) {
265            LOG.info("Rolling back Transaction: {}", xid.getTransactionKey());
266        }
267        super.rollbackTransaction(context, xid);
268    }
269
270    @Override
271    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
272        if (isLogAll() || isLogProducerEvents()) {
273            logSend(messageSend.copy());
274        }
275        super.send(producerExchange, messageSend);
276    }
277
278    private void logSend(Message copy) {
279        copy.getSize();
280        Logger perDestinationsLogger = LOG;
281        if (isPerDestinationLogger()) {
282            ActiveMQDestination destination = copy.getDestination();
283            perDestinationsLogger = LoggerFactory.getLogger(LOG.getName() +
284                    "." + destination.getDestinationTypeAsString() + "." + destination.getPhysicalName());
285        }
286        perDestinationsLogger.info("Sending message: {}", copy);
287    }
288
289    @Override
290    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
291        if (isLogAll() || isLogTransactionEvents()) {
292            LOG.info("Beginning transaction: {}", xid.getTransactionKey());
293        }
294        super.beginTransaction(context, xid);
295    }
296
297    @Override
298    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
299        if (isLogAll() || isLogTransactionEvents()) {
300            LOG.info("Forgetting transaction: {}", transactionId.getTransactionKey());
301        }
302        super.forgetTransaction(context, transactionId);
303    }
304
305    @Override
306    public Connection[] getClients() throws Exception {
307        Connection[] result = super.getClients();
308
309        if (isLogAll() || isLogInternalEvents()) {
310            if (result == null) {
311                LOG.info("Get Clients returned empty list.");
312            } else {
313                StringBuffer cids = new StringBuffer();
314                for (Connection c : result) {
315                    cids.append(cids.length() > 0 ? ", " : "");
316                    cids.append(c.getConnectionId());
317                }
318                LOG.info("Connected clients: {}", cids);
319            }
320        }
321        return super.getClients();
322    }
323
324    @Override
325    public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
326            ActiveMQDestination destination, boolean create) throws Exception {
327        if (isLogAll() || isLogInternalEvents()) {
328            LOG.info("Adding destination: {}:{}", destination.getDestinationTypeAsString(), destination.getPhysicalName());
329        }
330        return super.addDestination(context, destination, create);
331    }
332
333    @Override
334    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
335            throws Exception {
336        if (isLogAll() || isLogInternalEvents()) {
337            LOG.info("Removing destination: {}:{}", destination.getDestinationTypeAsString(), destination.getPhysicalName());
338        }
339        super.removeDestination(context, destination, timeout);
340    }
341
342    @Override
343    public ActiveMQDestination[] getDestinations() throws Exception {
344        ActiveMQDestination[] result = super.getDestinations();
345        if (isLogAll() || isLogInternalEvents()) {
346            if (result == null) {
347                LOG.info("Get Destinations returned empty list.");
348            } else {
349                StringBuffer destinations = new StringBuffer();
350                for (ActiveMQDestination dest : result) {
351                    destinations.append(destinations.length() > 0 ? ", " : "");
352                    destinations.append(dest.getPhysicalName());
353                }
354                LOG.info("Get Destinations: {}", destinations);
355            }
356        }
357        return result;
358    }
359
360    @Override
361    public void start() throws Exception {
362        if (isLogAll() || isLogInternalEvents()) {
363            LOG.info("Starting {}", getBrokerName());
364        }
365        super.start();
366    }
367
368    @Override
369    public void stop() throws Exception {
370        if (isLogAll() || isLogInternalEvents()) {
371            LOG.info("Stopping {}", getBrokerName());
372        }
373        super.stop();
374    }
375
376    @Override
377    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
378        if (isLogAll() || isLogSessionEvents()) {
379            LOG.info("Adding Session: {}", info);
380        }
381        super.addSession(context, info);
382    }
383
384    @Override
385    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
386        if (isLogAll() || isLogSessionEvents()) {
387            LOG.info("Removing Session: {}", info);
388        }
389        super.removeSession(context, info);
390    }
391
392    @Override
393    public void addBroker(Connection connection, BrokerInfo info) {
394        if (isLogAll() || isLogInternalEvents()) {
395            LOG.info("Adding Broker {}", info.getBrokerName());
396        }
397        super.addBroker(connection, info);
398    }
399
400    @Override
401    public void removeBroker(Connection connection, BrokerInfo info) {
402        if (isLogAll() || isLogInternalEvents()) {
403            LOG.info("Removing Broker {}", info.getBrokerName());
404        }
405        super.removeBroker(connection, info);
406    }
407
408    @Override
409    public BrokerInfo[] getPeerBrokerInfos() {
410        BrokerInfo[] result = super.getPeerBrokerInfos();
411        if (isLogAll() || isLogInternalEvents()) {
412            if (result == null) {
413                LOG.info("Get Peer Broker Infos returned empty list.");
414            } else {
415                StringBuffer peers = new StringBuffer();
416                for (BrokerInfo bi : result) {
417                    peers.append(peers.length() > 0 ? ", " : "");
418                    peers.append(bi.getBrokerName());
419                }
420                LOG.info("Get Peer Broker Infos: {}", peers);
421            }
422        }
423        return result;
424    }
425
426    @Override
427    public void preProcessDispatch(MessageDispatch messageDispatch) {
428        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
429            LOG.info("preProcessDispatch: {}", messageDispatch);
430        }
431        super.preProcessDispatch(messageDispatch);
432    }
433
434    @Override
435    public void postProcessDispatch(MessageDispatch messageDispatch) {
436        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
437            LOG.info("postProcessDispatch: {}", messageDispatch);
438        }
439        super.postProcessDispatch(messageDispatch);
440    }
441
442    @Override
443    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
444        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
445            LOG.info("ProcessDispatchNotification: {}", messageDispatchNotification);
446        }
447        super.processDispatchNotification(messageDispatchNotification);
448    }
449
450    @Override
451    public Set<ActiveMQDestination> getDurableDestinations() {
452        Set<ActiveMQDestination> result = super.getDurableDestinations();
453        if (isLogAll() || isLogInternalEvents()) {
454            if (result == null) {
455                LOG.info("Get Durable Destinations returned empty list.");
456            } else {
457                StringBuffer destinations = new StringBuffer();
458                for (ActiveMQDestination dest : result) {
459                    destinations.append(destinations.length() > 0 ? ", " : "");
460                    destinations.append(dest.getPhysicalName());
461                }
462                LOG.info("Get Durable Destinations: {}", destinations);
463            }
464        }
465        return result;
466    }
467
468    @Override
469    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
470        if (isLogAll() || isLogInternalEvents()) {
471            LOG.info("Adding destination info: {}", info);
472        }
473        super.addDestinationInfo(context, info);
474    }
475
476    @Override
477    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
478        if (isLogAll() || isLogInternalEvents()) {
479            LOG.info("Removing destination info: {}", info);
480        }
481        super.removeDestinationInfo(context, info);
482    }
483
484    @Override
485    public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
486        if (isLogAll() || isLogInternalEvents()) {
487            String msg = "Unable to display message.";
488
489            msg = message.getMessage().toString();
490
491            LOG.info("Message has expired: {}", msg);
492        }
493        super.messageExpired(context, message, subscription);
494    }
495
496    @Override
497    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
498                                         Subscription subscription, Throwable poisonCause) {
499        if (isLogAll() || isLogInternalEvents()) {
500            String msg = "Unable to display message.";
501
502            msg = messageReference.getMessage().toString();
503
504            LOG.info("Sending to DLQ: {}", msg);
505        }
506        return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
507    }
508
509    @Override
510    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo,ActiveMQDestination destination) {
511        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
512            LOG.info("Fast Producer: {}", producerInfo);
513        }
514        super.fastProducer(context, producerInfo, destination);
515    }
516
517    @Override
518    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
519        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
520            LOG.info("Destination is full: {}", destination.getName());
521        }
522        super.isFull(context, destination, usage);
523    }
524
525    @Override
526    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
527        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
528            String msg = "Unable to display message.";
529
530            msg = messageReference.getMessage().toString();
531
532            LOG.info("Message consumed: {}", msg);
533        }
534        super.messageConsumed(context, messageReference);
535    }
536
537    @Override
538    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
539        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
540            String msg = "Unable to display message.";
541
542            msg = messageReference.getMessage().toString();
543
544            LOG.info("Message delivered: {}", msg);
545        }
546        super.messageDelivered(context, messageReference);
547    }
548
549    @Override
550    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
551        if (isLogAll() || isLogInternalEvents()) {
552            String msg = "Unable to display message.";
553
554            msg = messageReference.getMessage().toString();
555
556            LOG.info("Message discarded: {}", msg);
557        }
558        super.messageDiscarded(context, sub, messageReference);
559    }
560
561    @Override
562    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
563        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
564            LOG.info("Detected slow consumer on {}", destination.getName());
565            StringBuffer buf = new StringBuffer("Connection(");
566            buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
567            buf.append(") Session(");
568            buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
569            buf.append(")");
570            LOG.info(buf.toString());
571        }
572        super.slowConsumer(context, destination, subs);
573    }
574
575    @Override
576    public void nowMasterBroker() {
577        if (isLogAll() || isLogInternalEvents()) {
578            LOG.info("Is now the master broker: {}", getBrokerName());
579        }
580        super.nowMasterBroker();
581    }
582
583    @Override
584    public String toString() {
585        StringBuffer buf = new StringBuffer();
586        buf.append("LoggingBrokerPlugin(");
587        buf.append("logAll=");
588        buf.append(isLogAll());
589        buf.append(", logConnectionEvents=");
590        buf.append(isLogConnectionEvents());
591        buf.append(", logSessionEvents=");
592        buf.append(isLogSessionEvents());
593        buf.append(", logConsumerEvents=");
594        buf.append(isLogConsumerEvents());
595        buf.append(", logProducerEvents=");
596        buf.append(isLogProducerEvents());
597        buf.append(", logTransactionEvents=");
598        buf.append(isLogTransactionEvents());
599        buf.append(", logInternalEvents=");
600        buf.append(isLogInternalEvents());
601        buf.append(")");
602        return buf.toString();
603    }
604
605    public void setPerDestinationLogger(boolean perDestinationLogger) {
606        this.perDestinationLogger = perDestinationLogger;
607    }
608
609    public boolean isPerDestinationLogger() {
610        return perDestinationLogger;
611    }
612}