001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import org.apache.activemq.command.ActiveMQDestination;
020import org.apache.activemq.command.ActiveMQQueue;
021import org.apache.activemq.command.ActiveMQTopic;
022
023import org.apache.activemq.schema.core.DtoQueue;
024import org.apache.activemq.schema.core.DtoTopic;
025
026import java.util.Arrays;
027import java.util.List;
028
029public class DestinationsProcessor extends DefaultConfigurationProcessor {
030
031    public DestinationsProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
032        super(plugin, configurationClass);
033    }
034
035    @Override
036    public void processChanges(List current, List modified) {
037        for (Object destinations : modified) {
038            for (Object dto : getContents(destinations)) {
039                try {
040                    ActiveMQDestination destination = createDestination(dto);
041                    if (!containsDestination(destination)) {
042                        plugin.addDestination(plugin.getBrokerService().getAdminConnectionContext(), destination, true);
043                        plugin.info("Added destination " + destination);
044                    }
045                } catch (Exception e) {
046                    plugin.info("Failed to add a new destination for DTO: " + dto, e);
047                }
048            }
049        }
050    }
051
052    protected boolean containsDestination(ActiveMQDestination destination) throws Exception {
053        return Arrays.asList(plugin.getBrokerService().getRegionBroker().getDestinations()).contains(destination);
054    }
055
056    @Override
057    public void addNew(Object o) {
058        try {
059            ActiveMQDestination destination = createDestination(o);
060            plugin.addDestination(plugin.getBrokerService().getAdminConnectionContext(), destination, true);
061            plugin.info("Added destination " + destination);
062        } catch (Exception e) {
063            plugin.info("Failed to add a new destination for DTO: " + o, e);
064        }
065    }
066
067    private ActiveMQDestination createDestination(Object dto) throws Exception {
068        if (dto instanceof DtoQueue) {
069            return new ActiveMQQueue(((DtoQueue) dto).getPhysicalName());
070        } else if (dto instanceof DtoTopic) {
071            return new ActiveMQTopic(((DtoTopic) dto).getPhysicalName());
072        } else {
073            throw new Exception("Unknown destination type for DTO " + dto);
074        }
075    }
076}