001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
020import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
021import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
022import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
023import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
024import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
025import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
026
027import java.io.IOException;
028import java.util.HashMap;
029import java.util.Map;
030
031import javax.jms.InvalidSelectorException;
032
033import org.apache.activemq.command.ActiveMQDestination;
034import org.apache.activemq.command.ActiveMQTempDestination;
035import org.apache.activemq.command.ConsumerId;
036import org.apache.activemq.command.ConsumerInfo;
037import org.apache.activemq.command.ExceptionResponse;
038import org.apache.activemq.command.ProducerId;
039import org.apache.activemq.command.ProducerInfo;
040import org.apache.activemq.command.RemoveInfo;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.command.SessionId;
043import org.apache.activemq.command.SessionInfo;
044import org.apache.activemq.command.TransactionId;
045import org.apache.activemq.selector.SelectorParser;
046import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
047import org.apache.activemq.transport.amqp.AmqpProtocolException;
048import org.apache.activemq.transport.amqp.ResponseHandler;
049import org.apache.qpid.proton.amqp.DescribedType;
050import org.apache.qpid.proton.amqp.Symbol;
051import org.apache.qpid.proton.amqp.messaging.Target;
052import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
053import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
054import org.apache.qpid.proton.amqp.transport.AmqpError;
055import org.apache.qpid.proton.amqp.transport.ErrorCondition;
056import org.apache.qpid.proton.engine.Receiver;
057import org.apache.qpid.proton.engine.Sender;
058import org.apache.qpid.proton.engine.Session;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062/**
063 * Wraps the AMQP Session and provides the services needed to manage the remote
064 * peer requests for link establishment.
065 */
066public class AmqpSession implements AmqpResource {
067
068    private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
069
070    private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>();
071
072    private final AmqpConnection connection;
073    private final Session protonSession;
074    private final SessionId sessionId;
075
076    private boolean enlisted;
077    private long nextProducerId = 0;
078    private long nextConsumerId = 0;
079
080    /**
081     * Create new AmqpSession instance whose parent is the given AmqpConnection.
082     *
083     * @param connection
084     *        the parent connection for this session.
085     * @param sessionId
086     *        the ActiveMQ SessionId that is used to identify this session.
087     * @param session
088     *        the AMQP Session that this class manages.
089     */
090    public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) {
091        this.connection = connection;
092        this.sessionId = sessionId;
093        this.protonSession = session;
094    }
095
096    @Override
097    public void open() {
098        LOG.debug("Session {} opened", getSessionId());
099
100        getEndpoint().setContext(this);
101        getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
102        getEndpoint().open();
103
104        connection.sendToActiveMQ(new SessionInfo(getSessionId()));
105    }
106
107    @Override
108    public void close() {
109        LOG.debug("Session {} closed", getSessionId());
110
111        getEndpoint().setContext(null);
112        getEndpoint().close();
113        getEndpoint().free();
114
115        connection.sendToActiveMQ(new RemoveInfo(getSessionId()));
116    }
117
118    /**
119     * Commits all pending work for all resources managed under this session.
120     *
121     * @throws Exception if an error occurs while attempting to commit work.
122     */
123    public void commit() throws Exception {
124        for (AmqpSender consumer : consumers.values()) {
125            consumer.commit();
126        }
127
128        enlisted = false;
129    }
130
131    /**
132     * Rolls back any pending work being down under this session.
133     *
134     * @throws Exception if an error occurs while attempting to roll back work.
135     */
136    public void rollback() throws Exception {
137        for (AmqpSender consumer : consumers.values()) {
138            consumer.rollback();
139        }
140
141        enlisted = false;
142    }
143
144    /**
145     * Used to direct all Session managed Senders to push any queued Messages
146     * out to the remote peer.
147     *
148     * @throws Exception if an error occurs while flushing the messages.
149     */
150    public void flushPendingMessages() throws Exception {
151        for (AmqpSender consumer : consumers.values()) {
152            consumer.pumpOutbound();
153        }
154    }
155
156    public void createCoordinator(final Receiver protonReceiver) throws Exception {
157        AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver);
158        txCoordinator.flow(connection.getConfiguredReceiverCredit());
159        txCoordinator.open();
160    }
161
162    public void createReceiver(final Receiver protonReceiver) throws Exception {
163        org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget();
164
165        ProducerInfo producerInfo = new ProducerInfo(getNextProducerId());
166        final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo);
167
168        LOG.debug("opening new receiver {} on link: {}", producerInfo.getProducerId(), protonReceiver.getName());
169
170        try {
171            Target target = (Target) remoteTarget;
172            ActiveMQDestination destination = null;
173            String targetNodeName = target.getAddress();
174
175            if (target.getDynamic()) {
176                destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities());
177                Target actualTarget = new Target();
178                actualTarget.setAddress(destination.getQualifiedName());
179                actualTarget.setDynamic(true);
180                protonReceiver.setTarget(actualTarget);
181                receiver.addCloseAction(new Runnable() {
182
183                    @Override
184                    public void run() {
185                        connection.deleteTemporaryDestination((ActiveMQTempDestination) receiver.getDestination());
186                    }
187                });
188            } else if (targetNodeName != null && !targetNodeName.isEmpty()) {
189                destination = createDestination(remoteTarget);
190                if (destination.isTemporary()) {
191                    String connectionId = ((ActiveMQTempDestination) destination).getConnectionId();
192                    if (connectionId == null) {
193                        throw new AmqpProtocolException(AmqpError.PRECONDITION_FAILED.toString(), "Not a broker created temp destination");
194                    }
195                }
196            }
197
198            receiver.setDestination(destination);
199            connection.sendToActiveMQ(producerInfo, new ResponseHandler() {
200                @Override
201                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
202                    if (response.isException()) {
203                        ErrorCondition error = null;
204                        Throwable exception = ((ExceptionResponse) response).getException();
205                        if (exception instanceof SecurityException) {
206                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
207                        } else {
208                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
209                        }
210
211                        receiver.close(error);
212                    } else {
213                        receiver.flow(connection.getConfiguredReceiverCredit());
214                        receiver.open();
215                    }
216                    pumpProtonToSocket();
217                }
218            });
219
220        } catch (AmqpProtocolException exception) {
221            receiver.close(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage()));
222        }
223    }
224
225    @SuppressWarnings("unchecked")
226    public void createSender(final Sender protonSender) throws Exception {
227        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource();
228
229        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
230        final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo);
231
232        LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName());
233
234        try {
235            final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
236            protonSender.setContext(sender);
237
238            boolean noLocal = false;
239            String selector = null;
240
241            if (source != null) {
242                Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
243                if (filter != null) {
244                    selector = filter.getValue().getDescribed().toString();
245                    // Validate the Selector.
246                    try {
247                        SelectorParser.parse(selector);
248                    } catch (InvalidSelectorException e) {
249                        sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
250                        return;
251                    }
252
253                    supportedFilters.put(filter.getKey(), filter.getValue());
254                }
255
256                filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
257                if (filter != null) {
258                    noLocal = true;
259                    supportedFilters.put(filter.getKey(), filter.getValue());
260                }
261            }
262
263            ActiveMQDestination destination;
264            if (source == null) {
265                // Attempt to recover previous subscription
266                ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName());
267
268                if (storedInfo != null) {
269                    destination = storedInfo.getDestination();
270
271                    source = new org.apache.qpid.proton.amqp.messaging.Source();
272                    source.setAddress(destination.getQualifiedName());
273                    source.setDurable(TerminusDurability.UNSETTLED_STATE);
274                    source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
275                    source.setDistributionMode(COPY);
276
277                    if (storedInfo.isNoLocal()) {
278                        supportedFilters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
279                    }
280
281                    if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals("")) {
282                        supportedFilters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(storedInfo.getSelector()));
283                    }
284                } else {
285                    sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName()));
286                    return;
287                }
288            } else if (source.getDynamic()) {
289                // lets create a temp dest.
290                destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());
291                source = new org.apache.qpid.proton.amqp.messaging.Source();
292                source.setAddress(destination.getQualifiedName());
293                source.setDynamic(true);
294                sender.addCloseAction(new Runnable() {
295
296                    @Override
297                    public void run() {
298                        connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination());
299                    }
300                });
301            } else {
302                destination = createDestination(source);
303                if (destination.isTemporary()) {
304                    String connectionId = ((ActiveMQTempDestination) destination).getConnectionId();
305                    if (connectionId == null) {
306                        throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), "Not a broker created temp destination");
307                    }
308                }
309            }
310
311            source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
312            protonSender.setSource(source);
313
314            int senderCredit = protonSender.getRemoteCredit();
315
316            consumerInfo.setSelector(selector);
317            consumerInfo.setNoRangeAcks(true);
318            consumerInfo.setDestination(destination);
319            consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
320            consumerInfo.setDispatchAsync(true);
321            consumerInfo.setNoLocal(noLocal);
322
323            if (source.getDistributionMode() == COPY && destination.isQueue()) {
324                consumerInfo.setBrowser(true);
325            }
326
327            if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
328                 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
329                consumerInfo.setSubscriptionName(protonSender.getName());
330            }
331
332            connection.sendToActiveMQ(consumerInfo, new ResponseHandler() {
333                @Override
334                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
335                    if (response.isException()) {
336                        ErrorCondition error = null;
337                        Throwable exception = ((ExceptionResponse) response).getException();
338                        if (exception instanceof SecurityException) {
339                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
340                        } else if (exception instanceof InvalidSelectorException) {
341                            error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
342                        } else {
343                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
344                        }
345
346                        sender.close(error);
347                    } else {
348                        sender.open();
349                    }
350                    pumpProtonToSocket();
351                }
352            });
353
354        } catch (AmqpProtocolException e) {
355            sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
356        }
357    }
358
359    /**
360     * Send all pending work out to the remote peer.
361     */
362    public void pumpProtonToSocket() {
363        connection.pumpProtonToSocket();
364    }
365
366    public void registerSender(ConsumerId consumerId, AmqpSender sender) {
367        consumers.put(consumerId, sender);
368        connection.registerSender(consumerId, sender);
369    }
370
371    public void unregisterSender(ConsumerId consumerId) {
372        consumers.remove(consumerId);
373        connection.unregisterSender(consumerId);
374    }
375
376    public void enlist(TransactionId txId) {
377        if (!enlisted) {
378            connection.getTxCoordinator(txId).enlist(this);
379            enlisted = true;
380        }
381    }
382
383    //----- Configuration accessors ------------------------------------------//
384
385    public AmqpConnection getConnection() {
386        return connection;
387    }
388
389    public SessionId getSessionId() {
390        return sessionId;
391    }
392
393    public Session getEndpoint() {
394        return protonSession;
395    }
396
397    public long getMaxFrameSize() {
398        return connection.getMaxFrameSize();
399    }
400
401    //----- Internal Implementation ------------------------------------------//
402
403    private ConsumerId getNextConsumerId() {
404        return new ConsumerId(sessionId, nextConsumerId++);
405    }
406
407    private ProducerId getNextProducerId() {
408        return new ProducerId(sessionId, nextProducerId++);
409    }
410}