001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.ConnectionContext;
021import org.apache.activemq.broker.ProducerBrokerExchange;
022import org.apache.activemq.command.ActiveMQDestination;
023import org.apache.activemq.command.Message;
024import org.apache.activemq.command.ProducerInfo;
025import org.apache.activemq.security.SecurityContext;
026import org.apache.activemq.state.ProducerState;
027
028/**
029 * Utility class for broker operations
030 *
031 */
032public final class BrokerSupport {
033
034    private BrokerSupport() {        
035    }
036    
037    public static void resendNoCopy(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
038        doResend(context, originalMessage, deadLetterDestination, false);
039    }
040    
041    /**
042     * @param context
043     * @param originalMessage 
044     * @param deadLetterDestination
045     * @throws Exception
046     */
047    public static void resend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
048        doResend(context, originalMessage, deadLetterDestination, true);
049    }
050    
051    public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception {       
052        Message message = copy ? originalMessage.copy() : originalMessage;
053        message.setOriginalDestination(message.getDestination());
054        message.setOriginalTransactionId(message.getTransactionId());
055        message.setDestination(deadLetterDestination);
056        message.setTransactionId(null);
057        message.setMemoryUsage(null);
058        message.setRedeliveryCounter(0);
059        boolean originalFlowControl = context.isProducerFlowControl();
060        try {
061            context.setProducerFlowControl(false);
062            ProducerInfo info = new ProducerInfo();
063            ProducerState state = new ProducerState(info);
064            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
065            producerExchange.setProducerState(state);
066            producerExchange.setMutable(true);
067            producerExchange.setConnectionContext(context);
068            context.getBroker().send(producerExchange, message);
069        } finally {
070            context.setProducerFlowControl(originalFlowControl);
071        }
072    }
073
074    /**
075     * Returns the broker's administration connection context used for
076     * configuring the broker at startup
077     */
078    public static ConnectionContext getConnectionContext(Broker broker) {
079        ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
080        if (adminConnectionContext == null) {
081            adminConnectionContext = createAdminConnectionContext(broker);
082            broker.setAdminConnectionContext(adminConnectionContext);
083        }
084        return adminConnectionContext;
085    }
086
087    /**
088     * Factory method to create the new administration connection context
089     * object. Note this method is here rather than inside a default broker
090     * implementation to ensure that the broker reference inside it is the outer
091     * most interceptor
092     */
093    protected static ConnectionContext createAdminConnectionContext(Broker broker) {
094        ConnectionContext context = new ConnectionContext();
095        context.setBroker(broker);
096        context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
097        return context;
098    }
099}