001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.util.*; 020import java.util.concurrent.ConcurrentHashMap; 021import javax.jms.InvalidDestinationException; 022import javax.jms.JMSException; 023import org.apache.activemq.advisory.AdvisorySupport; 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.broker.region.policy.PolicyEntry; 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.activemq.command.ConnectionId; 028import org.apache.activemq.command.ConsumerId; 029import org.apache.activemq.command.ConsumerInfo; 030import org.apache.activemq.command.RemoveSubscriptionInfo; 031import org.apache.activemq.command.SessionId; 032import org.apache.activemq.command.SubscriptionInfo; 033import org.apache.activemq.store.TopicMessageStore; 034import org.apache.activemq.thread.TaskRunnerFactory; 035import org.apache.activemq.usage.SystemUsage; 036import org.apache.activemq.util.LongSequenceGenerator; 037import org.apache.activemq.util.SubscriptionKey; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * 043 */ 044public class TopicRegion extends AbstractRegion { 045 private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class); 046 protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 047 private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator(); 048 private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId()); 049 private boolean keepDurableSubsActive; 050 051 private Timer cleanupTimer; 052 private TimerTask cleanupTask; 053 054 public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, 055 DestinationFactory destinationFactory) { 056 super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 057 if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) { 058 this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true); 059 this.cleanupTask = new TimerTask() { 060 public void run() { 061 doCleanup(); 062 } 063 }; 064 this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule()); 065 } 066 } 067 068 @Override 069 public void stop() throws Exception { 070 super.stop(); 071 if (cleanupTimer != null) { 072 cleanupTimer.cancel(); 073 } 074 } 075 076 public void doCleanup() { 077 long now = System.currentTimeMillis(); 078 for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) { 079 DurableTopicSubscription sub = entry.getValue(); 080 if (!sub.isActive()) { 081 long offline = sub.getOfflineTimestamp(); 082 if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) { 083 LOG.info("Destroying durable subscriber due to inactivity: " + sub); 084 try { 085 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 086 info.setClientId(entry.getKey().getClientId()); 087 info.setSubscriptionName(entry.getKey().getSubscriptionName()); 088 ConnectionContext context = new ConnectionContext(); 089 context.setBroker(broker); 090 context.setClientId(entry.getKey().getClientId()); 091 removeSubscription(context, info); 092 } catch (Exception e) { 093 LOG.error("Failed to remove inactive durable subscriber", e); 094 } 095 } 096 } 097 } 098 } 099 100 @Override 101 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 102 if (info.isDurable()) { 103 ActiveMQDestination destination = info.getDestination(); 104 if (!destination.isPattern()) { 105 // Make sure the destination is created. 106 lookup(context, destination,true); 107 } 108 String clientId = context.getClientId(); 109 String subscriptionName = info.getSubscriptionName(); 110 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 111 DurableTopicSubscription sub = durableSubscriptions.get(key); 112 if (sub != null) { 113 if (sub.isActive()) { 114 throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName); 115 } 116 // Has the selector changed?? 117 if (hasDurableSubChanged(info, sub.getConsumerInfo())) { 118 // Remove the consumer first then add it. 119 durableSubscriptions.remove(key); 120 destinationsLock.readLock().lock(); 121 try { 122 for (Destination dest : destinations.values()) { 123 //Account for virtual destinations 124 if (dest instanceof Topic){ 125 Topic topic = (Topic)dest; 126 topic.deleteSubscription(context, key); 127 } 128 } 129 } finally { 130 destinationsLock.readLock().unlock(); 131 } 132 super.removeConsumer(context, sub.getConsumerInfo()); 133 super.addConsumer(context, info); 134 sub = durableSubscriptions.get(key); 135 } else { 136 // Change the consumer id key of the durable sub. 137 if (sub.getConsumerInfo().getConsumerId() != null) { 138 subscriptions.remove(sub.getConsumerInfo().getConsumerId()); 139 } 140 subscriptions.put(info.getConsumerId(), sub); 141 } 142 } else { 143 super.addConsumer(context, info); 144 sub = durableSubscriptions.get(key); 145 if (sub == null) { 146 throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId() 147 + " subscriberName: " + key.getSubscriptionName()); 148 } 149 } 150 sub.activate(usageManager, context, info); 151 return sub; 152 } else { 153 return super.addConsumer(context, info); 154 } 155 } 156 157 @Override 158 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 159 if (info.isDurable()) { 160 161 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 162 DurableTopicSubscription sub = durableSubscriptions.get(key); 163 if (sub != null) { 164 sub.deactivate(keepDurableSubsActive); 165 } 166 167 } else { 168 super.removeConsumer(context, info); 169 } 170 } 171 172 @Override 173 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 174 SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName()); 175 DurableTopicSubscription sub = durableSubscriptions.get(key); 176 if (sub == null) { 177 throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName()); 178 } 179 if (sub.isActive()) { 180 throw new JMSException("Durable consumer is in use"); 181 } else { 182 durableSubscriptions.remove(key); 183 } 184 185 destinationsLock.readLock().lock(); 186 try { 187 for (Destination dest : destinations.values()) { 188 //Account for virtual destinations 189 if (dest instanceof Topic){ 190 Topic topic = (Topic)dest; 191 topic.deleteSubscription(context, key); 192 } 193 } 194 } finally { 195 destinationsLock.readLock().unlock(); 196 } 197 198 if (subscriptions.get(sub.getConsumerInfo().getConsumerId()) != null) { 199 super.removeConsumer(context, sub.getConsumerInfo()); 200 } else { 201 // try destroying inactive subscriptions 202 destroySubscription(sub); 203 } 204 } 205 206 @Override 207 public String toString() { 208 return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%"; 209 } 210 211 @Override 212 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { 213 List<Subscription> rc = super.addSubscriptionsForDestination(context, dest); 214 Set<Subscription> dupChecker = new HashSet<Subscription>(rc); 215 216 TopicMessageStore store = (TopicMessageStore)dest.getMessageStore(); 217 // Eagerly recover the durable subscriptions 218 if (store != null) { 219 SubscriptionInfo[] infos = store.getAllSubscriptions(); 220 for (int i = 0; i < infos.length; i++) { 221 222 SubscriptionInfo info = infos[i]; 223 if (LOG.isDebugEnabled()) { 224 LOG.debug("Restoring durable subscription: " + info); 225 } 226 SubscriptionKey key = new SubscriptionKey(info); 227 228 // A single durable sub may be subscribing to multiple topics. 229 // so it might exist already. 230 DurableTopicSubscription sub = durableSubscriptions.get(key); 231 ConsumerInfo consumerInfo = createInactiveConsumerInfo(info); 232 if (sub == null) { 233 ConnectionContext c = new ConnectionContext(); 234 c.setBroker(context.getBroker()); 235 c.setClientId(key.getClientId()); 236 c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId()); 237 sub = (DurableTopicSubscription)createSubscription(c, consumerInfo); 238 } 239 240 if (dupChecker.contains(sub)) { 241 continue; 242 } 243 244 dupChecker.add(sub); 245 rc.add(sub); 246 dest.addSubscription(context, sub); 247 } 248 249 // Now perhaps there other durable subscriptions (via wild card) 250 // that would match this destination.. 251 durableSubscriptions.values(); 252 for (DurableTopicSubscription sub : durableSubscriptions.values()) { 253 // Skip over subscriptions that we allready added.. 254 if (dupChecker.contains(sub)) { 255 continue; 256 } 257 258 if (sub.matches(dest.getActiveMQDestination())) { 259 rc.add(sub); 260 dest.addSubscription(context, sub); 261 } 262 } 263 } 264 return rc; 265 } 266 267 public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { 268 ConsumerInfo rc = new ConsumerInfo(); 269 rc.setSelector(info.getSelector()); 270 rc.setSubscriptionName(info.getSubscriptionName()); 271 rc.setDestination(info.getSubscribedDestination()); 272 rc.setConsumerId(createConsumerId()); 273 return rc; 274 } 275 276 private ConsumerId createConsumerId() { 277 return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId()); 278 } 279 280 protected void configureTopic(Topic topic, ActiveMQDestination destination) { 281 if (broker.getDestinationPolicy() != null) { 282 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 283 if (entry != null) { 284 entry.configure(broker,topic); 285 } 286 } 287 } 288 289 @Override 290 protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { 291 ActiveMQDestination destination = info.getDestination(); 292 293 if (info.isDurable()) { 294 if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 295 throw new JMSException("Cannot create a durable subscription for an advisory Topic"); 296 } 297 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 298 DurableTopicSubscription sub = durableSubscriptions.get(key); 299 300 if (sub == null) { 301 302 sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive); 303 304 if (destination != null && broker.getDestinationPolicy() != null) { 305 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 306 if (entry != null) { 307 entry.configure(broker, usageManager, sub); 308 } 309 } 310 durableSubscriptions.put(key, sub); 311 } else { 312 throw new JMSException("That durable subscription is already active."); 313 } 314 return sub; 315 } 316 try { 317 TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager); 318 // lets configure the subscription depending on the destination 319 if (destination != null && broker.getDestinationPolicy() != null) { 320 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 321 if (entry != null) { 322 entry.configure(broker, usageManager, answer); 323 } 324 } 325 answer.init(); 326 return answer; 327 } catch (Exception e) { 328 LOG.error("Failed to create TopicSubscription ", e); 329 JMSException jmsEx = new JMSException("Couldn't create TopicSubscription"); 330 jmsEx.setLinkedException(e); 331 throw jmsEx; 332 } 333 } 334 335 /** 336 */ 337 private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) { 338 if (info1.getSelector() != null ^ info2.getSelector() != null) { 339 return true; 340 } 341 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { 342 return true; 343 } 344 return !info1.getDestination().equals(info2.getDestination()); 345 } 346 347 @Override 348 protected Set<ActiveMQDestination> getInactiveDestinations() { 349 Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations(); 350 for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) { 351 ActiveMQDestination dest = iter.next(); 352 if (!dest.isTopic()) { 353 iter.remove(); 354 } 355 } 356 return inactiveDestinations; 357 } 358 359 public boolean isKeepDurableSubsActive() { 360 return keepDurableSubsActive; 361 } 362 363 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 364 this.keepDurableSubsActive = keepDurableSubsActive; 365 } 366 367 public boolean durableSubscriptionExists(SubscriptionKey key) { 368 return this.durableSubscriptions.containsKey(key); 369 } 370 371}