001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; 020import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; 021import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; 022import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; 023import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; 024import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination; 025import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; 026 027import java.io.IOException; 028import java.util.HashMap; 029import java.util.Map; 030 031import javax.jms.InvalidSelectorException; 032 033import org.apache.activemq.command.ActiveMQDestination; 034import org.apache.activemq.command.ActiveMQTempDestination; 035import org.apache.activemq.command.ConsumerId; 036import org.apache.activemq.command.ConsumerInfo; 037import org.apache.activemq.command.ExceptionResponse; 038import org.apache.activemq.command.ProducerId; 039import org.apache.activemq.command.ProducerInfo; 040import org.apache.activemq.command.RemoveInfo; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.command.SessionId; 043import org.apache.activemq.command.SessionInfo; 044import org.apache.activemq.command.TransactionId; 045import org.apache.activemq.selector.SelectorParser; 046import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 047import org.apache.activemq.transport.amqp.AmqpProtocolException; 048import org.apache.activemq.transport.amqp.ResponseHandler; 049import org.apache.activemq.util.IntrospectionSupport; 050import org.apache.qpid.proton.amqp.DescribedType; 051import org.apache.qpid.proton.amqp.Symbol; 052import org.apache.qpid.proton.amqp.messaging.Target; 053import org.apache.qpid.proton.amqp.messaging.TerminusDurability; 054import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; 055import org.apache.qpid.proton.amqp.transport.AmqpError; 056import org.apache.qpid.proton.amqp.transport.ErrorCondition; 057import org.apache.qpid.proton.engine.Receiver; 058import org.apache.qpid.proton.engine.Sender; 059import org.apache.qpid.proton.engine.Session; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * Wraps the AMQP Session and provides the services needed to manage the remote 065 * peer requests for link establishment. 066 */ 067public class AmqpSession implements AmqpResource { 068 069 private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class); 070 071 private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>(); 072 073 private final AmqpConnection connection; 074 private final Session protonSession; 075 private final SessionId sessionId; 076 077 private boolean enlisted; 078 private long nextProducerId = 0; 079 private long nextConsumerId = 0; 080 081 /** 082 * Create new AmqpSession instance whose parent is the given AmqpConnection. 083 * 084 * @param connection 085 * the parent connection for this session. 086 * @param sessionId 087 * the ActiveMQ SessionId that is used to identify this session. 088 * @param session 089 * the AMQP Session that this class manages. 090 */ 091 public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) { 092 this.connection = connection; 093 this.sessionId = sessionId; 094 this.protonSession = session; 095 } 096 097 @Override 098 public void open() { 099 LOG.debug("Session {} opened", getSessionId()); 100 101 getEndpoint().setContext(this); 102 getEndpoint().setIncomingCapacity(Integer.MAX_VALUE); 103 getEndpoint().open(); 104 105 connection.sendToActiveMQ(new SessionInfo(getSessionId())); 106 } 107 108 @Override 109 public void close() { 110 LOG.debug("Session {} closed", getSessionId()); 111 112 getEndpoint().setContext(null); 113 getEndpoint().close(); 114 getEndpoint().free(); 115 116 connection.sendToActiveMQ(new RemoveInfo(getSessionId())); 117 } 118 119 /** 120 * Commits all pending work for all resources managed under this session. 121 * 122 * @throws Exception if an error occurs while attempting to commit work. 123 */ 124 public void commit() throws Exception { 125 for (AmqpSender consumer : consumers.values()) { 126 consumer.commit(); 127 } 128 129 enlisted = false; 130 } 131 132 /** 133 * Rolls back any pending work being down under this session. 134 * 135 * @throws Exception if an error occurs while attempting to roll back work. 136 */ 137 public void rollback() throws Exception { 138 for (AmqpSender consumer : consumers.values()) { 139 consumer.rollback(); 140 } 141 142 enlisted = false; 143 } 144 145 /** 146 * Used to direct all Session managed Senders to push any queued Messages 147 * out to the remote peer. 148 * 149 * @throws Exception if an error occurs while flushing the messages. 150 */ 151 public void flushPendingMessages() throws Exception { 152 for (AmqpSender consumer : consumers.values()) { 153 consumer.pumpOutbound(); 154 } 155 } 156 157 public void createCoordinator(final Receiver protonReceiver) throws Exception { 158 AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver); 159 txCoordinator.flow(connection.getConfiguredReceiverCredit()); 160 txCoordinator.open(); 161 } 162 163 public void createReceiver(final Receiver protonReceiver) throws Exception { 164 org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget(); 165 166 ProducerInfo producerInfo = new ProducerInfo(getNextProducerId()); 167 final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo); 168 169 LOG.debug("opening new receiver {} on link: {}", producerInfo.getProducerId(), protonReceiver.getName()); 170 171 try { 172 Target target = (Target) remoteTarget; 173 ActiveMQDestination destination = null; 174 String targetNodeName = target.getAddress(); 175 176 if (target.getDynamic()) { 177 destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities()); 178 Target actualTarget = new Target(); 179 actualTarget.setAddress(destination.getQualifiedName()); 180 actualTarget.setDynamic(true); 181 protonReceiver.setTarget(actualTarget); 182 receiver.addCloseAction(new Runnable() { 183 184 @Override 185 public void run() { 186 connection.deleteTemporaryDestination((ActiveMQTempDestination) receiver.getDestination()); 187 } 188 }); 189 } else if (targetNodeName != null && !targetNodeName.isEmpty()) { 190 destination = createDestination(remoteTarget); 191 if (destination.isTemporary()) { 192 String connectionId = ((ActiveMQTempDestination) destination).getConnectionId(); 193 if (connectionId == null) { 194 throw new AmqpProtocolException(AmqpError.PRECONDITION_FAILED.toString(), "Not a broker created temp destination"); 195 } 196 } 197 } 198 199 receiver.setDestination(destination); 200 connection.sendToActiveMQ(producerInfo, new ResponseHandler() { 201 @Override 202 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 203 if (response.isException()) { 204 ErrorCondition error = null; 205 Throwable exception = ((ExceptionResponse) response).getException(); 206 if (exception instanceof SecurityException) { 207 error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()); 208 } else { 209 error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()); 210 } 211 212 receiver.close(error); 213 } else { 214 receiver.flow(connection.getConfiguredReceiverCredit()); 215 receiver.open(); 216 } 217 pumpProtonToSocket(); 218 } 219 }); 220 221 } catch (AmqpProtocolException exception) { 222 receiver.close(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage())); 223 } 224 } 225 226 @SuppressWarnings("unchecked") 227 public void createSender(final Sender protonSender) throws Exception { 228 org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource(); 229 230 ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); 231 final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo); 232 233 LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName()); 234 235 try { 236 final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>(); 237 protonSender.setContext(sender); 238 239 boolean noLocal = false; 240 String selector = null; 241 242 if (source != null) { 243 Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); 244 if (filter != null) { 245 selector = filter.getValue().getDescribed().toString(); 246 // Validate the Selector. 247 try { 248 SelectorParser.parse(selector); 249 } catch (InvalidSelectorException e) { 250 sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); 251 return; 252 } 253 254 supportedFilters.put(filter.getKey(), filter.getValue()); 255 } 256 257 filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); 258 if (filter != null) { 259 noLocal = true; 260 supportedFilters.put(filter.getKey(), filter.getValue()); 261 } 262 } 263 264 ActiveMQDestination destination; 265 if (source == null) { 266 // Attempt to recover previous subscription 267 ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName()); 268 269 if (storedInfo != null) { 270 destination = storedInfo.getDestination(); 271 272 source = new org.apache.qpid.proton.amqp.messaging.Source(); 273 source.setAddress(destination.getQualifiedName()); 274 source.setDurable(TerminusDurability.UNSETTLED_STATE); 275 source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); 276 source.setDistributionMode(COPY); 277 278 if (storedInfo.isNoLocal()) { 279 supportedFilters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL); 280 } 281 282 if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals("")) { 283 supportedFilters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(storedInfo.getSelector())); 284 } 285 } else { 286 sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName())); 287 return; 288 } 289 } else if (source.getDynamic()) { 290 // lets create a temp dest. 291 destination = connection.createTemporaryDestination(protonSender, source.getCapabilities()); 292 source = new org.apache.qpid.proton.amqp.messaging.Source(); 293 source.setAddress(destination.getQualifiedName()); 294 source.setDynamic(true); 295 sender.addCloseAction(new Runnable() { 296 297 @Override 298 public void run() { 299 connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination()); 300 } 301 }); 302 } else { 303 destination = createDestination(source); 304 if (destination.isTemporary()) { 305 String connectionId = ((ActiveMQTempDestination) destination).getConnectionId(); 306 if (connectionId == null) { 307 throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), "Not a broker created temp destination"); 308 } 309 } 310 } 311 312 source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters); 313 protonSender.setSource(source); 314 315 int senderCredit = protonSender.getRemoteCredit(); 316 317 // Allows the options on the destination to configure the consumerInfo 318 if (destination.getOptions() != null) { 319 Map<String, Object> options = IntrospectionSupport.extractProperties( 320 new HashMap<String, Object>(destination.getOptions()), "consumer."); 321 IntrospectionSupport.setProperties(consumerInfo, options); 322 if (options.size() > 0) { 323 String msg = "There are " + options.size() 324 + " consumer options that couldn't be set on the consumer." 325 + " Check the options are spelled correctly." 326 + " Unknown parameters=[" + options + "]." 327 + " This consumer cannot be started."; 328 LOG.warn(msg); 329 throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg); 330 } 331 } 332 333 consumerInfo.setSelector(selector); 334 consumerInfo.setNoRangeAcks(true); 335 consumerInfo.setDestination(destination); 336 consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0); 337 consumerInfo.setDispatchAsync(true); 338 consumerInfo.setNoLocal(noLocal); 339 340 if (source.getDistributionMode() == COPY && destination.isQueue()) { 341 consumerInfo.setBrowser(true); 342 } 343 344 if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || 345 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) { 346 consumerInfo.setSubscriptionName(protonSender.getName()); 347 } 348 349 connection.sendToActiveMQ(consumerInfo, new ResponseHandler() { 350 @Override 351 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 352 if (response.isException()) { 353 ErrorCondition error = null; 354 Throwable exception = ((ExceptionResponse) response).getException(); 355 if (exception instanceof SecurityException) { 356 error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()); 357 } else if (exception instanceof InvalidSelectorException) { 358 error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()); 359 } else { 360 error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()); 361 } 362 363 sender.close(error); 364 } else { 365 sender.open(); 366 } 367 pumpProtonToSocket(); 368 } 369 }); 370 371 } catch (AmqpProtocolException e) { 372 sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage())); 373 } 374 } 375 376 /** 377 * Send all pending work out to the remote peer. 378 */ 379 public void pumpProtonToSocket() { 380 connection.pumpProtonToSocket(); 381 } 382 383 public void registerSender(ConsumerId consumerId, AmqpSender sender) { 384 consumers.put(consumerId, sender); 385 connection.registerSender(consumerId, sender); 386 } 387 388 public void unregisterSender(ConsumerId consumerId) { 389 consumers.remove(consumerId); 390 connection.unregisterSender(consumerId); 391 } 392 393 public void enlist(TransactionId txId) { 394 if (!enlisted) { 395 connection.getTxCoordinator(txId).enlist(this); 396 enlisted = true; 397 } 398 } 399 400 //----- Configuration accessors ------------------------------------------// 401 402 public AmqpConnection getConnection() { 403 return connection; 404 } 405 406 public SessionId getSessionId() { 407 return sessionId; 408 } 409 410 public Session getEndpoint() { 411 return protonSession; 412 } 413 414 public long getMaxFrameSize() { 415 return connection.getMaxFrameSize(); 416 } 417 418 //----- Internal Implementation ------------------------------------------// 419 420 private ConsumerId getNextConsumerId() { 421 return new ConsumerId(sessionId, nextConsumerId++); 422 } 423 424 private ProducerId getNextProducerId() { 425 return new ProducerId(sessionId, nextProducerId++); 426 } 427}