001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; 020import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; 021import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; 022import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; 023import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; 024import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination; 025import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; 026 027import java.io.IOException; 028import java.util.HashMap; 029import java.util.Map; 030 031import javax.jms.InvalidSelectorException; 032 033import org.apache.activemq.command.ActiveMQDestination; 034import org.apache.activemq.command.ActiveMQTempDestination; 035import org.apache.activemq.command.ConsumerId; 036import org.apache.activemq.command.ConsumerInfo; 037import org.apache.activemq.command.ExceptionResponse; 038import org.apache.activemq.command.ProducerId; 039import org.apache.activemq.command.ProducerInfo; 040import org.apache.activemq.command.RemoveInfo; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.command.SessionId; 043import org.apache.activemq.command.SessionInfo; 044import org.apache.activemq.command.TransactionId; 045import org.apache.activemq.selector.SelectorParser; 046import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 047import org.apache.activemq.transport.amqp.AmqpProtocolException; 048import org.apache.activemq.transport.amqp.ResponseHandler; 049import org.apache.qpid.proton.amqp.DescribedType; 050import org.apache.qpid.proton.amqp.Symbol; 051import org.apache.qpid.proton.amqp.messaging.Target; 052import org.apache.qpid.proton.amqp.messaging.TerminusDurability; 053import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; 054import org.apache.qpid.proton.amqp.transport.AmqpError; 055import org.apache.qpid.proton.amqp.transport.ErrorCondition; 056import org.apache.qpid.proton.engine.Receiver; 057import org.apache.qpid.proton.engine.Sender; 058import org.apache.qpid.proton.engine.Session; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * Wraps the AMQP Session and provides the services needed to manage the remote 064 * peer requests for link establishment. 065 */ 066public class AmqpSession implements AmqpResource { 067 068 private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class); 069 070 private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>(); 071 072 private final AmqpConnection connection; 073 private final Session protonSession; 074 private final SessionId sessionId; 075 076 private boolean enlisted; 077 private long nextProducerId = 0; 078 private long nextConsumerId = 0; 079 080 /** 081 * Create new AmqpSession instance whose parent is the given AmqpConnection. 082 * 083 * @param connection 084 * the parent connection for this session. 085 * @param sessionId 086 * the ActiveMQ SessionId that is used to identify this session. 087 * @param session 088 * the AMQP Session that this class manages. 089 */ 090 public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) { 091 this.connection = connection; 092 this.sessionId = sessionId; 093 this.protonSession = session; 094 } 095 096 @Override 097 public void open() { 098 LOG.debug("Session {} opened", getSessionId()); 099 100 getEndpoint().setContext(this); 101 getEndpoint().setIncomingCapacity(Integer.MAX_VALUE); 102 getEndpoint().open(); 103 104 connection.sendToActiveMQ(new SessionInfo(getSessionId())); 105 } 106 107 @Override 108 public void close() { 109 LOG.debug("Session {} closed", getSessionId()); 110 111 getEndpoint().setContext(null); 112 getEndpoint().close(); 113 getEndpoint().free(); 114 115 connection.sendToActiveMQ(new RemoveInfo(getSessionId())); 116 } 117 118 /** 119 * Commits all pending work for all resources managed under this session. 120 * 121 * @throws Exception if an error occurs while attempting to commit work. 122 */ 123 public void commit() throws Exception { 124 for (AmqpSender consumer : consumers.values()) { 125 consumer.commit(); 126 } 127 128 enlisted = false; 129 } 130 131 /** 132 * Rolls back any pending work being down under this session. 133 * 134 * @throws Exception if an error occurs while attempting to roll back work. 135 */ 136 public void rollback() throws Exception { 137 for (AmqpSender consumer : consumers.values()) { 138 consumer.rollback(); 139 } 140 141 enlisted = false; 142 } 143 144 /** 145 * Used to direct all Session managed Senders to push any queued Messages 146 * out to the remote peer. 147 * 148 * @throws Exception if an error occurs while flushing the messages. 149 */ 150 public void flushPendingMessages() throws Exception { 151 for (AmqpSender consumer : consumers.values()) { 152 consumer.pumpOutbound(); 153 } 154 } 155 156 public void createCoordinator(final Receiver protonReceiver) throws Exception { 157 AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver); 158 txCoordinator.flow(connection.getConfiguredReceiverCredit()); 159 txCoordinator.open(); 160 } 161 162 public void createReceiver(final Receiver protonReceiver) throws Exception { 163 org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget(); 164 165 ProducerInfo producerInfo = new ProducerInfo(getNextProducerId()); 166 final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo); 167 168 LOG.debug("opening new receiver {} on link: {}", producerInfo.getProducerId(), protonReceiver.getName()); 169 170 try { 171 Target target = (Target) remoteTarget; 172 ActiveMQDestination destination = null; 173 String targetNodeName = target.getAddress(); 174 175 if (target.getDynamic()) { 176 destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities()); 177 Target actualTarget = new Target(); 178 actualTarget.setAddress(destination.getQualifiedName()); 179 actualTarget.setDynamic(true); 180 protonReceiver.setTarget(actualTarget); 181 receiver.addCloseAction(new Runnable() { 182 183 @Override 184 public void run() { 185 connection.deleteTemporaryDestination((ActiveMQTempDestination) receiver.getDestination()); 186 } 187 }); 188 } else if (targetNodeName != null && !targetNodeName.isEmpty()) { 189 destination = createDestination(remoteTarget); 190 if (destination.isTemporary()) { 191 String connectionId = ((ActiveMQTempDestination) destination).getConnectionId(); 192 if (connectionId == null) { 193 throw new AmqpProtocolException(AmqpError.PRECONDITION_FAILED.toString(), "Not a broker created temp destination"); 194 } 195 } 196 } 197 198 receiver.setDestination(destination); 199 connection.sendToActiveMQ(producerInfo, new ResponseHandler() { 200 @Override 201 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 202 if (response.isException()) { 203 ErrorCondition error = null; 204 Throwable exception = ((ExceptionResponse) response).getException(); 205 if (exception instanceof SecurityException) { 206 error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()); 207 } else { 208 error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()); 209 } 210 211 receiver.close(error); 212 } else { 213 receiver.flow(connection.getConfiguredReceiverCredit()); 214 receiver.open(); 215 } 216 pumpProtonToSocket(); 217 } 218 }); 219 220 } catch (AmqpProtocolException exception) { 221 receiver.close(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage())); 222 } 223 } 224 225 @SuppressWarnings("unchecked") 226 public void createSender(final Sender protonSender) throws Exception { 227 org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource(); 228 229 ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); 230 final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo); 231 232 LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName()); 233 234 try { 235 final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>(); 236 protonSender.setContext(sender); 237 238 boolean noLocal = false; 239 String selector = null; 240 241 if (source != null) { 242 Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); 243 if (filter != null) { 244 selector = filter.getValue().getDescribed().toString(); 245 // Validate the Selector. 246 try { 247 SelectorParser.parse(selector); 248 } catch (InvalidSelectorException e) { 249 sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); 250 return; 251 } 252 253 supportedFilters.put(filter.getKey(), filter.getValue()); 254 } 255 256 filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); 257 if (filter != null) { 258 noLocal = true; 259 supportedFilters.put(filter.getKey(), filter.getValue()); 260 } 261 } 262 263 ActiveMQDestination destination; 264 if (source == null) { 265 // Attempt to recover previous subscription 266 ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName()); 267 268 if (storedInfo != null) { 269 destination = storedInfo.getDestination(); 270 271 source = new org.apache.qpid.proton.amqp.messaging.Source(); 272 source.setAddress(destination.getQualifiedName()); 273 source.setDurable(TerminusDurability.UNSETTLED_STATE); 274 source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); 275 source.setDistributionMode(COPY); 276 277 if (storedInfo.isNoLocal()) { 278 supportedFilters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL); 279 } 280 281 if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals("")) { 282 supportedFilters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(storedInfo.getSelector())); 283 } 284 } else { 285 sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName())); 286 return; 287 } 288 } else if (source.getDynamic()) { 289 // lets create a temp dest. 290 destination = connection.createTemporaryDestination(protonSender, source.getCapabilities()); 291 source = new org.apache.qpid.proton.amqp.messaging.Source(); 292 source.setAddress(destination.getQualifiedName()); 293 source.setDynamic(true); 294 sender.addCloseAction(new Runnable() { 295 296 @Override 297 public void run() { 298 connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination()); 299 } 300 }); 301 } else { 302 destination = createDestination(source); 303 if (destination.isTemporary()) { 304 String connectionId = ((ActiveMQTempDestination) destination).getConnectionId(); 305 if (connectionId == null) { 306 throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), "Not a broker created temp destination"); 307 } 308 } 309 } 310 311 source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters); 312 protonSender.setSource(source); 313 314 int senderCredit = protonSender.getRemoteCredit(); 315 316 consumerInfo.setSelector(selector); 317 consumerInfo.setNoRangeAcks(true); 318 consumerInfo.setDestination(destination); 319 consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0); 320 consumerInfo.setDispatchAsync(true); 321 consumerInfo.setNoLocal(noLocal); 322 323 if (source.getDistributionMode() == COPY && destination.isQueue()) { 324 consumerInfo.setBrowser(true); 325 } 326 327 if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || 328 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) { 329 consumerInfo.setSubscriptionName(protonSender.getName()); 330 } 331 332 connection.sendToActiveMQ(consumerInfo, new ResponseHandler() { 333 @Override 334 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 335 if (response.isException()) { 336 ErrorCondition error = null; 337 Throwable exception = ((ExceptionResponse) response).getException(); 338 if (exception instanceof SecurityException) { 339 error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()); 340 } else if (exception instanceof InvalidSelectorException) { 341 error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()); 342 } else { 343 error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()); 344 } 345 346 sender.close(error); 347 } else { 348 sender.open(); 349 } 350 pumpProtonToSocket(); 351 } 352 }); 353 354 } catch (AmqpProtocolException e) { 355 sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage())); 356 } 357 } 358 359 /** 360 * Send all pending work out to the remote peer. 361 */ 362 public void pumpProtonToSocket() { 363 connection.pumpProtonToSocket(); 364 } 365 366 public void registerSender(ConsumerId consumerId, AmqpSender sender) { 367 consumers.put(consumerId, sender); 368 connection.registerSender(consumerId, sender); 369 } 370 371 public void unregisterSender(ConsumerId consumerId) { 372 consumers.remove(consumerId); 373 connection.unregisterSender(consumerId); 374 } 375 376 public void enlist(TransactionId txId) { 377 if (!enlisted) { 378 connection.getTxCoordinator(txId).enlist(this); 379 enlisted = true; 380 } 381 } 382 383 //----- Configuration accessors ------------------------------------------// 384 385 public AmqpConnection getConnection() { 386 return connection; 387 } 388 389 public SessionId getSessionId() { 390 return sessionId; 391 } 392 393 public Session getEndpoint() { 394 return protonSession; 395 } 396 397 public long getMaxFrameSize() { 398 return connection.getMaxFrameSize(); 399 } 400 401 //----- Internal Implementation ------------------------------------------// 402 403 private ConsumerId getNextConsumerId() { 404 return new ConsumerId(sessionId, nextConsumerId++); 405 } 406 407 private ProducerId getNextProducerId() { 408 return new ProducerId(sessionId, nextProducerId++); 409 } 410}