001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.List;
023import java.util.concurrent.CountDownLatch;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import javax.jms.InvalidSelectorException;
028import javax.jms.JMSException;
029
030import org.apache.activemq.broker.Broker;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
034import org.apache.activemq.command.ActiveMQMessage;
035import org.apache.activemq.command.ConsumerControl;
036import org.apache.activemq.command.ConsumerInfo;
037import org.apache.activemq.command.Message;
038import org.apache.activemq.command.MessageAck;
039import org.apache.activemq.command.MessageDispatch;
040import org.apache.activemq.command.MessageDispatchNotification;
041import org.apache.activemq.command.MessageId;
042import org.apache.activemq.command.MessagePull;
043import org.apache.activemq.command.Response;
044import org.apache.activemq.thread.Scheduler;
045import org.apache.activemq.transaction.Synchronization;
046import org.apache.activemq.usage.SystemUsage;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * A subscription that honors the pre-fetch option of the ConsumerInfo.
052 */
053public abstract class PrefetchSubscription extends AbstractSubscription {
054
055    private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
056    protected final Scheduler scheduler;
057
058    protected PendingMessageCursor pending;
059    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
060    protected final AtomicInteger prefetchExtension = new AtomicInteger();
061    protected boolean usePrefetchExtension = true;
062    protected long enqueueCounter;
063    protected long dispatchCounter;
064    protected long dequeueCounter;
065    private int maxProducersToAudit=32;
066    private int maxAuditDepth=2048;
067    protected final SystemUsage usageManager;
068    protected final Object pendingLock = new Object();
069    protected final Object dispatchLock = new Object();
070    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
071
072    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
073        super(broker,context, info);
074        this.usageManager=usageManager;
075        pending = cursor;
076        this.scheduler = broker.getScheduler();
077    }
078
079    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
080        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
081    }
082
083    /**
084     * Allows a message to be pulled on demand by a client
085     */
086    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
087        // The slave should not deliver pull messages.
088        // TODO: when the slave becomes a master, He should send a NULL message to all the
089        // consumers to 'wake them up' in case they were waiting for a message.
090        if (getPrefetchSize() == 0 && !isSlave()) {
091
092            prefetchExtension.incrementAndGet();
093            final long dispatchCounterBeforePull = dispatchCounter;
094
095            // Have the destination push us some messages.
096            for (Destination dest : destinations) {
097                dest.iterate();
098            }
099            dispatchPending();
100
101            synchronized(this) {
102                // If there was nothing dispatched.. we may need to setup a timeout.
103                if (dispatchCounterBeforePull == dispatchCounter) {
104                    // immediate timeout used by receiveNoWait()
105                    if (pull.getTimeout() == -1) {
106                        // Send a NULL message.
107                        add(QueueMessageReference.NULL_MESSAGE);
108                        dispatchPending();
109                    }
110                    if (pull.getTimeout() > 0) {
111                        scheduler.executeAfterDelay(new Runnable() {
112                            @Override
113                            public void run() {
114                                pullTimeout(dispatchCounterBeforePull);
115                            }
116                        }, pull.getTimeout());
117                    }
118                }
119            }
120        }
121        return null;
122    }
123
124    /**
125     * Occurs when a pull times out. If nothing has been dispatched since the
126     * timeout was setup, then send the NULL message.
127     */
128    final void pullTimeout(long dispatchCounterBeforePull) {
129        synchronized (pendingLock) {
130            if (dispatchCounterBeforePull == dispatchCounter) {
131                try {
132                    add(QueueMessageReference.NULL_MESSAGE);
133                    dispatchPending();
134                } catch (Exception e) {
135                    context.getConnection().serviceException(e);
136                }
137            }
138        }
139    }
140
141    public void add(MessageReference node) throws Exception {
142        synchronized (pendingLock) {
143            // The destination may have just been removed...
144            if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
145                // perhaps we should inform the caller that we are no longer valid to dispatch to?
146                return;
147            }
148            enqueueCounter++;
149            pending.addMessageLast(node);
150        }
151        dispatchPending();
152    }
153
154    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
155        synchronized(pendingLock) {
156            try {
157                pending.reset();
158                while (pending.hasNext()) {
159                    MessageReference node = pending.next();
160                    node.decrementReferenceCount();
161                    if (node.getMessageId().equals(mdn.getMessageId())) {
162                        // Synchronize between dispatched list and removal of messages from pending list
163                        // related to remove subscription action
164                        synchronized(dispatchLock) {
165                            pending.remove();
166                            createMessageDispatch(node, node.getMessage());
167                            dispatched.add(node);
168                            onDispatch(node, node.getMessage());
169                        }
170                        return;
171                    }
172                }
173            } finally {
174                pending.release();
175            }
176        }
177        throw new JMSException(
178                "Slave broker out of sync with master: Dispatched message ("
179                        + mdn.getMessageId() + ") was not in the pending list for "
180                        + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
181    }
182
183    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
184        // Handle the standard acknowledgment case.
185        boolean callDispatchMatched = false;
186        Destination destination = null;
187
188        if (!isSlave()) {
189            if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
190                // suppress unexpected ack exception in this expected case
191                LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
192                return;
193            }
194        }
195        if (LOG.isTraceEnabled()) {
196            LOG.trace("ack:" + ack);
197        }
198        synchronized(dispatchLock) {
199            if (ack.isStandardAck()) {
200                // First check if the ack matches the dispatched. When using failover this might
201                // not be the case. We don't ever want to ack the wrong messages.
202                assertAckMatchesDispatched(ack);
203
204                // Acknowledge all dispatched messages up till the message id of
205                // the acknowledgment.
206                int index = 0;
207                boolean inAckRange = false;
208                List<MessageReference> removeList = new ArrayList<MessageReference>();
209                for (final MessageReference node : dispatched) {
210                    MessageId messageId = node.getMessageId();
211                    if (ack.getFirstMessageId() == null
212                            || ack.getFirstMessageId().equals(messageId)) {
213                        inAckRange = true;
214                    }
215                    if (inAckRange) {
216                        // Don't remove the nodes until we are committed.
217                        if (!context.isInTransaction()) {
218                            dequeueCounter++;
219                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
220                            removeList.add(node);
221                        } else {
222                            registerRemoveSync(context, node);
223                        }
224                        index++;
225                        acknowledge(context, ack, node);
226                        if (ack.getLastMessageId().equals(messageId)) {
227                            // contract prefetch if dispatch required a pull
228                            if (getPrefetchSize() == 0) {
229                                // Protect extension update against parallel updates.
230                                while (true) {
231                                    int currentExtension = prefetchExtension.get();
232                                    int newExtension = Math.max(0, currentExtension - index);
233                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
234                                        break;
235                                    }
236                                }
237                            } else if (usePrefetchExtension && context.isInTransaction()) {
238                                // extend prefetch window only if not a pulling consumer
239                                while (true) {
240                                    int currentExtension = prefetchExtension.get();
241                                    int newExtension = Math.max(currentExtension, index);
242                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
243                                        break;
244                                    }
245                                }
246                            }
247                            destination = node.getRegionDestination();
248                            callDispatchMatched = true;
249                            break;
250                        }
251                    }
252                }
253                for (final MessageReference node : removeList) {
254                    dispatched.remove(node);
255                }
256                // this only happens after a reconnect - get an ack which is not
257                // valid
258                if (!callDispatchMatched) {
259                    LOG.warn("Could not correlate acknowledgment with dispatched message: "
260                                  + ack);
261                }
262            } else if (ack.isIndividualAck()) {
263                // Message was delivered and acknowledge - but only delete the
264                // individual message
265                for (final MessageReference node : dispatched) {
266                    MessageId messageId = node.getMessageId();
267                    if (ack.getLastMessageId().equals(messageId)) {
268                        // Don't remove the nodes until we are committed - immediateAck option
269                        if (!context.isInTransaction()) {
270                            dequeueCounter++;
271                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
272                            dispatched.remove(node);
273                        } else {
274                            registerRemoveSync(context, node);
275                        }
276
277                        // Protect extension update against parallel updates.
278                        while (true) {
279                            int currentExtension = prefetchExtension.get();
280                            int newExtension = Math.max(0, currentExtension - 1);
281                            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
282                                break;
283                            }
284                        }
285                        acknowledge(context, ack, node);
286                        destination = node.getRegionDestination();
287                        callDispatchMatched = true;
288                        break;
289                    }
290                }
291            }else if (ack.isDeliveredAck()) {
292                // Message was delivered but not acknowledged: update pre-fetch
293                // counters.
294                int index = 0;
295                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
296                    final MessageReference node = iter.next();
297                    if (node.isExpired()) {
298                        if (broker.isExpired(node)) {
299                            node.getRegionDestination().messageExpired(context, this, node);
300                        }
301                        iter.remove();
302                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
303                    }
304                    if (ack.getLastMessageId().equals(node.getMessageId())) {
305                        if (usePrefetchExtension) {
306                            while (true) {
307                                int currentExtension = prefetchExtension.get();
308                                int newExtension = Math.max(currentExtension, index + 1);
309                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
310                                    break;
311                                }
312                            }
313                        }
314                        destination = node.getRegionDestination();
315                        callDispatchMatched = true;
316                        break;
317                    }
318                }
319                if (!callDispatchMatched) {
320                    throw new JMSException(
321                            "Could not correlate acknowledgment with dispatched message: "
322                                    + ack);
323                }
324            } else if (ack.isRedeliveredAck()) {
325                // Message was re-delivered but it was not yet considered to be
326                // a DLQ message.
327                boolean inAckRange = false;
328                for (final MessageReference node : dispatched) {
329                    MessageId messageId = node.getMessageId();
330                    if (ack.getFirstMessageId() == null
331                            || ack.getFirstMessageId().equals(messageId)) {
332                        inAckRange = true;
333                    }
334                    if (inAckRange) {
335                        if (ack.getLastMessageId().equals(messageId)) {
336                            destination = node.getRegionDestination();
337                            callDispatchMatched = true;
338                            break;
339                        }
340                    }
341                }
342                if (!callDispatchMatched) {
343                    throw new JMSException(
344                            "Could not correlate acknowledgment with dispatched message: "
345                                    + ack);
346                }
347            } else if (ack.isPoisonAck()) {
348                // TODO: what if the message is already in a DLQ???
349                // Handle the poison ACK case: we need to send the message to a
350                // DLQ
351                if (ack.isInTransaction()) {
352                    throw new JMSException("Poison ack cannot be transacted: "
353                            + ack);
354                }
355                int index = 0;
356                boolean inAckRange = false;
357                List<MessageReference> removeList = new ArrayList<MessageReference>();
358                for (final MessageReference node : dispatched) {
359                    MessageId messageId = node.getMessageId();
360                    if (ack.getFirstMessageId() == null
361                            || ack.getFirstMessageId().equals(messageId)) {
362                        inAckRange = true;
363                    }
364                    if (inAckRange) {
365                        if (ack.getPoisonCause() != null) {
366                            node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
367                                    ack.getPoisonCause().toString());
368                        }
369                        sendToDLQ(context, node);
370                        node.getRegionDestination().getDestinationStatistics()
371                                .getInflight().decrement();
372                        removeList.add(node);
373                        dequeueCounter++;
374                        index++;
375                        acknowledge(context, ack, node);
376                        if (ack.getLastMessageId().equals(messageId)) {
377                            while (true) {
378                                int currentExtension = prefetchExtension.get();
379                                int newExtension = Math.max(0, currentExtension - (index + 1));
380                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
381                                    break;
382                                }
383                            }
384                            destination = node.getRegionDestination();
385                            callDispatchMatched = true;
386                            break;
387                        }
388                    }
389                }
390                for (final MessageReference node : removeList) {
391                    dispatched.remove(node);
392                }
393                if (!callDispatchMatched) {
394                    throw new JMSException(
395                            "Could not correlate acknowledgment with dispatched message: "
396                                    + ack);
397                }
398            }
399        }
400        if (callDispatchMatched && destination != null) {
401            destination.wakeup();
402            dispatchPending();
403        } else {
404            if (isSlave()) {
405                throw new JMSException(
406                        "Slave broker out of sync with master: Acknowledgment ("
407                                + ack + ") was not in the dispatch list: "
408                                + dispatched);
409            } else {
410                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
411                        + ack);
412            }
413        }
414    }
415
416    private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
417        // setup a Synchronization to remove nodes from the
418        // dispatched list.
419        context.getTransaction().addSynchronization(
420                new Synchronization() {
421
422                    @Override
423                    public void afterCommit()
424                            throws Exception {
425                        synchronized(dispatchLock) {
426                            dequeueCounter++;
427                            dispatched.remove(node);
428                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
429                        }
430                    }
431
432                    @Override
433                    public void afterRollback() throws Exception {
434                        synchronized(dispatchLock) {
435                            if (isSlave()) {
436                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
437                            } else {
438                                // poisionAck will decrement - otherwise still inflight on client
439                            }
440                        }
441                    }
442                });
443    }
444
445    /**
446     * Checks an ack versus the contents of the dispatched list.
447     *  called with dispatchLock held
448     * @param ack
449     * @throws JMSException if it does not match
450     */
451    protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
452        MessageId firstAckedMsg = ack.getFirstMessageId();
453        MessageId lastAckedMsg = ack.getLastMessageId();
454        int checkCount = 0;
455        boolean checkFoundStart = false;
456        boolean checkFoundEnd = false;
457        for (MessageReference node : dispatched) {
458
459            if (firstAckedMsg == null) {
460                checkFoundStart = true;
461            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
462                checkFoundStart = true;
463            }
464
465            if (checkFoundStart) {
466                checkCount++;
467            }
468
469            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
470                checkFoundEnd = true;
471                break;
472            }
473        }
474        if (!checkFoundStart && firstAckedMsg != null)
475            throw new JMSException("Unmatched acknowledge: " + ack
476                    + "; Could not find Message-ID " + firstAckedMsg
477                    + " in dispatched-list (start of ack)");
478        if (!checkFoundEnd && lastAckedMsg != null)
479            throw new JMSException("Unmatched acknowledge: " + ack
480                    + "; Could not find Message-ID " + lastAckedMsg
481                    + " in dispatched-list (end of ack)");
482        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
483            throw new JMSException("Unmatched acknowledge: " + ack
484                    + "; Expected message count (" + ack.getMessageCount()
485                    + ") differs from count in dispatched-list (" + checkCount
486                    + ")");
487        }
488    }
489
490    /**
491     * @param context
492     * @param node
493     * @throws IOException
494     * @throws Exception
495     */
496    protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
497        broker.getRoot().sendToDeadLetterQueue(context, node, this);
498    }
499
500    public int getInFlightSize() {
501        return dispatched.size();
502    }
503
504    /**
505     * Used to determine if the broker can dispatch to the consumer.
506     *
507     * @return
508     */
509    public boolean isFull() {
510        return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
511    }
512
513    /**
514     * @return true when 60% or more room is left for dispatching messages
515     */
516    public boolean isLowWaterMark() {
517        return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
518    }
519
520    /**
521     * @return true when 10% or less room is left for dispatching messages
522     */
523    public boolean isHighWaterMark() {
524        return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
525    }
526
527    @Override
528    public int countBeforeFull() {
529        return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
530    }
531
532    public int getPendingQueueSize() {
533        return pending.size();
534    }
535
536    public int getDispatchedQueueSize() {
537        return dispatched.size();
538    }
539
540    public long getDequeueCounter() {
541        return dequeueCounter;
542    }
543
544    public long getDispatchedCounter() {
545        return dispatchCounter;
546    }
547
548    public long getEnqueueCounter() {
549        return enqueueCounter;
550    }
551
552    @Override
553    public boolean isRecoveryRequired() {
554        return pending.isRecoveryRequired();
555    }
556
557    public PendingMessageCursor getPending() {
558        return this.pending;
559    }
560
561    public void setPending(PendingMessageCursor pending) {
562        this.pending = pending;
563        if (this.pending!=null) {
564            this.pending.setSystemUsage(usageManager);
565            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
566        }
567    }
568
569   @Override
570    public void add(ConnectionContext context, Destination destination) throws Exception {
571        synchronized(pendingLock) {
572            super.add(context, destination);
573            pending.add(context, destination);
574        }
575    }
576
577    @Override
578    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
579        List<MessageReference> rc = new ArrayList<MessageReference>();
580        synchronized(pendingLock) {
581            super.remove(context, destination);
582            // Here is a potential problem concerning Inflight stat:
583            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
584            // Except if each commit or rollback callback action comes before remove of subscriber.
585            rc.addAll(pending.remove(context, destination));
586
587            // Synchronized to DispatchLock
588            synchronized(dispatchLock) {
589                ArrayList<MessageReference> references = new ArrayList<MessageReference>();
590                for (MessageReference r : dispatched) {
591                    if( r.getRegionDestination() == destination) {
592                        references.add(r);
593                    }
594                }
595                rc.addAll(references);
596                destination.getDestinationStatistics().getDispatched().subtract(references.size());
597                destination.getDestinationStatistics().getInflight().subtract(references.size());
598                dispatched.removeAll(references);
599            }
600        }
601        return rc;
602    }
603
604    protected void dispatchPending() throws IOException {
605        if (!isSlave()) {
606           synchronized(pendingLock) {
607                try {
608                    int numberToDispatch = countBeforeFull();
609                    if (numberToDispatch > 0) {
610                        setSlowConsumer(false);
611                        setPendingBatchSize(pending, numberToDispatch);
612                        int count = 0;
613                        pending.reset();
614                        while (pending.hasNext() && !isFull()
615                                && count < numberToDispatch) {
616                            MessageReference node = pending.next();
617                            if (node == null) {
618                                break;
619                            }
620
621                            // Synchronize between dispatched list and remove of message from pending list
622                            // related to remove subscription action
623                            synchronized(dispatchLock) {
624                                pending.remove();
625                                node.decrementReferenceCount();
626                                if( !isDropped(node) && canDispatch(node)) {
627
628                                    // Message may have been sitting in the pending
629                                    // list a while waiting for the consumer to ak the message.
630                                    if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
631                                        //increment number to dispatch
632                                        numberToDispatch++;
633                                        if (broker.isExpired(node)) {
634                                            node.getRegionDestination().messageExpired(context, this, node);
635                                        }
636                                        continue;
637                                    }
638                                    dispatch(node);
639                                    count++;
640                                }
641                            }
642                        }
643                    } else if (!isSlowConsumer()) {
644                        setSlowConsumer(true);
645                        for (Destination dest :destinations) {
646                            dest.slowConsumer(context, this);
647                        }
648                    }
649                } finally {
650                    pending.release();
651                }
652            }
653        }
654    }
655
656    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
657        pending.setMaxBatchSize(numberToDispatch);
658    }
659
660    // called with dispatchLock held
661    protected boolean dispatch(final MessageReference node) throws IOException {
662        final Message message = node.getMessage();
663        if (message == null) {
664            return false;
665        }
666
667        okForAckAsDispatchDone.countDown();
668
669        // No reentrant lock - Patch needed to IndirectMessageReference on method lock
670        if (!isSlave()) {
671
672            MessageDispatch md = createMessageDispatch(node, message);
673            // NULL messages don't count... they don't get Acked.
674            if (node != QueueMessageReference.NULL_MESSAGE) {
675                dispatchCounter++;
676                dispatched.add(node);
677            } else {
678                while (true) {
679                    int currentExtension = prefetchExtension.get();
680                    int newExtension = Math.max(0, currentExtension - 1);
681                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
682                        break;
683                    }
684                }
685            }
686            if (info.isDispatchAsync()) {
687                md.setTransmitCallback(new Runnable() {
688
689                    public void run() {
690                        // Since the message gets queued up in async dispatch,
691                        // we don't want to
692                        // decrease the reference count until it gets put on the
693                        // wire.
694                        onDispatch(node, message);
695                    }
696                });
697                context.getConnection().dispatchAsync(md);
698            } else {
699                context.getConnection().dispatchSync(md);
700                onDispatch(node, message);
701            }
702            return true;
703        } else {
704            return false;
705        }
706    }
707
708    protected void onDispatch(final MessageReference node, final Message message) {
709        if (node.getRegionDestination() != null) {
710            if (node != QueueMessageReference.NULL_MESSAGE) {
711                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
712                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
713                if (LOG.isTraceEnabled()) {
714                    LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
715                            + message.getDestination()  + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
716                }
717            }
718        }
719
720        if (info.isDispatchAsync()) {
721            try {
722                dispatchPending();
723            } catch (IOException e) {
724                context.getConnection().serviceExceptionAsync(e);
725            }
726        }
727    }
728
729    /**
730     * inform the MessageConsumer on the client to change it's prefetch
731     *
732     * @param newPrefetch
733     */
734    public void updateConsumerPrefetch(int newPrefetch) {
735        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
736            ConsumerControl cc = new ConsumerControl();
737            cc.setConsumerId(info.getConsumerId());
738            cc.setPrefetch(newPrefetch);
739            context.getConnection().dispatchAsync(cc);
740        }
741    }
742
743    /**
744     * @param node
745     * @param message
746     * @return MessageDispatch
747     */
748    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
749        MessageDispatch md = new MessageDispatch();
750        md.setConsumerId(info.getConsumerId());
751
752        if (node == QueueMessageReference.NULL_MESSAGE) {
753            md.setMessage(null);
754            md.setDestination(null);
755        } else {
756            md.setDestination(node.getRegionDestination().getActiveMQDestination());
757            md.setMessage(message);
758            md.setRedeliveryCounter(node.getRedeliveryCounter());
759        }
760
761        return md;
762    }
763
764    /**
765     * Use when a matched message is about to be dispatched to the client.
766     *
767     * @param node
768     * @return false if the message should not be dispatched to the client
769     *         (another sub may have already dispatched it for example).
770     * @throws IOException
771     */
772    protected abstract boolean canDispatch(MessageReference node) throws IOException;
773
774    protected abstract boolean isDropped(MessageReference node);
775
776    /**
777     * Used during acknowledgment to remove the message.
778     *
779     * @throws IOException
780     */
781    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
782
783
784    public int getMaxProducersToAudit() {
785        return maxProducersToAudit;
786    }
787
788    public void setMaxProducersToAudit(int maxProducersToAudit) {
789        this.maxProducersToAudit = maxProducersToAudit;
790    }
791
792    public int getMaxAuditDepth() {
793        return maxAuditDepth;
794    }
795
796    public void setMaxAuditDepth(int maxAuditDepth) {
797        this.maxAuditDepth = maxAuditDepth;
798    }
799
800    public boolean isUsePrefetchExtension() {
801        return usePrefetchExtension;
802    }
803
804    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
805        this.usePrefetchExtension = usePrefetchExtension;
806    }
807
808    protected int getPrefetchExtension() {
809        return this.prefetchExtension.get();
810    }
811}