001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.IOException;
020
021import javax.jms.JMSException;
022import org.apache.activemq.transport.tcp.TcpTransport;
023import org.fusesource.hawtbuf.DataByteArrayInputStream;
024import org.fusesource.hawtbuf.DataByteArrayOutputStream;
025import org.fusesource.mqtt.codec.*;
026
027public class MQTTCodec {
028
029    TcpTransport transport;
030
031    DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream();
032    boolean processedHeader = false;
033    String action;
034    byte header;
035    int contentLength = -1;
036    int previousByte = -1;
037    int payLoadRead = 0;
038
039    public MQTTCodec(TcpTransport transport) {
040        this.transport = transport;
041    }
042
043    public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
044        int i = 0;
045        byte b;
046        while (i++ < readSize) {
047            b = input.readByte();
048            // skip repeating nulls
049            if (!processedHeader && b == 0) {
050                previousByte = 0;
051                continue;
052            }
053
054            if (!processedHeader) {
055                i += processHeader(b, input);
056                if (contentLength == 0) {
057                    processCommand();
058                }
059
060            } else {
061
062                if (contentLength == -1) {
063                    // end of command reached, unmarshal
064                    if (b == 0) {
065                        processCommand();
066                    } else {
067                        currentCommand.write(b);
068                    }
069                } else {
070                    // read desired content length
071                    if (payLoadRead == contentLength) {
072                        processCommand();
073                        i += processHeader(b, input);
074                    } else {
075                        currentCommand.write(b);
076                        payLoadRead++;
077                    }
078                }
079            }
080
081            previousByte = b;
082        }
083        if (processedHeader && payLoadRead == contentLength) {
084            processCommand();
085        }
086    }
087
088    /**
089     * sets the content length
090     *
091     * @return number of bytes read
092     */
093    private int processHeader(byte header, DataByteArrayInputStream input) {
094        this.header = header;
095        byte digit;
096        int multiplier = 1;
097        int read = 0;
098        int length = 0;
099        do {
100            digit = input.readByte();
101            length += (digit & 0x7F) * multiplier;
102            multiplier <<= 7;
103            read++;
104        } while ((digit & 0x80) != 0);
105
106        contentLength = length;
107        processedHeader = true;
108        return read;
109    }
110
111
112    private void processCommand() throws Exception {
113        MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header);
114        transport.doConsume(frame);
115        processedHeader = false;
116        currentCommand.reset();
117        contentLength = -1;
118        payLoadRead = 0;
119    }
120
121    public static String commandType(byte header) throws IOException, JMSException {
122
123        byte messageType = (byte) ((header & 0xF0) >>> 4);
124        switch (messageType) {
125            case PINGREQ.TYPE: {
126                return "PINGREQ";
127            }
128            case CONNECT.TYPE: {
129                return "CONNECT";
130            }
131            case DISCONNECT.TYPE: {
132                return "DISCONNECT";
133            }
134            case SUBSCRIBE.TYPE: {
135                return "SUBSCRIBE";
136            }
137            case UNSUBSCRIBE.TYPE: {
138                return "UNSUBSCRIBE";
139            }
140            case PUBLISH.TYPE: {
141                return "PUBLISH";
142            }
143            case PUBACK.TYPE: {
144                return "PUBACK";
145            }
146            case PUBREC.TYPE: {
147                return "PUBREC";
148            }
149            case PUBREL.TYPE: {
150                return "PUBREL";
151            }
152            case PUBCOMP.TYPE: {
153                return "PUBCOMP";
154            }
155            default:
156                return "UNKNOWN";
157        }
158
159    }
160
161}