001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.util; 018 019import java.util.Set; 020 021import javax.annotation.PostConstruct; 022 023import org.apache.activemq.broker.BrokerPluginSupport; 024import org.apache.activemq.broker.Connection; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.ConsumerBrokerExchange; 027import org.apache.activemq.broker.ProducerBrokerExchange; 028import org.apache.activemq.broker.region.Destination; 029import org.apache.activemq.broker.region.MessageReference; 030import org.apache.activemq.broker.region.Subscription; 031import org.apache.activemq.command.*; 032import org.apache.activemq.usage.Usage; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * A simple Broker intercepter which allows you to enable/disable logging. 038 * 039 * @org.apache.xbean.XBean 040 */ 041public class LoggingBrokerPlugin extends BrokerPluginSupport { 042 043 private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class); 044 045 private boolean logAll = false; 046 private boolean logConnectionEvents = true; 047 private boolean logSessionEvents = true; 048 private boolean logTransactionEvents = false; 049 private boolean logConsumerEvents = false; 050 private boolean logProducerEvents = false; 051 private boolean logInternalEvents = false; 052 private boolean perDestinationLogger = false; 053 054 /** 055 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 056 * 057 * delegates to afterPropertiesSet, done to prevent backwards incompatible signature change 058 */ 059 @PostConstruct 060 private void postConstruct() { 061 try { 062 afterPropertiesSet(); 063 } catch (Exception ex) { 064 throw new RuntimeException(ex); 065 } 066 } 067 068 /** 069 * @throws Exception 070 * @org.apache.xbean.InitMethod 071 */ 072 public void afterPropertiesSet() throws Exception { 073 LOG.info("Created LoggingBrokerPlugin: {}", this.toString()); 074 } 075 076 public boolean isLogAll() { 077 return logAll; 078 } 079 080 /** 081 * Logger all Events that go through the Plugin 082 */ 083 public void setLogAll(boolean logAll) { 084 this.logAll = logAll; 085 } 086 087 088 public boolean isLogConnectionEvents() { 089 return logConnectionEvents; 090 } 091 092 /** 093 * Logger Events that are related to connections 094 */ 095 public void setLogConnectionEvents(boolean logConnectionEvents) { 096 this.logConnectionEvents = logConnectionEvents; 097 } 098 099 public boolean isLogSessionEvents() { 100 return logSessionEvents; 101 } 102 103 /** 104 * Logger Events that are related to sessions 105 */ 106 public void setLogSessionEvents(boolean logSessionEvents) { 107 this.logSessionEvents = logSessionEvents; 108 } 109 110 public boolean isLogTransactionEvents() { 111 return logTransactionEvents; 112 } 113 114 /** 115 * Logger Events that are related to transaction processing 116 */ 117 public void setLogTransactionEvents(boolean logTransactionEvents) { 118 this.logTransactionEvents = logTransactionEvents; 119 } 120 121 public boolean isLogConsumerEvents() { 122 return logConsumerEvents; 123 } 124 125 /** 126 * Logger Events that are related to Consumers 127 */ 128 public void setLogConsumerEvents(boolean logConsumerEvents) { 129 this.logConsumerEvents = logConsumerEvents; 130 } 131 132 public boolean isLogProducerEvents() { 133 return logProducerEvents; 134 } 135 136 /** 137 * Logger Events that are related to Producers 138 */ 139 public void setLogProducerEvents(boolean logProducerEvents) { 140 this.logProducerEvents = logProducerEvents; 141 } 142 143 public boolean isLogInternalEvents() { 144 return logInternalEvents; 145 } 146 147 /** 148 * Logger Events that are normally internal to the broker 149 */ 150 public void setLogInternalEvents(boolean logInternalEvents) { 151 this.logInternalEvents = logInternalEvents; 152 } 153 154 @Override 155 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 156 if (isLogAll() || isLogConsumerEvents()) { 157 LOG.info("Acknowledging message for client ID: {}{}", consumerExchange.getConnectionContext().getClientId(), (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : "")); 158 if (ack.getMessageCount() > 1) { 159 LOG.trace("Message count: {}, First Message Id: {}, Last Message Id: {}", new Object[]{ ack.getMessageCount(), ack.getFirstMessageId(), ack.getLastMessageId() }); 160 } 161 } 162 super.acknowledge(consumerExchange, ack); 163 } 164 165 @Override 166 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 167 if (isLogAll() || isLogConsumerEvents()) { 168 LOG.info("Message Pull from: {} on {}", context.getClientId(), pull.getDestination().getPhysicalName()); 169 } 170 return super.messagePull(context, pull); 171 } 172 173 @Override 174 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 175 if (isLogAll() || isLogConnectionEvents()) { 176 LOG.info("Adding Connection: {}", info); 177 } 178 super.addConnection(context, info); 179 } 180 181 @Override 182 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 183 if (isLogAll() || isLogConsumerEvents()) { 184 LOG.info("Adding Consumer: {}", info); 185 } 186 return super.addConsumer(context, info); 187 } 188 189 @Override 190 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 191 if (isLogAll() || isLogProducerEvents()) { 192 LOG.info("Adding Producer: {}", info); 193 } 194 super.addProducer(context, info); 195 } 196 197 @Override 198 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 199 if (isLogAll() || isLogTransactionEvents()) { 200 LOG.info("Committing transaction: {}", xid.getTransactionKey()); 201 } 202 super.commitTransaction(context, xid, onePhase); 203 } 204 205 @Override 206 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 207 if (isLogAll() || isLogConsumerEvents()) { 208 LOG.info("Removing subscription: {}", info); 209 } 210 super.removeSubscription(context, info); 211 } 212 213 @Override 214 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 215 216 TransactionId[] result = super.getPreparedTransactions(context); 217 if ((isLogAll() || isLogTransactionEvents()) && result != null) { 218 StringBuffer tids = new StringBuffer(); 219 for (TransactionId tid : result) { 220 if (tids.length() > 0) { 221 tids.append(", "); 222 } 223 tids.append(tid.getTransactionKey()); 224 } 225 LOG.info("Prepared transactions: {}", tids); 226 } 227 return result; 228 } 229 230 @Override 231 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 232 if (isLogAll() || isLogTransactionEvents()) { 233 LOG.info("Preparing transaction: {}", xid.getTransactionKey()); 234 } 235 return super.prepareTransaction(context, xid); 236 } 237 238 @Override 239 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 240 if (isLogAll() || isLogConnectionEvents()) { 241 LOG.info("Removing Connection: {}", info); 242 } 243 super.removeConnection(context, info, error); 244 } 245 246 @Override 247 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 248 if (isLogAll() || isLogConsumerEvents()) { 249 LOG.info("Removing Consumer: {}", info); 250 } 251 super.removeConsumer(context, info); 252 } 253 254 @Override 255 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 256 if (isLogAll() || isLogProducerEvents()) { 257 LOG.info("Removing Producer: {}", info); 258 } 259 super.removeProducer(context, info); 260 } 261 262 @Override 263 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 264 if (isLogAll() || isLogTransactionEvents()) { 265 LOG.info("Rolling back Transaction: {}", xid.getTransactionKey()); 266 } 267 super.rollbackTransaction(context, xid); 268 } 269 270 @Override 271 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 272 if (isLogAll() || isLogProducerEvents()) { 273 logSend(messageSend.copy()); 274 } 275 super.send(producerExchange, messageSend); 276 } 277 278 private void logSend(Message copy) { 279 copy.getSize(); 280 Logger perDestinationsLogger = LOG; 281 if (isPerDestinationLogger()) { 282 ActiveMQDestination destination = copy.getDestination(); 283 perDestinationsLogger = LoggerFactory.getLogger(LOG.getName() + 284 "." + destination.getDestinationTypeAsString() + "." + destination.getPhysicalName()); 285 } 286 perDestinationsLogger.info("Sending message: {}", copy); 287 } 288 289 @Override 290 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 291 if (isLogAll() || isLogTransactionEvents()) { 292 LOG.info("Beginning transaction: {}", xid.getTransactionKey()); 293 } 294 super.beginTransaction(context, xid); 295 } 296 297 @Override 298 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 299 if (isLogAll() || isLogTransactionEvents()) { 300 LOG.info("Forgetting transaction: {}", transactionId.getTransactionKey()); 301 } 302 super.forgetTransaction(context, transactionId); 303 } 304 305 @Override 306 public Connection[] getClients() throws Exception { 307 Connection[] result = super.getClients(); 308 309 if (isLogAll() || isLogInternalEvents()) { 310 if (result == null) { 311 LOG.info("Get Clients returned empty list."); 312 } else { 313 StringBuffer cids = new StringBuffer(); 314 for (Connection c : result) { 315 cids.append(cids.length() > 0 ? ", " : ""); 316 cids.append(c.getConnectionId()); 317 } 318 LOG.info("Connected clients: {}", cids); 319 } 320 } 321 return super.getClients(); 322 } 323 324 @Override 325 public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context, 326 ActiveMQDestination destination, boolean create) throws Exception { 327 if (isLogAll() || isLogInternalEvents()) { 328 LOG.info("Adding destination: {}:{}", destination.getDestinationTypeAsString(), destination.getPhysicalName()); 329 } 330 return super.addDestination(context, destination, create); 331 } 332 333 @Override 334 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) 335 throws Exception { 336 if (isLogAll() || isLogInternalEvents()) { 337 LOG.info("Removing destination: {}:{}", destination.getDestinationTypeAsString(), destination.getPhysicalName()); 338 } 339 super.removeDestination(context, destination, timeout); 340 } 341 342 @Override 343 public ActiveMQDestination[] getDestinations() throws Exception { 344 ActiveMQDestination[] result = super.getDestinations(); 345 if (isLogAll() || isLogInternalEvents()) { 346 if (result == null) { 347 LOG.info("Get Destinations returned empty list."); 348 } else { 349 StringBuffer destinations = new StringBuffer(); 350 for (ActiveMQDestination dest : result) { 351 destinations.append(destinations.length() > 0 ? ", " : ""); 352 destinations.append(dest.getPhysicalName()); 353 } 354 LOG.info("Get Destinations: {}", destinations); 355 } 356 } 357 return result; 358 } 359 360 @Override 361 public void start() throws Exception { 362 if (isLogAll() || isLogInternalEvents()) { 363 LOG.info("Starting {}", getBrokerName()); 364 } 365 super.start(); 366 } 367 368 @Override 369 public void stop() throws Exception { 370 if (isLogAll() || isLogInternalEvents()) { 371 LOG.info("Stopping {}", getBrokerName()); 372 } 373 super.stop(); 374 } 375 376 @Override 377 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 378 if (isLogAll() || isLogSessionEvents()) { 379 LOG.info("Adding Session: {}", info); 380 } 381 super.addSession(context, info); 382 } 383 384 @Override 385 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 386 if (isLogAll() || isLogSessionEvents()) { 387 LOG.info("Removing Session: {}", info); 388 } 389 super.removeSession(context, info); 390 } 391 392 @Override 393 public void addBroker(Connection connection, BrokerInfo info) { 394 if (isLogAll() || isLogInternalEvents()) { 395 LOG.info("Adding Broker {}", info.getBrokerName()); 396 } 397 super.addBroker(connection, info); 398 } 399 400 @Override 401 public void removeBroker(Connection connection, BrokerInfo info) { 402 if (isLogAll() || isLogInternalEvents()) { 403 LOG.info("Removing Broker {}", info.getBrokerName()); 404 } 405 super.removeBroker(connection, info); 406 } 407 408 @Override 409 public BrokerInfo[] getPeerBrokerInfos() { 410 BrokerInfo[] result = super.getPeerBrokerInfos(); 411 if (isLogAll() || isLogInternalEvents()) { 412 if (result == null) { 413 LOG.info("Get Peer Broker Infos returned empty list."); 414 } else { 415 StringBuffer peers = new StringBuffer(); 416 for (BrokerInfo bi : result) { 417 peers.append(peers.length() > 0 ? ", " : ""); 418 peers.append(bi.getBrokerName()); 419 } 420 LOG.info("Get Peer Broker Infos: {}", peers); 421 } 422 } 423 return result; 424 } 425 426 @Override 427 public void preProcessDispatch(MessageDispatch messageDispatch) { 428 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 429 LOG.info("preProcessDispatch: {}", messageDispatch); 430 } 431 super.preProcessDispatch(messageDispatch); 432 } 433 434 @Override 435 public void postProcessDispatch(MessageDispatch messageDispatch) { 436 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 437 LOG.info("postProcessDispatch: {}", messageDispatch); 438 } 439 super.postProcessDispatch(messageDispatch); 440 } 441 442 @Override 443 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 444 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 445 LOG.info("ProcessDispatchNotification: {}", messageDispatchNotification); 446 } 447 super.processDispatchNotification(messageDispatchNotification); 448 } 449 450 @Override 451 public Set<ActiveMQDestination> getDurableDestinations() { 452 Set<ActiveMQDestination> result = super.getDurableDestinations(); 453 if (isLogAll() || isLogInternalEvents()) { 454 if (result == null) { 455 LOG.info("Get Durable Destinations returned empty list."); 456 } else { 457 StringBuffer destinations = new StringBuffer(); 458 for (ActiveMQDestination dest : result) { 459 destinations.append(destinations.length() > 0 ? ", " : ""); 460 destinations.append(dest.getPhysicalName()); 461 } 462 LOG.info("Get Durable Destinations: {}", destinations); 463 } 464 } 465 return result; 466 } 467 468 @Override 469 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 470 if (isLogAll() || isLogInternalEvents()) { 471 LOG.info("Adding destination info: {}", info); 472 } 473 super.addDestinationInfo(context, info); 474 } 475 476 @Override 477 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 478 if (isLogAll() || isLogInternalEvents()) { 479 LOG.info("Removing destination info: {}", info); 480 } 481 super.removeDestinationInfo(context, info); 482 } 483 484 @Override 485 public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { 486 if (isLogAll() || isLogInternalEvents()) { 487 String msg = "Unable to display message."; 488 489 msg = message.getMessage().toString(); 490 491 LOG.info("Message has expired: {}", msg); 492 } 493 super.messageExpired(context, message, subscription); 494 } 495 496 @Override 497 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 498 Subscription subscription, Throwable poisonCause) { 499 if (isLogAll() || isLogInternalEvents()) { 500 String msg = "Unable to display message."; 501 502 msg = messageReference.getMessage().toString(); 503 504 LOG.info("Sending to DLQ: {}", msg); 505 } 506 return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); 507 } 508 509 @Override 510 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo,ActiveMQDestination destination) { 511 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 512 LOG.info("Fast Producer: {}", producerInfo); 513 } 514 super.fastProducer(context, producerInfo, destination); 515 } 516 517 @Override 518 public void isFull(ConnectionContext context, Destination destination, Usage usage) { 519 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 520 LOG.info("Destination is full: {}", destination.getName()); 521 } 522 super.isFull(context, destination, usage); 523 } 524 525 @Override 526 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 527 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 528 String msg = "Unable to display message."; 529 530 msg = messageReference.getMessage().toString(); 531 532 LOG.info("Message consumed: {}", msg); 533 } 534 super.messageConsumed(context, messageReference); 535 } 536 537 @Override 538 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 539 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 540 String msg = "Unable to display message."; 541 542 msg = messageReference.getMessage().toString(); 543 544 LOG.info("Message delivered: {}", msg); 545 } 546 super.messageDelivered(context, messageReference); 547 } 548 549 @Override 550 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 551 if (isLogAll() || isLogInternalEvents()) { 552 String msg = "Unable to display message."; 553 554 msg = messageReference.getMessage().toString(); 555 556 LOG.info("Message discarded: {}", msg); 557 } 558 super.messageDiscarded(context, sub, messageReference); 559 } 560 561 @Override 562 public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { 563 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 564 LOG.info("Detected slow consumer on {}", destination.getName()); 565 StringBuffer buf = new StringBuffer("Connection("); 566 buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId()); 567 buf.append(") Session("); 568 buf.append(subs.getConsumerInfo().getConsumerId().getSessionId()); 569 buf.append(")"); 570 LOG.info(buf.toString()); 571 } 572 super.slowConsumer(context, destination, subs); 573 } 574 575 @Override 576 public void nowMasterBroker() { 577 if (isLogAll() || isLogInternalEvents()) { 578 LOG.info("Is now the master broker: {}", getBrokerName()); 579 } 580 super.nowMasterBroker(); 581 } 582 583 @Override 584 public String toString() { 585 StringBuffer buf = new StringBuffer(); 586 buf.append("LoggingBrokerPlugin("); 587 buf.append("logAll="); 588 buf.append(isLogAll()); 589 buf.append(", logConnectionEvents="); 590 buf.append(isLogConnectionEvents()); 591 buf.append(", logSessionEvents="); 592 buf.append(isLogSessionEvents()); 593 buf.append(", logConsumerEvents="); 594 buf.append(isLogConsumerEvents()); 595 buf.append(", logProducerEvents="); 596 buf.append(isLogProducerEvents()); 597 buf.append(", logTransactionEvents="); 598 buf.append(isLogTransactionEvents()); 599 buf.append(", logInternalEvents="); 600 buf.append(isLogInternalEvents()); 601 buf.append(")"); 602 return buf.toString(); 603 } 604 605 public void setPerDestinationLogger(boolean perDestinationLogger) { 606 this.perDestinationLogger = perDestinationLogger; 607 } 608 609 public boolean isPerDestinationLogger() { 610 return perDestinationLogger; 611 } 612}