while (continueRouting(processors, nextExchange)) { if (first) { first = false; } else { // prepare for next run nextExchange = createNextExchange(nextExchange); }
// get the next processor Processorprocessor= processors.next();
// continue as long its being processed synchronously if (!sync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); // the remainder of the pipeline will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here returnfalse; }
LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
// check for error if so we should break out if (!continueProcessing(nextExchange, "so breaking out of pipeline", LOG)) { break; } }
// logging nextExchange as it contains the exchange that might have altered the payload and since // we are logging the completion if will be confusing if we log the original instead // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), nextExchange);
// copy results back to the original exchange ExchangeHelper.copyResults(exchange, nextExchange);
// now lets set the input of the next exchange to the output of the // previous message if it is not null if (answer.hasOut()) { answer.setIn(answer.getOut()); answer.setOut(null); } return answer; }