001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.Enumeration;
020import java.util.concurrent.atomic.AtomicBoolean;
021
022import javax.jms.IllegalStateException;
023import javax.jms.JMSException;
024import javax.jms.Message;
025import javax.jms.Queue;
026import javax.jms.QueueBrowser;
027
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ConsumerId;
030import org.apache.activemq.command.MessageDispatch;
031
032/**
033 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
034 * queue without removing them. <p/>
035 * <P>
036 * The <CODE>getEnumeration</CODE> method returns a <CODE>
037 * java.util.Enumeration</CODE>
038 * that is used to scan the queue's messages. It may be an enumeration of the
039 * entire content of a queue, or it may contain only the messages matching a
040 * message selector. <p/>
041 * <P>
042 * Messages may be arriving and expiring while the scan is done. The JMS API
043 * does not require the content of an enumeration to be a static snapshot of
044 * queue content. Whether these changes are visible or not depends on the JMS
045 * provider. <p/>
046 * <P>
047 * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
048 * </CODE>
049 * or a <CODE>QueueSession</CODE>.
050 *
051 * @see javax.jms.Session#createBrowser
052 * @see javax.jms.QueueSession#createBrowser
053 * @see javax.jms.QueueBrowser
054 * @see javax.jms.QueueReceiver
055 */
056
057public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
058
059    private final ActiveMQSession session;
060    private final ActiveMQDestination destination;
061    private final String selector;
062
063    private ActiveMQMessageConsumer consumer;
064    private boolean closed;
065    private final ConsumerId consumerId;
066    private final AtomicBoolean browseDone = new AtomicBoolean(true);
067    private final boolean dispatchAsync;
068    private Object semaphore = new Object();
069
070    /**
071     * Constructor for an ActiveMQQueueBrowser - used internally
072     *
073     * @param theSession
074     * @param dest
075     * @param selector
076     * @throws JMSException
077     */
078    protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
079        this.session = session;
080        this.consumerId = consumerId;
081        this.destination = destination;
082        this.selector = selector;
083        this.dispatchAsync = dispatchAsync;
084        this.consumer = createConsumer();
085    }
086
087    /**
088     * @param session
089     * @param originalDestination
090     * @param selectorExpression
091     * @param cnum
092     * @return
093     * @throws JMSException
094     */
095    private ActiveMQMessageConsumer createConsumer() throws JMSException {
096        browseDone.set(false);
097        ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
098
099        return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
100            .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
101            public void dispatch(MessageDispatch md) {
102                if (md.getMessage() == null) {
103                    browseDone.set(true);
104                } else {
105                    super.dispatch(md);
106                }
107                notifyMessageAvailable();
108            }
109        };
110    }
111
112    private void destroyConsumer() {
113        if (consumer == null) {
114            return;
115        }
116        try {
117            if (session.getTransacted() && session.getTransactionContext().isInLocalTransaction()) {
118                session.commit();
119            }
120            consumer.close();
121            consumer = null;
122        } catch (JMSException e) {
123            e.printStackTrace();
124        }
125    }
126
127    /**
128     * Gets an enumeration for browsing the current queue messages in the order
129     * they would be received.
130     *
131     * @return an enumeration for browsing the messages
132     * @throws JMSException if the JMS provider fails to get the enumeration for
133     *                 this browser due to some internal error.
134     */
135
136    public Enumeration getEnumeration() throws JMSException {
137        checkClosed();
138        if (consumer == null) {
139            consumer = createConsumer();
140        }
141        return this;
142    }
143
144    private void checkClosed() throws IllegalStateException {
145        if (closed) {
146            throw new IllegalStateException("The Consumer is closed");
147        }
148    }
149
150    /**
151     * @return true if more messages to process
152     */
153    public boolean hasMoreElements() {
154        while (true) {
155
156            synchronized (this) {
157                if (consumer == null) {
158                    return false;
159                }
160            }
161
162            if (consumer.getMessageSize() > 0) {
163                return true;
164            }
165
166            if (browseDone.get() || !session.isRunning()) {
167                destroyConsumer();
168                return false;
169            }
170
171            waitForMessage();
172        }
173    }
174
175    /**
176     * @return the next message
177     */
178    public Object nextElement() {
179        while (true) {
180
181            synchronized (this) {
182                if (consumer == null) {
183                    return null;
184                }
185            }
186
187            try {
188                Message answer = consumer.receiveNoWait();
189                if (answer != null) {
190                    return answer;
191                }
192            } catch (JMSException e) {
193                this.session.connection.onClientInternalException(e);
194                return null;
195            }
196
197            if (browseDone.get() || !session.isRunning()) {
198                destroyConsumer();
199                return null;
200            }
201
202            waitForMessage();
203        }
204    }
205
206    public synchronized void close() throws JMSException {
207        destroyConsumer();
208        closed = true;
209    }
210
211    /**
212     * Gets the queue associated with this queue browser.
213     *
214     * @return the queue
215     * @throws JMSException if the JMS provider fails to get the queue
216     *                 associated with this browser due to some internal error.
217     */
218
219    public Queue getQueue() throws JMSException {
220        return (Queue)destination;
221    }
222
223    public String getMessageSelector() throws JMSException {
224        return selector;
225    }
226
227    // Implementation methods
228    // -------------------------------------------------------------------------
229
230    /**
231     * Wait on a semaphore for a fixed amount of time for a message to come in.
232     * @throws JMSException
233     */
234    protected void waitForMessage() {
235        try {
236            consumer.sendPullCommand(-1);
237            synchronized (semaphore) {
238                semaphore.wait(2000);
239            }
240        } catch (InterruptedException e) {
241            Thread.currentThread().interrupt();
242        } catch (JMSException e) {
243        }
244
245    }
246
247    protected void notifyMessageAvailable() {
248        synchronized (semaphore) {
249            semaphore.notifyAll();
250        }
251    }
252
253    public String toString() {
254        return "ActiveMQQueueBrowser { value=" + consumerId + " }";
255    }
256
257}