001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import org.apache.activemq.command.*;
020
021import javax.jms.JMSException;
022import java.io.IOException;
023import java.util.Iterator;
024import java.util.LinkedHashMap;
025import java.util.LinkedList;
026import java.util.Map;
027import java.util.Map.Entry;
028
029/**
030 * Keeps track of the STOMP subscription so that acking is correctly done.
031 *
032 * @author <a href="http://hiramchirino.com">chirino</a>
033 */
034public class StompSubscription {
035
036    public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
037    public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
038    public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
039
040    protected final ProtocolConverter protocolConverter;
041    protected final String subscriptionId;
042    protected final ConsumerInfo consumerInfo;
043
044    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
045    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
046
047    protected String ackMode = AUTO_ACK;
048    protected ActiveMQDestination destination;
049    protected String transformation;
050
051    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
052        this.protocolConverter = stompTransport;
053        this.subscriptionId = subscriptionId;
054        this.consumerInfo = consumerInfo;
055        this.transformation = transformation;
056    }
057
058    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
059        ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
060        if (ackMode == CLIENT_ACK) {
061            synchronized (this) {
062                dispatchedMessage.put(message.getMessageId(), md);
063            }
064        } else if (ackMode == INDIVIDUAL_ACK) {
065            synchronized (this) {
066                dispatchedMessage.put(message.getMessageId(), md);
067            }
068        } else if (ackMode == AUTO_ACK) {
069            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
070            protocolConverter.getStompTransport().sendToActiveMQ(ack);
071        }
072
073        boolean ignoreTransformation = false;
074
075        if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
076               message.setReadOnlyProperties(false);
077            message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
078        } else {
079            if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
080                ignoreTransformation = true;
081            }
082        }
083
084        StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
085
086        command.setAction(Stomp.Responses.MESSAGE);
087        if (subscriptionId != null) {
088            command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
089        }
090
091        protocolConverter.getStompTransport().sendToStomp(command);
092    }
093
094    synchronized void onStompAbort(TransactionId transactionId) {
095        unconsumedMessage.clear();
096    }
097
098    synchronized void onStompCommit(TransactionId transactionId) {
099        for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
100            @SuppressWarnings("rawtypes")
101            Map.Entry entry = (Entry)iter.next();
102            MessageDispatch msg = (MessageDispatch)entry.getValue();
103            if (unconsumedMessage.contains(msg)) {
104                iter.remove();
105            }
106        }
107
108        if (!unconsumedMessage.isEmpty()) {
109            MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
110            protocolConverter.getStompTransport().sendToActiveMQ(ack);
111            unconsumedMessage.clear();
112        }
113    }
114
115    synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
116
117        MessageId msgId = new MessageId(messageId);
118
119        if (!dispatchedMessage.containsKey(msgId)) {
120            return null;
121        }
122
123        MessageAck ack = new MessageAck();
124        ack.setDestination(consumerInfo.getDestination());
125        ack.setConsumerId(consumerInfo.getConsumerId());
126
127        if (ackMode == CLIENT_ACK) {
128            if (transactionId == null) {
129                ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
130            } else {
131                ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
132            }
133            int count = 0;
134            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
135
136                @SuppressWarnings("rawtypes")
137                Map.Entry entry = (Entry)iter.next();
138                MessageId id = (MessageId)entry.getKey();
139                MessageDispatch msg = (MessageDispatch)entry.getValue();
140
141                if (transactionId != null) {
142                    if (!unconsumedMessage.contains(msg)) {
143                        unconsumedMessage.add(msg);
144                        count++;
145                    }
146                } else {
147                    iter.remove();
148                    count++;
149                }
150
151                if (id.equals(msgId)) {
152                    ack.setLastMessageId(id);
153                    break;
154                }
155            }
156            ack.setMessageCount(count);
157            if (transactionId != null) {
158                ack.setTransactionId(transactionId);
159            }
160
161        } else if (ackMode == INDIVIDUAL_ACK) {
162            ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
163            ack.setMessageID(msgId);
164            if (transactionId != null) {
165                unconsumedMessage.add(dispatchedMessage.get(msgId));
166                ack.setTransactionId(transactionId);
167            }
168            dispatchedMessage.remove(msgId);
169        }
170        return ack;
171    }
172
173    public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
174
175        MessageId msgId = new MessageId(messageId);
176
177        if (!dispatchedMessage.containsKey(msgId)) {
178            return null;
179        }
180
181        MessageAck ack = new MessageAck();
182        ack.setDestination(consumerInfo.getDestination());
183        ack.setConsumerId(consumerInfo.getConsumerId());
184        ack.setAckType(MessageAck.POSION_ACK_TYPE);
185        ack.setMessageID(msgId);
186        if (transactionId != null) {
187            unconsumedMessage.add(dispatchedMessage.get(msgId));
188            ack.setTransactionId(transactionId);
189        }
190        dispatchedMessage.remove(msgId);
191
192        return ack;
193    }
194
195    public String getAckMode() {
196        return ackMode;
197    }
198
199    public void setAckMode(String ackMode) {
200        this.ackMode = ackMode;
201    }
202
203    public String getSubscriptionId() {
204        return subscriptionId;
205    }
206
207    public void setDestination(ActiveMQDestination destination) {
208        this.destination = destination;
209    }
210
211    public ActiveMQDestination getDestination() {
212        return destination;
213    }
214
215    public ConsumerInfo getConsumerInfo() {
216        return consumerInfo;
217    }
218}