001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.io.Serializable;
020
021import javax.jms.BytesMessage;
022import javax.jms.Destination;
023import javax.jms.JMSException;
024import javax.jms.MapMessage;
025import javax.jms.Message;
026import javax.jms.MessageConsumer;
027import javax.jms.MessageListener;
028import javax.jms.MessageProducer;
029import javax.jms.ObjectMessage;
030import javax.jms.Queue;
031import javax.jms.QueueBrowser;
032import javax.jms.QueueReceiver;
033import javax.jms.QueueSender;
034import javax.jms.QueueSession;
035import javax.jms.Session;
036import javax.jms.StreamMessage;
037import javax.jms.TemporaryQueue;
038import javax.jms.TemporaryTopic;
039import javax.jms.TextMessage;
040import javax.jms.Topic;
041import javax.jms.TopicPublisher;
042import javax.jms.TopicSession;
043import javax.jms.TopicSubscriber;
044
045/**
046 * A {@link Session} implementation which can be used with the ActiveMQ JCA
047 * Resource Adapter to publish messages using the same JMS session that is used
048 * to dispatch messages.
049 * 
050 * 
051 */
052public class InboundSessionProxy implements Session, QueueSession, TopicSession {
053
054    private InboundContext sessionAndProducer;
055
056    public Session getSession() throws JMSException {
057        return getSessionAndProducer().getSession();
058    }
059
060    public QueueSession getQueueSession() throws JMSException {
061        Session session = getSession();
062        if (session instanceof QueueSession) {
063            return (QueueSession)session;
064        } else {
065            throw new JMSException("The underlying JMS Session does not support QueueSession semantics: " + session);
066        }
067    }
068
069    public TopicSession getTopicSession() throws JMSException {
070        Session session = getSession();
071        if (session instanceof TopicSession) {
072            return (TopicSession)session;
073        } else {
074            throw new JMSException("The underlying JMS Session does not support TopicSession semantics: " + session);
075        }
076    }
077
078    public InboundContext getSessionAndProducer() throws JMSException {
079        if (sessionAndProducer == null) {
080            sessionAndProducer = InboundContextSupport.getActiveSessionAndProducer();
081            if (sessionAndProducer == null) {
082                throw new JMSException("No currently active Session. This JMS provider cannot be used outside a MessageListener.onMessage() invocation");
083            }
084        }
085        return sessionAndProducer;
086    }
087
088    public MessageProducer createProducer(Destination destination) throws JMSException {
089        return new InboundMessageProducerProxy(getSessionAndProducer().getMessageProducer(), destination);
090    }
091
092    public void close() throws JMSException {
093        // we don't allow users to close this session
094        // as its used by the JCA container
095    }
096
097    public void commit() throws JMSException {
098        // the JCA container will handle transactions
099    }
100
101    public void rollback() throws JMSException {
102        // the JCA container will handle transactions
103    }
104
105    public void recover() throws JMSException {
106        // the JCA container will handle recovery
107    }
108
109    public void run() {
110        try {
111            getSession().run();
112        } catch (JMSException e) {
113            throw new RuntimeException("Failed to run() on session due to: " + e, e);
114        }
115    }
116
117    // Straightforward delegation methods
118    // -------------------------------------------------------------------------
119
120    public QueueBrowser createBrowser(Queue queue) throws JMSException {
121        return getSession().createBrowser(queue);
122    }
123
124    public QueueBrowser createBrowser(Queue queue, String s) throws JMSException {
125        return getSession().createBrowser(queue, s);
126    }
127
128    public BytesMessage createBytesMessage() throws JMSException {
129        return getSession().createBytesMessage();
130    }
131
132    public MessageConsumer createConsumer(Destination destination) throws JMSException {
133        return getSession().createConsumer(destination);
134    }
135
136    public MessageConsumer createConsumer(Destination destination, String s) throws JMSException {
137        return getSession().createConsumer(destination, s);
138    }
139
140    public MessageConsumer createConsumer(Destination destination, String s, boolean b) throws JMSException {
141        return getSession().createConsumer(destination, s, b);
142    }
143
144    public TopicSubscriber createDurableSubscriber(Topic topic, String s) throws JMSException {
145        return getSession().createDurableSubscriber(topic, s);
146    }
147
148    public TopicSubscriber createDurableSubscriber(Topic topic, String s, String s1, boolean b) throws JMSException {
149        return getSession().createDurableSubscriber(topic, s, s1, b);
150    }
151
152    public MapMessage createMapMessage() throws JMSException {
153        return getSession().createMapMessage();
154    }
155
156    public Message createMessage() throws JMSException {
157        return getSession().createMessage();
158    }
159
160    public ObjectMessage createObjectMessage() throws JMSException {
161        return getSession().createObjectMessage();
162    }
163
164    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
165        return getSession().createObjectMessage(serializable);
166    }
167
168    public Queue createQueue(String s) throws JMSException {
169        return getSession().createQueue(s);
170    }
171
172    public StreamMessage createStreamMessage() throws JMSException {
173        return getSession().createStreamMessage();
174    }
175
176    public TemporaryQueue createTemporaryQueue() throws JMSException {
177        return getSession().createTemporaryQueue();
178    }
179
180    public TemporaryTopic createTemporaryTopic() throws JMSException {
181        return getSession().createTemporaryTopic();
182    }
183
184    public TextMessage createTextMessage() throws JMSException {
185        return getSession().createTextMessage();
186    }
187
188    public TextMessage createTextMessage(String s) throws JMSException {
189        return getSession().createTextMessage(s);
190    }
191
192    public Topic createTopic(String s) throws JMSException {
193        return getSession().createTopic(s);
194    }
195
196    public int getAcknowledgeMode() throws JMSException {
197        return getSession().getAcknowledgeMode();
198    }
199
200    public MessageListener getMessageListener() throws JMSException {
201        return getSession().getMessageListener();
202    }
203
204    public boolean getTransacted() throws JMSException {
205        return getSession().getTransacted();
206    }
207
208    public void setMessageListener(MessageListener messageListener) throws JMSException {
209        getSession().setMessageListener(messageListener);
210    }
211
212    public void unsubscribe(String s) throws JMSException {
213        getSession().unsubscribe(s);
214    }
215
216    public QueueReceiver createReceiver(Queue queue) throws JMSException {
217        return getQueueSession().createReceiver(queue);
218    }
219
220    public QueueReceiver createReceiver(Queue queue, String s) throws JMSException {
221        return getQueueSession().createReceiver(queue, s);
222    }
223
224    public QueueSender createSender(Queue queue) throws JMSException {
225        return new InboundMessageProducerProxy(getSessionAndProducer().getMessageProducer(), queue);
226    }
227
228    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
229        return getTopicSession().createSubscriber(topic);
230    }
231
232    public TopicSubscriber createSubscriber(Topic topic, String s, boolean b) throws JMSException {
233        return getTopicSession().createSubscriber(topic, s, b);
234    }
235
236    public TopicPublisher createPublisher(Topic topic) throws JMSException {
237        return getTopicSession().createPublisher(topic);
238    }
239
240    public String toString() {
241        try {
242            return "InboundSessionProxy { " + getSession() + " }";
243        } catch (JMSException e) {
244            return "InboundSessionProxy { null }";
245        }
246    }
247
248}