001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.FileFilter;
021import java.io.IOException;
022import java.nio.charset.Charset;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.CopyOnWriteArrayList;
030
031import javax.transaction.xa.Xid;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Lockable;
037import org.apache.activemq.broker.LockableServiceSupport;
038import org.apache.activemq.broker.Locker;
039import org.apache.activemq.broker.scheduler.JobSchedulerStore;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.command.ActiveMQQueue;
042import org.apache.activemq.command.ActiveMQTopic;
043import org.apache.activemq.command.LocalTransactionId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.command.TransactionId;
046import org.apache.activemq.command.XATransactionId;
047import org.apache.activemq.filter.AnyDestination;
048import org.apache.activemq.filter.DestinationMap;
049import org.apache.activemq.filter.DestinationMapEntry;
050import org.apache.activemq.store.MessageStore;
051import org.apache.activemq.store.PersistenceAdapter;
052import org.apache.activemq.store.SharedFileLocker;
053import org.apache.activemq.store.TopicMessageStore;
054import org.apache.activemq.store.TransactionIdTransformer;
055import org.apache.activemq.store.TransactionIdTransformerAware;
056import org.apache.activemq.store.TransactionStore;
057import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
058import org.apache.activemq.usage.SystemUsage;
059import org.apache.activemq.util.IOExceptionSupport;
060import org.apache.activemq.util.IOHelper;
061import org.apache.activemq.util.IntrospectionSupport;
062import org.apache.activemq.util.ServiceStopper;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import static org.apache.activemq.store.kahadb.MessageDatabase.DEFAULT_DIRECTORY;
067
068/**
069 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
070 * distribution of destinations across multiple kahaDB persistence adapters
071 *
072 * @org.apache.xbean.XBean element="mKahaDB"
073 */
074public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, BrokerServiceAware {
075    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
076
077    final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
078    final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
079
080    final class DelegateDestinationMap extends DestinationMap {
081        @Override
082        public void setEntries(List<DestinationMapEntry>  entries) {
083            super.setEntries(entries);
084        }
085    };
086    final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
087
088    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<PersistenceAdapter>();
089    private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
090
091    MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
092
093    // all local store transactions are XA, 2pc if more than one adapter involved
094    TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
095        @Override
096        public TransactionId transform(TransactionId txid) {
097            if (txid == null) {
098                return null;
099            }
100            if (txid.isLocalTransaction()) {
101                final LocalTransactionId t = (LocalTransactionId) txid;
102                return new XATransactionId(new Xid() {
103                    @Override
104                    public int getFormatId() {
105                        return LOCAL_FORMAT_ID_MAGIC;
106                    }
107
108                    @Override
109                    public byte[] getGlobalTransactionId() {
110                        return t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"));
111                    }
112
113                    @Override
114                    public byte[] getBranchQualifier() {
115                        return Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"));
116                    }
117                });
118            } else {
119                return txid;
120            }
121        }
122    };
123
124    /**
125     * Sets the  FilteredKahaDBPersistenceAdapter entries
126     *
127     * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
128     */
129    @SuppressWarnings({ "rawtypes", "unchecked" })
130    public void setFilteredPersistenceAdapters(List entries) {
131        for (Object entry : entries) {
132            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
133            PersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
134            if (filteredAdapter.getDestination() == null) {
135                filteredAdapter.setDestination(matchAll);
136            }
137
138            if (filteredAdapter.isPerDestination()) {
139                configureDirectory(adapter, null);
140                // per destination adapters will be created on demand or during recovery
141                continue;
142            } else {
143                configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
144            }
145
146            configureAdapter(adapter);
147            adapters.add(adapter);
148        }
149        destinationMap.setEntries(entries);
150    }
151
152    public static String nameFromDestinationFilter(ActiveMQDestination destination) {
153        if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) {
154            LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " +
155                     "potential problem with recovery can result from name truncation.");
156        }
157
158        return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
159    }
160
161    public boolean isLocalXid(TransactionId xid) {
162        return xid instanceof XATransactionId &&
163                ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
164    }
165
166    @Override
167    public void beginTransaction(ConnectionContext context) throws IOException {
168        throw new IllegalStateException();
169    }
170
171    @Override
172    public void checkpoint(final boolean cleanup) throws IOException {
173        for (PersistenceAdapter persistenceAdapter : adapters) {
174            persistenceAdapter.checkpoint(cleanup);
175        }
176    }
177
178    @Override
179    public void commitTransaction(ConnectionContext context) throws IOException {
180        throw new IllegalStateException();
181    }
182
183    @Override
184    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
185        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
186        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
187    }
188
189    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) throws IOException {
190        Object result = destinationMap.chooseValue(destination);
191        if (result == null) {
192            throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
193        }
194        FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
195        if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
196            filteredAdapter = addAdapter(filteredAdapter, destination);
197            if (LOG.isTraceEnabled()) {
198                LOG.info("created per destination adapter for: " + destination  + ", " + result);
199            }
200        }
201        startAdapter(filteredAdapter.getPersistenceAdapter(), destination.getQualifiedName());
202        LOG.debug("destination {} matched persistence adapter {}", new Object[]{destination.getQualifiedName(), filteredAdapter.getPersistenceAdapter()});
203        return filteredAdapter.getPersistenceAdapter();
204    }
205
206    private void startAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
207        try {
208            kahaDBPersistenceAdapter.start();
209        } catch (Exception e) {
210            RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
211            LOG.error(detail.toString(), e);
212            throw detail;
213        }
214    }
215
216    private void stopAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
217        try {
218            kahaDBPersistenceAdapter.stop();
219        } catch (Exception e) {
220            RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
221            LOG.error(detail.toString(), e);
222            throw detail;
223        }
224    }
225
226    @Override
227    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
228        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
229        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
230    }
231
232    @Override
233    public TransactionStore createTransactionStore() throws IOException {
234        return transactionStore;
235    }
236
237    @Override
238    public void deleteAllMessages() throws IOException {
239        for (PersistenceAdapter persistenceAdapter : adapters) {
240            persistenceAdapter.deleteAllMessages();
241        }
242        transactionStore.deleteAllMessages();
243        IOHelper.deleteChildren(getDirectory());
244        for (Object o : destinationMap.get(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")}))) {
245            if (o instanceof FilteredKahaDBPersistenceAdapter) {
246                FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapter = (FilteredKahaDBPersistenceAdapter) o;
247                if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory() != DEFAULT_DIRECTORY) {
248                    IOHelper.deleteChildren(filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory());
249                }
250                if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
251                    KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) filteredKahaDBPersistenceAdapter.getPersistenceAdapter();
252                    if (kahaDBPersistenceAdapter.getIndexDirectory() != null) {
253                        IOHelper.deleteChildren(kahaDBPersistenceAdapter.getIndexDirectory());
254                    }
255                }
256            }
257        }
258    }
259
260    @Override
261    public Set<ActiveMQDestination> getDestinations() {
262        Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
263        for (PersistenceAdapter persistenceAdapter : adapters) {
264            results.addAll(persistenceAdapter.getDestinations());
265        }
266        return results;
267    }
268
269    @Override
270    public long getLastMessageBrokerSequenceId() throws IOException {
271        long maxId = -1;
272        for (PersistenceAdapter persistenceAdapter : adapters) {
273            maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
274        }
275        return maxId;
276    }
277
278    @Override
279    public long getLastProducerSequenceId(ProducerId id) throws IOException {
280        long maxId = -1;
281        for (PersistenceAdapter persistenceAdapter : adapters) {
282            maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
283        }
284        return maxId;
285    }
286
287    @Override
288    public void allowIOResumption() {
289        for (PersistenceAdapter persistenceAdapter : adapters) {
290            persistenceAdapter.allowIOResumption();
291        }
292    }
293
294    @Override
295    public void removeQueueMessageStore(ActiveMQQueue destination) {
296        PersistenceAdapter adapter = null;
297        try {
298            adapter = getMatchingPersistenceAdapter(destination);
299        } catch (IOException e) {
300            throw new RuntimeException(e);
301        }
302        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
303            adapter.removeQueueMessageStore(destination);
304            removeMessageStore(adapter, destination);
305            destinationMap.remove(destination, adapter);
306        }
307    }
308
309    @Override
310    public void removeTopicMessageStore(ActiveMQTopic destination) {
311        PersistenceAdapter adapter = null;
312        try {
313            adapter = getMatchingPersistenceAdapter(destination);
314        } catch (IOException e) {
315            throw new RuntimeException(e);
316        }
317        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
318            adapter.removeTopicMessageStore(destination);
319            removeMessageStore(adapter, destination);
320            destinationMap.remove(destination, adapter);
321        }
322    }
323
324    private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
325        stopAdapter(adapter, destination.toString());
326        File adapterDir = adapter.getDirectory();
327        if (adapterDir != null) {
328            if (IOHelper.deleteFile(adapterDir)) {
329                if (LOG.isTraceEnabled()) {
330                    LOG.info("deleted per destination adapter directory for: " + destination);
331                }
332            } else {
333                if (LOG.isTraceEnabled()) {
334                    LOG.info("failed to deleted per destination adapter directory for: " + destination);
335                }
336            }
337        }
338    }
339
340    @Override
341    public void rollbackTransaction(ConnectionContext context) throws IOException {
342        throw new IllegalStateException();
343    }
344
345    @Override
346    public void setBrokerName(String brokerName) {
347        for (PersistenceAdapter persistenceAdapter : adapters) {
348            persistenceAdapter.setBrokerName(brokerName);
349        }
350    }
351
352    @Override
353    public void setUsageManager(SystemUsage usageManager) {
354        for (PersistenceAdapter persistenceAdapter : adapters) {
355            persistenceAdapter.setUsageManager(usageManager);
356        }
357    }
358
359    @Override
360    public long size() {
361        long size = 0;
362        for (PersistenceAdapter persistenceAdapter : adapters) {
363            size += persistenceAdapter.size();
364        }
365        return size;
366    }
367
368    @Override
369    public void doStart() throws Exception {
370        Object result = destinationMap.chooseValue(matchAll);
371        if (result != null) {
372            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
373            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
374                findAndRegisterExistingAdapters(filteredAdapter);
375            }
376        }
377        for (PersistenceAdapter persistenceAdapter : adapters) {
378            persistenceAdapter.start();
379        }
380    }
381
382    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException {
383        FileFilter destinationNames = new FileFilter() {
384            @Override
385            public boolean accept(File file) {
386                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
387            }
388        };
389        File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
390        if (candidates != null) {
391            for (File candidate : candidates) {
392                registerExistingAdapter(template, candidate);
393            }
394        }
395    }
396
397    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) throws IOException {
398        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, candidate.getName());
399        startAdapter(adapter, candidate.getName());
400        Set<ActiveMQDestination> destinations = adapter.getDestinations();
401        if (destinations.size() != 0) {
402            registerAdapter(filteredAdapter, adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
403        } else {
404            stopAdapter(adapter, candidate.getName());
405        }
406    }
407
408    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) throws IOException {
409        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, nameFromDestinationFilter(destination));
410        return registerAdapter(filteredAdapter, adapter, destination);
411    }
412
413    private PersistenceAdapter adapterFromTemplate(FilteredKahaDBPersistenceAdapter template, String destinationName) throws IOException {
414        PersistenceAdapter adapter = kahaDBFromTemplate(template.getPersistenceAdapter());
415        configureAdapter(adapter);
416        configureDirectory(adapter, destinationName);
417        configureIndexDirectory(adapter, template.getPersistenceAdapter(), destinationName);
418        return adapter;
419    }
420
421    private void configureIndexDirectory(PersistenceAdapter adapter, PersistenceAdapter template, String destinationName) {
422        if (template instanceof KahaDBPersistenceAdapter) {
423            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) template;
424            if (kahaDBPersistenceAdapter.getIndexDirectory() != null) {
425                if (adapter instanceof KahaDBPersistenceAdapter) {
426                    File directory = kahaDBPersistenceAdapter.getIndexDirectory();
427                    if (destinationName != null) {
428                        directory = new File(directory, destinationName);
429                    }
430                    ((KahaDBPersistenceAdapter)adapter).setIndexDirectory(directory);
431                }
432            }
433        }
434    }
435
436    private void configureDirectory(PersistenceAdapter adapter, String fileName) {
437        File directory = null;
438        File defaultDir = DEFAULT_DIRECTORY;
439        try {
440            defaultDir = adapter.getClass().newInstance().getDirectory();
441        } catch (Exception e) {
442        }
443        if (defaultDir.equals(adapter.getDirectory())) {
444            // not set so inherit from mkahadb
445            directory = getDirectory();
446        } else {
447            directory = adapter.getDirectory();
448        }
449
450        if (fileName != null) {
451            directory = new File(directory, fileName);
452        }
453        adapter.setDirectory(directory);
454    }
455
456    private FilteredKahaDBPersistenceAdapter registerAdapter(FilteredKahaDBPersistenceAdapter template, PersistenceAdapter adapter, ActiveMQDestination destination) {
457        adapters.add(adapter);
458        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(template, destination, adapter);
459        destinationMap.put(destination, result);
460        return result;
461    }
462
463    private void configureAdapter(PersistenceAdapter adapter) {
464        // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
465        ((TransactionIdTransformerAware)adapter).setTransactionIdTransformer(transactionIdTransformer);
466        if (isUseLock()) {
467            if( adapter instanceof Lockable ) {
468                ((Lockable)adapter).setUseLock(false);
469            }
470        }
471        if( adapter instanceof BrokerServiceAware ) {
472            ((BrokerServiceAware)adapter).setBrokerService(getBrokerService());
473        }
474    }
475
476    private PersistenceAdapter kahaDBFromTemplate(PersistenceAdapter template) throws IOException {
477        try {
478            Map<String, Object> configuration = new HashMap<String, Object>();
479            IntrospectionSupport.getProperties(template, configuration, null);
480            PersistenceAdapter adapter = template.getClass().newInstance();
481            IntrospectionSupport.setProperties(adapter, configuration);
482            return adapter;
483        } catch (Exception e) {
484            throw IOExceptionSupport.create(e);
485        }
486    }
487
488    @Override
489    protected void doStop(ServiceStopper stopper) throws Exception {
490        for (PersistenceAdapter persistenceAdapter : adapters) {
491            stopper.stop(persistenceAdapter);
492        }
493    }
494
495    @Override
496    public File getDirectory() {
497        return this.directory;
498    }
499
500    @Override
501    public void setDirectory(File directory) {
502        this.directory = directory;
503    }
504
505    @Override
506    public void init() throws Exception {
507    }
508
509    @Override
510    public void setBrokerService(BrokerService brokerService) {
511        super.setBrokerService(brokerService);
512        for (PersistenceAdapter persistenceAdapter : adapters) {
513            if( persistenceAdapter instanceof BrokerServiceAware ) {
514                ((BrokerServiceAware)persistenceAdapter).setBrokerService(getBrokerService());
515            }
516        }
517    }
518
519    public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
520        this.transactionStore = transactionStore;
521    }
522
523    /**
524     * Set the max file length of the transaction journal
525     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
526     * be used
527     *
528     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
529     */
530    public void setJournalMaxFileLength(int maxFileLength) {
531        transactionStore.setJournalMaxFileLength(maxFileLength);
532    }
533
534    public int getJournalMaxFileLength() {
535        return transactionStore.getJournalMaxFileLength();
536    }
537
538    /**
539     * Set the max write batch size of  the transaction journal
540     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
541     * be used
542     *
543     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
544     */
545    public void setJournalWriteBatchSize(int journalWriteBatchSize) {
546        transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
547    }
548
549    public int getJournalWriteBatchSize() {
550        return transactionStore.getJournalMaxWriteBatchSize();
551    }
552
553
554    public void setJournalCleanupInterval(long journalCleanupInterval) {
555        transactionStore.setJournalCleanupInterval(journalCleanupInterval);
556    }
557
558    public long getJournalCleanupInterval() {
559        return transactionStore.getJournalCleanupInterval();
560    }
561
562    public void setCheckForCorruption(boolean checkForCorruption) {
563        transactionStore.setCheckForCorruption(checkForCorruption);
564    }
565
566    public boolean isCheckForCorruption() {
567        return transactionStore.isCheckForCorruption();
568    }
569
570    public List<PersistenceAdapter> getAdapters() {
571        return Collections.unmodifiableList(adapters);
572    }
573
574    @Override
575    public String toString() {
576        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
577        return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
578    }
579
580    @Override
581    public Locker createDefaultLocker() throws IOException {
582        SharedFileLocker locker = new SharedFileLocker();
583        locker.configure(this);
584        return locker;
585    }
586
587    @Override
588    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
589        return new JobSchedulerStoreImpl();
590    }
591}