001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.ExceptionListener;
022import javax.jms.JMSException;
023import javax.jms.Queue;
024import javax.jms.QueueConnection;
025import javax.jms.QueueConnectionFactory;
026import javax.jms.QueueSession;
027import javax.jms.Session;
028import javax.naming.NamingException;
029
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * A Bridge to other JMS Queue providers
035 *
036 * @org.apache.xbean.XBean
037 */
038public class JmsQueueConnector extends JmsConnector {
039    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class);
040    private String outboundQueueConnectionFactoryName;
041    private String localConnectionFactoryName;
042    private QueueConnectionFactory outboundQueueConnectionFactory;
043    private QueueConnectionFactory localQueueConnectionFactory;
044    private InboundQueueBridge[] inboundQueueBridges;
045    private OutboundQueueBridge[] outboundQueueBridges;
046
047    /**
048     * @return Returns the inboundQueueBridges.
049     */
050    public InboundQueueBridge[] getInboundQueueBridges() {
051        return inboundQueueBridges;
052    }
053
054    /**
055     * @param inboundQueueBridges The inboundQueueBridges to set.
056     */
057    public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
058        this.inboundQueueBridges = inboundQueueBridges;
059    }
060
061    /**
062     * @return Returns the outboundQueueBridges.
063     */
064    public OutboundQueueBridge[] getOutboundQueueBridges() {
065        return outboundQueueBridges;
066    }
067
068    /**
069     * @param outboundQueueBridges The outboundQueueBridges to set.
070     */
071    public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
072        this.outboundQueueBridges = outboundQueueBridges;
073    }
074
075    /**
076     * @return Returns the localQueueConnectionFactory.
077     */
078    public QueueConnectionFactory getLocalQueueConnectionFactory() {
079        return localQueueConnectionFactory;
080    }
081
082    /**
083     * @param localQueueConnectionFactory The localQueueConnectionFactory to
084     *                set.
085     */
086    public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
087        this.localQueueConnectionFactory = localConnectionFactory;
088    }
089
090    /**
091     * @return Returns the outboundQueueConnectionFactory.
092     */
093    public QueueConnectionFactory getOutboundQueueConnectionFactory() {
094        return outboundQueueConnectionFactory;
095    }
096
097    /**
098     * @return Returns the outboundQueueConnectionFactoryName.
099     */
100    public String getOutboundQueueConnectionFactoryName() {
101        return outboundQueueConnectionFactoryName;
102    }
103
104    /**
105     * @param outboundQueueConnectionFactoryName The
106     *                outboundQueueConnectionFactoryName to set.
107     */
108    public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
109        this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
110    }
111
112    /**
113     * @return Returns the localConnectionFactoryName.
114     */
115    public String getLocalConnectionFactoryName() {
116        return localConnectionFactoryName;
117    }
118
119    /**
120     * @param localConnectionFactoryName The localConnectionFactoryName to set.
121     */
122    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
123        this.localConnectionFactoryName = localConnectionFactoryName;
124    }
125
126    /**
127     * @return Returns the localQueueConnection.
128     */
129    public QueueConnection getLocalQueueConnection() {
130        return (QueueConnection) localConnection.get();
131    }
132
133    /**
134     * @param localQueueConnection The localQueueConnection to set.
135     */
136    public void setLocalQueueConnection(QueueConnection localQueueConnection) {
137        this.localConnection.set(localQueueConnection);
138    }
139
140    /**
141     * @return Returns the outboundQueueConnection.
142     */
143    public QueueConnection getOutboundQueueConnection() {
144        return (QueueConnection) foreignConnection.get();
145    }
146
147    /**
148     * @param outboundQueueConnection The outboundQueueConnection to set.
149     */
150    public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
151        this.foreignConnection.set(foreignQueueConnection);
152    }
153
154    /**
155     * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
156     *                to set.
157     */
158    public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
159        this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
160    }
161
162    @Override
163    protected void initializeForeignConnection() throws NamingException, JMSException {
164
165        final QueueConnection newConnection;
166
167        if (foreignConnection.get() == null) {
168            // get the connection factories
169            if (outboundQueueConnectionFactory == null) {
170                // look it up from JNDI
171                if (outboundQueueConnectionFactoryName != null) {
172                    outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
173                        .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
174                    if (outboundUsername != null) {
175                        newConnection = outboundQueueConnectionFactory
176                            .createQueueConnection(outboundUsername, outboundPassword);
177                    } else {
178                        newConnection = outboundQueueConnectionFactory.createQueueConnection();
179                    }
180                } else {
181                    throw new JMSException("Cannot create foreignConnection - no information");
182                }
183            } else {
184                if (outboundUsername != null) {
185                    newConnection = outboundQueueConnectionFactory
186                        .createQueueConnection(outboundUsername, outboundPassword);
187                } else {
188                    newConnection = outboundQueueConnectionFactory.createQueueConnection();
189                }
190            }
191        } else {
192            // Clear if for now in case something goes wrong during the init.
193            newConnection = (QueueConnection) foreignConnection.getAndSet(null);
194        }
195
196        if (outboundClientId != null && outboundClientId.length() > 0) {
197            newConnection.setClientID(getOutboundClientId());
198        }
199        newConnection.start();
200
201        outboundMessageConvertor.setConnection(newConnection);
202
203        // Configure the bridges with the new Outbound connection.
204        initializeInboundDestinationBridgesOutboundSide(newConnection);
205        initializeOutboundDestinationBridgesOutboundSide(newConnection);
206
207        // Register for any async error notifications now so we can reset in the
208        // case where there's not a lot of activity and a connection drops.
209        newConnection.setExceptionListener(new ExceptionListener() {
210            @Override
211            public void onException(JMSException exception) {
212                handleConnectionFailure(newConnection);
213            }
214        });
215
216        // At this point all looks good, so this our current connection now.
217        foreignConnection.set(newConnection);
218    }
219
220    @Override
221    protected void initializeLocalConnection() throws NamingException, JMSException {
222
223        final QueueConnection newConnection;
224
225        if (localConnection.get() == null) {
226            // get the connection factories
227            if (localQueueConnectionFactory == null) {
228                if (embeddedConnectionFactory == null) {
229                    // look it up from JNDI
230                    if (localConnectionFactoryName != null) {
231                        localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
232                            .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
233                        if (localUsername != null) {
234                            newConnection = localQueueConnectionFactory
235                                .createQueueConnection(localUsername, localPassword);
236                        } else {
237                            newConnection = localQueueConnectionFactory.createQueueConnection();
238                        }
239                    } else {
240                        throw new JMSException("Cannot create localConnection - no information");
241                    }
242                } else {
243                    newConnection = embeddedConnectionFactory.createQueueConnection();
244                }
245            } else {
246                if (localUsername != null) {
247                    newConnection = localQueueConnectionFactory.
248                            createQueueConnection(localUsername, localPassword);
249                } else {
250                    newConnection = localQueueConnectionFactory.createQueueConnection();
251                }
252            }
253
254        } else {
255            // Clear if for now in case something goes wrong during the init.
256            newConnection = (QueueConnection) localConnection.getAndSet(null);
257        }
258
259        if (localClientId != null && localClientId.length() > 0) {
260            newConnection.setClientID(getLocalClientId());
261        }
262        newConnection.start();
263
264        inboundMessageConvertor.setConnection(newConnection);
265
266        // Configure the bridges with the new Local connection.
267        initializeInboundDestinationBridgesLocalSide(newConnection);
268        initializeOutboundDestinationBridgesLocalSide(newConnection);
269
270        // Register for any async error notifications now so we can reset in the
271        // case where there's not a lot of activity and a connection drops.
272        newConnection.setExceptionListener(new ExceptionListener() {
273            @Override
274            public void onException(JMSException exception) {
275                handleConnectionFailure(newConnection);
276            }
277        });
278
279        // At this point all looks good, so this our current connection now.
280        localConnection.set(newConnection);
281    }
282
283    protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
284        if (inboundQueueBridges != null) {
285            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
286
287            for (InboundQueueBridge bridge : inboundQueueBridges) {
288                String queueName = bridge.getInboundQueueName();
289                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
290                bridge.setConsumer(null);
291                bridge.setConsumerQueue(foreignQueue);
292                bridge.setConsumerConnection(connection);
293                bridge.setJmsConnector(this);
294                addInboundBridge(bridge);
295            }
296            outboundSession.close();
297        }
298    }
299
300    protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
301        if (inboundQueueBridges != null) {
302            QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
303
304            for (InboundQueueBridge bridge : inboundQueueBridges) {
305                String localQueueName = bridge.getLocalQueueName();
306                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
307                bridge.setProducerQueue(activemqQueue);
308                bridge.setProducerConnection(connection);
309                if (bridge.getJmsMessageConvertor() == null) {
310                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
311                }
312                bridge.setJmsConnector(this);
313                addInboundBridge(bridge);
314            }
315            localSession.close();
316        }
317    }
318
319    protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
320        if (outboundQueueBridges != null) {
321            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
322
323            for (OutboundQueueBridge bridge : outboundQueueBridges) {
324                String queueName = bridge.getOutboundQueueName();
325                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
326                bridge.setProducerQueue(foreignQueue);
327                bridge.setProducerConnection(connection);
328                if (bridge.getJmsMessageConvertor() == null) {
329                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
330                }
331                bridge.setJmsConnector(this);
332                addOutboundBridge(bridge);
333            }
334            outboundSession.close();
335        }
336    }
337
338    protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
339        if (outboundQueueBridges != null) {
340            QueueSession localSession =
341                    connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
342
343            for (OutboundQueueBridge bridge : outboundQueueBridges) {
344                String localQueueName = bridge.getLocalQueueName();
345                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
346                bridge.setConsumer(null);
347                bridge.setConsumerQueue(activemqQueue);
348                bridge.setConsumerConnection(connection);
349                bridge.setJmsConnector(this);
350                addOutboundBridge(bridge);
351            }
352            localSession.close();
353        }
354    }
355
356    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
357                                              Connection replyToConsumerConnection) {
358        Queue replyToProducerQueue = (Queue)destination;
359        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
360
361        if (isInbound) {
362            InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
363            if (bridge == null) {
364                bridge = new InboundQueueBridge() {
365                    protected Destination processReplyToDestination(Destination destination) {
366                        return null;
367                    }
368                };
369                try {
370                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
371                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
372                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
373                    replyToConsumerSession.close();
374                    bridge.setConsumerQueue(replyToConsumerQueue);
375                    bridge.setProducerQueue(replyToProducerQueue);
376                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
377                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
378                    bridge.setDoHandleReplyTo(false);
379                    if (bridge.getJmsMessageConvertor() == null) {
380                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
381                    }
382                    bridge.setJmsConnector(this);
383                    bridge.start();
384                    LOG.info("Created replyTo bridge for " + replyToProducerQueue);
385                } catch (Exception e) {
386                    LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
387                    return null;
388                }
389                replyToBridges.put(replyToProducerQueue, bridge);
390            }
391            return bridge.getConsumerQueue();
392        } else {
393            OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
394            if (bridge == null) {
395                bridge = new OutboundQueueBridge() {
396                    protected Destination processReplyToDestination(Destination destination) {
397                        return null;
398                    }
399                };
400                try {
401                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
402                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
403                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
404                    replyToConsumerSession.close();
405                    bridge.setConsumerQueue(replyToConsumerQueue);
406                    bridge.setProducerQueue(replyToProducerQueue);
407                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
408                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
409                    bridge.setDoHandleReplyTo(false);
410                    if (bridge.getJmsMessageConvertor() == null) {
411                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
412                    }
413                    bridge.setJmsConnector(this);
414                    bridge.start();
415                    LOG.info("Created replyTo bridge for " + replyToProducerQueue);
416                } catch (Exception e) {
417                    LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
418                    return null;
419                }
420                replyToBridges.put(replyToProducerQueue, bridge);
421            }
422            return bridge.getConsumerQueue();
423        }
424    }
425
426    protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
427        return session.createQueue(queueName);
428    }
429
430    protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
431        Queue result = null;
432        try {
433            result = session.createQueue(queueName);
434        } catch (JMSException e) {
435            // look-up the Queue
436            try {
437                result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
438            } catch (NamingException e1) {
439                String errStr = "Failed to look-up Queue for name: " + queueName;
440                LOG.error(errStr, e);
441                JMSException jmsEx = new JMSException(errStr);
442                jmsEx.setLinkedException(e1);
443                throw jmsEx;
444            }
445        }
446        return result;
447    }
448
449}