/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.mqtt;

import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import javax.net.SocketFactory;

import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.DataByteArrayInputStream;

039/**
040 * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using MQTT over NIO
041 */
042public class MQTTNIOTransport extends TcpTransport {
043
044    private SocketChannel channel;
045    private SelectorSelection selection;
046
047    private ByteBuffer inputBuffer;
048    MQTTCodec codec;
049
050    public MQTTNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
051        super(wireFormat, socketFactory, remoteLocation, localLocation);
052    }
053
054    public MQTTNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
055        super(wireFormat, socket);
056    }
057
058    protected void initializeStreams() throws IOException {
059        channel = socket.getChannel();
060        channel.configureBlocking(false);
061        // listen for events telling us when the socket is readable.
062        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
063            public void onSelect(SelectorSelection selection) {
064                if (!isStopped()) {
065                    serviceRead();
066                }
067            }
068
069            public void onError(SelectorSelection selection, Throwable error) {
070                if (error instanceof IOException) {
071                    onException((IOException) error);
072                } else {
073                    onException(IOExceptionSupport.create(error));
074                }
075            }
076        });
077
078        inputBuffer = ByteBuffer.allocate(8 * 1024);
079        NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
080        this.dataOut = new DataOutputStream(outPutStream);
081        this.buffOut = outPutStream;
082        codec = new MQTTCodec(this);
083    }
084
085    private void serviceRead() {
086        try {
087
088            while (isStarted()) {
089                // read channel
090                int readSize = channel.read(inputBuffer);
091                // channel is closed, cleanup
092                if (readSize == -1) {
093                    onException(new EOFException());
094                    selection.close();
095                    break;
096                }
097                // nothing more to read, break
098                if (readSize == 0) {
099                    break;
100                }
101
102                inputBuffer.flip();
103                DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
104                codec.parse(dis, readSize);
105
106                // clear the buffer
107                inputBuffer.clear();
108
109            }
110        } catch (IOException e) {
111            onException(e);
112        } catch (Throwable e) {
113            onException(IOExceptionSupport.create(e));
114        }
115    }
116
117    protected void doStart() throws Exception {
118        connect();
119        selection.setInterestOps(SelectionKey.OP_READ);
120        selection.enable();
121    }
122
123    protected void doStop(ServiceStopper stopper) throws Exception {
124        try {
125            if (selection != null) {
126                selection.close();
127            }
128        } finally {
129            super.doStop(stopper);
130        }
131    }
132}