001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
020import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
021import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
022import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
023import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
024import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
025import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
026
027import java.io.IOException;
028import java.util.HashMap;
029import java.util.Map;
030
031import javax.jms.InvalidSelectorException;
032
033import org.apache.activemq.command.ActiveMQDestination;
034import org.apache.activemq.command.ActiveMQTempDestination;
035import org.apache.activemq.command.ConsumerId;
036import org.apache.activemq.command.ConsumerInfo;
037import org.apache.activemq.command.ExceptionResponse;
038import org.apache.activemq.command.ProducerId;
039import org.apache.activemq.command.ProducerInfo;
040import org.apache.activemq.command.RemoveInfo;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.command.SessionId;
043import org.apache.activemq.command.SessionInfo;
044import org.apache.activemq.command.TransactionId;
045import org.apache.activemq.selector.SelectorParser;
046import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
047import org.apache.activemq.transport.amqp.AmqpProtocolException;
048import org.apache.activemq.transport.amqp.ResponseHandler;
049import org.apache.activemq.util.IntrospectionSupport;
050import org.apache.qpid.proton.amqp.DescribedType;
051import org.apache.qpid.proton.amqp.Symbol;
052import org.apache.qpid.proton.amqp.messaging.Target;
053import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
054import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
055import org.apache.qpid.proton.amqp.transport.AmqpError;
056import org.apache.qpid.proton.amqp.transport.ErrorCondition;
057import org.apache.qpid.proton.engine.Receiver;
058import org.apache.qpid.proton.engine.Sender;
059import org.apache.qpid.proton.engine.Session;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * Wraps the AMQP Session and provides the services needed to manage the remote
065 * peer requests for link establishment.
066 */
067public class AmqpSession implements AmqpResource {
068
069    private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
070
071    private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>();
072
073    private final AmqpConnection connection;
074    private final Session protonSession;
075    private final SessionId sessionId;
076
077    private boolean enlisted;
078    private long nextProducerId = 0;
079    private long nextConsumerId = 0;
080
081    /**
082     * Create new AmqpSession instance whose parent is the given AmqpConnection.
083     *
084     * @param connection
085     *        the parent connection for this session.
086     * @param sessionId
087     *        the ActiveMQ SessionId that is used to identify this session.
088     * @param session
089     *        the AMQP Session that this class manages.
090     */
091    public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) {
092        this.connection = connection;
093        this.sessionId = sessionId;
094        this.protonSession = session;
095    }
096
097    @Override
098    public void open() {
099        LOG.debug("Session {} opened", getSessionId());
100
101        getEndpoint().setContext(this);
102        getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
103        getEndpoint().open();
104
105        connection.sendToActiveMQ(new SessionInfo(getSessionId()));
106    }
107
108    @Override
109    public void close() {
110        LOG.debug("Session {} closed", getSessionId());
111
112        getEndpoint().setContext(null);
113        getEndpoint().close();
114        getEndpoint().free();
115
116        connection.sendToActiveMQ(new RemoveInfo(getSessionId()));
117    }
118
119    /**
120     * Commits all pending work for all resources managed under this session.
121     *
122     * @throws Exception if an error occurs while attempting to commit work.
123     */
124    public void commit() throws Exception {
125        for (AmqpSender consumer : consumers.values()) {
126            consumer.commit();
127        }
128
129        enlisted = false;
130    }
131
132    /**
133     * Rolls back any pending work being down under this session.
134     *
135     * @throws Exception if an error occurs while attempting to roll back work.
136     */
137    public void rollback() throws Exception {
138        for (AmqpSender consumer : consumers.values()) {
139            consumer.rollback();
140        }
141
142        enlisted = false;
143    }
144
145    /**
146     * Used to direct all Session managed Senders to push any queued Messages
147     * out to the remote peer.
148     *
149     * @throws Exception if an error occurs while flushing the messages.
150     */
151    public void flushPendingMessages() throws Exception {
152        for (AmqpSender consumer : consumers.values()) {
153            consumer.pumpOutbound();
154        }
155    }
156
157    public void createCoordinator(final Receiver protonReceiver) throws Exception {
158        AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver);
159        txCoordinator.flow(connection.getConfiguredReceiverCredit());
160        txCoordinator.open();
161    }
162
163    public void createReceiver(final Receiver protonReceiver) throws Exception {
164        org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget();
165
166        ProducerInfo producerInfo = new ProducerInfo(getNextProducerId());
167        final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo);
168
169        LOG.debug("opening new receiver {} on link: {}", producerInfo.getProducerId(), protonReceiver.getName());
170
171        try {
172            Target target = (Target) remoteTarget;
173            ActiveMQDestination destination = null;
174            String targetNodeName = target.getAddress();
175
176            if (target.getDynamic()) {
177                destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities());
178                Target actualTarget = new Target();
179                actualTarget.setAddress(destination.getQualifiedName());
180                actualTarget.setDynamic(true);
181                protonReceiver.setTarget(actualTarget);
182                receiver.addCloseAction(new Runnable() {
183
184                    @Override
185                    public void run() {
186                        connection.deleteTemporaryDestination((ActiveMQTempDestination) receiver.getDestination());
187                    }
188                });
189            } else if (targetNodeName != null && !targetNodeName.isEmpty()) {
190                destination = createDestination(remoteTarget);
191                if (destination.isTemporary()) {
192                    String connectionId = ((ActiveMQTempDestination) destination).getConnectionId();
193                    if (connectionId == null) {
194                        throw new AmqpProtocolException(AmqpError.PRECONDITION_FAILED.toString(), "Not a broker created temp destination");
195                    }
196                }
197            }
198
199            receiver.setDestination(destination);
200            connection.sendToActiveMQ(producerInfo, new ResponseHandler() {
201                @Override
202                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
203                    if (response.isException()) {
204                        ErrorCondition error = null;
205                        Throwable exception = ((ExceptionResponse) response).getException();
206                        if (exception instanceof SecurityException) {
207                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
208                        } else {
209                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
210                        }
211
212                        receiver.close(error);
213                    } else {
214                        receiver.flow(connection.getConfiguredReceiverCredit());
215                        receiver.open();
216                    }
217                    pumpProtonToSocket();
218                }
219            });
220
221        } catch (AmqpProtocolException exception) {
222            receiver.close(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage()));
223        }
224    }
225
226    @SuppressWarnings("unchecked")
227    public void createSender(final Sender protonSender) throws Exception {
228        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource();
229
230        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
231        final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo);
232
233        LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName());
234
235        try {
236            final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
237            protonSender.setContext(sender);
238
239            boolean noLocal = false;
240            String selector = null;
241
242            if (source != null) {
243                Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
244                if (filter != null) {
245                    selector = filter.getValue().getDescribed().toString();
246                    // Validate the Selector.
247                    try {
248                        SelectorParser.parse(selector);
249                    } catch (InvalidSelectorException e) {
250                        sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
251                        return;
252                    }
253
254                    supportedFilters.put(filter.getKey(), filter.getValue());
255                }
256
257                filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
258                if (filter != null) {
259                    noLocal = true;
260                    supportedFilters.put(filter.getKey(), filter.getValue());
261                }
262            }
263
264            ActiveMQDestination destination;
265            if (source == null) {
266                // Attempt to recover previous subscription
267                ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName());
268
269                if (storedInfo != null) {
270                    destination = storedInfo.getDestination();
271
272                    source = new org.apache.qpid.proton.amqp.messaging.Source();
273                    source.setAddress(destination.getQualifiedName());
274                    source.setDurable(TerminusDurability.UNSETTLED_STATE);
275                    source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
276                    source.setDistributionMode(COPY);
277
278                    if (storedInfo.isNoLocal()) {
279                        supportedFilters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
280                    }
281
282                    if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals("")) {
283                        supportedFilters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(storedInfo.getSelector()));
284                    }
285                } else {
286                    sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName()));
287                    return;
288                }
289            } else if (source.getDynamic()) {
290                // lets create a temp dest.
291                destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());
292                source = new org.apache.qpid.proton.amqp.messaging.Source();
293                source.setAddress(destination.getQualifiedName());
294                source.setDynamic(true);
295                sender.addCloseAction(new Runnable() {
296
297                    @Override
298                    public void run() {
299                        connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination());
300                    }
301                });
302            } else {
303                destination = createDestination(source);
304                if (destination.isTemporary()) {
305                    String connectionId = ((ActiveMQTempDestination) destination).getConnectionId();
306                    if (connectionId == null) {
307                        throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), "Not a broker created temp destination");
308                    }
309                }
310            }
311
312            source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
313            protonSender.setSource(source);
314
315            int senderCredit = protonSender.getRemoteCredit();
316
317            // Allows the options on the destination to configure the consumerInfo
318            if (destination.getOptions() != null) {
319                Map<String, Object> options = IntrospectionSupport.extractProperties(
320                    new HashMap<String, Object>(destination.getOptions()), "consumer.");
321                IntrospectionSupport.setProperties(consumerInfo, options);
322                if (options.size() > 0) {
323                    String msg = "There are " + options.size()
324                        + " consumer options that couldn't be set on the consumer."
325                        + " Check the options are spelled correctly."
326                        + " Unknown parameters=[" + options + "]."
327                        + " This consumer cannot be started.";
328                    LOG.warn(msg);
329                    throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg);
330                }
331            }
332
333            consumerInfo.setSelector(selector);
334            consumerInfo.setNoRangeAcks(true);
335            consumerInfo.setDestination(destination);
336            consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
337            consumerInfo.setDispatchAsync(true);
338            consumerInfo.setNoLocal(noLocal);
339
340            if (source.getDistributionMode() == COPY && destination.isQueue()) {
341                consumerInfo.setBrowser(true);
342            }
343
344            if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
345                 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
346                consumerInfo.setSubscriptionName(protonSender.getName());
347            }
348
349            connection.sendToActiveMQ(consumerInfo, new ResponseHandler() {
350                @Override
351                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
352                    if (response.isException()) {
353                        ErrorCondition error = null;
354                        Throwable exception = ((ExceptionResponse) response).getException();
355                        if (exception instanceof SecurityException) {
356                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
357                        } else if (exception instanceof InvalidSelectorException) {
358                            error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
359                        } else {
360                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
361                        }
362
363                        sender.close(error);
364                    } else {
365                        sender.open();
366                    }
367                    pumpProtonToSocket();
368                }
369            });
370
371        } catch (AmqpProtocolException e) {
372            sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
373        }
374    }
375
376    /**
377     * Send all pending work out to the remote peer.
378     */
379    public void pumpProtonToSocket() {
380        connection.pumpProtonToSocket();
381    }
382
383    public void registerSender(ConsumerId consumerId, AmqpSender sender) {
384        consumers.put(consumerId, sender);
385        connection.registerSender(consumerId, sender);
386    }
387
388    public void unregisterSender(ConsumerId consumerId) {
389        consumers.remove(consumerId);
390        connection.unregisterSender(consumerId);
391    }
392
393    public void enlist(TransactionId txId) {
394        if (!enlisted) {
395            connection.getTxCoordinator(txId).enlist(this);
396            enlisted = true;
397        }
398    }
399
400    //----- Configuration accessors ------------------------------------------//
401
402    public AmqpConnection getConnection() {
403        return connection;
404    }
405
406    public SessionId getSessionId() {
407        return sessionId;
408    }
409
410    public Session getEndpoint() {
411        return protonSession;
412    }
413
414    public long getMaxFrameSize() {
415        return connection.getMaxFrameSize();
416    }
417
418    //----- Internal Implementation ------------------------------------------//
419
420    private ConsumerId getNextConsumerId() {
421        return new ConsumerId(sessionId, nextConsumerId++);
422    }
423
424    private ProducerId getNextProducerId() {
425        return new ProducerId(sessionId, nextProducerId++);
426    }
427}