001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.util; 018 019 import java.io.ByteArrayOutputStream; 020 import java.io.Closeable; 021 import java.io.IOException; 022 import java.io.InputStream; 023 import java.util.Iterator; 024 import java.util.Scanner; 025 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.NoTypeConversionAvailableException; 028 029 /** 030 * Group based {@link Iterator} which groups the given {@link Iterator} a number of times 031 * and then return a combined response as a String. 032 * <p/> 033 * This implementation uses as internal byte array buffer, to combine the response. 034 * The token is inserted between the individual parts. 035 * <p/> 036 * For example if you group by new line, then a new line token is inserted between the lines. 037 */ 038 public final class GroupIterator implements Iterator<Object>, Closeable { 039 040 private final CamelContext camelContext; 041 private final Iterator<?> it; 042 private final String token; 043 private final int group; 044 private boolean closed; 045 private final ByteArrayOutputStream bos = new ByteArrayOutputStream(); 046 047 /** 048 * Creates a new group iterator 049 * 050 * @param camelContext the camel context 051 * @param it the iterator to group 052 * @param token then token used to separate between the parts, use <tt>null</tt> to not add the token 053 * @param group number of parts to group together 054 * @throws IllegalArgumentException is thrown if group is not a positive number 055 */ 056 public GroupIterator(CamelContext camelContext, Iterator<?> it, String token, int group) { 057 this.camelContext = camelContext; 058 this.it = it; 059 this.token = token; 060 this.group = group; 061 if (group <= 0) { 062 throw new IllegalArgumentException("Group must be a positive number, was: " + group); 063 } 064 } 065 066 @Override 067 public void close() throws IOException { 068 if (it instanceof Closeable) { 069 IOHelper.close((Closeable) it); 070 } else if (it instanceof Scanner) { 071 // special for Scanner as it does not implement Closeable 072 ((Scanner) it).close(); 073 } 074 // close the buffer as well 075 bos.close(); 076 // we are now closed 077 closed = true; 078 } 079 080 @Override 081 public boolean hasNext() { 082 if (closed) { 083 return false; 084 } 085 086 boolean answer = it.hasNext(); 087 if (!answer) { 088 // auto close 089 try { 090 close(); 091 } catch (IOException e) { 092 // ignore 093 } 094 } 095 return answer; 096 } 097 098 @Override 099 public Object next() { 100 try { 101 return doNext(); 102 } catch (Exception e) { 103 throw ObjectHelper.wrapRuntimeCamelException(e); 104 } 105 } 106 107 private Object doNext() throws IOException, NoTypeConversionAvailableException { 108 int count = 0; 109 Object data = ""; 110 while (count < group && it.hasNext()) { 111 data = it.next(); 112 113 // include token in between 114 if (data != null && count > 0 && token != null) { 115 bos.write(token.getBytes()); 116 } 117 if (data instanceof InputStream) { 118 InputStream is = (InputStream) data; 119 IOHelper.copy(is, bos); 120 } else if (data instanceof byte[]) { 121 byte[] bytes = (byte[]) data; 122 bos.write(bytes); 123 } else if (data != null) { 124 // convert to input stream 125 InputStream is = camelContext.getTypeConverter().mandatoryConvertTo(InputStream.class, data); 126 IOHelper.copy(is, bos); 127 } 128 129 count++; 130 } 131 132 // prepare and return answer as String 133 String answer = bos.toString(); 134 bos.reset(); 135 return answer; 136 } 137 138 @Override 139 public void remove() { 140 it.remove(); 141 } 142 }