001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.FileFilter;
021import java.io.IOException;
022import java.nio.charset.Charset;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.CopyOnWriteArrayList;
030
031import javax.transaction.xa.Xid;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Lockable;
037import org.apache.activemq.broker.LockableServiceSupport;
038import org.apache.activemq.broker.Locker;
039import org.apache.activemq.broker.scheduler.JobSchedulerStore;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.command.ActiveMQQueue;
042import org.apache.activemq.command.ActiveMQTopic;
043import org.apache.activemq.command.LocalTransactionId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.command.TransactionId;
046import org.apache.activemq.command.XATransactionId;
047import org.apache.activemq.filter.AnyDestination;
048import org.apache.activemq.filter.DestinationMap;
049import org.apache.activemq.filter.DestinationMapEntry;
050import org.apache.activemq.store.MessageStore;
051import org.apache.activemq.store.PersistenceAdapter;
052import org.apache.activemq.store.SharedFileLocker;
053import org.apache.activemq.store.TopicMessageStore;
054import org.apache.activemq.store.TransactionIdTransformer;
055import org.apache.activemq.store.TransactionIdTransformerAware;
056import org.apache.activemq.store.TransactionStore;
057import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
058import org.apache.activemq.usage.StoreUsage;
059import org.apache.activemq.usage.SystemUsage;
060import org.apache.activemq.util.IOExceptionSupport;
061import org.apache.activemq.util.IOHelper;
062import org.apache.activemq.util.IntrospectionSupport;
063import org.apache.activemq.util.ServiceStopper;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import static org.apache.activemq.store.kahadb.MessageDatabase.DEFAULT_DIRECTORY;
068
069/**
070 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
071 * distribution of destinations across multiple kahaDB persistence adapters
072 *
073 * @org.apache.xbean.XBean element="mKahaDB"
074 */
075public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, BrokerServiceAware {
076    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
077
078    final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
079    final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
080
081    final class DelegateDestinationMap extends DestinationMap {
082        @Override
083        public void setEntries(List<DestinationMapEntry>  entries) {
084            super.setEntries(entries);
085        }
086    };
087    final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
088
089    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<PersistenceAdapter>();
090    private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
091
092    MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
093
094    // all local store transactions are XA, 2pc if more than one adapter involved
095    TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
096        @Override
097        public TransactionId transform(TransactionId txid) {
098            if (txid == null) {
099                return null;
100            }
101            if (txid.isLocalTransaction()) {
102                final LocalTransactionId t = (LocalTransactionId) txid;
103                return new XATransactionId(new Xid() {
104                    @Override
105                    public int getFormatId() {
106                        return LOCAL_FORMAT_ID_MAGIC;
107                    }
108
109                    @Override
110                    public byte[] getGlobalTransactionId() {
111                        return t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"));
112                    }
113
114                    @Override
115                    public byte[] getBranchQualifier() {
116                        return Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"));
117                    }
118                });
119            } else {
120                return txid;
121            }
122        }
123    };
124
125    /**
126     * Sets the  FilteredKahaDBPersistenceAdapter entries
127     *
128     * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
129     */
130    @SuppressWarnings({ "rawtypes", "unchecked" })
131    public void setFilteredPersistenceAdapters(List entries) {
132        for (Object entry : entries) {
133            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
134            PersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
135            if (filteredAdapter.getDestination() == null) {
136                filteredAdapter.setDestination(matchAll);
137            }
138
139            if (filteredAdapter.isPerDestination()) {
140                configureDirectory(adapter, null);
141                // per destination adapters will be created on demand or during recovery
142                continue;
143            } else {
144                configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
145            }
146
147            configureAdapter(adapter);
148            adapters.add(adapter);
149        }
150        destinationMap.setEntries(entries);
151    }
152
153    public static String nameFromDestinationFilter(ActiveMQDestination destination) {
154        if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) {
155            LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " +
156                     "potential problem with recovery can result from name truncation.");
157        }
158
159        return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
160    }
161
162    public boolean isLocalXid(TransactionId xid) {
163        return xid instanceof XATransactionId &&
164                ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
165    }
166
167    @Override
168    public void beginTransaction(ConnectionContext context) throws IOException {
169        throw new IllegalStateException();
170    }
171
172    @Override
173    public void checkpoint(final boolean cleanup) throws IOException {
174        for (PersistenceAdapter persistenceAdapter : adapters) {
175            persistenceAdapter.checkpoint(cleanup);
176        }
177    }
178
179    @Override
180    public void commitTransaction(ConnectionContext context) throws IOException {
181        throw new IllegalStateException();
182    }
183
184    @Override
185    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
186        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
187        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
188    }
189
190    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) throws IOException {
191        Object result = destinationMap.chooseValue(destination);
192        if (result == null) {
193            throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
194        }
195        FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
196        if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
197            filteredAdapter = addAdapter(filteredAdapter, destination);
198            if (LOG.isTraceEnabled()) {
199                LOG.info("created per destination adapter for: " + destination  + ", " + result);
200            }
201        }
202        startAdapter(filteredAdapter.getPersistenceAdapter(), destination.getQualifiedName());
203        LOG.debug("destination {} matched persistence adapter {}", new Object[]{destination.getQualifiedName(), filteredAdapter.getPersistenceAdapter()});
204        return filteredAdapter.getPersistenceAdapter();
205    }
206
207    private void startAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
208        try {
209            kahaDBPersistenceAdapter.start();
210        } catch (Exception e) {
211            RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
212            LOG.error(detail.toString(), e);
213            throw detail;
214        }
215    }
216
217    private void stopAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
218        try {
219            kahaDBPersistenceAdapter.stop();
220        } catch (Exception e) {
221            RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
222            LOG.error(detail.toString(), e);
223            throw detail;
224        }
225    }
226
227    @Override
228    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
229        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
230        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
231    }
232
233    @Override
234    public TransactionStore createTransactionStore() throws IOException {
235        return transactionStore;
236    }
237
238    @Override
239    public void deleteAllMessages() throws IOException {
240        for (PersistenceAdapter persistenceAdapter : adapters) {
241            persistenceAdapter.deleteAllMessages();
242        }
243        transactionStore.deleteAllMessages();
244        IOHelper.deleteChildren(getDirectory());
245        for (Object o : destinationMap.get(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")}))) {
246            if (o instanceof FilteredKahaDBPersistenceAdapter) {
247                FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapter = (FilteredKahaDBPersistenceAdapter) o;
248                if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory() != DEFAULT_DIRECTORY) {
249                    IOHelper.deleteChildren(filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory());
250                }
251                if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
252                    KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) filteredKahaDBPersistenceAdapter.getPersistenceAdapter();
253                    if (kahaDBPersistenceAdapter.getIndexDirectory() != null) {
254                        IOHelper.deleteChildren(kahaDBPersistenceAdapter.getIndexDirectory());
255                    }
256                }
257            }
258        }
259    }
260
261    @Override
262    public Set<ActiveMQDestination> getDestinations() {
263        Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
264        for (PersistenceAdapter persistenceAdapter : adapters) {
265            results.addAll(persistenceAdapter.getDestinations());
266        }
267        return results;
268    }
269
270    @Override
271    public long getLastMessageBrokerSequenceId() throws IOException {
272        long maxId = -1;
273        for (PersistenceAdapter persistenceAdapter : adapters) {
274            maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
275        }
276        return maxId;
277    }
278
279    @Override
280    public long getLastProducerSequenceId(ProducerId id) throws IOException {
281        long maxId = -1;
282        for (PersistenceAdapter persistenceAdapter : adapters) {
283            maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
284        }
285        return maxId;
286    }
287
288    @Override
289    public void allowIOResumption() {
290        for (PersistenceAdapter persistenceAdapter : adapters) {
291            persistenceAdapter.allowIOResumption();
292        }
293    }
294
295    @Override
296    public void removeQueueMessageStore(ActiveMQQueue destination) {
297        PersistenceAdapter adapter = null;
298        try {
299            adapter = getMatchingPersistenceAdapter(destination);
300        } catch (IOException e) {
301            throw new RuntimeException(e);
302        }
303        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
304            adapter.removeQueueMessageStore(destination);
305            removeMessageStore(adapter, destination);
306            destinationMap.remove(destination, adapter);
307        }
308    }
309
310    @Override
311    public void removeTopicMessageStore(ActiveMQTopic destination) {
312        PersistenceAdapter adapter = null;
313        try {
314            adapter = getMatchingPersistenceAdapter(destination);
315        } catch (IOException e) {
316            throw new RuntimeException(e);
317        }
318        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
319            adapter.removeTopicMessageStore(destination);
320            removeMessageStore(adapter, destination);
321            destinationMap.remove(destination, adapter);
322        }
323    }
324
325    private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
326        stopAdapter(adapter, destination.toString());
327        File adapterDir = adapter.getDirectory();
328        if (adapterDir != null) {
329            if (IOHelper.deleteFile(adapterDir)) {
330                if (LOG.isTraceEnabled()) {
331                    LOG.info("deleted per destination adapter directory for: " + destination);
332                }
333            } else {
334                if (LOG.isTraceEnabled()) {
335                    LOG.info("failed to deleted per destination adapter directory for: " + destination);
336                }
337            }
338        }
339    }
340
341    @Override
342    public void rollbackTransaction(ConnectionContext context) throws IOException {
343        throw new IllegalStateException();
344    }
345
346    @Override
347    public void setBrokerName(String brokerName) {
348        for (PersistenceAdapter persistenceAdapter : adapters) {
349            persistenceAdapter.setBrokerName(brokerName);
350        }
351    }
352
353    @Override
354    public void setUsageManager(SystemUsage usageManager) {
355        for (PersistenceAdapter persistenceAdapter : adapters) {
356            persistenceAdapter.setUsageManager(usageManager);
357        }
358    }
359
360    @Override
361    public long size() {
362        long size = 0;
363        for (PersistenceAdapter persistenceAdapter : adapters) {
364            size += persistenceAdapter.size();
365        }
366        return size;
367    }
368
369    @Override
370    public void doStart() throws Exception {
371        Object result = destinationMap.chooseValue(matchAll);
372        if (result != null) {
373            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
374            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
375                findAndRegisterExistingAdapters(filteredAdapter);
376            }
377        }
378        for (PersistenceAdapter persistenceAdapter : adapters) {
379            persistenceAdapter.start();
380        }
381    }
382
383    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException {
384        FileFilter destinationNames = new FileFilter() {
385            @Override
386            public boolean accept(File file) {
387                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
388            }
389        };
390        File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
391        if (candidates != null) {
392            for (File candidate : candidates) {
393                registerExistingAdapter(template, candidate);
394            }
395        }
396    }
397
398    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) throws IOException {
399        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, candidate.getName());
400        startAdapter(adapter, candidate.getName());
401        Set<ActiveMQDestination> destinations = adapter.getDestinations();
402        if (destinations.size() != 0) {
403            registerAdapter(filteredAdapter, adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
404        } else {
405            stopAdapter(adapter, candidate.getName());
406        }
407    }
408
409    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) throws IOException {
410        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, nameFromDestinationFilter(destination));
411        return registerAdapter(filteredAdapter, adapter, destination);
412    }
413
414    private PersistenceAdapter adapterFromTemplate(FilteredKahaDBPersistenceAdapter template, String destinationName) throws IOException {
415        PersistenceAdapter adapter = kahaDBFromTemplate(template.getPersistenceAdapter());
416        configureAdapter(adapter);
417        configureDirectory(adapter, destinationName);
418        configureIndexDirectory(adapter, template.getPersistenceAdapter(), destinationName);
419        return adapter;
420    }
421
422    private void configureIndexDirectory(PersistenceAdapter adapter, PersistenceAdapter template, String destinationName) {
423        if (template instanceof KahaDBPersistenceAdapter) {
424            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) template;
425            if (kahaDBPersistenceAdapter.getIndexDirectory() != null) {
426                if (adapter instanceof KahaDBPersistenceAdapter) {
427                    File directory = kahaDBPersistenceAdapter.getIndexDirectory();
428                    if (destinationName != null) {
429                        directory = new File(directory, destinationName);
430                    }
431                    ((KahaDBPersistenceAdapter)adapter).setIndexDirectory(directory);
432                }
433            }
434        }
435    }
436
437    private void configureDirectory(PersistenceAdapter adapter, String fileName) {
438        File directory = null;
439        File defaultDir = DEFAULT_DIRECTORY;
440        try {
441            defaultDir = adapter.getClass().newInstance().getDirectory();
442        } catch (Exception e) {
443        }
444        if (defaultDir.equals(adapter.getDirectory())) {
445            // not set so inherit from mkahadb
446            directory = getDirectory();
447        } else {
448            directory = adapter.getDirectory();
449        }
450
451        if (fileName != null) {
452            directory = new File(directory, fileName);
453        }
454        adapter.setDirectory(directory);
455    }
456
457    private FilteredKahaDBPersistenceAdapter registerAdapter(FilteredKahaDBPersistenceAdapter template, PersistenceAdapter adapter, ActiveMQDestination destination) {
458        adapters.add(adapter);
459        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(template, destination, adapter);
460        destinationMap.put(destination, result);
461        return result;
462    }
463
464    private void configureAdapter(PersistenceAdapter adapter) {
465        // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
466        ((TransactionIdTransformerAware)adapter).setTransactionIdTransformer(transactionIdTransformer);
467        if (isUseLock()) {
468            if( adapter instanceof Lockable ) {
469                ((Lockable)adapter).setUseLock(false);
470            }
471        }
472        if( adapter instanceof BrokerServiceAware ) {
473            ((BrokerServiceAware)adapter).setBrokerService(getBrokerService());
474        }
475    }
476
477    private PersistenceAdapter kahaDBFromTemplate(PersistenceAdapter template) throws IOException {
478        try {
479            Map<String, Object> configuration = new HashMap<String, Object>();
480            IntrospectionSupport.getProperties(template, configuration, null);
481            PersistenceAdapter adapter = template.getClass().newInstance();
482            IntrospectionSupport.setProperties(adapter, configuration);
483            return adapter;
484        } catch (Exception e) {
485            throw IOExceptionSupport.create(e);
486        }
487    }
488
489    @Override
490    protected void doStop(ServiceStopper stopper) throws Exception {
491        for (PersistenceAdapter persistenceAdapter : adapters) {
492            stopper.stop(persistenceAdapter);
493        }
494    }
495
496    @Override
497    public File getDirectory() {
498        return this.directory;
499    }
500
501    @Override
502    public void setDirectory(File directory) {
503        this.directory = directory;
504    }
505
506    @Override
507    public void init() throws Exception {
508    }
509
510    @Override
511    public void setBrokerService(BrokerService brokerService) {
512        super.setBrokerService(brokerService);
513        for (PersistenceAdapter persistenceAdapter : adapters) {
514            if( persistenceAdapter instanceof BrokerServiceAware ) {
515                ((BrokerServiceAware)persistenceAdapter).setBrokerService(getBrokerService());
516            }
517        }
518    }
519
520    public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
521        this.transactionStore = transactionStore;
522    }
523
524    /**
525     * Set the max file length of the transaction journal
526     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
527     * be used
528     *
529     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
530     */
531    public void setJournalMaxFileLength(int maxFileLength) {
532        transactionStore.setJournalMaxFileLength(maxFileLength);
533    }
534
535    public int getJournalMaxFileLength() {
536        return transactionStore.getJournalMaxFileLength();
537    }
538
539    /**
540     * Set the max write batch size of  the transaction journal
541     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
542     * be used
543     *
544     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
545     */
546    public void setJournalWriteBatchSize(int journalWriteBatchSize) {
547        transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
548    }
549
550    public int getJournalWriteBatchSize() {
551        return transactionStore.getJournalMaxWriteBatchSize();
552    }
553
554    public List<PersistenceAdapter> getAdapters() {
555        return Collections.unmodifiableList(adapters);
556    }
557
558    @Override
559    public String toString() {
560        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
561        return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
562    }
563
564    @Override
565    public Locker createDefaultLocker() throws IOException {
566        SharedFileLocker locker = new SharedFileLocker();
567        locker.configure(this);
568        return locker;
569    }
570
571    @Override
572    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
573        return new JobSchedulerStoreImpl();
574    }
575}