001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.discovery;
018
019import java.net.URI;
020import java.net.URISyntaxException;
021import java.util.Map;
022import java.util.concurrent.ConcurrentHashMap;
023import org.apache.activemq.command.DiscoveryEvent;
024import org.apache.activemq.transport.CompositeTransport;
025import org.apache.activemq.transport.TransportFilter;
026import org.apache.activemq.util.ServiceStopper;
027import org.apache.activemq.util.URISupport;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
/**
 * A {@link TransportFilter} which uses a {@link DiscoveryAgent} to discover
 * remote broker instances and dynamically adds them to, or removes them
 * from, the wrapped {@link CompositeTransport}.
 */
037public class DiscoveryTransport extends TransportFilter implements DiscoveryListener {
038
039    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryTransport.class);
040
041    private final CompositeTransport next;
042    private DiscoveryAgent discoveryAgent;
043    private final ConcurrentHashMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>();
044
045    private Map<String, String> parameters;
046
047    public DiscoveryTransport(CompositeTransport next) {
048        super(next);
049        this.next = next;
050    }
051
052    @Override
053    public void start() throws Exception {
054        if (discoveryAgent == null) {
055            throw new IllegalStateException("discoveryAgent not configured");
056        }
057
058        // lets pass into the agent the broker name and connection details
059        discoveryAgent.setDiscoveryListener(this);
060        discoveryAgent.start();
061        next.start();
062    }
063
064    @Override
065    public void stop() throws Exception {
066        ServiceStopper ss = new ServiceStopper();
067        ss.stop(discoveryAgent);
068        ss.stop(next);
069        ss.throwFirstException();
070    }
071
072    public void onServiceAdd(DiscoveryEvent event) {
073        String url = event.getServiceName();
074        if (url != null) {
075            try {
076                URI uri = new URI(url);
077                LOG.info("Adding new broker connection URL: " + uri);
078                uri = URISupport.applyParameters(uri, parameters, DISCOVERED_OPTION_PREFIX);
079                serviceURIs.put(event.getServiceName(), uri);
080                next.add(false,new URI[] {uri});
081            } catch (URISyntaxException e) {
082                LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
083            }
084        }
085    }
086
087    public void onServiceRemove(DiscoveryEvent event) {
088        URI uri = serviceURIs.get(event.getServiceName());
089        if (uri != null) {
090            next.remove(false,new URI[] {uri});
091        }
092    }
093
094    public DiscoveryAgent getDiscoveryAgent() {
095        return discoveryAgent;
096    }
097
098    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
099        this.discoveryAgent = discoveryAgent;
100    }
101
102    public void setParameters(Map<String, String> parameters) {
103       this.parameters = parameters;      
104    }
105
106}