001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.command.store; 018 019import java.io.BufferedOutputStream; 020import java.io.File; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.HashMap; 026 027import org.apache.activemq.broker.BrokerFactory; 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.ActiveMQQueue; 031import org.apache.activemq.command.ActiveMQTopic; 032import org.apache.activemq.command.Message; 033import org.apache.activemq.command.MessageAck; 034import org.apache.activemq.command.MessageId; 035import org.apache.activemq.command.SubscriptionInfo; 036import org.apache.activemq.command.XATransactionId; 037import org.apache.activemq.console.command.store.proto.MessagePB; 038import org.apache.activemq.console.command.store.proto.QueueEntryPB; 039import org.apache.activemq.console.command.store.proto.QueuePB; 040import org.apache.activemq.openwire.OpenWireFormat; 041import org.apache.activemq.store.MessageRecoveryListener; 042import org.apache.activemq.store.MessageStore; 043import org.apache.activemq.store.PersistenceAdapter; 044import org.apache.activemq.store.TopicMessageStore; 045import org.apache.activemq.store.TransactionRecoveryListener; 046import com.fasterxml.jackson.databind.ObjectMapper; 047import org.fusesource.hawtbuf.AsciiBuffer; 048import org.fusesource.hawtbuf.DataByteArrayOutputStream; 049import org.fusesource.hawtbuf.UTF8Buffer; 050 051/** 052 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 053 */ 054public class StoreExporter { 055 056 static final int OPENWIRE_VERSION = 8; 057 static final boolean TIGHT_ENCODING = false; 058 059 URI config; 060 File file; 061 062 private final ObjectMapper mapper = new ObjectMapper(); 063 private final AsciiBuffer ds_kind = new AsciiBuffer("ds"); 064 private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp"); 065 private final AsciiBuffer codec_id = new AsciiBuffer("openwire"); 066 private final OpenWireFormat wireformat = new OpenWireFormat(); 067 068 public StoreExporter() throws URISyntaxException { 069 config = new URI("xbean:activemq.xml"); 070 wireformat.setCacheEnabled(false); 071 wireformat.setTightEncodingEnabled(TIGHT_ENCODING); 072 wireformat.setVersion(OPENWIRE_VERSION); 073 } 074 075 public void execute() throws Exception { 076 if (config == null) { 077 throw new Exception("required --config option missing"); 078 } 079 if (file == null) { 080 throw new Exception("required --file option missing"); 081 } 082 System.out.println("Loading: " + config); 083 BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting.. 084 BrokerService broker = BrokerFactory.createBroker(config); 085 BrokerFactory.resetStartDefault(); 086 PersistenceAdapter store = broker.getPersistenceAdapter(); 087 System.out.println("Starting: " + store); 088 store.start(); 089 try { 090 BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file)); 091 try { 092 export(store, fos); 093 } finally { 094 fos.close(); 095 } 096 } finally { 097 store.stop(); 098 } 099 } 100 101 void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception { 102 103 104 final long[] messageKeyCounter = new long[]{0}; 105 final long[] containerKeyCounter = new long[]{0}; 106 final ExportStreamManager manager = new ExportStreamManager(fos, 1); 107 108 109 final int[] preparedTxs = new int[]{0}; 110 store.createTransactionStore().recover(new TransactionRecoveryListener() { 111 @Override 112 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) { 113 preparedTxs[0] += 1; 114 } 115 }); 116 117 if (preparedTxs[0] > 0) { 118 throw new Exception("Cannot export a store with prepared XA transactions. Please commit or rollback those transactions before attempting to export."); 119 } 120 121 for (ActiveMQDestination odest : store.getDestinations()) { 122 containerKeyCounter[0]++; 123 if (odest instanceof ActiveMQQueue) { 124 ActiveMQQueue dest = (ActiveMQQueue) odest; 125 MessageStore queue = store.createQueueMessageStore(dest); 126 127 QueuePB.Bean destRecord = new QueuePB.Bean(); 128 destRecord.setKey(containerKeyCounter[0]); 129 destRecord.setBindingKind(ptp_kind); 130 131 final long[] seqKeyCounter = new long[]{0}; 132 133 HashMap<String, Object> jsonMap = new HashMap<String, Object>(); 134 jsonMap.put("@class", "queue_destination"); 135 jsonMap.put("name", dest.getQueueName()); 136 String json = mapper.writeValueAsString(jsonMap); 137 System.out.println(json); 138 destRecord.setBindingData(new UTF8Buffer(json)); 139 manager.store_queue(destRecord); 140 141 queue.recover(new MessageRecoveryListener() { 142 @Override 143 public boolean hasSpace() { 144 return true; 145 } 146 147 @Override 148 public boolean recoverMessageReference(MessageId ref) throws Exception { 149 return true; 150 } 151 152 @Override 153 public boolean isDuplicate(MessageId ref) { 154 return false; 155 } 156 157 @Override 158 public boolean recoverMessage(Message message) throws IOException { 159 messageKeyCounter[0]++; 160 seqKeyCounter[0]++; 161 162 MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]); 163 manager.store_message(messageRecord); 164 165 QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]); 166 manager.store_queue_entry(entryRecord); 167 168 return true; 169 } 170 }); 171 172 } else if (odest instanceof ActiveMQTopic) { 173 ActiveMQTopic dest = (ActiveMQTopic) odest; 174 175 TopicMessageStore topic = store.createTopicMessageStore(dest); 176 for (SubscriptionInfo sub : topic.getAllSubscriptions()) { 177 178 QueuePB.Bean destRecord = new QueuePB.Bean(); 179 destRecord.setKey(containerKeyCounter[0]); 180 destRecord.setBindingKind(ds_kind); 181 182 // TODO: use a real JSON encoder like jackson. 183 HashMap<String, Object> jsonMap = new HashMap<String, Object>(); 184 jsonMap.put("@class", "dsub_destination"); 185 jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName()); 186 HashMap<String, Object> jsonTopic = new HashMap<String, Object>(); 187 jsonTopic.put("name", dest.getTopicName()); 188 jsonMap.put("topics", new Object[]{jsonTopic}); 189 if (sub.getSelector() != null) { 190 jsonMap.put("selector", sub.getSelector()); 191 } 192 String json = mapper.writeValueAsString(jsonMap); 193 System.out.println(json); 194 195 destRecord.setBindingData(new UTF8Buffer(json)); 196 manager.store_queue(destRecord); 197 198 final long seqKeyCounter[] = new long[]{0}; 199 topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() { 200 @Override 201 public boolean hasSpace() { 202 return true; 203 } 204 205 @Override 206 public boolean recoverMessageReference(MessageId ref) throws Exception { 207 return true; 208 } 209 210 @Override 211 public boolean isDuplicate(MessageId ref) { 212 return false; 213 } 214 215 @Override 216 public boolean recoverMessage(Message message) throws IOException { 217 messageKeyCounter[0]++; 218 seqKeyCounter[0]++; 219 220 MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]); 221 manager.store_message(messageRecord); 222 223 QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]); 224 manager.store_queue_entry(entryRecord); 225 return true; 226 } 227 }); 228 229 } 230 } 231 } 232 manager.finish(); 233 } 234 235 private QueueEntryPB.Bean createQueueEntryPB(Message message, long queueKey, long queueSeq, long messageKey) { 236 QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean(); 237 entryRecord.setQueueKey(queueKey); 238 entryRecord.setQueueSeq(queueSeq); 239 entryRecord.setMessageKey(messageKey); 240 entryRecord.setSize(message.getSize()); 241 if (message.getExpiration() != 0) { 242 entryRecord.setExpiration(message.getExpiration()); 243 } 244 if (message.getRedeliveryCounter() != 0) { 245 entryRecord.setRedeliveries(message.getRedeliveryCounter()); 246 } 247 return entryRecord; 248 } 249 250 private MessagePB.Bean createMessagePB(Message message, long messageKey) throws IOException { 251 DataByteArrayOutputStream mos = new DataByteArrayOutputStream(); 252 mos.writeBoolean(TIGHT_ENCODING); 253 mos.writeVarInt(OPENWIRE_VERSION); 254 wireformat.marshal(message, mos); 255 256 MessagePB.Bean messageRecord = new MessagePB.Bean(); 257 messageRecord.setCodec(codec_id); 258 messageRecord.setMessageKey(messageKey); 259 messageRecord.setSize(message.getSize()); 260 messageRecord.setValue(mos.toBuffer()); 261 return messageRecord; 262 } 263 264 public File getFile() { 265 return file; 266 } 267 268 public void setFile(String file) { 269 setFile(new File(file)); 270 } 271 272 public void setFile(File file) { 273 this.file = file; 274 } 275 276 public URI getConfig() { 277 return config; 278 } 279 280 public void setConfig(URI config) { 281 this.config = config; 282 } 283}