001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import java.io.File;
020import java.io.FileInputStream;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.ObjectInputStream;
024import java.io.ObjectOutputStream;
025import java.util.concurrent.ConcurrentHashMap;
026
027import org.apache.activemq.broker.Broker;
028import org.apache.activemq.broker.BrokerFilter;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.region.Subscription;
031import org.apache.activemq.command.ConsumerInfo;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * A plugin which allows the caching of the selector from a subscription queue.
037 * <p/>
038 * This stops the build-up of unwanted messages, especially when consumers may
039 * disconnect from time to time when using virtual destinations.
040 * <p/>
041 * This is influenced by code snippets developed by Maciej Rakowicz
042 *
043 * @author Roelof Naude roelof(dot)naude(at)gmail.com
044 * @see https://issues.apache.org/activemq/browse/AMQ-3004
045 * @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
046 */
047public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
048    private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
049
050    /**
051     * The subscription's selector cache. We cache compiled expressions keyed
052     * by the target destination.
053     */
054    private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>();
055
056    private final File persistFile;
057
058    private boolean running = true;
059    private Thread persistThread;
060    private static final long MAX_PERSIST_INTERVAL = 600000;
061    private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread";
062
063    /**
064     * Constructor
065     */
066    public SubQueueSelectorCacheBroker(Broker next, final File persistFile) {
067        super(next);
068        this.persistFile = persistFile;
069        LOG.info("Using persisted selector cache from[" + persistFile + "]");
070
071        readCache();
072
073        persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME);
074        persistThread.start();
075    }
076
077    @Override
078    public void stop() throws Exception {
079        running = false;
080        if (persistThread != null) {
081            persistThread.interrupt();
082            persistThread.join();
083        } //if
084    }
085
086    @Override
087    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
088        LOG.debug("Caching consumer selector [" + info.getSelector() + "] on a " + info.getDestination().getQualifiedName());
089        if (info.getSelector() != null) {
090            subSelectorCache.put(info.getDestination().getQualifiedName(), info.getSelector());
091        } //if
092        return super.addConsumer(context, info);
093    }
094
095    private void readCache() {
096        if (persistFile != null && persistFile.exists()) {
097            try {
098                FileInputStream fis = new FileInputStream(persistFile);
099                try {
100                    ObjectInputStream in = new ObjectInputStream(fis);
101                    try {
102                        subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject();
103                    } catch (ClassNotFoundException ex) {
104                        LOG.error("Invalid selector cache data found. Please remove file.", ex);
105                    } finally {
106                        in.close();
107                    } //try
108                } finally {
109                    fis.close();
110                } //try
111            } catch (IOException ex) {
112                LOG.error("Unable to read persisted selector cache...it will be ignored!", ex);
113            } //try
114        } //if
115    }
116
117    /**
118     * Persist the selector cache.
119     */
120    private void persistCache() {
121        LOG.debug("Persisting selector cache....");
122        try {
123            FileOutputStream fos = new FileOutputStream(persistFile);
124            try {
125                ObjectOutputStream out = new ObjectOutputStream(fos);
126                try {
127                    out.writeObject(subSelectorCache);
128                } finally {
129                    out.flush();
130                    out.close();
131                } //try
132            } catch (IOException ex) {
133                LOG.error("Unable to persist selector cache", ex);
134            } finally {
135                fos.close();
136            } //try
137        } catch (IOException ex) {
138            LOG.error("Unable to access file[" + persistFile + "]", ex);
139        } //try
140    }
141
142    /**
143     * @return The JMS selector for the specified {@code destination}
144     */
145    public String getSelector(final String destination) {
146        return subSelectorCache.get(destination);
147    }
148
149    /**
150     * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms.
151     *
152     * @see java.lang.Runnable#run()
153     */
154    public void run() {
155        while (running) {
156            try {
157                Thread.sleep(MAX_PERSIST_INTERVAL);
158            } catch (InterruptedException ex) {
159            } //try
160
161            persistCache();
162        }
163    }
164}
165