001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport; 018 019import java.io.IOException; 020import java.util.concurrent.locks.ReentrantLock; 021 022/** 023 * Thread safe Transport Filter that serializes calls to and from the Transport Stack. 024 */ 025public class MutexTransport extends TransportFilter { 026 027 private final ReentrantLock writeLock = new ReentrantLock(); 028 private boolean syncOnCommand; 029 030 public MutexTransport(Transport next) { 031 super(next); 032 this.syncOnCommand = false; 033 } 034 035 public MutexTransport(Transport next, boolean syncOnCommand) { 036 super(next); 037 this.syncOnCommand = syncOnCommand; 038 } 039 040 @Override 041 public void onCommand(Object command) { 042 if (syncOnCommand) { 043 writeLock.lock(); 044 try { 045 transportListener.onCommand(command); 046 } finally { 047 writeLock.unlock(); 048 } 049 } else { 050 transportListener.onCommand(command); 051 } 052 } 053 054 @Override 055 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 056 writeLock.lock(); 057 try { 058 return next.asyncRequest(command, null); 059 } finally { 060 writeLock.unlock(); 061 } 062 } 063 064 @Override 065 public void oneway(Object command) throws IOException { 066 writeLock.lock(); 067 try { 068 next.oneway(command); 069 } finally { 070 writeLock.unlock(); 071 } 072 } 073 074 @Override 075 public Object request(Object command) throws IOException { 076 writeLock.lock(); 077 try { 078 return next.request(command); 079 } finally { 080 writeLock.unlock(); 081 } 082 } 083 084 @Override 085 public Object request(Object command, int timeout) throws IOException { 086 writeLock.lock(); 087 try { 088 return next.request(command, timeout); 089 } finally { 090 writeLock.unlock(); 091 } 092 } 093 094 @Override 095 public String toString() { 096 return next.toString(); 097 } 098 099 public boolean isSyncOnCommand() { 100 return syncOnCommand; 101 } 102 103 public void setSyncOnCommand(boolean syncOnCommand) { 104 this.syncOnCommand = syncOnCommand; 105 } 106}