001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.Connection;
021import java.sql.PreparedStatement;
022import java.sql.SQLException;
023import java.sql.SQLFeatureNotSupportedException;
024
025import javax.sql.DataSource;
026
027import org.apache.activemq.util.Handler;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * Represents an exclusive lock on a database to avoid multiple brokers running
033 * against the same logical database.
034 * 
035 * @org.apache.xbean.XBean element="database-locker"
036 * 
037 */
038public class DefaultDatabaseLocker implements DatabaseLocker {
039    public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 1000;
040    private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
041    protected DataSource dataSource;
042    protected Statements statements;
043    protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
044
045    protected PreparedStatement lockCreateStatement;
046    protected PreparedStatement lockUpdateStatement;
047    protected Connection connection;
048    protected boolean stopping;
049    protected Handler<Exception> exceptionHandler;
050    protected int queryTimeout = 10;
051    
052    public DefaultDatabaseLocker() {
053    }
054    
055    public DefaultDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
056        setPersistenceAdapter(persistenceAdapter);
057    }
058
059    public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
060        this.dataSource = adapter.getLockDataSource();
061        this.statements = adapter.getStatements();
062    }
063    
064    public void start() throws Exception {
065        stopping = false;
066
067        LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
068        String sql = statements.getLockCreateStatement();
069        LOG.debug("Locking Query is "+sql);
070        
071        while (true) {
072            try {
073                connection = dataSource.getConnection();
074                connection.setAutoCommit(false);
075                lockCreateStatement = connection.prepareStatement(sql);
076                lockCreateStatement.execute();
077                break;
078            } catch (Exception e) {
079                try {
080                    if (stopping) {
081                        throw new Exception(
082                                "Cannot start broker as being asked to shut down. " 
083                                        + "Interrupted attempt to acquire lock: "
084                                        + e, e);
085                    }
086                    if (exceptionHandler != null) {
087                        try {
088                            exceptionHandler.handle(e);
089                        } catch (Throwable handlerException) {
090                            LOG.error( "The exception handler "
091                                    + exceptionHandler.getClass().getCanonicalName()
092                                    + " threw this exception: "
093                                    + handlerException
094                                    + " while trying to handle this exception: "
095                                    + e, handlerException);
096                        }
097
098                    } else {
099                        LOG.debug("Lock failure: "+ e, e);
100                    }
101                } finally {
102                    // Let's make sure the database connection is properly
103                    // closed when an error occurs so that we're not leaking
104                    // connections 
105                    if (null != connection) {
106                        try {
107                            connection.close();
108                        } catch (SQLException e1) {
109                            LOG.error("Caught exception while closing connection: " + e1, e1);
110                        }
111                        
112                        connection = null;
113                    }
114                }
115            } finally {
116                if (null != lockCreateStatement) {
117                    try {
118                        lockCreateStatement.close();
119                    } catch (SQLException e1) {
120                        LOG.debug("Caught while closing statement: " + e1, e1);
121                    }
122                    lockCreateStatement = null;
123                }
124            }
125
126            LOG.info("Failed to acquire lock.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
127            try {
128                Thread.sleep(lockAcquireSleepInterval);
129            } catch (InterruptedException ie) {
130                LOG.warn("Master lock retry sleep interrupted", ie);
131            }
132        }
133
134        LOG.info("Becoming the master on dataSource: " + dataSource);
135    }
136
137    public void stop() throws Exception {
138        stopping = true;
139        try {
140            if (lockCreateStatement != null) {
141                lockCreateStatement.cancel();                           
142            }
143        } catch (SQLFeatureNotSupportedException e) {
144            LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);                   
145        }
146        try {
147            if (lockUpdateStatement != null) {
148                    lockUpdateStatement.cancel();                       
149            }
150        } catch (SQLFeatureNotSupportedException e) {
151            LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);                   
152        }
153        try {
154            if (connection != null && !connection.isClosed()) {
155                try {
156                    connection.rollback();
157                } catch (SQLException sqle) {
158                    LOG.warn("Exception while rollbacking the connection on shutdown", sqle);
159                } finally {
160                    try {
161                        connection.close();
162                    } catch (SQLException ignored) {
163                        LOG.debug("Exception while closing connection on shutdown", ignored);
164                    }
165                    lockCreateStatement = null;
166                }
167            }
168        } catch (SQLException sqle) {
169            LOG.warn("Exception while checking close status of connection on shutdown", sqle);
170        }
171    }
172
173    public boolean keepAlive() {
174        boolean result = false;
175        try {
176            lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
177            lockUpdateStatement.setLong(1, System.currentTimeMillis());
178            if (queryTimeout > 0) {
179                lockUpdateStatement.setQueryTimeout(queryTimeout);
180            }
181            int rows = lockUpdateStatement.executeUpdate();
182            if (rows == 1) {
183                result=true;
184            }
185        } catch (Exception e) {
186            LOG.error("Failed to update database lock: " + e, e);
187        } finally {
188            if (lockUpdateStatement != null) {
189                try {
190                    lockUpdateStatement.close();
191                } catch (SQLException e) {
192                    LOG.error("Failed to close statement",e);
193                }
194                lockUpdateStatement = null;
195            }
196        }
197        return result;
198    }
199 
200    public long getLockAcquireSleepInterval() {
201        return lockAcquireSleepInterval;
202    }
203
204    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
205        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
206    }
207    
208    public Handler getExceptionHandler() {
209        return exceptionHandler;
210    }
211
212    public void setExceptionHandler(Handler exceptionHandler) {
213        this.exceptionHandler = exceptionHandler;
214    }
215
216    public int getQueryTimeout() {
217        return queryTimeout;
218    }
219
220    public void setQueryTimeout(int queryTimeout) {
221        this.queryTimeout = queryTimeout;
222    }
223}