001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.FileFilter;
021import java.io.IOException;
022import java.nio.charset.Charset;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.CopyOnWriteArrayList;
030
031import javax.transaction.xa.Xid;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Lockable;
037import org.apache.activemq.broker.LockableServiceSupport;
038import org.apache.activemq.broker.Locker;
039import org.apache.activemq.broker.scheduler.JobSchedulerStore;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.command.ActiveMQQueue;
042import org.apache.activemq.command.ActiveMQTopic;
043import org.apache.activemq.command.LocalTransactionId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.command.TransactionId;
046import org.apache.activemq.command.XATransactionId;
047import org.apache.activemq.filter.AnyDestination;
048import org.apache.activemq.filter.DestinationMap;
049import org.apache.activemq.filter.DestinationMapEntry;
050import org.apache.activemq.store.MessageStore;
051import org.apache.activemq.store.PersistenceAdapter;
052import org.apache.activemq.store.SharedFileLocker;
053import org.apache.activemq.store.TopicMessageStore;
054import org.apache.activemq.store.TransactionIdTransformer;
055import org.apache.activemq.store.TransactionIdTransformerAware;
056import org.apache.activemq.store.TransactionStore;
057import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
058import org.apache.activemq.usage.StoreUsage;
059import org.apache.activemq.usage.SystemUsage;
060import org.apache.activemq.util.IOExceptionSupport;
061import org.apache.activemq.util.IOHelper;
062import org.apache.activemq.util.IntrospectionSupport;
063import org.apache.activemq.util.ServiceStopper;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import static org.apache.activemq.store.kahadb.MessageDatabase.DEFAULT_DIRECTORY;
068
069/**
070 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
071 * distribution of destinations across multiple kahaDB persistence adapters
072 *
073 * @org.apache.xbean.XBean element="mKahaDB"
074 */
075public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, BrokerServiceAware {
076    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
077
078    final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
079    final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
080
081    final class DelegateDestinationMap extends DestinationMap {
082        @Override
083        public void setEntries(List<DestinationMapEntry>  entries) {
084            super.setEntries(entries);
085        }
086    };
087    final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
088
089    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<PersistenceAdapter>();
090    private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
091
092    MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
093
094    // all local store transactions are XA, 2pc if more than one adapter involved
095    TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
096        @Override
097        public TransactionId transform(TransactionId txid) {
098            if (txid == null) {
099                return null;
100            }
101            if (txid.isLocalTransaction()) {
102                final LocalTransactionId t = (LocalTransactionId) txid;
103                return new XATransactionId(new Xid() {
104                    @Override
105                    public int getFormatId() {
106                        return LOCAL_FORMAT_ID_MAGIC;
107                    }
108
109                    @Override
110                    public byte[] getGlobalTransactionId() {
111                        return t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"));
112                    }
113
114                    @Override
115                    public byte[] getBranchQualifier() {
116                        return Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"));
117                    }
118                });
119            } else {
120                return txid;
121            }
122        }
123    };
124
125    /**
126     * Sets the  FilteredKahaDBPersistenceAdapter entries
127     *
128     * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
129     */
130    @SuppressWarnings({ "rawtypes", "unchecked" })
131    public void setFilteredPersistenceAdapters(List entries) {
132        for (Object entry : entries) {
133            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
134            PersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
135            if (filteredAdapter.getDestination() == null) {
136                filteredAdapter.setDestination(matchAll);
137            }
138
139            if (filteredAdapter.isPerDestination()) {
140                configureDirectory(adapter, null);
141                // per destination adapters will be created on demand or during recovery
142                continue;
143            } else {
144                configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
145            }
146
147            configureAdapter(adapter);
148            adapters.add(adapter);
149        }
150        destinationMap.setEntries(entries);
151    }
152
153    public static String nameFromDestinationFilter(ActiveMQDestination destination) {
154        if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) {
155            LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " +
156                     "potential problem with recovery can result from name truncation.");
157        }
158
159        return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
160    }
161
162    public boolean isLocalXid(TransactionId xid) {
163        return xid instanceof XATransactionId &&
164                ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
165    }
166
167    @Override
168    public void beginTransaction(ConnectionContext context) throws IOException {
169        throw new IllegalStateException();
170    }
171
172    @Override
173    public void checkpoint(final boolean sync) throws IOException {
174        for (PersistenceAdapter persistenceAdapter : adapters) {
175            persistenceAdapter.checkpoint(sync);
176        }
177    }
178
179    @Override
180    public void commitTransaction(ConnectionContext context) throws IOException {
181        throw new IllegalStateException();
182    }
183
184    @Override
185    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
186        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
187        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
188    }
189
190    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) throws IOException {
191        Object result = destinationMap.chooseValue(destination);
192        if (result == null) {
193            throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
194        }
195        FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
196        if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
197            filteredAdapter = addAdapter(filteredAdapter, destination);
198            if (LOG.isTraceEnabled()) {
199                LOG.info("created per destination adapter for: " + destination  + ", " + result);
200            }
201        }
202        startAdapter(filteredAdapter.getPersistenceAdapter(), destination.getQualifiedName());
203        LOG.debug("destination {} matched persistence adapter {}", new Object[]{destination.getQualifiedName(), filteredAdapter.getPersistenceAdapter()});
204        return filteredAdapter.getPersistenceAdapter();
205    }
206
207    private void startAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
208        try {
209            kahaDBPersistenceAdapter.start();
210        } catch (Exception e) {
211            RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
212            LOG.error(detail.toString(), e);
213            throw detail;
214        }
215    }
216
217    private void stopAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
218        try {
219            kahaDBPersistenceAdapter.stop();
220        } catch (Exception e) {
221            RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
222            LOG.error(detail.toString(), e);
223            throw detail;
224        }
225    }
226
227    @Override
228    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
229        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
230        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
231    }
232
233    @Override
234    public TransactionStore createTransactionStore() throws IOException {
235        return transactionStore;
236    }
237
238    @Override
239    public void deleteAllMessages() throws IOException {
240        for (PersistenceAdapter persistenceAdapter : adapters) {
241            persistenceAdapter.deleteAllMessages();
242        }
243        transactionStore.deleteAllMessages();
244        IOHelper.deleteChildren(getDirectory());
245        for (Object o : destinationMap.get(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")}))) {
246            if (o instanceof FilteredKahaDBPersistenceAdapter) {
247                FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapter = (FilteredKahaDBPersistenceAdapter) o;
248                if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory() != DEFAULT_DIRECTORY) {
249                    IOHelper.deleteChildren(filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory());
250                }
251                if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
252                    KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) filteredKahaDBPersistenceAdapter.getPersistenceAdapter();
253                    if (kahaDBPersistenceAdapter.getIndexDirectory() != null) {
254                        IOHelper.deleteChildren(kahaDBPersistenceAdapter.getIndexDirectory());
255                    }
256                }
257            }
258        }
259    }
260
261    @Override
262    public Set<ActiveMQDestination> getDestinations() {
263        Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
264        for (PersistenceAdapter persistenceAdapter : adapters) {
265            results.addAll(persistenceAdapter.getDestinations());
266        }
267        return results;
268    }
269
270    @Override
271    public long getLastMessageBrokerSequenceId() throws IOException {
272        long maxId = -1;
273        for (PersistenceAdapter persistenceAdapter : adapters) {
274            maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
275        }
276        return maxId;
277    }
278
279    @Override
280    public long getLastProducerSequenceId(ProducerId id) throws IOException {
281        long maxId = -1;
282        for (PersistenceAdapter persistenceAdapter : adapters) {
283            maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
284        }
285        return maxId;
286    }
287
288    @Override
289    public void removeQueueMessageStore(ActiveMQQueue destination) {
290        PersistenceAdapter adapter = null;
291        try {
292            adapter = getMatchingPersistenceAdapter(destination);
293        } catch (IOException e) {
294            throw new RuntimeException(e);
295        }
296        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
297            adapter.removeQueueMessageStore(destination);
298            removeMessageStore(adapter, destination);
299            destinationMap.removeAll(destination);
300        }
301    }
302
303    @Override
304    public void removeTopicMessageStore(ActiveMQTopic destination) {
305        PersistenceAdapter adapter = null;
306        try {
307            adapter = getMatchingPersistenceAdapter(destination);
308        } catch (IOException e) {
309            throw new RuntimeException(e);
310        }
311        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
312            adapter.removeTopicMessageStore(destination);
313            removeMessageStore(adapter, destination);
314            destinationMap.removeAll(destination);
315        }
316    }
317
318    private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
319        stopAdapter(adapter, destination.toString());
320        File adapterDir = adapter.getDirectory();
321        if (adapterDir != null) {
322            if (IOHelper.deleteFile(adapterDir)) {
323                if (LOG.isTraceEnabled()) {
324                    LOG.info("deleted per destination adapter directory for: " + destination);
325                }
326            } else {
327                if (LOG.isTraceEnabled()) {
328                    LOG.info("failed to deleted per destination adapter directory for: " + destination);
329                }
330            }
331        }
332    }
333
334    @Override
335    public void rollbackTransaction(ConnectionContext context) throws IOException {
336        throw new IllegalStateException();
337    }
338
339    @Override
340    public void setBrokerName(String brokerName) {
341        for (PersistenceAdapter persistenceAdapter : adapters) {
342            persistenceAdapter.setBrokerName(brokerName);
343        }
344    }
345
346    @Override
347    public void setUsageManager(SystemUsage usageManager) {
348        for (PersistenceAdapter persistenceAdapter : adapters) {
349            persistenceAdapter.setUsageManager(usageManager);
350        }
351    }
352
353    @Override
354    public long size() {
355        long size = 0;
356        for (PersistenceAdapter persistenceAdapter : adapters) {
357            size += persistenceAdapter.size();
358        }
359        return size;
360    }
361
362    @Override
363    public void doStart() throws Exception {
364        Object result = destinationMap.chooseValue(matchAll);
365        if (result != null) {
366            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
367            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
368                findAndRegisterExistingAdapters(filteredAdapter);
369            }
370        }
371        for (PersistenceAdapter persistenceAdapter : adapters) {
372            persistenceAdapter.start();
373        }
374    }
375
376    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException {
377        FileFilter destinationNames = new FileFilter() {
378            @Override
379            public boolean accept(File file) {
380                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
381            }
382        };
383        File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
384        if (candidates != null) {
385            for (File candidate : candidates) {
386                registerExistingAdapter(template, candidate);
387            }
388        }
389    }
390
391    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) throws IOException {
392        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, candidate.getName());
393        startAdapter(adapter, candidate.getName());
394        Set<ActiveMQDestination> destinations = adapter.getDestinations();
395        if (destinations.size() != 0) {
396            registerAdapter(filteredAdapter, adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
397        } else {
398            stopAdapter(adapter, candidate.getName());
399        }
400    }
401
402    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) throws IOException {
403        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, nameFromDestinationFilter(destination));
404        return registerAdapter(filteredAdapter, adapter, destination);
405    }
406
407    private PersistenceAdapter adapterFromTemplate(FilteredKahaDBPersistenceAdapter template, String destinationName) throws IOException {
408        PersistenceAdapter adapter = kahaDBFromTemplate(template.getPersistenceAdapter());
409        configureAdapter(adapter);
410        configureDirectory(adapter, destinationName);
411        configureIndexDirectory(adapter, template.getPersistenceAdapter(), destinationName);
412        return adapter;
413    }
414
415    private void configureIndexDirectory(PersistenceAdapter adapter, PersistenceAdapter template, String destinationName) {
416        if (template instanceof KahaDBPersistenceAdapter) {
417            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) template;
418            if (kahaDBPersistenceAdapter.getIndexDirectory() != null) {
419                if (adapter instanceof KahaDBPersistenceAdapter) {
420                    File directory = kahaDBPersistenceAdapter.getIndexDirectory();
421                    if (destinationName != null) {
422                        directory = new File(directory, destinationName);
423                    }
424                    ((KahaDBPersistenceAdapter)adapter).setIndexDirectory(directory);
425                }
426            }
427        }
428    }
429
430    private void configureDirectory(PersistenceAdapter adapter, String fileName) {
431        File directory = null;
432        File defaultDir = DEFAULT_DIRECTORY;
433        try {
434            defaultDir = adapter.getClass().newInstance().getDirectory();
435        } catch (Exception e) {
436        }
437        if (defaultDir.equals(adapter.getDirectory())) {
438            // not set so inherit from mkahadb
439            directory = getDirectory();
440        } else {
441            directory = adapter.getDirectory();
442        }
443
444        if (fileName != null) {
445            directory = new File(directory, fileName);
446        }
447        adapter.setDirectory(directory);
448    }
449
450    private FilteredKahaDBPersistenceAdapter registerAdapter(FilteredKahaDBPersistenceAdapter template, PersistenceAdapter adapter, ActiveMQDestination destination) {
451        adapters.add(adapter);
452        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(template, destination, adapter);
453        destinationMap.put(destination, result);
454        return result;
455    }
456
457    private void configureAdapter(PersistenceAdapter adapter) {
458        // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
459        ((TransactionIdTransformerAware)adapter).setTransactionIdTransformer(transactionIdTransformer);
460        if (isUseLock()) {
461            if( adapter instanceof Lockable ) {
462                ((Lockable)adapter).setUseLock(false);
463            }
464        }
465        if( adapter instanceof BrokerServiceAware ) {
466            ((BrokerServiceAware)adapter).setBrokerService(getBrokerService());
467        }
468    }
469
470    private PersistenceAdapter kahaDBFromTemplate(PersistenceAdapter template) throws IOException {
471        try {
472            Map<String, Object> configuration = new HashMap<String, Object>();
473            IntrospectionSupport.getProperties(template, configuration, null);
474            PersistenceAdapter adapter = template.getClass().newInstance();
475            IntrospectionSupport.setProperties(adapter, configuration);
476            return adapter;
477        } catch (Exception e) {
478            throw IOExceptionSupport.create(e);
479        }
480    }
481
482    @Override
483    protected void doStop(ServiceStopper stopper) throws Exception {
484        for (PersistenceAdapter persistenceAdapter : adapters) {
485            stopper.stop(persistenceAdapter);
486        }
487    }
488
489    @Override
490    public File getDirectory() {
491        return this.directory;
492    }
493
494    @Override
495    public void setDirectory(File directory) {
496        this.directory = directory;
497    }
498
499    @Override
500    public void init() throws Exception {
501    }
502
503    @Override
504    public void setBrokerService(BrokerService brokerService) {
505        super.setBrokerService(brokerService);
506        for (PersistenceAdapter persistenceAdapter : adapters) {
507            if( persistenceAdapter instanceof BrokerServiceAware ) {
508                ((BrokerServiceAware)persistenceAdapter).setBrokerService(getBrokerService());
509            }
510        }
511    }
512
513    public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
514        this.transactionStore = transactionStore;
515    }
516
517    /**
518     * Set the max file length of the transaction journal
519     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
520     * be used
521     *
522     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
523     */
524    public void setJournalMaxFileLength(int maxFileLength) {
525        transactionStore.setJournalMaxFileLength(maxFileLength);
526    }
527
528    public int getJournalMaxFileLength() {
529        return transactionStore.getJournalMaxFileLength();
530    }
531
532    /**
533     * Set the max write batch size of  the transaction journal
534     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
535     * be used
536     *
537     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
538     */
539    public void setJournalWriteBatchSize(int journalWriteBatchSize) {
540        transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
541    }
542
543    public int getJournalWriteBatchSize() {
544        return transactionStore.getJournalMaxWriteBatchSize();
545    }
546
547    public List<PersistenceAdapter> getAdapters() {
548        return Collections.unmodifiableList(adapters);
549    }
550
551    @Override
552    public String toString() {
553        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
554        return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
555    }
556
557    @Override
558    public Locker createDefaultLocker() throws IOException {
559        SharedFileLocker locker = new SharedFileLocker();
560        locker.configure(this);
561        return locker;
562    }
563
564    @Override
565    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
566        return new JobSchedulerStoreImpl();
567    }
568}