001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport;
018
019import java.io.IOException;
020import java.util.concurrent.locks.ReentrantLock;
021
022/**
023 * Thread safe Transport Filter that serializes calls to and from the Transport Stack.
024 */
025public class MutexTransport extends TransportFilter {
026
027    private final ReentrantLock writeLock = new ReentrantLock();
028    private boolean syncOnCommand;
029
030    public MutexTransport(Transport next) {
031        super(next);
032        this.syncOnCommand = false;
033    }
034
035    public MutexTransport(Transport next, boolean syncOnCommand) {
036        super(next);
037        this.syncOnCommand = syncOnCommand;
038    }
039
040    @Override
041    public void onCommand(Object command) {
042        if (syncOnCommand) {
043            writeLock.lock();
044            try {
045                transportListener.onCommand(command);
046            } finally {
047                writeLock.unlock();
048            }
049        } else {
050            transportListener.onCommand(command);
051        }
052    }
053
054    @Override
055    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
056        writeLock.lock();
057        try {
058            return next.asyncRequest(command, null);
059        } finally {
060            writeLock.unlock();
061        }
062    }
063
064    @Override
065    public void oneway(Object command) throws IOException {
066        writeLock.lock();
067        try {
068            next.oneway(command);
069        } finally {
070            writeLock.unlock();
071        }
072    }
073
074    @Override
075    public Object request(Object command) throws IOException {
076        writeLock.lock();
077        try {
078            return next.request(command);
079        } finally {
080            writeLock.unlock();
081        }
082    }
083
084    @Override
085    public Object request(Object command, int timeout) throws IOException {
086        writeLock.lock();
087        try {
088            return next.request(command, timeout);
089        } finally {
090            writeLock.unlock();
091        }
092    }
093
094    @Override
095    public String toString() {
096        return next.toString();
097    }
098
099    public boolean isSyncOnCommand() {
100        return syncOnCommand;
101    }
102
103    public void setSyncOnCommand(boolean syncOnCommand) {
104        this.syncOnCommand = syncOnCommand;
105    }
106}