001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.util; 018 019 import java.util.concurrent.CountDownLatch; 020 import org.apache.camel.AsyncCallback; 021 import org.apache.camel.AsyncProcessor; 022 import org.apache.camel.Exchange; 023 import org.apache.camel.spi.UnitOfWork; 024 import org.slf4j.Logger; 025 import org.slf4j.LoggerFactory; 026 027 /** 028 * Helper methods for {@link AsyncProcessor} objects. 029 */ 030 public final class AsyncProcessorHelper { 031 032 private static final transient Logger LOG = LoggerFactory.getLogger(AsyncProcessorHelper.class); 033 034 private AsyncProcessorHelper() { 035 // utility class 036 } 037 038 /** 039 * Calls the async version of the processor's process method. 040 * <p/> 041 * This implementation supports transacted {@link Exchange}s which ensure those are run in a synchronous fashion. 042 * See more details at {@link org.apache.camel.AsyncProcessor}. 043 * 044 * @param processor the processor 045 * @param exchange the exchange 046 * @param callback the callback 047 * @return <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously 048 */ 049 public static boolean process(final AsyncProcessor processor, final Exchange exchange, final AsyncCallback callback) { 050 boolean sync; 051 052 if (exchange.isTransacted()) { 053 // must be synchronized for transacted exchanges 054 LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); 055 try { 056 process(processor, exchange); 057 } catch (Throwable e) { 058 exchange.setException(e); 059 } 060 callback.done(true); 061 sync = true; 062 } else { 063 final UnitOfWork uow = exchange.getUnitOfWork(); 064 065 // allow unit of work to wrap callback in case it need to do some special work 066 // for example the MDCUnitOfWork 067 AsyncCallback async = callback; 068 if (uow != null) { 069 async = uow.beforeProcess(processor, exchange, callback); 070 } 071 072 // we support asynchronous routing so invoke it 073 sync = processor.process(exchange, async); 074 075 // execute any after processor work (in current thread, not in the callback) 076 if (uow != null) { 077 uow.afterProcess(processor, exchange, callback, sync); 078 } 079 } 080 081 if (LOG.isTraceEnabled()) { 082 LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", 083 new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange}); 084 } 085 return sync; 086 } 087 088 /** 089 * Calls the async version of the processor's process method and waits 090 * for it to complete before returning. This can be used by {@link AsyncProcessor} 091 * objects to implement their sync version of the process method. 092 * 093 * @param processor the processor 094 * @param exchange the exchange 095 * @throws Exception can be thrown if waiting is interrupted 096 */ 097 public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception { 098 final CountDownLatch latch = new CountDownLatch(1); 099 boolean sync = processor.process(exchange, new AsyncCallback() { 100 public void done(boolean doneSync) { 101 if (!doneSync) { 102 LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId()); 103 latch.countDown(); 104 } 105 } 106 107 @Override 108 public String toString() { 109 return "Done " + processor; 110 } 111 }); 112 if (!sync) { 113 LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", 114 exchange.getExchangeId(), exchange); 115 latch.await(); 116 LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}", 117 exchange.getExchangeId(), exchange); 118 } 119 } 120 121 }