001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Date; 022import java.util.Properties; 023 024import javax.management.JMException; 025import javax.management.ObjectName; 026import javax.xml.XMLConstants; 027import javax.xml.bind.JAXBContext; 028import javax.xml.bind.JAXBElement; 029import javax.xml.bind.JAXBException; 030import javax.xml.bind.Unmarshaller; 031import javax.xml.parsers.DocumentBuilder; 032import javax.xml.parsers.DocumentBuilderFactory; 033import javax.xml.parsers.ParserConfigurationException; 034import javax.xml.transform.Source; 035import javax.xml.transform.stream.StreamSource; 036import javax.xml.validation.Schema; 037import javax.xml.validation.SchemaFactory; 038 039import org.apache.activemq.broker.Broker; 040import org.apache.activemq.broker.BrokerContext; 041import org.apache.activemq.broker.jmx.ManagementContext; 042import org.apache.activemq.plugin.jmx.RuntimeConfigurationView; 043import org.apache.activemq.schema.core.DtoBroker; 044import org.apache.activemq.spring.Utils; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047import org.springframework.core.io.Resource; 048import org.w3c.dom.Document; 049import org.w3c.dom.Node; 050import org.xml.sax.SAXException; 051 052public class RuntimeConfigurationBroker extends AbstractRuntimeConfigurationBroker { 053 054 public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigurationBroker.class); 055 public static final String objectNamePropsAppendage = ",service=RuntimeConfiguration,name=Plugin"; 056 PropertiesPlaceHolderUtil placeHolderUtil = null; 057 private long checkPeriod; 058 private long lastModified = -1; 059 private Resource configToMonitor; 060 private DtoBroker currentConfiguration; 061 private Schema schema; 062 063 public RuntimeConfigurationBroker(Broker next) { 064 super(next); 065 } 066 067 @Override 068 public void start() throws Exception { 069 super.start(); 070 try { 071 BrokerContext brokerContext = next.getBrokerService().getBrokerContext(); 072 if (brokerContext != null) { 073 configToMonitor = Utils.resourceFromString(brokerContext.getConfigurationUrl()); 074 info("Configuration " + configToMonitor); 075 } else { 076 LOG.error("Null BrokerContext; impossible to determine configuration url resource from broker, updates cannot be tracked"); 077 } 078 } catch (Exception error) { 079 LOG.error("failed to determine configuration url resource from broker, updates cannot be tracked", error); 080 } 081 082 currentConfiguration = loadConfiguration(configToMonitor); 083 monitorModification(configToMonitor); 084 registerMbean(); 085 } 086 087 @Override 088 protected void registerMbean() { 089 if (getBrokerService().isUseJmx()) { 090 ManagementContext managementContext = getBrokerService().getManagementContext(); 091 try { 092 objectName = new ObjectName(getBrokerService().getBrokerObjectName().toString() + objectNamePropsAppendage); 093 managementContext.registerMBean(new RuntimeConfigurationView(this), objectName); 094 } catch (Exception ignored) { 095 LOG.debug("failed to register RuntimeConfigurationMBean", ignored); 096 } 097 } 098 } 099 100 @Override 101 protected void unregisterMbean() { 102 if (objectName != null) { 103 try { 104 getBrokerService().getManagementContext().unregisterMBean(objectName); 105 } catch (JMException ignored) { 106 } 107 } 108 } 109 110 public String updateNow() { 111 LOG.info("Manual configuration update triggered"); 112 infoString = ""; 113 applyModifications(configToMonitor); 114 String result = infoString; 115 infoString = null; 116 return result; 117 } 118 119 private void monitorModification(final Resource configToMonitor) { 120 monitorTask = new Runnable() { 121 @Override 122 public void run() { 123 try { 124 if (configToMonitor.lastModified() > lastModified) { 125 applyModifications(configToMonitor); 126 } 127 } catch (Throwable e) { 128 LOG.error("Failed to determine lastModified time on configuration: " + configToMonitor, e); 129 } 130 } 131 }; 132 if (lastModified > 0 && checkPeriod > 0) { 133 this.getBrokerService().getScheduler().executePeriodically(monitorTask, checkPeriod); 134 info("Monitoring for updates (every " + checkPeriod + "millis) : " + configToMonitor + ", lastUpdate: " + new Date(lastModified)); 135 } 136 } 137 138 139 140 private void applyModifications(Resource configToMonitor) { 141 DtoBroker changed = loadConfiguration(configToMonitor); 142 if (changed != null && !currentConfiguration.equals(changed)) { 143 LOG.info("change in " + configToMonitor + " at: " + new Date(lastModified)); 144 LOG.debug("current:" + filterPasswords(currentConfiguration)); 145 LOG.debug("new :" + filterPasswords(changed)); 146 processSelectiveChanges(currentConfiguration, changed); 147 currentConfiguration = changed; 148 } else { 149 info("No material change to configuration in " + configToMonitor + " at: " + new Date(lastModified)); 150 } 151 } 152 153 private void processSelectiveChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration) { 154 155 for (Class upDatable : new Class[]{ 156 DtoBroker.DestinationPolicy.class, 157 DtoBroker.NetworkConnectors.class, 158 DtoBroker.DestinationInterceptors.class, 159 DtoBroker.Plugins.class, 160 DtoBroker.Destinations.class}) { 161 processChanges(currentConfiguration, modifiedConfiguration, upDatable); 162 } 163 } 164 165 private void processChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration, Class upDatable) { 166 ConfigurationProcessor processor = ProcessorFactory.createProcessor(this, upDatable); 167 processor.processChanges(currentConfiguration, modifiedConfiguration); 168 } 169 170 171 172 private DtoBroker loadConfiguration(Resource configToMonitor) { 173 DtoBroker jaxbConfig = null; 174 if (configToMonitor != null) { 175 try { 176 JAXBContext context = JAXBContext.newInstance(DtoBroker.class); 177 Unmarshaller unMarshaller = context.createUnmarshaller(); 178 unMarshaller.setSchema(getSchema()); 179 180 // skip beans and pull out the broker node to validate 181 DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); 182 dbf.setNamespaceAware(true); 183 DocumentBuilder db = dbf.newDocumentBuilder(); 184 Document doc = db.parse(configToMonitor.getInputStream()); 185 Node brokerRootNode = doc.getElementsByTagNameNS("*","broker").item(0); 186 187 if (brokerRootNode != null) { 188 189 JAXBElement<DtoBroker> brokerJAXBElement = 190 unMarshaller.unmarshal(brokerRootNode, DtoBroker.class); 191 jaxbConfig = brokerJAXBElement.getValue(); 192 193 // if we can parse we can track mods 194 lastModified = configToMonitor.lastModified(); 195 196 loadPropertiesPlaceHolderSupport(doc); 197 198 } else { 199 info("Failed to find 'broker' element by tag in: " + configToMonitor); 200 } 201 202 } catch (IOException e) { 203 info("Failed to access: " + configToMonitor, e); 204 } catch (JAXBException e) { 205 info("Failed to parse: " + configToMonitor, e); 206 } catch (ParserConfigurationException e) { 207 info("Failed to document parse: " + configToMonitor, e); 208 } catch (SAXException e) { 209 info("Failed to find broker element in: " + configToMonitor, e); 210 } catch (Exception e) { 211 info("Unexpected exception during load of: " + configToMonitor, e); 212 } 213 } 214 return jaxbConfig; 215 } 216 217 private void loadPropertiesPlaceHolderSupport(Document doc) { 218 BrokerContext brokerContext = getBrokerService().getBrokerContext(); 219 if (brokerContext != null) { 220 Properties initialProperties = new Properties(System.getProperties()); 221 placeHolderUtil = new PropertiesPlaceHolderUtil(initialProperties); 222 placeHolderUtil.mergeProperties(doc, initialProperties, brokerContext); 223 } 224 } 225 226 private Schema getSchema() throws SAXException, IOException { 227 if (schema == null) { 228 SchemaFactory schemaFactory = SchemaFactory.newInstance( 229 XMLConstants.W3C_XML_SCHEMA_NS_URI); 230 231 ArrayList<StreamSource> schemas = new ArrayList<StreamSource>(); 232 schemas.add(new StreamSource(getClass().getResource("/activemq.xsd").toExternalForm())); 233 schemas.add(new StreamSource(getClass().getResource("/org/springframework/beans/factory/xml/spring-beans-3.0.xsd").toExternalForm())); 234 schema = schemaFactory.newSchema(schemas.toArray(new Source[]{})); 235 } 236 return schema; 237 } 238 239 public long getLastModified() { 240 return lastModified; 241 } 242 243 public Resource getConfigToMonitor() { 244 return configToMonitor; 245 } 246 247 public long getCheckPeriod() { 248 return checkPeriod; 249 } 250 251 public void setCheckPeriod(long checkPeriod) { 252 this.checkPeriod = checkPeriod; 253 } 254 255}