001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicLong;
024
025import javax.jms.InvalidSelectorException;
026import javax.jms.JMSException;
027
028import org.apache.activemq.broker.Broker;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ConsumerInfo;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageDispatch;
037import org.apache.activemq.command.MessageId;
038import org.apache.activemq.store.TopicMessageStore;
039import org.apache.activemq.usage.SystemUsage;
040import org.apache.activemq.usage.Usage;
041import org.apache.activemq.usage.UsageListener;
042import org.apache.activemq.util.SubscriptionKey;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
047
048    private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
049    private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
050    private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
051    private final SubscriptionKey subscriptionKey;
052    private final boolean keepDurableSubsActive;
053    private AtomicBoolean active = new AtomicBoolean();
054    private AtomicLong offlineTimestamp = new AtomicLong(-1);
055
056    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
057        throws JMSException {
058        super(broker,usageManager, context, info);
059        this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
060        this.pending.setSystemUsage(usageManager);
061        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
062        this.keepDurableSubsActive = keepDurableSubsActive;
063        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
064
065    }
066
067    public final boolean isActive() {
068        return active.get();
069    }
070
071    public final long getOfflineTimestamp() {
072        return offlineTimestamp.get();
073    }
074
075    public boolean isFull() {
076        return !active.get() || super.isFull();
077    }
078
079    public void gc() {
080    }
081
082    /**
083     * store will have a pending ack for all durables, irrespective of the selector
084     * so we need to ack if node is un-matched
085     */
086    public void unmatched(MessageReference node) throws IOException {
087        MessageAck ack = new MessageAck();
088        ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
089        ack.setMessageID(node.getMessageId());
090        node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
091    }
092
093    @Override
094    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
095        // statically configured via maxPageSize
096    }
097
098    public void add(ConnectionContext context, Destination destination) throws Exception {
099        if (!destinations.contains(destination)) {
100            super.add(context, destination);
101        }
102        // do it just once per destination
103        if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
104             return;
105        }
106        durableDestinations.put(destination.getActiveMQDestination(), destination);
107
108        if (active.get() || keepDurableSubsActive) {
109            Topic topic = (Topic)destination;
110            topic.activate(context, this);
111            if (pending.isEmpty(topic)) {
112                topic.recoverRetroactiveMessages(context, this);
113            }
114            this.enqueueCounter+=pending.size();
115        } else if (destination.getMessageStore() != null) {
116            TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
117            try {
118                this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
119            } catch (IOException e) {
120                JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store "+ e);
121                jmsEx.setLinkedException(e);
122                throw jmsEx;
123            }
124        }
125        dispatchPending();
126    }
127
128    public void activate(SystemUsage memoryManager, ConnectionContext context,
129            ConsumerInfo info) throws Exception {
130        if (!active.get()) {
131            this.context = context;
132            this.info = info;
133            LOG.debug("Activating " + this);
134            if (!keepDurableSubsActive) {
135                for (Iterator<Destination> iter = durableDestinations.values()
136                        .iterator(); iter.hasNext();) {
137                    Topic topic = (Topic) iter.next();
138                    add(context, topic);
139                    topic.activate(context, this);
140                }
141            }
142            synchronized (pendingLock) {
143                pending.setSystemUsage(memoryManager);
144                pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
145                pending.setMaxAuditDepth(getMaxAuditDepth());
146                pending.setMaxProducersToAudit(getMaxProducersToAudit());
147                pending.start();
148                // If nothing was in the persistent store, then try to use the
149                // recovery policy.
150                if (pending.isEmpty()) {
151                    for (Iterator<Destination> iter = durableDestinations.values()
152                            .iterator(); iter.hasNext();) {
153                        Topic topic = (Topic) iter.next();
154                        topic.recoverRetroactiveMessages(context, this);
155                    }
156                }
157            }
158            this.active.set(true);
159            this.offlineTimestamp.set(-1);
160            dispatchPending();
161            this.usageManager.getMemoryUsage().addUsageListener(this);
162        }
163    }
164
165    public void deactivate(boolean keepDurableSubsActive) throws Exception {
166        LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this);
167        active.set(false);
168        offlineTimestamp.set(System.currentTimeMillis());
169        this.usageManager.getMemoryUsage().removeUsageListener(this);
170        synchronized (pendingLock) {
171            pending.stop();
172
173            synchronized (dispatchLock) {
174                for (Iterator<Destination> iter = durableDestinations.values().iterator(); iter.hasNext();) {
175                    Topic topic = (Topic)iter.next();
176                    if (!keepDurableSubsActive) {
177                        topic.deactivate(context, this);
178                    } else {
179                        topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
180                    }
181                }
182
183                for (final MessageReference node : dispatched) {
184                    // Mark the dispatched messages as redelivered for next time.
185                    Integer count = redeliveredMessages.get(node.getMessageId());
186                    if (count != null) {
187                        redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
188                    } else {
189                        redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
190                    }
191                    if (keepDurableSubsActive && pending.isTransient()) {
192                        pending.addMessageFirst(node);
193                        pending.rollback(node.getMessageId());
194                    } else {
195                        node.decrementReferenceCount();
196                    }
197                }
198                dispatched.clear();
199            }
200            if (!keepDurableSubsActive && pending.isTransient()) {
201                try {
202                    pending.reset();
203                    while (pending.hasNext()) {
204                        MessageReference node = pending.next();
205                        node.decrementReferenceCount();
206                        pending.remove();
207                    }
208                } finally {
209                    pending.release();
210                }
211            }
212        }
213        prefetchExtension.set(0);
214    }
215
216
217    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
218        MessageDispatch md = super.createMessageDispatch(node, message);
219        if (node != QueueMessageReference.NULL_MESSAGE) {
220            Integer count = redeliveredMessages.get(node.getMessageId());
221            if (count != null) {
222                md.setRedeliveryCounter(count.intValue());
223            }
224        }
225        return md;
226    }
227
228    public void add(MessageReference node) throws Exception {
229        if (!active.get() && !keepDurableSubsActive) {
230            return;
231        }
232        super.add(node);
233    }
234
235    protected void dispatchPending() throws IOException {
236        if (isActive()) {
237            super.dispatchPending();
238        }
239    }
240
241    public void removePending(MessageReference node) throws IOException {
242        pending.remove(node);
243    }
244
245    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
246        synchronized(pending) {
247            pending.addRecoveredMessage(message);
248        }
249    }
250
251    public int getPendingQueueSize() {
252        if (active.get() || keepDurableSubsActive) {
253            return super.getPendingQueueSize();
254        }
255        // TODO: need to get from store
256        return 0;
257    }
258
259    public void setSelector(String selector) throws InvalidSelectorException {
260        throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
261    }
262
263    protected boolean canDispatch(MessageReference node) {
264        return isActive();
265    }
266
267    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
268        node.getRegionDestination().acknowledge(context, this, ack, node);
269        redeliveredMessages.remove(node.getMessageId());
270        node.decrementReferenceCount();
271    }
272
273    public synchronized String toString() {
274        return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + enqueueCounter + ", pending="
275               + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
276    }
277
278    public SubscriptionKey getSubscriptionKey() {
279        return subscriptionKey;
280    }
281
282    /**
283     * Release any references that we are holding.
284     */
285    public void destroy() {
286        synchronized (pendingLock) {
287            try {
288
289                pending.reset();
290                while (pending.hasNext()) {
291                    MessageReference node = pending.next();
292                    node.decrementReferenceCount();
293                }
294
295            } finally {
296                pending.release();
297                pending.clear();
298            }
299        }
300        synchronized  (dispatchLock) {
301            for (MessageReference node : dispatched) {
302                node.decrementReferenceCount();
303            }
304            dispatched.clear();
305        }
306        setSlowConsumer(false);
307    }
308
309    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
310        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
311            try {
312                dispatchPending();
313            } catch (IOException e) {
314                LOG.warn("problem calling dispatchMatched", e);
315            }
316        }
317    }
318
319    protected boolean isDropped(MessageReference node) {
320       return false;
321    }
322
323    public boolean isKeepDurableSubsActive() {
324        return keepDurableSubsActive;
325    }
326}