001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.activemq.broker.Broker;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.broker.region.Destination;
030import org.apache.activemq.broker.region.IndirectMessageReference;
031import org.apache.activemq.broker.region.MessageReference;
032import org.apache.activemq.broker.region.QueueMessageReference;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
035import org.apache.activemq.openwire.OpenWireFormat;
036import org.apache.activemq.store.PList;
037import org.apache.activemq.store.PListEntry;
038import org.apache.activemq.store.PListStore;
039import org.apache.activemq.usage.SystemUsage;
040import org.apache.activemq.usage.Usage;
041import org.apache.activemq.usage.UsageListener;
042import org.apache.activemq.util.ByteSequence;
043import org.apache.activemq.wireformat.WireFormat;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * persist pending messages pending message (messages awaiting dispatch to a
049 * consumer) cursor
050 */
051public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
052
053    static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
054
055    private static final AtomicLong NAME_COUNT = new AtomicLong();
056
057    protected Broker broker;
058    private final PListStore store;
059    private final String name;
060    private PendingList memoryList;
061    private PList diskList;
062    private Iterator<MessageReference> iter;
063    private Destination regionDestination;
064    private boolean iterating;
065    private boolean flushRequired;
066    private final AtomicBoolean started = new AtomicBoolean();
067    private final WireFormat wireFormat = new OpenWireFormat();
068
069    /**
070     * @param broker
071     * @param name
072     * @param prioritizedMessages
073     */
074    public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
075        super(prioritizedMessages);
076        if (this.prioritizedMessages) {
077            this.memoryList = new PrioritizedPendingList();
078        } else {
079            this.memoryList = new OrderedPendingList();
080        }
081        this.broker = broker;
082        // the store can be null if the BrokerService has persistence
083        // turned off
084        this.store = broker.getTempDataStore();
085        this.name = NAME_COUNT.incrementAndGet() + "_" + name;
086    }
087
088    @Override
089    public void start() throws Exception {
090        if (started.compareAndSet(false, true)) {
091            if( this.broker != null) {
092                wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion());
093            }
094            super.start();
095            if (systemUsage != null) {
096                systemUsage.getMemoryUsage().addUsageListener(this);
097            }
098        }
099    }
100
101    @Override
102    public void stop() throws Exception {
103        if (started.compareAndSet(true, false)) {
104            super.stop();
105            if (systemUsage != null) {
106                systemUsage.getMemoryUsage().removeUsageListener(this);
107            }
108        }
109    }
110
111    /**
112     * @return true if there are no pending messages
113     */
114    @Override
115    public synchronized boolean isEmpty() {
116        if (memoryList.isEmpty() && isDiskListEmpty()) {
117            return true;
118        }
119        for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
120            MessageReference node = iterator.next();
121            if (node == QueueMessageReference.NULL_MESSAGE) {
122                continue;
123            }
124            if (!node.isDropped()) {
125                return false;
126            }
127            // We can remove dropped references.
128            iterator.remove();
129        }
130        return isDiskListEmpty();
131    }
132
133    /**
134     * reset the cursor
135     */
136    @Override
137    public synchronized void reset() {
138        iterating = true;
139        last = null;
140        if (isDiskListEmpty()) {
141            this.iter = this.memoryList.iterator();
142        } else {
143            this.iter = new DiskIterator();
144        }
145    }
146
147    @Override
148    public synchronized void release() {
149        iterating = false;
150        if (iter instanceof DiskIterator) {
151           ((DiskIterator)iter).release();
152        };
153        if (flushRequired) {
154            flushRequired = false;
155            if (!hasSpace()) {
156                flushToDisk();
157            }
158        }
159        // ensure any memory ref is released
160        iter = null;
161    }
162
163    @Override
164    public synchronized void destroy() throws Exception {
165        stop();
166        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
167            MessageReference node = i.next();
168            node.decrementReferenceCount();
169        }
170        memoryList.clear();
171        destroyDiskList();
172    }
173
174    private void destroyDiskList() throws Exception {
175        if (diskList != null) {
176            store.removePList(name);
177            diskList = null;
178        }
179    }
180
181    @Override
182    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
183        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
184        int count = 0;
185        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
186            MessageReference ref = i.next();
187            ref.incrementReferenceCount();
188            result.add(ref);
189            count++;
190        }
191        if (count < maxItems && !isDiskListEmpty()) {
192            for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
193                Message message = (Message) i.next();
194                message.setRegionDestination(regionDestination);
195                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
196                message.incrementReferenceCount();
197                result.add(message);
198                count++;
199            }
200        }
201        return result;
202    }
203
204    /**
205     * add message to await dispatch
206     * 
207     * @param node
208     * @throws Exception 
209     */
210    @Override
211    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
212        return tryAddMessageLast(node, 0);
213    }
214    
215    @Override
216    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
217        if (!node.isExpired()) {
218            try {
219                regionDestination = (Destination) node.getMessage().getRegionDestination();
220                if (isDiskListEmpty()) {
221                    if (hasSpace() || this.store == null) {
222                        memoryList.addMessageLast(node);
223                        node.incrementReferenceCount();
224                        setCacheEnabled(true);
225                        return true;
226                    }
227                }
228                if (!hasSpace()) {
229                    if (isDiskListEmpty()) {
230                        expireOldMessages();
231                        if (hasSpace()) {
232                            memoryList.addMessageLast(node);
233                            node.incrementReferenceCount();
234                            return true;
235                        } else {
236                            flushToDisk();
237                        }
238                    }
239                }
240                if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
241                    ByteSequence bs = getByteSequence(node.getMessage());
242                    getDiskList().addLast(node.getMessageId().toString(), bs);
243                    return true;
244                }
245                return false;
246
247            } catch (Exception e) {
248                LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
249                throw new RuntimeException(e);
250            }
251        } else {
252            discardExpiredMessage(node);
253        }
254        //message expired
255        return true;
256    }
257
258    /**
259     * add message to await dispatch
260     * 
261     * @param node
262     */
263    @Override
264    public synchronized void addMessageFirst(MessageReference node) {
265        if (!node.isExpired()) {
266            try {
267                regionDestination = (Destination) node.getMessage().getRegionDestination();
268                if (isDiskListEmpty()) {
269                    if (hasSpace()) {
270                        memoryList.addMessageFirst(node);
271                        node.incrementReferenceCount();
272                        setCacheEnabled(true);
273                        return;
274                    }
275                }
276                if (!hasSpace()) {
277                    if (isDiskListEmpty()) {
278                        expireOldMessages();
279                        if (hasSpace()) {
280                            memoryList.addMessageFirst(node);
281                            node.incrementReferenceCount();
282                            return;
283                        } else {
284                            flushToDisk();
285                        }
286                    }
287                }
288                systemUsage.getTempUsage().waitForSpace();
289                node.decrementReferenceCount();
290                ByteSequence bs = getByteSequence(node.getMessage());
291                Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs);
292                node.getMessageId().setPlistLocator(locator);
293
294            } catch (Exception e) {
295                LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
296                throw new RuntimeException(e);
297            }
298        } else {
299            discardExpiredMessage(node);
300        }
301    }
302
303    /**
304     * @return true if there pending messages to dispatch
305     */
306    @Override
307    public synchronized boolean hasNext() {
308        return iter.hasNext();
309    }
310
311    /**
312     * @return the next pending message
313     */
314    @Override
315    public synchronized MessageReference next() {
316        MessageReference reference = iter.next();
317        last = reference;
318        if (!isDiskListEmpty()) {
319            // got from disk
320            reference.getMessage().setRegionDestination(regionDestination);
321            reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
322        }
323        reference.incrementReferenceCount();
324        return reference;
325    }
326
327    /**
328     * remove the message at the cursor position
329     */
330    @Override
331    public synchronized void remove() {
332        iter.remove();
333        if (last != null) {
334            last.decrementReferenceCount();
335        }
336    }
337
338    /**
339     * @param node
340     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
341     */
342    @Override
343    public synchronized void remove(MessageReference node) {
344        if (memoryList.remove(node) != null) {
345            node.decrementReferenceCount();
346        }
347        if (!isDiskListEmpty()) {
348            try {
349                getDiskList().remove(node.getMessageId().getPlistLocator());
350            } catch (IOException e) {
351                throw new RuntimeException(e);
352            }
353        }
354    }
355
356    /**
357     * @return the number of pending messages
358     */
359    @Override
360    public synchronized int size() {
361        return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
362    }
363
364    /**
365     * clear all pending messages
366     */
367    @Override
368    public synchronized void clear() {
369        memoryList.clear();
370        if (!isDiskListEmpty()) {
371            try {
372                getDiskList().destroy();
373            } catch (IOException e) {
374                throw new RuntimeException(e);
375            }
376        }
377        last = null;
378    }
379
380    @Override
381    public synchronized boolean isFull() {
382        return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
383    }
384
385    @Override
386    public boolean hasMessagesBufferedToDeliver() {
387        return !isEmpty();
388    }
389
390    @Override
391    public void setSystemUsage(SystemUsage usageManager) {
392        super.setSystemUsage(usageManager);
393    }
394
395    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
396        if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
397            List<MessageReference> expiredMessages = null;
398            synchronized (this) {
399                if (!flushRequired && size() != 0) {
400                    flushRequired =true;
401                    if (!iterating) {
402                        expiredMessages = expireOldMessages();
403                        if (!hasSpace()) {
404                            flushToDisk();
405                            flushRequired = false;
406                        }
407                    }
408                }
409            }
410
411            if (expiredMessages != null) {
412                for (MessageReference node : expiredMessages) {
413                    discardExpiredMessage(node);
414                }
415            }
416        }
417    }
418
419    @Override
420    public boolean isTransient() {
421        return true;
422    }
423
424    private synchronized List<MessageReference> expireOldMessages() {
425        List<MessageReference> expired = new ArrayList<MessageReference>();
426        if (!memoryList.isEmpty()) {
427            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
428                MessageReference node = iterator.next();
429                if (node.isExpired()) {
430                    node.decrementReferenceCount();
431                    expired.add(node);
432                    iterator.remove();
433                }
434            }
435        }
436
437        return expired;
438    }
439
440    protected synchronized void flushToDisk() {
441        if (!memoryList.isEmpty() && store != null) {
442            long start = 0;
443            if (LOG.isTraceEnabled()) {
444                start = System.currentTimeMillis();
445                LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(),
446                    (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
447            }
448            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
449                MessageReference node = iterator.next();
450                node.decrementReferenceCount();
451                ByteSequence bs;
452                try {
453                    bs = getByteSequence(node.getMessage());
454                    getDiskList().addLast(node.getMessageId().toString(), bs);
455                } catch (IOException e) {
456                    LOG.error("Failed to write to disk list", e);
457                    throw new RuntimeException(e);
458                }
459
460            }
461            memoryList.clear();
462            setCacheEnabled(false);
463            LOG.trace("{}, flushToDisk() done - {} ms {}", new Object[]{ name, (System.currentTimeMillis() - start), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
464        }
465    }
466
467    protected boolean isDiskListEmpty() {
468        return diskList == null || diskList.isEmpty();
469    }
470
471    public PList getDiskList() {
472        if (diskList == null) {
473            try {
474                diskList = store.getPList(name);
475            } catch (Exception e) {
476                LOG.error("Caught an IO Exception getting the DiskList {}", name, e);
477                throw new RuntimeException(e);
478            }
479        }
480        return diskList;
481    }
482
483    private void discardExpiredMessage(MessageReference reference) {
484        LOG.debug("Discarding expired message {}", reference);
485        if (reference.isExpired() && broker.isExpired(reference)) {
486            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
487            context.setBroker(broker);
488            ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
489        }
490    }
491
492    protected ByteSequence getByteSequence(Message message) throws IOException {
493        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
494        return new ByteSequence(packet.data, packet.offset, packet.length);
495    }
496
497    protected Message getMessage(ByteSequence bs) throws IOException {
498        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
499                .getOffset(), bs.getLength());
500        return (Message) this.wireFormat.unmarshal(packet);
501
502    }
503
504    final class DiskIterator implements Iterator<MessageReference> {
505        private final PList.PListIterator iterator;
506        DiskIterator() {
507            try {
508                iterator = getDiskList().iterator();
509            } catch (Exception e) {
510                throw new RuntimeException(e);
511            }
512        }
513
514        public boolean hasNext() {
515            return iterator.hasNext();
516        }
517
518        public MessageReference next() {
519            try {
520                PListEntry entry = iterator.next();
521                Message message = getMessage(entry.getByteSequence());
522                message.getMessageId().setPlistLocator(entry.getLocator());
523                return message;
524            } catch (IOException e) {
525                LOG.error("I/O error", e);
526                throw new RuntimeException(e);
527            }
528        }
529
530        public void remove() {
531            iterator.remove();
532        }
533
534        public void release() {
535            iterator.release();
536        }
537    }
538}