001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import org.apache.activemq.broker.Broker; 020import org.apache.activemq.broker.region.MessageReference; 021import org.apache.activemq.broker.region.Queue; 022import org.apache.activemq.command.Message; 023import org.apache.activemq.usage.SystemUsage; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027/** 028 * Store based Cursor for Queues 029 */ 030public class StoreQueueCursor extends AbstractPendingMessageCursor { 031 032 private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class); 033 private final Broker broker; 034 private int pendingCount; 035 private final Queue queue; 036 private PendingMessageCursor nonPersistent; 037 private final QueueStorePrefetch persistent; 038 private boolean started; 039 private PendingMessageCursor currentCursor; 040 041 /** 042 * Construct 043 * @param broker 044 * @param queue 045 */ 046 public StoreQueueCursor(Broker broker,Queue queue) { 047 super((queue != null ? queue.isPrioritizedMessages():false)); 048 this.broker=broker; 049 this.queue = queue; 050 this.persistent = new QueueStorePrefetch(queue); 051 currentCursor = persistent; 052 } 053 054 public synchronized void start() throws Exception { 055 started = true; 056 super.start(); 057 if (nonPersistent == null) { 058 if (broker.getBrokerService().isPersistent()) { 059 nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages); 060 }else { 061 nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); 062 } 063 nonPersistent.setMaxBatchSize(getMaxBatchSize()); 064 nonPersistent.setSystemUsage(systemUsage); 065 nonPersistent.setEnableAudit(isEnableAudit()); 066 nonPersistent.setMaxAuditDepth(getMaxAuditDepth()); 067 nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit()); 068 } 069 nonPersistent.setMessageAudit(getMessageAudit()); 070 nonPersistent.start(); 071 persistent.setMessageAudit(getMessageAudit()); 072 persistent.start(); 073 pendingCount = persistent.size() + nonPersistent.size(); 074 } 075 076 public synchronized void stop() throws Exception { 077 started = false; 078 if (nonPersistent != null) { 079// nonPersistent.clear(); 080// nonPersistent.stop(); 081// nonPersistent.gc(); 082 nonPersistent.destroy(); 083 } 084 persistent.stop(); 085 persistent.gc(); 086 super.stop(); 087 pendingCount = 0; 088 } 089 090 public synchronized void addMessageLast(MessageReference node) throws Exception { 091 if (node != null) { 092 Message msg = node.getMessage(); 093 if (started) { 094 pendingCount++; 095 if (!msg.isPersistent()) { 096 nonPersistent.addMessageLast(node); 097 } 098 } 099 if (msg.isPersistent()) { 100 persistent.addMessageLast(node); 101 } 102 } 103 } 104 105 public synchronized void addMessageFirst(MessageReference node) throws Exception { 106 if (node != null) { 107 Message msg = node.getMessage(); 108 if (started) { 109 pendingCount++; 110 if (!msg.isPersistent()) { 111 nonPersistent.addMessageFirst(node); 112 } 113 } 114 if (msg.isPersistent()) { 115 persistent.addMessageFirst(node); 116 } 117 } 118 } 119 120 public synchronized void clear() { 121 pendingCount = 0; 122 } 123 124 public synchronized boolean hasNext() { 125 try { 126 getNextCursor(); 127 } catch (Exception e) { 128 LOG.error("Failed to get current cursor ", e); 129 throw new RuntimeException(e); 130 } 131 return currentCursor != null ? currentCursor.hasNext() : false; 132 } 133 134 public synchronized MessageReference next() { 135 MessageReference result = currentCursor != null ? currentCursor.next() : null; 136 return result; 137 } 138 139 public synchronized void remove() { 140 if (currentCursor != null) { 141 currentCursor.remove(); 142 } 143 pendingCount--; 144 } 145 146 public synchronized void remove(MessageReference node) { 147 if (!node.isPersistent()) { 148 nonPersistent.remove(node); 149 } else { 150 persistent.remove(node); 151 } 152 pendingCount--; 153 } 154 155 public synchronized void reset() { 156 nonPersistent.reset(); 157 persistent.reset(); 158 pendingCount = persistent.size() + nonPersistent.size(); 159 } 160 161 public void release() { 162 nonPersistent.release(); 163 persistent.release(); 164 } 165 166 167 public synchronized int size() { 168 if (pendingCount < 0) { 169 pendingCount = persistent.size() + nonPersistent.size(); 170 } 171 return pendingCount; 172 } 173 174 public synchronized boolean isEmpty() { 175 // if negative, more messages arrived in store since last reset so non empty 176 return pendingCount == 0; 177 } 178 179 /** 180 * Informs the Broker if the subscription needs to intervention to recover 181 * it's state e.g. DurableTopicSubscriber may do 182 * 183 * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor 184 * @return true if recovery required 185 */ 186 public boolean isRecoveryRequired() { 187 return false; 188 } 189 190 /** 191 * @return the nonPersistent Cursor 192 */ 193 public PendingMessageCursor getNonPersistent() { 194 return this.nonPersistent; 195 } 196 197 /** 198 * @param nonPersistent cursor to set 199 */ 200 public void setNonPersistent(PendingMessageCursor nonPersistent) { 201 this.nonPersistent = nonPersistent; 202 } 203 204 public void setMaxBatchSize(int maxBatchSize) { 205 persistent.setMaxBatchSize(maxBatchSize); 206 if (nonPersistent != null) { 207 nonPersistent.setMaxBatchSize(maxBatchSize); 208 } 209 super.setMaxBatchSize(maxBatchSize); 210 } 211 212 213 public void setMaxProducersToAudit(int maxProducersToAudit) { 214 super.setMaxProducersToAudit(maxProducersToAudit); 215 if (persistent != null) { 216 persistent.setMaxProducersToAudit(maxProducersToAudit); 217 } 218 if (nonPersistent != null) { 219 nonPersistent.setMaxProducersToAudit(maxProducersToAudit); 220 } 221 } 222 223 public void setMaxAuditDepth(int maxAuditDepth) { 224 super.setMaxAuditDepth(maxAuditDepth); 225 if (persistent != null) { 226 persistent.setMaxAuditDepth(maxAuditDepth); 227 } 228 if (nonPersistent != null) { 229 nonPersistent.setMaxAuditDepth(maxAuditDepth); 230 } 231 } 232 233 public void setEnableAudit(boolean enableAudit) { 234 super.setEnableAudit(enableAudit); 235 if (persistent != null) { 236 persistent.setEnableAudit(enableAudit); 237 } 238 if (nonPersistent != null) { 239 nonPersistent.setEnableAudit(enableAudit); 240 } 241 } 242 243 @Override 244 public void setUseCache(boolean useCache) { 245 super.setUseCache(useCache); 246 if (persistent != null) { 247 persistent.setUseCache(useCache); 248 } 249 if (nonPersistent != null) { 250 nonPersistent.setUseCache(useCache); 251 } 252 } 253 254 @Override 255 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 256 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 257 if (persistent != null) { 258 persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 259 } 260 if (nonPersistent != null) { 261 nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 262 } 263 } 264 265 266 267 public synchronized void gc() { 268 if (persistent != null) { 269 persistent.gc(); 270 } 271 if (nonPersistent != null) { 272 nonPersistent.gc(); 273 } 274 pendingCount = persistent.size() + nonPersistent.size(); 275 } 276 277 public void setSystemUsage(SystemUsage usageManager) { 278 super.setSystemUsage(usageManager); 279 if (persistent != null) { 280 persistent.setSystemUsage(usageManager); 281 } 282 if (nonPersistent != null) { 283 nonPersistent.setSystemUsage(usageManager); 284 } 285 } 286 287 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 288 if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) { 289 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 290 // sanity check 291 if (currentCursor.isEmpty()) { 292 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 293 } 294 } 295 return currentCursor; 296 } 297 298 @Override 299 public boolean isCacheEnabled() { 300 boolean cacheEnabled = isUseCache(); 301 if (cacheEnabled) { 302 if (persistent != null) { 303 cacheEnabled &= persistent.isCacheEnabled(); 304 } 305 if (nonPersistent != null) { 306 cacheEnabled &= nonPersistent.isCacheEnabled(); 307 } 308 setCacheEnabled(cacheEnabled); 309 } 310 return cacheEnabled; 311 } 312}