001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport; 018 019import java.io.ByteArrayOutputStream; 020import java.io.DataInputStream; 021import java.io.IOException; 022 023import org.apache.activemq.command.Command; 024import org.apache.activemq.command.LastPartialCommand; 025import org.apache.activemq.command.PartialCommand; 026import org.apache.activemq.openwire.OpenWireFormat; 027import org.apache.activemq.util.ByteArrayInputStream; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * Joins together of partial commands which were split into individual chunks of 033 * data. 034 * 035 * 036 */ 037public class CommandJoiner extends TransportFilter { 038 private static final Logger LOG = LoggerFactory.getLogger(CommandJoiner.class); 039 040 private ByteArrayOutputStream out = new ByteArrayOutputStream(); 041 private OpenWireFormat wireFormat; 042 043 public CommandJoiner(Transport next, OpenWireFormat wireFormat) { 044 super(next); 045 this.wireFormat = wireFormat; 046 } 047 048 public void onCommand(Object o) { 049 Command command = (Command)o; 050 byte type = command.getDataStructureType(); 051 if (type == PartialCommand.DATA_STRUCTURE_TYPE || type == LastPartialCommand.DATA_STRUCTURE_TYPE) { 052 PartialCommand header = (PartialCommand)command; 053 byte[] partialData = header.getData(); 054 try { 055 out.write(partialData); 056 } catch (IOException e) { 057 getTransportListener().onException(e); 058 } 059 if (type == LastPartialCommand.DATA_STRUCTURE_TYPE) { 060 try { 061 byte[] fullData = out.toByteArray(); 062 out.reset(); 063 DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(fullData)); 064 Command completeCommand = (Command)wireFormat.unmarshal(dataIn); 065 066 LastPartialCommand lastCommand = (LastPartialCommand)command; 067 lastCommand.configure(completeCommand); 068 069 getTransportListener().onCommand(completeCommand); 070 } catch (IOException e) { 071 LOG.warn("Failed to unmarshal partial command: " + command); 072 getTransportListener().onException(e); 073 } 074 } 075 } else { 076 getTransportListener().onCommand(command); 077 } 078 } 079 080 public void stop() throws Exception { 081 super.stop(); 082 out = null; 083 } 084 085 public String toString() { 086 return next.toString(); 087 } 088}