001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import java.io.File; 020import java.io.FileInputStream; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.io.ObjectInputStream; 024import java.io.ObjectOutputStream; 025import java.util.concurrent.ConcurrentHashMap; 026 027import org.apache.activemq.broker.Broker; 028import org.apache.activemq.broker.BrokerFilter; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.broker.region.Subscription; 031import org.apache.activemq.command.ConsumerInfo; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * A plugin which allows the caching of the selector from a subscription queue. 037 * <p/> 038 * This stops the build-up of unwanted messages, especially when consumers may 039 * disconnect from time to time when using virtual destinations. 040 * <p/> 041 * This is influenced by code snippets developed by Maciej Rakowicz 042 * 043 * @author Roelof Naude roelof(dot)naude(at)gmail.com 044 * @see https://issues.apache.org/activemq/browse/AMQ-3004 045 * @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E 046 */ 047public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable { 048 private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class); 049 050 /** 051 * The subscription's selector cache. We cache compiled expressions keyed 052 * by the target destination. 053 */ 054 private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>(); 055 056 private final File persistFile; 057 058 private boolean running = true; 059 private Thread persistThread; 060 private static final long MAX_PERSIST_INTERVAL = 600000; 061 private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread"; 062 063 /** 064 * Constructor 065 */ 066 public SubQueueSelectorCacheBroker(Broker next, final File persistFile) { 067 super(next); 068 this.persistFile = persistFile; 069 LOG.info("Using persisted selector cache from[" + persistFile + "]"); 070 071 readCache(); 072 073 persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME); 074 persistThread.start(); 075 } 076 077 @Override 078 public void stop() throws Exception { 079 running = false; 080 if (persistThread != null) { 081 persistThread.interrupt(); 082 persistThread.join(); 083 } //if 084 } 085 086 @Override 087 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 088 LOG.debug("Caching consumer selector [" + info.getSelector() + "] on a " + info.getDestination().getQualifiedName()); 089 if (info.getSelector() != null) { 090 subSelectorCache.put(info.getDestination().getQualifiedName(), info.getSelector()); 091 } //if 092 return super.addConsumer(context, info); 093 } 094 095 private void readCache() { 096 if (persistFile != null && persistFile.exists()) { 097 try { 098 FileInputStream fis = new FileInputStream(persistFile); 099 try { 100 ObjectInputStream in = new ObjectInputStream(fis); 101 try { 102 subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject(); 103 } catch (ClassNotFoundException ex) { 104 LOG.error("Invalid selector cache data found. Please remove file.", ex); 105 } finally { 106 in.close(); 107 } //try 108 } finally { 109 fis.close(); 110 } //try 111 } catch (IOException ex) { 112 LOG.error("Unable to read persisted selector cache...it will be ignored!", ex); 113 } //try 114 } //if 115 } 116 117 /** 118 * Persist the selector cache. 119 */ 120 private void persistCache() { 121 LOG.debug("Persisting selector cache...."); 122 try { 123 FileOutputStream fos = new FileOutputStream(persistFile); 124 try { 125 ObjectOutputStream out = new ObjectOutputStream(fos); 126 try { 127 out.writeObject(subSelectorCache); 128 } finally { 129 out.flush(); 130 out.close(); 131 } //try 132 } catch (IOException ex) { 133 LOG.error("Unable to persist selector cache", ex); 134 } finally { 135 fos.close(); 136 } //try 137 } catch (IOException ex) { 138 LOG.error("Unable to access file[" + persistFile + "]", ex); 139 } //try 140 } 141 142 /** 143 * @return The JMS selector for the specified {@code destination} 144 */ 145 public String getSelector(final String destination) { 146 return subSelectorCache.get(destination); 147 } 148 149 /** 150 * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms. 151 * 152 * @see java.lang.Runnable#run() 153 */ 154 public void run() { 155 while (running) { 156 try { 157 Thread.sleep(MAX_PERSIST_INTERVAL); 158 } catch (InterruptedException ex) { 159 } //try 160 161 persistCache(); 162 } 163 } 164} 165