001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.plist; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.NoSuchElementException; 025import java.util.Set; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicReference; 028 029import org.apache.activemq.broker.region.MessageReference; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.store.PList; 032import org.apache.activemq.store.PListEntry; 033import org.apache.activemq.store.kahadb.disk.index.ListIndex; 034import org.apache.activemq.store.kahadb.disk.journal.Location; 035import org.apache.activemq.store.kahadb.disk.page.Transaction; 036import org.apache.activemq.util.ByteSequence; 037import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 038import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 039import org.apache.activemq.wireformat.WireFormat; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043public class PListImpl extends ListIndex<String, Location> implements PList { 044 static final Logger LOG = LoggerFactory.getLogger(PListImpl.class); 045 final PListStoreImpl store; 046 private String name; 047 Object indexLock; 048 049 PListImpl(PListStoreImpl store) { 050 this.store = store; 051 this.indexLock = store.getIndexLock(); 052 setPageFile(store.getPageFile()); 053 setKeyMarshaller(StringMarshaller.INSTANCE); 054 setValueMarshaller(LocationMarshaller.INSTANCE); 055 } 056 057 public void setName(String name) { 058 this.name = name; 059 } 060 061 @Override 062 public String getName() { 063 return this.name; 064 } 065 066 void read(DataInput in) throws IOException { 067 setHeadPageId(in.readLong()); 068 } 069 070 public void write(DataOutput out) throws IOException { 071 out.writeLong(getHeadPageId()); 072 } 073 074 @Override 075 public synchronized void destroy() throws IOException { 076 synchronized (indexLock) { 077 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 078 public void execute(Transaction tx) throws IOException { 079 clear(tx); 080 unload(tx); 081 } 082 }); 083 } 084 } 085 086 class Locator { 087 final String id; 088 089 Locator(String id) { 090 this.id = id; 091 } 092 093 PListImpl plist() { 094 return PListImpl.this; 095 } 096 } 097 098 @Override 099 public Object addLast(final String id, final ByteSequence bs) throws IOException { 100 final Location location = this.store.write(bs, false); 101 synchronized (indexLock) { 102 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 103 public void execute(Transaction tx) throws IOException { 104 add(tx, id, location); 105 } 106 }); 107 } 108 return new Locator(id); 109 } 110 111 @Override 112 public Object addFirst(final String id, final ByteSequence bs) throws IOException { 113 final Location location = this.store.write(bs, false); 114 synchronized (indexLock) { 115 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 116 public void execute(Transaction tx) throws IOException { 117 addFirst(tx, id, location); 118 } 119 }); 120 } 121 return new Locator(id); 122 } 123 124 @Override 125 public boolean remove(final Object l) throws IOException { 126 Locator locator = (Locator) l; 127 assert locator!=null; 128 assert locator.plist()==this; 129 return remove(locator.id); 130 } 131 132 public boolean remove(final String id) throws IOException { 133 final AtomicBoolean result = new AtomicBoolean(); 134 synchronized (indexLock) { 135 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 136 public void execute(Transaction tx) throws IOException { 137 result.set(remove(tx, id) != null); 138 } 139 }); 140 } 141 return result.get(); 142 } 143 144 public boolean remove(final long position) throws IOException { 145 final AtomicBoolean result = new AtomicBoolean(); 146 synchronized (indexLock) { 147 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 148 public void execute(Transaction tx) throws IOException { 149 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position); 150 if (iterator.hasNext()) { 151 iterator.next(); 152 iterator.remove(); 153 result.set(true); 154 } else { 155 result.set(false); 156 } 157 } 158 }); 159 } 160 return result.get(); 161 } 162 163 public PListEntry get(final long position) throws IOException { 164 PListEntry result = null; 165 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); 166 synchronized (indexLock) { 167 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 168 public void execute(Transaction tx) throws IOException { 169 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position); 170 ref.set(iterator.next()); 171 } 172 }); 173 } 174 if (ref.get() != null) { 175 ByteSequence bs = this.store.getPayload(ref.get().getValue()); 176 result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey())); 177 } 178 return result; 179 } 180 181 public PListEntry getFirst() throws IOException { 182 PListEntry result = null; 183 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); 184 synchronized (indexLock) { 185 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 186 public void execute(Transaction tx) throws IOException { 187 ref.set(getFirst(tx)); 188 } 189 }); 190 } 191 if (ref.get() != null) { 192 ByteSequence bs = this.store.getPayload(ref.get().getValue()); 193 result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey())); 194 } 195 return result; 196 } 197 198 public PListEntry getLast() throws IOException { 199 PListEntry result = null; 200 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); 201 synchronized (indexLock) { 202 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 203 public void execute(Transaction tx) throws IOException { 204 ref.set(getLast(tx)); 205 } 206 }); 207 } 208 if (ref.get() != null) { 209 ByteSequence bs = this.store.getPayload(ref.get().getValue()); 210 result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey())); 211 } 212 return result; 213 } 214 215 @Override 216 public boolean isEmpty() { 217 return size() == 0; 218 } 219 220 @Override 221 public PListIterator iterator() throws IOException { 222 return new PListIteratorImpl(); 223 } 224 225 final class PListIteratorImpl implements PListIterator { 226 final Iterator<Map.Entry<String, Location>> iterator; 227 final Transaction tx; 228 229 PListIteratorImpl() throws IOException { 230 tx = store.pageFile.tx(); 231 synchronized (indexLock) { 232 this.iterator = iterator(tx); 233 } 234 } 235 236 @Override 237 public boolean hasNext() { 238 return iterator.hasNext(); 239 } 240 241 @Override 242 public PListEntry next() { 243 Map.Entry<String, Location> entry = iterator.next(); 244 ByteSequence bs = null; 245 try { 246 bs = store.getPayload(entry.getValue()); 247 } catch (IOException unexpected) { 248 NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage()); 249 e.initCause(unexpected); 250 throw e; 251 } 252 return new PListEntry(entry.getKey(), bs, new Locator(entry.getKey())); 253 } 254 255 @Override 256 public void remove() { 257 try { 258 synchronized (indexLock) { 259 tx.execute(new Transaction.Closure<IOException>() { 260 @Override 261 public void execute(Transaction tx) throws IOException { 262 iterator.remove(); 263 } 264 }); 265 } 266 } catch (IOException unexpected) { 267 IllegalStateException e = new IllegalStateException(unexpected); 268 e.initCause(unexpected); 269 throw e; 270 } 271 } 272 273 public void release() { 274 try { 275 tx.rollback(); 276 } catch (IOException unexpected) { 277 IllegalStateException e = new IllegalStateException(unexpected); 278 e.initCause(unexpected); 279 throw e; 280 } 281 } 282 } 283 284 public void claimFileLocations(final Set<Integer> candidates) throws IOException { 285 synchronized (indexLock) { 286 if (loaded.get()) { 287 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 288 public void execute(Transaction tx) throws IOException { 289 Iterator<Map.Entry<String,Location>> iterator = iterator(tx); 290 while (iterator.hasNext()) { 291 Location location = iterator.next().getValue(); 292 candidates.remove(location.getDataFileId()); 293 } 294 } 295 }); 296 } 297 } 298 } 299 300 @Override 301 public String toString() { 302 return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]"; 303 } 304}