001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.util.Collections; 020import java.util.HashMap; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.CopyOnWriteArrayList; 024import org.apache.activemq.advisory.AdvisorySupport; 025import org.apache.activemq.broker.Broker; 026import org.apache.activemq.broker.ConnectionContext; 027import org.apache.activemq.broker.region.Destination; 028import org.apache.activemq.broker.region.DurableTopicSubscription; 029import org.apache.activemq.broker.region.MessageReference; 030import org.apache.activemq.broker.region.Topic; 031import org.apache.activemq.command.Message; 032import org.apache.activemq.usage.SystemUsage; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * persist pending messages pending message (messages awaiting dispatch to a 038 * consumer) cursor 039 * 040 * 041 */ 042public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { 043 044 private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class); 045 private final String clientId; 046 private final String subscriberName; 047 private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>(); 048 private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>(); 049 private final PendingMessageCursor nonPersistent; 050 private PendingMessageCursor currentCursor; 051 private final DurableTopicSubscription subscription; 052 private boolean immediatePriorityDispatch = true; 053 /** 054 * @param broker Broker for this cursor 055 * @param clientId clientId for this cursor 056 * @param subscriberName subscriber name for this cursor 057 * @param maxBatchSize currently ignored 058 * @param subscription subscription for this cursor 059 */ 060 public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, DurableTopicSubscription subscription) { 061 super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription)); 062 this.subscription=subscription; 063 this.clientId = clientId; 064 this.subscriberName = subscriberName; 065 if (broker.getBrokerService().isPersistent()) { 066 this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages); 067 } else { 068 this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); 069 } 070 071 this.nonPersistent.setMaxBatchSize(maxBatchSize); 072 this.nonPersistent.setSystemUsage(systemUsage); 073 this.storePrefetches.add(this.nonPersistent); 074 075 if (prioritizedMessages) { 076 setMaxAuditDepth(10*getMaxAuditDepth()); 077 } 078 } 079 080 @Override 081 public synchronized void start() throws Exception { 082 if (!isStarted()) { 083 super.start(); 084 for (PendingMessageCursor tsp : storePrefetches) { 085 tsp.setMessageAudit(getMessageAudit()); 086 tsp.start(); 087 } 088 } 089 } 090 091 @Override 092 public synchronized void stop() throws Exception { 093 if (isStarted()) { 094 if (subscription.isKeepDurableSubsActive()) { 095 super.gc(); 096 for (PendingMessageCursor tsp : storePrefetches) { 097 tsp.gc(); 098 } 099 } else { 100 super.stop(); 101 for (PendingMessageCursor tsp : storePrefetches) { 102 tsp.stop(); 103 } 104 getMessageAudit().clear(); 105 } 106 } 107 } 108 109 /** 110 * Add a destination 111 * 112 * @param context 113 * @param destination 114 * @throws Exception 115 */ 116 @Override 117 public synchronized void add(ConnectionContext context, Destination destination) throws Exception { 118 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { 119 TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName); 120 tsp.setMaxBatchSize(destination.getMaxPageSize()); 121 tsp.setSystemUsage(systemUsage); 122 tsp.setMessageAudit(getMessageAudit()); 123 tsp.setEnableAudit(isEnableAudit()); 124 tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark()); 125 tsp.setUseCache(isUseCache()); 126 topics.put(destination, tsp); 127 storePrefetches.add(tsp); 128 if (isStarted()) { 129 tsp.start(); 130 } 131 } 132 } 133 134 /** 135 * remove a destination 136 * 137 * @param context 138 * @param destination 139 * @throws Exception 140 */ 141 @Override 142 public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 143 PendingMessageCursor tsp = topics.remove(destination); 144 if (tsp != null) { 145 storePrefetches.remove(tsp); 146 } 147 return Collections.EMPTY_LIST; 148 } 149 150 /** 151 * @return true if there are no pending messages 152 */ 153 @Override 154 public synchronized boolean isEmpty() { 155 for (PendingMessageCursor tsp : storePrefetches) { 156 if( !tsp.isEmpty() ) 157 return false; 158 } 159 return true; 160 } 161 162 @Override 163 public synchronized boolean isEmpty(Destination destination) { 164 boolean result = true; 165 TopicStorePrefetch tsp = topics.get(destination); 166 if (tsp != null) { 167 result = tsp.isEmpty(); 168 } 169 return result; 170 } 171 172 /** 173 * Informs the Broker if the subscription needs to intervention to recover 174 * it's state e.g. DurableTopicSubscriber may do 175 * 176 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor 177 * @return true if recovery required 178 */ 179 @Override 180 public boolean isRecoveryRequired() { 181 return false; 182 } 183 184 @Override 185 public synchronized void addMessageLast(MessageReference node) throws Exception { 186 if (node != null) { 187 Message msg = node.getMessage(); 188 if (isStarted()) { 189 if (!msg.isPersistent()) { 190 nonPersistent.addMessageLast(node); 191 } 192 } 193 if (msg.isPersistent()) { 194 Destination dest = msg.getRegionDestination(); 195 TopicStorePrefetch tsp = topics.get(dest); 196 if (tsp != null) { 197 tsp.addMessageLast(node); 198 if (prioritizedMessages && immediatePriorityDispatch && tsp.isPaging()) { 199 if (msg.getPriority() > tsp.getLastRecoveredPriority()) { 200 tsp.recoverMessage(node.getMessage(), true); 201 if (LOG.isTraceEnabled()) { 202 LOG.trace("cached high priority (" + msg.getPriority() + ") message:" + 203 msg.getMessageId() + ", current paged batch priority: " + 204 tsp.getLastRecoveredPriority() + ", cache size:" + tsp.batchList.size()); 205 } 206 } 207 } 208 } 209 } 210 211 } 212 } 213 214 @Override 215 public boolean isTransient() { 216 return subscription.isKeepDurableSubsActive(); 217 } 218 219 @Override 220 public void addMessageFirst(MessageReference node) throws Exception { 221 // for keep durable subs active, need to deal with redispatch 222 if (node != null) { 223 Message msg = node.getMessage(); 224 if (!msg.isPersistent()) { 225 nonPersistent.addMessageFirst(node); 226 } else { 227 Destination dest = msg.getRegionDestination(); 228 TopicStorePrefetch tsp = topics.get(dest); 229 if (tsp != null) { 230 tsp.addMessageFirst(node); 231 } 232 } 233 } 234 } 235 236 @Override 237 public synchronized void addRecoveredMessage(MessageReference node) throws Exception { 238 nonPersistent.addMessageLast(node); 239 } 240 241 @Override 242 public synchronized void clear() { 243 for (PendingMessageCursor tsp : storePrefetches) { 244 tsp.clear(); 245 } 246 } 247 248 @Override 249 public synchronized boolean hasNext() { 250 boolean result = true; 251 if (result) { 252 try { 253 currentCursor = getNextCursor(); 254 } catch (Exception e) { 255 LOG.error("Failed to get current cursor ", e); 256 throw new RuntimeException(e); 257 } 258 result = currentCursor != null ? currentCursor.hasNext() : false; 259 } 260 return result; 261 } 262 263 @Override 264 public synchronized MessageReference next() { 265 MessageReference result = currentCursor != null ? currentCursor.next() : null; 266 return result; 267 } 268 269 @Override 270 public synchronized void remove() { 271 if (currentCursor != null) { 272 currentCursor.remove(); 273 } 274 } 275 276 @Override 277 public synchronized void remove(MessageReference node) { 278 for (PendingMessageCursor tsp : storePrefetches) { 279 tsp.remove(node); 280 } 281 } 282 283 @Override 284 public synchronized void reset() { 285 for (PendingMessageCursor storePrefetch : storePrefetches) { 286 storePrefetch.reset(); 287 } 288 } 289 290 @Override 291 public synchronized void release() { 292 for (PendingMessageCursor storePrefetch : storePrefetches) { 293 storePrefetch.release(); 294 } 295 } 296 297 @Override 298 public synchronized int size() { 299 int pendingCount=0; 300 for (PendingMessageCursor tsp : storePrefetches) { 301 pendingCount += tsp.size(); 302 } 303 return pendingCount; 304 } 305 306 @Override 307 public void setMaxBatchSize(int newMaxBatchSize) { 308 for (PendingMessageCursor storePrefetch : storePrefetches) { 309 storePrefetch.setMaxBatchSize(newMaxBatchSize); 310 } 311 super.setMaxBatchSize(newMaxBatchSize); 312 } 313 314 @Override 315 public synchronized void gc() { 316 for (PendingMessageCursor tsp : storePrefetches) { 317 tsp.gc(); 318 } 319 } 320 321 @Override 322 public void setSystemUsage(SystemUsage usageManager) { 323 super.setSystemUsage(usageManager); 324 for (PendingMessageCursor tsp : storePrefetches) { 325 tsp.setSystemUsage(usageManager); 326 } 327 } 328 329 @Override 330 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 331 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 332 for (PendingMessageCursor cursor : storePrefetches) { 333 cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 334 } 335 } 336 337 @Override 338 public void setMaxProducersToAudit(int maxProducersToAudit) { 339 super.setMaxProducersToAudit(maxProducersToAudit); 340 for (PendingMessageCursor cursor : storePrefetches) { 341 cursor.setMaxAuditDepth(maxAuditDepth); 342 } 343 } 344 345 @Override 346 public void setMaxAuditDepth(int maxAuditDepth) { 347 super.setMaxAuditDepth(maxAuditDepth); 348 for (PendingMessageCursor cursor : storePrefetches) { 349 cursor.setMaxAuditDepth(maxAuditDepth); 350 } 351 } 352 353 @Override 354 public void setEnableAudit(boolean enableAudit) { 355 super.setEnableAudit(enableAudit); 356 for (PendingMessageCursor cursor : storePrefetches) { 357 cursor.setEnableAudit(enableAudit); 358 } 359 } 360 361 @Override 362 public void setUseCache(boolean useCache) { 363 super.setUseCache(useCache); 364 for (PendingMessageCursor cursor : storePrefetches) { 365 cursor.setUseCache(useCache); 366 } 367 } 368 369 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 370 if (currentCursor == null || currentCursor.isEmpty()) { 371 currentCursor = null; 372 for (PendingMessageCursor tsp : storePrefetches) { 373 if (tsp.hasNext()) { 374 currentCursor = tsp; 375 break; 376 } 377 } 378 // round-robin 379 if (storePrefetches.size()>1) { 380 PendingMessageCursor first = storePrefetches.remove(0); 381 storePrefetches.add(first); 382 } 383 } 384 return currentCursor; 385 } 386 387 @Override 388 public String toString() { 389 return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")"; 390 } 391 392 public boolean isImmediatePriorityDispatch() { 393 return immediatePriorityDispatch; 394 } 395 396 public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) { 397 this.immediatePriorityDispatch = immediatePriorityDispatch; 398 } 399 400}