001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.udp;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.net.UnknownHostException;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.openwire.OpenWireFormat;
027import org.apache.activemq.transport.CommandJoiner;
028import org.apache.activemq.transport.InactivityMonitor;
029import org.apache.activemq.transport.Transport;
030import org.apache.activemq.transport.TransportFactory;
031import org.apache.activemq.transport.TransportLoggerFactory;
032import org.apache.activemq.transport.TransportServer;
033import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
034import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
035import org.apache.activemq.transport.reliable.ReliableTransport;
036import org.apache.activemq.transport.reliable.ReplayStrategy;
037import org.apache.activemq.transport.reliable.Replayer;
038import org.apache.activemq.transport.tcp.TcpTransportFactory;
039import org.apache.activemq.util.IOExceptionSupport;
040import org.apache.activemq.util.IntSequenceGenerator;
041import org.apache.activemq.util.IntrospectionSupport;
042import org.apache.activemq.util.URISupport;
043import org.apache.activemq.wireformat.WireFormat;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
049 * 
050 */
051public class UdpTransportFactory extends TransportFactory {
052    
053    private static final Logger log = LoggerFactory.getLogger(TcpTransportFactory.class);
054    
055    public TransportServer doBind(final URI location) throws IOException {
056        try {
057            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
058            if (options.containsKey("port")) {
059                throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax");
060            }
061            WireFormat wf = createWireFormat(options);
062            int port = location.getPort();
063            OpenWireFormat openWireFormat = asOpenWireFormat(wf);
064            UdpTransport transport = (UdpTransport) createTransport(location.getPort(), wf);
065
066            Transport configuredTransport = configure(transport, wf, options, true);
067            UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy());
068            return server;
069        } catch (URISyntaxException e) {
070            throw IOExceptionSupport.create(e);
071        } catch (Exception e) {
072            throw IOExceptionSupport.create(e);
073        }
074    }
075
076    public Transport configure(Transport transport, WireFormat format, Map options) throws Exception {
077        return configure(transport, format, options, false);
078    }
079
080    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
081        IntrospectionSupport.setProperties(transport, options);
082        final UdpTransport udpTransport = (UdpTransport)transport;
083
084        // deal with fragmentation
085        transport = new CommandJoiner(transport, asOpenWireFormat(format));
086
087        if (udpTransport.isTrace()) {
088            try {
089                transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
090            } catch (Throwable e) {
091                log.error("Could not create TransportLogger object for: " + TransportLoggerFactory.defaultLogWriterName + ", reason: " + e, e);
092            }
093        }
094
095        transport = new InactivityMonitor(transport, format);
096
097        if (format instanceof OpenWireFormat) {
098            transport = configureClientSideNegotiator(transport, format, udpTransport);
099        }
100
101        return transport;
102    }
103
104    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
105        OpenWireFormat wireFormat = asOpenWireFormat(wf);
106        return new UdpTransport(wireFormat, location);
107    }
108
109    protected Transport createTransport(int port, WireFormat wf) throws UnknownHostException, IOException {
110        OpenWireFormat wireFormat = asOpenWireFormat(wf);
111        return new UdpTransport(wireFormat, port);
112    }
113
114    /**
115     * Configures the transport
116     * 
117     * @param acceptServer true if this transport is used purely as an 'accept'
118     *                transport for new connections which work like TCP
119     *                SocketServers where new connections spin up a new separate
120     *                UDP transport
121     */
122    protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) throws Exception {
123        IntrospectionSupport.setProperties(transport, options);
124        UdpTransport udpTransport = (UdpTransport)transport;
125
126        OpenWireFormat openWireFormat = asOpenWireFormat(format);
127
128        if (udpTransport.isTrace()) {
129            transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
130        }
131
132        transport = new InactivityMonitor(transport, format);
133
134        if (!acceptServer && format instanceof OpenWireFormat) {
135            transport = configureClientSideNegotiator(transport, format, udpTransport);
136        }
137
138        // deal with fragmentation
139
140        if (acceptServer) {
141            // lets not support a buffer of messages to enable reliable
142            // messaging on the 'accept server' transport
143            udpTransport.setReplayEnabled(false);
144
145            // we don't want to do reliable checks on this transport as we
146            // delegate to one that does
147            transport = new CommandJoiner(transport, openWireFormat);
148            return transport;
149        } else {
150            ReliableTransport reliableTransport = new ReliableTransport(transport, udpTransport);
151            Replayer replayer = reliableTransport.getReplayer();
152            reliableTransport.setReplayStrategy(createReplayStrategy(replayer));
153
154            // Joiner must be on outside as the inbound messages must be
155            // processed by the reliable transport first
156            return new CommandJoiner(reliableTransport, openWireFormat);
157        }
158    }
159
160    protected ReplayStrategy createReplayStrategy(Replayer replayer) {
161        if (replayer != null) {
162            return new DefaultReplayStrategy(5);
163        }
164        return new ExceptionIfDroppedReplayStrategy(1);
165    }
166
167    protected ReplayStrategy createReplayStrategy() {
168        return new DefaultReplayStrategy(5);
169    }
170
171    protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
172        return new ResponseRedirectInterceptor(transport, udpTransport);
173    }
174
175    protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
176        OpenWireFormat answer = (OpenWireFormat)wf;
177        return answer;
178    }
179}