001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.pool;
018
019import java.util.HashMap;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.Map;
023import java.util.concurrent.atomic.AtomicBoolean;
024import javax.jms.Connection;
025import javax.jms.ConnectionFactory;
026import javax.jms.JMSException;
027import org.apache.activemq.ActiveMQConnection;
028import org.apache.activemq.ActiveMQConnectionFactory;
029import org.apache.activemq.Service;
030import org.apache.activemq.util.IOExceptionSupport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.commons.pool.ObjectPoolFactory;
034import org.apache.commons.pool.impl.GenericObjectPool;
035import org.apache.commons.pool.impl.GenericObjectPoolFactory;
036
037/**
038 * A JMS provider which pools Connection, Session and MessageProducer instances
039 * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
040 * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
041 * Connections, sessions and producers are returned to a pool after use so that they can be reused later
042 * without having to undergo the cost of creating them again.
043 *
044 * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
045 * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
046 * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
047 * just created at startup and left active, handling incoming messages as they come. When a consumer is
048 * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
049 * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
050 * where they'll get held until the consumer is active again.
051 *
052 * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
053 * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
054 * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
055 * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
056 *
057 * @org.apache.xbean.XBean element="pooledConnectionFactory"
058 *
059 *
060 */
061public class PooledConnectionFactory implements ConnectionFactory, Service {
062    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
063    private ConnectionFactory connectionFactory;
064    private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
065    private ObjectPoolFactory poolFactory;
066    private int maximumActive = 500;
067    private int maxConnections = 1;
068    private int idleTimeout = 30 * 1000;
069    private boolean blockIfSessionPoolIsFull = true;
070    private AtomicBoolean stopped = new AtomicBoolean(false);
071    private long expiryTimeout = 0l;
072
073    public PooledConnectionFactory() {
074        this(new ActiveMQConnectionFactory());
075    }
076
077    public PooledConnectionFactory(String brokerURL) {
078        this(new ActiveMQConnectionFactory(brokerURL));
079    }
080
081    public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
082        this.connectionFactory = connectionFactory;
083    }
084
085    public ConnectionFactory getConnectionFactory() {
086        return connectionFactory;
087    }
088
089    public void setConnectionFactory(ConnectionFactory connectionFactory) {
090        this.connectionFactory = connectionFactory;
091    }
092
093    public Connection createConnection() throws JMSException {
094        return createConnection(null, null);
095    }
096
097    public synchronized Connection createConnection(String userName, String password) throws JMSException {
098        if (stopped.get()) {
099            LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
100            return null;
101        }
102
103        ConnectionKey key = new ConnectionKey(userName, password);
104        LinkedList<ConnectionPool> pools = cache.get(key);
105
106        if (pools == null) {
107            pools = new LinkedList<ConnectionPool>();
108            cache.put(key, pools);
109        }
110
111        ConnectionPool connection = null;
112        if (pools.size() == maxConnections) {
113            connection = pools.removeFirst();
114        }
115
116        // Now.. we might get a connection, but it might be that we need to
117        // dump it..
118        if (connection != null && connection.expiredCheck()) {
119            connection = null;
120        }
121
122        if (connection == null) {
123            ActiveMQConnection delegate = createConnection(key);
124            connection = createConnectionPool(delegate);
125        }
126        pools.add(connection);
127        return new PooledConnection(connection);
128    }
129
130    protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
131        ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
132        result.setIdleTimeout(getIdleTimeout());
133        result.setExpiryTimeout(getExpiryTimeout());
134        return result;
135    }
136
137    protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
138        if (key.getUserName() == null && key.getPassword() == null) {
139            return (ActiveMQConnection)connectionFactory.createConnection();
140        } else {
141            return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
142        }
143    }
144
145    /**
146     * @see org.apache.activemq.service.Service#start()
147     */
148    public void start() {
149        try {
150            stopped.set(false);
151            createConnection();
152        } catch (JMSException e) {
153            LOG.warn("Create pooled connection during start failed.", e);
154            IOExceptionSupport.create(e);
155        }
156    }
157
158    public void stop() {
159        LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
160        stopped.set(true);
161        for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
162            for (ConnectionPool connection : iter.next()) {
163                try {
164                    connection.close();
165                }catch(Exception e) {
166                    LOG.warn("Close connection failed",e);
167                }
168            }
169        }
170        cache.clear();
171    }
172
173    public ObjectPoolFactory getPoolFactory() {
174        if (poolFactory == null) {
175            poolFactory = createPoolFactory();
176        }
177        return poolFactory;
178    }
179
180    /**
181     * Sets the object pool factory used to create individual session pools for
182     * each connection
183     */
184    public void setPoolFactory(ObjectPoolFactory poolFactory) {
185        this.poolFactory = poolFactory;
186    }
187
188    public int getMaximumActive() {
189        return maximumActive;
190    }
191
192    /**
193     * Sets the maximum number of active sessions per connection
194     */
195    public void setMaximumActive(int maximumActive) {
196        this.maximumActive = maximumActive;
197    }
198
199    /**
200     * Controls the behavior of the internal session pool. By default the call to
201     * Connection.getSession() will block if the session pool is full.  If the
202     * argument false is given, it will change the default behavior and instead the
203     * call to getSession() will throw a JMSException.
204     *
205     * The size of the session pool is controlled by the @see #maximumActive
206     * property.
207     *
208     * @param block - if true, the call to getSession() blocks if the pool is full
209     * until a session object is available.  defaults to true.
210     */
211    public void setBlockIfSessionPoolIsFull(boolean block) {
212        this.blockIfSessionPoolIsFull = block;
213    }
214
215    /**
216     * @return the maxConnections
217     */
218    public int getMaxConnections() {
219        return maxConnections;
220    }
221
222    /**
223     * @param maxConnections the maxConnections to set
224     */
225    public void setMaxConnections(int maxConnections) {
226        this.maxConnections = maxConnections;
227    }
228
229    /**
230     * Creates an ObjectPoolFactory. Its behavior is controlled by the two
231     * properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
232     *
233     * @return the newly created but empty ObjectPoolFactory
234     */
235    protected ObjectPoolFactory createPoolFactory() {
236         if (blockIfSessionPoolIsFull) {
237            return new GenericObjectPoolFactory(null, maximumActive);
238        } else {
239            return new GenericObjectPoolFactory(null,
240                maximumActive,
241                GenericObjectPool.WHEN_EXHAUSTED_FAIL,
242                GenericObjectPool.DEFAULT_MAX_WAIT);
243        }
244    }
245
246    public int getIdleTimeout() {
247        return idleTimeout;
248    }
249
250    public void setIdleTimeout(int idleTimeout) {
251        this.idleTimeout = idleTimeout;
252    }
253
254    /**
255     * allow connections to expire, irrespective of load or idle time. This is useful with failover
256     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
257     *
258     * @param expiryTimeout non zero in milliseconds
259     */
260    public void setExpiryTimeout(long expiryTimeout) {
261        this.expiryTimeout = expiryTimeout;
262    }
263
264    public long getExpiryTimeout() {
265        return expiryTimeout;
266    }
267}