001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command.store;
018
019import java.io.BufferedOutputStream;
020import java.io.File;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.util.HashMap;
026
027import org.apache.activemq.broker.BrokerFactory;
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQQueue;
031import org.apache.activemq.command.ActiveMQTopic;
032import org.apache.activemq.command.Message;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.SubscriptionInfo;
036import org.apache.activemq.command.XATransactionId;
037import org.apache.activemq.console.command.store.proto.MessagePB;
038import org.apache.activemq.console.command.store.proto.QueueEntryPB;
039import org.apache.activemq.console.command.store.proto.QueuePB;
040import org.apache.activemq.openwire.OpenWireFormat;
041import org.apache.activemq.store.MessageRecoveryListener;
042import org.apache.activemq.store.MessageStore;
043import org.apache.activemq.store.PersistenceAdapter;
044import org.apache.activemq.store.TopicMessageStore;
045import org.apache.activemq.store.TransactionRecoveryListener;
046import com.fasterxml.jackson.databind.ObjectMapper;
047import org.fusesource.hawtbuf.AsciiBuffer;
048import org.fusesource.hawtbuf.DataByteArrayOutputStream;
049import org.fusesource.hawtbuf.UTF8Buffer;
050
051/**
052 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
053 */
054public class StoreExporter {
055
056    static final int OPENWIRE_VERSION = 8;
057    static final boolean TIGHT_ENCODING = false;
058
059    URI config;
060    File file;
061
062    private final ObjectMapper mapper = new ObjectMapper();
063    private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
064    private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
065    private final AsciiBuffer codec_id = new AsciiBuffer("openwire");
066    private final OpenWireFormat wireformat = new OpenWireFormat();
067
068    public StoreExporter() throws URISyntaxException {
069        config = new URI("xbean:activemq.xml");
070        wireformat.setCacheEnabled(false);
071        wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
072        wireformat.setVersion(OPENWIRE_VERSION);
073    }
074
075    public void execute() throws Exception {
076        if (config == null) {
077            throw new Exception("required --config option missing");
078        }
079        if (file == null) {
080            throw new Exception("required --file option missing");
081        }
082        System.out.println("Loading: " + config);
083        BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting..
084        BrokerService broker = BrokerFactory.createBroker(config);
085        BrokerFactory.resetStartDefault();
086        PersistenceAdapter store = broker.getPersistenceAdapter();
087        System.out.println("Starting: " + store);
088        store.start();
089        try {
090            BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file));
091            try {
092                export(store, fos);
093            } finally {
094                fos.close();
095            }
096        } finally {
097            store.stop();
098        }
099    }
100
101    void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
102
103
104        final long[] messageKeyCounter = new long[]{0};
105        final long[] containerKeyCounter = new long[]{0};
106        final ExportStreamManager manager = new ExportStreamManager(fos, 1);
107
108
109        final int[] preparedTxs = new int[]{0};
110        store.createTransactionStore().recover(new TransactionRecoveryListener() {
111            @Override
112            public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
113                preparedTxs[0] += 1;
114            }
115        });
116
117        if (preparedTxs[0] > 0) {
118            throw new Exception("Cannot export a store with prepared XA transactions.  Please commit or rollback those transactions before attempting to export.");
119        }
120
121        for (ActiveMQDestination odest : store.getDestinations()) {
122            containerKeyCounter[0]++;
123            if (odest instanceof ActiveMQQueue) {
124                ActiveMQQueue dest = (ActiveMQQueue) odest;
125                MessageStore queue = store.createQueueMessageStore(dest);
126
127                QueuePB.Bean destRecord = new QueuePB.Bean();
128                destRecord.setKey(containerKeyCounter[0]);
129                destRecord.setBindingKind(ptp_kind);
130
131                final long[] seqKeyCounter = new long[]{0};
132
133                HashMap<String, Object> jsonMap = new HashMap<String, Object>();
134                jsonMap.put("@class", "queue_destination");
135                jsonMap.put("name", dest.getQueueName());
136                String json = mapper.writeValueAsString(jsonMap);
137                System.out.println(json);
138                destRecord.setBindingData(new UTF8Buffer(json));
139                manager.store_queue(destRecord);
140
141                queue.recover(new MessageRecoveryListener() {
142                    @Override
143                    public boolean hasSpace() {
144                        return true;
145                    }
146
147                    @Override
148                    public boolean recoverMessageReference(MessageId ref) throws Exception {
149                        return true;
150                    }
151
152                    @Override
153                    public boolean isDuplicate(MessageId ref) {
154                        return false;
155                    }
156
157                    @Override
158                    public boolean recoverMessage(Message message) throws IOException {
159                        messageKeyCounter[0]++;
160                        seqKeyCounter[0]++;
161
162                        MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
163                        manager.store_message(messageRecord);
164
165                        QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
166                        manager.store_queue_entry(entryRecord);
167
168                        return true;
169                    }
170                });
171
172            } else if (odest instanceof ActiveMQTopic) {
173                ActiveMQTopic dest = (ActiveMQTopic) odest;
174
175                TopicMessageStore topic = store.createTopicMessageStore(dest);
176                for (SubscriptionInfo sub : topic.getAllSubscriptions()) {
177
178                    QueuePB.Bean destRecord = new QueuePB.Bean();
179                    destRecord.setKey(containerKeyCounter[0]);
180                    destRecord.setBindingKind(ds_kind);
181
182                    // TODO: use a real JSON encoder like jackson.
183                    HashMap<String, Object> jsonMap = new HashMap<String, Object>();
184                    jsonMap.put("@class", "dsub_destination");
185                    jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName());
186                    HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
187                    jsonTopic.put("name", dest.getTopicName());
188                    jsonMap.put("topics", new Object[]{jsonTopic});
189                    if (sub.getSelector() != null) {
190                        jsonMap.put("selector", sub.getSelector());
191                    }
192                    String json = mapper.writeValueAsString(jsonMap);
193                    System.out.println(json);
194
195                    destRecord.setBindingData(new UTF8Buffer(json));
196                    manager.store_queue(destRecord);
197
198                    final long seqKeyCounter[] = new long[]{0};
199                    topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() {
200                        @Override
201                        public boolean hasSpace() {
202                            return true;
203                        }
204
205                        @Override
206                        public boolean recoverMessageReference(MessageId ref) throws Exception {
207                            return true;
208                        }
209
210                        @Override
211                        public boolean isDuplicate(MessageId ref) {
212                            return false;
213                        }
214
215                        @Override
216                        public boolean recoverMessage(Message message) throws IOException {
217                            messageKeyCounter[0]++;
218                            seqKeyCounter[0]++;
219
220                            MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
221                            manager.store_message(messageRecord);
222
223                            QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
224                            manager.store_queue_entry(entryRecord);
225                            return true;
226                        }
227                    });
228
229                }
230            }
231        }
232        manager.finish();
233    }
234
235    private QueueEntryPB.Bean createQueueEntryPB(Message message, long queueKey, long queueSeq, long messageKey) {
236        QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
237        entryRecord.setQueueKey(queueKey);
238        entryRecord.setQueueSeq(queueSeq);
239        entryRecord.setMessageKey(messageKey);
240        entryRecord.setSize(message.getSize());
241        if (message.getExpiration() != 0) {
242            entryRecord.setExpiration(message.getExpiration());
243        }
244        if (message.getRedeliveryCounter() != 0) {
245            entryRecord.setRedeliveries(message.getRedeliveryCounter());
246        }
247        return entryRecord;
248    }
249
250    private MessagePB.Bean createMessagePB(Message message, long messageKey) throws IOException {
251        DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
252        mos.writeBoolean(TIGHT_ENCODING);
253        mos.writeVarInt(OPENWIRE_VERSION);
254        wireformat.marshal(message, mos);
255
256        MessagePB.Bean messageRecord = new MessagePB.Bean();
257        messageRecord.setCodec(codec_id);
258        messageRecord.setMessageKey(messageKey);
259        messageRecord.setSize(message.getSize());
260        messageRecord.setValue(mos.toBuffer());
261        return messageRecord;
262    }
263
264    public File getFile() {
265        return file;
266    }
267
268    public void setFile(String file) {
269        setFile(new File(file));
270    }
271
272    public void setFile(File file) {
273        this.file = file;
274    }
275
276    public URI getConfig() {
277        return config;
278    }
279
280    public void setConfig(URI config) {
281        this.config = config;
282    }
283}