001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.util;
018    
019    import java.io.ByteArrayOutputStream;
020    import java.io.Closeable;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.util.Iterator;
024    import java.util.Scanner;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.NoTypeConversionAvailableException;
028    
029    /**
030     * Group based {@link Iterator} which groups the given {@link Iterator} a number of times
031     * and then return a combined response as a String.
032     * <p/>
033     * This implementation uses as internal byte array buffer, to combine the response.
034     * The token is inserted between the individual parts.
035     * <p/>
036     * For example if you group by new line, then a new line token is inserted between the lines.
037     */
038    public final class GroupIterator implements Iterator<Object>, Closeable {
039    
040        private final CamelContext camelContext;
041        private final Iterator<?> it;
042        private final String token;
043        private final int group;
044        private boolean closed;
045        private final ByteArrayOutputStream bos = new ByteArrayOutputStream();
046    
047        /**
048         * Creates a new group iterator
049         *
050         * @param camelContext  the camel context
051         * @param it            the iterator to group
052         * @param token         then token used to separate between the parts, use <tt>null</tt> to not add the token
053         * @param group         number of parts to group together
054         * @throws IllegalArgumentException is thrown if group is not a positive number
055         */
056        public GroupIterator(CamelContext camelContext, Iterator<?> it, String token, int group) {
057            this.camelContext = camelContext;
058            this.it = it;
059            this.token = token;
060            this.group = group;
061            if (group <= 0) {
062                throw new IllegalArgumentException("Group must be a positive number, was: " + group);
063            }
064        }
065    
066        @Override
067        public void close() throws IOException {
068            if (it instanceof Closeable) {
069                IOHelper.close((Closeable) it);
070            } else if (it instanceof Scanner) {
071                // special for Scanner as it does not implement Closeable
072                ((Scanner) it).close();
073            }
074            // close the buffer as well
075            bos.close();
076            // we are now closed
077            closed = true;
078        }
079    
080        @Override
081        public boolean hasNext() {
082            if (closed) {
083                return false;
084            }
085    
086            boolean answer = it.hasNext();
087            if (!answer) {
088                // auto close
089                try {
090                    close();
091                } catch (IOException e) {
092                    // ignore
093                }
094            }
095            return answer;
096        }
097    
098        @Override
099        public Object next() {
100            try {
101                return doNext();
102            } catch (Exception e) {
103                throw ObjectHelper.wrapRuntimeCamelException(e);
104            }
105        }
106    
107        private Object doNext() throws IOException, NoTypeConversionAvailableException {
108            int count = 0;
109            Object data = "";
110            while (count < group && it.hasNext()) {
111                data = it.next();
112    
113                // include token in between
114                if (data != null && count > 0 && token != null) {
115                    bos.write(token.getBytes());
116                }
117                if (data instanceof InputStream) {
118                    InputStream is = (InputStream) data;
119                    IOHelper.copy(is, bos);
120                } else if (data instanceof byte[]) {
121                    byte[] bytes = (byte[]) data;
122                    bos.write(bytes);
123                } else if (data != null) {
124                    // convert to input stream
125                    InputStream is = camelContext.getTypeConverter().mandatoryConvertTo(InputStream.class, data);
126                    IOHelper.copy(is, bos);
127                }
128    
129                count++;
130            }
131    
132            // prepare and return answer as String
133            String answer = bos.toString();
134            bos.reset();
135            return answer;
136        }
137    
138        @Override
139        public void remove() {
140            it.remove();
141        }
142    }