001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.Socket;
026import java.net.SocketAddress;
027import java.net.SocketException;
028import java.net.SocketTimeoutException;
029import java.net.URI;
030import java.net.UnknownHostException;
031import java.util.HashMap;
032import java.util.Map;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicReference;
036
037import javax.net.SocketFactory;
038
039import org.apache.activemq.Service;
040import org.apache.activemq.thread.DefaultThreadPools;
041import org.apache.activemq.transport.Transport;
042import org.apache.activemq.transport.TransportLoggerFactory;
043import org.apache.activemq.transport.TransportThreadSupport;
044import org.apache.activemq.util.InetAddressUtil;
045import org.apache.activemq.util.IntrospectionSupport;
046import org.apache.activemq.util.ServiceStopper;
047import org.apache.activemq.wireformat.WireFormat;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * An implementation of the {@link Transport} interface using raw tcp/ip
053 *
054 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
055 *
056 */
057public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
058    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
059    protected final URI remoteLocation;
060    protected final URI localLocation;
061    protected final WireFormat wireFormat;
062
063    protected int connectionTimeout = 30000;
064    protected int soTimeout;
065    protected int socketBufferSize = 64 * 1024;
066    protected int ioBufferSize = 8 * 1024;
067    protected boolean closeAsync=true;
068    protected Socket socket;
069    protected DataOutputStream dataOut;
070    protected DataInputStream dataIn;
071    protected TimeStampStream buffOut = null;
072
073    /**
074     * The Traffic Class to be set on the socket.
075     */
076    protected int trafficClass = 0;
077    /**
078     * Keeps track of attempts to set the Traffic Class on the socket.
079     */
080    private boolean trafficClassSet = false;
081    /**
082     * Prevents setting both the Differentiated Services and Type of Service
083     * transport options at the same time, since they share the same spot in
084     * the TCP/IP packet headers.
085     */
086    protected boolean diffServChosen = false;
087    protected boolean typeOfServiceChosen = false;
088    /**
089     * trace=true -> the Transport stack where this TcpTransport
090     * object will be, will have a TransportLogger layer
091     * trace=false -> the Transport stack where this TcpTransport
092     * object will be, will NOT have a TransportLogger layer, and therefore
093     * will never be able to print logging messages.
094     * This parameter is most probably set in Connection or TransportConnector URIs.
095     */
096    protected boolean trace = false;
097    /**
098     * Name of the LogWriter implementation to use.
099     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
100     * This parameter is most probably set in Connection or TransportConnector URIs.
101     */
102    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
103    /**
104     * Specifies if the TransportLogger will be manageable by JMX or not.
105     * Also, as long as there is at least 1 TransportLogger which is manageable,
106     * a TransportLoggerControl MBean will me created.
107     */
108    protected boolean dynamicManagement = false;
109    /**
110     * startLogging=true -> the TransportLogger object of the Transport stack
111     * will initially write messages to the log.
112     * startLogging=false -> the TransportLogger object of the Transport stack
113     * will initially NOT write messages to the log.
114     * This parameter only has an effect if trace == true.
115     * This parameter is most probably set in Connection or TransportConnector URIs.
116     */
117    protected boolean startLogging = true;
118    /**
119     * Specifies the port that will be used by the JMX server to manage
120     * the TransportLoggers.
121     * This should only be set in an URI by a client (producer or consumer) since
122     * a broker will already create a JMX server.
123     * It is useful for people who test a broker and clients in the same machine
124     * and want to control both via JMX; a different port will be needed.
125     */
126    protected int jmxPort = 1099;
127    protected boolean useLocalHost = false;
128    protected int minmumWireFormatVersion;
129    protected SocketFactory socketFactory;
130    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
131
132    private Map<String, Object> socketOptions;
133    private int soLinger = Integer.MIN_VALUE;
134    private Boolean keepAlive;
135    private Boolean tcpNoDelay;
136    private Thread runnerThread;
137    private volatile int receiveCounter;
138
139    /**
140     * Connect to a remote Node - e.g. a Broker
141     *
142     * @param wireFormat
143     * @param socketFactory
144     * @param remoteLocation
145     * @param localLocation - e.g. local InetAddress and local port
146     * @throws IOException
147     * @throws UnknownHostException
148     */
149    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
150                        URI localLocation) throws UnknownHostException, IOException {
151        this.wireFormat = wireFormat;
152        this.socketFactory = socketFactory;
153        try {
154            this.socket = socketFactory.createSocket();
155        } catch (SocketException e) {
156            this.socket = null;
157        }
158        this.remoteLocation = remoteLocation;
159        this.localLocation = localLocation;
160        setDaemon(false);
161    }
162
163    /**
164     * Initialize from a server Socket
165     *
166     * @param wireFormat
167     * @param socket
168     * @throws IOException
169     */
170    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
171        this.wireFormat = wireFormat;
172        this.socket = socket;
173        this.remoteLocation = null;
174        this.localLocation = null;
175        setDaemon(true);
176    }
177
178    /**
179     * A one way asynchronous send
180     */
181    public void oneway(Object command) throws IOException {
182        checkStarted();
183        wireFormat.marshal(command, dataOut);
184        dataOut.flush();
185    }
186
187    /**
188     * @return pretty print of 'this'
189     */
190    @Override
191    public String toString() {
192        return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort()
193                : (localLocation != null ? localLocation : remoteLocation)) ;
194    }
195
196    /**
197     * reads packets from a Socket
198     */
199    public void run() {
200        LOG.trace("TCP consumer thread for " + this + " starting");
201        this.runnerThread=Thread.currentThread();
202        try {
203            while (!isStopped()) {
204                doRun();
205            }
206        } catch (IOException e) {
207            stoppedLatch.get().countDown();
208            onException(e);
209        } catch (Throwable e){
210            stoppedLatch.get().countDown();
211            IOException ioe=new IOException("Unexpected error occured: " + e);
212            ioe.initCause(e);
213            onException(ioe);
214        }finally {
215            stoppedLatch.get().countDown();
216        }
217    }
218
219    protected void doRun() throws IOException {
220        try {
221            Object command = readCommand();
222            doConsume(command);
223        } catch (SocketTimeoutException e) {
224        } catch (InterruptedIOException e) {
225        }
226    }
227
228    protected Object readCommand() throws IOException {
229        return wireFormat.unmarshal(dataIn);
230    }
231
232    // Properties
233    // -------------------------------------------------------------------------
234    public String getDiffServ() {
235        // This is the value requested by the user by setting the Tcp Transport
236        // options. If the socket hasn't been created, then this value may not
237        // reflect the value returned by Socket.getTrafficClass().
238        return Integer.toString(this.trafficClass);
239    }
240
241    public void setDiffServ(String diffServ) throws IllegalArgumentException {
242        this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
243        this.diffServChosen = true;
244    }
245
246    public int getTypeOfService() {
247        // This is the value requested by the user by setting the Tcp Transport
248        // options. If the socket hasn't been created, then this value may not
249        // reflect the value returned by Socket.getTrafficClass().
250        return this.trafficClass;
251    }
252
253    public void setTypeOfService(int typeOfService) {
254        this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
255        this.typeOfServiceChosen = true;
256    }
257
258    public boolean isTrace() {
259        return trace;
260    }
261
262    public void setTrace(boolean trace) {
263        this.trace = trace;
264    }
265
266    public String getLogWriterName() {
267        return logWriterName;
268    }
269
270    public void setLogWriterName(String logFormat) {
271        this.logWriterName = logFormat;
272    }
273
274    public boolean isDynamicManagement() {
275        return dynamicManagement;
276    }
277
278    public void setDynamicManagement(boolean useJmx) {
279        this.dynamicManagement = useJmx;
280    }
281
282    public boolean isStartLogging() {
283        return startLogging;
284    }
285
286    public void setStartLogging(boolean startLogging) {
287        this.startLogging = startLogging;
288    }
289
290    public int getJmxPort() {
291        return jmxPort;
292    }
293
294    public void setJmxPort(int jmxPort) {
295        this.jmxPort = jmxPort;
296    }
297
298    public int getMinmumWireFormatVersion() {
299        return minmumWireFormatVersion;
300    }
301
302    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
303        this.minmumWireFormatVersion = minmumWireFormatVersion;
304    }
305
306    public boolean isUseLocalHost() {
307        return useLocalHost;
308    }
309
310    /**
311     * Sets whether 'localhost' or the actual local host name should be used to
312     * make local connections. On some operating systems such as Macs its not
313     * possible to connect as the local host name so localhost is better.
314     */
315    public void setUseLocalHost(boolean useLocalHost) {
316        this.useLocalHost = useLocalHost;
317    }
318
319    public int getSocketBufferSize() {
320        return socketBufferSize;
321    }
322
323    /**
324     * Sets the buffer size to use on the socket
325     */
326    public void setSocketBufferSize(int socketBufferSize) {
327        this.socketBufferSize = socketBufferSize;
328    }
329
330    public int getSoTimeout() {
331        return soTimeout;
332    }
333
334    /**
335     * Sets the socket timeout
336     */
337    public void setSoTimeout(int soTimeout) {
338        this.soTimeout = soTimeout;
339    }
340
341    public int getConnectionTimeout() {
342        return connectionTimeout;
343    }
344
345    /**
346     * Sets the timeout used to connect to the socket
347     */
348    public void setConnectionTimeout(int connectionTimeout) {
349        this.connectionTimeout = connectionTimeout;
350    }
351
352    public Boolean getKeepAlive() {
353        return keepAlive;
354    }
355
356    /**
357     * Enable/disable TCP KEEP_ALIVE mode
358     */
359    public void setKeepAlive(Boolean keepAlive) {
360        this.keepAlive = keepAlive;
361    }
362
363    /**
364     * Enable/disable soLinger
365     * @param soLinger enabled if > -1, disabled if == -1, system default otherwise
366     */
367    public void setSoLinger(int soLinger) {
368        this.soLinger = soLinger;
369    }
370
371    public int getSoLinger() {
372        return soLinger;
373    }
374
375    public Boolean getTcpNoDelay() {
376        return tcpNoDelay;
377    }
378
379    /**
380     * Enable/disable the TCP_NODELAY option on the socket
381     */
382    public void setTcpNoDelay(Boolean tcpNoDelay) {
383        this.tcpNoDelay = tcpNoDelay;
384    }
385
386    /**
387     * @return the ioBufferSize
388     */
389    public int getIoBufferSize() {
390        return this.ioBufferSize;
391    }
392
393    /**
394     * @param ioBufferSize the ioBufferSize to set
395     */
396    public void setIoBufferSize(int ioBufferSize) {
397        this.ioBufferSize = ioBufferSize;
398    }
399
400    /**
401     * @return the closeAsync
402     */
403    public boolean isCloseAsync() {
404        return closeAsync;
405    }
406
407    /**
408     * @param closeAsync the closeAsync to set
409     */
410    public void setCloseAsync(boolean closeAsync) {
411        this.closeAsync = closeAsync;
412    }
413
414    // Implementation methods
415    // -------------------------------------------------------------------------
416    protected String resolveHostName(String host) throws UnknownHostException {
417        if (isUseLocalHost()) {
418            String localName = InetAddressUtil.getLocalHostName();
419            if (localName != null && localName.equals(host)) {
420                return "localhost";
421            }
422        }
423        return host;
424    }
425
426    /**
427     * Configures the socket for use
428     *
429     * @param sock
430     * @throws SocketException, IllegalArgumentException if setting the options
431     *         on the socket failed.
432     */
433    protected void initialiseSocket(Socket sock) throws SocketException,
434            IllegalArgumentException {
435        if (socketOptions != null) {
436            IntrospectionSupport.setProperties(socket, socketOptions);
437        }
438
439        try {
440            sock.setReceiveBufferSize(socketBufferSize);
441            sock.setSendBufferSize(socketBufferSize);
442        } catch (SocketException se) {
443            LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
444            LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
445        }
446        sock.setSoTimeout(soTimeout);
447
448        if (keepAlive != null) {
449            sock.setKeepAlive(keepAlive.booleanValue());
450        }
451
452        if (soLinger > -1) {
453            sock.setSoLinger(true, soLinger);
454        } else if (soLinger == -1) {
455            sock.setSoLinger(false, 0);
456        }
457        if (tcpNoDelay != null) {
458            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
459        }
460        if (!this.trafficClassSet) {
461            this.trafficClassSet = setTrafficClass(sock);
462        }
463    }
464
465    @Override
466    protected void doStart() throws Exception {
467        connect();
468        stoppedLatch.set(new CountDownLatch(1));
469        super.doStart();
470    }
471
472    protected void connect() throws Exception {
473
474        if (socket == null && socketFactory == null) {
475            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
476        }
477
478        InetSocketAddress localAddress = null;
479        InetSocketAddress remoteAddress = null;
480
481        if (localLocation != null) {
482            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
483                                                 localLocation.getPort());
484        }
485
486        if (remoteLocation != null) {
487            String host = resolveHostName(remoteLocation.getHost());
488            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
489        }
490        // Set the traffic class before the socket is connected when possible so
491        // that the connection packets are given the correct traffic class.
492        this.trafficClassSet = setTrafficClass(socket);
493
494        if (socket != null) {
495
496            if (localAddress != null) {
497                socket.bind(localAddress);
498            }
499
500            // If it's a server accepted socket.. we don't need to connect it
501            // to a remote address.
502            if (remoteAddress != null) {
503                if (connectionTimeout >= 0) {
504                    socket.connect(remoteAddress, connectionTimeout);
505                } else {
506                    socket.connect(remoteAddress);
507                }
508            }
509
510        } else {
511            // For SSL sockets.. you can't create an unconnected socket :(
512            // This means the timout option are not supported either.
513            if (localAddress != null) {
514                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
515                                                    localAddress.getAddress(), localAddress.getPort());
516            } else {
517                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
518            }
519        }
520
521        initialiseSocket(socket);
522        initializeStreams();
523    }
524
525    @Override
526    protected void doStop(ServiceStopper stopper) throws Exception {
527        if (LOG.isDebugEnabled()) {
528            LOG.debug("Stopping transport " + this);
529        }
530
531        // Closing the streams flush the sockets before closing.. if the socket
532        // is hung.. then this hangs the close.
533        // closeStreams();
534        if (socket != null) {
535            if (closeAsync) {
536                //closing the socket can hang also
537                final CountDownLatch latch = new CountDownLatch(1);
538
539                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
540                    public void run() {
541                        try {
542                            socket.close();
543                        } catch (IOException e) {
544                            if (LOG.isDebugEnabled()) {
545                                LOG.debug("Caught exception closing socket", e);
546                            }
547                        } finally {
548                            latch.countDown();
549                        }
550                    }
551                });
552
553                try {
554                    latch.await(1,TimeUnit.SECONDS);
555                } catch (InterruptedException e) {
556                    Thread.currentThread().interrupt();
557                }
558
559            } else {
560
561                try {
562                    socket.close();
563                } catch (IOException e) {
564                    LOG.debug("Caught exception closing socket",e);
565                }
566            }
567        }
568    }
569
570    /**
571     * Override so that stop() blocks until the run thread is no longer running.
572     */
573    @Override
574    public void stop() throws Exception {
575        super.stop();
576        CountDownLatch countDownLatch = stoppedLatch.get();
577        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
578            countDownLatch.await(1,TimeUnit.SECONDS);
579        }
580    }
581
582    protected void initializeStreams() throws Exception {
583        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
584            @Override
585            public int read() throws IOException {
586                receiveCounter++;
587                return super.read();
588            }
589            @Override
590            public int read(byte[] b, int off, int len) throws IOException {
591                receiveCounter++;
592                return super.read(b, off, len);
593            }
594            @Override
595            public long skip(long n) throws IOException {
596                receiveCounter++;
597                return super.skip(n);
598            }
599            @Override
600            protected void fill() throws IOException {
601                receiveCounter++;
602                super.fill();
603            }
604        };
605        this.dataIn = new DataInputStream(buffIn);
606        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
607        this.dataOut = new DataOutputStream(outputStream);
608        this.buffOut = outputStream;
609    }
610
611    protected void closeStreams() throws IOException {
612        if (dataOut != null) {
613            dataOut.close();
614        }
615        if (dataIn != null) {
616            dataIn.close();
617        }
618    }
619
620    public void setSocketOptions(Map<String, Object> socketOptions) {
621        this.socketOptions = new HashMap<String, Object>(socketOptions);
622    }
623
624    public String getRemoteAddress() {
625        if (socket != null) {
626            SocketAddress address = socket.getRemoteSocketAddress();
627            if (address instanceof InetSocketAddress) {
628                return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort();
629            } else {
630                return "" + socket.getRemoteSocketAddress();
631            }
632        }
633        return null;
634    }
635
636    @Override
637    public <T> T narrow(Class<T> target) {
638        if (target == Socket.class) {
639            return target.cast(socket);
640        } else if ( target == TimeStampStream.class) {
641            return target.cast(buffOut);
642        }
643        return super.narrow(target);
644    }
645
646    public int getReceiveCounter() {
647        return receiveCounter;
648    }
649
650    /**
651     * @param sock The socket on which to set the Traffic Class.
652     * @return Whether or not the Traffic Class was set on the given socket.
653     * @throws SocketException if the system does not support setting the
654     *         Traffic Class.
655     * @throws IllegalArgumentException if both the Differentiated Services and
656     *         Type of Services transport options have been set on the same
657     *         connection.
658     */
659    private boolean setTrafficClass(Socket sock) throws SocketException,
660            IllegalArgumentException {
661        if (sock == null
662            || (!this.diffServChosen && !this.typeOfServiceChosen)) {
663            return false;
664        }
665        if (this.diffServChosen && this.typeOfServiceChosen) {
666            throw new IllegalArgumentException("Cannot set both the "
667                + " Differentiated Services and Type of Services transport "
668                + " options on the same connection.");
669        }
670
671        sock.setTrafficClass(this.trafficClass);
672
673        int resultTrafficClass = sock.getTrafficClass();
674        if (this.trafficClass != resultTrafficClass) {
675            // In the case where the user has specified the ECN bits (e.g. in
676            // Type of Service) but the system won't allow the ECN bits to be
677            // set or in the case where setting the traffic class failed for
678            // other reasons, emit a warning.
679            if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
680                    && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
681                LOG.warn("Attempted to set the Traffic Class to "
682                    + this.trafficClass + " but the result Traffic Class was "
683                    + resultTrafficClass + ". Please check that your system "
684                    + "allows you to set the ECN bits (the first two bits).");
685            } else {
686                LOG.warn("Attempted to set the Traffic Class to "
687                    + this.trafficClass + " but the result Traffic Class was "
688                    + resultTrafficClass + ". Please check that your system "
689                         + "supports java.net.setTrafficClass.");
690            }
691            return false;
692        }
693        // Reset the guards that prevent both the Differentiated Services
694        // option and the Type of Service option from being set on the same
695        // connection.
696        this.diffServChosen = false;
697        this.typeOfServiceChosen = false;
698        return true;
699    }
700
701    public WireFormat getWireFormat() {
702        return wireFormat;
703    }
704}