001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.CancellationException;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.Future;
027import java.util.concurrent.locks.ReentrantReadWriteLock;
028
029import org.apache.activemq.broker.BrokerService;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.ProducerBrokerExchange;
032import org.apache.activemq.broker.region.policy.DispatchPolicy;
033import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
034import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
035import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
036import org.apache.activemq.broker.util.InsertionCountList;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ExceptionResponse;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessageAck;
041import org.apache.activemq.command.MessageId;
042import org.apache.activemq.command.ProducerAck;
043import org.apache.activemq.command.ProducerInfo;
044import org.apache.activemq.command.Response;
045import org.apache.activemq.command.SubscriptionInfo;
046import org.apache.activemq.filter.MessageEvaluationContext;
047import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
048import org.apache.activemq.store.MessageRecoveryListener;
049import org.apache.activemq.store.TopicMessageStore;
050import org.apache.activemq.thread.Task;
051import org.apache.activemq.thread.TaskRunner;
052import org.apache.activemq.thread.TaskRunnerFactory;
053import org.apache.activemq.transaction.Synchronization;
054import org.apache.activemq.util.SubscriptionKey;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
059 * The Topic is a destination that sends a copy of a message to every active
060 * Subscription registered.
061 */
062public class Topic extends BaseDestination implements Task {
    /** Logger for this class; also what {@link #getLog()} returns. */
    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
    /** Store handed to the constructor; every use in this class is guarded by a null check. */
    private final TopicMessageStore topicStore;
    /** Subscriptions currently attached to this topic; this list is what the dispatch policy receives. */
    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
    /**
     * Read lock is held while a message is dispatched; write lock is held while a
     * subscription is being recovered, so dispatch and recovery do not interleave.
     */
    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
    /** Strategy that hands a message to the attached subscriptions; replaceable via its setter. */
    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
    /** Strategy consulted on every dispatch and when a subscription is recovered; set in the constructor. */
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
    /** Durable subscriptions registered through addSubscription, keyed by subscription key. */
    private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
    /** Runner created in the constructor with this topic as its task. */
    private final TaskRunner taskRunner;
    /** Sends deferred because memory usage was full; drained by {@link #iterate()}. Guarded by its own monitor. */
    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
072    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
073        public void run() {
074            try {
075                Topic.this.taskRunner.wakeup();
076            } catch (InterruptedException e) {
077            }
078        };
079    };
080
081    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
082            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
083        super(brokerService, store, destination, parentStats);
084        this.topicStore = store;
085        // set default subscription recovery policy
086        subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
087        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
088    }
089
090    @Override
091    public void initialize() throws Exception {
092        super.initialize();
093        if (store != null) {
094            // AMQ-2586: Better to leave this stat at zero than to give the user
095            // misleading metrics.
096            // int messageCount = store.getMessageCount();
097            // destinationStatistics.getMessages().setCount(messageCount);
098        }
099    }
100
101    public List<Subscription> getConsumers() {
102        synchronized (consumers) {
103            return new ArrayList<Subscription>(consumers);
104        }
105    }
106
107    public boolean lock(MessageReference node, LockOwner sub) {
108        return true;
109    }
110
111    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
112
113       super.addSubscription(context, sub);
114
115        if (!sub.getConsumerInfo().isDurable()) {
116
117            // Do a retroactive recovery if needed.
118            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
119
120                // synchronize with dispatch method so that no new messages are sent
121                // while we are recovering a subscription to avoid out of order messages.
122                dispatchLock.writeLock().lock();
123                try {
124                    synchronized (consumers) {
125                        sub.add(context, this);
126                        consumers.add(sub);
127                    }
128                    subscriptionRecoveryPolicy.recover(context, this, sub);
129                } finally {
130                    dispatchLock.writeLock().unlock();
131                }
132
133            } else {
134                synchronized (consumers) {
135                    sub.add(context, this);
136                    consumers.add(sub);
137                }
138            }
139        } else {
140            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
141                sub.add(context, this);
142                if(dsub.isActive()) {
143                        synchronized (consumers) {
144                                boolean hasSubscription = false;
145        
146                                if(consumers.size()==0) {
147                                hasSubscription = false;
148                                } else {
149                                        for(Subscription currentSub : consumers) {
150                                                if(currentSub.getConsumerInfo().isDurable()) {
151                                            DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
152                                            if(dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
153                                                hasSubscription = true;
154                                                break;
155                                            }
156                                                }
157                                        }
158                                }
159                                
160                        if(!hasSubscription)
161                                consumers.add(sub);
162                    }
163                }
164            durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
165        }
166    }
167
168    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
169            throws Exception {
170        if (!sub.getConsumerInfo().isDurable()) {
171            super.removeSubscription(context, sub, lastDeliveredSequenceId);
172            synchronized (consumers) {
173                consumers.remove(sub);
174            }
175        }
176        sub.remove(context, this);
177    }
178
179    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
180        if (topicStore != null) {
181            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
182            DurableTopicSubscription removed = durableSubcribers.remove(key);
183            if (removed != null) {
184                destinationStatistics.getConsumers().decrement();
185                // deactivate and remove
186                removed.deactivate(false);
187                consumers.remove(removed);
188            }
189        }
190    }
191
192    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
193        // synchronize with dispatch method so that no new messages are sent
194        // while we are recovering a subscription to avoid out of order messages.
195        dispatchLock.writeLock().lock();
196        try {
197
198            if (topicStore == null) {
199                return;
200            }
201
202            // Recover the durable subscription.
203            String clientId = subscription.getSubscriptionKey().getClientId();
204            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
205            String selector = subscription.getConsumerInfo().getSelector();
206            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
207            if (info != null) {
208                // Check to see if selector changed.
209                String s1 = info.getSelector();
210                if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
211                    // Need to delete the subscription
212                    topicStore.deleteSubscription(clientId, subscriptionName);
213                    info = null;
214                } else {
215                    synchronized (consumers) {
216                        consumers.add(subscription);
217                    }
218                }
219            }
220
221            // Do we need to create the subscription?
222            if (info == null) {
223                info = new SubscriptionInfo();
224                info.setClientId(clientId);
225                info.setSelector(selector);
226                info.setSubscriptionName(subscriptionName);
227                info.setDestination(getActiveMQDestination());
228                // This destination is an actual destination id.
229                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
230                // This destination might be a pattern
231                synchronized (consumers) {
232                    consumers.add(subscription);
233                    topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
234                }
235            }
236
237            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
238            msgContext.setDestination(destination);
239            if (subscription.isRecoveryRequired()) {
240                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
241                    public boolean recoverMessage(Message message) throws Exception {
242                        message.setRegionDestination(Topic.this);
243                        try {
244                            msgContext.setMessageReference(message);
245                            if (subscription.matches(message, msgContext)) {
246                                subscription.add(message);
247                            }
248                        } catch (IOException e) {
249                            LOG.error("Failed to recover this message " + message);
250                        }
251                        return true;
252                    }
253
254                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
255                        throw new RuntimeException("Should not be called.");
256                    }
257
258                    public boolean hasSpace() {
259                        return true;
260                    }
261
262                    public boolean isDuplicate(MessageId id) {
263                        return false;
264                    }
265                });
266            }
267        } finally {
268            dispatchLock.writeLock().unlock();
269        }
270    }
271
272    public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
273        synchronized (consumers) {
274            consumers.remove(sub);
275        }
276        sub.remove(context, this);
277    }
278
279    protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
280        if (subscription.getConsumerInfo().isRetroactive()) {
281            subscriptionRecoveryPolicy.recover(context, this, subscription);
282        }
283    }
284
285    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
286        final ConnectionContext context = producerExchange.getConnectionContext();
287
288        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
289        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
290                && !context.isInRecoveryMode();
291
292        // There is delay between the client sending it and it arriving at the
293        // destination.. it may have expired.
294        if (message.isExpired()) {
295            broker.messageExpired(context, message, null);
296            getDestinationStatistics().getExpired().increment();
297            if (sendProducerAck) {
298                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
299                context.getConnection().dispatchAsync(ack);
300            }
301            return;
302        }
303
304        if (memoryUsage.isFull()) {
305            isFull(context, memoryUsage);
306            fastProducer(context, producerInfo);
307
308            if (isProducerFlowControl() && context.isProducerFlowControl()) {
309
310                if (warnOnProducerFlowControl) {
311                    warnOnProducerFlowControl = false;
312                    LOG.info(memoryUsage + ", Usage Manager memory limit reached for "
313                                    + getActiveMQDestination().getQualifiedName()
314                                    + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
315                                    + " See http://activemq.apache.org/producer-flow-control.html for more info");
316                }
317
318                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
319                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
320                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
321                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
322                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
323                }
324
325                // We can avoid blocking due to low usage if the producer is
326                // sending
327                // a sync message or
328                // if it is using a producer window
329                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
330                    synchronized (messagesWaitingForSpace) {
331                        messagesWaitingForSpace.add(new Runnable() {
332                            public void run() {
333                                try {
334
335                                    // While waiting for space to free up... the
336                                    // message may have expired.
337                                    if (message.isExpired()) {
338                                        broker.messageExpired(context, message, null);
339                                        getDestinationStatistics().getExpired().increment();
340                                    } else {
341                                        doMessageSend(producerExchange, message);
342                                    }
343
344                                    if (sendProducerAck) {
345                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
346                                                .getSize());
347                                        context.getConnection().dispatchAsync(ack);
348                                    } else {
349                                        Response response = new Response();
350                                        response.setCorrelationId(message.getCommandId());
351                                        context.getConnection().dispatchAsync(response);
352                                    }
353
354                                } catch (Exception e) {
355                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
356                                        ExceptionResponse response = new ExceptionResponse(e);
357                                        response.setCorrelationId(message.getCommandId());
358                                        context.getConnection().dispatchAsync(response);
359                                    }
360                                }
361
362                            }
363                        });
364
365                        registerCallbackForNotFullNotification();
366                        context.setDontSendReponse(true);
367                        return;
368                    }
369
370                } else {
371                    // Producer flow control cannot be used, so we have do the
372                    // flow
373                    // control at the broker
374                    // by blocking this thread until there is space available.
375
376                    if (memoryUsage.isFull()) {
377                        if (context.isInTransaction()) {
378
379                            int count = 0;
380                            while (!memoryUsage.waitForSpace(1000)) {
381                                if (context.getStopping().get()) {
382                                    throw new IOException("Connection closed, send aborted.");
383                                }
384                                if (count > 2 && context.isInTransaction()) {
385                                    count = 0;
386                                    int size = context.getTransaction().size();
387                                    LOG.warn("Waiting for space to send  transacted message - transaction elements = "
388                                            + size + " need more space to commit. Message = " + message);
389                                }
390                            }
391                        } else {
392                            waitForSpace(
393                                    context,
394                                    memoryUsage,
395                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
396                                            + message.getProducerId()
397                                            + ") to prevent flooding "
398                                            + getActiveMQDestination().getQualifiedName()
399                                            + "."
400                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
401                        }
402                    }
403
404                    // The usage manager could have delayed us by the time
405                    // we unblock the message could have expired..
406                    if (message.isExpired()) {
407                        getDestinationStatistics().getExpired().increment();
408                        if (LOG.isDebugEnabled()) {
409                            LOG.debug("Expired message: " + message);
410                        }
411                        return;
412                    }
413                }
414            }
415        }
416
417        doMessageSend(producerExchange, message);
418        messageDelivered(context, message);
419        if (sendProducerAck) {
420            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
421            context.getConnection().dispatchAsync(ack);
422        }
423    }
424
    /**
     * do send the message - this needs to be synchronized to ensure messages
     * are stored AND dispatched in the right order
     * <p>
     * The message is stamped with this destination and a destination sequence
     * id. It is written to the store only when a store exists, the message is
     * persistent and at least one durable subscriber is registered. Dispatch
     * happens immediately, or in the transaction's after-commit callback when
     * the context is in a transaction. Finally the method waits for the
     * asynchronous store operation, if one was started and not cancelled.
     *
     * @param producerExchange exchange supplying the connection context
     * @param message message to store and dispatch
     * @throws IOException declared alongside Exception; no statement in this
     *             method throws it directly
     * @throws Exception a {@code javax.jms.ResourceAllocationException} when the
     *             store is full and the send must fail, or any failure raised
     *             while waiting for space, storing or dispatching
     */
    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
            throws IOException, Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();
        message.setRegionDestination(this);
        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
        // Handle on the asynchronous store write; stays null when nothing is stored.
        Future<Object> result = null;

        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
                // Reject outright instead of blocking when so configured,
                // except on network connections.
                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException(logMessage);
                }

                waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
            }
            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
        }

        // Hold a reference until dispatch (or the after-commit callback) is done.
        message.incrementReferenceCount();

        if (context.isInTransaction()) {
            context.getTransaction().addSynchronization(new Synchronization() {
                @Override
                public void afterCommit() throws Exception {
                    // It could take while before we receive the commit
                    // operation.. by that time the message could have
                    // expired..
                    if (broker.isExpired(message)) {
                        getDestinationStatistics().getExpired().increment();
                        broker.messageExpired(context, message, null);
                        message.decrementReferenceCount();
                        return;
                    }
                    try {
                        dispatch(context, message);
                    } finally {
                        message.decrementReferenceCount();
                    }
                }
            });

        } else {
            try {
                dispatch(context, message);
            } finally {
                message.decrementReferenceCount();
            }
        }

        // Wait for the store write to finish before returning.
        if (result != null && !result.isCancelled()) {
            try {
                result.get();
            } catch (CancellationException e) {
                // ignore - the task has been cancelled if the message
                // has already been deleted
            }
        }
    }
496
497    private boolean canOptimizeOutPersistence() {
498        return durableSubcribers.size() == 0;
499    }
500
501    @Override
502    public String toString() {
503        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
504    }
505
506    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
507            final MessageReference node) throws IOException {
508        if (topicStore != null && node.isPersistent()) {
509            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
510            SubscriptionKey key = dsub.getSubscriptionKey();
511            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
512        }
513        messageConsumed(context, node);
514    }
515
516    public void gc() {
517    }
518
519    public Message loadMessage(MessageId messageId) throws IOException {
520        return topicStore != null ? topicStore.getMessage(messageId) : null;
521    }
522
523    public void start() throws Exception {
524        this.subscriptionRecoveryPolicy.start();
525        if (memoryUsage != null) {
526            memoryUsage.start();
527        }
528
529        if (getExpireMessagesPeriod() > 0) {
530            scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
531        }
532    }
533
534    public void stop() throws Exception {
535        if (taskRunner != null) {
536            taskRunner.shutdown();
537        }
538        this.subscriptionRecoveryPolicy.stop();
539        if (memoryUsage != null) {
540            memoryUsage.stop();
541        }
542        if (this.topicStore != null) {
543            this.topicStore.stop();
544        }
545
546         scheduler.cancel(expireMessagesTask);
547    }
548
549    public Message[] browse() {
550        final List<Message> result = new ArrayList<Message>();
551        doBrowse(result, getMaxBrowsePageSize());
552        return result.toArray(new Message[result.size()]);
553    }
554
555    private void doBrowse(final List<Message> browseList, final int max) {
556        try {
557            if (topicStore != null) {
558                final List<Message> toExpire = new ArrayList<Message>();
559                topicStore.recover(new MessageRecoveryListener() {
560                    public boolean recoverMessage(Message message) throws Exception {
561                        if (message.isExpired()) {
562                            toExpire.add(message);
563                        }
564                        browseList.add(message);
565                        return true;
566                    }
567
568                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
569                        return true;
570                    }
571
572                    public boolean hasSpace() {
573                        return browseList.size() < max;
574                    }
575
576                    public boolean isDuplicate(MessageId id) {
577                        return false;
578                    }
579                });
580                final ConnectionContext connectionContext = createConnectionContext();
581                for (Message message : toExpire) {
582                    for (DurableTopicSubscription sub : durableSubcribers.values()) {
583                        if (!sub.isActive()) {
584                            messageExpired(connectionContext, sub, message);
585                        }
586                    }
587                }
588                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
589                if (msgs != null) {
590                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
591                        browseList.add(msgs[i]);
592                    }
593                }
594            }
595        } catch (Throwable e) {
596            LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
597        }
598    }
599
600    public boolean iterate() {
601        synchronized (messagesWaitingForSpace) {
602            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
603                Runnable op = messagesWaitingForSpace.removeFirst();
604                op.run();
605            }
606
607            if (!messagesWaitingForSpace.isEmpty()) {
608                registerCallbackForNotFullNotification();
609            }
610        }
611        return false;
612    }
613
614    private void registerCallbackForNotFullNotification() {
615        // If the usage manager is not full, then the task will not
616        // get called..
617        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
618            // so call it directly here.
619            sendMessagesWaitingForSpaceTask.run();
620        }
621    }
622
623    // Properties
624    // -------------------------------------------------------------------------
625
626    public DispatchPolicy getDispatchPolicy() {
627        return dispatchPolicy;
628    }
629
630    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
631        this.dispatchPolicy = dispatchPolicy;
632    }
633
634    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
635        return subscriptionRecoveryPolicy;
636    }
637
638    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
639        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
640    }
641
642    // Implementation methods
643    // -------------------------------------------------------------------------
644
645    public final void wakeup() {
646    }
647
648    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
649        // AMQ-2586: Better to leave this stat at zero than to give the user
650        // misleading metrics.
651        // destinationStatistics.getMessages().increment();
652        destinationStatistics.getEnqueues().increment();
653        MessageEvaluationContext msgContext = null;
654
655        dispatchLock.readLock().lock();
656        try {
657            if (!subscriptionRecoveryPolicy.add(context, message)) {
658                return;
659            }
660            synchronized (consumers) {
661                if (consumers.isEmpty()) {
662                    onMessageWithNoConsumers(context, message);
663                    return;
664                }
665            }
666            msgContext = context.getMessageEvaluationContext();
667            msgContext.setDestination(destination);
668            msgContext.setMessageReference(message);
669            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
670                onMessageWithNoConsumers(context, message);
671            }
672
673        } finally {
674            dispatchLock.readLock().unlock();
675            if (msgContext != null) {
676                msgContext.clear();
677            }
678        }
679    }
680
681    private final Runnable expireMessagesTask = new Runnable() {
682        public void run() {
683            List<Message> browsedMessages = new InsertionCountList<Message>();
684            doBrowse(browsedMessages, getMaxExpirePageSize());
685        }
686    };
687
688    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
689        broker.messageExpired(context, reference, subs);
690        // AMQ-2586: Better to leave this stat at zero than to give the user
691        // misleading metrics.
692        // destinationStatistics.getMessages().decrement();
693        destinationStatistics.getEnqueues().decrement();
694        destinationStatistics.getExpired().increment();
695        MessageAck ack = new MessageAck();
696        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
697        ack.setDestination(destination);
698        ack.setMessageID(reference.getMessageId());
699        try {
700            if (subs instanceof DurableTopicSubscription) {
701                ((DurableTopicSubscription)subs).removePending(reference);
702            }
703            acknowledge(context, subs, ack, reference);
704        } catch (Exception e) {
705            LOG.error("Failed to remove expired Message from the store ", e);
706        }
707    }
708
709    @Override
710    protected Logger getLog() {
711        return LOG;
712    }
713
714    protected boolean isOptimizeStorage(){
715        boolean result = false;
716
717        if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){
718                result = true;
719                for (DurableTopicSubscription s : durableSubcribers.values()) {
720                    if (s.isActive()== false){
721                        result = false;
722                        break;
723                    }
724                    if (s.getPrefetchSize()==0){
725                        result = false;
726                        break;
727                    }
728                    if (s.isSlowConsumer()){
729                        result = false;
730                        break;
731                    }
732                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
733                        result = false;
734                        break;
735                    }
736                }
737        }
738        return result;
739    }
740}