001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.Serializable;
020import org.slf4j.Logger;
021import org.slf4j.LoggerFactory;
022
023/**
024 * Defines the prefetch message policies for different types of consumers
025 *
026 * @org.apache.xbean.XBean element="prefetchPolicy"
027 *
028 */
029@SuppressWarnings("serial")
030public class ActiveMQPrefetchPolicy extends Object implements Serializable {
031    public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
032    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
033    public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
034    public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
035    public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
036    public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
037    public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
038
039    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class);
040
041    private int queuePrefetch;
042    private int queueBrowserPrefetch;
043    private int topicPrefetch;
044    private int durableTopicPrefetch;
045    private int optimizeDurableTopicPrefetch;
046    private int inputStreamPrefetch;
047    private int maximumPendingMessageLimit;
048
049    /**
050     * Initialize default prefetch policies
051     */
052    public ActiveMQPrefetchPolicy() {
053        this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
054        this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
055        this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
056        this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
057        this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
058        this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
059    }
060
061    /**
062     * @return Returns the durableTopicPrefetch.
063     */
064    public int getDurableTopicPrefetch() {
065        return durableTopicPrefetch;
066    }
067
068    /**
069     * @param durableTopicPrefetch The durableTopicPrefetch to set.
070     */
071    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
072        this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch);
073    }
074
075    /**
076     * @return Returns the queuePrefetch.
077     */
078    public int getQueuePrefetch() {
079        return queuePrefetch;
080    }
081
082    /**
083     * @param queuePrefetch The queuePrefetch to set.
084     */
085    public void setQueuePrefetch(int queuePrefetch) {
086        this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch);
087    }
088
089    /**
090     * @return Returns the queueBrowserPrefetch.
091     */
092    public int getQueueBrowserPrefetch() {
093        return queueBrowserPrefetch;
094    }
095
096    /**
097     * @param queueBrowserPrefetch The queueBrowserPrefetch to set.
098     */
099    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
100        this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch);
101    }
102
103    /**
104     * @return Returns the topicPrefetch.
105     */
106    public int getTopicPrefetch() {
107        return topicPrefetch;
108    }
109
110    /**
111     * @param topicPrefetch The topicPrefetch to set.
112     */
113    public void setTopicPrefetch(int topicPrefetch) {
114        this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
115    }
116
117    /**
118     * @return Returns the optimizeDurableTopicPrefetch.
119     */
120    public int getOptimizeDurableTopicPrefetch() {
121        return optimizeDurableTopicPrefetch;
122    }
123
124    /**
125     * @param optimizeAcknowledgePrefetch The optimizeDurableTopicPrefetch to
126     *                set.
127     */
128    public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch) {
129        this.optimizeDurableTopicPrefetch = optimizeAcknowledgePrefetch;
130    }
131
132    public int getMaximumPendingMessageLimit() {
133        return maximumPendingMessageLimit;
134    }
135
136    /**
137     * Sets how many messages a broker will keep around, above the prefetch
138     * limit, for non-durable topics before starting to discard older messages.
139     */
140    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
141        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
142    }
143
144    private int getMaxPrefetchLimit(int value) {
145        int result = Math.min(value, MAX_PREFETCH_SIZE);
146        if (result < value) {
147            LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE);
148        }
149        return result;
150    }
151
152    public void setAll(int i) {
153        this.durableTopicPrefetch = i;
154        this.queueBrowserPrefetch = i;
155        this.queuePrefetch = i;
156        this.topicPrefetch = i;
157        this.inputStreamPrefetch = 1;
158        this.optimizeDurableTopicPrefetch = i;
159    }
160
161    public int getInputStreamPrefetch() {
162        return inputStreamPrefetch;
163    }
164
165    public void setInputStreamPrefetch(int inputStreamPrefetch) {
166        this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
167    }
168
169    public boolean equals(Object object){
170        if (object instanceof ActiveMQPrefetchPolicy){
171            ActiveMQPrefetchPolicy other = (ActiveMQPrefetchPolicy) object;
172            return this.queuePrefetch == other.queuePrefetch &&
173            this.queueBrowserPrefetch == other.queueBrowserPrefetch &&
174            this.topicPrefetch == other.topicPrefetch &&
175            this.durableTopicPrefetch == other.durableTopicPrefetch &&
176            this.optimizeDurableTopicPrefetch == other.optimizeDurableTopicPrefetch &&
177            this.inputStreamPrefetch == other.inputStreamPrefetch;
178        }
179        return false;
180    }
181
182}