001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.HashMap;
020import java.util.HashSet;
021import java.util.Map;
022import java.util.Set;
023import java.util.Timer;
024import java.util.TimerTask;
025
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.thread.TaskRunnerFactory;
029import org.apache.activemq.usage.SystemUsage;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * 
035 */
036public abstract class AbstractTempRegion extends AbstractRegion {
037    private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class);
038
039    private Map<CachedDestination, Destination> cachedDestinations = new HashMap<CachedDestination, Destination>();
040    private final boolean doCacheTempDestinations;
041    private final int purgeTime;
042    private Timer purgeTimer;
043    private TimerTask purgeTask;
044   
045
046    /**
047     * @param broker
048     * @param destinationStatistics
049     * @param memoryManager
050     * @param taskRunnerFactory
051     * @param destinationFactory
052     */
053    public AbstractTempRegion(RegionBroker broker,
054            DestinationStatistics destinationStatistics,
055            SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
056            DestinationFactory destinationFactory) {
057        super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
058                destinationFactory);
059        this.doCacheTempDestinations=broker.getBrokerService().isCacheTempDestinations();
060        this.purgeTime = broker.getBrokerService().getTimeBeforePurgeTempDestinations();
061        if (this.doCacheTempDestinations) {
062            this.purgeTimer = new Timer("ActiveMQ Temp destination purge timer", true);
063            this.purgeTask = new TimerTask() {
064                public void run() {
065                    doPurge();
066                }
067    
068            };
069            this.purgeTimer.schedule(purgeTask, purgeTime, purgeTime);
070        }
071       
072    }
073
074    public void stop() throws Exception {
075        super.stop();
076        if (purgeTimer != null) {
077            purgeTimer.cancel();
078        }
079    }
080
081    protected abstract Destination doCreateDestination(
082            ConnectionContext context, ActiveMQDestination destination)
083            throws Exception;
084
085    protected synchronized Destination createDestination(
086            ConnectionContext context, ActiveMQDestination destination)
087            throws Exception {
088        Destination result = cachedDestinations.remove(new CachedDestination(
089                destination));
090        if (result == null) {
091            result = doCreateDestination(context, destination);
092        }
093        return result;
094    }
095
096    protected final synchronized void dispose(ConnectionContext context,
097            Destination dest) throws Exception {
098        // add to cache
099        if (this.doCacheTempDestinations) {
100            cachedDestinations.put(new CachedDestination(dest
101                    .getActiveMQDestination()), dest);
102        }else {
103            try {
104                dest.dispose(context);
105                dest.stop();
106            } catch (Exception e) {
107                LOG.warn("Failed to dispose of " + dest, e);
108            }
109        }
110    }
111
112    private void doDispose(Destination dest) {
113        ConnectionContext context = new ConnectionContext();
114        try {
115            dest.dispose(context);
116            dest.stop();
117        } catch (Exception e) {
118            LOG.warn("Failed to dispose of " + dest, e);
119        }
120
121    }
122
123    private synchronized void doPurge() {
124        long currentTime = System.currentTimeMillis();
125        if (cachedDestinations.size() > 0) {
126            Set<CachedDestination> tmp = new HashSet<CachedDestination>(
127                    cachedDestinations.keySet());
128            for (CachedDestination key : tmp) {
129                if ((key.timeStamp + purgeTime) < currentTime) {
130                    Destination dest = cachedDestinations.remove(key);
131                    if (dest != null) {
132                        doDispose(dest);
133                    }
134                }
135            }
136        }
137    }
138
139    static class CachedDestination {
140        long timeStamp;
141
142        ActiveMQDestination destination;
143
144        CachedDestination(ActiveMQDestination destination) {
145            this.destination = destination;
146            this.timeStamp = System.currentTimeMillis();
147        }
148
149        public int hashCode() {
150            return destination.hashCode();
151        }
152
153        public boolean equals(Object o) {
154            if (o instanceof CachedDestination) {
155                CachedDestination other = (CachedDestination) o;
156                return other.destination.equals(this.destination);
157            }
158            return false;
159        }
160
161    }
162
163}