001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.Properties;
023
024import javax.management.JMException;
025import javax.management.ObjectName;
026import javax.xml.XMLConstants;
027import javax.xml.bind.JAXBContext;
028import javax.xml.bind.JAXBElement;
029import javax.xml.bind.JAXBException;
030import javax.xml.bind.Unmarshaller;
031import javax.xml.parsers.DocumentBuilder;
032import javax.xml.parsers.DocumentBuilderFactory;
033import javax.xml.parsers.ParserConfigurationException;
034import javax.xml.transform.Source;
035import javax.xml.transform.stream.StreamSource;
036import javax.xml.validation.Schema;
037import javax.xml.validation.SchemaFactory;
038
039import org.apache.activemq.broker.Broker;
040import org.apache.activemq.broker.BrokerContext;
041import org.apache.activemq.broker.jmx.ManagementContext;
042import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
043import org.apache.activemq.schema.core.DtoBroker;
044import org.apache.activemq.spring.Utils;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047import org.springframework.core.io.Resource;
048import org.w3c.dom.Document;
049import org.w3c.dom.Node;
050import org.xml.sax.SAXException;
051
052public class RuntimeConfigurationBroker extends AbstractRuntimeConfigurationBroker {
053
054    public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigurationBroker.class);
055    public static final String objectNamePropsAppendage = ",service=RuntimeConfiguration,name=Plugin";
056    PropertiesPlaceHolderUtil placeHolderUtil = null;
057    private long checkPeriod;
058    private long lastModified = -1;
059    private Resource configToMonitor;
060    private DtoBroker currentConfiguration;
061    private Schema schema;
062
063    public RuntimeConfigurationBroker(Broker next) {
064        super(next);
065    }
066
067    @Override
068    public void start() throws Exception {
069        super.start();
070        try {
071            BrokerContext brokerContext = next.getBrokerService().getBrokerContext();
072            if (brokerContext != null) {
073                configToMonitor = Utils.resourceFromString(brokerContext.getConfigurationUrl());
074                info("Configuration " + configToMonitor);
075            } else {
076                LOG.error("Null BrokerContext; impossible to determine configuration url resource from broker, updates cannot be tracked");
077            }
078        } catch (Exception error) {
079            LOG.error("failed to determine configuration url resource from broker, updates cannot be tracked", error);
080        }
081
082        currentConfiguration = loadConfiguration(configToMonitor);
083        monitorModification(configToMonitor);
084        registerMbean();
085    }
086
087    @Override
088    protected void registerMbean() {
089        if (getBrokerService().isUseJmx()) {
090            ManagementContext managementContext = getBrokerService().getManagementContext();
091            try {
092                objectName = new ObjectName(getBrokerService().getBrokerObjectName().toString() + objectNamePropsAppendage);
093                managementContext.registerMBean(new RuntimeConfigurationView(this), objectName);
094            } catch (Exception ignored) {
095                LOG.debug("failed to register RuntimeConfigurationMBean", ignored);
096            }
097        }
098    }
099
100    @Override
101    protected void unregisterMbean() {
102        if (objectName != null) {
103            try {
104                getBrokerService().getManagementContext().unregisterMBean(objectName);
105            } catch (JMException ignored) {
106            }
107        }
108    }
109
110    public String updateNow() {
111        LOG.info("Manual configuration update triggered");
112        infoString = "";
113        applyModifications(configToMonitor);
114        String result = infoString;
115        infoString = null;
116        return result;
117    }
118
119    private void monitorModification(final Resource configToMonitor) {
120        monitorTask = new Runnable() {
121            @Override
122            public void run() {
123                try {
124                    if (configToMonitor.lastModified() > lastModified) {
125                        applyModifications(configToMonitor);
126                    }
127                } catch (Throwable e) {
128                    LOG.error("Failed to determine lastModified time on configuration: " + configToMonitor, e);
129                }
130            }
131        };
132        if (lastModified > 0 && checkPeriod > 0) {
133            this.getBrokerService().getScheduler().executePeriodically(monitorTask, checkPeriod);
134            info("Monitoring for updates (every " + checkPeriod + "millis) : " + configToMonitor + ", lastUpdate: " + new Date(lastModified));
135        }
136    }
137
138
139
140    private void applyModifications(Resource configToMonitor) {
141        DtoBroker changed = loadConfiguration(configToMonitor);
142        if (changed != null && !currentConfiguration.equals(changed)) {
143            LOG.info("change in " + configToMonitor + " at: " + new Date(lastModified));
144            LOG.debug("current:" + filterPasswords(currentConfiguration));
145            LOG.debug("new    :" + filterPasswords(changed));
146            processSelectiveChanges(currentConfiguration, changed);
147            currentConfiguration = changed;
148        } else {
149            info("No material change to configuration in " + configToMonitor + " at: " + new Date(lastModified));
150        }
151    }
152
153    private void processSelectiveChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration) {
154
155        for (Class upDatable : new Class[]{
156                DtoBroker.DestinationPolicy.class,
157                DtoBroker.NetworkConnectors.class,
158                DtoBroker.DestinationInterceptors.class,
159                DtoBroker.Plugins.class,
160                DtoBroker.Destinations.class}) {
161            processChanges(currentConfiguration, modifiedConfiguration, upDatable);
162        }
163    }
164
165    private void processChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration, Class upDatable) {
166        ConfigurationProcessor processor = ProcessorFactory.createProcessor(this, upDatable);
167        processor.processChanges(currentConfiguration, modifiedConfiguration);
168    }
169
170
171
172    private DtoBroker loadConfiguration(Resource configToMonitor) {
173        DtoBroker jaxbConfig = null;
174        if (configToMonitor != null) {
175            try {
176                JAXBContext context = JAXBContext.newInstance(DtoBroker.class);
177                Unmarshaller unMarshaller = context.createUnmarshaller();
178                unMarshaller.setSchema(getSchema());
179
180                // skip beans and pull out the broker node to validate
181                DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
182                dbf.setNamespaceAware(true);
183                DocumentBuilder db = dbf.newDocumentBuilder();
184                Document doc = db.parse(configToMonitor.getInputStream());
185                Node brokerRootNode = doc.getElementsByTagNameNS("*","broker").item(0);
186
187                if (brokerRootNode != null) {
188
189                    JAXBElement<DtoBroker> brokerJAXBElement =
190                            unMarshaller.unmarshal(brokerRootNode, DtoBroker.class);
191                    jaxbConfig = brokerJAXBElement.getValue();
192
193                    // if we can parse we can track mods
194                    lastModified = configToMonitor.lastModified();
195
196                    loadPropertiesPlaceHolderSupport(doc);
197
198                } else {
199                    info("Failed to find 'broker' element by tag in: " + configToMonitor);
200                }
201
202            } catch (IOException e) {
203                info("Failed to access: " + configToMonitor, e);
204            } catch (JAXBException e) {
205                info("Failed to parse: " + configToMonitor, e);
206            } catch (ParserConfigurationException e) {
207                info("Failed to document parse: " + configToMonitor, e);
208            } catch (SAXException e) {
209                info("Failed to find broker element in: " + configToMonitor, e);
210            } catch (Exception e) {
211                info("Unexpected exception during load of: " + configToMonitor, e);
212            }
213        }
214        return jaxbConfig;
215    }
216
217    private void loadPropertiesPlaceHolderSupport(Document doc) {
218        BrokerContext brokerContext = getBrokerService().getBrokerContext();
219        if (brokerContext != null) {
220            Properties initialProperties = new Properties(System.getProperties());
221            placeHolderUtil = new PropertiesPlaceHolderUtil(initialProperties);
222            placeHolderUtil.mergeProperties(doc, initialProperties, brokerContext);
223        }
224    }
225
226    private Schema getSchema() throws SAXException, IOException {
227        if (schema == null) {
228            SchemaFactory schemaFactory = SchemaFactory.newInstance(
229                    XMLConstants.W3C_XML_SCHEMA_NS_URI);
230
231            ArrayList<StreamSource> schemas = new ArrayList<StreamSource>();
232            schemas.add(new StreamSource(getClass().getResource("/activemq.xsd").toExternalForm()));
233            schemas.add(new StreamSource(getClass().getResource("/org/springframework/beans/factory/xml/spring-beans-3.0.xsd").toExternalForm()));
234            schema = schemaFactory.newSchema(schemas.toArray(new Source[]{}));
235        }
236        return schema;
237    }
238
239    public long getLastModified() {
240        return lastModified;
241    }
242
243    public Resource getConfigToMonitor() {
244        return configToMonitor;
245    }
246
247    public long getCheckPeriod() {
248        return checkPeriod;
249    }
250
251    public void setCheckPeriod(long checkPeriod) {
252        this.checkPeriod = checkPeriod;
253    }
254
255}