001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.Collections; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025 026import org.apache.activemq.broker.ConnectionContext; 027import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 028import org.apache.activemq.broker.region.DestinationInterceptor; 029import org.apache.activemq.broker.region.RegionBroker; 030import org.apache.activemq.broker.region.virtual.VirtualDestination; 031import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035public abstract class UpdateVirtualDestinationsTask implements Runnable { 036 037 public static final Logger LOG = LoggerFactory.getLogger(UpdateVirtualDestinationsTask.class); 038 private final AbstractRuntimeConfigurationBroker plugin; 039 040 public UpdateVirtualDestinationsTask( 041 AbstractRuntimeConfigurationBroker plugin) { 042 super(); 043 this.plugin = plugin; 044 } 045 046 @Override 047 public void run() { 048 049 boolean updatedExistingInterceptor = false; 050 RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService() 051 .getRegionBroker(); 052 053 for (DestinationInterceptor destinationInterceptor : plugin 054 .getBrokerService().getDestinationInterceptors()) { 055 if (destinationInterceptor instanceof VirtualDestinationInterceptor) { 056 // update existing interceptor 057 final VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) destinationInterceptor; 058 059 Set<VirtualDestination> existingVirtualDests = new HashSet<>(); 060 Collections.addAll(existingVirtualDests, virtualDestinationInterceptor.getVirtualDestinations()); 061 062 Set<VirtualDestination> newVirtualDests = new HashSet<>(); 063 Collections.addAll(newVirtualDests, getVirtualDestinations()); 064 065 Set<VirtualDestination> addedVirtualDests = new HashSet<>(); 066 Set<VirtualDestination> removedVirtualDests = new HashSet<>(); 067 //detect new virtual destinations 068 for (VirtualDestination newVirtualDest : newVirtualDests) { 069 if (!existingVirtualDests.contains(newVirtualDest)) { 070 addedVirtualDests.add(newVirtualDest); 071 } 072 } 073 //detect removed virtual destinations 074 for (VirtualDestination existingVirtualDest : existingVirtualDests) { 075 if (!newVirtualDests.contains(existingVirtualDest)) { 076 removedVirtualDests.add(existingVirtualDest); 077 } 078 } 079 080 virtualDestinationInterceptor 081 .setVirtualDestinations(getVirtualDestinations()); 082 plugin.info("applied updates to: " 083 + virtualDestinationInterceptor); 084 updatedExistingInterceptor = true; 085 086 ConnectionContext connectionContext; 087 try { 088 connectionContext = plugin.getBrokerService().getAdminConnectionContext(); 089 //signal updates 090 if (plugin.getBrokerService().isUseVirtualDestSubs()) { 091 for (VirtualDestination removedVirtualDest : removedVirtualDests) { 092 plugin.virtualDestinationRemoved(connectionContext, removedVirtualDest); 093 LOG.info("Removing virtual destination: {}", removedVirtualDest); 094 } 095 096 for (VirtualDestination addedVirtualDest : addedVirtualDests) { 097 plugin.virtualDestinationAdded(connectionContext, addedVirtualDest); 098 LOG.info("Adding virtual destination: {}", addedVirtualDest); 099 } 100 } 101 102 } catch (Exception e) { 103 LOG.warn("Could not process virtual destination advisories", e); 104 } 105 } 106 } 107 108 if (!updatedExistingInterceptor) { 109 // add 110 VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor(); 111 virtualDestinationInterceptor.setVirtualDestinations(getVirtualDestinations()); 112 113 List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>(); 114 interceptorsList.addAll(Arrays.asList(plugin.getBrokerService() 115 .getDestinationInterceptors())); 116 interceptorsList.add(virtualDestinationInterceptor); 117 118 DestinationInterceptor[] destinationInterceptors = interceptorsList 119 .toArray(new DestinationInterceptor[] {}); 120 plugin.getBrokerService().setDestinationInterceptors( 121 destinationInterceptors); 122 123 ((CompositeDestinationInterceptor) regionBroker 124 .getDestinationInterceptor()) 125 .setInterceptors(destinationInterceptors); 126 plugin.info("applied new: " + interceptorsList); 127 } 128 regionBroker.reapplyInterceptor(); 129 } 130 131 protected abstract VirtualDestination[] getVirtualDestinations(); 132}