001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.pool;
019
020import java.io.IOException;
021import java.util.Iterator;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentLinkedQueue;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import javax.jms.JMSException;
027import javax.jms.Session;
028
029import org.apache.activemq.ActiveMQConnection;
030import org.apache.activemq.transport.TransportListener;
031import org.apache.commons.pool.ObjectPoolFactory;
032
033/**
034 * Holds a real JMS connection along with the session pools associated with it.
035 *
036 *
037 */
038public class ConnectionPool {
039
040    private ActiveMQConnection connection;
041    private ConcurrentHashMap<SessionKey, SessionPool> cache;
042    private ConcurrentLinkedQueue<PooledSession> loanedSessions = new ConcurrentLinkedQueue<PooledSession>();
043    private AtomicBoolean started = new AtomicBoolean(false);
044    private int referenceCount;
045    private ObjectPoolFactory poolFactory;
046    private long lastUsed = System.currentTimeMillis();
047    private long firstUsed = lastUsed;
048    private boolean hasFailed;
049    private boolean hasExpired;
050    private int idleTimeout = 30 * 1000;
051    private long expiryTimeout = 0l;
052
053    public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
054        this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
055        // Add a transport Listener so that we can notice if this connection
056        // should be expired due to a connection failure.
057        connection.addTransportListener(new TransportListener() {
058            public void onCommand(Object command) {
059            }
060
061            public void onException(IOException error) {
062                synchronized (ConnectionPool.this) {
063                    hasFailed = true;
064                }
065            }
066
067            public void transportInterupted() {
068            }
069
070            public void transportResumed() {
071            }
072        });
073
074        // make sure that we set the hasFailed flag, in case the transport already failed
075        // prior to the addition of our new TransportListener
076        if(connection.isTransportFailed()) {
077            hasFailed = true;
078        }
079    }
080
081    public ConnectionPool(ActiveMQConnection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
082        this.connection = connection;
083        this.cache = cache;
084        this.poolFactory = poolFactory;
085    }
086
087    public void start() throws JMSException {
088        if (started.compareAndSet(false, true)) {
089            try {
090                connection.start();
091            } catch (JMSException e) {
092                started.set(false);
093                throw(e);
094            }
095        }
096    }
097
098    public synchronized ActiveMQConnection getConnection() {
099        return connection;
100    }
101
102    public Session createSession(boolean transacted, int ackMode) throws JMSException {
103        SessionKey key = new SessionKey(transacted, ackMode);
104        SessionPool pool = null;
105        pool = cache.get(key);
106        if (pool == null) {
107            SessionPool newPool = createSessionPool(key);
108            SessionPool prevPool = cache.putIfAbsent(key, newPool);
109            if (prevPool != null && prevPool != newPool) {
110                // newPool was not the first one to be associated with this
111                // key... close created session pool
112                try {
113                    newPool.close();
114                } catch (Exception e) {
115                    throw new JMSException(e.getMessage());
116                }
117            }
118            pool = cache.get(key); // this will return a non-null value...
119        }
120        PooledSession session = pool.borrowSession();
121        this.loanedSessions.add(session);
122        return session;
123    }
124
125    public synchronized void close() {
126        if (connection != null) {
127            try {
128                Iterator<SessionPool> i = cache.values().iterator();
129                while (i.hasNext()) {
130                    SessionPool pool = i.next();
131                    i.remove();
132                    try {
133                        pool.close();
134                    } catch (Exception e) {
135                    }
136                }
137            } finally {
138                try {
139                    connection.close();
140                } catch (Exception e) {
141                } finally {
142                    connection = null;
143                }
144            }
145        }
146    }
147
148    public synchronized void incrementReferenceCount() {
149        referenceCount++;
150        lastUsed = System.currentTimeMillis();
151    }
152
153    public synchronized void decrementReferenceCount() {
154        referenceCount--;
155        lastUsed = System.currentTimeMillis();
156        if (referenceCount == 0) {
157            expiredCheck();
158
159            for (PooledSession session : this.loanedSessions) {
160                try {
161                    session.close();
162                } catch (Exception e) {
163                }
164            }
165            this.loanedSessions.clear();
166
167            // only clean up temp destinations when all users
168            // of this connection have called close
169            if (getConnection() != null) {
170                getConnection().cleanUpTempDestinations();
171            }
172        }
173    }
174
175    /**
176     * @return true if this connection has expired.
177     */
178    public synchronized boolean expiredCheck() {
179        if (connection == null) {
180            return true;
181        }
182        if (hasExpired) {
183            if (referenceCount == 0) {
184                close();
185            }
186            return true;
187        }
188        if (hasFailed
189                || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
190                || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
191            hasExpired = true;
192            if (referenceCount == 0) {
193                close();
194            }
195            return true;
196        }
197        return false;
198    }
199
200    public int getIdleTimeout() {
201        return idleTimeout;
202    }
203
204    public void setIdleTimeout(int idleTimeout) {
205        this.idleTimeout = idleTimeout;
206    }
207
208    protected SessionPool createSessionPool(SessionKey key) {
209        return new SessionPool(this, key, poolFactory.createPool());
210    }
211
212    public void setExpiryTimeout(long expiryTimeout) {
213        this.expiryTimeout  = expiryTimeout;
214    }
215
216    public long getExpiryTimeout() {
217        return expiryTimeout;
218    }
219
220    void onSessionReturned(PooledSession session) {
221        this.loanedSessions.remove(session);
222    }
223
224    void onSessionInvalidated(PooledSession session) {
225        this.loanedSessions.remove(session);
226    }
227}