001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.filter;
018
019import java.net.URI;
020import java.util.Collections;
021import java.util.Iterator;
022import java.util.List;
023
024import javax.jms.Connection;
025import javax.jms.ConnectionFactory;
026import javax.jms.Destination;
027import javax.jms.JMSException;
028import javax.jms.QueueBrowser;
029import javax.jms.Session;
030
031import org.apache.activemq.ActiveMQConnectionFactory;
032import org.apache.activemq.command.ActiveMQQueue;
033import org.apache.activemq.command.ActiveMQTopic;
034
035public class AmqMessagesQueryFilter extends AbstractQueryFilter {
036
037    private URI brokerUrl;
038    private Destination destination;
039
040    private ConnectionFactory connectionFactory;
041
042    /**
043     * Create a JMS message query filter
044     *
045     * @param brokerUrl   - broker url to connect to
046     * @param destination - JMS destination to query
047     */
048    public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) {
049        super(null);
050        this.brokerUrl = brokerUrl;
051        this.destination = destination;
052    }
053
054    /**
055     * Create a JMS message query filter
056     *
057     * @param brokerUrl   - broker url to connect to
058     * @param destination - JMS destination to query
059     */
060    public AmqMessagesQueryFilter(ConnectionFactory connectionFactory, Destination destination) {
061        super(null);
062        this.destination = destination;
063        this.connectionFactory = connectionFactory;
064    }
065
066    /**
067     * Queries the specified destination using the message selector format query
068     *
069     * @param queries - message selector queries
070     * @return list messages that matches the selector
071     * @throws Exception
072     */
073    public List query(List queries) throws Exception {
074        String selector = "";
075
076        // Convert to message selector
077        for (Iterator i = queries.iterator(); i.hasNext(); ) {
078            selector = selector + "(" + i.next().toString() + ") AND ";
079        }
080
081        // Remove last AND
082        if (!selector.equals("")) {
083            selector = selector.substring(0, selector.length() - 5);
084        }
085
086        if (destination instanceof ActiveMQQueue) {
087            return queryMessages((ActiveMQQueue) destination, selector);
088        } else {
089            return queryMessages((ActiveMQTopic) destination, selector);
090        }
091    }
092
093    /**
094     * Query the messages of a queue destination using a queue browser
095     *
096     * @param queue    - queue destination
097     * @param selector - message selector
098     * @return list of messages that matches the selector
099     * @throws Exception
100     */
101    protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
102        Connection conn = createConnection();
103
104        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
105        QueueBrowser browser = sess.createBrowser(queue, selector);
106
107        List messages = Collections.list(browser.getEnumeration());
108
109        conn.close();
110
111        return messages;
112    }
113
114    /**
115     * Query the messages of a topic destination using a message consumer
116     *
117     * @param topic    - topic destination
118     * @param selector - message selector
119     * @return list of messages that matches the selector
120     * @throws Exception
121     */
122    protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
123        // TODO: should we use a durable subscriber or a retroactive non-durable
124        // subscriber?
125        // TODO: if a durable subscriber is used, how do we manage it?
126        // subscribe/unsubscribe tasks?
127        return null;
128    }
129
130    /**
131     * Create and start a JMS connection
132     *
133     * @param brokerUrl - broker url to connect to.
134     * @return JMS connection
135     * @throws JMSException
136     * @deprecated Use createConnection() instead, and pass the url to the ConnectionFactory when it's created.
137     */
138    @Deprecated
139    protected Connection createConnection(URI brokerUrl) throws JMSException {
140        // maintain old behaviour, when called this way.
141        connectionFactory = (new ActiveMQConnectionFactory(brokerUrl));
142        return createConnection();
143    }
144
145    /**
146     * Create and start a JMS connection
147     *
148     * @return JMS connection
149     * @throws JMSException
150     */
151    protected Connection createConnection() throws JMSException {
152        // maintain old behaviour, when called either way.
153        if (null == connectionFactory)
154            connectionFactory = (new ActiveMQConnectionFactory(getBrokerUrl()));
155
156        Connection conn = connectionFactory.createConnection();
157        conn.start();
158        return conn;
159    }
160
161
162    /**
163     * Get the broker url being used.
164     *
165     * @return broker url
166     */
167    public URI getBrokerUrl() {
168        return brokerUrl;
169    }
170
171    /**
172     * Set the broker url to use.
173     *
174     * @param brokerUrl - broker url
175     */
176    public void setBrokerUrl(URI brokerUrl) {
177        this.brokerUrl = brokerUrl;
178    }
179
180    /**
181     * Get the destination being used.
182     *
183     * @return - JMS destination
184     */
185    public Destination getDestination() {
186        return destination;
187    }
188
189    /**
190     * Set the destination to use.
191     *
192     * @param destination - JMS destination
193     */
194    public void setDestination(Destination destination) {
195        this.destination = destination;
196    }
197
198}