001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.plist;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.NoSuchElementException;
025import java.util.Set;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicReference;
028
029import org.apache.activemq.broker.region.MessageReference;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.store.PList;
032import org.apache.activemq.store.PListEntry;
033import org.apache.activemq.store.kahadb.disk.index.ListIndex;
034import org.apache.activemq.store.kahadb.disk.journal.Location;
035import org.apache.activemq.store.kahadb.disk.page.Transaction;
036import org.apache.activemq.util.ByteSequence;
037import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
038import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
039import org.apache.activemq.wireformat.WireFormat;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043public class PListImpl extends ListIndex<String, Location> implements PList {
044    static final Logger LOG = LoggerFactory.getLogger(PListImpl.class);
045    final PListStoreImpl store;
046    private String name;
047    Object indexLock;
048
049    PListImpl(PListStoreImpl store) {
050        this.store = store;
051        this.indexLock = store.getIndexLock();
052        setPageFile(store.getPageFile());
053        setKeyMarshaller(StringMarshaller.INSTANCE);
054        setValueMarshaller(LocationMarshaller.INSTANCE);
055    }
056
057    public void setName(String name) {
058        this.name = name;
059    }
060
061    @Override
062    public String getName() {
063        return this.name;
064    }
065
066    void read(DataInput in) throws IOException {
067        setHeadPageId(in.readLong());
068    }
069
070    public void write(DataOutput out) throws IOException {
071        out.writeLong(getHeadPageId());
072    }
073
074    @Override
075    public synchronized void destroy() throws IOException {
076        synchronized (indexLock) {
077            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
078                public void execute(Transaction tx) throws IOException {
079                    clear(tx);
080                    unload(tx);
081                }
082            });
083        }
084    }
085
086    class Locator {
087        final String id;
088
089        Locator(String id) {
090            this.id = id;
091        }
092
093        PListImpl plist() {
094            return PListImpl.this;
095        }
096    }
097
098    @Override
099    public Object addLast(final String id, final ByteSequence bs) throws IOException {
100        final Location location = this.store.write(bs, false);
101        synchronized (indexLock) {
102            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
103                public void execute(Transaction tx) throws IOException {
104                    add(tx, id, location);
105                }
106            });
107        }
108        return new Locator(id);
109    }
110
111    @Override
112    public Object addFirst(final String id, final ByteSequence bs) throws IOException {
113        final Location location = this.store.write(bs, false);
114        synchronized (indexLock) {
115            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
116                public void execute(Transaction tx) throws IOException {
117                    addFirst(tx, id, location);
118                }
119            });
120        }
121        return new Locator(id);
122    }
123
124    @Override
125    public boolean remove(final Object l) throws IOException {
126        Locator locator = (Locator) l;
127        assert locator!=null;
128        assert locator.plist()==this;
129        return remove(locator.id);
130    }
131
132    public boolean remove(final String id) throws IOException {
133        final AtomicBoolean result = new AtomicBoolean();
134        synchronized (indexLock) {
135            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
136                public void execute(Transaction tx) throws IOException {
137                    result.set(remove(tx, id) != null);
138                }
139            });
140        }
141        return result.get();
142    }
143
144    public boolean remove(final long position) throws IOException {
145        final AtomicBoolean result = new AtomicBoolean();
146        synchronized (indexLock) {
147            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
148                public void execute(Transaction tx) throws IOException {
149                    Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
150                    if (iterator.hasNext()) {
151                        iterator.next();
152                        iterator.remove();
153                        result.set(true);
154                    } else {
155                        result.set(false);
156                    }
157                }
158            });
159        }
160        return result.get();
161    }
162
163    public PListEntry get(final long position) throws IOException {
164        PListEntry result = null;
165        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
166        synchronized (indexLock) {
167            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
168                public void execute(Transaction tx) throws IOException {
169                    Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
170                    ref.set(iterator.next());
171                }
172            });
173        }
174        if (ref.get() != null) {
175            ByteSequence bs = this.store.getPayload(ref.get().getValue());
176            result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
177        }
178        return result;
179    }
180
181    public PListEntry getFirst() throws IOException {
182        PListEntry result = null;
183        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
184        synchronized (indexLock) {
185            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
186                public void execute(Transaction tx) throws IOException {
187                    ref.set(getFirst(tx));
188                }
189            });
190        }
191        if (ref.get() != null) {
192            ByteSequence bs = this.store.getPayload(ref.get().getValue());
193            result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
194        }
195        return result;
196    }
197
198    public PListEntry getLast() throws IOException {
199        PListEntry result = null;
200        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
201        synchronized (indexLock) {
202            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
203                public void execute(Transaction tx) throws IOException {
204                    ref.set(getLast(tx));
205                }
206            });
207        }
208        if (ref.get() != null) {
209            ByteSequence bs = this.store.getPayload(ref.get().getValue());
210            result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
211        }
212        return result;
213    }
214
215    @Override
216    public boolean isEmpty() {
217        return size() == 0;
218    }
219
220    @Override
221    public PListIterator iterator() throws IOException {
222        return new PListIteratorImpl();
223    }
224
225    final class PListIteratorImpl implements PListIterator {
226        final Iterator<Map.Entry<String, Location>> iterator;
227        final Transaction tx;
228
229        PListIteratorImpl() throws IOException {
230            tx = store.pageFile.tx();
231            synchronized (indexLock) {
232                this.iterator = iterator(tx);
233            }
234        }
235
236        @Override
237        public boolean hasNext() {
238            return iterator.hasNext();
239        }
240
241        @Override
242        public PListEntry next() {
243            Map.Entry<String, Location> entry = iterator.next();
244            ByteSequence bs = null;
245            try {
246                bs = store.getPayload(entry.getValue());
247            } catch (IOException unexpected) {
248                NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
249                e.initCause(unexpected);
250                throw e;
251            }
252            return new PListEntry(entry.getKey(), bs, new Locator(entry.getKey()));
253        }
254
255        @Override
256        public void remove() {
257            try {
258                synchronized (indexLock) {
259                    tx.execute(new Transaction.Closure<IOException>() {
260                        @Override
261                        public void execute(Transaction tx) throws IOException {
262                            iterator.remove();
263                        }
264                    });
265                }
266            } catch (IOException unexpected) {
267                IllegalStateException e = new IllegalStateException(unexpected);
268                e.initCause(unexpected);
269                throw e;
270            }
271        }
272
273        public void release() {
274            try {
275                tx.rollback();
276            } catch (IOException unexpected) {
277                IllegalStateException e = new IllegalStateException(unexpected);
278                e.initCause(unexpected);
279                throw e;
280            }
281        }
282    }
283
284    public void claimFileLocations(final Set<Integer> candidates) throws IOException {
285        synchronized (indexLock) {
286            if (loaded.get()) {
287                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
288                    public void execute(Transaction tx) throws IOException {
289                        Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
290                        while (iterator.hasNext()) {
291                            Location location = iterator.next().getValue();
292                            candidates.remove(location.getDataFileId());
293                        }
294                    }
295                });
296            }
297        }
298    }
299
300    @Override
301    public String toString() {
302        return name + "[headPageId=" + getHeadPageId()  + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
303    }
304}